You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by le...@apache.org on 2022/07/06 01:57:36 UTC
[flink] branch master updated: [FLINK-27806][table] Support binary & varbinary types for datagen connector
This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ef9ce854a31 [FLINK-27806][table] Support binary & varbinary types for datagen connector
ef9ce854a31 is described below
commit ef9ce854a3169014001f39e0d5908c703453f2b8
Author: Ran Tao <ch...@gmail.com>
AuthorDate: Wed Jul 6 09:57:25 2022 +0800
[FLINK-27806][table] Support binary & varbinary types for datagen connector
This closes #19827.
---
docs/content.zh/docs/connectors/table/datagen.md | 4 +--
docs/content/docs/connectors/table/datagen.md | 14 ++++++--
.../datagen/table/RandomGeneratorVisitor.java | 41 ++++++++++++++++++++++
.../datagen/table/SequenceGeneratorVisitor.java | 34 ++++++++++++++++++
.../factories/DataGenTableSourceFactoryTest.java | 27 ++++++++++++--
5 files changed, 114 insertions(+), 6 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/datagen.md b/docs/content.zh/docs/connectors/table/datagen.md
index 96dff1c11d9..7c07e7d2a5d 100644
--- a/docs/content.zh/docs/connectors/table/datagen.md
+++ b/docs/content.zh/docs/connectors/table/datagen.md
@@ -45,7 +45,7 @@ DataGen 连接器是内置的。
每个列,都有两种生成数据的方法:
-- 随机生成器是默认的生成器,您可以指定随机生成的最大和最小值。char、varchar、string (类型)可以指定长度。它是无界的生成器。
+- 随机生成器是默认的生成器,您可以指定随机生成的最大和最小值。char、varchar、binary、varbinary、string (类型)可以指定长度。它是无界的生成器。
- 序列生成器,您可以指定序列的起始和结束值。它是有界的生成器,当序列数字达到结束值,读取结束。
@@ -136,7 +136,7 @@ CREATE TABLE datagen (
<td>可选</td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
- <td>随机生成器生成字符的长度,适用于 char、varchar、string。</td>
+ <td>随机生成器生成字符的长度,适用于 char、varchar、binary、varbinary、string。</td>
</tr>
<tr>
<td><h5>fields.#.start</h5></td>
diff --git a/docs/content/docs/connectors/table/datagen.md b/docs/content/docs/connectors/table/datagen.md
index d4f9432aa9f..d557875b4dc 100644
--- a/docs/content/docs/connectors/table/datagen.md
+++ b/docs/content/docs/connectors/table/datagen.md
@@ -39,7 +39,7 @@ Usage
-----
By default, a DataGen table will create an unbounded number of rows with a random value for each column.
-For variable sized types, char/varchar/string/array/map/multiset, the length can be specified.
+For variable sized types, char/varchar/binary/varbinary/string/array/map/multiset, the length can be specified.
Additionally, a total number of rows can be specified, resulting in a bounded table.
There also exists a sequence generator, where users specify a sequence of start and end values.
@@ -104,6 +104,16 @@ Types
<td>random / sequence</td>
<td></td>
</tr>
+ <tr>
+ <td>BINARY</td>
+ <td>random / sequence</td>
+ <td></td>
+ </tr>
+ <tr>
+ <td>VARBINARY</td>
+ <td>random / sequence</td>
+ <td></td>
+ </tr>
<tr>
<td>STRING</td>
<td>random / sequence</td>
@@ -271,7 +281,7 @@ Connector Options
<td>optional</td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
- <td>Size or length of the collection for generating char/varchar/string/array/map/multiset types.</td>
+ <td>Size or length of the collection for generating char/varchar/binary/varbinary/string/array/map/multiset types.</td>
</tr>
<tr>
<td><h5>fields.#.start</h5></td>
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java
index f2c4a311b76..4e1c9726334 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DayTimeIntervalType;
@@ -49,6 +50,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.YearMonthIntervalType;
import org.apache.flink.table.types.logical.ZonedTimestampType;
@@ -68,6 +70,8 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
public static final int RANDOM_STRING_LENGTH_DEFAULT = 100;
+ public static final int RANDOM_BYTES_LENGTH_DEFAULT = 100;
+
private static final int RANDOM_COLLECTION_LENGTH_DEFAULT = 3;
private final ConfigOptions.OptionBuilder minKey;
@@ -135,6 +139,32 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
getRandomStringGenerator(config.get(lenOption)), lenOption);
}
+ @Override
+ public DataGeneratorContainer visit(BinaryType binaryType) { // builds the random byte[] generator container for a BINARY field
+ ConfigOption<Integer> lenOption =
+ key(DataGenConnectorOptionsUtil.FIELDS
+ + "."
+ + name
+ + "."
+ + DataGenConnectorOptionsUtil.LENGTH) // option key is FIELDS + "." + field name + "." + LENGTH
+ .intType()
+ .defaultValue(RANDOM_BYTES_LENGTH_DEFAULT); // used when the length option is not set for this field
+ return DataGeneratorContainer.of(getRandomBytesGenerator(config.get(lenOption)), lenOption); // NOTE(review): binaryType itself is not consulted; the array length comes only from the option
+ }
+
+ @Override
+ public DataGeneratorContainer visit(VarBinaryType varBinaryType) { // builds the random byte[] generator container for a VARBINARY field
+ ConfigOption<Integer> lenOption =
+ key(DataGenConnectorOptionsUtil.FIELDS
+ + "."
+ + name
+ + "."
+ + DataGenConnectorOptionsUtil.LENGTH) // option key is FIELDS + "." + field name + "." + LENGTH
+ .intType()
+ .defaultValue(RANDOM_BYTES_LENGTH_DEFAULT); // used when the length option is not set for this field
+ return DataGeneratorContainer.of(getRandomBytesGenerator(config.get(lenOption)), lenOption); // NOTE(review): varBinaryType itself is not consulted; every generated array has exactly the configured length
+ }
+
@Override
public DataGeneratorContainer visit(TinyIntType tinyIntType) {
ConfigOption<Integer> min = minKey.intType().defaultValue((int) Byte.MIN_VALUE);
@@ -377,4 +407,15 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {
}
};
}
+
+ private static RandomGenerator<byte[]> getRandomBytesGenerator(int length) { // returns a generator whose next() yields a byte array of the given length
+ return new RandomGenerator<byte[]>() {
+ @Override
+ public byte[] next() {
+ byte[] arr = new byte[length]; // a new array is allocated on every call
+ random.getRandomGenerator().nextBytes(arr); // fills arr in place from random.getRandomGenerator()
+ return arr;
+ }
+ };
+ }
}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/SequenceGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/SequenceGeneratorVisitor.java
index d1a174ca6fd..987912ecc5d 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/SequenceGeneratorVisitor.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/SequenceGeneratorVisitor.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DecimalType;
@@ -35,8 +36,11 @@ import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.shaded.guava30.com.google.common.primitives.Longs;
+
import static org.apache.flink.configuration.ConfigOptions.key;
/** Creates a sequential {@link DataGeneratorContainer} for a particular logical type. */
@@ -120,6 +124,22 @@ public class SequenceGeneratorVisitor extends DataGenVisitorBase {
longEnd);
}
+ @Override
+ public DataGeneratorContainer visit(BinaryType binaryType) { // builds the sequence byte[] generator container for a BINARY field
+ return DataGeneratorContainer.of(
+ getSequenceBytesGenerator(config.get(longStart), config.get(longEnd)), // sequence bounds are read from the longStart / longEnd options
+ longStart,
+ longEnd); // NOTE(review): binaryType is not consulted; output width is whatever getSequenceBytesGenerator produces
+ }
+
+ @Override
+ public DataGeneratorContainer visit(VarBinaryType varBinaryType) { // builds the sequence byte[] generator container for a VARBINARY field
+ return DataGeneratorContainer.of(
+ getSequenceBytesGenerator(config.get(longStart), config.get(longEnd)), // sequence bounds are read from the longStart / longEnd options
+ longStart,
+ longEnd); // NOTE(review): varBinaryType is not consulted; output width is whatever getSequenceBytesGenerator produces
+ }
+
@Override
public DataGeneratorContainer visit(TinyIntType tinyIntType) {
return DataGeneratorContainer.of(
@@ -191,4 +211,18 @@ public class SequenceGeneratorVisitor extends DataGenVisitorBase {
}
};
}
+
+ private static SequenceGenerator<byte[]> getSequenceBytesGenerator(long start, long end) { // returns a sequence generator over (start, end) that emits each long as a byte[]
+ return new SequenceGenerator<byte[]>(start, end) {
+ @Override
+ public byte[] next() {
+ Long value = valuesToEmit.poll(); // boxed so that a null result from poll() can be detected
+ if (value != null) {
+ return Longs.toByteArray(value); // Guava conversion: 8-byte big-endian representation of the long
+ } else {
+ return new byte[0]; // poll() returned null: emit an empty array rather than null
+ }
+ }
+ };
+ }
}
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
index dd85294cec9..285dc338e73 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
@@ -59,7 +59,9 @@ class DataGenTableSourceFactoryTest {
Column.physical("f0", DataTypes.STRING()),
Column.physical("f1", DataTypes.BIGINT()),
Column.physical("f2", DataTypes.BIGINT()),
- Column.physical("f3", DataTypes.TIMESTAMP()));
+ Column.physical("f3", DataTypes.TIMESTAMP()),
+ Column.physical("f4", DataTypes.BINARY(2)),
+ Column.physical("f5", DataTypes.VARBINARY(4)));
@Test
void testDataTypeCoverage() throws Exception {
@@ -94,7 +96,10 @@ class DataGenTableSourceFactoryTest {
"c",
DataTypes.ROW(
DataTypes.FIELD(
- "d", DataTypes.TIMESTAMP()))))));
+ "d", DataTypes.TIMESTAMP()))))),
+ Column.physical("f21", DataTypes.BINARY(2)),
+ Column.physical("f22", DataTypes.BYTES()),
+ Column.physical("f23", DataTypes.VARBINARY(4)));
DescriptorProperties descriptor = new DescriptorProperties();
descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
@@ -165,6 +170,20 @@ class DataGenTableSourceFactoryTest {
DataGenConnectorOptionsUtil.FIELDS + ".f3." + DataGenConnectorOptionsUtil.MAX_PAST,
"5s");
+ descriptor.putString(
+ DataGenConnectorOptionsUtil.FIELDS + ".f4." + DataGenConnectorOptionsUtil.KIND,
+ DataGenConnectorOptionsUtil.RANDOM);
+ descriptor.putLong(
+ DataGenConnectorOptionsUtil.FIELDS + ".f4." + DataGenConnectorOptionsUtil.LENGTH,
+ 2);
+ descriptor.putString(
+ DataGenConnectorOptionsUtil.FIELDS + ".f5." + DataGenConnectorOptionsUtil.KIND,
+ DataGenConnectorOptionsUtil.SEQUENCE);
+ descriptor.putLong(
+ DataGenConnectorOptionsUtil.FIELDS + ".f5." + DataGenConnectorOptionsUtil.START, 1);
+ descriptor.putLong(
+ DataGenConnectorOptionsUtil.FIELDS + ".f5." + DataGenConnectorOptionsUtil.END, 11);
+
final long begin = System.currentTimeMillis();
List<RowData> results = runGenerator(SCHEMA, descriptor);
final long end = System.currentTimeMillis();
@@ -176,6 +195,10 @@ class DataGenTableSourceFactoryTest {
assertThat(row.getLong(1)).isBetween(10L, 100L);
assertThat(row.getLong(2)).isEqualTo(i + 50);
assertThat(row.getTimestamp(3, 3).getMillisecond()).isBetween(begin - 5000, end);
+ assertThat(row.getBinary(4).length).isEqualTo(2);
+ // f5 is sequence bytes produced in sequence long [1, 11]
+ assertThat(row.getBinary(5).length).isEqualTo(8);
+ assertThat(row.getBinary(5)[row.getBinary(5).length - 1]).isEqualTo((byte) (i + 1));
}
}