You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sj...@apache.org on 2020/09/14 17:48:34 UTC

[flink] branch master updated (021712a -> ef01fec)

This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 021712a  [FLINK-19083] Remove deprecated DataStream#split from documentation
     new 37e128c  [hotfix][table] Refactor DataGen source to use LogicalTypeVisitor pattern
     new 4d5e11a  [FLINK-18735][table] Support bounded datagen tables
     new 8f7369e   [FLINK-18735][table] Add support for more types to DataGen source
     new aee918d   [FLINK-18735][table] Update documentation of DataGen source
     new ef01fec  [FLINK-18735][table] Add additional logging to DataGen source

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/table/connectors/datagen.md               | 202 +++++++++---
 .../source/datagen/DataGeneratorSource.java        |  43 ++-
 .../functions/source/datagen/RandomGenerator.java  |  46 +++
 .../source/datagen/SequenceGenerator.java          |  13 +
 .../table/factories/DataGenTableSourceFactory.java | 305 ++-----------------
 .../factories/datagen/DataGenTableSource.java      |  89 ++++++
 .../factories/datagen/DataGenVisitorBase.java      | 110 +++++++
 .../factories/datagen/DataGeneratorContainer.java  |  59 ++++
 .../factories/datagen/RandomGeneratorVisitor.java  | 337 +++++++++++++++++++++
 .../datagen/SequenceGeneratorVisitor.java          | 177 +++++++++++
 .../datagen/types/DataGeneratorMapper.java         |  60 ++++
 .../factories/datagen/types/RowDataGenerator.java  |  80 +++++
 .../factories/DataGenTableSourceFactoryTest.java   | 130 +++++---
 .../stream/table/DataGeneratorConnectorITCase.java |  72 +++++
 14 files changed, 1354 insertions(+), 369 deletions(-)
 create mode 100644 flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
 create mode 100644 flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java
 create mode 100644 flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGeneratorContainer.java
 create mode 100644 flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
 create mode 100644 flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java
 create mode 100644 flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/DataGeneratorMapper.java
 create mode 100644 flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/RowDataGenerator.java
 create mode 100644 flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java


[flink] 05/05: [FLINK-18735][table] Add additional logging to DataGen source

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ef01fec2a632f65556e0b30faad7399120b62e95
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Tue Aug 11 08:40:44 2020 -0500

    [FLINK-18735][table] Add additional logging to DataGen source
    
    This closes #13010
---
 docs/dev/table/connectors/datagen.md               | 173 +++++++++++++++++----
 .../source/datagen/DataGeneratorSource.java        |  23 ++-
 .../table/factories/DataGenTableSourceFactory.java |   5 +-
 .../factories/datagen/DataGenTableSource.java      |  71 ++-------
 .../factories/datagen/DataGenVisitorBase.java      |  16 +-
 .../factories/datagen/RandomGeneratorVisitor.java  |  87 ++++-------
 .../datagen/types/DataGeneratorMapper.java         |  60 +++++++
 .../factories/datagen/types/RowDataGenerator.java  |  80 ++++++++++
 .../stream/table/DataGeneratorConnectorITCase.java |  72 +++++++++
 9 files changed, 428 insertions(+), 159 deletions(-)

diff --git a/docs/dev/table/connectors/datagen.md b/docs/dev/table/connectors/datagen.md
index 99cce5b..27e7a20 100644
--- a/docs/dev/table/connectors/datagen.md
+++ b/docs/dev/table/connectors/datagen.md
@@ -43,42 +43,159 @@ For variable sized types, char/varchar/string/array/map/multiset, the length can
 Additionally, a total number of rows can be specified, resulting in a bounded table.
 
 There also exists a sequence generator, where users specify a sequence of start and end values.
-Complex types cannot be generated as a sequence.
 If any column in a table is a sequence type, the table will be bounded and end with the first sequence completes.
 
 Time types are always the local machines current system time.
 
-<div class="codetabs" markdown="1">
-<div data-lang="SQL" markdown="1">
 {% highlight sql %}
