You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/25 06:23:44 UTC
[flink] branch release-1.11 updated: [FLINK-17894][table] RowGenerator in datagen connector should be serializable
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 3f73694 [FLINK-17894][table] RowGenerator in datagen connector should be serializable
3f73694 is described below
commit 3f73694e9c005e62d661ed785c01bdd7060c1485
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon May 25 14:21:39 2020 +0800
[FLINK-17894][table] RowGenerator in datagen connector should be serializable
This closes #12310
---
.../table/factories/DataGenTableSourceFactory.java | 94 +++++++++++++---------
.../factories/DataGenTableSourceFactoryTest.java | 4 +
2 files changed, 61 insertions(+), 37 deletions(-)
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
index d874894..9a07f8f 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
@@ -132,12 +132,7 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
case CHAR:
case VARCHAR:
int length = options.get(lenKey);
- return new RandomGenerator<StringData>() {
- @Override
- public StringData next() {
- return StringData.fromString(random.nextHexString(length));
- }
- };
+ return getRandomStringGenerator(length);
case TINYINT:
return RandomGenerator.byteGenerator(
options.get(minKey.intType().defaultValue((int) Byte.MIN_VALUE)).byteValue(),
@@ -167,6 +162,15 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
}
}
+ private static RandomGenerator<StringData> getRandomStringGenerator(int length) {
+ return new RandomGenerator<StringData>() {
+ @Override
+ public StringData next() {
+ return StringData.fromString(random.nextHexString(length));
+ }
+ };
+ }
+
private DataGenerator createSequenceGenerator(String name, DataType type, ReadableConfig options) {
OptionBuilder startKey = key(FIELDS + "." + name + "." + START);
OptionBuilder endKey = key(FIELDS + "." + name + "." + END);
@@ -179,14 +183,9 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
switch (type.getLogicalType().getTypeRoot()) {
case CHAR:
case VARCHAR:
- return new SequenceGenerator<StringData>(
+ return getSequenceStringGenerator(
options.get(startKey.longType().noDefaultValue()),
- options.get(endKey.longType().noDefaultValue())) {
- @Override
- public StringData next() {
- return StringData.fromString(valuesToEmit.poll().toString());
- }
- };
+ options.get(endKey.longType().noDefaultValue()));
case TINYINT:
return SequenceGenerator.byteGenerator(
options.get(startKey.intType().noDefaultValue()).byteValue(),
@@ -216,6 +215,15 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
}
}
+ private static SequenceGenerator<StringData> getSequenceStringGenerator(long start, long end) {
+ return new SequenceGenerator<StringData>(start, end) {
+ @Override
+ public StringData next() {
+ return StringData.fromString(valuesToEmit.poll().toString());
+ }
+ };
+ }
+
/**
* A {@link StreamTableSource} that emits each number from a given interval exactly once,
* possibly in parallel. See {@link StatefulSequenceSource}.
@@ -239,7 +247,9 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
@VisibleForTesting
DataGeneratorSource<RowData> createSource() {
- return new DataGeneratorSource<>(new DataGenTableSource.RowGenerator(), rowsPerSecond);
+ return new DataGeneratorSource<>(
+ new RowGenerator(fieldGenerators, schema.getFieldNames()),
+ rowsPerSecond);
}
@Override
@@ -256,37 +266,47 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
+ }
- private class RowGenerator implements DataGenerator<RowData> {
+ private static class RowGenerator implements DataGenerator<RowData> {
- @Override
- public void open(
- String name,
- FunctionInitializationContext context,
- RuntimeContext runtimeContext) throws Exception {
- for (int i = 0; i < fieldGenerators.length; i++) {
- fieldGenerators[i].open(schema.getFieldName(i).get(), context, runtimeContext);
- }
+ private static final long serialVersionUID = 1L;
+
+ private final DataGenerator[] fieldGenerators;
+ private final String[] fieldNames;
+
+ private RowGenerator(DataGenerator[] fieldGenerators, String[] fieldNames) {
+ this.fieldGenerators = fieldGenerators;
+ this.fieldNames = fieldNames;
+ }
+
+ @Override
+ public void open(
+ String name,
+ FunctionInitializationContext context,
+ RuntimeContext runtimeContext) throws Exception {
+ for (int i = 0; i < fieldGenerators.length; i++) {
+ fieldGenerators[i].open(fieldNames[i], context, runtimeContext);
}
+ }
- @Override
- public boolean hasNext() {
- for (DataGenerator generator : fieldGenerators) {
- if (!generator.hasNext()) {
- return false;
- }
+ @Override
+ public boolean hasNext() {
+ for (DataGenerator generator : fieldGenerators) {
+ if (!generator.hasNext()) {
+ return false;
}
- return true;
}
+ return true;
+ }
- @Override
- public RowData next() {
- GenericRowData row = new GenericRowData(schema.getFieldCount());
- for (int i = 0; i < fieldGenerators.length; i++) {
- row.setField(i, fieldGenerators[i].next());
- }
- return row;
+ @Override
+ public RowData next() {
+ GenericRowData row = new GenericRowData(fieldNames.length);
+ for (int i = 0; i < fieldGenerators.length; i++) {
+ row.setField(i, fieldGenerators[i].next());
}
+ return row;
}
}
}
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
index 60b1bc1..827e698 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.DataGenTableSourceFactory.DataGenTableSource;
+import org.apache.flink.util.InstantiationUtil;
import org.junit.Assert;
import org.junit.Test;
@@ -91,6 +92,9 @@ public class DataGenTableSourceFactoryTest {
DataGenTableSource dataGenTableSource = (DataGenTableSource) source;
DataGeneratorSource<RowData> gen = dataGenTableSource.createSource();
+ // test java serialization.
+ gen = InstantiationUtil.clone(gen);
+
StreamSource<RowData, DataGeneratorSource<RowData>> src = new StreamSource<>(gen);
AbstractStreamOperatorTestHarness<RowData> testHarness =
new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);