You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sj...@apache.org on 2020/09/14 17:48:36 UTC
[flink] 02/05: [FLINK-18735][table] Support bounded datagen tables
This is an automated email from the ASF dual-hosted git repository.
sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4d5e11a6c261f30c9fbf94ff9c4df96b57873372
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Tue Jul 28 10:34:38 2020 -0500
[FLINK-18735][table] Support bounded datagen tables
---
.../source/datagen/DataGeneratorSource.java | 34 ++++++++++++++++++++--
.../table/factories/DataGenTableSourceFactory.java | 9 +++++-
.../factories/datagen/DataGenTableSource.java | 11 ++++---
3 files changed, 46 insertions(+), 8 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
index 6760260..0352d9d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
@@ -19,11 +19,14 @@
package org.apache.flink.streaming.api.functions.source.datagen;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import javax.annotation.Nullable;
+
/**
* A data generator source that abstracts a data generator. It can be used to easily start up/test
* streaming jobs and for performance testing.
@@ -37,6 +40,13 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
private final DataGenerator<T> generator;
private final long rowsPerSecond;
+ @Nullable
+ private Long numberOfRows;
+
+ private int outputSoFar;
+
+ private int toOutput;
+
transient volatile boolean isRunning;
/**
@@ -45,7 +55,7 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
* @param generator data generator.
*/
public DataGeneratorSource(DataGenerator<T> generator) {
- this(generator, Long.MAX_VALUE);
+ this(generator, Long.MAX_VALUE, null);
}
/**
@@ -53,10 +63,25 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
*
* @param generator data generator.
* @param rowsPerSecond Control the emit rate.
+ * @param numberOfRows Total number of rows to output.
*/
- public DataGeneratorSource(DataGenerator<T> generator, long rowsPerSecond) {
+ public DataGeneratorSource(DataGenerator<T> generator, long rowsPerSecond, Long numberOfRows) {
this.generator = generator;
this.rowsPerSecond = rowsPerSecond;
+ this.numberOfRows = numberOfRows;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+
+ // When a total row count is configured, split it across the parallel subtasks:
+ // every subtask gets the integer share, and the first (numberOfRows % parallelism)
+ // subtasks emit one extra row so the shares add up to numberOfRows.
+ if (numberOfRows != null) {
+ final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
+ final int taskIdx = getRuntimeContext().getIndexOfThisSubtask();
+
+ // The per-task counters are ints. Fail fast with an ArithmeticException instead of
+ // silently truncating a share that does not fit, which would emit a wrong row count.
+ final int baseSize = Math.toIntExact(numberOfRows / stepSize);
+ toOutput = (numberOfRows % stepSize > taskIdx) ? Math.addExact(baseSize, 1) : baseSize;
+ }
}
@Override
@@ -77,8 +102,11 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
while (isRunning) {
for (int i = 0; i < taskRowsPerSecond; i++) {
- if (isRunning && generator.hasNext()) {
+ if (isRunning && generator.hasNext() && (numberOfRows == null || outputSoFar < toOutput)) {
synchronized (ctx.getCheckpointLock()) {
+ if (numberOfRows != null) {
+ outputSoFar++;
+ }
ctx.collect(this.generator.next());
}
} else {
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
index e10fd52..c3ad2b9 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
@@ -53,6 +53,11 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
.defaultValue(ROWS_PER_SECOND_DEFAULT_VALUE)
.withDescription("Rows per second to control the emit rate.");
+ public static final ConfigOption<Long> NUMBER_OF_ROWS = key("number-of-rows")
+ .longType()
+ .noDefaultValue()
+ .withDescription("Total number of rows to emit. By default, the source is unbounded.");
+
public static final String FIELDS = "fields";
public static final String KIND = "kind";
public static final String START = "start";
@@ -78,6 +83,7 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(ROWS_PER_SECOND);
+ options.add(NUMBER_OF_ROWS);
return options;
}
@@ -108,10 +114,11 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
Set<String> consumedOptionKeys = new HashSet<>();
consumedOptionKeys.add(CONNECTOR.key());
consumedOptionKeys.add(ROWS_PER_SECOND.key());
+ consumedOptionKeys.add(NUMBER_OF_ROWS.key());
optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
FactoryUtil.validateUnconsumedKeys(factoryIdentifier(), options.keySet(), consumedOptionKeys);
- return new DataGenTableSource(fieldGenerators, schema, options.get(ROWS_PER_SECOND));
+ return new DataGenTableSource(fieldGenerators, schema, options.get(ROWS_PER_SECOND), options.get(NUMBER_OF_ROWS));
}
private DataGeneratorContainer createContainer(
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
index 634f275..935b465 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
@@ -45,28 +45,31 @@ public class DataGenTableSource implements ScanTableSource {
private final DataGenerator[] fieldGenerators;
private final TableSchema schema;
private final long rowsPerSecond;
+ private final Long numberOfRows;
- public DataGenTableSource(DataGenerator[] fieldGenerators, TableSchema schema, long rowsPerSecond) {
+ public DataGenTableSource(DataGenerator[] fieldGenerators, TableSchema schema, long rowsPerSecond, Long numberOfRows) {
this.fieldGenerators = fieldGenerators;
this.schema = schema;
this.rowsPerSecond = rowsPerSecond;
+ this.numberOfRows = numberOfRows;
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
- return SourceFunctionProvider.of(createSource(), false);
+ // The source is bounded only when a total row count was configured; without
+ // 'number-of-rows' (numberOfRows == null) it keeps generating rows and is unbounded.
+ boolean isBounded = numberOfRows != null;
+ return SourceFunctionProvider.of(createSource(), isBounded);
}
@VisibleForTesting
public DataGeneratorSource<RowData> createSource() {
return new DataGeneratorSource<>(
new RowGenerator(fieldGenerators, schema.getFieldNames()),
- rowsPerSecond);
+ rowsPerSecond, numberOfRows);
}
@Override
public DynamicTableSource copy() {
- return new DataGenTableSource(fieldGenerators, schema, rowsPerSecond);
+ return new DataGenTableSource(fieldGenerators, schema, rowsPerSecond, numberOfRows);
}
@Override