You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/24 21:51:36 UTC
[03/12] [FLINK-1102] [streaming] Projection operator added to
DataStream
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index c3231d7..e7a68d3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -35,7 +35,7 @@ import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.invokable.SourceInvokable;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
+import org.apache.flink.streaming.util.serialization.TypeWrapper;
/**
* {@link ExecutionEnvironment} for streaming jobs. An instance of it is
@@ -224,7 +224,7 @@ public abstract class StreamExecutionEnvironment {
"fromElements needs at least one element as argument");
}
- TypeSerializerWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data[0]);
+ TypeWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data[0]);
DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
outTypeWrapper);
@@ -259,7 +259,7 @@ public abstract class StreamExecutionEnvironment {
throw new IllegalArgumentException("Collection must not be empty");
}
- TypeSerializerWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data.iterator()
+ TypeWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data.iterator()
.next());
DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
outTypeWrapper);
@@ -305,7 +305,7 @@ public abstract class StreamExecutionEnvironment {
* @return the data stream constructed
*/
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, int parallelism) {
- TypeSerializerWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(function,
+ TypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(function,
SourceFunction.class, 0);
DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source",
outTypeWrapper);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index b3cd57f..476c519 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -43,7 +43,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(StreamInvokable.class);
protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
- protected StreamRecordSerializer<IN> serializer;
+ protected StreamRecordSerializer<IN> inSerializer;
protected StreamRecord<IN> reuse;
protected boolean isMutable;
@@ -73,8 +73,8 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
StreamRecordSerializer<IN> serializer, boolean isMutable) {
this.collector = collector;
this.recordIterator = recordIterator;
- this.serializer = serializer;
- if(this.serializer != null){
+ this.inSerializer = serializer;
+ if(this.inSerializer != null){
this.reuse = serializer.createInstance();
}
this.isMutable = isMutable;
@@ -84,7 +84,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
* Re-initializes the object in which the next input record will be read in
*/
protected void resetReuse() {
- this.reuse = serializer.createInstance();
+ this.reuse = inSerializer.createInstance();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index f306dac..1cfc2d2 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -117,7 +117,7 @@ public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
@Override
public void open(Configuration config) throws Exception {
super.open(config);
- this.typeSerializer = serializer.getObjectSerializer();
+ this.typeSerializer = inSerializer.getObjectSerializer();
}
protected class StreamBatch implements Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
new file mode 100644
index 0000000..f72f66e
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.util.serialization.TypeWrapper;
+
+/**
+ * Invokable that projects selected fields of each incoming record into an
+ * output tuple.
+ * <p>
+ * The output tuple instance is created once in {@link #open(Configuration)}
+ * and is overwritten and emitted again on every call to callUserFunction.
+ *
+ * @param <IN>
+ *            Type of the incoming records
+ * @param <OUT>
+ *            Tuple type of the projected records
+ */
+public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN, OUT> {
+	private static final long serialVersionUID = 1L;
+
+	// Created in open(); transient, so it is not serialized with the invokable
+	transient OUT outTuple;
+	TypeWrapper<OUT> outTypeWrapper;
+	int[] fields;
+	int numFields;
+
+	/**
+	 * @param fields
+	 *            Positions of the input fields to emit; output position i
+	 *            receives input field fields[i]
+	 * @param outTypeWrapper
+	 *            Wrapper supplying the type information of the output tuple
+	 */
+	public ProjectInvokable(int[] fields, TypeWrapper<OUT> outTypeWrapper) {
+		super(null);
+		this.outTypeWrapper = outTypeWrapper;
+		this.fields = fields;
+		this.numFields = fields.length;
+	}
+
+	@Override
+	protected void immutableInvoke() throws Exception {
+		// Both modes share the same processing loop
+		mutableInvoke();
+	}
+
+	@Override
+	protected void mutableInvoke() throws Exception {
+		reuse = recordIterator.next(reuse);
+		while (reuse != null) {
+			callUserFunctionAndLogException();
+			reuse = recordIterator.next(reuse);
+		}
+	}
+
+	/**
+	 * Copies the selected input fields into the output tuple and emits it.
+	 */
+	@Override
+	protected void callUserFunction() throws Exception {
+		for (int outPos = 0; outPos < numFields; outPos++) {
+			outTuple.setField(reuse.getField(fields[outPos]), outPos);
+		}
+		collector.collect(outTuple);
+	}
+
+	/**
+	 * Instantiates the output tuple from the wrapped output type information.
+	 */
+	@Override
+	public void open(Configuration config) throws Exception {
+		super.open(config);
+		outTuple = outTypeWrapper.getTypeInfo().createSerializer().createInstance();
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
index ea1a750..4255912 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-public class FunctionTypeWrapper<T> extends TypeSerializerWrapper<T> {
+public class FunctionTypeWrapper<T> extends TypeWrapper<T> {
private static final long serialVersionUID = 1L;
private Function function;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
index 487c0a3..6bf90c4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import org.apache.flink.api.java.typeutils.TypeExtractor;
public class ObjectTypeWrapper<T> extends
- TypeSerializerWrapper<T> {
+ TypeWrapper<T> {
private static final long serialVersionUID = 1L;
private T instance;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ProjectTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ProjectTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ProjectTypeWrapper.java
new file mode 100644
index 0000000..9e8d4b4
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ProjectTypeWrapper.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+/**
+ * {@link TypeWrapper} describing the tuple type that results from projecting
+ * selected fields of a tuple-typed input.
+ * <p>
+ * The output type information is computed in the constructor and recomputed
+ * after deserialization.
+ *
+ * @param <IN>
+ *            Type of the input; its type information must be a
+ *            {@link TupleTypeInfo}
+ * @param <OUT>
+ *            Tuple type of the projection result
+ */
+public class ProjectTypeWrapper<IN,OUT extends Tuple> extends
+		TypeWrapper<OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private TypeWrapper<IN> inType;
+	Class<?>[] givenTypes;
+	int[] fields;
+
+	/**
+	 * @param inType
+	 *            Wrapper of the input type
+	 * @param fields
+	 *            Positions of the input fields to keep, in output order
+	 * @param givenTypes
+	 *            Expected class of each projected field, parallel to fields
+	 * @throws IllegalArgumentException
+	 *             If the input is not a tuple type, the array lengths
+	 *             differ, or a given class does not match the field's class
+	 */
+	public ProjectTypeWrapper(TypeWrapper<IN> inType,int[] fields,Class<?>[] givenTypes) {
+		this.inType = inType;
+		this.givenTypes = givenTypes;
+		this.fields = fields;
+		setTypeInfo();
+	}
+
+	private void readObject(java.io.ObjectInputStream in) throws IOException,
+			ClassNotFoundException {
+		in.defaultReadObject();
+		// typeInfo is a transient field of TypeWrapper, so rebuild it here
+		setTypeInfo();
+	}
+
+	@Override
+	protected void setTypeInfo() {
+		TypeInformation<?>[] outTypes = extractFieldTypes();
+		this.typeInfo = new TupleTypeInfo<OUT>(outTypes);
+	}
+
+	/**
+	 * Resolves and validates the type information of the projected fields.
+	 */
+	private TypeInformation<?>[] extractFieldTypes() {
+		if (fields.length != givenTypes.length) {
+			throw new IllegalArgumentException("Number of projected fields (" + fields.length
+					+ ") does not match number of given types (" + givenTypes.length + ").");
+		}
+
+		TypeInformation<IN> inTypeInfo = inType.getTypeInfo();
+		if (!(inTypeInfo instanceof TupleTypeInfo)) {
+			throw new IllegalArgumentException(
+					"Projection is only supported on tuple types, but the input type is " + inTypeInfo + ".");
+		}
+		TupleTypeInfo<?> inTupleType = (TupleTypeInfo<?>) inTypeInfo;
+
+		TypeInformation<?>[] fieldTypes = new TypeInformation[fields.length];
+		for (int i = 0; i < fields.length; i++) {
+			// Look the field type up once; it is both validated and stored
+			TypeInformation<?> fieldType = inTupleType.getTypeAt(fields[i]);
+			if (fieldType.getTypeClass() != givenTypes[i]) {
+				throw new IllegalArgumentException("Given types do not match types of input data set.");
+			}
+			fieldTypes[i] = fieldType;
+		}
+
+		return fieldTypes;
+	}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
deleted file mode 100644
index ca7a9d2..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeSerializerWrapper.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-public abstract class TypeSerializerWrapper<T>
- implements Serializable {
- private static final long serialVersionUID = 1L;
-
- protected transient TypeInformation<T> typeInfo = null;
-
- public TypeInformation<T> getTypeInfo() {
- if (typeInfo == null) {
- throw new RuntimeException("There is no TypeInformation in the wrapper");
- }
- return typeInfo;
- }
-
- protected abstract void setTypeInfo();
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeWrapper.java
new file mode 100644
index 0000000..a2e16b6
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeWrapper.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * Serializable base class for holders of a {@link TypeInformation}.
+ * <p>
+ * The type information itself is held in a transient field; subclasses
+ * assign it in {@link #setTypeInfo()}.
+ *
+ * @param <T>
+ *            Type described by the wrapped type information
+ */
+public abstract class TypeWrapper<T>
+		implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	protected transient TypeInformation<T> typeInfo = null;
+
+	/**
+	 * Returns the wrapped type information.
+	 *
+	 * @return The type information, never null
+	 * @throws RuntimeException
+	 *             If no type information has been set
+	 */
+	public TypeInformation<T> getTypeInfo() {
+		if (typeInfo != null) {
+			return typeInfo;
+		}
+		throw new RuntimeException("There is no TypeInformation in the wrapper");
+	}
+
+	/**
+	 * Hook for subclasses to assign {@link #typeInfo}.
+	 */
+	protected abstract void setTypeInfo();
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
new file mode 100644
index 0000000..5157dcb
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
+import org.apache.flink.streaming.util.serialization.ProjectTypeWrapper;
+import org.apache.flink.streaming.util.serialization.TypeWrapper;
+import org.junit.Test;
+
+/**
+ * Tests {@link ProjectInvokable} in combination with {@link ProjectTypeWrapper}.
+ */
+public class ProjectTest implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Projects fields 4, 4 and 3 of four Tuple5 records and checks the
+	 * resulting Tuple3 records.
+	 */
+	@Test
+	public void test() {
+		int[] fields = new int[] { 4, 4, 3 };
+		Class<?>[] classes = new Class<?>[] { Integer.class, Integer.class, String.class };
+
+		TypeWrapper<Tuple5<Integer, String, Integer, String, Integer>> sourceType = new ObjectTypeWrapper<Tuple5<Integer, String, Integer, String, Integer>>(
+				new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));
+		TypeWrapper<Tuple3<Integer, Integer, String>> projectedType = new ProjectTypeWrapper<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
+				sourceType, fields, classes);
+
+		ProjectInvokable<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> projection = new ProjectInvokable<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
+				fields, projectedType);
+
+		List<Tuple3<Integer, Integer, String>> projected = new ArrayList<Tuple3<Integer, Integer, String>>();
+		projected.add(new Tuple3<Integer, Integer, String>(4, 4, "b"));
+		projected.add(new Tuple3<Integer, Integer, String>(2, 2, "c"));
+		projected.add(new Tuple3<Integer, Integer, String>(2, 2, "c"));
+		projected.add(new Tuple3<Integer, Integer, String>(7, 7, "a"));
+
+		List<Tuple5<Integer, String, Integer, String, Integer>> records = new ArrayList<Tuple5<Integer, String, Integer, String, Integer>>();
+		records.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));
+		records.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "s", 3, "c", 2));
+		records.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "c", 2));
+		records.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "a", 7));
+
+		assertEquals(projected, MockInvokable.createAndExecute(projection, records));
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
index f470c76..0119f04 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -116,6 +116,11 @@ public class StreamVertexTest {
fail();
} catch (IllegalArgumentException e) {
}
+ try {
+ env.generateSequence(1, 10).project(2);
+ fail();
+ } catch (RuntimeException e) {
+ }
try {
env.readTextFile("random/path/that/is/not/valid");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
index c4682dd..e8b96c5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
@@ -17,20 +17,24 @@
package org.apache.flink.streaming.util;
+import java.io.Serializable;
import java.util.Collection;
+import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.util.Collector;
public class MockCollector<T> implements Collector<T> {
private Collection<T> outputs;
-
+
public MockCollector(Collection<T> outputs) {
this.outputs = outputs;
}
@Override
public void collect(T record) {
- outputs.add(record);
+ T copied = SerializationUtils.deserialize(SerializationUtils
+ .serialize((Serializable) record));
+ outputs.add(copied);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4175dca8/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
index d9cde4f..8795126 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
@@ -38,12 +38,12 @@ public class TypeSerializationTest {
@SuppressWarnings("unchecked")
@Test
public void functionTypeSerializationTest() {
- TypeSerializerWrapper<Integer> ser = new FunctionTypeWrapper<Integer>(new MyMap(),
+ TypeWrapper<Integer> ser = new FunctionTypeWrapper<Integer>(new MyMap(),
RichMapFunction.class, 0);
byte[] serializedType = SerializationUtils.serialize(ser);
- TypeSerializerWrapper<Integer> ser2 = (TypeSerializerWrapper<Integer>) SerializationUtils
+ TypeWrapper<Integer> ser2 = (TypeWrapper<Integer>) SerializationUtils
.deserialize(serializedType);
assertNotNull(ser.getTypeInfo());
@@ -57,11 +57,11 @@ public class TypeSerializationTest {
public void objectTypeSerializationTest() {
Integer instance = new Integer(22);
- TypeSerializerWrapper<Integer> ser = new ObjectTypeWrapper<Integer>(instance);
+ TypeWrapper<Integer> ser = new ObjectTypeWrapper<Integer>(instance);
byte[] serializedType = SerializationUtils.serialize(ser);
- TypeSerializerWrapper<Integer> ser2 = (TypeSerializerWrapper<Integer>) SerializationUtils
+ TypeWrapper<Integer> ser2 = (TypeWrapper<Integer>) SerializationUtils
.deserialize(serializedType);
assertNotNull(ser.getTypeInfo());