You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/13 03:56:36 UTC

[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2974: [feature][connector][fake] Support multi splits for fake source connector

hailin0 commented on code in PR #2974:
URL: https://github.com/apache/incubator-seatunnel/pull/2974#discussion_r994087873


##########
seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java:
##########
@@ -19,24 +19,27 @@
 
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
-import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 
 import lombok.extern.slf4j.Slf4j;
 
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
 import java.util.List;
 
 @Slf4j
-public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
-
-    private final SingleSplitReaderContext context;
+public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSplit> {
 
+    private final SourceReader.Context context;
+    private final Deque<FakeSourceSplit> splits = new LinkedList<>();

Review Comment:
   ```suggestion
       private final Queue<FakeSourceSplit> splits = new LinkedList<>();
   ```



##########
seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java:
##########
@@ -52,16 +55,46 @@ public void close() {
     @Override
     @SuppressWarnings("magicnumber")
     public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
-        // Generate a random number of rows to emit.
-        List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows();
-        for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
-            output.collect(seaTunnelRow);
-        }
-        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
-            // signal to the source that we have reached the end of the data.
-            log.info("Closed the bounded fake source");
-            context.signalNoMoreElement();
+        synchronized (output.getCheckpointLock()) {
+            FakeSourceSplit split = splits.poll();
+            if (null != split) {
+                // Generate a random number of rows to emit.
+                List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows();
+                for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
+                    output.collect(seaTunnelRow);
+                }
+            } else {
+                if (noMoreSplit && Boundedness.BOUNDED.equals(context.getBoundedness())) {
+                    // signal to the source that we have reached the end of the data.
+                    log.info("Closed the bounded fake source");
+                    context.signalNoMoreElement();
+                }
+                if (!noMoreSplit) {
+                    log.info("wait split!");
+                }
+                Thread.sleep(1000L);
+            }
+
         }

Review Comment:
   Move the sleep outside the synchronized block, so the reader does not hold the checkpoint lock while it sleeps?
   
   ```suggestion
               }
   
           }
           Thread.sleep(1000L);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org