You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/07/14 01:58:39 UTC
[flink] branch release-1.11 updated: [FLINK-18539][datastream] Fix
StreamExecutionEnvironment#addSource(SourceFunction,
TypeInformation) doesn't use the user defined type information
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 38495e7 [FLINK-18539][datastream] Fix StreamExecutionEnvironment#addSource(SourceFunction, TypeInformation) doesn't use the user defined type information
38495e7 is described below
commit 38495e7378f6df5eead9a29448e3944f2b3ecbea
Author: Jark Wu <ja...@apache.org>
AuthorDate: Tue Jul 14 09:57:36 2020 +0800
[FLINK-18539][datastream] Fix StreamExecutionEnvironment#addSource(SourceFunction, TypeInformation) doesn't use the user defined type information
This closes #12877
---
.../environment/StreamExecutionEnvironment.java | 2 +-
.../api/StreamExecutionEnvironmentTest.java | 33 ++++++++++++++++++++++
2 files changed, 34 insertions(+), 1 deletion(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 59837ac..d4ddcbd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -2170,7 +2170,7 @@ public class StreamExecutionEnvironment {
Class<?> baseSourceClass,
TypeInformation<OUT> typeInfo) {
TypeInformation<OUT> resolvedTypeInfo = typeInfo;
- if (source instanceof ResultTypeQueryable) {
+ if (resolvedTypeInfo == null && source instanceof ResultTypeQueryable) {
resolvedTypeInfo = ((ResultTypeQueryable<OUT>) source).getProducedType();
}
if (resolvedTypeInfo == null) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index 65ca0ae..df48cdf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -20,6 +20,9 @@ package org.apache.flink.streaming.api;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -31,6 +34,7 @@ import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.SplittableIterator;
@@ -253,6 +257,18 @@ public class StreamExecutionEnvironmentTest {
}
}
+ @Test
+ public void testAddSourceWithUserDefinedTypeInfo() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStreamSource<Row> source1 = env.addSource(new RowSourceFunction(), Types.ROW(Types.STRING));
+ // the source type information should be the user defined type
+ assertEquals(Types.ROW(Types.STRING), source1.getType());
+
+ DataStreamSource<Row> source2 = env.addSource(new RowSourceFunction());
+ // the source type information should be derived from RowSourceFunction#getProducedType
+ assertEquals(new GenericTypeInfo<>(Row.class), source2.getType());
+ }
+
/////////////////////////////////////////////////////////////
// Utilities
/////////////////////////////////////////////////////////////
@@ -315,4 +331,21 @@ public class StreamExecutionEnvironmentTest {
super(num, string);
}
}
+
+ private static class RowSourceFunction implements SourceFunction<Row>, ResultTypeQueryable<Row> {
+ private static final long serialVersionUID = 5216362688122691404L;
+
+ @Override
+ public TypeInformation<Row> getProducedType() {
+ return TypeInformation.of(Row.class);
+ }
+
+ @Override
+ public void run(SourceContext<Row> ctx) throws Exception {
+ }
+
+ @Override
+ public void cancel() {
+ }
+ }
}