You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2022/10/15 09:59:13 UTC
[incubator-seatunnel] branch dev updated: [improve][connector][fake] supports setting the number of split rows and reading interval (#3098)
This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new efabe6af7 [improve][connector][fake] supports setting the number of split rows and reading interval (#3098)
efabe6af7 is described below
commit efabe6af7f189b2836e13251ab0ea7583dc4c287
Author: Zongwen Li <zo...@apache.org>
AuthorDate: Sat Oct 15 17:59:04 2022 +0800
[improve][connector][fake] supports setting the number of split rows and reading interval (#3098)
---
docs/en/connector-v2/source/FakeSource.md | 30 ++++++++-----
.../seatunnel/fake/config/FakeConfig.java | 12 ++++++
.../seatunnel/fake/source/FakeDataGenerator.java | 4 +-
.../seatunnel/fake/source/FakeSource.java | 15 ++++---
.../seatunnel/fake/source/FakeSourceReader.java | 34 ++++++++++-----
.../seatunnel/fake/source/FakeSourceSplit.java | 2 +
.../fake/source/FakeSourceSplitEnumerator.java | 50 ++++++++++++++++------
.../seatunnel/fake/state/FakeSourceState.java | 9 ++++
.../FakeDataGeneratorTest.java | 2 +-
.../resources/assertion/fakesource_to_assert.conf | 5 ++-
10 files changed, 118 insertions(+), 45 deletions(-)
diff --git a/docs/en/connector-v2/source/FakeSource.md b/docs/en/connector-v2/source/FakeSource.md
index 47f604090..c3cd1912c 100644
--- a/docs/en/connector-v2/source/FakeSource.md
+++ b/docs/en/connector-v2/source/FakeSource.md
@@ -18,15 +18,17 @@ just for some test cases such as type conversion or connector new feature testin
## Options
-| name | type | required | default value |
-| -------------- | ------ | -------- | ------------- |
-| schema | config | yes | - |
-| row.num | int | no | 5 |
-| map.size | int | no | 5 |
-| array.size | int | no | 5 |
-| bytes.length | int | no | 5 |
-| string.length | int | no | 5 |
-| common-options | | no | - |
+| name | type | required | default value |
+|---------------------|--------|----------|---------------|
+| schema | config | yes | - |
+| row.num | int | no | 5 |
+| split.num | int | no | 1 |
+| split.read-interval | int | no | 1 |
+| map.size | int | no | 5 |
+| array.size | int | no | 5 |
+| bytes.length | int | no | 5 |
+| string.length | int | no | 5 |
+| common-options | | no | - |
### schema [config]
@@ -81,7 +83,15 @@ Source plugin common parameters, please refer to [Source Common Options](common-
### row.num
-Total num of data that connector generated
+The total number of rows generated per degree of parallelism
+
+### split.num
+
+The number of splits generated by the enumerator for each degree of parallelism
+
+### split.read-interval
+
+The interval (in milliseconds) between two split reads in a reader
### map.size
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
index 7a79c9a30..eb04a92ef 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java
@@ -28,6 +28,8 @@ import java.io.Serializable;
@Getter
public class FakeConfig implements Serializable {
public static final String ROW_NUM = "row.num";
+ public static final String SPLIT_NUM = "split.num";
+ public static final String SPLIT_READ_INTERVAL = "split.read-interval";
public static final String MAP_SIZE = "map.size";
public static final String ARRAY_SIZE = "array.size";
public static final String BYTES_LENGTH = "bytes.length";
@@ -40,6 +42,10 @@ public class FakeConfig implements Serializable {
@Builder.Default
private int rowNum = DEFAULT_ROW_NUM;
@Builder.Default
+ private int splitNum = 1;
+ @Builder.Default
+ private int splitReadInterval = 1;
+ @Builder.Default
private int mapSize = DEFAULT_MAP_SIZE;
@Builder.Default
private int arraySize = DEFAULT_ARRAY_SIZE;
@@ -53,6 +59,12 @@ public class FakeConfig implements Serializable {
if (config.hasPath(ROW_NUM)) {
builder.rowNum(config.getInt(ROW_NUM));
}
+ if (config.hasPath(SPLIT_NUM)) {
+ builder.splitNum(config.getInt(SPLIT_NUM));
+ }
+ if (config.hasPath(SPLIT_READ_INTERVAL)) {
+ builder.splitReadInterval(config.getInt(SPLIT_READ_INTERVAL));
+ }
if (config.hasPath(MAP_SIZE)) {
builder.mapSize(config.getInt(MAP_SIZE));
}
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
index c2a7251f0..085bd158a 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
@@ -58,9 +58,9 @@ public class FakeDataGenerator {
return new SeaTunnelRow(randomRow.toArray());
}
- public List<SeaTunnelRow> generateFakedRows() {
+ public List<SeaTunnelRow> generateFakedRows(int rowNum) {
ArrayList<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
- for (int i = 0; i < fakeConfig.getRowNum(); i++) {
+ for (int i = 0; i < rowNum; i++) {
seaTunnelRows.add(randomRow());
}
return seaTunnelRows;
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index e715a42e1..16275840e 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -27,15 +27,16 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
+import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
-import java.io.Serializable;
+import java.util.Collections;
@AutoService(SeaTunnelSource.class)
-public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, Serializable> {
+public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, FakeSourceState> {
private JobContext jobContext;
private SeaTunnelSchema schema;
@@ -52,18 +53,18 @@ public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit
}
@Override
- public SourceSplitEnumerator<FakeSourceSplit, Serializable> createEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) throws Exception {
- return new FakeSourceSplitEnumerator(enumeratorContext);
+ public SourceSplitEnumerator<FakeSourceSplit, FakeSourceState> createEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) throws Exception {
+ return new FakeSourceSplitEnumerator(enumeratorContext, fakeConfig, Collections.emptySet());
}
@Override
- public SourceSplitEnumerator<FakeSourceSplit, Serializable> restoreEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext, Serializable checkpointState) throws Exception {
- return new FakeSourceSplitEnumerator(enumeratorContext);
+ public SourceSplitEnumerator<FakeSourceSplit, FakeSourceState> restoreEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext, FakeSourceState checkpointState) throws Exception {
+ return new FakeSourceSplitEnumerator(enumeratorContext, fakeConfig, checkpointState.getAssignedSplits());
}
@Override
public SourceReader<SeaTunnelRow, FakeSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
- return new FakeSourceReader(readerContext, new FakeDataGenerator(schema, fakeConfig));
+ return new FakeSourceReader(readerContext, schema, fakeConfig);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index 2d0aa6512..e7b75eab7 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -21,9 +21,12 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
import lombok.extern.slf4j.Slf4j;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
@@ -34,12 +37,16 @@ public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSp
private final SourceReader.Context context;
private final Deque<FakeSourceSplit> splits = new LinkedList<>();
+
+ private final FakeConfig config;
private final FakeDataGenerator fakeDataGenerator;
- boolean noMoreSplit;
+ private volatile boolean noMoreSplit;
+ private volatile long latestTimestamp = 0;
- public FakeSourceReader(SourceReader.Context context, FakeDataGenerator fakeDataGenerator) {
+ public FakeSourceReader(SourceReader.Context context, SeaTunnelSchema schema, FakeConfig fakeConfig) {
this.context = context;
- this.fakeDataGenerator = fakeDataGenerator;
+ this.config = fakeConfig;
+ this.fakeDataGenerator = new FakeDataGenerator(schema, fakeConfig);
}
@Override
@@ -53,27 +60,32 @@ public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSp
}
@Override
- @SuppressWarnings("magicnumber")
+ @SuppressWarnings("MagicNumber")
public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
+ long currentTimestamp = Instant.now().toEpochMilli();
+ if (currentTimestamp <= latestTimestamp + config.getSplitReadInterval()) {
+ return;
+ }
+ latestTimestamp = currentTimestamp;
synchronized (output.getCheckpointLock()) {
FakeSourceSplit split = splits.poll();
if (null != split) {
// Generate a random number of rows to emit.
- List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows();
+ List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows(split.getRowNum());
for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
output.collect(seaTunnelRow);
}
+ log.info("{} rows of data have been generated in split({}). Generation time: {}", split.getRowNum(), split.splitId(), latestTimestamp);
} else {
- if (noMoreSplit && Boundedness.BOUNDED.equals(context.getBoundedness())) {
- // signal to the source that we have reached the end of the data.
- log.info("Closed the bounded fake source");
- context.signalNoMoreElement();
- }
if (!noMoreSplit) {
log.info("wait split!");
}
}
-
+ }
+ if (splits.isEmpty() && noMoreSplit && Boundedness.BOUNDED.equals(context.getBoundedness())) {
+ // signal to the source that we have reached the end of the data.
+ log.info("Closed the bounded fake source");
+ context.signalNoMoreElement();
}
Thread.sleep(1000L);
}
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
index 35b72e4d9..1b8e51ba6 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplit.java
@@ -27,6 +27,8 @@ import lombok.Data;
public class FakeSourceSplit implements SourceSplit {
private int splitId;
+ private int rowNum;
+
@Override
public String splitId() {
return String.valueOf(splitId);
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
index 9536bc355..5346b838a 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceSplitEnumerator.java
@@ -18,28 +18,40 @@
package org.apache.seatunnel.connectors.seatunnel.fake.source;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
+import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeSourceState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-public class FakeSourceSplitEnumerator implements SourceSplitEnumerator<FakeSourceSplit, Serializable> {
+public class FakeSourceSplitEnumerator implements SourceSplitEnumerator<FakeSourceSplit, FakeSourceState> {
private static final Logger LOG = LoggerFactory.getLogger(FakeSourceSplitEnumerator.class);
private final SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext;
private final Map<Integer, Set<FakeSourceSplit>> pendingSplits;
- public FakeSourceSplitEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext) {
+ private final FakeConfig fakeConfig;
+ /**
+ * Splits that have been assigned to readers.
+ */
+ private final Set<FakeSourceSplit> assignedSplits;
+
+ public FakeSourceSplitEnumerator(SourceSplitEnumerator.Context<FakeSourceSplit> enumeratorContext,
+ FakeConfig config,
+ Set<FakeSourceSplit> assignedSplits) {
this.enumeratorContext = enumeratorContext;
this.pendingSplits = new HashMap<>();
+ this.fakeConfig = config;
+ this.assignedSplits = new HashSet<>(assignedSplits);
}
@Override
@@ -60,12 +72,12 @@ public class FakeSourceSplitEnumerator implements SourceSplitEnumerator<FakeSour
@Override
public void addSplitsBack(List<FakeSourceSplit> splits, int subtaskId) {
-
+ addSplitChangeToPendingAssignments(splits);
}
@Override
public int currentUnassignedSplitSize() {
- return 0;
+ return pendingSplits.size();
}
@Override
@@ -79,8 +91,8 @@ public class FakeSourceSplitEnumerator implements SourceSplitEnumerator<FakeSour
}
@Override
- public Serializable snapshotState(long checkpointId) throws Exception {
- return null;
+ public FakeSourceState snapshotState(long checkpointId) throws Exception {
+ return new FakeSourceState(assignedSplits);
}
@Override
@@ -89,19 +101,31 @@ public class FakeSourceSplitEnumerator implements SourceSplitEnumerator<FakeSour
}
private void discoverySplits() {
- List<FakeSourceSplit> allSplit = new ArrayList<>();
+ Set<FakeSourceSplit> allSplit = new HashSet<>();
LOG.info("Starting to calculate splits.");
int numReaders = enumeratorContext.currentParallelism();
+ int readerRowNum = fakeConfig.getRowNum();
+ int splitNum = fakeConfig.getSplitNum();
+ int splitRowNum = (int) Math.ceil((double) readerRowNum / splitNum);
for (int i = 0; i < numReaders; i++) {
- allSplit.add(new FakeSourceSplit(i));
+ int index = i;
+ for (int num = 0; num < readerRowNum; index += numReaders, num += splitRowNum) {
+ allSplit.add(new FakeSourceSplit(index, Math.min(splitRowNum, readerRowNum - num)));
+ }
}
- for (FakeSourceSplit split : allSplit) {
- int ownerReader = split.getSplitId() % numReaders;
+
+ assignedSplits.forEach(allSplit::remove);
+ addSplitChangeToPendingAssignments(allSplit);
+ LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
+ LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
+ }
+
+ private void addSplitChangeToPendingAssignments(Collection<FakeSourceSplit> newSplits) {
+ for (FakeSourceSplit split : newSplits) {
+ int ownerReader = split.getSplitId() % enumeratorContext.currentParallelism();
pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>())
.add(split);
}
- LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
- LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
}
private void assignPendingSplits() {
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeSourceState.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeSourceState.java
index fbda5c63f..93a338e11 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeSourceState.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/state/FakeSourceState.java
@@ -17,7 +17,16 @@
package org.apache.seatunnel.connectors.seatunnel.fake.state;
+import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSourceSplit;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
import java.io.Serializable;
+import java.util.Set;
+@Getter
+@AllArgsConstructor
public class FakeSourceState implements Serializable {
+ private final Set<FakeSourceSplit> assignedSplits;
}
diff --git a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
index 14807d5c9..15cf1592e 100644
--- a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
+++ b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java
@@ -47,7 +47,7 @@ public class FakeDataGeneratorTest {
SeaTunnelRowType seaTunnelRowType = seaTunnelSchema.getSeaTunnelRowType();
FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(seaTunnelSchema, fakeConfig);
- List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows();
+ List<SeaTunnelRow> seaTunnelRows = fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
Assertions.assertNotNull(seaTunnelRows);
Assertions.assertEquals(seaTunnelRows.size(), 10);
for (SeaTunnelRow seaTunnelRow : seaTunnelRows) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
index 2412a087f..a98c4f042 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-assert-e2e/src/test/resources/assertion/fakesource_to_assert.conf
@@ -26,6 +26,9 @@ source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
result_table_name = "fake"
+ row.num = 100
+ split.row = 25
+ split.read-interval = 2000
schema = {
fields {
name = "string"
@@ -53,7 +56,7 @@ sink {
row_rules = [
{
rule_type = MAX_ROW
- rule_value = 10
+ rule_value = 100
},
{
rule_type = MIN_ROW