You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2022/10/15 09:59:13 UTC

[incubator-seatunnel] branch dev updated: [improve][connector][fake] supports setting the number of split rows and reading interval (#3098)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new efabe6af7 [improve][connector][fake] supports setting the number of split rows and reading interval (#3098)
efabe6af7 is described below

commit efabe6af7f189b2836e13251ab0ea7583dc4c287
Author: Zongwen Li <zo...@apache.org>
AuthorDate: Sat Oct 15 17:59:04 2022 +0800

    [improve][connector][fake] supports setting the number of split rows and reading interval (#3098)
---
 docs/en/connector-v2/source/FakeSource.md          | 30 ++++++++-----
 .../seatunnel/fake/config/FakeConfig.java          | 12 ++++++
 .../seatunnel/fake/source/FakeDataGenerator.java   |  4 +-
 .../seatunnel/fake/source/FakeSource.java          | 15 ++++---
 .../seatunnel/fake/source/FakeSourceReader.java    | 34 ++++++++++-----
 .../seatunnel/fake/source/FakeSourceSplit.java     |  2 +
 .../fake/source/FakeSourceSplitEnumerator.java     | 50 ++++++++++++++++------
 .../seatunnel/fake/state/FakeSourceState.java      |  9 ++++
 .../FakeDataGeneratorTest.java                     |  2 +-
 .../resources/assertion/fakesource_to_assert.conf  |  5 ++-
 10 files changed, 118 insertions(+), 45 deletions(-)

diff --git a/docs/en/connector-v2/source/FakeSource.md b/docs/en/connector-v2/source/FakeSource.md
index 47f604090..c3cd1912c 100644
--- a/docs/en/connector-v2/source/FakeSource.md
+++ b/docs/en/connector-v2/source/FakeSource.md
@@ -18,15 +18,17 @@ just for some test cases such as type conversion or connector new feature testin
 
 ## Options
 
-| name           | type   | required | default value |
-| -------------- | ------ | -------- | ------------- |
-| schema         | config | yes      | -             |
-| row.num        | int    | no       | 5             |
-| map.size       | int    | no       | 5             |
-| array.size     | int    | no       | 5             |
-| bytes.length   | int    | no       | 5             |
-| string.length  | int    | no       | 5             |
-| common-options |        | no       | -             |
+| name                | type   | required | default value |
+|---------------------|--------|----------|---------------|
+| schema              | config | yes      | -             |
+| row.num             | int    | no       | 5             |
+| split.num           | int    | no       | 1             |
+| split.read-interval | long   | no       | 1             |
+| map.size            | int    | no       | 5             |
+| array.size          | int    | no       | 5             |
+| bytes.length        | int    | no       | 5             |
+| string.length       | int    | no       | 5             |
+| common-options      |        | no       | -             |
 
 ### schema [config]
 
@@ -81,7 +83,15 @@ Source plugin common parameters, please refer to [Source Common Options](common-
 
 ### row.num
 
-Total num of data that connector generated
+The total number of rows generated per degree of parallelism
+
+### split.num
+
+The number of splits the enumerator generates for each degree of parallelism
+
+### split.read-interval
+
+The interval (in milliseconds) between two split reads in a reader
 
 ### map.size
 
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
index 7a79c9a30..eb04a92ef 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
@@ -28,6 +28,8 @@ import java.io.Serializable;
 @Getter
 public class FakeConfig implements Serializable {
     public static final String ROW_NUM = "row.num";
+    public static final String SPLIT_NUM = "split.num";
+    public static final String SPLIT_READ_INTERVAL = "split.read-interval";
     public static final String MAP_SIZE = "map.size";
     public static final String ARRAY_SIZE = "array.size";
     public static final String BYTES_LENGTH = "bytes.length";
@@ -40,6 +42,10 @@ public class FakeConfig implements Serializable {
     @Builder.Default
     private int rowNum = DEFAULT_ROW_NUM;
     @Builder.Default
+    private int splitNum = 1; // splits generated per degree of parallelism (split.num)
+    @Builder.Default
+    private long splitReadInterval = 1L; // millis between split reads; long to match the documented option type
+    @Builder.Default
     private int mapSize = DEFAULT_MAP_SIZE;
     @Builder.Default
     private int arraySize = DEFAULT_ARRAY_SIZE;
@@ -53,6 +59,12 @@ public class FakeConfig implements Serializable {
         if (config.hasPath(ROW_NUM)) {
             builder.rowNum(config.getInt(ROW_NUM));
         }
+        if (config.hasPath(SPLIT_NUM)) {
+            builder.splitNum(config.getInt(SPLIT_NUM));
+        }
+        if (config.hasPath(SPLIT_READ_INTERVAL)) {
+            builder.splitReadInterval(config.getLong(SPLIT_READ_INTERVAL));
+        }
         if (config.hasPath(MAP_SIZE)) {
             builder.mapSize(config.getInt(MAP_SIZE));
         }
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
index c2a7251f0..085bd158a 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
@@ -58,9 +58,9 @@ public class FakeDataGenerator {
         return new SeaTunnelRow(randomRow.toArray());
     }
 
-    public List<SeaTunnelRow> generateFakedRows() {
+    public List<SeaTunnelRow> generateFakedRows(int rowNum) {
         ArrayList<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
-        for (int i = 0; i < fakeConfig.getRowNum(); i++) {
+        for (int i = 0; i < rowNum; i++) { // count now comes from the caller (e.g. a split's rowNum); no rows when rowNum <= 0
             seaTunnelRows.add(randomRow());
         }
         return seaTunnelRows;
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index e715a42e1..16275840e 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -27,15 +27,16 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
+import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
 
-import java.io.Serializable;
+import java.util.Collections;
 
 @AutoService(SeaTunnelSource.class)
-public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, Serializable> {
+public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, FakeSourceState> {
 
     private JobContext jobContext;
     private SeaTunnelSchema schema;
@@ -52,18 +53,18 @@ public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit
     }
 
     @Override
-    public SourceSplitEnumerator<FakeSourceSplit, Serializable> createEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) throws Exception {
-        return new FakeSourceSplitEnumerator(enumeratorContext);
+    public SourceSplitEnumerator<FakeSourceSplit, FakeSourceState> createEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) throws Exception {
+        return new FakeSourceSplitEnumerator(enumeratorContext, fakeConfig, Collections.emptySet());
     }
 
     @Override
-    public SourceSplitEnumerator<FakeSourceSplit, Serializable> restoreEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext, Serializable checkpointState) throws Exception {
-        return new FakeSourceSplitEnumerator(enumeratorContext);
+    public SourceSplitEnumerator<FakeSourceSplit, FakeSourceState> restoreEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext, FakeSourceState checkpointState) throws Exception {
+        return new FakeSourceSplitEnumerator(enumeratorContext, fakeConfig, checkpointState == null ? Collections.<FakeSourceSplit>emptySet() : checkpointState.getAssignedSplits()); // older snapshots stored null
     }
 
     @Override
     public SourceReader<SeaTunnelRow, FakeSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
-        return new FakeSourceReader(readerContext, new FakeDataGenerator(schema, fakeConfig));
+        return new FakeSourceReader(readerContext, schema, fakeConfig);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index 2d0aa6512..e7b75eab7 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -21,9 +21,12 @@ import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
 
 import lombok.extern.slf4j.Slf4j;
 
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.LinkedList;
@@ -34,12 +37,16 @@ public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSp
 
     private final SourceReader.Context context;
     private final Deque<FakeSourceSplit> splits = new LinkedList<>();
+
+    private final FakeConfig config;
     private final FakeDataGenerator fakeDataGenerator;
-    boolean noMoreSplit;
+    private volatile boolean noMoreSplit;
+    private volatile long latestTimestamp = 0; // epoch millis of the last split read; 0 means nothing read yet
 
-    public FakeSourceReader(SourceReader.Context context, FakeDataGenerator fakeDataGenerator) {
+    public FakeSourceReader(SourceReader.Context context, SeaTunnelSchema schema, FakeConfig fakeConfig) {
         this.context = context;
-        this.fakeDataGenerator = fakeDataGenerator;
+        this.config = fakeConfig;
+        this.fakeDataGenerator = new FakeDataGenerator(schema, fakeConfig);
     }
 
     @Override
@@ -53,27 +60,32 @@ public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSp
     }
 
     @Override
-    @SuppressWarnings("magicnumber")
+    @SuppressWarnings("MagicNumber")
     public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
+        long remainingMillis = latestTimestamp + config.getSplitReadInterval() - Instant.now().toEpochMilli();
+        if (remainingMillis >= 0) {
+            Thread.sleep(remainingMillis + 1); // wait out split.read-interval; returning early made the caller busy-spin
+        }
+        latestTimestamp = Instant.now().toEpochMilli();
         synchronized (output.getCheckpointLock()) {
             FakeSourceSplit split = splits.poll();
             if (null != split) {
                 // Generate a random number of rows to emit.
-                List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows();
+                List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows(split.getRowNum());
                 for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
                     output.collect(seaTunnelRow);
                 }
+                log.info("{} rows of data have been generated in split({}). Generation time: {}", split.getRowNum(), split.splitId(), latestTimestamp);
             } else {
-                if (noMoreSplit && Boundedness.BOUNDED.equals(context.getBoundedness())) {
-                    // signal to the source that we have reached the end of the data.
-                    log.info("Closed the bounded fake source");
-                    context.signalNoMoreElement();
-                }
                 if (!noMoreSplit) {
                     log.info("wait split!");
                 }
             }
-
+        }
+        if (splits.isEmpty() && noMoreSplit && Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            // signal to the source that we have reached the end of the data.
+            log.info("Closed the bounded fake source");
+            context.signalNoMoreElement();
         }
         Thread.sleep(1000L);
     }
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
index 35b72e4d9..1b8e51ba6 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
@@ -27,6 +27,8 @@ import lombok.Data;
 public class FakeSourceSplit implements SourceSplit {
     private int splitId;
 
+    private int rowNum; // number of rows the reader generates for this split (passed to generateFakedRows)
+
     @Override
     public String splitId() {
         return String.valueOf(splitId);
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
index 9536bc355..5346b838a 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
@@ -18,28 +18,40 @@
 package org.apache.seatunnel.connectors.seatunnel.fake.source;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
+import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-public class FakeSourceSplitEnumerator implements SourceSplitEnumerator<FakeSourceSplit, Serializable> {
+public class FakeSourceSplitEnumerator implements SourceSplitEnumerator<FakeSourceSplit, FakeSourceState> {
 
     private static final Logger LOG = LoggerFactory.getLogger(FakeSourceSplitEnumerator.class);
     private final SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext;
     private final Map<Integer, Set<FakeSourceSplit>> pendingSplits;
 
-    public FakeSourceSplitEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) {
+    private final FakeConfig fakeConfig;
+    /**
+     * Splits recorded as assigned to readers; snapshotted into FakeSourceState and skipped on re-discovery.
+     */
+    private final Set<FakeSourceSplit> assignedSplits;
+
+    public FakeSourceSplitEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext,
+                                     FakeConfig config,
+                                     Set<FakeSourceSplit> assignedSplits) {
         this.enumeratorContext = enumeratorContext;
         this.pendingSplits = new HashMap<>();
+        this.fakeConfig = config;
+        this.assignedSplits = new HashSet<>(assignedSplits);
     }
 
     @Override
@@ -60,12 +72,12 @@ public class FakeSourceSplitEnumerator implements SourceSplitEnumerator<FakeSour
 
     @Override
     public void addSplitsBack(List<FakeSourceSplit> splits, int subtaskId) {
-
+        addSplitChangeToPendingAssignments(splits);
     }
 
     @Override
     public int currentUnassignedSplitSize() {
-        return 0;
+        return pendingSplits.values().stream().mapToInt(Set::size).sum(); // count pending splits, not reader keys
     }
 
     @Override
@@ -79,8 +91,8 @@ public class FakeSourceSplitEnumerator implements SourceSplitEnumerator<FakeSour
     }
 
     @Override
-    public Serializable snapshotState(long checkpointId) throws Exception {
-        return null;
+    public FakeSourceState snapshotState(long checkpointId) throws Exception {
+        return new FakeSourceState(new HashSet<>(assignedSplits)); // copy so the snapshot is not aliased to live state
     }
 
     @Override
@@ -89,19 +101,31 @@ public class FakeSourceSplitEnumerator implements SourceSplitEnumerator<FakeSour
     }
 
     private void discoverySplits() {
-        List<FakeSourceSplit> allSplit = new ArrayList<>();
+        Set<FakeSourceSplit> allSplit = new HashSet<>();
         LOG.info("Starting to calculate splits.");
         int numReaders = enumeratorContext.currentParallelism();
+        int readerRowNum = fakeConfig.getRowNum();
+        int splitNum = Math.max(1, fakeConfig.getSplitNum()); // a negative split.num would make the loop below never end
+        int splitRowNum = (int) Math.ceil((double) readerRowNum / splitNum);
         for (int i = 0; i < numReaders; i++) {
-            allSplit.add(new FakeSourceSplit(i));
+            int index = i;
+            for (int num = 0; num < readerRowNum; index += numReaders, num += splitRowNum) {
+                allSplit.add(new FakeSourceSplit(index, Math.min(splitRowNum, readerRowNum - num)));
+            }
         }
-        for (FakeSourceSplit split : allSplit) {
-            int ownerReader = split.getSplitId() % numReaders;
+
+        assignedSplits.forEach(allSplit::remove);
+        addSplitChangeToPendingAssignments(allSplit);
+        LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
+        LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
+    }
+
+    private void addSplitChangeToPendingAssignments(Collection<FakeSourceSplit> newSplits) {
+        for (FakeSourceSplit split : newSplits) {
+            int ownerReader = split.getSplitId() % enumeratorContext.currentParallelism();
             pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>())
                 .add(split);
         }
-        LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
-        LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
     }
 
     private void assignPendingSplits() {
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeSourceState.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeSourceState.java
index fbda5c63f..93a338e11 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeSourceState.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeSourceState.java
@@ -17,7 +17,16 @@
 
 package org.apache.seatunnel.connectors.seatunnel.fake.state;
 
+import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSourceSplit;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
 import java.io.Serializable;
+import java.util.Set;
 
+@Getter
+@AllArgsConstructor
 public class FakeSourceState implements Serializable {
+    private final Set<FakeSourceSplit> assignedSplits; // splits recorded as assigned; the restored enumerator skips them when re-discovering
 }
diff --git a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
index 14807d5c9..15cf1592e 100644
--- a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
+++ b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
@@ -47,7 +47,7 @@ public class FakeDataGeneratorTest {
         SeaTunnelRowType seaTunnelRowType = seaTunnelSchema.getSeaTunnelRowType();
         FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
         FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(seaTunnelSchema, fakeConfig);
-        List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows();
+        List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum()); // row count is now passed explicitly
         Assertions.assertNotNull(seaTunnelRows);
         Assertions.assertEquals(seaTunnelRows.size(), 10);
         for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
index 2412a087f..a98c4f042 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
@@ -26,6 +26,9 @@ source {
   # This is a example source plugin **only for test and demonstrate the feature source plugin**
   FakeSource {
     result_table_name = "fake"
+    row.num = 100
+    split.num = 4
+    split.read-interval = 2000
     schema = {
       fields {
         name = "string"
@@ -53,7 +56,7 @@ sink {
         row_rules = [
           {
             rule_type = MAX_ROW
-            rule_value = 10
+            rule_value = 100
           },
           {
             rule_type = MIN_ROW