You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/10 15:30:45 UTC
[4/5] incubator-flink git commit: [FLINK-1161] [streaming] Streaming API type handling rework to support java 8 lambdas
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 43b3993..b0ab99f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -23,9 +23,11 @@ import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
@@ -35,13 +37,12 @@ import org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable;
import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
/**
* A {@link WindowedDataStream} represents a data stream that has been divided
@@ -225,8 +226,7 @@ public class WindowedDataStream<OUT> {
* @return The transformed DataStream
*/
public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reduceFunction) {
- return dataStream.addFunction("NextGenWindowReduce", reduceFunction,
- dataStream.outTypeWrapper, dataStream.outTypeWrapper,
+ return dataStream.addFunction("NextGenWindowReduce", reduceFunction, getType(), getType(),
getReduceInvokable(reduceFunction));
}
@@ -245,9 +245,13 @@ public class WindowedDataStream<OUT> {
*/
public <R> SingleOutputStreamOperator<R, ?> reduceGroup(
GroupReduceFunction<OUT, R> reduceFunction) {
- return dataStream.addFunction("NextGenWindowReduce", reduceFunction,
- dataStream.outTypeWrapper, new FunctionTypeWrapper<R>(reduceFunction,
- GroupReduceFunction.class, 1), getReduceGroupInvokable(reduceFunction));
+
+ TypeInformation<OUT> inType = getType();
+ TypeInformation<R> outType = TypeExtractor
+ .getGroupReduceReturnTypes(reduceFunction, inType);
+
+ return dataStream.addFunction("NextGenWindowReduce", reduceFunction, inType, outType,
+ getReduceGroupInvokable(reduceFunction));
}
/**
@@ -261,7 +265,7 @@ public class WindowedDataStream<OUT> {
public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
dataStream.checkFieldRange(positionToSum);
return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(positionToSum,
- dataStream.getClassAtPos(positionToSum), dataStream.getOutputType()));
+ dataStream.getClassAtPos(positionToSum), getType()));
}
/**
@@ -276,8 +280,7 @@ public class WindowedDataStream<OUT> {
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> sum(String field) {
- return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field,
- getOutputType()));
+ return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType()));
}
/**
@@ -290,7 +293,7 @@ public class WindowedDataStream<OUT> {
*/
public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
dataStream.checkFieldRange(positionToMin);
- return aggregate(ComparableAggregator.getAggregator(positionToMin, getOutputType(),
+ return aggregate(ComparableAggregator.getAggregator(positionToMin, getType(),
AggregationType.MIN));
}
@@ -307,8 +310,8 @@ public class WindowedDataStream<OUT> {
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> min(String field) {
- return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
- AggregationType.MIN, false));
+ return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MIN,
+ false));
}
/**
@@ -339,7 +342,7 @@ public class WindowedDataStream<OUT> {
*/
public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) {
dataStream.checkFieldRange(positionToMinBy);
- return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getOutputType(),
+ return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getType(),
AggregationType.MINBY, first));
}
@@ -359,7 +362,7 @@ public class WindowedDataStream<OUT> {
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
- return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+ return aggregate(ComparableAggregator.getAggregator(field, getType(),
AggregationType.MINBY, first));
}
@@ -373,7 +376,7 @@ public class WindowedDataStream<OUT> {
*/
public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
dataStream.checkFieldRange(positionToMax);
- return aggregate(ComparableAggregator.getAggregator(positionToMax, getOutputType(),
+ return aggregate(ComparableAggregator.getAggregator(positionToMax, getType(),
AggregationType.MAX));
}
@@ -390,8 +393,8 @@ public class WindowedDataStream<OUT> {
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> max(String field) {
- return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
- AggregationType.MAX, false));
+ return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MAX,
+ false));
}
/**
@@ -422,7 +425,7 @@ public class WindowedDataStream<OUT> {
*/
public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
dataStream.checkFieldRange(positionToMaxBy);
- return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getOutputType(),
+ return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getType(),
AggregationType.MAXBY, first));
}
@@ -442,7 +445,7 @@ public class WindowedDataStream<OUT> {
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) {
- return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+ return aggregate(ComparableAggregator.getAggregator(field, getType(),
AggregationType.MAXBY, first));
}
@@ -450,7 +453,7 @@ public class WindowedDataStream<OUT> {
StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregator);
SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("windowReduce",
- aggregator, dataStream.outTypeWrapper, dataStream.outTypeWrapper, invokable);
+ aggregator, getType(), getType(), invokable);
return returnStream;
}
@@ -576,8 +579,8 @@ public class WindowedDataStream<OUT> {
*
* @return The output type.
*/
- public TypeInformation<OUT> getOutputType() {
- return dataStream.getOutputType();
+ public TypeInformation<OUT> getType() {
+ return dataStream.getType();
}
protected WindowedDataStream<OUT> copy() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 4f1efd1..5c47592 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -24,22 +24,21 @@ import java.util.List;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
import org.apache.flink.streaming.api.function.source.FileSourceFunction;
import org.apache.flink.streaming.api.function.source.FileStreamFunction;
import org.apache.flink.streaming.api.function.source.FromElementsFunction;
import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
+import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.invokable.SourceInvokable;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
/**
* {@link ExecutionEnvironment} for streaming jobs. An instance of it is
@@ -131,12 +130,14 @@ public abstract class StreamExecutionEnvironment {
public long getBufferTimeout() {
return this.bufferTimeout;
}
-
+
/**
- * Sets the default parallelism that will be used for the local execution environment created by
- * {@link #createLocalEnvironment()}.
+ * Sets the default parallelism that will be used for the local execution
+ * environment created by {@link #createLocalEnvironment()}.
*
- * @param degreeOfParallelism The degree of parallelism to use as the default local parallelism.
+ * @param degreeOfParallelism
+ * The degree of parallelism to use as the default local
+ * parallelism.
*/
public static void setDefaultLocalParallelism(int degreeOfParallelism) {
defaultLocalDop = degreeOfParallelism;
@@ -210,14 +211,14 @@ public abstract class StreamExecutionEnvironment {
"fromElements needs at least one element as argument");
}
- TypeWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data[0]);
+ TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data[0]);
DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
- outTypeWrapper);
+ outTypeInfo);
try {
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
jobGraphBuilder.addStreamVertex(returnStream.getId(),
- new SourceInvokable<OUT>(function), null, outTypeWrapper, "source",
+ new SourceInvokable<OUT>(function), null, outTypeInfo, "source",
SerializationUtils.serialize(function), 1);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize elements");
@@ -246,16 +247,16 @@ public abstract class StreamExecutionEnvironment {
throw new IllegalArgumentException("Collection must not be empty");
}
- TypeWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data.iterator().next());
- DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
- outTypeWrapper);
+ TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data.iterator().next());
+ DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "collection",
+ outTypeInfo);
try {
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(
- new FromElementsFunction<OUT>(data)), null, new ObjectTypeWrapper<OUT>(data
- .iterator().next()), "source", SerializationUtils.serialize(function), 1);
+ new FromElementsFunction<OUT>(data)), null, outTypeInfo, "source",
+ SerializationUtils.serialize(function), 1);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize collection");
}
@@ -271,17 +272,16 @@ public abstract class StreamExecutionEnvironment {
* @param hostname
* The host name which a server socket bind.
* @param port
- * The port number which a server socket bind. A port number of
- * 0 means that the port number is automatically allocated.
+ * The port number which a server socket bind. A port number of 0
+ * means that the port number is automatically allocated.
* @param delimiter
- * A character which split received strings into records.
+ * A character which split received strings into records.
* @return A DataStream, containing the strings received from socket.
*/
public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) {
return addSource(new SocketTextStreamFunction(hostname, port, delimiter));
}
-
-
+
/**
* Creates a new DataStream that contains the strings received infinitely
* from socket. Received strings are decoded by the system's default
@@ -290,8 +290,8 @@ public abstract class StreamExecutionEnvironment {
* @param hostname
* The host name which a server socket bind.
* @param port
- * The port number which a server socket bind. A port number of
- * 0 means that the port number is automatically allocated.
+ * The port number which a server socket bind. A port number of 0
+ * means that the port number is automatically allocated.
* @return A DataStream, containing the strings received from socket.
*/
public DataStreamSource<String> socketTextStream(String hostname, int port) {
@@ -324,14 +324,14 @@ public abstract class StreamExecutionEnvironment {
* @return the data stream constructed
*/
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
- TypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(function,
- SourceFunction.class, 0);
- DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source",
- outTypeWrapper);
+ TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class,
+ function.getClass(), 0, null, null);
+
+ DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source", outTypeInfo);
try {
jobGraphBuilder.addStreamVertex(returnStream.getId(),
- new SourceInvokable<OUT>(function), null, outTypeWrapper, "source",
+ new SourceInvokable<OUT>(function), null, outTypeInfo, "source",
SerializationUtils.serialize(function), 1);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize SourceFunction");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
index f72f66e..4666a85 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
@@ -17,24 +17,25 @@
package org.apache.flink.streaming.api.invokable.operator;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
transient OUT outTuple;
- TypeWrapper<OUT> outTypeWrapper;
+ TypeSerializer<OUT> outTypeSerializer;
int[] fields;
int numFields;
- public ProjectInvokable(int[] fields, TypeWrapper<OUT> outTypeWrapper) {
+ public ProjectInvokable(int[] fields, TypeInformation<OUT> outTypeInformation) {
super(null);
this.fields = fields;
this.numFields = this.fields.length;
- this.outTypeWrapper = outTypeWrapper;
+ this.outTypeSerializer = outTypeInformation.createSerializer();
}
@Override
@@ -60,6 +61,6 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN,
@Override
public void open(Configuration config) throws Exception {
super.open(config);
- outTuple = outTypeWrapper.getTypeInfo().createSerializer().createInstance();
+ outTuple = outTypeSerializer.createInstance();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index 2464ff2..0058c66 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.streamvertex;
import java.util.ArrayList;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.io.network.api.MutableRecordReader;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
@@ -52,11 +51,8 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
}
private void setDeserializers() {
- TypeInformation<IN1> inputTypeInfo1 = configuration.getTypeInfoIn1(userClassLoader);
- inputDeserializer1 = new StreamRecordSerializer<IN1>(inputTypeInfo1);
-
- TypeInformation<IN2> inputTypeInfo2 = configuration.getTypeInfoIn2(userClassLoader);
- inputDeserializer2 = new StreamRecordSerializer<IN2>(inputTypeInfo2);
+ inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+ inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
index 9d65a21..090c1a6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -17,7 +17,6 @@
package org.apache.flink.streaming.api.streamvertex;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.MutableReader;
@@ -52,7 +51,7 @@ public class InputHandler<IN> {
@SuppressWarnings("unchecked")
protected void setConfigInputs() throws StreamVertexException {
- setDeserializer();
+ inputSerializer = configuration.getTypeSerializerIn1(streamVertex.userClassLoader);
int numberOfInputs = configuration.getNumberOfInputs();
if (numberOfInputs > 0) {
@@ -74,14 +73,6 @@ public class InputHandler<IN> {
}
}
- private void setDeserializer() {
- TypeInformation<IN> inTupleTypeInfo = configuration
- .getTypeInfoIn1(streamVertex.userClassLoader);
- if (inTupleTypeInfo != null) {
- inputSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo);
- }
- }
-
private MutableObjectIterator<StreamRecord<IN>> createInputIterator() {
@SuppressWarnings({ "unchecked", "rawtypes" })
final MutableObjectIterator<StreamRecord<IN>> iter = new ReaderIterator(inputs,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index cc67d6e..5381e22 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -98,9 +98,8 @@ public class OutputHandler<OUT> {
}
void setSerializers() {
- outTypeInfo = configuration.getTypeInfoOut1(streamVertex.userClassLoader);
- if (outTypeInfo != null) {
- outSerializer = new StreamRecordSerializer<OUT>(outTypeInfo);
+ outSerializer = configuration.getTypeSerializerOut1(streamVertex.userClassLoader);
+ if (outSerializer != null) {
outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outSerializer);
outSerializationDelegate.setInstance(outSerializer.createInstance());
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ClassTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ClassTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ClassTypeWrapper.java
deleted file mode 100644
index 2839535..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ClassTypeWrapper.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.IOException;
-
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-public class ClassTypeWrapper<T> extends TypeWrapper<T> {
- private static final long serialVersionUID = 1L;
-
- private Class<T> clazz;
-
- public ClassTypeWrapper(Class<T> clazz) {
- this.clazz = clazz;
- setTypeInfo();
- }
-
- private void readObject(java.io.ObjectInputStream in) throws IOException,
- ClassNotFoundException {
- in.defaultReadObject();
- setTypeInfo();
- }
-
- @Override
- protected void setTypeInfo() {
- if (clazz != null) {
- typeInfo = TypeExtractor.getForClass(clazz);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/CombineTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/CombineTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/CombineTypeWrapper.java
deleted file mode 100644
index b4cd65d..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/CombineTypeWrapper.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.IOException;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-
-public class CombineTypeWrapper<OUT1, OUT2> extends
- TypeWrapper<Tuple2<OUT1, OUT2>> {
-
- private static final long serialVersionUID = 1L;
- // Info about OUT
- private TypeWrapper<OUT1> outTypeWrapper1;
- private TypeWrapper<OUT2> outTypeWrapper2;
-
- public CombineTypeWrapper(TypeWrapper<OUT1> outTypeWrapper1,
- TypeWrapper<OUT2> outTypeWrapper2) {
- this.outTypeWrapper1 = outTypeWrapper1;
- this.outTypeWrapper2 = outTypeWrapper2;
- }
-
- private void readObject(java.io.ObjectInputStream in) throws IOException,
- ClassNotFoundException {
- in.defaultReadObject();
- setTypeInfo();
- }
-
- @Override
- protected void setTypeInfo() {
- typeInfo = new TupleTypeInfo<Tuple2<OUT1, OUT2>>(
- outTypeWrapper1.getTypeInfo(), outTypeWrapper2.getTypeInfo());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
deleted file mode 100644
index 4255912..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/FunctionTypeWrapper.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-public class FunctionTypeWrapper<T> extends TypeWrapper<T> {
- private static final long serialVersionUID = 1L;
-
- private Function function;
- private Class<? extends Function> functionSuperClass;
- private int typeParameterNumber;
-
- public FunctionTypeWrapper(Function function, Class<? extends Function> functionSuperClass,
- int typeParameterNumber) {
- this.function = function;
- this.functionSuperClass = functionSuperClass;
- this.typeParameterNumber = typeParameterNumber;
- setTypeInfo();
- }
-
- private void readObject(java.io.ObjectInputStream in) throws IOException,
- ClassNotFoundException {
- in.defaultReadObject();
- setTypeInfo();
- }
-
- @Override
- protected void setTypeInfo() {
- if (typeParameterNumber != -1) {
- typeInfo = TypeExtractor.createTypeInfo(functionSuperClass, function.getClass(),
- typeParameterNumber, null, null);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
deleted file mode 100644
index 6bf90c4..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ObjectTypeWrapper.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.IOException;
-
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-public class ObjectTypeWrapper<T> extends
- TypeWrapper<T> {
- private static final long serialVersionUID = 1L;
-
- private T instance;
-
- public ObjectTypeWrapper(T instance) {
- this.instance = instance;
- setTypeInfo();
- }
-
- private void readObject(java.io.ObjectInputStream in) throws IOException,
- ClassNotFoundException {
- in.defaultReadObject();
- setTypeInfo();
- }
-
- @Override
- protected void setTypeInfo() {
- if (instance != null) {
- typeInfo = TypeExtractor.getForObject(instance);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ProjectTypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ProjectTypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ProjectTypeWrapper.java
deleted file mode 100644
index 9e8d4b4..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/ProjectTypeWrapper.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-
-public class ProjectTypeWrapper<IN,OUT extends Tuple> extends
- TypeWrapper<OUT> {
- private static final long serialVersionUID = 1L;
-
-
- private TypeWrapper<IN> inType;
- Class<?>[] givenTypes;
- int[] fields;
-
- public ProjectTypeWrapper(TypeWrapper<IN> inType,int[] fields,Class<?>[] givenTypes) {
- this.inType = inType;
- this.givenTypes = givenTypes;
- this.fields = fields;
- setTypeInfo();
- }
-
- private void readObject(java.io.ObjectInputStream in) throws IOException,
- ClassNotFoundException {
- in.defaultReadObject();
- setTypeInfo();
- }
-
- @Override
- protected void setTypeInfo() {
- TypeInformation<?>[] outTypes = extractFieldTypes();
- this.typeInfo = new TupleTypeInfo<OUT>(outTypes);
- }
-
- private TypeInformation<?>[] extractFieldTypes() {
-
- TupleTypeInfo<?> inTupleType = (TupleTypeInfo<?>) inType.getTypeInfo();
- TypeInformation<?>[] fieldTypes = new TypeInformation[fields.length];
-
- for(int i=0; i<fields.length; i++) {
-
- if(inTupleType.getTypeAt(fields[i]).getTypeClass() != givenTypes[i]) {
- throw new IllegalArgumentException("Given types do not match types of input data set.");
- }
-
- fieldTypes[i] = inTupleType.getTypeAt(fields[i]);
- }
-
- return fieldTypes;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeWrapper.java
deleted file mode 100644
index a2e16b6..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeWrapper.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-public abstract class TypeWrapper<T>
- implements Serializable {
- private static final long serialVersionUID = 1L;
-
- protected transient TypeInformation<T> typeInfo = null;
-
- public TypeInformation<T> getTypeInfo() {
- if (typeInfo == null) {
- throw new RuntimeException("There is no TypeInformation in the wrapper");
- }
- return typeInfo;
- }
-
- protected abstract void setTypeInfo();
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
index 5157dcb..288d4ee 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
@@ -23,12 +23,12 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.StreamProjection;
import org.apache.flink.streaming.util.MockInvokable;
-import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.ProjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
import org.junit.Test;
public class ProjectTest implements Serializable {
@@ -37,17 +37,18 @@ public class ProjectTest implements Serializable {
@Test
public void test() {
- TypeWrapper<Tuple5<Integer, String, Integer, String, Integer>> inTypeWrapper = new ObjectTypeWrapper<Tuple5<Integer, String, Integer, String, Integer>>(
- new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));
+ TypeInformation<Tuple5<Integer, String, Integer, String, Integer>> inType = TypeExtractor
+ .getForObject(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b",
+ 4));
int[] fields = new int[] { 4, 4, 3 };
Class<?>[] classes = new Class<?>[] { Integer.class, Integer.class, String.class };
- TypeWrapper<Tuple3<Integer, Integer, String>> outTypeWrapper = new ProjectTypeWrapper<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
- inTypeWrapper, fields, classes);
-
+ @SuppressWarnings("unchecked")
ProjectInvokable<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> invokable = new ProjectInvokable<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
- fields, outTypeWrapper);
+ fields,
+ (TypeInformation<Tuple3<Integer, Integer, String>>) StreamProjection
+ .extractFieldTypes(fields, classes, inType));
List<Tuple5<Integer, String, Integer, String, Integer>> input = new ArrayList<Tuple5<Integer, String, Integer, String, Integer>>();
input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
deleted file mode 100644
index 1d2fd2e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/serialization/TypeSerializationTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.junit.Test;
-
-public class TypeSerializationTest {
-
- private static class MyMap extends RichMapFunction<Integer, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map(Integer value) throws Exception {
- return null;
- }
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void functionTypeSerializationTest() {
- TypeWrapper<Integer> ser = new FunctionTypeWrapper<Integer>(new MyMap(),
- RichMapFunction.class, 0);
-
- byte[] serializedType = SerializationUtils.serialize(ser);
-
- TypeWrapper<Integer> ser2 = (TypeWrapper<Integer>) SerializationUtils
- .deserialize(serializedType);
-
- assertNotNull(ser.getTypeInfo());
- assertNotNull(ser2.getTypeInfo());
-
- assertEquals(ser.getTypeInfo(), ser2.getTypeInfo());
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void objectTypeSerializationTest() {
- Integer instance = Integer.valueOf(22);
-
- TypeWrapper<Integer> ser = new ObjectTypeWrapper<Integer>(instance);
-
- byte[] serializedType = SerializationUtils.serialize(ser);
-
- TypeWrapper<Integer> ser2 = (TypeWrapper<Integer>) SerializationUtils
- .deserialize(serializedType);
-
- assertNotNull(ser.getTypeInfo());
- assertNotNull(ser2.getTypeInfo());
-
- assertEquals(ser.getTypeInfo(), ser2.getTypeInfo());
- }
-}