-CREATE TABLE datagen (
- f_sequence INT,
- f_random INT,
- f_random_str STRING,
- ts TIMESTAMP(3)
- WATERMARK FOR ts AS ts
+CREATE TABLE Orders (
+    order_number BIGINT,
+    price        DECIMAL(32,2),
+    buyer        ROW<first_name STRING, last_name STRING>,
+    order_time   TIMESTAMP(3)
 ) WITH (
- 'connector' = 'datagen',
-
- -- optional options --
-
- 'rows-per-second'='5',
- 
- -- make the table bounded
- 'number-of-rows'='10'
+  'connector' = 'datagen'
+)
+{% endhighlight %}
 
- 'fields.f_sequence.kind'='sequence',
- 'fields.f_sequence.start'='1',
- 'fields.f_sequence.end'='1000',
+Often, the data generator connector is used in conjunction with the ``LIKE`` clause to mock out physical tables.
 
- 'fields.f_random.min'='1',
- 'fields.f_random.max'='1000',
+{% highlight sql %}
+CREATE TABLE Orders (
+    order_number BIGINT,
+    price        DECIMAL(32,2),
+    buyer        ROW<first_name STRING, last_name STRING>,
+    order_time   TIMESTAMP(3)
+) WITH (...)
 
- 'fields.f_random_str.length'='10'
+-- create a bounded mock table
+CREATE TEMPORARY TABLE GenOrders 
+WITH ( 
+    'connector' = 'datagen',
+    'number-of-rows' = '10'
 )
+LIKE Orders (EXCLUDING ALL)
 {% endhighlight %}
-</div>
-</div>
+
+Types
+-----
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 25%">Type</th>
+            <th class="text-center" style="width: 25%">Supported Generators</th>
+            <th class="text-center" style="width: 50%">Notes</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>BOOLEAN</td>
+            <td>random</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>CHAR</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>VARCHAR</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>STRING</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>DECIMAL</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>TINYINT</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr>  
+        <tr>
+            <td>SMALLINT</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr> 
+        <tr>
+            <td>INT</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr> 
+        <tr>
+            <td>BIGINT</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>FLOAT</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>DOUBLE</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>DATE</td>
+            <td>random</td>
+            <td>Always resolves to the current date of the local machine.</td>
+        </tr>
+        <tr>
+            <td>TIME</td>
+            <td>random</td>
+            <td>Always resolves to the current time of the local machine.</td>
+        </tr>
+        <tr>
+            <td>TIMESTAMP</td>
+            <td>random</td>
+            <td>Always resolves to the current timestamp of the local machine.</td>
+        </tr>    
+        <tr>
+            <td>TIMESTAMP WITH LOCAL TIME ZONE</td>
+            <td>random</td>
+            <td>Always resolves to the current timestamp of the local machine.</td>
+        </tr>   
+        <tr>
+            <td>INTERVAL YEAR TO MONTH</td>
+            <td>random</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>INTERVAL DAY TO SECOND</td>
+            <td>random</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>ROW</td>
+            <td>random</td>
+            <td>Generates a row with random subfields.</td>
+        </tr>
+        <tr>
+            <td>ARRAY</td>
+            <td>random</td>
+            <td>Generates an array with random entries.</td>
+        </tr>
+        <tr>
+            <td>MAP</td>
+            <td>random</td>
+            <td>Generates a map with random entries.</td>
+        </tr>
+        <tr>
+            <td>MULTISET</td>
+            <td>random</td>
+            <td>Generates a multiset with random entries.</td>
+        </tr>
+    </tbody>
+</table>
 
 Connector Options
 ----------------
@@ -127,21 +244,21 @@ Connector Options
       <td>optional</td>
       <td style="word-wrap: break-word;">(Minimum value of type)</td>
       <td>(Type of field)</td>
-      <td>Minimum value of random generator, work for number types.</td>
+      <td>Minimum value of random generator, works for numeric types.</td>
     </tr>
     <tr>
       <td><h5>fields.#.max</h5></td>
       <td>optional</td>
       <td style="word-wrap: break-word;">(Maximum value of type)</td>
       <td>(Type of field)</td>
-      <td>Maximum value of random generator, work for number types.</td>
+      <td>Maximum value of random generator, works for numeric types.</td>
     </tr>
     <tr>
       <td><h5>fields.#.length</h5></td>
       <td>optional</td>
       <td style="word-wrap: break-word;">100</td>
       <td>Integer</td>
-      <td>Length for string generating of random generator, work for char/varchar/string.</td>
+      <td>Size or length of the collection for generating char/varchar/string/array/map/multiset types.</td>
     </tr>
     <tr>
       <td><h5>fields.#.start</h5></td>
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
index 0352d9d..1de9b28 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
@@ -24,6 +24,8 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -37,15 +39,18 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
 
 	private static final long serialVersionUID = 1L;
 
+	private static final Logger LOG = LoggerFactory.getLogger(DataGeneratorSource.class);
+
 	private final DataGenerator<T> generator;
+
 	private final long rowsPerSecond;
 
 	@Nullable
-	private Long numberOfRows;
+	private final Long numberOfRows;
 
-	private int outputSoFar;
+	private transient int outputSoFar;
 
-	private int toOutput;
+	private transient int toOutput;
 
 	transient volatile boolean isRunning;
 
@@ -65,7 +70,7 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
 	 * @param rowsPerSecond Control the emit rate.
 	 * @param numberOfRows Total number of rows to output.
 	 */
-	public DataGeneratorSource(DataGenerator<T> generator, long rowsPerSecond, Long numberOfRows) {
+	public DataGeneratorSource(DataGenerator<T> generator, long rowsPerSecond, @Nullable Long numberOfRows) {
 		this.generator = generator;
 		this.rowsPerSecond = rowsPerSecond;
 		this.numberOfRows = numberOfRows;
@@ -104,9 +109,7 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
 			for (int i = 0; i < taskRowsPerSecond; i++) {
 				if (isRunning && generator.hasNext() && (numberOfRows == null || outputSoFar < toOutput)) {
 					synchronized (ctx.getCheckpointLock()) {
-						if (numberOfRows != null) {
-							outputSoFar++;
-						}
+						outputSoFar++;
 						ctx.collect(this.generator.next());
 					}
 				} else {
@@ -124,6 +127,12 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
 	}
 
 	@Override
+	public void close() throws Exception {
+		super.close();
+		LOG.info("generated {} rows", outputSoFar);
+	}
+
+	@Override
 	public void cancel() {
 		isRunning = false;
 	}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
index a2b9df3..dbc39ab 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
@@ -93,7 +93,7 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 		context.getCatalogTable().getOptions().forEach(options::setString);
 
 		TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
-		DataGenerator[] fieldGenerators = new DataGenerator[schema.getFieldCount()];
+		DataGenerator<?>[] fieldGenerators = new DataGenerator[schema.getFieldCount()];
 		Set<ConfigOption<?>> optionalOptions = new HashSet<>();
 
 		for (int i = 0; i < fieldGenerators.length; i++) {
@@ -118,7 +118,8 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 		optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
 		FactoryUtil.validateUnconsumedKeys(factoryIdentifier(), options.keySet(), consumedOptionKeys);
 
-		return new DataGenTableSource(fieldGenerators, schema, options.get(ROWS_PER_SECOND), options.get(NUMBER_OF_ROWS));
+		String name = context.getObjectIdentifier().toString();
+		return new DataGenTableSource(fieldGenerators, name, schema, options.get(ROWS_PER_SECOND), options.get(NUMBER_OF_ROWS));
 	}
 
 	private DataGeneratorContainer createContainer(
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
index 935b465..e5a9b36 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
@@ -20,9 +20,6 @@ package org.apache.flink.table.factories.datagen;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
 import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
@@ -31,8 +28,8 @@ import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceFunctionProvider;
-import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.datagen.types.RowDataGenerator;
 import org.apache.flink.table.sources.StreamTableSource;
 
 /**
@@ -42,13 +39,20 @@ import org.apache.flink.table.sources.StreamTableSource;
 @Internal
 public class DataGenTableSource implements ScanTableSource {
 
-	private final DataGenerator[] fieldGenerators;
+	private final DataGenerator<?>[] fieldGenerators;
+	private final String tableName;
 	private final TableSchema schema;
 	private final long rowsPerSecond;
 	private final Long numberOfRows;
 
-	public DataGenTableSource(DataGenerator[] fieldGenerators, TableSchema schema, long rowsPerSecond, Long numberOfRows) {
+	public DataGenTableSource(
+		DataGenerator<?>[] fieldGenerators,
+		String tableName,
+		TableSchema schema,
+		long rowsPerSecond,
+		Long numberOfRows) {
 		this.fieldGenerators = fieldGenerators;
+		this.tableName = tableName;
 		this.schema = schema;
 		this.rowsPerSecond = rowsPerSecond;
 		this.numberOfRows = numberOfRows;
@@ -56,20 +60,20 @@ public class DataGenTableSource implements ScanTableSource {
 
 	@Override
 	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
-		boolean isBounded = numberOfRows == null;
+		boolean isBounded = numberOfRows != null;
 		return SourceFunctionProvider.of(createSource(), isBounded);
 	}
 
 	@VisibleForTesting
 	public DataGeneratorSource<RowData> createSource() {
 		return new DataGeneratorSource<>(
-			new RowGenerator(fieldGenerators, schema.getFieldNames()),
+			new RowDataGenerator(fieldGenerators, schema.getFieldNames()),
 			rowsPerSecond, numberOfRows);
 	}
 
 	@Override
 	public DynamicTableSource copy() {
-		return new DataGenTableSource(fieldGenerators, schema, rowsPerSecond, numberOfRows);
+		return new DataGenTableSource(fieldGenerators, tableName, schema, rowsPerSecond, numberOfRows);
 	}
 
 	@Override
@@ -81,54 +85,5 @@ public class DataGenTableSource implements ScanTableSource {
 	public ChangelogMode getChangelogMode() {
 		return ChangelogMode.insertOnly();
 	}
-
-	private static class RowGenerator implements DataGenerator<RowData> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final DataGenerator[] fieldGenerators;
-		private final String[] fieldNames;
-
-		private RowGenerator(DataGenerator[] fieldGenerators, String[] fieldNames) {
-			this.fieldGenerators = fieldGenerators;
-			this.fieldNames = fieldNames;
-		}
-
-		@Override
-		public void open(
-			String name,
-			FunctionInitializationContext context,
-			RuntimeContext runtimeContext) throws Exception {
-			for (int i = 0; i < fieldGenerators.length; i++) {
-				fieldGenerators[i].open(fieldNames[i], context, runtimeContext);
-			}
-		}
-
-		@Override
-		public void snapshotState(FunctionSnapshotContext context) throws Exception {
-			for (DataGenerator generator : fieldGenerators) {
-				generator.snapshotState(context);
-			}
-		}
-
-		@Override
-		public boolean hasNext() {
-			for (DataGenerator generator : fieldGenerators) {
-				if (!generator.hasNext()) {
-					return false;
-				}
-			}
-			return true;
-		}
-
-		@Override
-		public RowData next() {
-			GenericRowData row = new GenericRowData(fieldNames.length);
-			for (int i = 0; i < fieldGenerators.length; i++) {
-				row.setField(i, fieldGenerators[i].next());
-			}
-			return row;
-		}
-	}
 }
 
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java
index a34c91f..f68dcbc 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.DateType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -32,13 +33,12 @@ import org.apache.flink.table.types.logical.ZonedTimestampType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
 
 import java.io.Serializable;
-import java.time.Instant;
 import java.time.LocalDate;
-import java.time.LocalDateTime;
 import java.time.LocalTime;
-import java.time.OffsetDateTime;
 import java.util.function.Supplier;
 
+import static java.time.temporal.ChronoField.MILLI_OF_DAY;
+
 /**
  * Base class for translating {@link LogicalType LogicalTypes} to {@link DataGeneratorContainer}'s.
  */
@@ -55,27 +55,27 @@ public abstract class DataGenVisitorBase extends LogicalTypeDefaultVisitor<DataG
 
 	@Override
 	public DataGeneratorContainer visit(DateType dateType) {
-		return DataGeneratorContainer.of(TimeGenerator.of(LocalDate::now));
+		return DataGeneratorContainer.of(TimeGenerator.of(() -> (int) LocalDate.now().toEpochDay()));
 	}
 
 	@Override
 	public DataGeneratorContainer visit(TimeType timeType) {
-		return DataGeneratorContainer.of(TimeGenerator.of(LocalTime::now));
+		return DataGeneratorContainer.of(TimeGenerator.of(() -> LocalTime.now().get(MILLI_OF_DAY)));
 	}
 
 	@Override
 	public DataGeneratorContainer visit(TimestampType timestampType) {
-		return DataGeneratorContainer.of(TimeGenerator.of(LocalDateTime::now));
+		return DataGeneratorContainer.of(TimeGenerator.of(() -> TimestampData.fromEpochMillis(System.currentTimeMillis())));
 	}
 
 	@Override
 	public DataGeneratorContainer visit(ZonedTimestampType zonedTimestampType) {
-		return DataGeneratorContainer.of(TimeGenerator.of(OffsetDateTime::now));
+		return DataGeneratorContainer.of(TimeGenerator.of(() -> TimestampData.fromEpochMillis(System.currentTimeMillis())));
 	}
 
 	@Override
 	public DataGeneratorContainer visit(LocalZonedTimestampType localZonedTimestampType) {
-		return DataGeneratorContainer.of(TimeGenerator.of(Instant::now));
+		return DataGeneratorContainer.of(TimeGenerator.of(() -> TimestampData.fromEpochMillis(System.currentTimeMillis())));
 	}
 
 	@Override
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
index 97fafe4..77c2b45 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
@@ -27,7 +27,12 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
 import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.factories.datagen.types.DataGeneratorMapper;
+import org.apache.flink.table.factories.datagen.types.RowDataGenerator;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BooleanType;
@@ -45,12 +50,12 @@ import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.logical.YearMonthIntervalType;
-import org.apache.flink.types.Row;
 
 import java.math.BigDecimal;
 import java.math.MathContext;
 import java.math.RoundingMode;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -170,7 +175,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
 		ConfigOption<Double> min = minKey.doubleType().defaultValue(Double.MIN_VALUE);
 		ConfigOption<Double> max = maxKey.doubleType().defaultValue(Double.MAX_VALUE);
 		return DataGeneratorContainer.of(
-			new BigDecimalRandomGenerator(
+			new DecimalDataRandomGenerator(
 				RandomGenerator.doubleGenerator(config.get(min), config.get(max)),
 				decimalType.getPrecision(), decimalType.getScale()));
 	}
@@ -206,8 +211,9 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
 			.getElementType()
 			.accept(new RandomGeneratorVisitor(fieldName, config));
 
+		DataGenerator<Object[]> generator = RandomGenerator.arrayGenerator(container.getGenerator(), config.get(lenOption));
 		return DataGeneratorContainer.of(
-			RandomGenerator.arrayGenerator(container.getGenerator(), config.get(lenOption)),
+			new DataGeneratorMapper<>(generator, (GenericArrayData::new)),
 			container.getOptions().toArray(new ConfigOption<?>[0]));
 	}
 
@@ -222,10 +228,13 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
 			.getElementType()
 			.accept(new RandomGeneratorVisitor(fieldName, config));
 
+		DataGenerator<Map<Object, Integer>> mapGenerator = RandomGenerator.mapGenerator(
+			container.getGenerator(),
+			RandomGenerator.intGenerator(0, 10),
+			config.get(lenOption));
+
 		return DataGeneratorContainer.of(
-			RandomGenerator.mapGenerator(container.getGenerator(),
-				RandomGenerator.intGenerator(0, 10),
-				config.get(lenOption)),
+			new DataGeneratorMapper<>(mapGenerator, GenericMapData::new),
 			container.getOptions().toArray(new ConfigOption<?>[0]));
 	}
 
@@ -249,11 +258,13 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
 		Set<ConfigOption<?>> options = keyContainer.getOptions();
 		options.addAll(valContainer.getOptions());
 
+		DataGenerator<Map<Object, Object>> mapGenerator = RandomGenerator.mapGenerator(
+			keyContainer.getGenerator(),
+			valContainer.getGenerator(),
+			config.get(lenOption));
+
 		return DataGeneratorContainer.of(
-			RandomGenerator.mapGenerator(
-				keyContainer.getGenerator(),
-				valContainer.getGenerator(),
-				config.get(lenOption)),
+			new DataGeneratorMapper<>(mapGenerator, GenericMapData::new),
 			options.toArray(new ConfigOption<?>[0]));
 	}
 
@@ -271,12 +282,14 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
 			.flatMap(container -> container.getOptions().stream())
 			.toArray(ConfigOption[]::new);
 
-		List<DataGenerator> generators = fieldContainers
+		DataGenerator[] generators = fieldContainers
 			.stream()
 			.map(DataGeneratorContainer::getGenerator)
-			.collect(Collectors.toList());
+			.toArray(DataGenerator[]::new);
 
-		return DataGeneratorContainer.of(new RowGenerator(name, rowType.getFieldNames(), generators), options);
+		String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+
+		return DataGeneratorContainer.of(new RowDataGenerator(generators, fieldNames), options);
 	}
 
 	@Override
@@ -293,51 +306,12 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
 		};
 	}
 
