You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/07/14 01:59:26 UTC
[flink] branch release-1.10 updated: [FLINK-18539][datastream] Fix StreamExecutionEnvironment#addSource(SourceFunction, TypeInformation) doesn't use the user defined type information
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new 2a3b642 [FLINK-18539][datastream] Fix StreamExecutionEnvironment#addSource(SourceFunction, TypeInformation) doesn't use the user defined type information
2a3b642 is described below
commit 2a3b642b1efb957f3d4f20502c40398786ab1469
Author: Jark Wu <ja...@apache.org>
AuthorDate: Tue Jul 14 09:58:18 2020 +0800
[FLINK-18539][datastream] Fix StreamExecutionEnvironment#addSource(SourceFunction, TypeInformation) doesn't use the user defined type information
This closes #12878
---
.../environment/StreamExecutionEnvironment.java | 2 +-
.../api/StreamExecutionEnvironmentTest.java | 33 ++++++++++++++++++++++
2 files changed, 34 insertions(+), 1 deletion(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 70f2cc1..d7922fc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1573,7 +1573,7 @@ public class StreamExecutionEnvironment {
@SuppressWarnings("unchecked")
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
- if (function instanceof ResultTypeQueryable) {
+ if (typeInfo == null && function instanceof ResultTypeQueryable) {
typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
}
if (typeInfo == null) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index 65ca0ae..df48cdf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -20,6 +20,9 @@ package org.apache.flink.streaming.api;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -31,6 +34,7 @@ import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.SplittableIterator;
@@ -253,6 +257,18 @@ public class StreamExecutionEnvironmentTest {
}
}
+ @Test
+ public void testAddSourceWithUserDefinedTypeInfo() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStreamSource<Row> source1 = env.addSource(new RowSourceFunction(), Types.ROW(Types.STRING));
+ // the source type information should be the user defined type
+ assertEquals(Types.ROW(Types.STRING), source1.getType());
+
+ DataStreamSource<Row> source2 = env.addSource(new RowSourceFunction());
+ // the source type information should be derived from RowSourceFunction#getProducedType
+ assertEquals(new GenericTypeInfo<>(Row.class), source2.getType());
+ }
+
/////////////////////////////////////////////////////////////
// Utilities
/////////////////////////////////////////////////////////////
@@ -315,4 +331,21 @@ public class StreamExecutionEnvironmentTest {
super(num, string);
}
}
+
+ private static class RowSourceFunction implements SourceFunction<Row>, ResultTypeQueryable<Row> {
+ private static final long serialVersionUID = 5216362688122691404L;
+
+ @Override
+ public TypeInformation<Row> getProducedType() {
+ return TypeInformation.of(Row.class);
+ }
+
+ @Override
+ public void run(SourceContext<Row> ctx) throws Exception {
+ }
+
+ @Override
+ public void cancel() {
+ }
+ }
}