You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sj...@apache.org on 2020/09/14 17:48:39 UTC
[flink] 05/05: [FLINK-18735][table] Add additional logging to
DataGen source
This is an automated email from the ASF dual-hosted git repository.
sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ef01fec2a632f65556e0b30faad7399120b62e95
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Tue Aug 11 08:40:44 2020 -0500
[FLINK-18735][table] Add additional logging to DataGen source
This closes #13010
---
docs/dev/table/connectors/datagen.md | 173 +++++++++++++++++----
.../source/datagen/DataGeneratorSource.java | 23 ++-
.../table/factories/DataGenTableSourceFactory.java | 5 +-
.../factories/datagen/DataGenTableSource.java | 71 ++-------
.../factories/datagen/DataGenVisitorBase.java | 16 +-
.../factories/datagen/RandomGeneratorVisitor.java | 87 ++++-------
.../datagen/types/DataGeneratorMapper.java | 60 +++++++
.../factories/datagen/types/RowDataGenerator.java | 80 ++++++++++
.../stream/table/DataGeneratorConnectorITCase.java | 72 +++++++++
9 files changed, 428 insertions(+), 159 deletions(-)
diff --git a/docs/dev/table/connectors/datagen.md b/docs/dev/table/connectors/datagen.md
index 99cce5b..27e7a20 100644
--- a/docs/dev/table/connectors/datagen.md
+++ b/docs/dev/table/connectors/datagen.md
@@ -43,42 +43,159 @@ For variable sized types, char/varchar/string/array/map/multiset, the length can
Additionally, a total number of rows can be specified, resulting in a bounded table.
There also exists a sequence generator, where users specify a sequence of start and end values.
-Complex types cannot be generated as a sequence.
If any column in a table is a sequence type, the table will be bounded and end with the first sequence completes.
Time types are always the local machines current system time.
-<div class="codetabs" markdown="1">
-<div data-lang="SQL" markdown="1">
{% highlight sql %}
-CREATE TABLE datagen (
- f_sequence INT,
- f_random INT,
- f_random_str STRING,
- ts TIMESTAMP(3)
- WATERMARK FOR ts AS ts
+CREATE TABLE Orders (
+ order_number BIGINT,
+ price DECIMAL(32,2),
+ buyer ROW<first_name STRING, last_name STRING>,
+ order_time TIMESTAMP(3)
) WITH (
- 'connector' = 'datagen',
-
- -- optional options --
-
- 'rows-per-second'='5',
-
- -- make the table bounded
- 'number-of-rows'='10'
+ 'connector' = 'datagen'
+)
+{% endhighlight %}
- 'fields.f_sequence.kind'='sequence',
- 'fields.f_sequence.start'='1',
- 'fields.f_sequence.end'='1000',
+Often, the data generator connector is used in conjunction with the ``LIKE`` clause to mock out physical tables.
- 'fields.f_random.min'='1',
- 'fields.f_random.max'='1000',
+{% highlight sql %}
+CREATE TABLE Orders (
+ order_number BIGINT,
+ price DECIMAL(32,2),
+ buyer ROW<first_name STRING, last_name STRING>,
+ order_time TIMESTAMP(3)
+) WITH (...)
- 'fields.f_random_str.length'='10'
+-- create a bounded mock table
+CREATE TEMPORARY TABLE GenOrders
+WITH (
+ 'connector' = 'datagen',
+ 'number-of-rows' = '10'
)
+LIKE Orders (EXCLUDING ALL)
{% endhighlight %}
-</div>
-</div>
+
+Types
+-----
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Type</th>
+ <th class="text-center" style="width: 25%">Supported Generators</th>
+ <th class="text-center" style="width: 50%">Notes</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td>BOOLEAN</td>
+ <td>random</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>CHAR</td>
+ <td>random / sequence</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>VARCHAR</td>
+ <td>random / sequence</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>STRING</td>
+ <td>random / sequence</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>DECIMAL</td>
+ <td>random / sequence</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>TINYINT</td>
+ <td>random / sequence</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>SMALLINT</td>
+ <td>random / sequence</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>INT</td>
+ <td>random / sequence</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>BIGINT</td>
+ <td>random / sequence</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>FLOAT</td>
+ <td>random / sequence</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>DOUBLE</td>
+ <td>random / sequence</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>DATE</td>
+ <td>random</td>
+ <td>Always resolves to the current date of the local machine.</td>
+ </tr>
+ <tr>
+ <td>TIME</td>
+ <td>random</td>
+ <td>Always resolves to the current time of the local machine.</td>
+ </tr>
+ <tr>
+ <td>TIMESTAMP</td>
+ <td>random</td>
+ <td>Always resolves to the current timestamp of the local machine.</td>
+ </tr>
+ <tr>
+ <td>TIMESTAMP WITH LOCAL TIME ZONE</td>
+ <td>random</td>
+ <td>Always resolves to the current timestamp of the local machine.</td>
+ </tr>
+ <tr>
+ <td>INTERVAL YEAR TO MONTH</td>
+ <td>random</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>INTERVAL DAY TO SECOND</td>
+ <td>random</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>ROW</td>
+ <td>random</td>
+ <td>Generates a row with random subfields.</td>
+ </tr>
+ <tr>
+ <td>ARRAY</td>
+ <td>random</td>
+ <td>Generates an array with random entries.</td>
+ </tr>
+ <tr>
+ <td>MAP</td>
+ <td>random</td>
+ <td>Generates a map with random entries.</td>
+ </tr>
+ <tr>
+ <td>MULTISET</td>
+ <td>random</td>
+ <td>Generates a multiset with random entries.</td>
+ </tr>
+ </tbody>
+</table>
Connector Options
----------------
@@ -127,21 +244,21 @@ Connector Options
<td>optional</td>
<td style="word-wrap: break-word;">(Minimum value of type)</td>
<td>(Type of field)</td>
- <td>Minimum value of random generator, work for number types.</td>
+ <td>Minimum value of random generator, works for numeric types.</td>
</tr>
<tr>
<td><h5>fields.#.max</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">(Maximum value of type)</td>
<td>(Type of field)</td>
- <td>Maximum value of random generator, work for number types.</td>
+ <td>Maximum value of random generator, works for numeric types.</td>
</tr>
<tr>
<td><h5>fields.#.length</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
- <td>Length for string generating of random generator, work for char/varchar/string.</td>
+ <td>Size or length of the collection for generating char/varchar/string/array/map/multiset types.</td>
</tr>
<tr>
<td><h5>fields.#.start</h5></td>
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
index 0352d9d..1de9b28 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
@@ -24,6 +24,8 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -37,15 +39,18 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(DataGeneratorSource.class);
+
private final DataGenerator<T> generator;
+
private final long rowsPerSecond;
@Nullable
- private Long numberOfRows;
+ private final Long numberOfRows;
- private int outputSoFar;
+ private transient int outputSoFar;
- private int toOutput;
+ private transient int toOutput;
transient volatile boolean isRunning;
@@ -65,7 +70,7 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
* @param rowsPerSecond Control the emit rate.
* @param numberOfRows Total number of rows to output.
*/
- public DataGeneratorSource(DataGenerator<T> generator, long rowsPerSecond, Long numberOfRows) {
+ public DataGeneratorSource(DataGenerator<T> generator, long rowsPerSecond, @Nullable Long numberOfRows) {
this.generator = generator;
this.rowsPerSecond = rowsPerSecond;
this.numberOfRows = numberOfRows;
@@ -104,9 +109,7 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
for (int i = 0; i < taskRowsPerSecond; i++) {
if (isRunning && generator.hasNext() && (numberOfRows == null || outputSoFar < toOutput)) {
synchronized (ctx.getCheckpointLock()) {
- if (numberOfRows != null) {
- outputSoFar++;
- }
+ outputSoFar++;
ctx.collect(this.generator.next());
}
} else {
@@ -124,6 +127,12 @@ public class DataGeneratorSource<T> extends RichParallelSourceFunction<T> implem
}
@Override
+ public void close() throws Exception {
+ super.close();
+ LOG.info("generated {} rows", outputSoFar);
+ }
+
+ @Override
public void cancel() {
isRunning = false;
}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
index a2b9df3..dbc39ab 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
@@ -93,7 +93,7 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
context.getCatalogTable().getOptions().forEach(options::setString);
TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
- DataGenerator[] fieldGenerators = new DataGenerator[schema.getFieldCount()];
+ DataGenerator<?>[] fieldGenerators = new DataGenerator[schema.getFieldCount()];
Set<ConfigOption<?>> optionalOptions = new HashSet<>();
for (int i = 0; i < fieldGenerators.length; i++) {
@@ -118,7 +118,8 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
FactoryUtil.validateUnconsumedKeys(factoryIdentifier(), options.keySet(), consumedOptionKeys);
- return new DataGenTableSource(fieldGenerators, schema, options.get(ROWS_PER_SECOND), options.get(NUMBER_OF_ROWS));
+ String name = context.getObjectIdentifier().toString();
+ return new DataGenTableSource(fieldGenerators, name, schema, options.get(ROWS_PER_SECOND), options.get(NUMBER_OF_ROWS));
}
private DataGeneratorContainer createContainer(
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
index 935b465..e5a9b36 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
@@ -20,9 +20,6 @@ package org.apache.flink.table.factories.datagen;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
@@ -31,8 +28,8 @@ import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
-import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.datagen.types.RowDataGenerator;
import org.apache.flink.table.sources.StreamTableSource;
/**
@@ -42,13 +39,20 @@ import org.apache.flink.table.sources.StreamTableSource;
@Internal
public class DataGenTableSource implements ScanTableSource {
- private final DataGenerator[] fieldGenerators;
+ private final DataGenerator<?>[] fieldGenerators;
+ private final String tableName;
private final TableSchema schema;
private final long rowsPerSecond;
private final Long numberOfRows;
- public DataGenTableSource(DataGenerator[] fieldGenerators, TableSchema schema, long rowsPerSecond, Long numberOfRows) {
+ public DataGenTableSource(
+ DataGenerator<?>[] fieldGenerators,
+ String tableName,
+ TableSchema schema,
+ long rowsPerSecond,
+ Long numberOfRows) {
this.fieldGenerators = fieldGenerators;
+ this.tableName = tableName;
this.schema = schema;
this.rowsPerSecond = rowsPerSecond;
this.numberOfRows = numberOfRows;
@@ -56,20 +60,20 @@ public class DataGenTableSource implements ScanTableSource {
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
- boolean isBounded = numberOfRows == null;
+ boolean isBounded = numberOfRows != null;
return SourceFunctionProvider.of(createSource(), isBounded);
}
@VisibleForTesting
public DataGeneratorSource<RowData> createSource() {
return new DataGeneratorSource<>(
- new RowGenerator(fieldGenerators, schema.getFieldNames()),
+ new RowDataGenerator(fieldGenerators, schema.getFieldNames()),
rowsPerSecond, numberOfRows);
}
@Override
public DynamicTableSource copy() {
- return new DataGenTableSource(fieldGenerators, schema, rowsPerSecond, numberOfRows);
+ return new DataGenTableSource(fieldGenerators, tableName, schema, rowsPerSecond, numberOfRows);
}
@Override
@@ -81,54 +85,5 @@ public class DataGenTableSource implements ScanTableSource {
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
-
- private static class RowGenerator implements DataGenerator<RowData> {
-
- private static final long serialVersionUID = 1L;
-
- private final DataGenerator[] fieldGenerators;
- private final String[] fieldNames;
-
- private RowGenerator(DataGenerator[] fieldGenerators, String[] fieldNames) {
- this.fieldGenerators = fieldGenerators;
- this.fieldNames = fieldNames;
- }
-
- @Override
- public void open(
- String name,
- FunctionInitializationContext context,
- RuntimeContext runtimeContext) throws Exception {
- for (int i = 0; i < fieldGenerators.length; i++) {
- fieldGenerators[i].open(fieldNames[i], context, runtimeContext);
- }
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- for (DataGenerator generator : fieldGenerators) {
- generator.snapshotState(context);
- }
- }
-
- @Override
- public boolean hasNext() {
- for (DataGenerator generator : fieldGenerators) {
- if (!generator.hasNext()) {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public RowData next() {
- GenericRowData row = new GenericRowData(fieldNames.length);
- for (int i = 0; i < fieldGenerators.length; i++) {
- row.setField(i, fieldGenerators[i].next());
- }
- return row;
- }
- }
}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java
index a34c91f..f68dcbc 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -32,13 +33,12 @@ import org.apache.flink.table.types.logical.ZonedTimestampType;
import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
import java.io.Serializable;
-import java.time.Instant;
import java.time.LocalDate;
-import java.time.LocalDateTime;
import java.time.LocalTime;
-import java.time.OffsetDateTime;
import java.util.function.Supplier;
+import static java.time.temporal.ChronoField.MILLI_OF_DAY;
+
/**
* Base class for translating {@link LogicalType LogicalTypes} to {@link DataGeneratorContainer}'s.
*/
@@ -55,27 +55,27 @@ public abstract class DataGenVisitorBase extends LogicalTypeDefaultVisitor<DataG
@Override
public DataGeneratorContainer visit(DateType dateType) {
- return DataGeneratorContainer.of(TimeGenerator.of(LocalDate::now));
+ return DataGeneratorContainer.of(TimeGenerator.of(() -> (int) LocalDate.now().toEpochDay()));
}
@Override
public DataGeneratorContainer visit(TimeType timeType) {
- return DataGeneratorContainer.of(TimeGenerator.of(LocalTime::now));
+ return DataGeneratorContainer.of(TimeGenerator.of(() -> LocalTime.now().get(MILLI_OF_DAY)));
}
@Override
public DataGeneratorContainer visit(TimestampType timestampType) {
- return DataGeneratorContainer.of(TimeGenerator.of(LocalDateTime::now));
+ return DataGeneratorContainer.of(TimeGenerator.of(() -> TimestampData.fromEpochMillis(System.currentTimeMillis())));
}
@Override
public DataGeneratorContainer visit(ZonedTimestampType zonedTimestampType) {
- return DataGeneratorContainer.of(TimeGenerator.of(OffsetDateTime::now));
+ return DataGeneratorContainer.of(TimeGenerator.of(() -> TimestampData.fromEpochMillis(System.currentTimeMillis())));
}
@Override
public DataGeneratorContainer visit(LocalZonedTimestampType localZonedTimestampType) {
- return DataGeneratorContainer.of(TimeGenerator.of(Instant::now));
+ return DataGeneratorContainer.of(TimeGenerator.of(() -> TimestampData.fromEpochMillis(System.currentTimeMillis())));
}
@Override
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
index 97fafe4..77c2b45 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
@@ -27,7 +27,12 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.factories.datagen.types.DataGeneratorMapper;
+import org.apache.flink.table.factories.datagen.types.RowDataGenerator;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
@@ -45,12 +50,12 @@ import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.YearMonthIntervalType;
-import org.apache.flink.types.Row;
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -170,7 +175,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
ConfigOption<Double> min = minKey.doubleType().defaultValue(Double.MIN_VALUE);
ConfigOption<Double> max = maxKey.doubleType().defaultValue(Double.MAX_VALUE);
return DataGeneratorContainer.of(
- new BigDecimalRandomGenerator(
+ new DecimalDataRandomGenerator(
RandomGenerator.doubleGenerator(config.get(min), config.get(max)),
decimalType.getPrecision(), decimalType.getScale()));
}
@@ -206,8 +211,9 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
.getElementType()
.accept(new RandomGeneratorVisitor(fieldName, config));
+ DataGenerator<Object[]> generator = RandomGenerator.arrayGenerator(container.getGenerator(), config.get(lenOption));
return DataGeneratorContainer.of(
- RandomGenerator.arrayGenerator(container.getGenerator(), config.get(lenOption)),
+ new DataGeneratorMapper<>(generator, (GenericArrayData::new)),
container.getOptions().toArray(new ConfigOption<?>[0]));
}
@@ -222,10 +228,13 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
.getElementType()
.accept(new RandomGeneratorVisitor(fieldName, config));
+ DataGenerator<Map<Object, Integer>> mapGenerator = RandomGenerator.mapGenerator(
+ container.getGenerator(),
+ RandomGenerator.intGenerator(0, 10),
+ config.get(lenOption));
+
return DataGeneratorContainer.of(
- RandomGenerator.mapGenerator(container.getGenerator(),
- RandomGenerator.intGenerator(0, 10),
- config.get(lenOption)),
+ new DataGeneratorMapper<>(mapGenerator, GenericMapData::new),
container.getOptions().toArray(new ConfigOption<?>[0]));
}
@@ -249,11 +258,13 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
Set<ConfigOption<?>> options = keyContainer.getOptions();
options.addAll(valContainer.getOptions());
+ DataGenerator<Map<Object, Object>> mapGenerator = RandomGenerator.mapGenerator(
+ keyContainer.getGenerator(),
+ valContainer.getGenerator(),
+ config.get(lenOption));
+
return DataGeneratorContainer.of(
- RandomGenerator.mapGenerator(
- keyContainer.getGenerator(),
- valContainer.getGenerator(),
- config.get(lenOption)),
+ new DataGeneratorMapper<>(mapGenerator, GenericMapData::new),
options.toArray(new ConfigOption<?>[0]));
}
@@ -271,12 +282,14 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
.flatMap(container -> container.getOptions().stream())
.toArray(ConfigOption[]::new);
- List<DataGenerator> generators = fieldContainers
+ DataGenerator[] generators = fieldContainers
.stream()
.map(DataGeneratorContainer::getGenerator)
- .collect(Collectors.toList());
+ .toArray(DataGenerator[]::new);
- return DataGeneratorContainer.of(new RowGenerator(name, rowType.getFieldNames(), generators), options);
+ String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+
+ return DataGeneratorContainer.of(new RowDataGenerator(generators, fieldNames), options);
}
@Override
@@ -293,51 +306,12 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
};
}
- private static class RowGenerator implements DataGenerator<Row> {
-
- private final String rowName;
-
- private final List<String> fieldNames;
-
- private final List<DataGenerator> generators;
-
- private RowGenerator(String rowName, List<String> fieldNames, List<DataGenerator> generators) {
- this.rowName = rowName;
- this.fieldNames = fieldNames;
- this.generators = generators;
- }
-
- @Override
- public void open(String name, FunctionInitializationContext context, RuntimeContext runtimeContext) throws Exception {
- for (int i = 0; i < generators.size(); i++) {
- String fullName = rowName + "." + fieldNames.get(i);
- generators.get(i).open(fullName, context, runtimeContext);
- }
- }
-
- @Override
- public boolean hasNext() {
- return true;
- }
-
- @Override
- public Row next() {
- Row row = new Row(generators.size());
-
- for (int i = 0; i < generators.size(); i++) {
- row.setField(i, generators.get(i).next());
- }
-
- return row;
- }
- }
-
- private static class BigDecimalRandomGenerator implements DataGenerator<BigDecimal> {
+ private static class DecimalDataRandomGenerator implements DataGenerator<DecimalData> {
private final RandomGenerator<Double> generator;
private final int precision;
private final int scale;
- public BigDecimalRandomGenerator(RandomGenerator<Double> generator, int precision, int scale) {
+ public DecimalDataRandomGenerator(RandomGenerator<Double> generator, int precision, int scale) {
this.generator = generator;
this.precision = precision;
this.scale = scale;
@@ -354,9 +328,10 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
}
@Override
- public BigDecimal next() {
+ public DecimalData next() {
BigDecimal decimal = new BigDecimal(generator.next(), new MathContext(precision));
- return decimal.setScale(scale, RoundingMode.DOWN);
+ decimal = decimal.setScale(scale, RoundingMode.DOWN);
+ return DecimalData.fromBigDecimal(decimal, precision, scale);
}
}
}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/DataGeneratorMapper.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/DataGeneratorMapper.java
new file mode 100644
index 0000000..25532cf
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/DataGeneratorMapper.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen.types;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+
+import java.io.Serializable;
+import java.util.function.Function;
+
+/**
+ * Utility for mapping the output of a {@link DataGenerator}.
+ *
+ * <p>The lifecycle calls {@link #open}, {@link #snapshotState} and {@link #hasNext()} are all
+ * delegated to the wrapped generator, so wrapping a generator does not bypass any of its hooks.
+ * Only the values returned by {@link #next()} are transformed by the mapping function.
+ *
+ * @param <A> type of the values produced by the wrapped generator
+ * @param <B> type of the values produced by this generator
+ */
+@Internal
+public class DataGeneratorMapper<A, B> implements DataGenerator<B> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final DataGenerator<A> generator;
+
+	private final SerializableFunction<A, B> mapper;
+
+	/**
+	 * Creates a generator that applies {@code mapper} to every value of {@code generator}.
+	 *
+	 * @param generator the generator whose values are transformed
+	 * @param mapper serializable function applied to each generated value
+	 */
+	public DataGeneratorMapper(DataGenerator<A> generator, SerializableFunction<A, B> mapper) {
+		this.generator = generator;
+		this.mapper = mapper;
+	}
+
+	@Override
+	public void open(String name, FunctionInitializationContext context, RuntimeContext runtimeContext) throws Exception {
+		generator.open(name, context, runtimeContext);
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		// Forward checkpoints so that state held by the wrapped generator is not silently skipped.
+		generator.snapshotState(context);
+	}
+
+	@Override
+	public boolean hasNext() {
+		return generator.hasNext();
+	}
+
+	@Override
+	public B next() {
+		return mapper.apply(generator.next());
+	}
+
+	/**
+	 * A {@link Function} that is also {@link Serializable}, so that it can be serialized
+	 * together with the generator that holds it.
+	 */
+	@FunctionalInterface
+	public interface SerializableFunction<A, B> extends Function<A, B>, Serializable {}
+}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/RowDataGenerator.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/RowDataGenerator.java
new file mode 100644
index 0000000..26cbc24
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/RowDataGenerator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen.types;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Data generator for Flink's internal {@link RowData} type.
+ *
+ * <p>Each produced row is a {@link GenericRowData} whose i-th field is the next value of the
+ * i-th field generator. A next row is available only while every field generator has a next
+ * value, and {@link #open} / {@link #snapshotState} are forwarded to all field generators.
+ */
+@Internal
+public class RowDataGenerator implements DataGenerator<RowData> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final DataGenerator<?>[] fieldGenerators;
+	private final String[] fieldNames;
+
+	/**
+	 * Creates a generator producing rows with one field per entry of {@code fieldGenerators}.
+	 *
+	 * @param fieldGenerators generators for each field, in field order
+	 * @param fieldNames field names; each field generator is opened under its field's name
+	 * @throws IllegalArgumentException if both arrays do not have the same length
+	 */
+	public RowDataGenerator(DataGenerator<?>[] fieldGenerators, String[] fieldNames) {
+		if (fieldGenerators.length != fieldNames.length) {
+			throw new IllegalArgumentException(String.format(
+				"Number of field generators (%d) does not match number of field names (%d).",
+				fieldGenerators.length, fieldNames.length));
+		}
+		this.fieldGenerators = fieldGenerators;
+		this.fieldNames = fieldNames;
+	}
+
+	@Override
+	public void open(
+			String name,
+			FunctionInitializationContext context,
+			RuntimeContext runtimeContext) throws Exception {
+		// NOTE(review): field generators are opened with their bare field name; `name` is not used as a prefix.
+		for (int i = 0; i < fieldGenerators.length; i++) {
+			fieldGenerators[i].open(fieldNames[i], context, runtimeContext);
+		}
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		for (DataGenerator<?> generator : fieldGenerators) {
+			generator.snapshotState(context);
+		}
+	}
+
+	@Override
+	public boolean hasNext() {
+		for (DataGenerator<?> generator : fieldGenerators) {
+			if (!generator.hasNext()) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	@Override
+	public RowData next() {
+		GenericRowData row = new GenericRowData(fieldGenerators.length);
+		for (int i = 0; i < fieldGenerators.length; i++) {
+			row.setField(i, fieldGenerators[i].next());
+		}
+		return row;
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java
new file mode 100644
index 0000000..25be021
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table;
+
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Integration test for the {@code datagen} connector: creates a bounded table with columns of
+ * many different types and checks that exactly {@code number-of-rows} rows are produced.
+ */
+public class DataGeneratorConnectorITCase extends BatchTestBase {
+
+	private static final String TABLE = "CREATE TABLE datagen_t (\n" +
+		" f0 CHAR(1),\n" +
+		" f1 VARCHAR(10),\n" +
+		" f2 STRING,\n" +
+		" f3 BOOLEAN,\n" +
+		" f4 DECIMAL(32,2),\n" +
+		" f5 TINYINT,\n" +
+		" f6 SMALLINT,\n" +
+		" f7 INT,\n" +
+		" f8 BIGINT,\n" +
+		" f9 FLOAT,\n" +
+		" f10 DOUBLE,\n" +
+		" f11 DATE,\n" +
+		" f12 TIME,\n" +
+		" f13 TIMESTAMP(3),\n" +
+		" f14 TIMESTAMP WITH LOCAL TIME ZONE,\n" +
+		" f15 INT ARRAY,\n" +
+		" f16 MAP<STRING, DATE>,\n" +
+		" f17 DECIMAL(32,2) MULTISET,\n" +
+		" f18 ROW<a BIGINT, b TIME, c ROW<d TIMESTAMP>>\n" +
+		") WITH (" +
+		" 'connector' = 'datagen',\n" +
+		" 'number-of-rows' = '10'\n" +
+		")";
+
+	@Test
+	public void testTypes() throws Exception {
+		tEnv().executeSql(TABLE);
+
+		List<Row> results = new ArrayList<>();
+
+		try (CloseableIterator<Row> iter = tEnv().executeSql("select * from datagen_t").collect()) {
+			while (iter.hasNext()) {
+				results.add(iter.next());
+			}
+		}
+
+		Assert.assertEquals("Unexpected number of results", 10, results.size());
+	}
+}