You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sj...@apache.org on 2020/09/14 17:48:39 UTC

[flink] 05/05: [FLINK-18735][table] Add additional logging to DataGen source

This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ef01fec2a632f65556e0b30faad7399120b62e95
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Tue Aug 11 08:40:44 2020 -0500

    [FLINK-18735][table] Add additional logging to DataGen source
    
    This closes #13010
---
 docs/dev/table/connectors/datagen.md               | 173 +++++++++++++++++----
 .../source/datagen/DataGeneratorSource.java        |  23 ++-
 .../table/factories/DataGenTableSourceFactory.java |   5 +-
 .../factories/datagen/DataGenTableSource.java      |  71 ++-------
 .../factories/datagen/DataGenVisitorBase.java      |  16 +-
 .../factories/datagen/RandomGeneratorVisitor.java  |  87 ++++-------
 .../datagen/types/DataGeneratorMapper.java         |  60 +++++++
 .../factories/datagen/types/RowDataGenerator.java  |  80 ++++++++++
 .../stream/table/DataGeneratorConnectorITCase.java |  72 +++++++++
 9 files changed, 428 insertions(+), 159 deletions(-)

diff --git a/docs/dev/table/connectors/datagen.md b/docs/dev/table/connectors/datagen.md
index 99cce5b..27e7a20 100644
--- a/docs/dev/table/connectors/datagen.md
+++ b/docs/dev/table/connectors/datagen.md
@@ -43,42 +43,159 @@ For variable sized types, char/varchar/string/array/map/multiset, the length can
 Additionally, a total number of rows can be specified, resulting in a bounded table.
 
 There also exists a sequence generator, where users specify a sequence of start and end values.
-Complex types cannot be generated as a sequence.
 If any column in a table is a sequence type, the table will be bounded and end with the first sequence completes.
 
 Time types are always the local machines current system time.
 
-<div class="codetabs" markdown="1">
-<div data-lang="SQL" markdown="1">
 {% highlight sql %}