-	private static class RowGenerator implements DataGenerator<Row> {
-
-		private final String rowName;
-
-		private final List<String> fieldNames;
-
-		private final List<DataGenerator> generators;
-
-		private RowGenerator(String rowName, List<String> fieldNames, List<DataGenerator> generators) {
-			this.rowName = rowName;
-			this.fieldNames = fieldNames;
-			this.generators = generators;
-		}
-
-		@Override
-		public void open(String name, FunctionInitializationContext context, RuntimeContext runtimeContext) throws Exception {
-			for (int i = 0; i < generators.size(); i++) {
-				String fullName = rowName + "." + fieldNames.get(i);
-				generators.get(i).open(fullName, context, runtimeContext);
-			}
-		}
-
-		@Override
-		public boolean hasNext() {
-			return true;
-		}
-
-		@Override
-		public Row next() {
-			Row row = new Row(generators.size());
-
-			for (int i = 0; i < generators.size(); i++) {
-				row.setField(i, generators.get(i).next());
-			}
-
-			return row;
-		}
-	}
-
-	private static class BigDecimalRandomGenerator implements DataGenerator<BigDecimal> {
+	private static class DecimalDataRandomGenerator implements DataGenerator<DecimalData> {
 		private final RandomGenerator<Double> generator;
 		private final int precision;
 		private final int scale;
 
-		public BigDecimalRandomGenerator(RandomGenerator<Double> generator, int precision, int scale) {
+		public DecimalDataRandomGenerator(RandomGenerator<Double> generator, int precision, int scale) {
 			this.generator = generator;
 			this.precision = precision;
 			this.scale = scale;
@@ -354,9 +328,10 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
 		}
 
 		@Override
-		public BigDecimal next() {
+		public DecimalData next() {
 			BigDecimal decimal = new BigDecimal(generator.next(), new MathContext(precision));
-			return decimal.setScale(scale, RoundingMode.DOWN);
+			decimal = decimal.setScale(scale, RoundingMode.DOWN);
+			return DecimalData.fromBigDecimal(decimal, precision, scale);
 		}
 	}
 }
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/DataGeneratorMapper.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/DataGeneratorMapper.java
new file mode 100644
index 0000000..25532cf
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/DataGeneratorMapper.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen.types;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+
+import java.io.Serializable;
+import java.util.function.Function;
+
+/**
+ * Utility for mapping the output of a {@link DataGenerator}.
+ */
+@Internal
+public class DataGeneratorMapper<A, B> implements DataGenerator<B> {
+
+	private final DataGenerator<A> generator;
+
+	private final SerializableFunction<A, B> mapper;
+
+	public DataGeneratorMapper(DataGenerator<A> generator, SerializableFunction<A, B> mapper) {
+		this.generator = generator;
+		this.mapper = mapper;
+	}
+
+	@Override
+	public void open(String name, FunctionInitializationContext context, RuntimeContext runtimeContext) throws Exception {
+		generator.open(name, context, runtimeContext);
+	}
+
+	@Override
+	public boolean hasNext() {
+		return generator.hasNext();
+	}
+
+	@Override
+	public B next() {
+		return mapper.apply(generator.next());
+	}
+
+	public interface SerializableFunction<A, B> extends Function<A, B>, Serializable {}
+}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/RowDataGenerator.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/RowDataGenerator.java
new file mode 100644
index 0000000..26cbc24
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/RowDataGenerator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen.types;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Data generator for Flink's internal {@link RowData} type.
+ */
+@Internal
+public class RowDataGenerator implements DataGenerator<RowData> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final DataGenerator<?>[] fieldGenerators;
+	private final String[] fieldNames;
+
+	public RowDataGenerator(DataGenerator<?>[] fieldGenerators, String[] fieldNames) {
+		this.fieldGenerators = fieldGenerators;
+		this.fieldNames = fieldNames;
+	}
+
+	@Override
+	public void open(
+		String name,
+		FunctionInitializationContext context,
+		RuntimeContext runtimeContext) throws Exception {
+		for (int i = 0; i < fieldGenerators.length; i++) {
+			fieldGenerators[i].open(fieldNames[i], context, runtimeContext);
+		}
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		for (DataGenerator<?> generator : fieldGenerators) {
+			generator.snapshotState(context);
+		}
+	}
+
+	@Override
+	public boolean hasNext() {
+		for (DataGenerator<?> generator : fieldGenerators) {
+			if (!generator.hasNext()) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	@Override
+	public RowData next() {
+		GenericRowData row = new GenericRowData(fieldNames.length);
+		for (int i = 0; i < fieldGenerators.length; i++) {
+			row.setField(i, fieldGenerators[i].next());
+		}
+		return row;
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java
new file mode 100644
index 0000000..25be021
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DataGeneratorConnectorITCase extends BatchTestBase {
+
+	private static final String TABLE = "CREATE TABLE datagen_t (\n" +
+		"	f0 CHAR(1),\n" +
+		"	f1 VARCHAR(10),\n" +
+		"	f2 STRING,\n" +
+		"	f3 BOOLEAN,\n" +
+		"	f4 DECIMAL(32,2),\n" +
+		"	f5 TINYINT,\n" +
+		"	f6 SMALLINT,\n" +
+		"	f7 INT,\n" +
+		"	f8 BIGINT,\n" +
+		"	f9 FLOAT,\n" +
+		"	f10 DOUBLE,\n" +
+		"	f11 DATE,\n" +
+		"	f12 TIME,\n" +
+		"	f13 TIMESTAMP(3),\n" +
+		"	f14 TIMESTAMP WITH LOCAL TIME ZONE,\n" +
+		"	f15 INT ARRAY,\n" +
+		"	f16 MAP<STRING, DATE>,\n" +
+		"	f17 DECIMAL(32,2) MULTISET,\n" +
+		"	f18 ROW<a BIGINT, b TIME, c ROW<d TIMESTAMP>>\n" +
+		") WITH (" +
+		"	'connector' = 'datagen',\n" +
+		"	'number-of-rows' = '10'\n" +
+		")";
+
+	@Test
+	public void testTypes() throws Exception {
+		tEnv().executeSql(TABLE);
+
+		List<Row> results = new ArrayList<>();
+
+		try (CloseableIterator<Row> iter = tEnv().executeSql("select * from datagen_t").collect()) {
+			while (iter.hasNext()) {
+				results.add(iter.next());
+			}
+		}
+
+		Assert.assertEquals("Unexpected number of results",10, results.size());
+	}
+}


[flink] 01/05: [hotfix][table] Refactor DataGen source to use LogicalTypeVisitor pattern

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 37e128c637746c9c93e2199f85517a29844cfb63
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Mon Jul 27 11:27:42 2020 -0500

    [hotfix][table] Refactor DataGen source to use LogicalTypeVisitor pattern
---
 .../table/factories/DataGenTableSourceFactory.java | 293 +--------------------
 .../factories/datagen/DataGenTableSource.java      | 131 +++++++++
 .../factories/datagen/DataGeneratorContainer.java  |  59 +++++
 .../factories/datagen/RandomGeneratorVisitor.java  | 166 ++++++++++++
 .../datagen/SequenceGeneratorVisitor.java          | 175 ++++++++++++
 .../factories/DataGenTableSourceFactoryTest.java   |   2 +-
 6 files changed, 540 insertions(+), 286 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
index 4a6c3a8..e10fd52 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
@@ -19,33 +19,20 @@
 package org.apache.flink.table.factories;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions.OptionBuilder;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
-import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
-import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
-import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.ScanTableSource;
-import org.apache.flink.table.connector.source.SourceFunctionProvider;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.factories.datagen.DataGenTableSource;
+import org.apache.flink.table.factories.datagen.DataGeneratorContainer;
+import org.apache.flink.table.factories.datagen.RandomGeneratorVisitor;
+import org.apache.flink.table.factories.datagen.SequenceGeneratorVisitor;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.TableSchemaUtils;
 
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -60,7 +47,6 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 
 	public static final String IDENTIFIER = "datagen";
 	public static final Long ROWS_PER_SECOND_DEFAULT_VALUE = 10000L;
-	public static final int RANDOM_STRING_LENGTH_DEFAULT = 100;
 
 	public static final ConfigOption<Long> ROWS_PER_SECOND = key("rows-per-second")
 			.longType()
@@ -111,10 +97,10 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 			ConfigOption<String> kind = key(FIELDS + "." + name + "." + KIND)
 					.stringType().defaultValue(RANDOM);
 			DataGeneratorContainer container = createContainer(name, type, options.get(kind), options);
-			fieldGenerators[i] = container.generator;
+			fieldGenerators[i] = container.getGenerator();
 
 			optionalOptions.add(kind);
-			optionalOptions.addAll(container.options);
+			optionalOptions.addAll(container.getOptions());
 		}
 
 		FactoryUtil.validateFactoryOptions(requiredOptions(), optionalOptions, options);
@@ -132,274 +118,11 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 			String name, DataType type, String kind, ReadableConfig options) {
 		switch (kind) {
 			case RANDOM:
-				return createRandomContainer(name, type, options);
+				return type.getLogicalType().accept(new RandomGeneratorVisitor(name, type, options));
 			case SEQUENCE:
-				return createSequenceContainer(name, type, options);
+				return type.getLogicalType().accept(new SequenceGeneratorVisitor(name, type, options));
 			default:
 				throw new ValidationException("Unsupported generator kind: " + kind);
 		}
 	}
-
-	private DataGeneratorContainer createRandomContainer(String name, DataType type, ReadableConfig config) {
-		OptionBuilder minKey = key(FIELDS + "." + name + "." + MIN);
-		OptionBuilder maxKey = key(FIELDS + "." + name + "." + MAX);
-		switch (type.getLogicalType().getTypeRoot()) {
-			case BOOLEAN: {
-				return DataGeneratorContainer.of(RandomGenerator.booleanGenerator());
-			}
-			case CHAR:
-			case VARCHAR: {
-				ConfigOption<Integer> lenOption = key(FIELDS + "." + name + "." + LENGTH)
-						.intType()
-						.defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
-				return DataGeneratorContainer.of(getRandomStringGenerator(config.get(lenOption)), lenOption);
-			}
-			case TINYINT: {
-				ConfigOption<Integer> min = minKey.intType().defaultValue((int) Byte.MIN_VALUE);
-				ConfigOption<Integer> max = maxKey.intType().defaultValue((int) Byte.MAX_VALUE);
-				return DataGeneratorContainer.of(
-						RandomGenerator.byteGenerator(
-								config.get(min).byteValue(), config.get(max).byteValue()),
-						min, max);
-			}
-			case SMALLINT: {
-				ConfigOption<Integer> min = minKey.intType().defaultValue((int) Short.MIN_VALUE);
-				ConfigOption<Integer> max = maxKey.intType().defaultValue((int) Short.MAX_VALUE);
-				return DataGeneratorContainer.of(
-						RandomGenerator.shortGenerator(
-								config.get(min).shortValue(),
-								config.get(max).shortValue()),
-						min, max);
-			}
-			case INTEGER: {
-				ConfigOption<Integer> min = minKey.intType().defaultValue(Integer.MIN_VALUE);
-				ConfigOption<Integer> max = maxKey.intType().defaultValue(Integer.MAX_VALUE);
-				return DataGeneratorContainer.of(
-						RandomGenerator.intGenerator(
-								config.get(min), config.get(max)),
-						min, max);
-			}
-			case BIGINT: {
-				ConfigOption<Long> min = minKey.longType().defaultValue(Long.MIN_VALUE);
-				ConfigOption<Long> max = maxKey.longType().defaultValue(Long.MAX_VALUE);
-				return DataGeneratorContainer.of(
-						RandomGenerator.longGenerator(
-								config.get(min), config.get(max)),
-						min, max);
-			}
-			case FLOAT: {
-				ConfigOption<Float> min = minKey.floatType().defaultValue(Float.MIN_VALUE);
-				ConfigOption<Float> max = maxKey.floatType().defaultValue(Float.MAX_VALUE);
-				return DataGeneratorContainer.of(
-						RandomGenerator.floatGenerator(
-								config.get(min), config.get(max)),
-						min, max);
-			}
-			case DOUBLE: {
-				ConfigOption<Double> min = minKey.doubleType().defaultValue(Double.MIN_VALUE);
-				ConfigOption<Double> max = maxKey.doubleType().defaultValue(Double.MAX_VALUE);
-				return DataGeneratorContainer.of(
-						RandomGenerator.doubleGenerator(
-								config.get(min), config.get(max)),
-						min, max);
-			}
-			default:
-				throw new ValidationException("Unsupported type: " + type);
-		}
-	}
-
-	private DataGeneratorContainer createSequenceContainer(String name, DataType type, ReadableConfig config) {
-		String startKeyStr = FIELDS + "." + name + "." + START;
-		String endKeyStr = FIELDS + "." + name + "." + END;
-		OptionBuilder startKey = key(startKeyStr);
-		OptionBuilder endKey = key(endKeyStr);
-
-		config.getOptional(startKey.stringType().noDefaultValue()).orElseThrow(
-				() -> new ValidationException(
-						"Could not find required property '" + startKeyStr + "' for sequence generator."));
-		config.getOptional(endKey.stringType().noDefaultValue()).orElseThrow(
-				() -> new ValidationException(
-						"Could not find required property '" + endKeyStr + "' for sequence generator."));
-
-		ConfigOption<Integer> intStart = startKey.intType().noDefaultValue();
-		ConfigOption<Integer> intEnd = endKey.intType().noDefaultValue();
-		ConfigOption<Long> longStart = startKey.longType().noDefaultValue();
-		ConfigOption<Long> longEnd = endKey.longType().noDefaultValue();
-		switch (type.getLogicalType().getTypeRoot()) {
-			case CHAR:
-			case VARCHAR:
-				return DataGeneratorContainer.of(
-						getSequenceStringGenerator(
-								config.get(longStart), config.get(longEnd)),
-						longStart, longEnd);
-			case TINYINT:
-				return DataGeneratorContainer.of(
-						SequenceGenerator.byteGenerator(
-								config.get(intStart).byteValue(),
-								config.get(intEnd).byteValue()),
-						intStart, intEnd);
-			case SMALLINT:
-				return DataGeneratorContainer.of(
-						SequenceGenerator.shortGenerator(
-								config.get(intStart).shortValue(),
-								config.get(intEnd).shortValue()),
-						intStart, intEnd);
-			case INTEGER:
-				return DataGeneratorContainer.of(
-						SequenceGenerator.intGenerator(
-								config.get(intStart), config.get(intEnd)),
-						intStart, intEnd);
-			case BIGINT:
-				return DataGeneratorContainer.of(
-						SequenceGenerator.longGenerator(
-								config.get(longStart), config.get(longEnd)),
-						longStart, longEnd);
-			case FLOAT:
-				return DataGeneratorContainer.of(
-						SequenceGenerator.floatGenerator(
-								config.get(intStart).shortValue(),
-								config.get(intEnd).shortValue()),
-						intStart, intEnd);
-			case DOUBLE:
-				return DataGeneratorContainer.of(
-						SequenceGenerator.doubleGenerator(
-								config.get(intStart), config.get(intEnd)),
-						intStart, intEnd);
-			default:
-				throw new ValidationException("Unsupported type: " + type);
-		}
-	}
-
-	private static SequenceGenerator<StringData> getSequenceStringGenerator(long start, long end) {
-		return new SequenceGenerator<StringData>(start, end) {
-			@Override
-			public StringData next() {
-				return StringData.fromString(valuesToEmit.poll().toString());
-			}
-		};
-	}
-
-	private static RandomGenerator<StringData> getRandomStringGenerator(int length) {
-		return new RandomGenerator<StringData>() {
-			@Override
-			public StringData next() {
-				return StringData.fromString(random.nextHexString(length));
-			}
-		};
-	}
-
-	// -------------------------------- Help Classes -------------------------------
-
-	private static class DataGeneratorContainer {
-
-		private DataGenerator generator;
-
-		/**
-		 * Generator config options, for validation.
-		 */
-		private Set<ConfigOption<?>> options;
-
-		private DataGeneratorContainer(DataGenerator generator, Set<ConfigOption<?>> options) {
-			this.generator = generator;
-			this.options = options;
-		}
-
-		private static DataGeneratorContainer of(
-				DataGenerator generator, ConfigOption<?>... options) {
-			return new DataGeneratorContainer(generator, new HashSet<>(Arrays.asList(options)));
-		}
-	}
-
-	/**
-	 * A {@link StreamTableSource} that emits each number from a given interval exactly once,
-	 * possibly in parallel. See {@link StatefulSequenceSource}.
-	 */
-	static class DataGenTableSource implements ScanTableSource {
-
-		private final DataGenerator[] fieldGenerators;
-		private final TableSchema schema;
-		private final long rowsPerSecond;
-
-		private DataGenTableSource(DataGenerator[] fieldGenerators, TableSchema schema, long rowsPerSecond) {
-			this.fieldGenerators = fieldGenerators;
-			this.schema = schema;
-			this.rowsPerSecond = rowsPerSecond;
-		}
-
-		@Override
-		public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
-			return SourceFunctionProvider.of(createSource(), false);
-		}
-
-		@VisibleForTesting
-		DataGeneratorSource<RowData> createSource() {
-			return new DataGeneratorSource<>(
-					new RowGenerator(fieldGenerators, schema.getFieldNames()),
-					rowsPerSecond);
-		}
-
-		@Override
-		public DynamicTableSource copy() {
-			return new DataGenTableSource(fieldGenerators, schema, rowsPerSecond);
-		}
-
-		@Override
-		public String asSummaryString() {
-			return "DataGenTableSource";
-		}
-
-		@Override
-		public ChangelogMode getChangelogMode() {
-			return ChangelogMode.insertOnly();
-		}
-	}
-
-	private static class RowGenerator implements DataGenerator<RowData> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final DataGenerator[] fieldGenerators;
-		private final String[] fieldNames;
-
-		private RowGenerator(DataGenerator[] fieldGenerators, String[] fieldNames) {
-			this.fieldGenerators = fieldGenerators;
-			this.fieldNames = fieldNames;
-		}
-
-		@Override
-		public void open(
-				String name,
-				FunctionInitializationContext context,
-				RuntimeContext runtimeContext) throws Exception {
-			for (int i = 0; i < fieldGenerators.length; i++) {
-				fieldGenerators[i].open(fieldNames[i], context, runtimeContext);
-			}
-		}
-
-		@Override
-		public void snapshotState(FunctionSnapshotContext context) throws Exception {
-			for (DataGenerator generator : fieldGenerators) {
-				generator.snapshotState(context);
-			}
-		}
-
-		@Override
-		public boolean hasNext() {
-			for (DataGenerator generator : fieldGenerators) {
-				if (!generator.hasNext()) {
-					return false;
-				}
-			}
-			return true;
-		}
-
-		@Override
-		public RowData next() {
-			GenericRowData row = new GenericRowData(fieldNames.length);
-			for (int i = 0; i < fieldGenerators.length; i++) {
-				row.setField(i, fieldGenerators[i].next());
-			}
-			return row;
-		}
-	}
 }
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
new file mode 100644
index 0000000..634f275
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.sources.StreamTableSource;
+
+/**
+ * A {@link StreamTableSource} that emits each number from a given interval exactly once,
+ * possibly in parallel. See {@link StatefulSequenceSource}.
+ */
+@Internal
+public class DataGenTableSource implements ScanTableSource {
+
+	private final DataGenerator[] fieldGenerators;
+	private final TableSchema schema;
+	private final long rowsPerSecond;
+
+	public DataGenTableSource(DataGenerator[] fieldGenerators, TableSchema schema, long rowsPerSecond) {
+		this.fieldGenerators = fieldGenerators;
+		this.schema = schema;
+		this.rowsPerSecond = rowsPerSecond;
+	}
+
+	@Override
+	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
+		return SourceFunctionProvider.of(createSource(), false);
+	}
+
+	@VisibleForTesting
+	public DataGeneratorSource<RowData> createSource() {
+		return new DataGeneratorSource<>(
+			new RowGenerator(fieldGenerators, schema.getFieldNames()),
+			rowsPerSecond);
+	}
+
+	@Override
+	public DynamicTableSource copy() {
+		return new DataGenTableSource(fieldGenerators, schema, rowsPerSecond);
+	}
+
+	@Override
+	public String asSummaryString() {
+		return "DataGenTableSource";
+	}
+
+	@Override
+	public ChangelogMode getChangelogMode() {
+		return ChangelogMode.insertOnly();
+	}
+
+	private static class RowGenerator implements DataGenerator<RowData> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final DataGenerator[] fieldGenerators;
+		private final String[] fieldNames;
+
+		private RowGenerator(DataGenerator[] fieldGenerators, String[] fieldNames) {
+			this.fieldGenerators = fieldGenerators;
+			this.fieldNames = fieldNames;
+		}
+
+		@Override
+		public void open(
+			String name,
+			FunctionInitializationContext context,
+			RuntimeContext runtimeContext) throws Exception {
+			for (int i = 0; i < fieldGenerators.length; i++) {
+				fieldGenerators[i].open(fieldNames[i], context, runtimeContext);
+			}
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext context) throws Exception {
+			for (DataGenerator generator : fieldGenerators) {
+				generator.snapshotState(context);
+			}
+		}
+
+		@Override
+		public boolean hasNext() {
+			for (DataGenerator generator : fieldGenerators) {
+				if (!generator.hasNext()) {
+					return false;
+				}
+			}
+			return true;
+		}
+
+		@Override
+		public RowData next() {
+			GenericRowData row = new GenericRowData(fieldNames.length);
+			for (int i = 0; i < fieldGenerators.length; i++) {
+				row.setField(i, fieldGenerators[i].next());
+			}
+			return row;
+		}
+	}
+}
+
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGeneratorContainer.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGeneratorContainer.java
new file mode 100644
index 0000000..24b2307
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGeneratorContainer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Container class for wrapping a {@link DataGenerator} with its configuration options.
+ */
+@Internal
+public class DataGeneratorContainer {
+
+	private final DataGenerator generator;
+
+	/**
+	 * Generator config options, for validation.
+	 */
+	private final Set<ConfigOption<?>> options;
+
+	private DataGeneratorContainer(DataGenerator generator, Set<ConfigOption<?>> options) {
+		this.generator = generator;
+		this.options = options;
+	}
+
+	public static DataGeneratorContainer of(
+		DataGenerator generator, ConfigOption<?>... options) {
+		return new DataGeneratorContainer(generator, new HashSet<>(Arrays.asList(options)));
+	}
+
+	public DataGenerator getGenerator() {
+		return generator;
+	}
+
+	public Set<ConfigOption<?>> getOptions() {
+		return options;
+	}
+}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
new file mode 100644
index 0000000..c203114
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.*;
+
+/**
+ * Creates a random {@link DataGeneratorContainer} for a particular logical type.
+ */
+@Internal
+public class RandomGeneratorVisitor extends LogicalTypeDefaultVisitor<DataGeneratorContainer> {
+
+	public static final int RANDOM_STRING_LENGTH_DEFAULT = 100;
+
+	private final String name;
+
+	private final DataType type;
+
+	private final ReadableConfig config;
+
+	private final ConfigOptions.OptionBuilder minKey;
+
+	private final ConfigOptions.OptionBuilder maxKey;
+
+	public RandomGeneratorVisitor(String name, DataType type, ReadableConfig config) {
+		this.name = name;
+		this.type = type;
+		this.config = config;
+
+		this.minKey = key(FIELDS + "." + name + "." + MIN); // per-field option key: <FIELDS>.<name>.<MIN>
+		this.maxKey = key(FIELDS + "." + name + "." + MAX); // per-field option key: <FIELDS>.<name>.<MAX>
+	}
+
+	@Override
+	public DataGeneratorContainer visit(BooleanType booleanType) {
+		return DataGeneratorContainer.of(RandomGenerator.booleanGenerator()); // booleans take no options
+	}
+
+	@Override
+	public DataGeneratorContainer visit(CharType charType) {
+		ConfigOption<Integer> lenOption = key(FIELDS + "." + name + "." + LENGTH)
+			.intType()
+			.defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
+		return DataGeneratorContainer.of(getRandomStringGenerator(config.get(lenOption)), lenOption);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(VarCharType varCharType) {
+		ConfigOption<Integer> lenOption = key(FIELDS + "." + name + "." + LENGTH)
+			.intType()
+			.defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
+		return DataGeneratorContainer.of(getRandomStringGenerator(config.get(lenOption)), lenOption);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(TinyIntType tinyIntType) {
+		ConfigOption<Integer> min = minKey.intType().defaultValue((int) Byte.MIN_VALUE);
+		ConfigOption<Integer> max = maxKey.intType().defaultValue((int) Byte.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.byteGenerator(
+				config.get(min).byteValue(), config.get(max).byteValue()), // byteValue() narrows; out-of-range user values wrap silently
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(SmallIntType smallIntType) {
+		ConfigOption<Integer> min = minKey.intType().defaultValue((int) Short.MIN_VALUE);
+		ConfigOption<Integer> max = maxKey.intType().defaultValue((int) Short.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.shortGenerator(
+				config.get(min).shortValue(), // shortValue() narrows; out-of-range user values wrap silently
+				config.get(max).shortValue()),
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(IntType integerType) {
+		ConfigOption<Integer> min = minKey.intType().defaultValue(Integer.MIN_VALUE);
+		ConfigOption<Integer> max = maxKey.intType().defaultValue(Integer.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.intGenerator(
+				config.get(min), config.get(max)),
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(BigIntType bigIntType) {
+		ConfigOption<Long> min = minKey.longType().defaultValue(Long.MIN_VALUE);
+		ConfigOption<Long> max = maxKey.longType().defaultValue(Long.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.longGenerator(
+				config.get(min), config.get(max)),
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(FloatType floatType) {
+		ConfigOption<Float> min = minKey.floatType().defaultValue(-Float.MAX_VALUE); // Float.MIN_VALUE is the smallest POSITIVE float, not the most negative one
+		ConfigOption<Float> max = maxKey.floatType().defaultValue(Float.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.floatGenerator(
+				config.get(min), config.get(max)),
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(DoubleType doubleType) {
+		ConfigOption<Double> min = minKey.doubleType().defaultValue(-Double.MAX_VALUE); // Double.MIN_VALUE is the smallest POSITIVE double, not the most negative one
+		ConfigOption<Double> max = maxKey.doubleType().defaultValue(Double.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.doubleGenerator(
+				config.get(min), config.get(max)),
+			min, max);
+	}
+
+	@Override
+	protected DataGeneratorContainer defaultMethod(LogicalType logicalType) {
+		throw new ValidationException("Unsupported type: " + type); // every logical type without an explicit visit() lands here
+	}
+
+	private static RandomGenerator<StringData> getRandomStringGenerator(int length) {
+		return new RandomGenerator<StringData>() {
+			@Override
+			public StringData next() {
+				return StringData.fromString(random.nextHexString(length)); // random hexadecimal string of the configured length
+			}
+		};
+	}
+}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java
new file mode 100644
index 0000000..2fa00d7
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
+import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.*;
+
+/**
+ * Creates a sequential {@link DataGeneratorContainer} for a particular logical type.
+ */
+@Internal
+public class SequenceGeneratorVisitor extends LogicalTypeDefaultVisitor<DataGeneratorContainer> {
+
+	private final DataType type;
+
+	private final ReadableConfig config;
+
+	private final String startKeyStr;
+
+	private final String endKeyStr;
+
+	private final ConfigOption<Integer> intStart;
+
+	private final ConfigOption<Integer> intEnd;
+
+	private final ConfigOption<Long> longStart;
+
+	private final ConfigOption<Long> longEnd;
+
+	public SequenceGeneratorVisitor(String name, DataType type, ReadableConfig config) {
+		this.type = type;
+		this.config = config;
+
+		this.startKeyStr = FIELDS + "." + name + "." + START;
+		this.endKeyStr = FIELDS + "." + name + "." + END;
+
+		ConfigOptions.OptionBuilder startKey = key(startKeyStr);
+		ConfigOptions.OptionBuilder endKey = key(endKeyStr);
+
+		config.getOptional(startKey.stringType().noDefaultValue()).orElseThrow( // presence check only; the value is re-read with its real type below
+			() -> new ValidationException(
+				"Could not find required property '" + startKeyStr + "' for sequence generator."));
+		config.getOptional(endKey.stringType().noDefaultValue()).orElseThrow( // presence check only; the value is re-read with its real type below
+			() -> new ValidationException(
+				"Could not find required property '" + endKeyStr + "' for sequence generator."));
+
+		this.intStart = startKey.intType().noDefaultValue();
+		this.intEnd = endKey.intType().noDefaultValue();
+		this.longStart = startKey.longType().noDefaultValue();
+		this.longEnd = endKey.longType().noDefaultValue();
+	}
+
+	@Override
+	public DataGeneratorContainer visit(BooleanType booleanType) {
+		return DataGeneratorContainer.of(RandomGenerator.booleanGenerator()); // booleans use the random generator even for sequence fields
+	}
+
+	@Override
+	public DataGeneratorContainer visit(CharType charType) {
+		return DataGeneratorContainer.of(
+			getSequenceStringGenerator(
+				config.get(longStart), config.get(longEnd)),
+			longStart, longEnd);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(VarCharType varCharType) {
+		return DataGeneratorContainer.of(
+			getSequenceStringGenerator(
+				config.get(longStart), config.get(longEnd)),
+			longStart, longEnd);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(TinyIntType tinyIntType) {
+		return DataGeneratorContainer.of(
+			SequenceGenerator.byteGenerator(
+				config.get(intStart).byteValue(), // byteValue() narrows; out-of-range bounds wrap silently
+				config.get(intEnd).byteValue()),
+			intStart, intEnd);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(SmallIntType smallIntType) {
+		return DataGeneratorContainer.of(
+			SequenceGenerator.shortGenerator(
+				config.get(intStart).shortValue(), // shortValue() narrows; out-of-range bounds wrap silently
+				config.get(intEnd).shortValue()),
+			intStart, intEnd);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(IntType integerType) {
+		return DataGeneratorContainer.of(
+			SequenceGenerator.intGenerator(
+				config.get(intStart), config.get(intEnd)),
+			intStart, intEnd);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(BigIntType bigIntType) {
+		return DataGeneratorContainer.of(
+			SequenceGenerator.longGenerator(
+				config.get(longStart), config.get(longEnd)),
+			longStart, longEnd);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(FloatType floatType) {
+		return DataGeneratorContainer.of(
+			SequenceGenerator.floatGenerator(
+				config.get(intStart).shortValue(), // NOTE(review): bounds are narrowed to short here — presumably required by floatGenerator's signature; confirm
+				config.get(intEnd).shortValue()),
+			intStart, intEnd);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(DoubleType doubleType) {
+		return DataGeneratorContainer.of(
+			SequenceGenerator.doubleGenerator(
+				config.get(intStart), config.get(intEnd)), // bounds are read as ints, so only whole-number sequences are configurable
+			intStart, intEnd);
+	}
+
+	@Override
+	protected DataGeneratorContainer defaultMethod(LogicalType logicalType) {
+		throw new ValidationException("Unsupported type: " + type); // every logical type without an explicit visit() lands here
+	}
+
+	private static SequenceGenerator<StringData> getSequenceStringGenerator(long start, long end) {
+		return new SequenceGenerator<StringData>(start, end) {
+			@Override
+			public StringData next() {
+				return StringData.fromString(valuesToEmit.poll().toString()); // next queued sequence value rendered as a string
+			}
+		};
+	}
+}
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
index 23e96f5..c5a547e 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.factories.DataGenTableSourceFactory.DataGenTableSource;
+import org.apache.flink.table.factories.datagen.DataGenTableSource;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.junit.Assert;


[flink] 02/05: [FLINK-18735][table] Support bounded datagen tables

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4d5e11a6c261f30c9fbf94ff9c4df96b57873372
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Tue Jul 28 10:34:38 2020 -0500

    [FLINK-18735][table] Support bounded datagen tables
---
 .../source/datagen/DataGeneratorSource.java        | 34 ++++++++++++++++++++--
 .../table/factories/DataGenTableSourceFactory.java |  9 +++++-
 .../factories/datagen/DataGenTableSource.java      | 11 ++++---
 3 files changed, 46 insertions(+), 8 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
index 6760260..0352d9d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
@@ -19,11 +19,14 @@
 package org.apache.flink.streaming.api.functions.source.datagen;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 
+import javax.annotation.Nullable;
+
 /**
  * A data generator source that abstract data generator. It can be used to easy startup/test
  * for streaming job and performance testing.
@@ -37,6 +40,13 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
 	private final DataGenerator<T> generator;
 	private final long rowsPerSecond;
 
+	@Nullable
+	private Long numberOfRows;
+
+	private int outputSoFar;
+
+	private int toOutput;
+
 	transient volatile boolean isRunning;
 
 	/**
@@ -45,7 +55,7 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
 	 * @param generator data generator.
 	 */
 	public DataGeneratorSource(DataGenerator<T> generator) {
-		this(generator, Long.MAX_VALUE);
+		this(generator, Long.MAX_VALUE, null);
 	}
 
 	/**
@@ -53,10 +63,25 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
 	 *
 	 * @param generator data generator.
 	 * @param rowsPerSecond Control the emit rate.
+	 * @param numberOfRows Total number of rows to output.
 	 */
-	public DataGeneratorSource(DataGenerator<T> generator, long rowsPerSecond) {
+	public DataGeneratorSource(DataGenerator<T> generator, long rowsPerSecond, Long numberOfRows) {
 		this.generator = generator;
 		this.rowsPerSecond = rowsPerSecond;
+		this.numberOfRows = numberOfRows;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+
+		if (numberOfRows != null) {
+			final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
+			final int taskIdx = getRuntimeContext().getIndexOfThisSubtask();
+
+			final long share = numberOfRows / stepSize + ((numberOfRows % stepSize > taskIdx) ? 1 : 0); // the first (numberOfRows % stepSize) subtasks emit one extra row
+			toOutput = Math.toIntExact(share); // fail fast (ArithmeticException) instead of silently wrapping when the share exceeds int range
+		}
 	}
 
 	@Override
@@ -77,8 +102,11 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
 
 		while (isRunning) {
 			for (int i = 0; i < taskRowsPerSecond; i++) {
-				if (isRunning && generator.hasNext()) {
+				if (isRunning && generator.hasNext() && (numberOfRows == null || outputSoFar < toOutput)) {
 					synchronized (ctx.getCheckpointLock()) {
+						if (numberOfRows != null) {
+							outputSoFar++;
+						}
 						ctx.collect(this.generator.next());
 					}
 				} else {
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
index e10fd52..c3ad2b9 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
@@ -53,6 +53,11 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 			.defaultValue(ROWS_PER_SECOND_DEFAULT_VALUE)
 			.withDescription("Rows per second to control the emit rate.");
 
+	public static final ConfigOption<Long> NUMBER_OF_ROWS = key("number-of-rows")
+		.longType()
+		.noDefaultValue()
+		.withDescription("Total number of rows to emit. By default, the source is unbounded.");
+
 	public static final String FIELDS = "fields";
 	public static final String KIND = "kind";
 	public static final String START = "start";
@@ -78,6 +83,7 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 	public Set<ConfigOption<?>> optionalOptions() {
 		Set<ConfigOption<?>> options = new HashSet<>();
 		options.add(ROWS_PER_SECOND);
+		options.add(NUMBER_OF_ROWS);
 		return options;
 	}
 
@@ -108,10 +114,11 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 		Set<String> consumedOptionKeys = new HashSet<>();
 		consumedOptionKeys.add(CONNECTOR.key());
 		consumedOptionKeys.add(ROWS_PER_SECOND.key());
+		consumedOptionKeys.add(NUMBER_OF_ROWS.key());
 		optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
 		FactoryUtil.validateUnconsumedKeys(factoryIdentifier(), options.keySet(), consumedOptionKeys);
 
-		return new DataGenTableSource(fieldGenerators, schema, options.get(ROWS_PER_SECOND));
+		return new DataGenTableSource(fieldGenerators, schema, options.get(ROWS_PER_SECOND), options.get(NUMBER_OF_ROWS));
 	}
 
 	private DataGeneratorContainer createContainer(
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
index 634f275..935b465 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
@@ -45,28 +45,31 @@ public class DataGenTableSource implements ScanTableSource {
 	private final DataGenerator[] fieldGenerators;
 	private final TableSchema schema;
 	private final long rowsPerSecond;
+	private final Long numberOfRows;
 
-	public DataGenTableSource(DataGenerator[] fieldGenerators, TableSchema schema, long rowsPerSecond) {
+	public DataGenTableSource(DataGenerator[] fieldGenerators, TableSchema schema, long rowsPerSecond, Long numberOfRows) {
 		this.fieldGenerators = fieldGenerators;
 		this.schema = schema;
 		this.rowsPerSecond = rowsPerSecond;
+		this.numberOfRows = numberOfRows;
 	}
 
 	@Override
 	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
-		return SourceFunctionProvider.of(createSource(), false);
+		boolean isBounded = numberOfRows != null; // bounded only when a total row count is configured; null means unbounded
+		return SourceFunctionProvider.of(createSource(), isBounded);
 	}
 
 	@VisibleForTesting
 	public DataGeneratorSource<RowData> createSource() {
 		return new DataGeneratorSource<>(
 			new RowGenerator(fieldGenerators, schema.getFieldNames()),
-			rowsPerSecond);
+			rowsPerSecond, numberOfRows);
 	}
 
 	@Override
 	public DynamicTableSource copy() {
-		return new DataGenTableSource(fieldGenerators, schema, rowsPerSecond);
+		return new DataGenTableSource(fieldGenerators, schema, rowsPerSecond, numberOfRows);
 	}
 
 	@Override


[flink] 04/05: [FLINK-18735][table] Update documentation of DataGen source

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aee918d869e9346839300cb9f2115f5c31c3492b
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Tue Jul 28 12:05:42 2020 -0500

     [FLINK-18735][table] Update documentation of DataGen source
---
 docs/dev/table/connectors/datagen.md | 39 ++++++++++++++++++++++--------------
 1 file changed, 24 insertions(+), 15 deletions(-)

diff --git a/docs/dev/table/connectors/datagen.md b/docs/dev/table/connectors/datagen.md
index ad1e6af..99cce5b 100644
--- a/docs/dev/table/connectors/datagen.md
+++ b/docs/dev/table/connectors/datagen.md
@@ -29,25 +29,24 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-The DataGen connector allows for reading by data generation rules.
+The DataGen connector allows for creating tables based on in-memory data generation.
+This is useful when developing queries locally without access to external systems such as Kafka.
+Tables can include [Computed Column syntax]({% link dev/table/sql/create.md %}#create-table) which allows for flexible record generation.
 
-The DataGen connector can work with [Computed Column syntax]({% link dev/table/sql/create.md %}#create-table).
-This allows you to generate records flexibly.
+The DataGen connector is built-in, no additional dependencies are required.
 
-The DataGen connector is built-in.
+Usage
+-----
 
-<span class="label label-danger">Attention</span> Complex types are not supported: Array, Map, Row. Please construct these types by computed column.
+By default, a DataGen table will create an unbounded number of rows with a random value for each column.
+For variable sized types, char/varchar/string/array/map/multiset, the length can be specified.
+Additionally, a total number of rows can be specified, resulting in a bounded table.
 
-How to create a DataGen table
-----------------
-
-The boundedness of table: when the generation of field data in the table is completed, the reading
-is finished. So the boundedness of the table depends on the boundedness of fields.
-
-For each field, there are two ways to generate data:
+There also exists a sequence generator, where users specify a sequence of start and end values.
+Complex types cannot be generated as a sequence.
+If any column in a table is a sequence type, the table will be bounded and end when the first sequence completes.
 
-- Random generator is the default generator, you can specify random max and min values. For char/varchar/string, the length can be specified. It is a unbounded generator.
-- Sequence generator, you can specify sequence start and end values. It is a bounded generator, when the sequence number reaches the end value, the reading ends.
+Time types are always the local machine's current system time.
 
 <div class="codetabs" markdown="1">
 <div data-lang="SQL" markdown="1">
@@ -56,7 +55,7 @@ CREATE TABLE datagen (
  f_sequence INT,
  f_random INT,
  f_random_str STRING,
- ts AS localtimestamp,
+ ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts
 ) WITH (
  'connector' = 'datagen',
@@ -64,6 +63,9 @@ CREATE TABLE datagen (
  -- optional options --
 
  'rows-per-second'='5',
+ 
+ -- make the table bounded
+ 'number-of-rows'='10',
 
  'fields.f_sequence.kind'='sequence',
  'fields.f_sequence.start'='1',
@@ -106,6 +108,13 @@ Connector Options
       <td>Long</td>
       <td>Rows per second to control the emit rate.</td>
     </tr>
+        <tr>
+          <td><h5>number-of-rows</h5></td>
+          <td>optional</td>
+          <td style="word-wrap: break-word;">(none)</td>
+          <td>Long</td>
+          <td>The total number of rows to emit. By default, the table is unbounded.</td>
+        </tr>
     <tr>
       <td><h5>fields.#.kind</h5></td>
       <td>optional</td>


[flink] 03/05: [FLINK-18735][table] Add support for more types to DataGen source

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8f7369eec0f3d1779271e9fa35718a4da9bff8cb
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Tue Jul 28 11:51:27 2020 -0500

     [FLINK-18735][table] Add support for more types to DataGen source
---
 .../functions/source/datagen/RandomGenerator.java  |  46 +++++
 .../source/datagen/SequenceGenerator.java          |  13 ++
 .../table/factories/DataGenTableSourceFactory.java |   4 +-
 .../factories/datagen/DataGenVisitorBase.java      | 110 ++++++++++
 .../factories/datagen/RandomGeneratorVisitor.java  | 224 +++++++++++++++++++--
 .../datagen/SequenceGeneratorVisitor.java          |  24 ++-
 .../factories/DataGenTableSourceFactoryTest.java   | 128 ++++++++----
 7 files changed, 482 insertions(+), 67 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/RandomGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/RandomGenerator.java
index 6bc795f..4f48a9c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/RandomGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/RandomGenerator.java
@@ -24,6 +24,9 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
 
 import org.apache.commons.math3.random.RandomDataGenerator;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Random generator.
  */
@@ -116,4 +119,47 @@ public abstract class RandomGenerator<T> implements DataGenerator<T> {
 			}
 		};
 	}
+
+	public static <T> RandomGenerator<T[]> arrayGenerator(DataGenerator<T> generator, int len) {
+		return new RandomGenerator<T[]>() {
+
+			@Override
+			public void open(String name, FunctionInitializationContext context, RuntimeContext runtimeContext) throws Exception {
+				generator.open(name, context, runtimeContext); // forwards open() to the element generator under the same name
+			}
+
+			@Override
+			public T[] next() {
+				@SuppressWarnings("unchecked")
+				T[] array = (T[]) new Object[len]; // runtime type is Object[]; NOTE(review): a caller expecting a concrete T[] would get a ClassCastException — confirm usage
+
+				for (int i = 0; i < len; i++) {
+					array[i] = generator.next(); // one element-generator call per slot
+				}
+
+				return array;
+			}
+		};
+	}
+
+	public static <K, V> RandomGenerator<Map<K, V>> mapGenerator(DataGenerator<K> key, DataGenerator<V> value, int size) {
+		return new RandomGenerator<Map<K, V>>() {
+
+			@Override
+			public void open(String name, FunctionInitializationContext context, RuntimeContext runtimeContext) throws Exception {
+				key.open(name + ".key", context, runtimeContext); // key and value generators are opened under distinct suffixed names
+				value.open(name + ".value", context, runtimeContext);
+			}
+
+			@Override
+			public Map<K, V> next() {
+				Map<K, V> map = new HashMap<>(size);
+				for (int i = 0; i < size; i++) {
+					map.put(key.next(), value.next()); // a repeated key overwrites the earlier entry, so the map may end up with fewer than size entries
+				}
+
+				return map;
+			}
+		};
+	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java
index 855937e..50f4c13 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java
@@ -27,6 +27,9 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.util.Preconditions;
 
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
 import java.util.ArrayDeque;
 import java.util.Deque;
 
@@ -166,6 +169,16 @@ public abstract class SequenceGenerator<T> implements DataGenerator<T> {
 		};
 	}
 
+	public static SequenceGenerator<BigDecimal> bigDecimalGenerator(int start, int end, int precision, int scale) {
+		return new SequenceGenerator<BigDecimal>(start, end) {
+			@Override
+			public BigDecimal next() {
+				BigDecimal decimal = new BigDecimal(valuesToEmit.poll().doubleValue(), new MathContext(precision)); // next queued value, rounded to `precision` significant digits
+				return decimal.setScale(scale, RoundingMode.DOWN); // forces the requested scale, truncating toward zero if digits must be dropped
+			}
+		};
+	}
+
 	public static SequenceGenerator<String> stringGenerator(long start, long end) {
 		return new SequenceGenerator<String>(start, end) {
 			@Override
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
index c3ad2b9..a2b9df3 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
@@ -125,9 +125,9 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 			String name, DataType type, String kind, ReadableConfig options) {
 		switch (kind) {
 			case RANDOM:
-				return type.getLogicalType().accept(new RandomGeneratorVisitor(name, type, options));
+				return type.getLogicalType().accept(new RandomGeneratorVisitor(name, options));
 			case SEQUENCE:
-				return type.getLogicalType().accept(new SequenceGeneratorVisitor(name, type, options));
+				return type.getLogicalType().accept(new SequenceGeneratorVisitor(name, options));
 			default:
 				throw new ValidationException("Unsupported generator kind: " + kind);
 		}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java
new file mode 100644
index 0000000..a34c91f
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
+
+import java.io.Serializable;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.function.Supplier;
+
+/**
+ * Base class for translating {@link LogicalType LogicalTypes} into
+ * {@link DataGeneratorContainer DataGeneratorContainers}.
+ *
+ * <p>Date and time types are served by generators that return the current date/time on each call;
+ * {@link #defaultMethod(LogicalType)} rejects a type with a {@link ValidationException}.
+ */
+@Internal
+public abstract class DataGenVisitorBase extends LogicalTypeDefaultVisitor<DataGeneratorContainer> {
+
+	/** Name this visitor was created with. */
+	protected final String name;
+
+	/** Configuration this visitor was created with. */
+	protected final ReadableConfig config;
+
+	protected DataGenVisitorBase(String name, ReadableConfig config) {
+		this.name = name;
+		this.config = config;
+	}
+
+	/** Returns a container whose generator yields {@link LocalDate#now()} on every call. */
+	@Override
+	public DataGeneratorContainer visit(DateType dateType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(LocalDate::now));
+	}
+
+	/** Returns a container whose generator yields {@link LocalTime#now()} on every call. */
+	@Override
+	public DataGeneratorContainer visit(TimeType timeType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(LocalTime::now));
+	}
+
+	/** Returns a container whose generator yields {@link LocalDateTime#now()} on every call. */
+	@Override
+	public DataGeneratorContainer visit(TimestampType timestampType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(LocalDateTime::now));
+	}
+
+	/** Returns a container whose generator yields {@link OffsetDateTime#now()} on every call. */
+	@Override
+	public DataGeneratorContainer visit(ZonedTimestampType zonedTimestampType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(OffsetDateTime::now));
+	}
+
+	/** Returns a container whose generator yields {@link Instant#now()} on every call. */
+	@Override
+	public DataGeneratorContainer visit(LocalZonedTimestampType localZonedTimestampType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(Instant::now));
+	}
+
+	/** Always fails: throws a {@link ValidationException} naming the given type. */
+	@Override
+	protected DataGeneratorContainer defaultMethod(LogicalType logicalType) {
+		throw new ValidationException("Unsupported type: " + logicalType);
+	}
+
+	/** A {@link Supplier} that is also {@link Serializable}, so a generator can hold on to it. */
+	@FunctionalInterface
+	private interface SerializableSupplier<T> extends Supplier<T>, Serializable { }
+
+	/** Generator that takes every value from a supplier and never reports exhaustion. */
+	private abstract static class TimeGenerator<T> implements DataGenerator<T> {
+
+		/** Creates a generator whose {@code next()} returns {@code supplier.get()}. */
+		public static <T> TimeGenerator<T> of(SerializableSupplier<T> supplier) {
+			return new TimeGenerator<T>() {
+				@Override
+				public T next() {
+					return supplier.get();
+				}
+			};
+		}
+
+		/** Does nothing; this generator has nothing to initialize. */
+		@Override
+		public void open(
+			String name,
+			FunctionInitializationContext context,
+			RuntimeContext runtimeContext) throws Exception { }
+
+		/** Always {@code true}. */
+		@Override
+		public boolean hasNext() {
+			return true;
+		}
+	}
+}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
index c203114..97fafe4 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
@@ -19,50 +19,65 @@
 package org.apache.flink.table.factories.datagen;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
 import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BooleanType;
 import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.DoubleType;
 import org.apache.flink.table.types.logical.FloatType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.*;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.LENGTH;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.MIN;
+
 
 /**
  * Creates a random {@link DataGeneratorContainer} for a particular logical type.
  */
 @Internal
-public class RandomGeneratorVisitor extends LogicalTypeDefaultVisitor<DataGeneratorContainer> {
+@SuppressWarnings("unchecked")
+public class RandomGeneratorVisitor extends DataGenVisitorBase {
 
 	public static final int RANDOM_STRING_LENGTH_DEFAULT = 100;
 
-	private final String name;
-
-	private final DataType type;
-
-	private final ReadableConfig config;
+	private static final int RANDOM_COLLECTION_LENGTH_DEFAULT = 3;
 
 	private final ConfigOptions.OptionBuilder minKey;
 
 	private final ConfigOptions.OptionBuilder maxKey;
 
-	public RandomGeneratorVisitor(String name, DataType type, ReadableConfig config) {
-		this.name = name;
-		this.type = type;
-		this.config = config;
+	public RandomGeneratorVisitor(String name, ReadableConfig config) {
+		super(name, config);
 
 		this.minKey = key(FIELDS + "." + name + "." + MIN);
 		this.maxKey = key(FIELDS + "." + name + "." + MAX);
@@ -151,8 +166,122 @@ public class RandomGeneratorVisitor extends LogicalTypeDefaultVisitor<DataGenera
 	}
 
 	@Override
+	public DataGeneratorContainer visit(DecimalType decimalType) {
+		// Decimals are random doubles bounded by the min/max options, converted with the type's precision and scale.
+		ConfigOption<Double> min = minKey.doubleType().defaultValue(Double.MIN_VALUE); // smallest positive double
+		ConfigOption<Double> max = maxKey.doubleType().defaultValue(Double.MAX_VALUE);
+		RandomGenerator<Double> doubles = RandomGenerator.doubleGenerator(config.get(min), config.get(max));
+		BigDecimalRandomGenerator decimals = new BigDecimalRandomGenerator(doubles, decimalType.getPrecision(), decimalType.getScale());
+		return DataGeneratorContainer.of(decimals, min, max); // declare min/max as options, like the interval visitors
+	}
+
+	@Override
+	public DataGeneratorContainer visit(YearMonthIntervalType yearMonthIntervalType) {
+		// Year-month intervals are produced by a random int generator bounded by the min/max options.
+		ConfigOption<Integer> lowerBound = minKey.intType().defaultValue(0);
+		ConfigOption<Integer> upperBound = maxKey.intType().defaultValue(120000); // Period max
+		return DataGeneratorContainer.of(
+			RandomGenerator.intGenerator(config.get(lowerBound), config.get(upperBound)),
+			lowerBound, upperBound);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(DayTimeIntervalType dayTimeIntervalType) {
+		// Day-time intervals are produced by a random long generator bounded by the min/max options.
+		ConfigOption<Long> lowerBound = minKey.longType().defaultValue(Long.MIN_VALUE);
+		ConfigOption<Long> upperBound = maxKey.longType().defaultValue(Long.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.longGenerator(config.get(lowerBound), config.get(upperBound)),
+			lowerBound, upperBound);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(ArrayType arrayType) {
+		// Arrays have a configurable length; their elements come from a nested visitor named "<name>.element".
+		ConfigOption<Integer> lenOption = key(FIELDS + "." + name + "." + LENGTH)
+			.intType()
+			.defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
+		DataGeneratorContainer container = arrayType
+			.getElementType()
+			.accept(new RandomGeneratorVisitor(name + "." + "element", config));
+		Set<ConfigOption<?>> options = new java.util.HashSet<>(container.getOptions());
+		options.add(lenOption); // the length option is read below, so declare it alongside the element options
+		return DataGeneratorContainer.of(
+			RandomGenerator.arrayGenerator(container.getGenerator(), config.get(lenOption)),
+			options.toArray(new ConfigOption<?>[0]));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(MultisetType multisetType) {
+		// Multisets are generated as maps: each element is paired with a value from intGenerator(0, 10).
+		ConfigOption<Integer> lenOption = key(FIELDS + "." + name + "." + LENGTH)
+			.intType().defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
+		DataGeneratorContainer container = multisetType
+			.getElementType()
+			.accept(new RandomGeneratorVisitor(name + "." + "element", config));
+
+		Set<ConfigOption<?>> options = new java.util.HashSet<>(container.getOptions());
+		options.add(lenOption); // the length option is read below, so declare it alongside the element options
+		return DataGeneratorContainer.of(
+			RandomGenerator.mapGenerator(container.getGenerator(),
+				RandomGenerator.intGenerator(0, 10),
+				config.get(lenOption)),
+			options.toArray(new ConfigOption<?>[0]));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(MapType mapType) {
+		// Maps have a configurable size; keys and values come from nested "<name>.key" / "<name>.value" visitors.
+		ConfigOption<Integer> lenOption = key(FIELDS + "." + name + "." + LENGTH)
+			.intType()
+			.defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
+
+		DataGeneratorContainer keyContainer = mapType
+			.getKeyType()
+			.accept(new RandomGeneratorVisitor(name + "." + "key", config));
+
+		DataGeneratorContainer valContainer = mapType
+			.getValueType()
+			.accept(new RandomGeneratorVisitor(name + "." + "value", config));
+
+		// Collect into a fresh set so the key container's own option set is left untouched.
+		Set<ConfigOption<?>> options = new java.util.HashSet<>(keyContainer.getOptions());
+		options.addAll(valContainer.getOptions());
+		options.add(lenOption); // the length option is read below, so declare it as well
+
+		return DataGeneratorContainer.of(
+			RandomGenerator.mapGenerator(
+				keyContainer.getGenerator(),
+				valContainer.getGenerator(),
+				config.get(lenOption)),
+			options.toArray(new ConfigOption<?>[0]));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(RowType rowType) {
+		// Each row field gets its own nested visitor, named "<name>.<field name>".
+		List<DataGeneratorContainer> nested = rowType.getFields()
+			.stream()
+			.map(f -> f.getType().accept(new RandomGeneratorVisitor(name + "." + f.getName(), config)))
+			.collect(Collectors.toList());
+
+		List<DataGenerator> fieldGenerators = nested
+			.stream()
+			.map(DataGeneratorContainer::getGenerator)
+			.collect(Collectors.toList());
+
+		// The row container carries the options of all nested field containers.
+		ConfigOption<?>[] fieldOptions = nested
+			.stream()
+			.flatMap(c -> c.getOptions().stream())
+			.toArray(ConfigOption[]::new);
+
+		return DataGeneratorContainer.of(new RowGenerator(name, rowType.getFieldNames(), fieldGenerators), fieldOptions);
+	}
+
+	@Override
 	protected DataGeneratorContainer defaultMethod(LogicalType logicalType) {
-		throw new ValidationException("Unsupported type: " + type);
+		throw new ValidationException("Unsupported type: " + logicalType); // same message as the DataGenVisitorBase fallback
 	}
 
 	private static RandomGenerator<StringData> getRandomStringGenerator(int length) {
@@ -163,4 +292,71 @@ public class RandomGeneratorVisitor extends LogicalTypeDefaultVisitor<DataGenera
 			}
 		};
 	}
+
+	/** Generates {@link Row}s by asking one nested generator per field for its next value. */
+	private static class RowGenerator implements DataGenerator<Row> {
+
+		private final String rowName;
+		private final List<String> fieldNames;
+		private final List<DataGenerator> generators;
+
+		private RowGenerator(String rowName, List<String> fieldNames, List<DataGenerator> generators) {
+			this.rowName = rowName;
+			this.fieldNames = fieldNames;
+			this.generators = generators;
+		}
+
+		@Override
+		public void open(String name, FunctionInitializationContext context, RuntimeContext runtimeContext) throws Exception {
+			// Each nested generator is opened under "<rowName>.<fieldName>"; the name passed in is not used.
+			int position = 0;
+			for (DataGenerator generator : generators) {
+				generator.open(rowName + "." + fieldNames.get(position++), context, runtimeContext);
+			}
+		}
+
+		@Override
+		public boolean hasNext() {
+			// Always true; the nested generators are not consulted.
+			return true;
+		}
+
+		@Override
+		public Row next() {
+			final int arity = generators.size();
+			Row result = new Row(arity);
+			for (int pos = 0; pos < arity; pos++) {
+				result.setField(pos, generators.get(pos).next());
+			}
+			return result;
+		}
+	}
+
+	/** Turns random doubles into {@link BigDecimal}s rounded to a precision and then truncated to a scale. */
+	private static class BigDecimalRandomGenerator implements DataGenerator<BigDecimal> {
+		private final RandomGenerator<Double> generator;
+		private final int precision;
+		private final int scale;
+
+		public BigDecimalRandomGenerator(RandomGenerator<Double> generator, int precision, int scale) {
+			this.scale = scale;
+			this.precision = precision;
+			this.generator = generator;
+		}
+
+		@Override
+		public void open(String name, FunctionInitializationContext context, RuntimeContext runtimeContext) throws Exception {
+			generator.open(name, context, runtimeContext);
+		}
+
+		@Override
+		public boolean hasNext() {
+			return true;
+		}
+
+		@Override
+		public BigDecimal next() {
+			// NOTE(review): for large values setScale can yield more than `precision` digits; confirm this is acceptable.
+			return new BigDecimal(generator.next(), new MathContext(precision)).setScale(scale, RoundingMode.DOWN);
+		}
+	}
 }
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java
index 2fa00d7..9a2c655 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java
@@ -26,29 +26,27 @@ import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
 import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BooleanType;
 import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.DoubleType;
 import org.apache.flink.table.types.logical.FloatType;
 import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.*;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.END;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.START;
 
 /**
  * Creates a sequential {@link DataGeneratorContainer} for a particular logical type.
  */
 @Internal
-public class SequenceGeneratorVisitor extends LogicalTypeDefaultVisitor<DataGeneratorContainer> {
-
-	private final DataType type;
+public class SequenceGeneratorVisitor extends DataGenVisitorBase {
 
 	private final ReadableConfig config;
 
@@ -64,8 +62,9 @@ public class SequenceGeneratorVisitor extends LogicalTypeDefaultVisitor<DataGene
 
 	private final ConfigOption<Long> longEnd;
 
-	public SequenceGeneratorVisitor(String name, DataType type, ReadableConfig config) {
-		this.type = type;
+	public SequenceGeneratorVisitor(String name, ReadableConfig config) {
+		super(name, config);
+
 		this.config = config;
 
 		this.startKeyStr = FIELDS + "." + name + "." + START;
@@ -160,8 +159,11 @@ public class SequenceGeneratorVisitor extends LogicalTypeDefaultVisitor<DataGene
 	}
 
 	@Override
-	protected DataGeneratorContainer defaultMethod(LogicalType logicalType) {
-		throw new ValidationException("Unsupported type: " + type);
+	public DataGeneratorContainer visit(DecimalType decimalType) { // decimal sequences reuse the intStart/intEnd options
+		return DataGeneratorContainer.of(
+			SequenceGenerator.bigDecimalGenerator(
+				config.get(intStart), config.get(intEnd), decimalType.getPrecision(), decimalType.getScale()),
+			intStart, intEnd);
 	}
 
 	private static SequenceGenerator<StringData> getSequenceStringGenerator(long start, long end) {
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
index c5a547e..1be0ffc 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
@@ -53,6 +53,7 @@ import static org.apache.flink.table.factories.DataGenTableSourceFactory.KIND;
 import static org.apache.flink.table.factories.DataGenTableSourceFactory.LENGTH;
 import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX;
 import static org.apache.flink.table.factories.DataGenTableSourceFactory.MIN;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.NUMBER_OF_ROWS;
 import static org.apache.flink.table.factories.DataGenTableSourceFactory.RANDOM;
 import static org.apache.flink.table.factories.DataGenTableSourceFactory.ROWS_PER_SECOND;
 import static org.apache.flink.table.factories.DataGenTableSourceFactory.SEQUENCE;
@@ -71,6 +72,43 @@ public class DataGenTableSourceFactoryTest {
 		.build();
 
 	@Test
+	public void testDataTypeCoverage() throws Exception {
+		// Scalar, temporal, interval, collection and nested row columns; no per-field options are set.
+		TableSchema schema = TableSchema.builder()
+			.field("f0", DataTypes.CHAR(1))
+			.field("f1", DataTypes.VARCHAR(10))
+			.field("f2", DataTypes.STRING())
+			.field("f3", DataTypes.BOOLEAN())
+			.field("f4", DataTypes.DECIMAL(32, 2))
+			.field("f5", DataTypes.TINYINT())
+			.field("f6", DataTypes.SMALLINT())
+			.field("f7", DataTypes.INT())
+			.field("f8", DataTypes.BIGINT())
+			.field("f9", DataTypes.FLOAT())
+			.field("f10", DataTypes.DOUBLE())
+			.field("f11", DataTypes.DATE())
+			.field("f12", DataTypes.TIME())
+			.field("f13", DataTypes.TIMESTAMP())
+			.field("f14", DataTypes.TIMESTAMP_WITH_TIME_ZONE())
+			.field("f15", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+			.field("f16", DataTypes.INTERVAL(DataTypes.DAY()))
+			.field("f17", DataTypes.ARRAY(DataTypes.INT()))
+			.field("f18", DataTypes.MAP(DataTypes.STRING(), DataTypes.DATE()))
+			.field("f19", DataTypes.MULTISET(DataTypes.DECIMAL(32, 2)))
+			.field("f20", DataTypes.ROW(
+				DataTypes.FIELD("a", DataTypes.BIGINT()),
+				DataTypes.FIELD("b", DataTypes.TIME()),
+				DataTypes.FIELD("c", DataTypes.ROW(DataTypes.FIELD("d", DataTypes.TIMESTAMP())))
+			)).build();
+
+		DescriptorProperties properties = new DescriptorProperties();
+		properties.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+		properties.putString(NUMBER_OF_ROWS.key(), "10");
+		List<RowData> rows = runGenerator(schema, properties);
+		Assert.assertEquals("Failed to generate all rows", 10, rows.size());
+	}
+
+	@Test
 	public void testSource() throws Exception {
 		DescriptorProperties descriptor = new DescriptorProperties();
 		descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
@@ -87,8 +125,21 @@ public class DataGenTableSourceFactoryTest {
 		descriptor.putLong(FIELDS + ".f2." + START, 50);
 		descriptor.putLong(FIELDS + ".f2." + END, 60);
 
+		List<RowData> results = runGenerator(TEST_SCHEMA, descriptor);
+
+		Assert.assertEquals(11, results.size());
+		for (int i = 0; i < results.size(); i++) {
+			RowData row = results.get(i);
+			Assert.assertEquals(20, row.getString(0).toString().length());
+			long f1 = row.getLong(1);
+			Assert.assertTrue(f1 >= 10 && f1 <= 100);
+			Assert.assertEquals(i + 50, row.getLong(2));
+		}
+	}
+
+	private List<RowData> runGenerator(TableSchema schema, DescriptorProperties descriptor) throws Exception {
 		DynamicTableSource source = createSource(
-				TEST_SCHEMA, descriptor.asMap());
+			schema, descriptor.asMap());
 
 		assertTrue(source instanceof DataGenTableSource);
 
@@ -103,47 +154,11 @@ public class DataGenTableSourceFactoryTest {
 				new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
 		testHarness.open();
 
-		List<RowData> results = new ArrayList<>();
-
-		gen.run(new SourceFunction.SourceContext<RowData>() {
-
-			private Object lock = new Object();
-
-			@Override
-			public void collect(RowData element) {
-				results.add(element);
-			}
-
-			@Override
-			public void collectWithTimestamp(RowData element, long timestamp) {
-			}
-
-			@Override
-			public void emitWatermark(Watermark mark) {
-			}
-
-			@Override
-			public void markAsTemporarilyIdle() {
-			}
-
-			@Override
-			public Object getCheckpointLock() {
-				return lock;
-			}
+		TestContext ctx = new TestContext();
 
-			@Override
-			public void close() {
-			}
-		});
+		gen.run(ctx);
 
-		Assert.assertEquals(11, results.size());
-		for (int i = 0; i < results.size(); i++) {
-			RowData row = results.get(i);
-			Assert.assertEquals(20, row.getString(0).toString().length());
-			long f1 = row.getLong(1);
-			Assert.assertTrue(f1 >= 10 && f1 <= 100);
-			Assert.assertEquals(i + 50, row.getLong(2));
-		}
+		return ctx.results;
 	}
 
 	@Test
@@ -311,4 +326,37 @@ public class DataGenTableSourceFactoryTest {
 				new Configuration(),
 				Thread.currentThread().getContextClassLoader());
 	}
+
+	/**
+	 * Source context that stores every element passed to {@code collect}; elements collected with a
+	 * timestamp, watermarks and idleness signals are ignored.
+	 */
+	private static class TestContext implements SourceFunction.SourceContext<RowData> {
+
+		private final List<RowData> results = new ArrayList<>();
+		private final Object lock = new Object();
+
+		@Override
+		public void collect(RowData element) {
+			results.add(element);
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return lock;
+		}
+
+		// The remaining callbacks deliberately do nothing.
+		@Override
+		public void collectWithTimestamp(RowData element, long timestamp) { }
+
+		@Override
+		public void emitWatermark(Watermark mark) { }
+
+		@Override
+		public void markAsTemporarilyIdle() { }
+
+		@Override
+		public void close() { }
+	}
 }