You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sj...@apache.org on 2020/09/14 17:48:36 UTC

[flink] 02/05: [FLINK-18735][table] Support bounded datagen tables

This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4d5e11a6c261f30c9fbf94ff9c4df96b57873372
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Tue Jul 28 10:34:38 2020 -0500

    [FLINK-18735][table] Support bounded datagen tables
---
 .../source/datagen/DataGeneratorSource.java        | 34 ++++++++++++++++++++--
 .../table/factories/DataGenTableSourceFactory.java |  9 +++++-
 .../factories/datagen/DataGenTableSource.java      | 11 ++++---
 3 files changed, 46 insertions(+), 8 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
index 6760260..0352d9d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
@@ -19,11 +19,14 @@
 package org.apache.flink.streaming.api.functions.source.datagen;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 
+import javax.annotation.Nullable;
+
 /**
  * A data generator source that abstract data generator. It can be used to easy startup/test
  * for streaming job and performance testing.
@@ -37,6 +40,13 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
 	private final DataGenerator<T> generator;
 	private final long rowsPerSecond;
 
+	@Nullable
+	private Long numberOfRows;
+
+	private int outputSoFar;
+
+	private int toOutput;
+
 	transient volatile boolean isRunning;
 
 	/**
@@ -45,7 +55,7 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
 	 * @param generator data generator.
 	 */
 	public DataGeneratorSource(DataGenerator<T> generator) {
-		this(generator, Long.MAX_VALUE);
+		this(generator, Long.MAX_VALUE, null);
 	}
 
 	/**
@@ -53,10 +63,25 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
 	 *
 	 * @param generator data generator.
 	 * @param rowsPerSecond Control the emit rate.
+	 * @param numberOfRows Total number of rows to output across all parallel subtasks; {@code null} means unbounded.
 	 */
-	public DataGeneratorSource(DataGenerator<T> generator, long rowsPerSecond) {
+	public DataGeneratorSource(DataGenerator<T> generator, long rowsPerSecond, Long numberOfRows) {
 		this.generator = generator;
 		this.rowsPerSecond = rowsPerSecond;
+		this.numberOfRows = numberOfRows;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+
+		if (numberOfRows != null) {
+			final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
+			final int taskIdx = getRuntimeContext().getIndexOfThisSubtask();
+
+			final long baseSize = numberOfRows / stepSize; // per-subtask share; kept as long so large totals are not silently truncated
+			toOutput = Math.toIntExact((numberOfRows % stepSize > taskIdx) ? baseSize + 1 : baseSize); // lower-indexed subtasks absorb the remainder; throws ArithmeticException instead of overflowing int
+		}
 	}
 
 	@Override
@@ -77,8 +102,11 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
 
 		while (isRunning) {
 			for (int i = 0; i < taskRowsPerSecond; i++) {
-				if (isRunning && generator.hasNext()) {
+				if (isRunning && generator.hasNext() && (numberOfRows == null || outputSoFar < toOutput)) {
 					synchronized (ctx.getCheckpointLock()) {
+						if (numberOfRows != null) {
+							outputSoFar++;
+						}
 						ctx.collect(this.generator.next());
 					}
 				} else {
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
index e10fd52..c3ad2b9 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
@@ -53,6 +53,11 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 			.defaultValue(ROWS_PER_SECOND_DEFAULT_VALUE)
 			.withDescription("Rows per second to control the emit rate.");
 
+	public static final ConfigOption<Long> NUMBER_OF_ROWS = key("number-of-rows")
+		.longType()
+		.noDefaultValue()
+		.withDescription("Total number of rows to emit. By default, the source is unbounded.");
+
 	public static final String FIELDS = "fields";
 	public static final String KIND = "kind";
 	public static final String START = "start";
@@ -78,6 +83,7 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 	public Set<ConfigOption<?>> optionalOptions() {
 		Set<ConfigOption<?>> options = new HashSet<>();
 		options.add(ROWS_PER_SECOND);
+		options.add(NUMBER_OF_ROWS);
 		return options;
 	}
 
@@ -108,10 +114,11 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 		Set<String> consumedOptionKeys = new HashSet<>();
 		consumedOptionKeys.add(CONNECTOR.key());
 		consumedOptionKeys.add(ROWS_PER_SECOND.key());
+		consumedOptionKeys.add(NUMBER_OF_ROWS.key());
 		optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
 		FactoryUtil.validateUnconsumedKeys(factoryIdentifier(), options.keySet(), consumedOptionKeys);
 
-		return new DataGenTableSource(fieldGenerators, schema, options.get(ROWS_PER_SECOND));
+		return new DataGenTableSource(fieldGenerators, schema, options.get(ROWS_PER_SECOND), options.get(NUMBER_OF_ROWS));
 	}
 
 	private DataGeneratorContainer createContainer(
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
index 634f275..935b465 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
@@ -45,28 +45,31 @@ public class DataGenTableSource implements ScanTableSource {
 	private final DataGenerator[] fieldGenerators;
 	private final TableSchema schema;
 	private final long rowsPerSecond;
+	private final Long numberOfRows;
 
-	public DataGenTableSource(DataGenerator[] fieldGenerators, TableSchema schema, long rowsPerSecond) {
+	public DataGenTableSource(DataGenerator[] fieldGenerators, TableSchema schema, long rowsPerSecond, Long numberOfRows) {
 		this.fieldGenerators = fieldGenerators;
 		this.schema = schema;
 		this.rowsPerSecond = rowsPerSecond;
+		this.numberOfRows = numberOfRows;
 	}
 
 	@Override
 	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
-		return SourceFunctionProvider.of(createSource(), false);
+		boolean isBounded = numberOfRows != null; // bounded only when a total row count was configured; null means unbounded
+		return SourceFunctionProvider.of(createSource(), isBounded);
 	}
 
 	@VisibleForTesting
 	public DataGeneratorSource<RowData> createSource() {
 		return new DataGeneratorSource<>(
 			new RowGenerator(fieldGenerators, schema.getFieldNames()),
-			rowsPerSecond);
+			rowsPerSecond, numberOfRows);
 	}
 
 	@Override
 	public DynamicTableSource copy() {
-		return new DataGenTableSource(fieldGenerators, schema, rowsPerSecond);
+		return new DataGenTableSource(fieldGenerators, schema, rowsPerSecond, numberOfRows);
 	}
 
 	@Override