You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/10/14 01:55:38 UTC

[incubator-seatunnel] branch dev updated: [feature][connector][fake] Support multi splits for fake source connector (#2974)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c28c44b7c [feature][connector][fake] Support multi splits for fake source connector (#2974)
c28c44b7c is described below

commit c28c44b7c96b3dc18d0ebe77cb5985e0c906afe3
Author: liugddx <80...@qq.com>
AuthorDate: Fri Oct 14 09:55:33 2022 +0800

    [feature][connector][fake] Support multi splits for fake source connector (#2974)
    
    * [feature] Support multiple splits and parallelism for the fake connector
    
    close #2961
    
    * test parallelism
    
    * Not practical average
    
    * fix a bug
    
    * remove split rowNum
    
    * [Feature][Connector-V2] Improve class attribute
    
    * fine-tuning
    
    * Remove useless blank line.
    
    * support STREAMING
    
    * [feature][connector][fake] Support multi-splits for fake source
    
    * [connector][fake] Make sure the bounded flow is correct
    
    * Update seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
    
    Co-authored-by: hailin0 <ha...@gmail.com>
    
    Co-authored-by: TyrantLucifer <ty...@apache.org>
    Co-authored-by: Zongwen Li <zo...@apache.org>
    Co-authored-by: hailin0 <ha...@gmail.com>
---
 .../fake/{source => config}/FakeConfig.java        |  12 +-
 .../seatunnel/fake/source/FakeDataGenerator.java   |   1 +
 .../seatunnel/fake/source/FakeSource.java          |  24 ++--
 .../seatunnel/fake/source/FakeSourceReader.java    |  65 ++++++++---
 .../seatunnel/fake/source/FakeSourceSplit.java     |  34 ++++++
 .../fake/source/FakeSourceSplitEnumerator.java     | 122 +++++++++++++++++++++
 .../seatunnel/fake/state/FakeSourceState.java      |  23 ++++
 .../FakeDataGeneratorTest.java                     |   1 +
 .../main/resources/examples/fake_to_console.conf   |   4 +-
 9 files changed, 255 insertions(+), 31 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeConfig.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
similarity index 86%
rename from seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeConfig.java
rename to seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
index 8d4fe1d77..7a79c9a30 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeConfig.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.fake.source;
+package org.apache.seatunnel.connectors.seatunnel.fake.config;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -27,11 +27,11 @@ import java.io.Serializable;
 @Builder
 @Getter
 public class FakeConfig implements Serializable {
-    private static final String ROW_NUM = "row.num";
-    private static final String MAP_SIZE = "map.size";
-    private static final String ARRAY_SIZE = "array.size";
-    private static final String BYTES_LENGTH = "bytes.length";
-    private static final String STRING_LENGTH = "string.length";
+    public static final String ROW_NUM = "row.num";
+    public static final String MAP_SIZE = "map.size";
+    public static final String ARRAY_SIZE = "array.size";
+    public static final String BYTES_LENGTH = "bytes.length";
+    public static final String STRING_LENGTH = "string.length";
     private static final int DEFAULT_ROW_NUM = 5;
     private static final int DEFAULT_MAP_SIZE = 5;
     private static final int DEFAULT_ARRAY_SIZE = 5;
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
index b73f32784..c2a7251f0 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
 
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.RandomUtils;
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 5e22ce5fc..e715a42e1 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -20,22 +20,23 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
-import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
-import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
-import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+
 @AutoService(SeaTunnelSource.class)
-public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
+public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, Serializable> {
 
-    private Config pluginConfig;
     private JobContext jobContext;
     private SeaTunnelSchema schema;
     private FakeConfig fakeConfig;
@@ -51,7 +52,17 @@ public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
     }
 
     @Override
-    public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
+    public SourceSplitEnumerator<FakeSourceSplit, Serializable> createEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) throws Exception {
+        return new FakeSourceSplitEnumerator(enumeratorContext);
+    }
+
+    @Override
+    public SourceSplitEnumerator<FakeSourceSplit, Serializable> restoreEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext, Serializable checkpointState) throws Exception {
+        return new FakeSourceSplitEnumerator(enumeratorContext);
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, FakeSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
         return new FakeSourceReader(readerContext, new FakeDataGenerator(schema, fakeConfig));
     }
 
@@ -62,7 +73,6 @@ public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
 
     @Override
     public void prepare(Config pluginConfig) {
-        this.pluginConfig = pluginConfig;
         assert pluginConfig.hasPath(FakeDataGenerator.SCHEMA);
         this.schema = SeaTunnelSchema.buildWithConfig(pluginConfig.getConfig(FakeDataGenerator.SCHEMA));
         this.fakeConfig = FakeConfig.buildWithConfig(pluginConfig);
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index 2dbec493b..2d0aa6512 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -19,24 +19,27 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source;
 
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
-import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 
 import lombok.extern.slf4j.Slf4j;
 
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
 import java.util.List;
 
 @Slf4j
-public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
-
-    private final SingleSplitReaderContext context;
+public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSplit> {
 
+    private final SourceReader.Context context;
+    private final Deque<FakeSourceSplit> splits = new LinkedList<>();
     private final FakeDataGenerator fakeDataGenerator;
+    boolean noMoreSplit;
 
-    public FakeSourceReader(SingleSplitReaderContext context, FakeDataGenerator randomData) {
+    public FakeSourceReader(SourceReader.Context context, FakeDataGenerator fakeDataGenerator) {
         this.context = context;
-        this.fakeDataGenerator = randomData;
+        this.fakeDataGenerator = fakeDataGenerator;
     }
 
     @Override
@@ -52,16 +55,46 @@ public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
     @Override
     @SuppressWarnings("magicnumber")
     public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
-        // Generate a random number of rows to emit.
-        List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows();
-        for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
-            output.collect(seaTunnelRow);
-        }
-        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
-            // signal to the source that we have reached the end of the data.
-            log.info("Closed the bounded fake source");
-            context.signalNoMoreElement();
+        synchronized (output.getCheckpointLock()) {
+            FakeSourceSplit split = splits.poll();
+            if (null != split) {
+                // Generate a random number of rows to emit.
+                List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows();
+                for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
+                    output.collect(seaTunnelRow);
+                }
+            } else {
+                if (noMoreSplit && Boundedness.BOUNDED.equals(context.getBoundedness())) {
+                    // signal to the source that we have reached the end of the data.
+                    log.info("Closed the bounded fake source");
+                    context.signalNoMoreElement();
+                }
+                if (!noMoreSplit) {
+                    log.info("wait split!");
+                }
+            }
+
         }
         Thread.sleep(1000L);
     }
+
+    @Override
+    public List<FakeSourceSplit> snapshotState(long checkpointId) throws Exception {
+        return new ArrayList<>(splits);
+    }
+
+    @Override
+    public void addSplits(List<FakeSourceSplit> splits) {
+        this.splits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        noMoreSplit = true;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
new file mode 100644
index 000000000..35b72e4d9
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class FakeSourceSplit implements SourceSplit {
+    private int splitId;
+
+    @Override
+    public String splitId() {
+        return String.valueOf(splitId);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
new file mode 100644
index 000000000..9536bc355
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class FakeSourceSplitEnumerator implements SourceSplitEnumerator<FakeSourceSplit, Serializable> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FakeSourceSplitEnumerator.class);
+    private final SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext;
+    private final Map<Integer, Set<FakeSourceSplit>> pendingSplits;
+
+    public FakeSourceSplitEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) {
+        this.enumeratorContext = enumeratorContext;
+        this.pendingSplits = new HashMap<>();
+    }
+
+    @Override
+    public void open() {
+        // No connection needs to be opened
+    }
+
+    @Override
+    public void run() throws Exception {
+        discoverySplits();
+        assignPendingSplits();
+    }
+
+    @Override
+    public void close() throws IOException {
+        // nothing
+    }
+
+    @Override
+    public void addSplitsBack(List<FakeSourceSplit> splits, int subtaskId) {
+
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return 0;
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        // nothing
+    }
+
+    @Override
+    public Serializable snapshotState(long checkpointId) throws Exception {
+        return null;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+    }
+
+    private void discoverySplits() {
+        List<FakeSourceSplit> allSplit = new ArrayList<>();
+        LOG.info("Starting to calculate splits.");
+        int numReaders = enumeratorContext.currentParallelism();
+        for (int i = 0; i < numReaders; i++) {
+            allSplit.add(new FakeSourceSplit(i));
+        }
+        for (FakeSourceSplit split : allSplit) {
+            int ownerReader = split.getSplitId() % numReaders;
+            pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>())
+                .add(split);
+        }
+        LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
+        LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
+    }
+
+    private void assignPendingSplits() {
+        // Check if there's any pending splits for given readers
+        for (int pendingReader : enumeratorContext.registeredReaders()) {
+            // Remove pending assignment for the reader
+            final Set<FakeSourceSplit> pendingAssignmentForReader =
+                pendingSplits.remove(pendingReader);
+
+            if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) {
+                // Assign pending splits to reader
+                LOG.info("Assigning splits to readers {}", pendingAssignmentForReader);
+                enumeratorContext.assignSplit(pendingReader, new ArrayList<>(pendingAssignmentForReader));
+                enumeratorContext.signalNoMoreSplits(pendingReader);
+            }
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeSourceState.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeSourceState.java
new file mode 100644
index 000000000..fbda5c63f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeSourceState.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.state;
+
+import java.io.Serializable;
+
+public class FakeSourceState implements Serializable {
+}
diff --git a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
index bc4d991b6..14807d5c9 100644
--- a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
+++ b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
index 79c062f91..212ec1cdf 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
@@ -20,8 +20,8 @@
 
 env {
   # You can set flink configuration here
-  execution.parallelism = 1
-  #job.mode = "BATCH"
+  execution.parallelism = 2
+  job.mode = "STREAMING"
   #execution.checkpoint.interval = 10000
   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
 }