You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/07/14 01:58:39 UTC

[flink] branch release-1.11 updated: [FLINK-18539][datastream] Fix StreamExecutionEnvironment#addSource(SourceFunction, TypeInformation) doesn't use the user defined type information

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 38495e7  [FLINK-18539][datastream] Fix StreamExecutionEnvironment#addSource(SourceFunction, TypeInformation) doesn't use the user defined type information
38495e7 is described below

commit 38495e7378f6df5eead9a29448e3944f2b3ecbea
Author: Jark Wu <ja...@apache.org>
AuthorDate: Tue Jul 14 09:57:36 2020 +0800

    [FLINK-18539][datastream] Fix StreamExecutionEnvironment#addSource(SourceFunction, TypeInformation) doesn't use the user defined type information
    
    This closes #12877
---
 .../environment/StreamExecutionEnvironment.java    |  2 +-
 .../api/StreamExecutionEnvironmentTest.java        | 33 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 59837ac..d4ddcbd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -2170,7 +2170,7 @@ public class StreamExecutionEnvironment {
 			Class<?> baseSourceClass,
 			TypeInformation<OUT> typeInfo) {
 		TypeInformation<OUT> resolvedTypeInfo = typeInfo;
-		if (source instanceof ResultTypeQueryable) {
+		if (resolvedTypeInfo == null && source instanceof ResultTypeQueryable) {
 			resolvedTypeInfo = ((ResultTypeQueryable<OUT>) source).getProducedType();
 		}
 		if (resolvedTypeInfo == null) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index 65ca0ae..df48cdf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -20,6 +20,9 @@ package org.apache.flink.streaming.api;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -31,6 +34,7 @@ import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.SplittableIterator;
 
@@ -253,6 +257,18 @@ public class StreamExecutionEnvironmentTest {
 		}
 	}
 
+	@Test
+	public void testAddSourceWithUserDefinedTypeInfo() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStreamSource<Row> source1 = env.addSource(new RowSourceFunction(), Types.ROW(Types.STRING));
+		// the source type information should be the user defined type
+		assertEquals(Types.ROW(Types.STRING), source1.getType());
+
+		DataStreamSource<Row> source2 = env.addSource(new RowSourceFunction());
+		// the source type information should be derived from RowSourceFunction#getProducedType
+		assertEquals(new GenericTypeInfo<>(Row.class), source2.getType());
+	}
+
 	/////////////////////////////////////////////////////////////
 	// Utilities
 	/////////////////////////////////////////////////////////////
@@ -315,4 +331,21 @@ public class StreamExecutionEnvironmentTest {
 			super(num, string);
 		}
 	}
+
+	private static class RowSourceFunction implements SourceFunction<Row>, ResultTypeQueryable<Row> {
+		private static final long serialVersionUID = 5216362688122691404L;
+
+		@Override
+		public TypeInformation<Row> getProducedType() {
+			return TypeInformation.of(Row.class);
+		}
+
+		@Override
+		public void run(SourceContext<Row> ctx) throws Exception {
+		}
+
+		@Override
+		public void cancel() {
+		}
+	}
 }