You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by le...@apache.org on 2022/07/06 01:57:36 UTC

[flink] branch master updated: [FLINK-27806][table] Support binary & varbinary types for datagen connector

This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ef9ce854a31 [FLINK-27806][table] Support binary & varbinary types for datagen connector
ef9ce854a31 is described below

commit ef9ce854a3169014001f39e0d5908c703453f2b8
Author: Ran Tao <ch...@gmail.com>
AuthorDate: Wed Jul 6 09:57:25 2022 +0800

    [FLINK-27806][table] Support binary & varbinary types for datagen connector
    
    This closes #19827.
---
 docs/content.zh/docs/connectors/table/datagen.md   |  4 +--
 docs/content/docs/connectors/table/datagen.md      | 14 ++++++--
 .../datagen/table/RandomGeneratorVisitor.java      | 41 ++++++++++++++++++++++
 .../datagen/table/SequenceGeneratorVisitor.java    | 34 ++++++++++++++++++
 .../factories/DataGenTableSourceFactoryTest.java   | 27 ++++++++++++--
 5 files changed, 114 insertions(+), 6 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/datagen.md b/docs/content.zh/docs/connectors/table/datagen.md
index 96dff1c11d9..7c07e7d2a5d 100644
--- a/docs/content.zh/docs/connectors/table/datagen.md
+++ b/docs/content.zh/docs/connectors/table/datagen.md
@@ -45,7 +45,7 @@ DataGen 连接器是内置的。
 
 每个列,都有两种生成数据的方法:
 
-- 随机生成器是默认的生成器,您可以指定随机生成的最大和最小值。char、varchar、string (类型)可以指定长度。它是无界的生成器。
+- 随机生成器是默认的生成器,您可以指定随机生成的最大和最小值。char、varchar、binary、varbinary、string (类型)可以指定长度。它是无界的生成器。
 
 - 序列生成器,您可以指定序列的起始和结束值。它是有界的生成器,当序列数字达到结束值,读取结束。
 
@@ -136,7 +136,7 @@ CREATE TABLE datagen (
       <td>可选</td>
       <td style="word-wrap: break-word;">100</td>
       <td>Integer</td>
-      <td>随机生成器生成字符的长度,适用于 char、varchar、string。</td>
+      <td>随机生成器生成字符的长度,适用于 char、varchar、binary、varbinary、string。</td>
     </tr>
     <tr>
       <td><h5>fields.#.start</h5></td>
diff --git a/docs/content/docs/connectors/table/datagen.md b/docs/content/docs/connectors/table/datagen.md
index d4f9432aa9f..d557875b4dc 100644
--- a/docs/content/docs/connectors/table/datagen.md
+++ b/docs/content/docs/connectors/table/datagen.md
@@ -39,7 +39,7 @@ Usage
 -----
 
 By default, a DataGen table will create an unbounded number of rows with a random value for each column.
-For variable sized types, char/varchar/string/array/map/multiset, the length can be specified.
+For variable sized types, char/varchar/binary/varbinary/string/array/map/multiset, the length can be specified.
 Additionally, a total number of rows can be specified, resulting in a bounded table.
 
 There also exists a sequence generator, where users specify a sequence of start and end values.
@@ -104,6 +104,16 @@ Types
             <td>random / sequence</td>
             <td></td>
         </tr>
+        <tr>
+            <td>BINARY</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td>VARBINARY</td>
+            <td>random / sequence</td>
+            <td></td>
+        </tr>
         <tr>
             <td>STRING</td>
             <td>random / sequence</td>
@@ -271,7 +281,7 @@ Connector Options
       <td>optional</td>
       <td style="word-wrap: break-word;">100</td>
       <td>Integer</td>
-      <td>Size or length of the collection for generating char/varchar/string/array/map/multiset types.</td>
+      <td>Size or length of the collection for generating char/varchar/binary/varbinary/string/array/map/multiset types.</td>
     </tr>
     <tr>
       <td><h5>fields.#.start</h5></td>
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java
index f2c4a311b76..4e1c9726334 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
 import org.apache.flink.table.types.logical.BooleanType;
 import org.apache.flink.table.types.logical.CharType;
 import org.apache.flink.table.types.logical.DayTimeIntervalType;
@@ -49,6 +50,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.logical.YearMonthIntervalType;
 import org.apache.flink.table.types.logical.ZonedTimestampType;
@@ -68,6 +70,8 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
 
     public static final int RANDOM_STRING_LENGTH_DEFAULT = 100;
 
+    public static final int RANDOM_BYTES_LENGTH_DEFAULT = 100;
+
     private static final int RANDOM_COLLECTION_LENGTH_DEFAULT = 3;
 
     private final ConfigOptions.OptionBuilder minKey;
@@ -135,6 +139,32 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
                 getRandomStringGenerator(config.get(lenOption)), lenOption);
     }
 
