You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/05/14 13:38:00 UTC
[flink] 03/03: [FLINK-22475][table-api-java-bridge] Add placeholder
options for datagen connector
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit abd3da9e0a28aaa526a569133feadfe0c9af68cc
Author: Ingo Bürk <in...@tngtech.com>
AuthorDate: Tue May 11 17:20:57 2021 +0200
[FLINK-22475][table-api-java-bridge] Add placeholder options for datagen connector
This closes #15896.
---
.../flink/table/factories/DataGenOptions.java | 103 +++++++++++++++++++++
.../table/factories/DataGenTableSourceFactory.java | 54 ++++-------
.../factories/datagen/RandomGeneratorVisitor.java | 19 ++--
.../datagen/SequenceGeneratorVisitor.java | 8 +-
.../factories/DataGenTableSourceFactoryTest.java | 68 +++++++-------
5 files changed, 167 insertions(+), 85 deletions(-)
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenOptions.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenOptions.java
new file mode 100644
index 0000000..e445359
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenOptions.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** {@link ConfigOption}s for {@link DataGenTableSourceFactory}. */
+@Internal
+public class DataGenOptions {
+
+ public static final Long ROWS_PER_SECOND_DEFAULT_VALUE = 10000L;
+
+ public static final String FIELDS = "fields";
+ public static final String KIND = "kind";
+ public static final String START = "start";
+ public static final String END = "end";
+ public static final String MIN = "min";
+ public static final String MAX = "max";
+ public static final String LENGTH = "length";
+
+ public static final String SEQUENCE = "sequence";
+ public static final String RANDOM = "random";
+
+ public static final ConfigOption<Long> ROWS_PER_SECOND =
+ key("rows-per-second")
+ .longType()
+ .defaultValue(ROWS_PER_SECOND_DEFAULT_VALUE)
+ .withDescription("Rows per second to control the emit rate.");
+
+ public static final ConfigOption<Long> NUMBER_OF_ROWS =
+ key("number-of-rows")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ "Total number of rows to emit. By default, the source is unbounded.");
+
+ /** Placeholder {@link ConfigOption}. Not used for retrieving values. */
+ public static final ConfigOption<String> FIELD_KIND =
+ ConfigOptions.key(String.format("%s.#.%s", FIELDS, KIND))
+ .stringType()
+ .defaultValue("random")
+ .withDescription("Generator of this '#' field. Can be 'sequence' or 'random'.");
+
+ /** Placeholder {@link ConfigOption}. Not used for retrieving values. */
+ public static final ConfigOption<String> FIELD_MIN =
+ ConfigOptions.key(String.format("%s.#.%s", FIELDS, MIN))
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Minimum value to generate for fields of kind 'random'. Minimum value possible for the type of the field.");
+
+ /** Placeholder {@link ConfigOption}. Not used for retrieving values. */
+ public static final ConfigOption<String> FIELD_MAX =
+ ConfigOptions.key(String.format("%s.#.%s", FIELDS, MAX))
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Maximum value to generate for fields of kind 'random'. Maximum value possible for the type of the field.");
+
+ /** Placeholder {@link ConfigOption}. Not used for retrieving values. */
+ public static final ConfigOption<Integer> FIELD_LENGTH =
+ ConfigOptions.key(String.format("%s.#.%s", FIELDS, LENGTH))
+ .intType()
+ .defaultValue(100)
+ .withDescription(
+ "Size or length of the collection for generating char/varchar/string/array/map/multiset types.");
+
+ /** Placeholder {@link ConfigOption}. Not used for retrieving values. */
+ public static final ConfigOption<String> FIELD_START =
+ ConfigOptions.key(String.format("%s.#.%s", FIELDS, START))
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Start value of sequence generator.");
+
+ /** Placeholder {@link ConfigOption}. Not used for retrieving values. */
+ public static final ConfigOption<String> FIELD_END =
+ ConfigOptions.key(String.format("%s.#.%s", FIELDS, END))
+ .stringType()
+ .noDefaultValue()
+ .withDescription("End value of sequence generator.");
+
+ private DataGenOptions() {}
+}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
index 4cce51d..622f511 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
@@ -46,31 +46,6 @@ import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
public static final String IDENTIFIER = "datagen";
- public static final Long ROWS_PER_SECOND_DEFAULT_VALUE = 10000L;
-
- public static final ConfigOption<Long> ROWS_PER_SECOND =
- key("rows-per-second")
- .longType()
- .defaultValue(ROWS_PER_SECOND_DEFAULT_VALUE)
- .withDescription("Rows per second to control the emit rate.");
-
- public static final ConfigOption<Long> NUMBER_OF_ROWS =
- key("number-of-rows")
- .longType()
- .noDefaultValue()
- .withDescription(
- "Total number of rows to emit. By default, the source is unbounded.");
-
- public static final String FIELDS = "fields";
- public static final String KIND = "kind";
- public static final String START = "start";
- public static final String END = "end";
- public static final String MIN = "min";
- public static final String MAX = "max";
- public static final String LENGTH = "length";
-
- public static final String SEQUENCE = "sequence";
- public static final String RANDOM = "random";
@Override
public String factoryIdentifier() {
@@ -85,8 +60,17 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
@Override
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
- options.add(ROWS_PER_SECOND);
- options.add(NUMBER_OF_ROWS);
+ options.add(DataGenOptions.ROWS_PER_SECOND);
+ options.add(DataGenOptions.NUMBER_OF_ROWS);
+
+ // Placeholder options
+ options.add(DataGenOptions.FIELD_KIND);
+ options.add(DataGenOptions.FIELD_MIN);
+ options.add(DataGenOptions.FIELD_MAX);
+ options.add(DataGenOptions.FIELD_LENGTH);
+ options.add(DataGenOptions.FIELD_START);
+ options.add(DataGenOptions.FIELD_END);
+
return options;
}
@@ -105,7 +89,9 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
DataType type = schema.getFieldDataTypes()[i];
ConfigOption<String> kind =
- key(FIELDS + "." + name + "." + KIND).stringType().defaultValue(RANDOM);
+ key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.KIND)
+ .stringType()
+ .defaultValue(DataGenOptions.RANDOM);
DataGeneratorContainer container =
createContainer(name, type, options.get(kind), options);
fieldGenerators[i] = container.getGenerator();
@@ -118,8 +104,8 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
Set<String> consumedOptionKeys = new HashSet<>();
consumedOptionKeys.add(CONNECTOR.key());
- consumedOptionKeys.add(ROWS_PER_SECOND.key());
- consumedOptionKeys.add(NUMBER_OF_ROWS.key());
+ consumedOptionKeys.add(DataGenOptions.ROWS_PER_SECOND.key());
+ consumedOptionKeys.add(DataGenOptions.NUMBER_OF_ROWS.key());
optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
FactoryUtil.validateUnconsumedKeys(
factoryIdentifier(), options.keySet(), consumedOptionKeys);
@@ -129,16 +115,16 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
fieldGenerators,
name,
schema,
- options.get(ROWS_PER_SECOND),
- options.get(NUMBER_OF_ROWS));
+ options.get(DataGenOptions.ROWS_PER_SECOND),
+ options.get(DataGenOptions.NUMBER_OF_ROWS));
}
private DataGeneratorContainer createContainer(
String name, DataType type, String kind, ReadableConfig options) {
switch (kind) {
- case RANDOM:
+ case DataGenOptions.RANDOM:
return type.getLogicalType().accept(new RandomGeneratorVisitor(name, options));
- case SEQUENCE:
+ case DataGenOptions.SEQUENCE:
return type.getLogicalType().accept(new SequenceGeneratorVisitor(name, options));
default:
throw new ValidationException("Unsupported generator kind: " + kind);
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
index f18a851..13c93af 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.factories.DataGenOptions;
import org.apache.flink.table.factories.datagen.types.DataGeneratorMapper;
import org.apache.flink.table.factories.datagen.types.DecimalDataRandomGenerator;
import org.apache.flink.table.factories.datagen.types.RowDataGenerator;
@@ -55,10 +56,6 @@ import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.flink.configuration.ConfigOptions.key;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.LENGTH;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.MIN;
/** Creates a random {@link DataGeneratorContainer} for a particular logical type. */
@Internal
@@ -76,8 +73,8 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
public RandomGeneratorVisitor(String name, ReadableConfig config) {
super(name, config);
- this.minKey = key(FIELDS + "." + name + "." + MIN);
- this.maxKey = key(FIELDS + "." + name + "." + MAX);
+ this.minKey = key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.MIN);
+ this.maxKey = key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.MAX);
}
@Override
@@ -88,7 +85,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
@Override
public DataGeneratorContainer visit(CharType booleanType) {
ConfigOption<Integer> lenOption =
- key(FIELDS + "." + name + "." + LENGTH)
+ key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH)
.intType()
.defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
return DataGeneratorContainer.of(
@@ -98,7 +95,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
@Override
public DataGeneratorContainer visit(VarCharType booleanType) {
ConfigOption<Integer> lenOption =
- key(FIELDS + "." + name + "." + LENGTH)
+ key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH)
.intType()
.defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
return DataGeneratorContainer.of(
@@ -190,7 +187,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
@Override
public DataGeneratorContainer visit(ArrayType arrayType) {
ConfigOption<Integer> lenOption =
- key(FIELDS + "." + name + "." + LENGTH)
+ key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH)
.intType()
.defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
@@ -208,7 +205,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
@Override
public DataGeneratorContainer visit(MultisetType multisetType) {
ConfigOption<Integer> lenOption =
- key(FIELDS + "." + name + "." + LENGTH)
+ key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH)
.intType()
.defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
@@ -230,7 +227,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
@Override
public DataGeneratorContainer visit(MapType mapType) {
ConfigOption<Integer> lenOption =
- key(FIELDS + "." + name + "." + LENGTH)
+ key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH)
.intType()
.defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java
index 214697b..8ce3e51 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.factories.DataGenOptions;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.CharType;
@@ -38,9 +39,6 @@ import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import static org.apache.flink.configuration.ConfigOptions.key;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.END;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.START;
/** Creates a sequential {@link DataGeneratorContainer} for a particular logical type. */
@Internal
@@ -65,8 +63,8 @@ public class SequenceGeneratorVisitor extends DataGenVisitorBase {
this.config = config;
- this.startKeyStr = FIELDS + "." + name + "." + START;
- this.endKeyStr = FIELDS + "." + name + "." + END;
+ this.startKeyStr = DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.START;
+ this.endKeyStr = DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.END;
ConfigOptions.OptionBuilder startKey = key(startKeyStr);
ConfigOptions.OptionBuilder endKey = key(endKeyStr);
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
index fc07257..5943a9e 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
@@ -44,17 +44,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.END;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.KIND;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.LENGTH;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.MIN;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.NUMBER_OF_ROWS;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.RANDOM;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.ROWS_PER_SECOND;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.SEQUENCE;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.START;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
import static org.junit.Assert.assertTrue;
@@ -104,7 +93,7 @@ public class DataGenTableSourceFactoryTest {
DescriptorProperties descriptor = new DescriptorProperties();
descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
- descriptor.putString(NUMBER_OF_ROWS.key(), "10");
+ descriptor.putString(DataGenOptions.NUMBER_OF_ROWS.key(), "10");
// add min max option for numeric types
descriptor.putString("fields.f4.min", "1.0");
@@ -138,18 +127,21 @@ public class DataGenTableSourceFactoryTest {
public void testSource() throws Exception {
DescriptorProperties descriptor = new DescriptorProperties();
descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
- descriptor.putLong(ROWS_PER_SECOND.key(), 100);
+ descriptor.putLong(DataGenOptions.ROWS_PER_SECOND.key(), 100);
- descriptor.putString(FIELDS + ".f0." + KIND, RANDOM);
- descriptor.putLong(FIELDS + ".f0." + LENGTH, 20);
+ descriptor.putString(
+ DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.RANDOM);
+ descriptor.putLong(DataGenOptions.FIELDS + ".f0." + DataGenOptions.LENGTH, 20);
- descriptor.putString(FIELDS + ".f1." + KIND, RANDOM);
- descriptor.putLong(FIELDS + ".f1." + MIN, 10);
- descriptor.putLong(FIELDS + ".f1." + MAX, 100);
+ descriptor.putString(
+ DataGenOptions.FIELDS + ".f1." + DataGenOptions.KIND, DataGenOptions.RANDOM);
+ descriptor.putLong(DataGenOptions.FIELDS + ".f1." + DataGenOptions.MIN, 10);
+ descriptor.putLong(DataGenOptions.FIELDS + ".f1." + DataGenOptions.MAX, 100);
- descriptor.putString(FIELDS + ".f2." + KIND, SEQUENCE);
- descriptor.putLong(FIELDS + ".f2." + START, 50);
- descriptor.putLong(FIELDS + ".f2." + END, 60);
+ descriptor.putString(
+ DataGenOptions.FIELDS + ".f2." + DataGenOptions.KIND, DataGenOptions.SEQUENCE);
+ descriptor.putLong(DataGenOptions.FIELDS + ".f2." + DataGenOptions.START, 50);
+ descriptor.putLong(DataGenOptions.FIELDS + ".f2." + DataGenOptions.END, 60);
List<RowData> results = runGenerator(SCHEMA, descriptor);
@@ -191,9 +183,10 @@ public class DataGenTableSourceFactoryTest {
public void testSequenceCheckpointRestore() throws Exception {
DescriptorProperties descriptor = new DescriptorProperties();
descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
- descriptor.putString(FIELDS + ".f0." + KIND, SEQUENCE);
- descriptor.putLong(FIELDS + ".f0." + START, 0);
- descriptor.putLong(FIELDS + ".f0." + END, 100);
+ descriptor.putString(
+ DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.SEQUENCE);
+ descriptor.putLong(DataGenOptions.FIELDS + ".f0." + DataGenOptions.START, 0);
+ descriptor.putLong(DataGenOptions.FIELDS + ".f0." + DataGenOptions.END, 100);
DynamicTableSource dynamicTableSource =
createTableSource(
@@ -225,8 +218,9 @@ public class DataGenTableSourceFactoryTest {
try {
DescriptorProperties descriptor = new DescriptorProperties();
descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
- descriptor.putString(FIELDS + ".f0." + KIND, SEQUENCE);
- descriptor.putLong(FIELDS + ".f0." + END, 100);
+ descriptor.putString(
+ DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.SEQUENCE);
+ descriptor.putLong(DataGenOptions.FIELDS + ".f0." + DataGenOptions.END, 100);
createTableSource(
ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())),
@@ -249,8 +243,9 @@ public class DataGenTableSourceFactoryTest {
try {
DescriptorProperties descriptor = new DescriptorProperties();
descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
- descriptor.putString(FIELDS + ".f0." + KIND, SEQUENCE);
- descriptor.putLong(FIELDS + ".f0." + START, 0);
+ descriptor.putString(
+ DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.SEQUENCE);
+ descriptor.putLong(DataGenOptions.FIELDS + ".f0." + DataGenOptions.START, 0);
createTableSource(
ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())),
@@ -294,8 +289,9 @@ public class DataGenTableSourceFactoryTest {
try {
DescriptorProperties descriptor = new DescriptorProperties();
descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
- descriptor.putString(FIELDS + ".f0." + KIND, RANDOM);
- descriptor.putLong(FIELDS + ".f0." + START, 0);
+ descriptor.putString(
+ DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.RANDOM);
+ descriptor.putLong(DataGenOptions.FIELDS + ".f0." + DataGenOptions.START, 0);
createTableSource(
ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())),
@@ -316,8 +312,9 @@ public class DataGenTableSourceFactoryTest {
try {
DescriptorProperties descriptor = new DescriptorProperties();
descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
- descriptor.putString(FIELDS + ".f0." + KIND, RANDOM);
- descriptor.putInt(FIELDS + ".f0." + LENGTH, 100);
+ descriptor.putString(
+ DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.RANDOM);
+ descriptor.putInt(DataGenOptions.FIELDS + ".f0." + DataGenOptions.LENGTH, 100);
createTableSource(
ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())),
@@ -338,9 +335,10 @@ public class DataGenTableSourceFactoryTest {
try {
DescriptorProperties descriptor = new DescriptorProperties();
descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
- descriptor.putString(FIELDS + ".f0." + KIND, SEQUENCE);
- descriptor.putString(FIELDS + ".f0." + START, "Wrong");
- descriptor.putString(FIELDS + ".f0." + END, "Wrong");
+ descriptor.putString(
+ DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.SEQUENCE);
+ descriptor.putString(DataGenOptions.FIELDS + ".f0." + DataGenOptions.START, "Wrong");
+ descriptor.putString(DataGenOptions.FIELDS + ".f0." + DataGenOptions.END, "Wrong");
createTableSource(
ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())),