You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/05/14 13:38:00 UTC

[flink] 03/03: [FLINK-22475][table-api-java-bridge] Add placeholder options for datagen connector

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit abd3da9e0a28aaa526a569133feadfe0c9af68cc
Author: Ingo Bürk <in...@tngtech.com>
AuthorDate: Tue May 11 17:20:57 2021 +0200

    [FLINK-22475][table-api-java-bridge] Add placeholder options for datagen connector
    
    This closes #15896.
---
 .../flink/table/factories/DataGenOptions.java      | 103 +++++++++++++++++++++
 .../table/factories/DataGenTableSourceFactory.java |  54 ++++-------
 .../factories/datagen/RandomGeneratorVisitor.java  |  19 ++--
 .../datagen/SequenceGeneratorVisitor.java          |   8 +-
 .../factories/DataGenTableSourceFactoryTest.java   |  68 +++++++-------
 5 files changed, 167 insertions(+), 85 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenOptions.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenOptions.java
new file mode 100644
index 0000000..e445359
--- /dev/null
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenOptions.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** {@link ConfigOption}s for {@link DataGenTableSourceFactory}. */
+@Internal
+public class DataGenOptions {
+
+    public static final Long ROWS_PER_SECOND_DEFAULT_VALUE = 10000L;
+
+    public static final String FIELDS = "fields";
+    public static final String KIND = "kind";
+    public static final String START = "start";
+    public static final String END = "end";
+    public static final String MIN = "min";
+    public static final String MAX = "max";
+    public static final String LENGTH = "length";
+
+    public static final String SEQUENCE = "sequence";
+    public static final String RANDOM = "random";
+
+    public static final ConfigOption<Long> ROWS_PER_SECOND =
+            key("rows-per-second")
+                    .longType()
+                    .defaultValue(ROWS_PER_SECOND_DEFAULT_VALUE)
+                    .withDescription("Rows per second to control the emit rate.");
+
+    public static final ConfigOption<Long> NUMBER_OF_ROWS =
+            key("number-of-rows")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Total number of rows to emit. By default, the source is unbounded.");
+
+    /** Placeholder {@link ConfigOption}. Not used for retrieving values. */
+    public static final ConfigOption<String> FIELD_KIND =
+            ConfigOptions.key(String.format("%s.#.%s", FIELDS, KIND))
+                    .stringType()
+                    .defaultValue("random")
+                    .withDescription("Generator of this '#' field. Can be 'sequence' or 'random'.");
+
+    /** Placeholder {@link ConfigOption}. Not used for retrieving values. */
+    public static final ConfigOption<String> FIELD_MIN =
+            ConfigOptions.key(String.format("%s.#.%s", FIELDS, MIN))
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Minimum value to generate for fields of kind 'random'. Minimum value possible for the type of the field.");
+
+    /** Placeholder {@link ConfigOption}. Not used for retrieving values. */
+    public static final ConfigOption<String> FIELD_MAX =
+            ConfigOptions.key(String.format("%s.#.%s", FIELDS, MAX))
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Maximum value to generate for fields of kind 'random'. Maximum value possible for the type of the field.");
+
+    /** Placeholder {@link ConfigOption}. Not used for retrieving values. */
+    public static final ConfigOption<Integer> FIELD_LENGTH =
+            ConfigOptions.key(String.format("%s.#.%s", FIELDS, LENGTH))
+                    .intType()
+                    .defaultValue(100)
+                    .withDescription(
+                            "Size or length of the collection for generating char/varchar/string/array/map/multiset types.");
+
+    /** Placeholder {@link ConfigOption}. Not used for retrieving values. */
+    public static final ConfigOption<String> FIELD_START =
+            ConfigOptions.key(String.format("%s.#.%s", FIELDS, START))
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Start value of sequence generator.");
+
+    /** Placeholder {@link ConfigOption}. Not used for retrieving values. */
+    public static final ConfigOption<String> FIELD_END =
+            ConfigOptions.key(String.format("%s.#.%s", FIELDS, END))
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("End value of sequence generator.");
+
+    private DataGenOptions() {}
+}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
index 4cce51d..622f511 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
@@ -46,31 +46,6 @@ import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 
     public static final String IDENTIFIER = "datagen";
