You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/25 06:23:44 UTC

[flink] branch release-1.11 updated: [FLINK-17894][table] RowGenerator in datagen connector should be serializable

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 3f73694  [FLINK-17894][table] RowGenerator in datagen connector should be serializable
3f73694 is described below

commit 3f73694e9c005e62d661ed785c01bdd7060c1485
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon May 25 14:21:39 2020 +0800

    [FLINK-17894][table] RowGenerator in datagen connector should be serializable
    
    
    This closes #12310
---
 .../table/factories/DataGenTableSourceFactory.java | 94 +++++++++++++---------
 .../factories/DataGenTableSourceFactoryTest.java   |  4 +
 2 files changed, 61 insertions(+), 37 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
index d874894..9a07f8f 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
@@ -132,12 +132,7 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 			case CHAR:
 			case VARCHAR:
 				int length = options.get(lenKey);
-				return new RandomGenerator<StringData>() {
-					@Override
-					public StringData next() {
-						return StringData.fromString(random.nextHexString(length));
-					}
-				};
+				return getRandomStringGenerator(length);
 			case TINYINT:
 				return RandomGenerator.byteGenerator(
 						options.get(minKey.intType().defaultValue((int) Byte.MIN_VALUE)).byteValue(),
@@ -167,6 +162,15 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 		}
 	}
 
+	private static RandomGenerator<StringData> getRandomStringGenerator(int length) {
+		return new RandomGenerator<StringData>() {
+			@Override
+			public StringData next() {
+				return StringData.fromString(random.nextHexString(length));
+			}
+		};
+	}
+
 	private DataGenerator createSequenceGenerator(String name, DataType type, ReadableConfig options) {
 		OptionBuilder startKey = key(FIELDS + "." + name + "." + START);
 		OptionBuilder endKey = key(FIELDS + "." + name + "." + END);
@@ -179,14 +183,9 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 		switch (type.getLogicalType().getTypeRoot()) {
 			case CHAR:
 			case VARCHAR:
-			return new SequenceGenerator<StringData>(
+			return getSequenceStringGenerator(
 					options.get(startKey.longType().noDefaultValue()),
-					options.get(endKey.longType().noDefaultValue())) {
-				@Override
-				public StringData next() {
-					return StringData.fromString(valuesToEmit.poll().toString());
-				}
-			};
+					options.get(endKey.longType().noDefaultValue()));
 			case TINYINT:
 				return SequenceGenerator.byteGenerator(
 						options.get(startKey.intType().noDefaultValue()).byteValue(),
@@ -216,6 +215,15 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 		}
 	}
 
+	private static SequenceGenerator<StringData> getSequenceStringGenerator(long start, long end) {
+		return new SequenceGenerator<StringData>(start, end) {
+			@Override
+			public StringData next() {
+				return StringData.fromString(valuesToEmit.poll().toString());
+			}
+		};
+	}
+
 	/**
 	 * A {@link StreamTableSource} that emits each number from a given interval exactly once,
 	 * possibly in parallel. See {@link StatefulSequenceSource}.
@@ -239,7 +247,9 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 
 		@VisibleForTesting
 		DataGeneratorSource<RowData> createSource() {
-			return new DataGeneratorSource<>(new DataGenTableSource.RowGenerator(), rowsPerSecond);
+			return new DataGeneratorSource<>(
+					new RowGenerator(fieldGenerators, schema.getFieldNames()),
+					rowsPerSecond);
 		}
 
 		@Override
@@ -256,37 +266,47 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 		public ChangelogMode getChangelogMode() {
 			return ChangelogMode.insertOnly();
 		}
+	}
 
-		private class RowGenerator implements DataGenerator<RowData> {
+	private static class RowGenerator implements DataGenerator<RowData> {
 
-			@Override
-			public void open(
-					String name,
-					FunctionInitializationContext context,
-					RuntimeContext runtimeContext) throws Exception {
-				for (int i = 0; i < fieldGenerators.length; i++) {
-					fieldGenerators[i].open(schema.getFieldName(i).get(), context, runtimeContext);
-				}
+		private static final long serialVersionUID = 1L;
+
+		private final DataGenerator[] fieldGenerators;
+		private final String[] fieldNames;
+
+		private RowGenerator(DataGenerator[] fieldGenerators, String[] fieldNames) {
+			this.fieldGenerators = fieldGenerators;
+			this.fieldNames = fieldNames;
+		}
+
+		@Override
+		public void open(
+				String name,
+				FunctionInitializationContext context,
+				RuntimeContext runtimeContext) throws Exception {
+			for (int i = 0; i < fieldGenerators.length; i++) {
+				fieldGenerators[i].open(fieldNames[i], context, runtimeContext);
 			}
+		}
 
-			@Override
-			public boolean hasNext() {
-				for (DataGenerator generator : fieldGenerators) {
-					if (!generator.hasNext()) {
-						return false;
-					}
+		@Override
+		public boolean hasNext() {
+			for (DataGenerator generator : fieldGenerators) {
+				if (!generator.hasNext()) {
+					return false;
 				}
-				return true;
 			}
+			return true;
+		}
 
-			@Override
-			public RowData next() {
-				GenericRowData row = new GenericRowData(schema.getFieldCount());
-				for (int i = 0; i < fieldGenerators.length; i++) {
-					row.setField(i, fieldGenerators[i].next());
-				}
-				return row;
+		@Override
+		public RowData next() {
+			GenericRowData row = new GenericRowData(fieldNames.length);
+			for (int i = 0; i < fieldGenerators.length; i++) {
+				row.setField(i, fieldGenerators[i].next());
 			}
+			return row;
 		}
 	}
 }
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
index 60b1bc1..827e698 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.DataGenTableSourceFactory.DataGenTableSource;
+import org.apache.flink.util.InstantiationUtil;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -91,6 +92,9 @@ public class DataGenTableSourceFactoryTest {
 		DataGenTableSource dataGenTableSource = (DataGenTableSource) source;
 		DataGeneratorSource<RowData> gen = dataGenTableSource.createSource();
 
+		// test java serialization.
+		gen = InstantiationUtil.clone(gen);
+
 		StreamSource<RowData, DataGeneratorSource<RowData>> src = new StreamSource<>(gen);
 		AbstractStreamOperatorTestHarness<RowData> testHarness =
 				new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);