You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this plain-text copy.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/10/14 01:55:38 UTC
[incubator-seatunnel] branch dev updated: [feature][connector][fake] Support multiple splits for fake source connector (#2974)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c28c44b7c [feature][connector][fake] Support multiple splits for fake source connector (#2974)
c28c44b7c is described below
commit c28c44b7c96b3dc18d0ebe77cb5985e0c906afe3
Author: liugddx <80...@qq.com>
AuthorDate: Fri Oct 14 09:55:33 2022 +0800
[feature][connector][fake] Support multiple splits for fake source connector (#2974)
* feature: Support more than one split and parallelism greater than one for the fake connector
close #2961
* test parallelism
* Not practical average
* fix a bug
* remove split rowNum
* [Feature][Connector-V2] Improve class attribute
* fine-tuning
* Remove useless blank line.
* Support STREAMING job mode
* [feature][connector][fake] Support multiple splits for fake source
* [connector][fake] Make sure the bounded flow is correct
* Update seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
Co-authored-by: hailin0 <ha...@gmail.com>
Co-authored-by: TyrantLucifer <ty...@apache.org>
Co-authored-by: Zongwen Li <zo...@apache.org>
Co-authored-by: hailin0 <ha...@gmail.com>
---
.../fake/{source => config}/FakeConfig.java | 12 +-
.../seatunnel/fake/source/FakeDataGenerator.java | 1 +
.../seatunnel/fake/source/FakeSource.java | 24 ++--
.../seatunnel/fake/source/FakeSourceReader.java | 65 ++++++++---
.../seatunnel/fake/source/FakeSourceSplit.java | 34 ++++++
.../fake/source/FakeSourceSplitEnumerator.java | 122 +++++++++++++++++++++
.../seatunnel/fake/state/FakeSourceState.java | 23 ++++
.../FakeDataGeneratorTest.java | 1 +
.../main/resources/examples/fake_to_console.conf | 4 +-
9 files changed, 255 insertions(+), 31 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeConfig.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
similarity index 86%
rename from seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeConfig.java
rename to seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
index 8d4fe1d77..7a79c9a30 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeConfig.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.fake.source;
+package org.apache.seatunnel.connectors.seatunnel.fake.config;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -27,11 +27,11 @@ import java.io.Serializable;
@Builder
@Getter
public class FakeConfig implements Serializable {
- private static final String ROW_NUM = "row.num";
- private static final String MAP_SIZE = "map.size";
- private static final String ARRAY_SIZE = "array.size";
- private static final String BYTES_LENGTH = "bytes.length";
- private static final String STRING_LENGTH = "string.length";
+ public static final String ROW_NUM = "row.num";
+ public static final String MAP_SIZE = "map.size";
+ public static final String ARRAY_SIZE = "array.size";
+ public static final String BYTES_LENGTH = "bytes.length";
+ public static final String STRING_LENGTH = "string.length";
private static final int DEFAULT_ROW_NUM = 5;
private static final int DEFAULT_MAP_SIZE = 5;
private static final int DEFAULT_ARRAY_SIZE = 5;
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
index b73f32784..c2a7251f0 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 5e22ce5fc..e715a42e1 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -20,22 +20,23 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
-import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
-import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
-import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
+import java.io.Serializable;
+
@AutoService(SeaTunnelSource.class)
-public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
+public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, Serializable> {
- private Config pluginConfig;
private JobContext jobContext;
private SeaTunnelSchema schema;
private FakeConfig fakeConfig;
@@ -51,7 +52,17 @@ public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
}
@Override
- public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
+ public SourceSplitEnumerator<FakeSourceSplit, Serializable> createEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) throws Exception {
+ return new FakeSourceSplitEnumerator(enumeratorContext);
+ }
+
+ @Override
+ public SourceSplitEnumerator<FakeSourceSplit, Serializable> restoreEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext, Serializable checkpointState) throws Exception {
+ return new FakeSourceSplitEnumerator(enumeratorContext);
+ }
+
+ @Override
+ public SourceReader<SeaTunnelRow, FakeSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
return new FakeSourceReader(readerContext, new FakeDataGenerator(schema, fakeConfig));
}
@@ -62,7 +73,6 @@ public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
@Override
public void prepare(Config pluginConfig) {
- this.pluginConfig = pluginConfig;
assert pluginConfig.hasPath(FakeDataGenerator.SCHEMA);
this.schema = SeaTunnelSchema.buildWithConfig(pluginConfig.getConfig(FakeDataGenerator.SCHEMA));
this.fakeConfig = FakeConfig.buildWithConfig(pluginConfig);
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index 2dbec493b..2d0aa6512 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -19,24 +19,27 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
-import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import lombok.extern.slf4j.Slf4j;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
import java.util.List;
@Slf4j
-public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
-
- private final SingleSplitReaderContext context;
+public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSplit> {
+ private final SourceReader.Context context;
+ private final Deque<FakeSourceSplit> splits = new LinkedList<>();
private final FakeDataGenerator fakeDataGenerator;
+ boolean noMoreSplit;
- public FakeSourceReader(SingleSplitReaderContext context, FakeDataGenerator randomData) {
+ public FakeSourceReader(SourceReader.Context context, FakeDataGenerator fakeDataGenerator) {
this.context = context;
- this.fakeDataGenerator = randomData;
+ this.fakeDataGenerator = fakeDataGenerator;
}
@Override
@@ -52,16 +55,46 @@ public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
@Override
@SuppressWarnings("magicnumber")
public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
- // Generate a random number of rows to emit.
- List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows();
- for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
- output.collect(seaTunnelRow);
- }
- if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
- // signal to the source that we have reached the end of the data.
- log.info("Closed the bounded fake source");
- context.signalNoMoreElement();
+ synchronized (output.getCheckpointLock()) {
+ FakeSourceSplit split = splits.poll();
+ if (null != split) {
+ // Generate a random number of rows to emit.
+ List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows();
+ for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
+ output.collect(seaTunnelRow);
+ }
+ } else {
+ if (noMoreSplit && Boundedness.BOUNDED.equals(context.getBoundedness())) {
+ // signal to the source that we have reached the end of the data.
+ log.info("Closed the bounded fake source");
+ context.signalNoMoreElement();
+ }
+ if (!noMoreSplit) {
+ log.info("wait split!");
+ }
+ }
+
}
Thread.sleep(1000L);
}
+
+ @Override
+ public List<FakeSourceSplit> snapshotState(long checkpointId) throws Exception {
+ return new ArrayList<>(splits);
+ }
+
+ @Override
+ public void addSplits(List<FakeSourceSplit> splits) {
+ this.splits.addAll(splits);
+ }
+
+ @Override
+ public void handleNoMoreSplits() {
+ noMoreSplit = true;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+ }
}
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
new file mode 100644
index 000000000..35b72e4d9
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class FakeSourceSplit implements SourceSplit {
+ private int splitId;
+
+ @Override
+ public String splitId() {
+ return String.valueOf(splitId);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
new file mode 100644
index 000000000..9536bc355
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class FakeSourceSplitEnumerator implements SourceSplitEnumerator<FakeSourceSplit, Serializable> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FakeSourceSplitEnumerator.class);
+ private final SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext;
+ private final Map<Integer, Set<FakeSourceSplit>> pendingSplits;
+
+ public FakeSourceSplitEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) {
+ this.enumeratorContext = enumeratorContext;
+ this.pendingSplits = new HashMap<>();
+ }
+
+ @Override
+ public void open() {
+ // No connection needs to be opened
+ }
+
+ @Override
+ public void run() throws Exception {
+ discoverySplits();
+ assignPendingSplits();
+ }
+
+ @Override
+ public void close() throws IOException {
+ // nothing
+ }
+
+ @Override
+ public void addSplitsBack(List<FakeSourceSplit> splits, int subtaskId) {
+
+ }
+
+ @Override
+ public int currentUnassignedSplitSize() {
+ return 0;
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId) {
+
+ }
+
+ @Override
+ public void registerReader(int subtaskId) {
+ // nothing
+ }
+
+ @Override
+ public Serializable snapshotState(long checkpointId) throws Exception {
+ return null;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+
+ }
+
+ private void discoverySplits() {
+ List<FakeSourceSplit> allSplit = new ArrayList<>();
+ LOG.info("Starting to calculate splits.");
+ int numReaders = enumeratorContext.currentParallelism();
+ for (int i = 0; i < numReaders; i++) {
+ allSplit.add(new FakeSourceSplit(i));
+ }
+ for (FakeSourceSplit split : allSplit) {
+ int ownerReader = split.getSplitId() % numReaders;
+ pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>())
+ .add(split);
+ }
+ LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
+ LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
+ }
+
+ private void assignPendingSplits() {
+ // Check if there's any pending splits for given readers
+ for (int pendingReader : enumeratorContext.registeredReaders()) {
+ // Remove pending assignment for the reader
+ final Set<FakeSourceSplit> pendingAssignmentForReader =
+ pendingSplits.remove(pendingReader);
+
+ if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) {
+ // Assign pending splits to reader
+ LOG.info("Assigning splits to readers {}", pendingAssignmentForReader);
+ enumeratorContext.assignSplit(pendingReader, new ArrayList<>(pendingAssignmentForReader));
+ enumeratorContext.signalNoMoreSplits(pendingReader);
+ }
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeSourceState.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeSourceState.java
new file mode 100644
index 000000000..fbda5c63f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeSourceState.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.state;
+
+import java.io.Serializable;
+
+public class FakeSourceState implements Serializable {
+}
diff --git a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
index bc4d991b6..14807d5c9 100644
--- a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
+++ b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
index 79c062f91..212ec1cdf 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf
@@ -20,8 +20,8 @@
env {
# You can set flink configuration here
- execution.parallelism = 1
- #job.mode = "BATCH"
+ execution.parallelism = 2
+ job.mode = "STREAMING"
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}