-    public static final Long ROWS_PER_SECOND_DEFAULT_VALUE = 10000L;
-
-    public static final ConfigOption<Long> ROWS_PER_SECOND =
-            key("rows-per-second")
-                    .longType()
-                    .defaultValue(ROWS_PER_SECOND_DEFAULT_VALUE)
-                    .withDescription("Rows per second to control the emit rate.");
-
-    public static final ConfigOption<Long> NUMBER_OF_ROWS =
-            key("number-of-rows")
-                    .longType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Total number of rows to emit. By default, the source is unbounded.");
-
-    public static final String FIELDS = "fields";
-    public static final String KIND = "kind";
-    public static final String START = "start";
-    public static final String END = "end";
-    public static final String MIN = "min";
-    public static final String MAX = "max";
-    public static final String LENGTH = "length";
-
-    public static final String SEQUENCE = "sequence";
-    public static final String RANDOM = "random";
 
     @Override
     public String factoryIdentifier() {
@@ -85,8 +60,17 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(ROWS_PER_SECOND);
-        options.add(NUMBER_OF_ROWS);
+        options.add(DataGenOptions.ROWS_PER_SECOND);
+        options.add(DataGenOptions.NUMBER_OF_ROWS);
+
+        // Placeholder options
+        options.add(DataGenOptions.FIELD_KIND);
+        options.add(DataGenOptions.FIELD_MIN);
+        options.add(DataGenOptions.FIELD_MAX);
+        options.add(DataGenOptions.FIELD_LENGTH);
+        options.add(DataGenOptions.FIELD_START);
+        options.add(DataGenOptions.FIELD_END);
+
         return options;
     }
 
@@ -105,7 +89,9 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
             DataType type = schema.getFieldDataTypes()[i];
 
             ConfigOption<String> kind =
-                    key(FIELDS + "." + name + "." + KIND).stringType().defaultValue(RANDOM);
+                    key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.KIND)
+                            .stringType()
+                            .defaultValue(DataGenOptions.RANDOM);
             DataGeneratorContainer container =
                     createContainer(name, type, options.get(kind), options);
             fieldGenerators[i] = container.getGenerator();
@@ -118,8 +104,8 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 
         Set<String> consumedOptionKeys = new HashSet<>();
         consumedOptionKeys.add(CONNECTOR.key());
-        consumedOptionKeys.add(ROWS_PER_SECOND.key());
-        consumedOptionKeys.add(NUMBER_OF_ROWS.key());
+        consumedOptionKeys.add(DataGenOptions.ROWS_PER_SECOND.key());
+        consumedOptionKeys.add(DataGenOptions.NUMBER_OF_ROWS.key());
         optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
         FactoryUtil.validateUnconsumedKeys(
                 factoryIdentifier(), options.keySet(), consumedOptionKeys);
@@ -129,16 +115,16 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
                 fieldGenerators,
                 name,
                 schema,
-                options.get(ROWS_PER_SECOND),
-                options.get(NUMBER_OF_ROWS));
+                options.get(DataGenOptions.ROWS_PER_SECOND),
+                options.get(DataGenOptions.NUMBER_OF_ROWS));
     }
 
     private DataGeneratorContainer createContainer(
             String name, DataType type, String kind, ReadableConfig options) {
         switch (kind) {
-            case RANDOM:
+            case DataGenOptions.RANDOM:
                 return type.getLogicalType().accept(new RandomGeneratorVisitor(name, options));
-            case SEQUENCE:
+            case DataGenOptions.SEQUENCE:
                 return type.getLogicalType().accept(new SequenceGeneratorVisitor(name, options));
             default:
                 throw new ValidationException("Unsupported generator kind: " + kind);
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
index f18a851..13c93af 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.factories.DataGenOptions;
 import org.apache.flink.table.factories.datagen.types.DataGeneratorMapper;
 import org.apache.flink.table.factories.datagen.types.DecimalDataRandomGenerator;
 import org.apache.flink.table.factories.datagen.types.RowDataGenerator;
@@ -55,10 +56,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.LENGTH;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.MIN;
 
 /** Creates a random {@link DataGeneratorContainer} for a particular logical type. */
 @Internal
@@ -76,8 +73,8 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
     public RandomGeneratorVisitor(String name, ReadableConfig config) {
         super(name, config);
 
-        this.minKey = key(FIELDS + "." + name + "." + MIN);
-        this.maxKey = key(FIELDS + "." + name + "." + MAX);
+        this.minKey = key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.MIN);
+        this.maxKey = key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.MAX);
     }
 
     @Override
