You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/08 20:31:09 UTC

[2/9] flink git commit: [FLINK-2124] [streaming] Fix behavior of FromElementsFunction when T isn't Serializable

[FLINK-2124] [streaming] Fix behavior of FromElementsFunction when T isn't Serializable

This change rewrites the FromElementsFunction to work with arbitrary types. The elements
are serialized to a byte array (using a TypeSerializer) when the FromElementsFunction is
constructed, and are deserialized again when run() is called.

This closes #848


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4a030c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4a030c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4a030c3

Branch: refs/heads/master
Commit: e4a030c307597ff7ebf7aa7cc4435947820db3a0
Parents: 0c49891
Author: Johannes Reifferscheid <j....@gmail.com>
Authored: Wed Jun 17 15:35:37 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 8 20:28:40 2015 +0200

----------------------------------------------------------------------
 .../environment/StreamExecutionEnvironment.java |  6 +-
 .../functions/source/FromElementsFunction.java  | 70 ++++++++++++++++----
 .../flink/streaming/api/SourceFunctionTest.java | 18 +++--
 .../api/scala/StreamExecutionEnvironment.scala  |  4 +-
 4 files changed, 74 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e4a030c3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 5e7be8d..f98a9f0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -473,7 +473,7 @@ public abstract class StreamExecutionEnvironment {
 
 		TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data[0]);
 
-		SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+		SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
 
 		return addSource(function, "Elements source").returns(typeInfo);
 	}
@@ -504,7 +504,7 @@ public abstract class StreamExecutionEnvironment {
 		}
 
 		TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data.iterator().next());
-		SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+		SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
 		checkCollection(data, typeInfo.getTypeClass());
 
 		return addSource(function, "Collection Source").returns(typeInfo);
@@ -529,7 +529,7 @@ public abstract class StreamExecutionEnvironment {
 			throw new IllegalArgumentException("Collection must not be empty");
 		}
 
-		SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+		SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
 		checkCollection(data, typeInfo.getTypeClass());
 
 		return addSource(function, "Collection Source").returns(typeInfo);

http://git-wip-us.apache.org/repos/asf/flink/blob/e4a030c3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index 736cc73..63eb0ad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -17,37 +17,81 @@
 
 package org.apache.flink.streaming.api.functions.source;
 
-import java.util.Arrays;
-import java.util.Collection;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.Iterator;
 
 public class FromElementsFunction<T> implements SourceFunction<T> {
 	
 	private static final long serialVersionUID = 1L;
 
-	private Iterable<T> iterable;
+	private final TypeSerializer<T> serializer;
+	private final byte[] elements;
 
 	private volatile boolean isRunning = true;
 
-	public FromElementsFunction(T... elements) {
-		this.iterable = Arrays.asList(elements);
-	}
+	public FromElementsFunction(TypeSerializer<T> serializer, final T... elements) {
+		this(serializer, new Iterable<T>() {
+			@Override
+			public Iterator<T> iterator() {
+				return new Iterator<T>() {
+					int index = 0;
+
+					@Override
+					public boolean hasNext() {
+						return index < elements.length;
+					}
+
+					@Override
+					public T next() {
+						return elements[index++];
+					}
 
-	public FromElementsFunction(Collection<T> elements) {
-		this.iterable = elements;
+					@Override
+					public void remove() {
+						throw new UnsupportedOperationException();
+					}
+				};
+			}
+		});
 	}
 
-	public FromElementsFunction(Iterable<T> elements) {
-		this.iterable = elements;
+	public FromElementsFunction(TypeSerializer<T> serializer, Iterable<T> elements) {
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		OutputViewDataOutputStreamWrapper wrapper = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));
+
+		try {
+			for (T element : elements) {
+				serializer.serialize(element, wrapper);
+			}
+		} catch (IOException e) {
+			// ByteArrayOutputStream doesn't throw IOExceptions when written to
+		}
+		// closing the DataOutputStream would just flush the ByteArrayOutputStream, which in turn doesn't do anything.
+
+		this.serializer = serializer;
+		this.elements = baos.toByteArray();
 	}
 
 	@Override
 	public void run(SourceContext<T> ctx) throws Exception {
-		Iterator<T> it = iterable.iterator();
+		T value = serializer.createInstance();
+		ByteArrayInputStream bais = new ByteArrayInputStream(elements);
+		DataInputView input = new InputViewDataInputStreamWrapper(new DataInputStream(bais));
 
-		while (isRunning && it.hasNext()) {
-			ctx.collect(it.next());
+		while (isRunning && bais.available() > 0) {
+			value = serializer.deserialize(value, input);
+			ctx.collect(value);
 		}
+		// closing the DataInputStream would just close the ByteArrayInputStream, which doesn't do anything
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e4a030c3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
index 7a78205..f0fe63d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
@@ -23,6 +23,8 @@ import static org.mockito.Mockito.mock;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.util.SourceFunctionUtil;
@@ -33,18 +35,22 @@ public class SourceFunctionTest {
 	@Test
 	public void fromElementsTest() throws Exception {
 		List<Integer> expectedList = Arrays.asList(1, 2, 3);
-		List<Integer> actualList = SourceFunctionUtil.runSourceFunction(new FromElementsFunction<Integer>(
-				1,
-				2,
-				3));
+		List<Integer> actualList = SourceFunctionUtil.runSourceFunction(CommonTestUtils.createCopySerializable(
+				new FromElementsFunction<Integer>(
+						IntSerializer.INSTANCE,
+						1,
+						2,
+						3)));
 		assertEquals(expectedList, actualList);
 	}
 
 	@Test
 	public void fromCollectionTest() throws Exception {
 		List<Integer> expectedList = Arrays.asList(1, 2, 3);
-		List<Integer> actualList = SourceFunctionUtil.runSourceFunction(new FromElementsFunction<Integer>(
-				Arrays.asList(1, 2, 3)));
+		List<Integer> actualList = SourceFunctionUtil.runSourceFunction(
+				CommonTestUtils.createCopySerializable(new FromElementsFunction<Integer>(
+						IntSerializer.INSTANCE,
+						Arrays.asList(1, 2, 3))));
 		assertEquals(expectedList, actualList);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e4a030c3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 968e07f..7215a4d 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -284,8 +284,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     require(data != null, "Data must not be null.")
     val typeInfo = implicitly[TypeInformation[T]]
 
-    val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions
-      .asJavaCollection(data))
+    val sourceFunction = new FromElementsFunction[T](typeInfo.createSerializer(getConfig),
+      scala.collection.JavaConversions.asJavaCollection(data))
 
     javaEnv.addSource(sourceFunction).returns(typeInfo)
   }