+    @Override
+    public DataGeneratorContainer visit(BinaryType binaryType) { // random byte[] generator for a BINARY column
+        ConfigOption<Integer> lenOption = // per-field length option: FIELDS + "." + name + "." + LENGTH
+                key(DataGenConnectorOptionsUtil.FIELDS
+                                + "."
+                                + name // declared outside this hunk; presumably the column name - confirm
+                                + "."
+                                + DataGenConnectorOptionsUtil.LENGTH)
+                        .intType()
+                        .defaultValue(RANDOM_BYTES_LENGTH_DEFAULT); // falls back to 100 when the option is unset
+        return DataGeneratorContainer.of(getRandomBytesGenerator(config.get(lenOption)), lenOption);
+    } // NOTE(review): binaryType is never read, so the array size depends only on the option - confirm intended
+
+    @Override
+    public DataGeneratorContainer visit(VarBinaryType varBinaryType) { // random byte[] generator for a VARBINARY column
+        ConfigOption<Integer> lenOption = // same option key shape as the BINARY overload
+                key(DataGenConnectorOptionsUtil.FIELDS
+                                + "."
+                                + name // declared outside this hunk; presumably the column name - confirm
+                                + "."
+                                + DataGenConnectorOptionsUtil.LENGTH)
+                        .intType()
+                        .defaultValue(RANDOM_BYTES_LENGTH_DEFAULT); // falls back to 100 when the option is unset
+        return DataGeneratorContainer.of(getRandomBytesGenerator(config.get(lenOption)), lenOption);
+    } // NOTE(review): varBinaryType is never read; every value has exactly the configured size - confirm intended
+
     @Override
     public DataGeneratorContainer visit(TinyIntType tinyIntType) {
         ConfigOption<Integer> min = minKey.intType().defaultValue((int) Byte.MIN_VALUE);
@@ -377,4 +407,15 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
             }
         };
     }
+
+    private static RandomGenerator<byte[]> getRandomBytesGenerator(int length) { // builds a generator of length-sized random byte arrays
+        return new RandomGenerator<byte[]>() {
+            @Override
+            public byte[] next() {
+                byte[] arr = new byte[length]; // a new array on every call, so returned values are not shared
+                random.getRandomGenerator().nextBytes(arr); // `random` is not declared in this hunk; presumably a RandomGenerator field - confirm
+                return arr;
+            }
+        };
+    }
 }
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/SequenceGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/SequenceGeneratorVisitor.java
index d1a174ca6fd..987912ecc5d 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/SequenceGeneratorVisitor.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/SequenceGeneratorVisitor.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
 import org.apache.flink.table.types.logical.BooleanType;
 import org.apache.flink.table.types.logical.CharType;
 import org.apache.flink.table.types.logical.DecimalType;
@@ -35,8 +36,11 @@ import org.apache.flink.table.types.logical.FloatType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 
+import org.apache.flink.shaded.guava30.com.google.common.primitives.Longs;
+
 import static org.apache.flink.configuration.ConfigOptions.key;
 
 /** Creates a sequential {@link DataGeneratorContainer} for a particular logical type. */
@@ -120,6 +124,22 @@ public class SequenceGeneratorVisitor extends DataGenVisitorBase {
                 longEnd);
     }
 
+    @Override
+    public DataGeneratorContainer visit(BinaryType binaryType) { // sequence generator for a BINARY column
+        return DataGeneratorContainer.of(
+                getSequenceBytesGenerator(config.get(longStart), config.get(longEnd)), // bounds are read as longs from the config
+                longStart, // the two options passed alongside the generator;
+                longEnd); // both are declared outside this hunk
+    } // NOTE(review): binaryType is never read here - confirm the declared length is meant to be ignored
+
+    @Override
+    public DataGeneratorContainer visit(VarBinaryType varBinaryType) { // sequence generator for a VARBINARY column
+        return DataGeneratorContainer.of(
+                getSequenceBytesGenerator(config.get(longStart), config.get(longEnd)), // bounds are read as longs from the config
+                longStart, // the two options passed alongside the generator;
+                longEnd); // both are declared outside this hunk
+    } // NOTE(review): varBinaryType is never read here - confirm the declared length is meant to be ignored
+
     @Override
     public DataGeneratorContainer visit(TinyIntType tinyIntType) {
         return DataGeneratorContainer.of(
@@ -191,4 +211,18 @@ public class SequenceGeneratorVisitor extends DataGenVisitorBase {
             }
         };
     }