@@ -88,7 +85,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
     @Override
     public DataGeneratorContainer visit(CharType booleanType) {
         ConfigOption<Integer> lenOption =
-                key(FIELDS + "." + name + "." + LENGTH)
+                key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH)
                         .intType()
                         .defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
         return DataGeneratorContainer.of(
@@ -98,7 +95,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
     @Override
     public DataGeneratorContainer visit(VarCharType booleanType) {
         ConfigOption<Integer> lenOption =
-                key(FIELDS + "." + name + "." + LENGTH)
+                key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH)
                         .intType()
                         .defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
         return DataGeneratorContainer.of(
@@ -190,7 +187,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
     @Override
     public DataGeneratorContainer visit(ArrayType arrayType) {
         ConfigOption<Integer> lenOption =
-                key(FIELDS + "." + name + "." + LENGTH)
+                key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH)
                         .intType()
                         .defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
 
@@ -208,7 +205,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
     @Override
     public DataGeneratorContainer visit(MultisetType multisetType) {
         ConfigOption<Integer> lenOption =
-                key(FIELDS + "." + name + "." + LENGTH)
+                key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH)
                         .intType()
                         .defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
 
@@ -230,7 +227,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
     @Override
     public DataGeneratorContainer visit(MapType mapType) {
         ConfigOption<Integer> lenOption =
-                key(FIELDS + "." + name + "." + LENGTH)
+                key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH)
                         .intType()
                         .defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
 
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java
index 214697b..8ce3e51 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
 import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.factories.DataGenOptions;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BooleanType;
 import org.apache.flink.table.types.logical.CharType;
@@ -38,9 +39,6 @@ import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.VarCharType;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.END;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.START;
 
 /** Creates a sequential {@link DataGeneratorContainer} for a particular logical type. */
 @Internal
@@ -65,8 +63,8 @@ public class SequenceGeneratorVisitor extends DataGenVisitorBase {
 
         this.config = config;
 
-        this.startKeyStr = FIELDS + "." + name + "." + START;
-        this.endKeyStr = FIELDS + "." + name + "." + END;
+        this.startKeyStr = DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.START;
+        this.endKeyStr = DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.END;
 
         ConfigOptions.OptionBuilder startKey = key(startKeyStr);
         ConfigOptions.OptionBuilder endKey = key(endKeyStr);
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
index fc07257..5943a9e 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
@@ -44,17 +44,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.END;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.KIND;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.LENGTH;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.MIN;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.NUMBER_OF_ROWS;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.RANDOM;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.ROWS_PER_SECOND;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.SEQUENCE;
-import static org.apache.flink.table.factories.DataGenTableSourceFactory.START;
 import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
 import static org.junit.Assert.assertTrue;
 
@@ -104,7 +93,7 @@ public class DataGenTableSourceFactoryTest {
 
         DescriptorProperties descriptor = new DescriptorProperties();
         descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
-        descriptor.putString(NUMBER_OF_ROWS.key(), "10");
+        descriptor.putString(DataGenOptions.NUMBER_OF_ROWS.key(), "10");
 
         // add min max option for numeric types
         descriptor.putString("fields.f4.min", "1.0");
@@ -138,18 +127,21 @@ public class DataGenTableSourceFactoryTest {
     public void testSource() throws Exception {
         DescriptorProperties descriptor = new DescriptorProperties();
         descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
-        descriptor.putLong(ROWS_PER_SECOND.key(), 100);
+        descriptor.putLong(DataGenOptions.ROWS_PER_SECOND.key(), 100);
 
-        descriptor.putString(FIELDS + ".f0." + KIND, RANDOM);
-        descriptor.putLong(FIELDS + ".f0." + LENGTH, 20);
+        descriptor.putString(
+                DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.RANDOM);
+        descriptor.putLong(DataGenOptions.FIELDS + ".f0." + DataGenOptions.LENGTH, 20);
 
-        descriptor.putString(FIELDS + ".f1." + KIND, RANDOM);
-        descriptor.putLong(FIELDS + ".f1." + MIN, 10);
-        descriptor.putLong(FIELDS + ".f1." + MAX, 100);
+        descriptor.putString(
+                DataGenOptions.FIELDS + ".f1." + DataGenOptions.KIND, DataGenOptions.RANDOM);
+        descriptor.putLong(DataGenOptions.FIELDS + ".f1." + DataGenOptions.MIN, 10);
+        descriptor.putLong(DataGenOptions.FIELDS + ".f1." + DataGenOptions.MAX, 100);
 
-        descriptor.putString(FIELDS + ".f2." + KIND, SEQUENCE);
-        descriptor.putLong(FIELDS + ".f2." + START, 50);
-        descriptor.putLong(FIELDS + ".f2." + END, 60);
+        descriptor.putString(
+                DataGenOptions.FIELDS + ".f2." + DataGenOptions.KIND, DataGenOptions.SEQUENCE);
+        descriptor.putLong(DataGenOptions.FIELDS + ".f2." + DataGenOptions.START, 50);
+        descriptor.putLong(DataGenOptions.FIELDS + ".f2." + DataGenOptions.END, 60);
 
         List<RowData> results = runGenerator(SCHEMA, descriptor);
 
@@ -191,9 +183,10 @@ public class DataGenTableSourceFactoryTest {
     public void testSequenceCheckpointRestore() throws Exception {
         DescriptorProperties descriptor = new DescriptorProperties();
         descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
-        descriptor.putString(FIELDS + ".f0." + KIND, SEQUENCE);
-        descriptor.putLong(FIELDS + ".f0." + START, 0);
-        descriptor.putLong(FIELDS + ".f0." + END, 100);
+        descriptor.putString(
+                DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.SEQUENCE);
+        descriptor.putLong(DataGenOptions.FIELDS + ".f0." + DataGenOptions.START, 0);
+        descriptor.putLong(DataGenOptions.FIELDS + ".f0." + DataGenOptions.END, 100);
 
         DynamicTableSource dynamicTableSource =
                 createTableSource(
@@ -225,8 +218,9 @@ public class DataGenTableSourceFactoryTest {
         try {
             DescriptorProperties descriptor = new DescriptorProperties();
             descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
-            descriptor.putString(FIELDS + ".f0." + KIND, SEQUENCE);
-            descriptor.putLong(FIELDS + ".f0." + END, 100);
+            descriptor.putString(
+                    DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.SEQUENCE);
+            descriptor.putLong(DataGenOptions.FIELDS + ".f0." + DataGenOptions.END, 100);
 
             createTableSource(
                     ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())),
@@ -249,8 +243,9 @@ public class DataGenTableSourceFactoryTest {
         try {
             DescriptorProperties descriptor = new DescriptorProperties();
             descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
-            descriptor.putString(FIELDS + ".f0." + KIND, SEQUENCE);
-            descriptor.putLong(FIELDS + ".f0." + START, 0);
+            descriptor.putString(
+                    DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.SEQUENCE);
+            descriptor.putLong(DataGenOptions.FIELDS + ".f0." + DataGenOptions.START, 0);
 
             createTableSource(
                     ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())),
@@ -294,8 +289,9 @@ public class DataGenTableSourceFactoryTest {
         try {
             DescriptorProperties descriptor = new DescriptorProperties();
             descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
-            descriptor.putString(FIELDS + ".f0." + KIND, RANDOM);
-            descriptor.putLong(FIELDS + ".f0." + START, 0);
+            descriptor.putString(
+                    DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.RANDOM);
+            descriptor.putLong(DataGenOptions.FIELDS + ".f0." + DataGenOptions.START, 0);
 
             createTableSource(
                     ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())),
@@ -316,8 +312,9 @@ public class DataGenTableSourceFactoryTest {
         try {
             DescriptorProperties descriptor = new DescriptorProperties();
             descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
-            descriptor.putString(FIELDS + ".f0." + KIND, RANDOM);
-            descriptor.putInt(FIELDS + ".f0." + LENGTH, 100);
+            descriptor.putString(
+                    DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.RANDOM);
+            descriptor.putInt(DataGenOptions.FIELDS + ".f0." + DataGenOptions.LENGTH, 100);
 
             createTableSource(
                     ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())),
@@ -338,9 +335,10 @@ public class DataGenTableSourceFactoryTest {
         try {
             DescriptorProperties descriptor = new DescriptorProperties();
             descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
-            descriptor.putString(FIELDS + ".f0." + KIND, SEQUENCE);
-            descriptor.putString(FIELDS + ".f0." + START, "Wrong");
-            descriptor.putString(FIELDS + ".f0." + END, "Wrong");
+            descriptor.putString(
+                    DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.SEQUENCE);
+            descriptor.putString(DataGenOptions.FIELDS + ".f0." + DataGenOptions.START, "Wrong");
+            descriptor.putString(DataGenOptions.FIELDS + ".f0." + DataGenOptions.END, "Wrong");
 
             createTableSource(
                     ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())),