-CREATE TABLE datagen (
- f_sequence INT,
- f_random INT,
- f_random_str STRING,
- ts TIMESTAMP(3)
- WATERMARK FOR ts AS ts
+CREATE TABLE Orders (
+    order_number BIGINT,
+    price        DECIMAL(32,2),
+    buyer        ROW<first_name STRING, last_name STRING>,
+    order_time   TIMESTAMP(3)
 ) WITH (
- 'connector' = 'datagen',
-
- -- optional options --
-
- 'rows-per-second'='5',
- 
- -- make the table bounded
- 'number-of-rows'='10'
+  'connector' = 'datagen'
+)
+{% endhighlight %}
 
- 'fields.f_sequence.kind'='sequence',
- 'fields.f_sequence.start'='1',
- 'fields.f_sequence.end'='1000',
+Often, the data generator connector is used in conjunction with the ``LIKE`` clause to mock out physical tables.
 
- 'fields.f_random.min'='1',
- 'fields.f_random.max'='1000',
+{% highlight sql %}
+CREATE TABLE Orders (
+    order_number BIGINT,
+    price        DECIMAL(32,2),
+    buyer        ROW<first_name STRING, last_name STRING>,
+    order_time   TIMESTAMP(3)
+) WITH (...)
 
- 'fields.f_random_str.length'='10'
+-- create a bounded mock table
+CREATE TEMPORARY TABLE GenOrders 
+WITH ( 
+    'connector' = 'datagen',
+    'number-of-rows' = '10'
 )
+LIKE Orders (EXCLUDING ALL)
 {% endhighlight %}
-</div>
-</div>
+
+Types
+-----
+
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 25%">Type</th>
+            <th class="text-center" style="width: 25%">Supported Generators</th>
+            <th class="text-center" style="width: 50%">Notes</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td>BOOLEAN</td>
+            <td>random</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>CHAR</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>VARCHAR</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>STRING</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>DECIMAL</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>TINYINT</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr>  
+        <tr>
+            <td>SMALLINT</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr> 
+        <tr>
+            <td>INT</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr> 
+        <tr>
+            <td>BIGINT</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>FLOAT</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>DOUBLE</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>DATE</td>
+            <td>random</td>
+            <td>Always resolves to the current date of the local machine.</td>
+        </tr>
+        <tr>
+            <td>TIME</td>
+            <td>random</td>
+            <td>Always resolves to the current time of the local machine.</td>
+        </tr>
+        <tr>
+            <td>TIMESTAMP</td>
+            <td>random</td>
+            <td>Always resolves to the current timestamp of the local machine.</td>
+        </tr>    
+        <tr>
+            <td>TIMESTAMP WITH LOCAL TIME ZONE</td>
+            <td>random</td>
+            <td>Always resolves to the current timestamp of the local machine.</td>
+        </tr>   
+        <tr>
+            <td>INTERVAL YEAR TO MONTH</td>
+            <td>random</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>INTERVAL DAY TO SECOND</td>
+            <td>random</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>ROW</td>
+            <td>random</td>
+            <td>Generates a row with random subfields.</td>
+        </tr>
+        <tr>
+            <td>ARRAY</td>
+            <td>random</td>
+            <td>Generates an array with random entries.</td>
+        </tr>
+        <tr>
+            <td>MAP</td>
+            <td>random</td>
+            <td>Generates a map with random entries.</td>
+        </tr>
+        <tr>
+            <td>MULTISET</td>
+            <td>random</td>
+            <td>Generates a multiset with random entries.</td>
+        </tr>
+    </tbody>
+</table>
 
 Connector Options
 ----------------
@@ -127,21 +244,21 @@ Connector Options
       <td>optional</td>
       <td style="word-wrap: break-word;">(Minimum value of type)</td>
       <td>(Type of field)</td>
-      <td>Minimum value of random generator, work for number types.</td>
+      <td>Minimum value of random generator, works for numeric types.</td>
     </tr>
     <tr>
       <td><h5>fields.#.max</h5></td>
       <td>optional</td>
       <td style="word-wrap: break-word;">(Maximum value of type)</td>
       <td>(Type of field)</td>
-      <td>Maximum value of random generator, work for number types.</td>
+      <td>Maximum value of random generator, works for numeric types.</td>
     </tr>
     <tr>
       <td><h5>fields.#.length</h5></td>
       <td>optional</td>
       <td style="word-wrap: break-word;">100</td>
       <td>Integer</td>
-      <td>Length for string generating of random generator, work for char/varchar/string.</td>
+      <td>Size or length of the collection for generating char/varchar/string/array/map/multiset types.</td>
     </tr>
     <tr>
       <td><h5>fields.#.start</h5></td>
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
index 0352d9d..1de9b28 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
@@ -24,6 +24,8 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -37,15 +39,18 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
 
 	private static final long serialVersionUID = 1L;
 
+	private static final Logger LOG = LoggerFactory.getLogger(DataGeneratorSource.class);
+
 	private final DataGenerator<T> generator;
+
 	private final long rowsPerSecond;
 
 	@Nullable
-	private Long numberOfRows;
+	private final Long numberOfRows;
 
-	private int outputSoFar;
+	private transient int outputSoFar;
 
-	private int toOutput;
+	private transient int toOutput;
 
 	transient volatile boolean isRunning;
 
@@ -65,7 +70,7 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
 	 * @param rowsPerSecond Control the emit rate.
 	 * @param numberOfRows Total number of rows to output.
 	 */
-	public DataGeneratorSource(DataGenerator<T> generator, long rowsPerSecond, Long numberOfRows) {
+	public DataGeneratorSource(DataGenerator<T> generator, long rowsPerSecond, @Nullable Long numberOfRows) {
 		this.generator = generator;
 		this.rowsPerSecond = rowsPerSecond;
 		this.numberOfRows = numberOfRows;
@@ -104,9 +109,7 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
 			for (int i = 0; i < taskRowsPerSecond; i++) {
 				if (isRunning && generator.hasNext() && (numberOfRows == null || outputSoFar < toOutput)) {
 					synchronized (ctx.getCheckpointLock()) {
-						if (numberOfRows != null) {
-							outputSoFar++;
-						}
+						outputSoFar++;
 						ctx.collect(this.generator.next());
 					}
 				} else {
@@ -124,6 +127,12 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
 	}
 
 	@Override
+	public void close() throws Exception {
+		super.close();
+		LOG.info("generated {} rows", outputSoFar);
+	}
+
+	@Override
 	public void cancel() {
 		isRunning = false;
 	}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
index a2b9df3..dbc39ab 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
@@ -93,7 +93,7 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 		context.getCatalogTable().getOptions().forEach(options::setString);
 
 		TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
-		DataGenerator[] fieldGenerators = new DataGenerator[schema.getFieldCount()];
+		DataGenerator<?>[] fieldGenerators = new DataGenerator[schema.getFieldCount()];
 		Set<ConfigOption<?>> optionalOptions = new HashSet<>();
 
 		for (int i = 0; i < fieldGenerators.length; i++) {
@@ -118,7 +118,8 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 		optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
 		FactoryUtil.validateUnconsumedKeys(factoryIdentifier(), options.keySet(), consumedOptionKeys);
 
-		return new DataGenTableSource(fieldGenerators, schema, options.get(ROWS_PER_SECOND), options.get(NUMBER_OF_ROWS));
+		String name = context.getObjectIdentifier().toString();
+		return new DataGenTableSource(fieldGenerators, name, schema, options.get(ROWS_PER_SECOND), options.get(NUMBER_OF_ROWS));
 	}
 
 	private DataGeneratorContainer createContainer(
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
index 935b465..e5a9b36 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
@@ -20,9 +20,6 @@ package org.apache.flink.table.factories.datagen;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
 import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
@@ -31,8 +28,8 @@ import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceFunctionProvider;
-import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.datagen.types.RowDataGenerator;
 import org.apache.flink.table.sources.StreamTableSource;
 
 /**
@@ -42,13 +39,20 @@ import org.apache.flink.table.sources.StreamTableSource;
 @Internal
 public class DataGenTableSource implements ScanTableSource {
 
-	private final DataGenerator[] fieldGenerators;
+	private final DataGenerator<?>[] fieldGenerators;
+	private final String tableName;
 	private final TableSchema schema;
 	private final long rowsPerSecond;
 	private final Long numberOfRows;
 
-	public DataGenTableSource(DataGenerator[] fieldGenerators, TableSchema schema, long rowsPerSecond, Long numberOfRows) {
+	public DataGenTableSource(
+		DataGenerator<?>[] fieldGenerators,
+		String tableName,
+		TableSchema schema,
+		long rowsPerSecond,
+		Long numberOfRows) {
 		this.fieldGenerators = fieldGenerators;
+		this.tableName = tableName;
 		this.schema = schema;
 		this.rowsPerSecond = rowsPerSecond;
 		this.numberOfRows = numberOfRows;
@@ -56,20 +60,20 @@ public class DataGenTableSource implements ScanTableSource {
 
 	@Override
 	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
-		boolean isBounded = numberOfRows == null;
+		boolean isBounded = numberOfRows != null;
 		return SourceFunctionProvider.of(createSource(), isBounded);
 	}
 
 	@VisibleForTesting
 	public DataGeneratorSource<RowData> createSource() {
 		return new DataGeneratorSource<>(
-			new RowGenerator(fieldGenerators, schema.getFieldNames()),
+			new RowDataGenerator(fieldGenerators, schema.getFieldNames()),
 			rowsPerSecond, numberOfRows);
 	}
 
 	@Override
 	public DynamicTableSource copy() {
-		return new DataGenTableSource(fieldGenerators, schema, rowsPerSecond, numberOfRows);
+		return new DataGenTableSource(fieldGenerators, tableName, schema, rowsPerSecond, numberOfRows);
 	}
 
 	@Override
@@ -81,54 +85,5 @@ public class DataGenTableSource implements ScanTableSource {
 	public ChangelogMode getChangelogMode() {
 		return ChangelogMode.insertOnly();
 	}
-
-	private static class RowGenerator implements DataGenerator<RowData> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final DataGenerator[] fieldGenerators;
-		private final String[] fieldNames;
-
-		private RowGenerator(DataGenerator[] fieldGenerators, String[] fieldNames) {
-			this.fieldGenerators = fieldGenerators;
-			this.fieldNames = fieldNames;
-		}
-
-		@Override
-		public void open(
-			String name,
-			FunctionInitializationContext context,
-			RuntimeContext runtimeContext) throws Exception {
-			for (int i = 0; i < fieldGenerators.length; i++) {
-				fieldGenerators[i].open(fieldNames[i], context, runtimeContext);
-			}
-		}
-
-		@Override
-		public void snapshotState(FunctionSnapshotContext context) throws Exception {
-			for (DataGenerator generator : fieldGenerators) {
-				generator.snapshotState(context);
-			}
-		}
-
-		@Override
-		public boolean hasNext() {
-			for (DataGenerator generator : fieldGenerators) {
-				if (!generator.hasNext()) {
-					return false;
-				}
-			}
-			return true;
-		}
-
-		@Override
-		public RowData next() {
-			GenericRowData row = new GenericRowData(fieldNames.length);
-			for (int i = 0; i < fieldGenerators.length; i++) {
-				row.setField(i, fieldGenerators[i].next());
-			}
-			return row;
-		}
-	}
 }
 
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java
index a34c91f..f68dcbc 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.DateType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -32,13 +33,12 @@ import org.apache.flink.table.types.logical.ZonedTimestampType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
 
 import java.io.Serializable;
-import java.time.Instant;
 import java.time.LocalDate;
-import java.time.LocalDateTime;
 import java.time.LocalTime;
-import java.time.OffsetDateTime;
 import java.util.function.Supplier;
 
+import static java.time.temporal.ChronoField.MILLI_OF_DAY;
+
 /**
  * Base class for translating {@link LogicalType LogicalTypes} to {@link DataGeneratorContainer}'s.
  */
@@ -55,27 +55,27 @@ public abstract class DataGenVisitorBase extends LogicalTypeDefaultVisitor<DataG
 
 	@Override
 	public DataGeneratorContainer visit(DateType dateType) {
-		return DataGeneratorContainer.of(TimeGenerator.of(LocalDate::now));
+		return DataGeneratorContainer.of(TimeGenerator.of(() -> (int) LocalDate.now().toEpochDay()));
 	}
 
 	@Override
 	public DataGeneratorContainer visit(TimeType timeType) {
-		return DataGeneratorContainer.of(TimeGenerator.of(LocalTime::now));
+		return DataGeneratorContainer.of(TimeGenerator.of(() -> LocalTime.now().get(MILLI_OF_DAY)));
 	}
 
 	@Override
 	public DataGeneratorContainer visit(TimestampType timestampType) {
-		return DataGeneratorContainer.of(TimeGenerator.of(LocalDateTime::now));
+		return DataGeneratorContainer.of(TimeGenerator.of(() -> TimestampData.fromEpochMillis(System.currentTimeMillis())));
 	}
 
 	@Override
 	public DataGeneratorContainer visit(ZonedTimestampType zonedTimestampType) {
-		return DataGeneratorContainer.of(TimeGenerator.of(OffsetDateTime::now));
+		return DataGeneratorContainer.of(TimeGenerator.of(() -> TimestampData.fromEpochMillis(System.currentTimeMillis())));
 	}
 
 	@Override
 	public DataGeneratorContainer visit(LocalZonedTimestampType localZonedTimestampType) {
-		return DataGeneratorContainer.of(TimeGenerator.of(Instant::now));
+		return DataGeneratorContainer.of(TimeGenerator.of(() -> TimestampData.fromEpochMillis(System.currentTimeMillis())));
 	}
 
 	@Override
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
index 97fafe4..77c2b45 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
@@ -27,7 +27,12 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
 import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.factories.datagen.types.DataGeneratorMapper;
+import org.apache.flink.table.factories.datagen.types.RowDataGenerator;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BooleanType;
@@ -45,12 +50,12 @@ import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.logical.YearMonthIntervalType;
-import org.apache.flink.types.Row;
 
 import java.math.BigDecimal;
 import java.math.MathContext;
 import java.math.RoundingMode;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -170,7 +175,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
 		ConfigOption<Double> min = minKey.doubleType().defaultValue(Double.MIN_VALUE);
 		ConfigOption<Double> max = maxKey.doubleType().defaultValue(Double.MAX_VALUE);
 		return DataGeneratorContainer.of(
-			new BigDecimalRandomGenerator(
+			new DecimalDataRandomGenerator(
 				RandomGenerator.doubleGenerator(config.get(min), config.get(max)),
 				decimalType.getPrecision(), decimalType.getScale()));
 	}
@@ -206,8 +211,9 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
 			.getElementType()
 			.accept(new RandomGeneratorVisitor(fieldName, config));
 
+		DataGenerator<Object[]> generator = RandomGenerator.arrayGenerator(container.getGenerator(), config.get(lenOption));
 		return DataGeneratorContainer.of(
-			RandomGenerator.arrayGenerator(container.getGenerator(), config.get(lenOption)),
+			new DataGeneratorMapper<>(generator, (GenericArrayData::new)),
 			container.getOptions().toArray(new ConfigOption<?>[0]));
 	}
 
@@ -222,10 +228,13 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
 			.getElementType()
 			.accept(new RandomGeneratorVisitor(fieldName, config));
 
+		DataGenerator<Map<Object, Integer>> mapGenerator = RandomGenerator.mapGenerator(
+			container.getGenerator(),
+			RandomGenerator.intGenerator(0, 10),
+			config.get(lenOption));
+
 		return DataGeneratorContainer.of(
-			RandomGenerator.mapGenerator(container.getGenerator(),
-				RandomGenerator.intGenerator(0, 10),
-				config.get(lenOption)),
+			new DataGeneratorMapper<>(mapGenerator, GenericMapData::new),
 			container.getOptions().toArray(new ConfigOption<?>[0]));
 	}
 
@@ -249,11 +258,13 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
 		Set<ConfigOption<?>> options = keyContainer.getOptions();
 		options.addAll(valContainer.getOptions());
 
+		DataGenerator<Map<Object, Object>> mapGenerator = RandomGenerator.mapGenerator(
+			keyContainer.getGenerator(),
+			valContainer.getGenerator(),
+			config.get(lenOption));
+
 		return DataGeneratorContainer.of(
-			RandomGenerator.mapGenerator(
-				keyContainer.getGenerator(),
-				valContainer.getGenerator(),
-				config.get(lenOption)),
+			new DataGeneratorMapper<>(mapGenerator, GenericMapData::new),
 			options.toArray(new ConfigOption<?>[0]));
 	}
 
@@ -271,12 +282,14 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
 			.flatMap(container -> container.getOptions().stream())
 			.toArray(ConfigOption[]::new);
 
-		List<DataGenerator> generators = fieldContainers
+		DataGenerator[] generators = fieldContainers
 			.stream()
 			.map(DataGeneratorContainer::getGenerator)
-			.collect(Collectors.toList());
+			.toArray(DataGenerator[]::new);
 
-		return DataGeneratorContainer.of(new RowGenerator(name, rowType.getFieldNames(), generators), options);
+		String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+
+		return DataGeneratorContainer.of(new RowDataGenerator(generators, fieldNames), options);
 	}
 
 	@Override
@@ -293,51 +306,12 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
 		};
 	}
 