+
+    private static SequenceGenerator<byte[]> getSequenceBytesGenerator(long start, long end) { // long sequence rendered as byte arrays
+        return new SequenceGenerator<byte[]>(start, end) {
+            @Override
+            public byte[] next() {
+                Long value = valuesToEmit.poll(); // valuesToEmit is not declared in this hunk; presumably inherited - confirm
+                if (value != null) {
+                    return Longs.toByteArray(value); // always 8 bytes; the test expects the last byte to carry the low-order value
+                } else {
+                    return new byte[0]; // nothing polled: hand back an empty array rather than null
+                }
+            }
+        };
+    }
 }
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
index dd85294cec9..285dc338e73 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
@@ -59,7 +59,9 @@ class DataGenTableSourceFactoryTest {
                     Column.physical("f0", DataTypes.STRING()),
                     Column.physical("f1", DataTypes.BIGINT()),
                     Column.physical("f2", DataTypes.BIGINT()),
-                    Column.physical("f3", DataTypes.TIMESTAMP()));
+                    Column.physical("f3", DataTypes.TIMESTAMP()),
+                    Column.physical("f4", DataTypes.BINARY(2)),
+                    Column.physical("f5", DataTypes.VARBINARY(4)));
 
     @Test
     void testDataTypeCoverage() throws Exception {
@@ -94,7 +96,10 @@ class DataGenTableSourceFactoryTest {
                                                 "c",
                                                 DataTypes.ROW(
                                                         DataTypes.FIELD(
-                                                                "d", DataTypes.TIMESTAMP()))))));
+                                                                "d", DataTypes.TIMESTAMP()))))),
+                        Column.physical("f21", DataTypes.BINARY(2)),
+                        Column.physical("f22", DataTypes.BYTES()),
+                        Column.physical("f23", DataTypes.VARBINARY(4)));
 
         DescriptorProperties descriptor = new DescriptorProperties();
         descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
@@ -165,6 +170,20 @@ class DataGenTableSourceFactoryTest {
                 DataGenConnectorOptionsUtil.FIELDS + ".f3." + DataGenConnectorOptionsUtil.MAX_PAST,
                 "5s");
 
+        descriptor.putString(
+                DataGenConnectorOptionsUtil.FIELDS + ".f4." + DataGenConnectorOptionsUtil.KIND,
+                DataGenConnectorOptionsUtil.RANDOM);
+        descriptor.putLong(
+                DataGenConnectorOptionsUtil.FIELDS + ".f4." + DataGenConnectorOptionsUtil.LENGTH,
+                2);
+        descriptor.putString(
+                DataGenConnectorOptionsUtil.FIELDS + ".f5." + DataGenConnectorOptionsUtil.KIND,
+                DataGenConnectorOptionsUtil.SEQUENCE);
+        descriptor.putLong(
+                DataGenConnectorOptionsUtil.FIELDS + ".f5." + DataGenConnectorOptionsUtil.START, 1);
+        descriptor.putLong(
+                DataGenConnectorOptionsUtil.FIELDS + ".f5." + DataGenConnectorOptionsUtil.END, 11);
+
         final long begin = System.currentTimeMillis();
         List<RowData> results = runGenerator(SCHEMA, descriptor);
         final long end = System.currentTimeMillis();
@@ -176,6 +195,10 @@ class DataGenTableSourceFactoryTest {
             assertThat(row.getLong(1)).isBetween(10L, 100L);
             assertThat(row.getLong(2)).isEqualTo(i + 50);
             assertThat(row.getTimestamp(3, 3).getMillisecond()).isBetween(begin - 5000, end);
+            assertThat(row.getBinary(4).length).isEqualTo(2);
+            // f5 is sequence bytes produced in sequence long [1, 11]
+            assertThat(row.getBinary(5).length).isEqualTo(8);
+            assertThat(row.getBinary(5)[row.getBinary(5).length - 1]).isEqualTo((byte) (i + 1));
         }
     }