You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/08 20:31:09 UTC
[2/9] flink git commit: [FLINK-2124] [streaming] Fix behavior of FromElementsFunction when T isn't Serializable
[FLINK-2124] [streaming] Fix behavior of FromElementsFunction when T isn't Serializable
This change rewrites FromElementsFunction to work with arbitrary element types, including
types that are not Serializable. The elements are serialized to a byte array (using a
TypeSerializer) when the FromElementsFunction is constructed, and are deserialized from
that byte array when run() is called.
This closes #848
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4a030c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4a030c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4a030c3
Branch: refs/heads/master
Commit: e4a030c307597ff7ebf7aa7cc4435947820db3a0
Parents: 0c49891
Author: Johannes Reifferscheid <j....@gmail.com>
Authored: Wed Jun 17 15:35:37 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 8 20:28:40 2015 +0200
----------------------------------------------------------------------
.../environment/StreamExecutionEnvironment.java | 6 +-
.../functions/source/FromElementsFunction.java | 70 ++++++++++++++++----
.../flink/streaming/api/SourceFunctionTest.java | 18 +++--
.../api/scala/StreamExecutionEnvironment.scala | 4 +-
4 files changed, 74 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e4a030c3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 5e7be8d..f98a9f0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -473,7 +473,7 @@ public abstract class StreamExecutionEnvironment {
TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data[0]);
- SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+ SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
return addSource(function, "Elements source").returns(typeInfo);
}
@@ -504,7 +504,7 @@ public abstract class StreamExecutionEnvironment {
}
TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data.iterator().next());
- SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+ SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
checkCollection(data, typeInfo.getTypeClass());
return addSource(function, "Collection Source").returns(typeInfo);
@@ -529,7 +529,7 @@ public abstract class StreamExecutionEnvironment {
throw new IllegalArgumentException("Collection must not be empty");
}
- SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+ SourceFunction<OUT> function = new FromElementsFunction<OUT>(typeInfo.createSerializer(getConfig()), data);
checkCollection(data, typeInfo.getTypeClass());
return addSource(function, "Collection Source").returns(typeInfo);
http://git-wip-us.apache.org/repos/asf/flink/blob/e4a030c3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
index 736cc73..63eb0ad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -17,37 +17,81 @@
package org.apache.flink.streaming.api.functions.source;
-import java.util.Arrays;
-import java.util.Collection;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.util.Iterator;
public class FromElementsFunction<T> implements SourceFunction<T> {
private static final long serialVersionUID = 1L;
- private Iterable<T> iterable;
+ private final TypeSerializer<T> serializer;
+ private final byte[] elements;
private volatile boolean isRunning = true;
- public FromElementsFunction(T... elements) {
- this.iterable = Arrays.asList(elements);
- }
+ public FromElementsFunction(TypeSerializer<T> serializer, final T... elements) {
+ this(serializer, new Iterable<T>() {
+ @Override
+ public Iterator<T> iterator() {
+ return new Iterator<T>() {
+ int index = 0;
+
+ @Override
+ public boolean hasNext() {
+ return index < elements.length;
+ }
+
+ @Override
+ public T next() {
+ return elements[index++];
+ }
- public FromElementsFunction(Collection<T> elements) {
- this.iterable = elements;
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ });
}
- public FromElementsFunction(Iterable<T> elements) {
- this.iterable = elements;
+ public FromElementsFunction(TypeSerializer<T> serializer, Iterable<T> elements) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ OutputViewDataOutputStreamWrapper wrapper = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));
+
+ try {
+ for (T element : elements) {
+ serializer.serialize(element, wrapper);
+ }
+ } catch (IOException e) {
+ // ByteArrayOutputStream doesn't throw IOExceptions when written to
+ }
+ // closing the DataOutputStream would just flush the ByteArrayOutputStream, which in turn doesn't do anything.
+
+ this.serializer = serializer;
+ this.elements = baos.toByteArray();
}
@Override
public void run(SourceContext<T> ctx) throws Exception {
- Iterator<T> it = iterable.iterator();
+ T value = serializer.createInstance();
+ ByteArrayInputStream bais = new ByteArrayInputStream(elements);
+ DataInputView input = new InputViewDataInputStreamWrapper(new DataInputStream(bais));
- while (isRunning && it.hasNext()) {
- ctx.collect(it.next());
+ while (isRunning && bais.available() > 0) {
+ value = serializer.deserialize(value, input);
+ ctx.collect(value);
}
+ // closing the DataInputStream would just close the ByteArrayInputStream, which doesn't do anything
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/e4a030c3/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
index 7a78205..f0fe63d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java
@@ -23,6 +23,8 @@ import static org.mockito.Mockito.mock;
import java.util.Arrays;
import java.util.List;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.util.SourceFunctionUtil;
@@ -33,18 +35,22 @@ public class SourceFunctionTest {
@Test
public void fromElementsTest() throws Exception {
List<Integer> expectedList = Arrays.asList(1, 2, 3);
- List<Integer> actualList = SourceFunctionUtil.runSourceFunction(new FromElementsFunction<Integer>(
- 1,
- 2,
- 3));
+ List<Integer> actualList = SourceFunctionUtil.runSourceFunction(CommonTestUtils.createCopySerializable(
+ new FromElementsFunction<Integer>(
+ IntSerializer.INSTANCE,
+ 1,
+ 2,
+ 3)));
assertEquals(expectedList, actualList);
}
@Test
public void fromCollectionTest() throws Exception {
List<Integer> expectedList = Arrays.asList(1, 2, 3);
- List<Integer> actualList = SourceFunctionUtil.runSourceFunction(new FromElementsFunction<Integer>(
- Arrays.asList(1, 2, 3)));
+ List<Integer> actualList = SourceFunctionUtil.runSourceFunction(
+ CommonTestUtils.createCopySerializable(new FromElementsFunction<Integer>(
+ IntSerializer.INSTANCE,
+ Arrays.asList(1, 2, 3))));
assertEquals(expectedList, actualList);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e4a030c3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 968e07f..7215a4d 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -284,8 +284,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
require(data != null, "Data must not be null.")
val typeInfo = implicitly[TypeInformation[T]]
- val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions
- .asJavaCollection(data))
+ val sourceFunction = new FromElementsFunction[T](typeInfo.createSerializer(getConfig),
+ scala.collection.JavaConversions.asJavaCollection(data))
javaEnv.addSource(sourceFunction).returns(typeInfo)
}