-	private static class RowGenerator implements DataGenerator<Row> {
-
-		private final String rowName;
-
-		private final List<String> fieldNames;
-
-		private final List<DataGenerator> generators;
-
-		private RowGenerator(String rowName, List<String> fieldNames, List<DataGenerator> generators) {
-			this.rowName = rowName;
-			this.fieldNames = fieldNames;
-			this.generators = generators;
-		}
-
-		@Override
-		public void open(String name, FunctionInitializationContext context, RuntimeContext runtimeContext) throws Exception {
-			for (int i = 0; i < generators.size(); i++) {
-				String fullName = rowName + "." + fieldNames.get(i);
-				generators.get(i).open(fullName, context, runtimeContext);
-			}
-		}
-
-		@Override
-		public boolean hasNext() {
-			return true;
-		}
-
-		@Override
-		public Row next() {
-			Row row = new Row(generators.size());
-
-			for (int i = 0; i < generators.size(); i++) {
-				row.setField(i, generators.get(i).next());
-			}
-
-			return row;
-		}
-	}
-
-	private static class BigDecimalRandomGenerator implements DataGenerator<BigDecimal> {
+	private static class DecimalDataRandomGenerator implements DataGenerator<DecimalData> {
 		private final RandomGenerator<Double> generator;
 		private final int precision;
 		private final int scale;
 
-		public BigDecimalRandomGenerator(RandomGenerator<Double> generator, int precision, int scale) {
+		public DecimalDataRandomGenerator(RandomGenerator<Double> generator, int precision, int scale) {
 			this.generator = generator;
 			this.precision = precision;
 			this.scale = scale;
@@ -354,9 +328,10 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
 		}
 
 		@Override
-		public BigDecimal next() {
+		public DecimalData next() {
 			BigDecimal decimal = new BigDecimal(generator.next(), new MathContext(precision));
-			return decimal.setScale(scale, RoundingMode.DOWN);
+			decimal = decimal.setScale(scale, RoundingMode.DOWN);
+			return DecimalData.fromBigDecimal(decimal, precision, scale);
 		}
 	}
 }
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/DataGeneratorMapper.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/DataGeneratorMapper.java
new file mode 100644
index 0000000..25532cf
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/DataGeneratorMapper.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen.types;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+
+import java.io.Serializable;
+import java.util.function.Function;
+
+/**
+ * Utility for mapping the output of a {@link DataGenerator}.
+ */
+@Internal
+public class DataGeneratorMapper<A, B> implements DataGenerator<B> {
+
+	private final DataGenerator<A> generator;
+
+	private final SerializableFunction<A, B> mapper;
+
+	public DataGeneratorMapper(DataGenerator<A> generator, SerializableFunction<A, B> mapper) {
+		this.generator = generator;
+		this.mapper = mapper;
+	}
+
+	@Override
+	public void open(String name, FunctionInitializationContext context, RuntimeContext runtimeContext) throws Exception {
+		generator.open(name, context, runtimeContext);
+	}
+
+	@Override
+	public boolean hasNext() {
+		return generator.hasNext();
+	}
+
+	@Override
+	public B next() {
+		return mapper.apply(generator.next());
+	}
+
+	public interface SerializableFunction<A, B> extends Function<A, B>, Serializable {}
+}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/RowDataGenerator.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/RowDataGenerator.java
new file mode 100644
index 0000000..26cbc24
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/RowDataGenerator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen.types;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Data generator for Flink's internal {@link RowData} type.
+ */
+@Internal
+public class RowDataGenerator implements DataGenerator<RowData> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final DataGenerator<?>[] fieldGenerators;
+	private final String[] fieldNames;
+
+	public RowDataGenerator(DataGenerator<?>[] fieldGenerators, String[] fieldNames) {
+		this.fieldGenerators = fieldGenerators;
+		this.fieldNames = fieldNames;
+	}
+
+	@Override
+	public void open(
+		String name,
+		FunctionInitializationContext context,
+		RuntimeContext runtimeContext) throws Exception {
+		for (int i = 0; i < fieldGenerators.length; i++) {
+			fieldGenerators[i].open(fieldNames[i], context, runtimeContext);
+		}
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		for (DataGenerator<?> generator : fieldGenerators) {
+			generator.snapshotState(context);
+		}
+	}
+
+	@Override
+	public boolean hasNext() {
+		for (DataGenerator<?> generator : fieldGenerators) {
+			if (!generator.hasNext()) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	@Override
+	public RowData next() {
+		GenericRowData row = new GenericRowData(fieldNames.length);
+		for (int i = 0; i < fieldGenerators.length; i++) {
+			row.setField(i, fieldGenerators[i].next());
+		}
+		return row;
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java
new file mode 100644
index 0000000..25be021
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DataGeneratorConnectorITCase extends BatchTestBase {
+
+	private static final String TABLE = "CREATE TABLE datagen_t (\n" +
+		"	f0 CHAR(1),\n" +
+		"	f1 VARCHAR(10),\n" +
+		"	f2 STRING,\n" +
+		"	f3 BOOLEAN,\n" +
+		"	f4 DECIMAL(32,2),\n" +
+		"	f5 TINYINT,\n" +
+		"	f6 SMALLINT,\n" +
+		"	f7 INT,\n" +
+		"	f8 BIGINT,\n" +
+		"	f9 FLOAT,\n" +
+		"	f10 DOUBLE,\n" +
+		"	f11 DATE,\n" +
+		"	f12 TIME,\n" +
+		"	f13 TIMESTAMP(3),\n" +
+		"	f14 TIMESTAMP WITH LOCAL TIME ZONE,\n" +
+		"	f15 INT ARRAY,\n" +
+		"	f16 MAP<STRING, DATE>,\n" +
+		"	f17 DECIMAL(32,2) MULTISET,\n" +
+		"	f18 ROW<a BIGINT, b TIME, c ROW<d TIMESTAMP>>\n" +
+		") WITH (" +
+		"	'connector' = 'datagen',\n" +
+		"	'number-of-rows' = '10'\n" +
+		")";
+
+	@Test
+	public void testTypes() throws Exception {
+		tEnv().executeSql(TABLE);
+
+		List<Row> results = new ArrayList<>();
+
+		try (CloseableIterator<Row> iter = tEnv().executeSql("select * from datagen_t").collect()) {
+			while (iter.hasNext()) {
+				results.add(iter.next());
+			}
+		}
+
+		Assert.assertEquals("Unexpected number of results",10, results.size());
+	}
+}