Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/29 07:48:01 UTC

[GitHub] [flink] sjwiesman opened a new pull request #13010: Datagen

sjwiesman opened a new pull request #13010:
URL: https://github.com/apache/flink/pull/13010


   ## What is the purpose of the change
   
   Updates the DataGen source to support most types. This makes it possible to use DataGen together with Flink's LIKE clause, which currently does not support overwriting physical columns with computed columns.
   
   ## Brief change log
   
   I refactored the table factory and added support for all types except RAW and STRUCTURED. See the commit messages for a full list of changes. I also added support for creating bounded tables, which is useful when prototyping a query.
   
   
   ## Verifying this change
   
   New unit tests
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**yes** / no)
     - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] sjwiesman commented on a change in pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #13010:
URL: https://github.com/apache/flink/pull/13010#discussion_r485782974



##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.datagen.types.RowDataGenerator;
+import org.apache.flink.table.sources.StreamTableSource;
+
+/**
+ * A {@link StreamTableSource} that emits each number from a given interval exactly once,
+ * possibly in parallel. See {@link StatefulSequenceSource}.
+ */
+@Internal
+public class DataGenTableSource implements ScanTableSource {
+
+	private final DataGenerator[] fieldGenerators;
+	private final String tableName;
+	private final TableSchema schema;
+	private final long rowsPerSecond;
+	private final Long numberOfRows;
+
+	public DataGenTableSource(
+		DataGenerator[] fieldGenerators,
+		String tableName,
+		TableSchema schema,
+		long rowsPerSecond,
+		Long numberOfRows) {
+		this.fieldGenerators = fieldGenerators;
+		this.tableName = tableName;
+		this.schema = schema;
+		this.rowsPerSecond = rowsPerSecond;
+		this.numberOfRows = numberOfRows;
+	}
+
+	@Override
+	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
+		boolean isBounded = numberOfRows == null;

Review comment:
       nice catch
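
The thread does not quote what was caught. One plausible reading, stated here as an assumption, is that the check on the flagged line is inverted: since `numberOfRows` is a nullable row limit, the source would be bounded when the limit is present, not when it is absent. A minimal sketch of that reading, with `BoundednessSketch` and `isBounded` as hypothetical names that are not part of the PR:

```java
public class BoundednessSketch {

    // Assumption: a null row limit means "no limit", i.e. an unbounded source.
    static boolean isBounded(Long numberOfRows) {
        return numberOfRows != null;
    }

    public static void main(String[] args) {
        System.out.println(isBounded(null)); // false: no limit configured
        System.out.println(isBounded(100L)); // true: stops after 100 rows
    }
}
```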







[GitHub] [flink] sjwiesman commented on pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on pull request #13010:
URL: https://github.com/apache/flink/pull/13010#issuecomment-690711964


   Sorry for the long back and forth; I just pushed a fix addressing your comments.





[GitHub] [flink] sjwiesman commented on pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on pull request #13010:
URL: https://github.com/apache/flink/pull/13010#issuecomment-672286041


   @JingsongLi thanks for taking a look. I've updated my PR accordingly. 





[GitHub] [flink] flinkbot edited a comment on pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13010:
URL: https://github.com/apache/flink/pull/13010#issuecomment-665174560


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "60b2d39cd6ff94091cde6bc40d0d9b8e65a89531",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4973",
       "triggerID" : "60b2d39cd6ff94091cde6bc40d0d9b8e65a89531",
       "triggerType" : "PUSH"
     }, {
       "hash" : "920c6c0fc76622f1331cc3b0b2b019a2397d62bf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4974",
       "triggerID" : "920c6c0fc76622f1331cc3b0b2b019a2397d62bf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "125b5ad0567b8ee2c5cfdae1768ae047e9c96cdd",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5416",
       "triggerID" : "125b5ad0567b8ee2c5cfdae1768ae047e9c96cdd",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 125b5ad0567b8ee2c5cfdae1768ae047e9c96cdd Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5416) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #13010: Datagen

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13010:
URL: https://github.com/apache/flink/pull/13010#issuecomment-665164803









[GitHub] [flink] sjwiesman closed pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
sjwiesman closed pull request #13010:
URL: https://github.com/apache/flink/pull/13010


   





[GitHub] [flink] JingsongLi commented on a change in pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13010:
URL: https://github.com/apache/flink/pull/13010#discussion_r486047882



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
##########
@@ -34,9 +39,19 @@
 
 	private static final long serialVersionUID = 1L;
 
+	private static final Logger LOG = LoggerFactory.getLogger(DataGeneratorSource.class);
+
 	private final DataGenerator<T> generator;
+
 	private final long rowsPerSecond;
 
+	@Nullable
+	private Long numberOfRows;
+
+	private int outputSoFar;

Review comment:
       transient

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
##########
@@ -34,9 +39,19 @@
 
 	private static final long serialVersionUID = 1L;
 
+	private static final Logger LOG = LoggerFactory.getLogger(DataGeneratorSource.class);
+
 	private final DataGenerator<T> generator;
+
 	private final long rowsPerSecond;
 
+	@Nullable
+	private Long numberOfRows;
+
+	private int outputSoFar;
+
+	private int toOutput;

Review comment:
       transient
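
For context on the two `transient` suggestions: both counters look like per-run progress rather than configuration, so they presumably should not travel with the serialized function. The sketch below shows the general Java behaviour only; `CountingSource` is a hypothetical stand-in, not the PR's `DataGeneratorSource`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientCounterSketch {

    // Hypothetical stand-in for a serializable source function.
    static class CountingSource implements Serializable {
        private static final long serialVersionUID = 1L;

        final long rowsPerSecond;   // configuration: written by Java serialization
        transient int outputSoFar;  // runtime progress: skipped by Java serialization

        CountingSource(long rowsPerSecond) {
            this.rowsPerSecond = rowsPerSecond;
        }
    }

    // Serializes the object to a byte array and reads it back.
    static CountingSource roundTrip(CountingSource source)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(source);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (CountingSource) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        CountingSource source = new CountingSource(10);
        source.outputSoFar = 42;

        CountingSource copy = roundTrip(source);
        System.out.println(copy.rowsPerSecond); // 10: configuration survives
        System.out.println(copy.outputSoFar);   // 0: transient field is reset to its default
    }
}
```

A transient counter therefore has to be initialised when the function starts running rather than in the constructor.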

##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.factories.datagen.types.DataGeneratorMapper;
+import org.apache.flink.table.factories.datagen.types.RowDataGenerator;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.LENGTH;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.MIN;
+
+
+/**
+ * Creates a random {@link DataGeneratorContainer} for a particular logical type.
+ */
+@Internal
+@SuppressWarnings("unchecked")
+public class RandomGeneratorVisitor extends DataGenVisitorBase {
+
+	public static final int RANDOM_STRING_LENGTH_DEFAULT = 100;
+
+	private static final int RANDOM_COLLECTION_LENGTH_DEFAULT = 3;
+
+	private final ConfigOptions.OptionBuilder minKey;
+
+	private final ConfigOptions.OptionBuilder maxKey;
+
+	public RandomGeneratorVisitor(String name, ReadableConfig config) {
+		super(name, config);
+
+		this.minKey = key(FIELDS + "." + name + "." + MIN);
+		this.maxKey = key(FIELDS + "." + name + "." + MAX);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(BooleanType booleanType) {
+		return DataGeneratorContainer.of(RandomGenerator.booleanGenerator());
+	}
+
+	@Override
+	public DataGeneratorContainer visit(CharType booleanType) {
+		ConfigOption<Integer> lenOption = key(FIELDS + "." + name + "." + LENGTH)
+			.intType()
+			.defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
+		return DataGeneratorContainer.of(getRandomStringGenerator(config.get(lenOption)), lenOption);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(VarCharType booleanType) {
+		ConfigOption<Integer> lenOption = key(FIELDS + "." + name + "." + LENGTH)
+			.intType()
+			.defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
+		return DataGeneratorContainer.of(getRandomStringGenerator(config.get(lenOption)), lenOption);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(TinyIntType booleanType) {
+		ConfigOption<Integer> min = minKey.intType().defaultValue((int) Byte.MIN_VALUE);
+		ConfigOption<Integer> max = maxKey.intType().defaultValue((int) Byte.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.byteGenerator(
+				config.get(min).byteValue(), config.get(max).byteValue()),
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(SmallIntType booleanType) {
+		ConfigOption<Integer> min = minKey.intType().defaultValue((int) Short.MIN_VALUE);
+		ConfigOption<Integer> max = maxKey.intType().defaultValue((int) Short.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.shortGenerator(
+				config.get(min).shortValue(),
+				config.get(max).shortValue()),
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(IntType integerType) {
+		ConfigOption<Integer> min = minKey.intType().defaultValue(Integer.MIN_VALUE);
+		ConfigOption<Integer> max = maxKey.intType().defaultValue(Integer.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.intGenerator(
+				config.get(min), config.get(max)),
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(BigIntType bigIntType) {
+		ConfigOption<Long> min = minKey.longType().defaultValue(Long.MIN_VALUE);
+		ConfigOption<Long> max = maxKey.longType().defaultValue(Long.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.longGenerator(
+				config.get(min), config.get(max)),
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(FloatType floatType) {
+		ConfigOption<Float> min = minKey.floatType().defaultValue(Float.MIN_VALUE);
+		ConfigOption<Float> max = maxKey.floatType().defaultValue(Float.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.floatGenerator(
+				config.get(min), config.get(max)),
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(DoubleType doubleType) {
+		ConfigOption<Double> min = minKey.doubleType().defaultValue(Double.MIN_VALUE);
+		ConfigOption<Double> max = maxKey.doubleType().defaultValue(Double.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.doubleGenerator(
+				config.get(min), config.get(max)),
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(DecimalType decimalType) {
+		ConfigOption<Double> min = minKey.doubleType().defaultValue(Double.MIN_VALUE);
+		ConfigOption<Double> max = maxKey.doubleType().defaultValue(Double.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			new BigDecimalRandomGenerator(
+				RandomGenerator.doubleGenerator(config.get(min), config.get(max)),
+				decimalType.getPrecision(), decimalType.getScale()));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(YearMonthIntervalType yearMonthIntervalType) {
+		ConfigOption<Integer> min = minKey.intType().defaultValue(0);
+		ConfigOption<Integer> max = maxKey.intType().defaultValue(120000); // Period max
+		return DataGeneratorContainer.of(
+			RandomGenerator.intGenerator(
+				config.get(min), config.get(max)),
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(DayTimeIntervalType dayTimeIntervalType) {
+		ConfigOption<Long> min = minKey.longType().defaultValue(Long.MIN_VALUE);
+		ConfigOption<Long> max = maxKey.longType().defaultValue(Long.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.longGenerator(
+				config.get(min), config.get(max)),
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(ArrayType arrayType) {
+		ConfigOption<Integer> lenOption = key(FIELDS + "." + name + "." + LENGTH)
+			.intType()
+			.defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
+
+		String fieldName = name + "." + "element";
+		DataGeneratorContainer container = arrayType
+			.getElementType()
+			.accept(new RandomGeneratorVisitor(fieldName, config));
+
+		DataGenerator<Object[]> generator = RandomGenerator.arrayGenerator(container.getGenerator(), config.get(lenOption));
+		return DataGeneratorContainer.of(
+			new DataGeneratorMapper<>(generator, (GenericArrayData::new)),
+			container.getOptions().toArray(new ConfigOption<?>[0]));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(MultisetType multisetType) {
+		ConfigOption<Integer> lenOption = key(FIELDS + "." + name + "." + LENGTH)
+			.intType()
+			.defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
+
+		String fieldName = name + "." + "element";
+		DataGeneratorContainer container = multisetType
+			.getElementType()
+			.accept(new RandomGeneratorVisitor(fieldName, config));
+
+		DataGenerator<Map<Object, Integer>> mapGenerator = RandomGenerator.mapGenerator(
+			container.getGenerator(),
+			RandomGenerator.intGenerator(0, 10),
+			config.get(lenOption));
+
+		return DataGeneratorContainer.of(
+			new DataGeneratorMapper<>(mapGenerator, GenericMapData::new),
+			container.getOptions().toArray(new ConfigOption<?>[0]));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(MapType mapType) {
+		ConfigOption<Integer> lenOption = key(FIELDS + "." + name + "." + LENGTH)
+			.intType()
+			.defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
+
+		String keyName = name + "." + "key";
+		String valName = name + "." + "value";
+
+		DataGeneratorContainer keyContainer = mapType
+			.getKeyType()
+			.accept(new RandomGeneratorVisitor(keyName, config));
+
+		DataGeneratorContainer valContainer = mapType
+			.getValueType()
+			.accept(new RandomGeneratorVisitor(valName, config));
+
+		Set<ConfigOption<?>> options = keyContainer.getOptions();
+		options.addAll(valContainer.getOptions());
+
+		DataGenerator<Map<Object, Object>> mapGenerator = RandomGenerator.mapGenerator(
+			keyContainer.getGenerator(),
+			valContainer.getGenerator(),
+			config.get(lenOption));
+
+		return DataGeneratorContainer.of(
+			new DataGeneratorMapper<>(mapGenerator, GenericMapData::new),
+			options.toArray(new ConfigOption<?>[0]));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(RowType rowType) {
+		List<DataGeneratorContainer> fieldContainers = rowType.getFields()
+			.stream()
+			.map(field -> {
+				String fieldName = name + "." + field.getName();
+				return field.getType().accept(new RandomGeneratorVisitor(fieldName, config));
+			}).collect(Collectors.toList());
+
+		ConfigOption<?>[] options = fieldContainers
+			.stream()
+			.flatMap(container -> container.getOptions().stream())
+			.toArray(ConfigOption[]::new);
+
+		DataGenerator[] generators = fieldContainers
+			.stream()
+			.map(DataGeneratorContainer::getGenerator)
+			.toArray(DataGenerator[]::new);
+
+		String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+
+		return DataGeneratorContainer.of(new RowDataGenerator(generators, fieldNames), options);
+	}
+
+	@Override
+	protected DataGeneratorContainer defaultMethod(LogicalType logicalType) {
+		throw new ValidationException("Unsupported type: " + logicalType);
+	}
+
+	private static RandomGenerator<StringData> getRandomStringGenerator(int length) {
+		return new RandomGenerator<StringData>() {
+			@Override
+			public StringData next() {
+				return StringData.fromString(random.nextHexString(length));
+			}
+		};
+	}
+
+	private static class BigDecimalRandomGenerator implements DataGenerator<DecimalData> {

Review comment:
       I think it should be named `DecimalDataRandomGenerator`.
   `BigDecimal` is confusing here, since the generator produces `DecimalData`.

##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
+
+import java.io.Serializable;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.function.Supplier;
+
+import static java.time.temporal.ChronoField.MILLI_OF_SECOND;
+
+/**
+ * Base class for translating {@link LogicalType LogicalTypes} to {@link DataGeneratorContainer}'s.
+ */
+public abstract class DataGenVisitorBase extends LogicalTypeDefaultVisitor<DataGeneratorContainer> {
+
+	protected final String name;
+
+	protected final ReadableConfig config;
+
+	protected DataGenVisitorBase(String name, ReadableConfig config) {
+		this.name = name;
+		this.config = config;
+	}
+
+	@Override
+	public DataGeneratorContainer visit(DateType dateType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(() -> (int) LocalDate.now().toEpochDay()));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(TimeType timeType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(() -> LocalTime.now().get(MILLI_OF_SECOND)));

Review comment:
       Shouldn't this be `MILLI_OF_DAY`?
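
To illustrate the difference behind this question: `MILLI_OF_SECOND` only yields the millisecond part of the current second, while `MILLI_OF_DAY` yields milliseconds since midnight, which is what a time-of-day value encoded as an `int` would normally need. A small sketch using only `java.time` (the class and method names are illustrative, not from the PR):

```java
import java.time.LocalTime;
import java.time.temporal.ChronoField;

public class TimeFieldSketch {

    // Milliseconds within the current second only (0..999).
    static int milliOfSecond(LocalTime time) {
        return time.get(ChronoField.MILLI_OF_SECOND);
    }

    // Milliseconds elapsed since midnight (0..86_399_999).
    static int milliOfDay(LocalTime time) {
        return time.get(ChronoField.MILLI_OF_DAY);
    }

    public static void main(String[] args) {
        LocalTime time = LocalTime.of(12, 30, 15, 250_000_000); // 12:30:15.250

        System.out.println(milliOfSecond(time)); // 250
        System.out.println(milliOfDay(time));    // 45015250
    }
}
```

With `MILLI_OF_SECOND`, every generated value would fall between 0 and 999, i.e. within the first second after midnight if interpreted as a time of day.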

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
##########
@@ -34,9 +39,19 @@
 
 	private static final long serialVersionUID = 1L;
 
+	private static final Logger LOG = LoggerFactory.getLogger(DataGeneratorSource.class);
+
 	private final DataGenerator<T> generator;
+
 	private final long rowsPerSecond;
 
+	@Nullable
+	private Long numberOfRows;

Review comment:
       final







[GitHub] [flink] flinkbot edited a comment on pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13010:
URL: https://github.com/apache/flink/pull/13010#issuecomment-665174560


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "60b2d39cd6ff94091cde6bc40d0d9b8e65a89531",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4973",
       "triggerID" : "60b2d39cd6ff94091cde6bc40d0d9b8e65a89531",
       "triggerType" : "PUSH"
     }, {
       "hash" : "920c6c0fc76622f1331cc3b0b2b019a2397d62bf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4974",
       "triggerID" : "920c6c0fc76622f1331cc3b0b2b019a2397d62bf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "125b5ad0567b8ee2c5cfdae1768ae047e9c96cdd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5416",
       "triggerID" : "125b5ad0567b8ee2c5cfdae1768ae047e9c96cdd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a96948229811ec6f8b72a2fd2ad019a9fa6b2d67",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6407",
       "triggerID" : "a96948229811ec6f8b72a2fd2ad019a9fa6b2d67",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cbe73382b1eb400653d068a8726bc673fb7e6394",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6453",
       "triggerID" : "cbe73382b1eb400653d068a8726bc673fb7e6394",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a96948229811ec6f8b72a2fd2ad019a9fa6b2d67 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6407) 
   * cbe73382b1eb400653d068a8726bc673fb7e6394 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6453) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] JingsongLi commented on a change in pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13010:
URL: https://github.com/apache/flink/pull/13010#discussion_r486047447



##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
+
+import java.io.Serializable;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.function.Supplier;
+
+/**
+ * Base class for translating {@link LogicalType LogicalTypes} to {@link DataGeneratorContainer}'s.
+ */
+public abstract class DataGenVisitorBase extends LogicalTypeDefaultVisitor<DataGeneratorContainer> {
+
+	protected final String name;
+
+	protected final ReadableConfig config;
+
+	protected DataGenVisitorBase(String name, ReadableConfig config) {
+		this.name = name;
+		this.config = config;
+	}
+
+	@Override
+	public DataGeneratorContainer visit(DateType dateType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(LocalDate::now));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(TimeType timeType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(LocalTime::now));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(TimestampType timestampType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(LocalDateTime::now));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(ZonedTimestampType zonedTimestampType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(OffsetDateTime::now));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(LocalZonedTimestampType localZonedTimestampType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(Instant::now));
+	}
+
+	@Override
+	protected DataGeneratorContainer defaultMethod(LogicalType logicalType) {
+		throw new ValidationException("Unsupported type: " + logicalType);
+	}
+
+	private interface SerializableSupplier<T> extends Supplier<T>, Serializable { }

Review comment:
       I meant we could do something like `return (Supplier<T> & Serializable) () -> {...}`, but it is OK for now.
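
As a minimal sketch of the intersection cast mentioned above (not code from this PR): a lambda is only serializable when its target type is, so casting to `Supplier<T> & Serializable` avoids declaring a dedicated interface. The class and method names below are hypothetical and use only the JDK.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

// Hypothetical illustration class; not part of Flink.
final class SerializableLambdaSketch {

    // The intersection cast makes the lambda's target type both Supplier<T> and Serializable.
    // Serialization still fails at runtime if the captured value itself is not serializable.
    static <T> Supplier<T> constant(T value) {
        return (Supplier<T> & Serializable) () -> value;
    }

    // Serializes and deserializes an object with plain Java serialization.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Supplier<String> plain = () -> "x";
        Supplier<String> serializable = constant("x");

        System.out.println(plain instanceof Serializable);        // prints false
        System.out.println(serializable instanceof Serializable); // prints true
        System.out.println(roundTrip(serializable).get());        // prints x
    }
}
```

The trade-off is readability: a named interface such as `SerializableSupplier` documents the requirement at the call site, while the cast keeps the type surface smaller.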







[GitHub] [flink] sjwiesman commented on pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on pull request #13010:
URL: https://github.com/apache/flink/pull/13010#issuecomment-686512196


   @JingsongLi what do you think of the status of this PR? 





[GitHub] [flink] JingsongLi commented on a change in pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13010:
URL: https://github.com/apache/flink/pull/13010#discussion_r484163628



##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
+
+import java.io.Serializable;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.function.Supplier;
+
+/**
+ * Base class for translating {@link LogicalType LogicalTypes} to {@link DataGeneratorContainer}'s.
+ */
+public abstract class DataGenVisitorBase extends LogicalTypeDefaultVisitor<DataGeneratorContainer> {
+
+	protected final String name;
+
+	protected final ReadableConfig config;
+
+	protected DataGenVisitorBase(String name, ReadableConfig config) {
+		this.name = name;
+		this.config = config;
+	}
+
+	@Override
+	public DataGeneratorContainer visit(DateType dateType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(LocalDate::now));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(TimeType timeType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(LocalTime::now));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(TimestampType timestampType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(LocalDateTime::now));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(ZonedTimestampType zonedTimestampType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(OffsetDateTime::now));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(LocalZonedTimestampType localZonedTimestampType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(Instant::now));
+	}
+
+	@Override
+	protected DataGeneratorContainer defaultMethod(LogicalType logicalType) {
+		throw new ValidationException("Unsupported type: " + logicalType);
+	}
+
+	private interface SerializableSupplier<T> extends Supplier<T>, Serializable { }

Review comment:
       I think lambda is already `Serializable`.

##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
+import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.END;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.START;
+
+/**
+ * Creates a sequential {@link DataGeneratorContainer} for a particular logical type.
+ */
+@Internal
+public class SequenceGeneratorVisitor extends DataGenVisitorBase {
+
+	private final ReadableConfig config;
+
+	private final String startKeyStr;
+
+	private final String endKeyStr;
+
+	private final ConfigOption<Integer> intStart;
+
+	private final ConfigOption<Integer> intEnd;
+
+	private final ConfigOption<Long> longStart;
+
+	private final ConfigOption<Long> longEnd;
+
+	public SequenceGeneratorVisitor(String name, ReadableConfig config) {
+		super(name, config);
+
+		this.config = config;
+
+		this.startKeyStr = FIELDS + "." + name + "." + START;
+		this.endKeyStr = FIELDS + "." + name + "." + END;
+
+		ConfigOptions.OptionBuilder startKey = key(startKeyStr);
+		ConfigOptions.OptionBuilder endKey = key(endKeyStr);
+
+		config.getOptional(startKey.stringType().noDefaultValue()).orElseThrow(
+			() -> new ValidationException(
+				"Could not find required property '" + startKeyStr + "' for sequence generator."));
+		config.getOptional(endKey.stringType().noDefaultValue()).orElseThrow(
+			() -> new ValidationException(
+				"Could not find required property '" + endKeyStr + "' for sequence generator."));
+
+		this.intStart = startKey.intType().noDefaultValue();
+		this.intEnd = endKey.intType().noDefaultValue();
+		this.longStart = startKey.longType().noDefaultValue();
+		this.longEnd = endKey.longType().noDefaultValue();
+	}
+
+	@Override
+	public DataGeneratorContainer visit(BooleanType booleanType) {
+		return DataGeneratorContainer.of(RandomGenerator.booleanGenerator());
+	}
+
+	@Override
+	public DataGeneratorContainer visit(CharType booleanType) {
+		return DataGeneratorContainer.of(
+			getSequenceStringGenerator(
+				config.get(longStart), config.get(longEnd)),
+			longStart, longEnd);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(VarCharType booleanType) {
+		return DataGeneratorContainer.of(
+			getSequenceStringGenerator(
+				config.get(longStart), config.get(longEnd)),
+			longStart, longEnd);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(TinyIntType booleanType) {
+		return DataGeneratorContainer.of(
+			SequenceGenerator.byteGenerator(
+				config.get(intStart).byteValue(),
+				config.get(intEnd).byteValue()),
+			intStart, intEnd);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(SmallIntType booleanType) {
+		return DataGeneratorContainer.of(
+			SequenceGenerator.shortGenerator(
+				config.get(intStart).shortValue(),
+				config.get(intEnd).shortValue()),
+			intStart, intEnd);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(IntType integerType) {
+		return DataGeneratorContainer.of(
+			SequenceGenerator.intGenerator(
+				config.get(intStart), config.get(intEnd)),
+			intStart, intEnd);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(BigIntType bigIntType) {
+		return DataGeneratorContainer.of(
+			SequenceGenerator.longGenerator(
+				config.get(longStart), config.get(longEnd)),
+			longStart, longEnd);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(FloatType floatType) {
+		return DataGeneratorContainer.of(
+			SequenceGenerator.floatGenerator(
+				config.get(intStart).shortValue(),
+				config.get(intEnd).shortValue()),
+			intStart, intEnd);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(DoubleType doubleType) {
+		return DataGeneratorContainer.of(
+			SequenceGenerator.doubleGenerator(
+				config.get(intStart), config.get(intEnd)),
+			intStart, intEnd);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(DecimalType decimalType) {
+		return DataGeneratorContainer.of(
+			SequenceGenerator.bigDecimalGenerator(

Review comment:
       It looks like the data structures used for Decimal and the time types are wrong; take a look at the Javadoc comments on `RowData`.
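
For context, a rough sketch of what the internal representation looks like for two of the time types, assuming the mapping documented on `RowData` (DATE as an `int` of days since the epoch, TIME as an `int` of milliseconds of the day). The class and helper names are hypothetical, and only `java.time` is used so the snippet runs without Flink on the classpath.

```java
import java.time.LocalDate;
import java.time.LocalTime;

// Hypothetical illustration class; not part of Flink.
final class InternalTimeSketch {

    // DATE: number of days since 1970-01-01, stored as an int.
    static int toInternalDate(LocalDate date) {
        return (int) date.toEpochDay();
    }

    // TIME: number of milliseconds since midnight, stored as an int.
    static int toInternalTime(LocalTime time) {
        return (int) (time.toNanoOfDay() / 1_000_000L);
    }

    public static void main(String[] args) {
        System.out.println(toInternalDate(LocalDate.of(1970, 1, 2))); // prints 1
        System.out.println(toInternalTime(LocalTime.of(0, 0, 1)));    // prints 1000
    }
}
```

Under the same assumption, a generator that emits `LocalDate` or `LocalTime` objects directly would need a conversion step like this before the values are placed into a row.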

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
##########
@@ -45,18 +62,34 @@
 	 * @param generator data generator.
 	 */
 	public DataGeneratorSource(DataGenerator<T> generator) {
-		this(generator, Long.MAX_VALUE);
+		this(generator, "generator", Long.MAX_VALUE, null);
 	}
 
 	/**
 	 * Creates a source that emits records by {@link DataGenerator}.
 	 *
 	 * @param generator data generator.
 	 * @param rowsPerSecond Control the emit rate.
+	 * @param numberOfRows Total number of rows to output.
 	 */
-	public DataGeneratorSource(DataGenerator<T> generator, long rowsPerSecond) {
+	public DataGeneratorSource(DataGenerator<T> generator, String name, long rowsPerSecond, Long numberOfRows) {
 		this.generator = generator;
+		this.name = name;

Review comment:
       Do we need to add `name`?

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table;
+
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+
+import org.junit.Test;
+
+public class DataGeneratorConnectorITCase extends StreamingTestBase {

Review comment:
       You can use `BatchTestBase`.

##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/DataGeneratorMapper.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen.types;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+
+import java.io.Serializable;
+import java.util.function.Function;
+
+/**
+ * Utility for mapping the output of a {@link DataGenerator}.
+ */
+@Internal
+public class DataGeneratorMapper<A, B> implements DataGenerator<B> {
+
+	private final DataGenerator<A> generator;
+
+	private final SerializableFunction<A, B> mapper;
+
+	public DataGeneratorMapper(DataGenerator<A> generator, SerializableFunction<A, B> mapper) {
+		this.generator = generator;
+		this.mapper = mapper;
+	}
+
+	@Override
+	public void open(String name, FunctionInitializationContext context, RuntimeContext runtimeContext) throws Exception {
+		generator.open(name, context, runtimeContext);
+	}
+
+	@Override
+	public boolean hasNext() {
+		return generator.hasNext();
+	}
+
+	@Override
+	public B next() {
+		return mapper.apply(generator.next());
+	}
+
+	public interface SerializableFunction<A, B> extends Function<A, B>, Serializable {}

Review comment:
       Ditto.

##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenTableSource.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.datagen.types.RowDataGenerator;
+import org.apache.flink.table.sources.StreamTableSource;
+
+/**
+ * A {@link StreamTableSource} that emits each number from a given interval exactly once,
+ * possibly in parallel. See {@link StatefulSequenceSource}.
+ */
+@Internal
+public class DataGenTableSource implements ScanTableSource {
+
+	private final DataGenerator[] fieldGenerators;
+	private final String tableName;
+	private final TableSchema schema;
+	private final long rowsPerSecond;
+	private final Long numberOfRows;
+
+	public DataGenTableSource(
+		DataGenerator[] fieldGenerators,
+		String tableName,
+		TableSchema schema,
+		long rowsPerSecond,
+		Long numberOfRows) {
+		this.fieldGenerators = fieldGenerators;
+		this.tableName = tableName;
+		this.schema = schema;
+		this.rowsPerSecond = rowsPerSecond;
+		this.numberOfRows = numberOfRows;
+	}
+
+	@Override
+	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
+		boolean isBounded = numberOfRows == null;

Review comment:
       This looks inverted: shouldn't the source be bounded when `numberOfRows` is non-null?
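
A minimal sketch of the check in question, assuming (as the constructor Javadoc suggests) that a `null` row count means "emit forever". The class and method names are hypothetical.

```java
// Hypothetical illustration class; not part of Flink.
final class BoundednessSketch {

    // A source is bounded only when a total row count was configured.
    static boolean isBounded(Long numberOfRows) {
        return numberOfRows != null;
    }

    public static void main(String[] args) {
        System.out.println(isBounded(10L));  // prints true
        System.out.println(isBounded(null)); // prints false
    }
}
```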

##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/RowDataGenerator.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen.types;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Data generator for Flink's internal {@link RowData} type.
+ */
+@Internal
+public class RowDataGenerator implements DataGenerator<RowData> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final DataGenerator[] fieldGenerators;

Review comment:
       Can we add `?` to `DataGenerator`, as in `DataGenerator<?>[]`? At least the compiler will then not emit a raw-type warning.
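
To illustrate the suggestion with JDK types only, the sketch below uses `Supplier<?>[]` as a stand-in for `DataGenerator<?>[]`; the class and method names are hypothetical. An array of an unbounded wildcard type can be created directly, and reading elements as `Object` needs no cast and produces no raw-type warning.

```java
import java.util.function.Supplier;

// Hypothetical illustration class; Supplier<?> stands in for DataGenerator<?>.
final class WildcardArraySketch {

    // Produces one row by asking every field generator for its next value.
    static Object[] nextRow(Supplier<?>[] fieldGenerators) {
        Object[] row = new Object[fieldGenerators.length];
        for (int i = 0; i < fieldGenerators.length; i++) {
            row[i] = fieldGenerators[i].get();
        }
        return row;
    }

    public static void main(String[] args) {
        Supplier<?>[] generators = new Supplier<?>[] {() -> 1, () -> "a"};
        Object[] row = nextRow(generators);
        System.out.println(row[0] + ", " + row[1]); // prints 1, a
    }
}
```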

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table;
+
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+
+import org.junit.Test;
+
+public class DataGeneratorConnectorITCase extends StreamingTestBase {
+
+	private static final String TABLE = "CREATE TABLE datagen_t (\n" +
+		"	f0 CHAR(1),\n" +
+		"	f1 VARCHAR(10),\n" +
+		"	f2 STRING,\n" +
+		"	f3 BOOLEAN,\n" +
+		"	f4 DECIMAL(32,2),\n" +
+		"	f5 TINYINT,\n" +
+		"	f6 SMALLINT,\n" +
+		"	f7 INT,\n" +
+		"	f8 BIGINT,\n" +
+		"	f9 FLOAT,\n" +
+		"	f10 DOUBLE,\n" +
+		"	f11 DATE,\n" +
+		"	f12 TIME,\n" +
+		"	f13 TIMESTAMP(3),\n" +
+		"	f14 TIMESTAMP WITH LOCAL TIME ZONE,\n" +
+		"	f15 INT ARRAY,\n" +
+		"	f16 MAP<STRING, DATE>,\n" +
+		"	f17 DECIMAL(32,2) MULTISET,\n" +
+		"	f18 ROW<a BIGINT, b TIME, c ROW<d TIMESTAMP>>\n" +
+		") WITH (" +
+		"	'connector' = 'datagen',\n" +
+		"	'number-of-rows' = '10'\n" +
+		")";
+
+	private static final String SINK = "CREATE TABLE sink WITH ('connector' = 'blackhole') LIKE datagen_t (EXCLUDING ALL)";
+
+	@Test
+	public void testTypes() {
+		tEnv().executeSql(TABLE);
+		tEnv().executeSql(SINK);
+		tEnv().from("datagen_t").executeInsert("sink");

Review comment:
       You should use `Lists.newArrayList(tEnv().executeSql("select * from datagen_t").collect())`.
   That way, the types can be verified.







[GitHub] [flink] sjwiesman commented on pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on pull request #13010:
URL: https://github.com/apache/flink/pull/13010#issuecomment-689714414


   @JingsongLi thank you for the thorough review. I've corrected the types and updated the test to validate them. If you don't have any other comments, I'll merge on green.





[GitHub] [flink] flinkbot edited a comment on pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13010:
URL: https://github.com/apache/flink/pull/13010#issuecomment-665174560


   ## CI report:
   
   * a96948229811ec6f8b72a2fd2ad019a9fa6b2d67 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6407) 
   





[GitHub] [flink] sjwiesman commented on a change in pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #13010:
URL: https://github.com/apache/flink/pull/13010#discussion_r485779807



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
##########
@@ -45,18 +62,34 @@
 	 * @param generator data generator.
 	 */
 	public DataGeneratorSource(DataGenerator<T> generator) {
-		this(generator, Long.MAX_VALUE);
+		this(generator, "generator", Long.MAX_VALUE, null);
 	}
 
 	/**
 	 * Creates a source that emits records by {@link DataGenerator}.
 	 *
 	 * @param generator data generator.
 	 * @param rowsPerSecond Control the emit rate.
+	 * @param numberOfRows Total number of rows to output.
 	 */
-	public DataGeneratorSource(DataGenerator<T> generator, long rowsPerSecond) {
+	public DataGeneratorSource(DataGenerator<T> generator, String name, long rowsPerSecond, Long numberOfRows) {
 		this.generator = generator;
+		this.name = name;

Review comment:
       That's a remnant from an old commit; I will remove it.







[GitHub] [flink] flinkbot edited a comment on pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13010:
URL: https://github.com/apache/flink/pull/13010#issuecomment-665174560


   ## CI report:
   
   * 125b5ad0567b8ee2c5cfdae1768ae047e9c96cdd Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5416) 
   * a96948229811ec6f8b72a2fd2ad019a9fa6b2d67 UNKNOWN
   





[GitHub] [flink] JingsongLi commented on a change in pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13010:
URL: https://github.com/apache/flink/pull/13010#discussion_r468309695



##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
##########
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.types.Row;
+
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.LENGTH;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX;
+import static org.apache.flink.table.factories.DataGenTableSourceFactory.MIN;
+
+
+/**
+ * Creates a random {@link DataGeneratorContainer} for a particular logical type.
+ */
+@Internal
+@SuppressWarnings("unchecked")
+public class RandomGeneratorVisitor extends DataGenVisitorBase {
+
+	public static final int RANDOM_STRING_LENGTH_DEFAULT = 100;
+
+	private static final int RANDOM_COLLECTION_LENGTH_DEFAULT = 3;
+
+	private final ConfigOptions.OptionBuilder minKey;
+
+	private final ConfigOptions.OptionBuilder maxKey;
+
+	public RandomGeneratorVisitor(String name, ReadableConfig config) {
+		super(name, config);
+
+		this.minKey = key(FIELDS + "." + name + "." + MIN);
+		this.maxKey = key(FIELDS + "." + name + "." + MAX);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(BooleanType booleanType) {
+		return DataGeneratorContainer.of(RandomGenerator.booleanGenerator());
+	}
+
+	@Override
+	public DataGeneratorContainer visit(CharType booleanType) {
+		ConfigOption<Integer> lenOption = key(FIELDS + "." + name + "." + LENGTH)
+			.intType()
+			.defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
+		return DataGeneratorContainer.of(getRandomStringGenerator(config.get(lenOption)), lenOption);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(VarCharType booleanType) {
+		ConfigOption<Integer> lenOption = key(FIELDS + "." + name + "." + LENGTH)
+			.intType()
+			.defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
+		return DataGeneratorContainer.of(getRandomStringGenerator(config.get(lenOption)), lenOption);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(TinyIntType booleanType) {
+		ConfigOption<Integer> min = minKey.intType().defaultValue((int) Byte.MIN_VALUE);
+		ConfigOption<Integer> max = maxKey.intType().defaultValue((int) Byte.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.byteGenerator(
+				config.get(min).byteValue(), config.get(max).byteValue()),
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(SmallIntType booleanType) {
+		ConfigOption<Integer> min = minKey.intType().defaultValue((int) Short.MIN_VALUE);
+		ConfigOption<Integer> max = maxKey.intType().defaultValue((int) Short.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.shortGenerator(
+				config.get(min).shortValue(),
+				config.get(max).shortValue()),
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(IntType integerType) {
+		ConfigOption<Integer> min = minKey.intType().defaultValue(Integer.MIN_VALUE);
+		ConfigOption<Integer> max = maxKey.intType().defaultValue(Integer.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.intGenerator(
+				config.get(min), config.get(max)),
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(BigIntType bigIntType) {
+		ConfigOption<Long> min = minKey.longType().defaultValue(Long.MIN_VALUE);
+		ConfigOption<Long> max = maxKey.longType().defaultValue(Long.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.longGenerator(
+				config.get(min), config.get(max)),
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(FloatType floatType) {
+		ConfigOption<Float> min = minKey.floatType().defaultValue(Float.MIN_VALUE);
+		ConfigOption<Float> max = maxKey.floatType().defaultValue(Float.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.floatGenerator(
+				config.get(min), config.get(max)),
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(DoubleType doubleType) {
+		ConfigOption<Double> min = minKey.doubleType().defaultValue(Double.MIN_VALUE);
+		ConfigOption<Double> max = maxKey.doubleType().defaultValue(Double.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.doubleGenerator(
+				config.get(min), config.get(max)),
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(DecimalType decimalType) {
+		ConfigOption<Double> min = minKey.doubleType().defaultValue(Double.MIN_VALUE);
+		ConfigOption<Double> max = maxKey.doubleType().defaultValue(Double.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			new BigDecimalRandomGenerator(
+				RandomGenerator.doubleGenerator(config.get(min), config.get(max)),
+				decimalType.getPrecision(), decimalType.getScale()));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(YearMonthIntervalType yearMonthIntervalType) {
+		ConfigOption<Integer> min = minKey.intType().defaultValue(0);
+		ConfigOption<Integer> max = maxKey.intType().defaultValue(120000); // Period max
+		return DataGeneratorContainer.of(
+			RandomGenerator.intGenerator(
+				config.get(min), config.get(max)),
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(DayTimeIntervalType dayTimeIntervalType) {
+		ConfigOption<Long> min = minKey.longType().defaultValue(Long.MIN_VALUE);
+		ConfigOption<Long> max = maxKey.longType().defaultValue(Long.MAX_VALUE);
+		return DataGeneratorContainer.of(
+			RandomGenerator.longGenerator(
+				config.get(min), config.get(max)),
+			min, max);
+	}
+
+	@Override
+	public DataGeneratorContainer visit(ArrayType arrayType) {
+		ConfigOption<Integer> lenOption = key(FIELDS + "." + name + "." + LENGTH)
+			.intType()
+			.defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
+
+		String fieldName = name + "." + "element";
+		DataGeneratorContainer container = arrayType
+			.getElementType()
+			.accept(new RandomGeneratorVisitor(fieldName, config));
+
+		return DataGeneratorContainer.of(
+			RandomGenerator.arrayGenerator(container.getGenerator(), config.get(lenOption)),
+			container.getOptions().toArray(new ConfigOption<?>[0]));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(MultisetType multisetType) {
+		ConfigOption<Integer> lenOption = key(FIELDS + "." + name + "." + LENGTH)
+			.intType()
+			.defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
+
+		String fieldName = name + "." + "element";
+		DataGeneratorContainer container = multisetType
+			.getElementType()
+			.accept(new RandomGeneratorVisitor(fieldName, config));
+
+		return DataGeneratorContainer.of(
+			RandomGenerator.mapGenerator(container.getGenerator(),
+				RandomGenerator.intGenerator(0, 10),
+				config.get(lenOption)),
+			container.getOptions().toArray(new ConfigOption<?>[0]));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(MapType mapType) {
+		ConfigOption<Integer> lenOption = key(FIELDS + "." + name + "." + LENGTH)
+			.intType()
+			.defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
+
+		String keyName = name + "." + "key";
+		String valName = name + "." + "value";
+
+		DataGeneratorContainer keyContainer = mapType
+			.getKeyType()
+			.accept(new RandomGeneratorVisitor(keyName, config));
+
+		DataGeneratorContainer valContainer = mapType
+			.getValueType()
+			.accept(new RandomGeneratorVisitor(valName, config));
+
+		Set<ConfigOption<?>> options = keyContainer.getOptions();
+		options.addAll(valContainer.getOptions());
+
+		return DataGeneratorContainer.of(

Review comment:
       Take a look at the comments on `RowData`. The data structures should be:
   ```
    * +--------------------------------+-----------------------------------------+
    * | Row                            | {@link RowData}                         |
    * +--------------------------------+-----------------------------------------+
    * | ARRAY                          | {@link ArrayData}                       |
    * +--------------------------------+-----------------------------------------+
    * | MAP / MULTISET                 | {@link MapData}                         |
    * +--------------------------------+-----------------------------------------+
   ```
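
For illustration, the MAP / MULTISET row can be read as: a multiset is carried as a map from each element to its multiplicity. The sketch below models that with plain `java.util` types only. `MultisetSketch` and `toMultiset` are hypothetical names, not Flink APIs, and an actual generator would presumably produce `MapData` rather than a `java.util.Map`.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: models a multiset as element -> count.
final class MultisetSketch {

    // Collapses a list with duplicates into a map of element to multiplicity.
    static <T> Map<T, Integer> toMultiset(List<T> elements) {
        Map<T, Integer> counts = new LinkedHashMap<>();
        for (T element : elements) {
            counts.merge(element, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // "a" appears twice, so it maps to 2; "b" maps to 1.
        System.out.println(toMultiset(Arrays.asList("a", "b", "a")));
    }
}
```

Because the representation is a map, the same internal structure can serve both MAP and MULTISET columns, which matches the single table row above.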







[GitHub] [flink] flinkbot edited a comment on pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13010:
URL: https://github.com/apache/flink/pull/13010#issuecomment-665174560


   ## CI report:
   
   * cbe73382b1eb400653d068a8726bc673fb7e6394 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6453) 
   





[GitHub] [flink] JingsongLi commented on a change in pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13010:
URL: https://github.com/apache/flink/pull/13010#discussion_r468305915



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
##########
@@ -77,8 +102,11 @@ public void run(SourceContext<T> ctx) throws Exception {
 
 		while (isRunning) {
 			for (int i = 0; i < taskRowsPerSecond; i++) {
-				if (isRunning && generator.hasNext()) {
+				if (isRunning && generator.hasNext() && (numberOfRows == null || outputSoFar < toOutput)) {
 					synchronized (ctx.getCheckpointLock()) {
+						if (numberOfRows != null) {
+							outputSoFar++;

Review comment:
       Suggestion: print `outputSoFar` in `close()`; that would be useful for debugging.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
##########
@@ -77,8 +102,11 @@ public void run(SourceContext<T> ctx) throws Exception {
 
 		while (isRunning) {
 			for (int i = 0; i < taskRowsPerSecond; i++) {
-				if (isRunning && generator.hasNext()) {
+				if (isRunning && generator.hasNext() && (numberOfRows == null || outputSoFar < toOutput)) {
 					synchronized (ctx.getCheckpointLock()) {
+						if (numberOfRows != null) {

Review comment:
       Can this `if` be removed?
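
One reading of the two suggestions above, as a minimal sketch: count every emitted row unconditionally, and report the total when the source closes. This is a simplified stand-in written against plain Java, not the actual `DataGeneratorSource`. The class name `BoundedEmitterSketch` is hypothetical, and the per-task `toOutput` share from the diff is collapsed into `numberOfRows` here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the loop under review; not Flink's DataGeneratorSource.
final class BoundedEmitterSketch {

    private final Long numberOfRows; // null means unbounded
    private long outputSoFar;

    BoundedEmitterSketch(Long numberOfRows) {
        this.numberOfRows = numberOfRows;
    }

    // Emits rows until the generator is exhausted or the optional bound is reached.
    List<Integer> run(Iterator<Integer> generator) {
        List<Integer> emitted = new ArrayList<>();
        while (generator.hasNext() && (numberOfRows == null || outputSoFar < numberOfRows)) {
            // Counting without a null check keeps the total available in the unbounded case too.
            outputSoFar++;
            emitted.add(generator.next());
        }
        return emitted;
    }

    // Reports the total on shutdown, which is the debugging aid suggested in the review.
    String close() {
        return "generated " + outputSoFar + " rows";
    }
}
```

With the increment unguarded, the message returned by `close()` is meaningful for bounded and unbounded tables alike.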







[GitHub] [flink] sjwiesman commented on a change in pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
sjwiesman commented on a change in pull request #13010:
URL: https://github.com/apache/flink/pull/13010#discussion_r485789362



##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
+
+import java.io.Serializable;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.function.Supplier;
+
+/**
+ * Base class for translating {@link LogicalType LogicalTypes} to {@link DataGeneratorContainer}'s.
+ */
+public abstract class DataGenVisitorBase extends LogicalTypeDefaultVisitor<DataGeneratorContainer> {
+
+	protected final String name;
+
+	protected final ReadableConfig config;
+
+	protected DataGenVisitorBase(String name, ReadableConfig config) {
+		this.name = name;
+		this.config = config;
+	}
+
+	@Override
+	public DataGeneratorContainer visit(DateType dateType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(LocalDate::now));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(TimeType timeType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(LocalTime::now));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(TimestampType timestampType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(LocalDateTime::now));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(ZonedTimestampType zonedTimestampType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(OffsetDateTime::now));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(LocalZonedTimestampType localZonedTimestampType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(Instant::now));
+	}
+
+	@Override
+	protected DataGeneratorContainer defaultMethod(LogicalType logicalType) {
+		throw new ValidationException("Unsupported type: " + logicalType);
+	}
+
+	private interface SerializableSupplier<T> extends Supplier<T>, Serializable { }

Review comment:
       It's not; without this, the source gets caught by the closure cleaner.

##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/types/DataGeneratorMapper.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen.types;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+
+import java.io.Serializable;
+import java.util.function.Function;
+
+/**
+ * Utility for mapping the output of a {@link DataGenerator}.
+ */
+@Internal
+public class DataGeneratorMapper<A, B> implements DataGenerator<B> {
+
+	private final DataGenerator<A> generator;
+
+	private final SerializableFunction<A, B> mapper;
+
+	public DataGeneratorMapper(DataGenerator<A> generator, SerializableFunction<A, B> mapper) {
+		this.generator = generator;
+		this.mapper = mapper;
+	}
+
+	@Override
+	public void open(String name, FunctionInitializationContext context, RuntimeContext runtimeContext) throws Exception {
+		generator.open(name, context, runtimeContext);
+	}
+
+	@Override
+	public boolean hasNext() {
+		return generator.hasNext();
+	}
+
+	@Override
+	public B next() {
+		return mapper.apply(generator.next());
+	}
+
+	public interface SerializableFunction<A, B> extends Function<A, B>, Serializable {}

Review comment:
       Same here, we need this.
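
To make the point concrete, here is a small sketch using only the JDK. It assumes the closure cleaner's check behaves like plain Java serialization, which is an assumption on my part. `SerializableLambdaSketch` and `isJavaSerializable` are hypothetical names; only the shape of `SerializableSupplier` is taken from the diff.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

final class SerializableLambdaSketch {

    // Same shape as the interface in the diff: a Supplier that is also Serializable.
    interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

    // Returns true if Java serialization accepts the object.
    static boolean isJavaSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // A lambda typed as a plain Supplier is not Serializable.
    static Supplier<String> plain() {
        return () -> "now";
    }

    // The same lambda typed through the named interface is Serializable.
    static SerializableSupplier<String> serializable() {
        return () -> "now";
    }
}
```

The lambda bodies are identical; only the target type decides whether the compiler emits a serializable lambda.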







[GitHub] [flink] JingsongLi commented on a change in pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13010:
URL: https://github.com/apache/flink/pull/13010#discussion_r486047447



##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/DataGenVisitorBase.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.datagen;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
+
+import java.io.Serializable;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.function.Supplier;
+
+/**
+ * Base class for translating {@link LogicalType LogicalTypes} to {@link DataGeneratorContainer}'s.
+ */
+public abstract class DataGenVisitorBase extends LogicalTypeDefaultVisitor<DataGeneratorContainer> {
+
+	protected final String name;
+
+	protected final ReadableConfig config;
+
+	protected DataGenVisitorBase(String name, ReadableConfig config) {
+		this.name = name;
+		this.config = config;
+	}
+
+	@Override
+	public DataGeneratorContainer visit(DateType dateType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(LocalDate::now));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(TimeType timeType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(LocalTime::now));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(TimestampType timestampType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(LocalDateTime::now));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(ZonedTimestampType zonedTimestampType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(OffsetDateTime::now));
+	}
+
+	@Override
+	public DataGeneratorContainer visit(LocalZonedTimestampType localZonedTimestampType) {
+		return DataGeneratorContainer.of(TimeGenerator.of(Instant::now));
+	}
+
+	@Override
+	protected DataGeneratorContainer defaultMethod(LogicalType logicalType) {
+		throw new ValidationException("Unsupported type: " + logicalType);
+	}
+
+	private interface SerializableSupplier<T> extends Supplier<T>, Serializable { }

Review comment:
       I meant we could do something like `return (Supplier<FileSystem> & Serializable) () -> {...}`, but it is OK for now.
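
A minimal, JDK-only sketch of that alternative: the intersection cast makes the lambda serializable without declaring a named interface. `IntersectionCastSketch`, `greeting`, and `roundTrip` are hypothetical names, and a `String` supplier stands in for the `Supplier<FileSystem>` in the comment.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

final class IntersectionCastSketch {

    // The intersection cast marks the lambda as Serializable at its creation site.
    static Supplier<String> greeting() {
        return (Supplier<String> & Serializable) () -> "hello";
    }

    // Round-trips the supplier through Java serialization and returns the copy.
    @SuppressWarnings("unchecked")
    static Supplier<String> roundTrip(Supplier<String> supplier) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(supplier);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Supplier<String>) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The trade-off is that the cast has to be repeated at every lambda site, whereas a named interface such as `SerializableSupplier` enforces serializability through the parameter type.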







[GitHub] [flink] JingsongLi commented on a change in pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13010:
URL: https://github.com/apache/flink/pull/13010#discussion_r468310256



##########
File path: docs/dev/table/connectors/datagen.md
##########
@@ -29,25 +29,24 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-The DataGen connector allows for reading by data generation rules.
+The DataGen connector allows for creating tables based on in-memory data generation.
+This is useful when developing queries locally without access to external systems such as Kafka.
+Tables can include [Computed Column syntax]({% link dev/table/sql/create.md %}#create-table) which allows for flexible record generation.
 
-The DataGen connector can work with [Computed Column syntax]({% link dev/table/sql/create.md %}#create-table).
-This allows you to generate records flexibly.
+The DataGen connector is built-in, no additional dependencies are required.
 
-The DataGen connector is built-in.
+Usage
+-----
 
-<span class="label label-danger">Attention</span> Complex types are not supported: Array, Map, Row. Please construct these types by computed column.
+By default, a DataGen table will create an unbounded number of rows with a random value for each column.
+For variable sized types, char/varchar/string/array/map/multiset, the length can be specified.
+Additionally, a total number of rows can be specified, resulting in a bounded table.
 
-How to create a DataGen table
-----------------
-
-The boundedness of table: when the generation of field data in the table is completed, the reading
-is finished. So the boundedness of the table depends on the boundedness of fields.
-
-For each field, there are two ways to generate data:
+There also exists a sequence generator, where users specify a sequence of start and end values.
+Complex types cannot be generated as a sequence.
+If any column in a table is a sequence type, the table will be bounded and end with the first sequence completes.
 
-- Random generator is the default generator, you can specify random max and min values. For char/varchar/string, the length can be specified. It is a unbounded generator.
-- Sequence generator, you can specify sequence start and end values. It is a bounded generator, when the sequence number reaches the end value, the reading ends.
+Time types are always the local machines current system time.

Review comment:
       Maybe we could add a table listing all types, showing the generation strategies each one supports and the parameters each requires?
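
Until such a table exists, the parameter names can be read off the visitor code quoted earlier in this thread, which builds keys as `FIELDS + "." + name + "." + MIN` and nests collection fields under `.element`, `.key`, and `.value`. A minimal sketch of that layout follows. The literal values `fields`, `min`, `max`, and `length` are assumptions, since the diff does not show the constants' values, and `DataGenKeySketch` is a hypothetical class.

```java
// Hypothetical sketch of the option-key layout; not part of Flink.
final class DataGenKeySketch {

    // Assumed values of DataGenTableSourceFactory.FIELDS, MIN, MAX and LENGTH.
    static final String FIELDS = "fields";
    static final String MIN = "min";
    static final String MAX = "max";
    static final String LENGTH = "length";

    // Builds the option key for one field, mirroring FIELDS + "." + name + "." + option.
    static String optionKey(String fieldName, String option) {
        return FIELDS + "." + fieldName + "." + option;
    }

    // Nested names as used for ARRAY / MULTISET elements and MAP keys and values.
    static String elementField(String name) {
        return name + ".element";
    }

    static String keyField(String name) {
        return name + ".key";
    }

    static String valueField(String name) {
        return name + ".value";
    }

    public static void main(String[] args) {
        System.out.println(optionKey("age", MIN));
        System.out.println(optionKey(elementField("tags"), LENGTH));
        System.out.println(optionKey(valueField("scores"), MAX));
    }
}
```

A documentation table could list, per type, which of these suffixes apply.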







[GitHub] [flink] flinkbot edited a comment on pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13010:
URL: https://github.com/apache/flink/pull/13010#issuecomment-665174560


   ## CI report:
   
   * 920c6c0fc76622f1331cc3b0b2b019a2397d62bf Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4974) 
   * 125b5ad0567b8ee2c5cfdae1768ae047e9c96cdd UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #13010: [FLINK-18735][table] Add support for more types to DataGen source

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13010:
URL: https://github.com/apache/flink/pull/13010#issuecomment-665174560


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "60b2d39cd6ff94091cde6bc40d0d9b8e65a89531",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4973",
       "triggerID" : "60b2d39cd6ff94091cde6bc40d0d9b8e65a89531",
       "triggerType" : "PUSH"
     }, {
       "hash" : "920c6c0fc76622f1331cc3b0b2b019a2397d62bf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4974",
       "triggerID" : "920c6c0fc76622f1331cc3b0b2b019a2397d62bf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "125b5ad0567b8ee2c5cfdae1768ae047e9c96cdd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=5416",
       "triggerID" : "125b5ad0567b8ee2c5cfdae1768ae047e9c96cdd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a96948229811ec6f8b72a2fd2ad019a9fa6b2d67",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6407",
       "triggerID" : "a96948229811ec6f8b72a2fd2ad019a9fa6b2d67",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cbe73382b1eb400653d068a8726bc673fb7e6394",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "cbe73382b1eb400653d068a8726bc673fb7e6394",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a96948229811ec6f8b72a2fd2ad019a9fa6b2d67 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6407) 
   * cbe73382b1eb400653d068a8726bc673fb7e6394 UNKNOWN
   

