You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/10 15:30:46 UTC
[5/5] incubator-flink git commit: [FLINK-1161] [streaming] Streaming
API type handling rework to support java 8 lambdas
[FLINK-1161] [streaming] Streaming API type handling rework to support java 8 lambdas
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/51c1f677
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/51c1f677
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/51c1f677
Branch: refs/heads/master
Commit: 51c1f67791307c2b9355171f7398d104befc8de5
Parents: 94c8e3f
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Dec 8 17:12:01 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Dec 10 13:27:38 2014 +0100
----------------------------------------------------------------------
.../flink/streaming/api/JobGraphBuilder.java | 85 +++--
.../flink/streaming/api/StreamConfig.java | 93 +++---
.../api/datastream/ConnectedDataStream.java | 119 +++----
.../streaming/api/datastream/DataStream.java | 130 ++++----
.../api/datastream/DataStreamSink.java | 85 ++---
.../api/datastream/DataStreamSource.java | 8 +-
.../api/datastream/GroupedDataStream.java | 8 +-
.../datastream/SingleOutputStreamOperator.java | 6 +-
.../api/datastream/SplitDataStream.java | 2 +-
.../api/datastream/StreamJoinOperator.java | 8 +-
.../api/datastream/StreamProjection.java | 307 ++++++++++---------
.../api/datastream/WindowedDataStream.java | 49 +--
.../environment/StreamExecutionEnvironment.java | 56 ++--
.../invokable/operator/ProjectInvokable.java | 11 +-
.../api/streamvertex/CoStreamVertex.java | 8 +-
.../api/streamvertex/InputHandler.java | 11 +-
.../api/streamvertex/OutputHandler.java | 5 +-
.../util/serialization/ClassTypeWrapper.java | 46 ---
.../util/serialization/CombineTypeWrapper.java | 50 ---
.../util/serialization/FunctionTypeWrapper.java | 53 ----
.../util/serialization/ObjectTypeWrapper.java | 47 ---
.../util/serialization/ProjectTypeWrapper.java | 70 -----
.../util/serialization/TypeWrapper.java | 38 ---
.../api/invokable/operator/ProjectTest.java | 19 +-
.../serialization/TypeSerializationTest.java | 72 -----
25 files changed, 493 insertions(+), 893 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 8a8595a..c45164a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.api.streamvertex.CoStreamVertex;
import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
@@ -40,7 +41,6 @@ import org.apache.flink.streaming.api.streamvertex.StreamVertex;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner.PartitioningStrategy;
import org.apache.flink.streaming.state.OperatorState;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,10 +66,10 @@ public class JobGraphBuilder {
private Map<String, List<StreamPartitioner<?>>> connectionTypes;
private Map<String, String> operatorNames;
private Map<String, StreamInvokable<?, ?>> invokableObjects;
- private Map<String, TypeWrapper<?>> typeWrapperIn1;
- private Map<String, TypeWrapper<?>> typeWrapperIn2;
- private Map<String, TypeWrapper<?>> typeWrapperOut1;
- private Map<String, TypeWrapper<?>> typeWrapperOut2;
+ private Map<String, StreamRecordSerializer<?>> typeSerializersIn1;
+ private Map<String, StreamRecordSerializer<?>> typeSerializersIn2;
+ private Map<String, StreamRecordSerializer<?>> typeSerializersOut1;
+ private Map<String, StreamRecordSerializer<?>> typeSerializersOut2;
private Map<String, byte[]> serializedFunctions;
private Map<String, byte[]> outputSelectors;
private Map<String, Class<? extends AbstractInvokable>> vertexClasses;
@@ -98,10 +98,10 @@ public class JobGraphBuilder {
connectionTypes = new HashMap<String, List<StreamPartitioner<?>>>();
operatorNames = new HashMap<String, String>();
invokableObjects = new HashMap<String, StreamInvokable<?, ?>>();
- typeWrapperIn1 = new HashMap<String, TypeWrapper<?>>();
- typeWrapperIn2 = new HashMap<String, TypeWrapper<?>>();
- typeWrapperOut1 = new HashMap<String, TypeWrapper<?>>();
- typeWrapperOut2 = new HashMap<String, TypeWrapper<?>>();
+ typeSerializersIn1 = new HashMap<String, StreamRecordSerializer<?>>();
+ typeSerializersIn2 = new HashMap<String, StreamRecordSerializer<?>>();
+ typeSerializersOut1 = new HashMap<String, StreamRecordSerializer<?>>();
+ typeSerializersOut2 = new HashMap<String, StreamRecordSerializer<?>>();
serializedFunctions = new HashMap<String, byte[]>();
outputSelectors = new HashMap<String, byte[]>();
vertexClasses = new HashMap<String, Class<? extends AbstractInvokable>>();
@@ -124,10 +124,10 @@ public class JobGraphBuilder {
* Name of the vertex
* @param invokableObject
* User defined operator
- * @param inTypeWrapper
- * Input type wrapper for serialization
- * @param outTypeWrapper
- * Output type wrapper for serialization
+ * @param inTypeInfo
+ * Input type for serialization
+ * @param outTypeInfo
+ * Output typ for serialization
* @param operatorName
* Operator type
* @param serializedFunction
@@ -136,14 +136,19 @@ public class JobGraphBuilder {
* Number of parallel instances created
*/
public <IN, OUT> void addStreamVertex(String vertexName,
- StreamInvokable<IN, OUT> invokableObject, TypeWrapper<?> inTypeWrapper,
- TypeWrapper<?> outTypeWrapper, String operatorName, byte[] serializedFunction,
+ StreamInvokable<IN, OUT> invokableObject, TypeInformation<IN> inTypeInfo,
+ TypeInformation<OUT> outTypeInfo, String operatorName, byte[] serializedFunction,
int parallelism) {
addVertex(vertexName, StreamVertex.class, invokableObject, operatorName,
serializedFunction, parallelism);
- addTypeWrappers(vertexName, inTypeWrapper, null, outTypeWrapper, null);
+ StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>(
+ inTypeInfo) : null;
+ StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null ? new StreamRecordSerializer<OUT>(
+ outTypeInfo) : null;
+
+ addTypeSerializers(vertexName, inSerializer, null, outSerializer, null);
if (LOG.isDebugEnabled()) {
LOG.debug("Vertex: {}", vertexName);
@@ -224,14 +229,16 @@ public class JobGraphBuilder {
}
public <IN1, IN2, OUT> void addCoTask(String vertexName,
- CoInvokable<IN1, IN2, OUT> taskInvokableObject, TypeWrapper<?> in1TypeWrapper,
- TypeWrapper<?> in2TypeWrapper, TypeWrapper<?> outTypeWrapper, String operatorName,
- byte[] serializedFunction, int parallelism) {
+ CoInvokable<IN1, IN2, OUT> taskInvokableObject, TypeInformation<IN1> in1TypeInfo,
+ TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo,
+ String operatorName, byte[] serializedFunction, int parallelism) {
addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName,
serializedFunction, parallelism);
- addTypeWrappers(vertexName, in1TypeWrapper, in2TypeWrapper, outTypeWrapper, null);
+ addTypeSerializers(vertexName, new StreamRecordSerializer<IN1>(in1TypeInfo),
+ new StreamRecordSerializer<IN2>(in2TypeInfo), new StreamRecordSerializer<OUT>(
+ outTypeInfo), null);
if (LOG.isDebugEnabled()) {
LOG.debug("CO-TASK: {}", vertexName);
@@ -273,12 +280,13 @@ public class JobGraphBuilder {
iterationTailCount.put(vertexName, 0);
}
- private void addTypeWrappers(String vertexName, TypeWrapper<?> in1, TypeWrapper<?> in2,
- TypeWrapper<?> out1, TypeWrapper<?> out2) {
- typeWrapperIn1.put(vertexName, in1);
- typeWrapperIn2.put(vertexName, in2);
- typeWrapperOut1.put(vertexName, out1);
- typeWrapperOut2.put(vertexName, out2);
+ private void addTypeSerializers(String vertexName, StreamRecordSerializer<?> in1,
+ StreamRecordSerializer<?> in2, StreamRecordSerializer<?> out1,
+ StreamRecordSerializer<?> out2) {
+ typeSerializersIn1.put(vertexName, in1);
+ typeSerializersIn2.put(vertexName, in2);
+ typeSerializersOut1.put(vertexName, out1);
+ typeSerializersOut2.put(vertexName, out2);
}
/**
@@ -315,10 +323,10 @@ public class JobGraphBuilder {
config.setMutability(mutability.get(vertexName));
config.setBufferTimeout(bufferTimeout.get(vertexName));
- config.setTypeWrapperIn1(typeWrapperIn1.get(vertexName));
- config.setTypeWrapperIn2(typeWrapperIn2.get(vertexName));
- config.setTypeWrapperOut1(typeWrapperOut1.get(vertexName));
- config.setTypeWrapperOut2(typeWrapperOut2.get(vertexName));
+ config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName));
+ config.setTypeSerializerIn2(typeSerializersIn2.get(vertexName));
+ config.setTypeSerializerOut1(typeSerializersOut1.get(vertexName));
+ config.setTypeSerializerOut2(typeSerializersOut2.get(vertexName));
// Set vertex config
config.setUserInvokable(invokableObject);
@@ -482,19 +490,10 @@ public class JobGraphBuilder {
operatorNames.put(to, operatorNames.get(from));
serializedFunctions.put(to, serializedFunctions.get(from));
- typeWrapperIn1.put(to, typeWrapperOut1.get(from));
- typeWrapperIn2.put(to, typeWrapperOut2.get(from));
- typeWrapperOut1.put(to, typeWrapperOut1.get(from));
- typeWrapperOut2.put(to, typeWrapperOut2.get(from));
- }
-
- public TypeInformation<?> getInTypeInfo(String id) {
- System.out.println("DEBUG TypeInfo " + typeWrapperIn1.get(id));
- return typeWrapperIn1.get(id).getTypeInfo();
- }
-
- public TypeInformation<?> getOutTypeInfo(String id) {
- return typeWrapperOut1.get(id).getTypeInfo();
+ typeSerializersIn1.put(to, typeSerializersOut1.get(from));
+ typeSerializersIn2.put(to, typeSerializersOut2.get(from));
+ typeSerializersOut1.put(to, typeSerializersOut1.get(from));
+ typeSerializersOut2.put(to, typeSerializersOut2.get(from));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 3dba376..31af9cb 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -25,15 +25,14 @@ import java.util.Map;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
import org.apache.flink.streaming.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.state.OperatorState;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
import org.apache.flink.util.InstantiationUtil;
public class StreamConfig {
@@ -54,6 +53,12 @@ public class StreamConfig {
private static final String USER_FUNCTION = "userfunction";
private static final String BUFFER_TIMEOUT = "bufferTimeout";
private static final String OPERATOR_STATES = "operatorStates";
+ private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1";
+ private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
+ private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1";
+ private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2";
+ private static final String MUTABILITY = "isMutable";
+ private static final String ITERATON_WAIT = "iterationWait";
// DEFAULT VALUES
@@ -61,10 +66,7 @@ public class StreamConfig {
private static final long DEFAULT_TIMEOUT = 0;
- // STRINGS
-
- private static final String MUTABILITY = "isMutable";
- private static final String ITERATON_WAIT = "iterationWait";
+ // CONFIG METHODS
private Configuration config;
@@ -76,65 +78,64 @@ public class StreamConfig {
return config;
}
- // CONFIGS
-
- private static final String TYPE_WRAPPER_IN_1 = "typeWrapper_in_1";
- private static final String TYPE_WRAPPER_IN_2 = "typeWrapper_in_2";
- private static final String TYPE_WRAPPER_OUT_1 = "typeWrapper_out_1";
- private static final String TYPE_WRAPPER_OUT_2 = "typeWrapper_out_2";
-
- public void setTypeWrapperIn1(TypeWrapper<?> typeWrapper) {
- setTypeWrapper(TYPE_WRAPPER_IN_1, typeWrapper);
+ public void setTypeSerializerIn1(StreamRecordSerializer<?> serializer) {
+ setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer);
}
- public void setTypeWrapperIn2(TypeWrapper<?> typeWrapper) {
- setTypeWrapper(TYPE_WRAPPER_IN_2, typeWrapper);
+ public void setTypeSerializerIn2(StreamRecordSerializer<?> serializer) {
+ setTypeSerializer(TYPE_SERIALIZER_IN_2, serializer);
}
- public void setTypeWrapperOut1(TypeWrapper<?> typeWrapper) {
- setTypeWrapper(TYPE_WRAPPER_OUT_1, typeWrapper);
+ public void setTypeSerializerOut1(StreamRecordSerializer<?> serializer) {
+ setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
}
- public void setTypeWrapperOut2(TypeWrapper<?> typeWrapper) {
- setTypeWrapper(TYPE_WRAPPER_OUT_2, typeWrapper);
+ public void setTypeSerializerOut2(StreamRecordSerializer<?> serializer) {
+ setTypeSerializer(TYPE_SERIALIZER_OUT_2, serializer);
}
- public <T> TypeInformation<T> getTypeInfoIn1(ClassLoader cl) {
- return getTypeInfo(TYPE_WRAPPER_IN_1, cl);
- }
-
- public <T> TypeInformation<T> getTypeInfoIn2(ClassLoader cl) {
- return getTypeInfo(TYPE_WRAPPER_IN_2, cl);
- }
-
- public <T> TypeInformation<T> getTypeInfoOut1(ClassLoader cl) {
- return getTypeInfo(TYPE_WRAPPER_OUT_1, cl);
+ @SuppressWarnings("unchecked")
+ public <T> StreamRecordSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
+ try {
+ return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+ TYPE_SERIALIZER_IN_1, cl);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not instantiate serializer.");
+ }
}
- public <T> TypeInformation<T> getTypeInfoOut2(ClassLoader cl) {
- return getTypeInfo(TYPE_WRAPPER_OUT_2, cl);
+ @SuppressWarnings("unchecked")
+ public <T> StreamRecordSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
+ try {
+ return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+ TYPE_SERIALIZER_IN_2, cl);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not instantiate serializer.");
+ }
}
- private void setTypeWrapper(String key, TypeWrapper<?> typeWrapper) {
- config.setBytes(key, SerializationUtils.serialize(typeWrapper));
+ @SuppressWarnings("unchecked")
+ public <T> StreamRecordSerializer<T> getTypeSerializerOut1(ClassLoader cl) {
+ try {
+ return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+ TYPE_SERIALIZER_OUT_1, cl);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not instantiate serializer.");
+ }
}
@SuppressWarnings("unchecked")
- private <T> TypeInformation<T> getTypeInfo(String key, ClassLoader cl) {
-
- TypeWrapper<T> typeWrapper;
+ public <T> StreamRecordSerializer<T> getTypeSerializerOut2(ClassLoader cl) {
try {
- typeWrapper = (TypeWrapper<T>) InstantiationUtil.readObjectFromConfig(this.config, key,
- cl);
+ return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+ TYPE_SERIALIZER_OUT_2, cl);
} catch (Exception e) {
- throw new RuntimeException("Cannot load typeinfo");
- }
- if (typeWrapper != null) {
- return typeWrapper.getTypeInfo();
- } else {
- return null;
+ throw new RuntimeException("Could not instantiate serializer.");
}
+ }
+ private void setTypeSerializer(String key, StreamRecordSerializer<?> typeWrapper) {
+ config.setBytes(key, SerializationUtils.serialize(typeWrapper));
}
public void setMutability(boolean isMutable) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 1621752..6336e68 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -28,6 +28,8 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
@@ -44,10 +46,6 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.serialization.CombineTypeWrapper;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
import org.apache.flink.util.Collector;
/**
@@ -122,7 +120,7 @@ public class ConnectedDataStream<IN1, IN2> {
* @return The type of the first input
*/
public TypeInformation<IN1> getInputType1() {
- return dataStream1.getOutputType();
+ return dataStream1.getType();
}
/**
@@ -131,7 +129,7 @@ public class ConnectedDataStream<IN1, IN2> {
* @return The type of the second input
*/
public TypeInformation<IN2> getInputType2() {
- return dataStream2.getOutputType();
+ return dataStream2.getType();
}
/**
@@ -403,15 +401,11 @@ public class ConnectedDataStream<IN1, IN2> {
* @return The transformed {@link DataStream}
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
- FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coMapper,
- CoMapFunction.class, 0);
- FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coMapper,
- CoMapFunction.class, 1);
- FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coMapper,
- CoMapFunction.class, 2);
-
- return addCoFunction("coMap", coMapper, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
- new CoMapInvokable<IN1, IN2, OUT>(coMapper));
+ TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoMapFunction.class,
+ coMapper.getClass(), 2, null, null);
+
+ return addCoFunction("coMap", coMapper, outTypeInfo, new CoMapInvokable<IN1, IN2, OUT>(
+ coMapper));
}
/**
@@ -431,15 +425,11 @@ public class ConnectedDataStream<IN1, IN2> {
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
- FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coFlatMapper,
- CoFlatMapFunction.class, 0);
- FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coFlatMapper,
- CoFlatMapFunction.class, 1);
- FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coFlatMapper,
- CoFlatMapFunction.class, 2);
-
- return addCoFunction("coFlatMap", coFlatMapper, in1TypeWrapper, in2TypeWrapper,
- outTypeWrapper, new CoFlatMapInvokable<IN1, IN2, OUT>(coFlatMapper));
+ TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoFlatMapFunction.class,
+ coFlatMapper.getClass(), 2, null, null);
+
+ return addCoFunction("coFlatMap", coFlatMapper, outTypeInfo,
+ new CoFlatMapInvokable<IN1, IN2, OUT>(coFlatMapper));
}
/**
@@ -460,14 +450,10 @@ public class ConnectedDataStream<IN1, IN2> {
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
- FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
- CoReduceFunction.class, 0);
- FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
- CoReduceFunction.class, 1);
- FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
- CoReduceFunction.class, 2);
- return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
- getReduceInvokable(coReducer));
+ TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoReduceFunction.class,
+ coReducer.getClass(), 2, null, null);
+
+ return addCoFunction("coReduce", coReducer, outTypeInfo, getReduceInvokable(coReducer));
}
/**
@@ -528,16 +514,12 @@ public class ConnectedDataStream<IN1, IN2> {
throw new IllegalArgumentException("Slide interval must be positive");
}
- FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coWindowFunction,
- CoWindowFunction.class, 0);
- FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coWindowFunction,
- CoWindowFunction.class, 1);
- FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coWindowFunction,
- CoWindowFunction.class, 2);
+ TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoWindowFunction.class,
+ coWindowFunction.getClass(), 2, null, null);
- return addCoFunction("coWindowReduce", coWindowFunction, in1TypeWrapper, in2TypeWrapper,
- outTypeWrapper, new CoWindowInvokable<IN1, IN2, OUT>(coWindowFunction, windowSize,
- slideInterval, timestamp1, timestamp2));
+ return addCoFunction("coWindowReduce", coWindowFunction, outTypeInfo,
+ new CoWindowInvokable<IN1, IN2, OUT>(coWindowFunction, windowSize, slideInterval,
+ timestamp1, timestamp2));
}
protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
@@ -556,26 +538,23 @@ public class ConnectedDataStream<IN1, IN2> {
CrossFunction<IN1, IN2, OUT> crossFunction, long windowSize, long slideInterval,
TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
- TypeWrapper<IN1> in1TypeWrapper = new ObjectTypeWrapper<IN1>(dataStream1.getOutputType()
- .createSerializer().createInstance());
- TypeWrapper<IN2> in2TypeWrapper = new ObjectTypeWrapper<IN2>(dataStream2.getOutputType()
- .createSerializer().createInstance());
+ TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CrossFunction.class,
+ crossFunction.getClass(), 2, null, null);
- FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(crossFunction,
- CrossFunction.class, 2);
+ CrossWindowFunction<IN1, IN2, OUT> crossWindowFunction = new CrossWindowFunction<IN1, IN2, OUT>(
+ crossFunction);
- CrossWindowFunction<IN1, IN2, OUT> crossWindowFunction = new CrossWindowFunction<IN1, IN2, OUT>(crossFunction);
-
- return addGeneralWindowCombine(crossWindowFunction, in1TypeWrapper, in2TypeWrapper,
- outTypeWrapper, windowSize, slideInterval, timestamp1, timestamp2);
+ return addGeneralWindowCombine(crossWindowFunction, outTypeInfo, windowSize, slideInterval,
+ timestamp1, timestamp2);
}
- private static class CrossWindowFunction<IN1, IN2, OUT> implements CoWindowFunction<IN1, IN2, OUT> {
+ private static class CrossWindowFunction<IN1, IN2, OUT> implements
+ CoWindowFunction<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
private CrossFunction<IN1, IN2, OUT> crossFunction;
-
+
public CrossWindowFunction(CrossFunction<IN1, IN2, OUT> crossFunction) {
this.crossFunction = crossFunction;
}
@@ -590,27 +569,22 @@ public class ConnectedDataStream<IN1, IN2> {
}
}
}
-
+
protected SingleOutputStreamOperator<Tuple2<IN1, IN2>, ?> addGeneralWindowJoin(
CoWindowFunction<IN1, IN2, Tuple2<IN1, IN2>> coWindowFunction, long windowSize,
long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
- TypeWrapper<IN1> in1TypeWrapper = new ObjectTypeWrapper<IN1>(dataStream1.getOutputType()
- .createSerializer().createInstance());
- TypeWrapper<IN2> in2TypeWrapper = new ObjectTypeWrapper<IN2>(dataStream2.getOutputType()
- .createSerializer().createInstance());
-
- CombineTypeWrapper<IN1, IN2> outTypeWrapper = new CombineTypeWrapper<IN1, IN2>(
- in1TypeWrapper, in2TypeWrapper);
+ TypeInformation<Tuple2<IN1, IN2>> outType = new TupleTypeInfo<Tuple2<IN1, IN2>>(
+ getInputType1(), getInputType2());
- return addGeneralWindowCombine(coWindowFunction, in1TypeWrapper, in2TypeWrapper,
- outTypeWrapper, windowSize, slideInterval, timestamp1, timestamp2);
+ return addGeneralWindowCombine(coWindowFunction, outType, windowSize, slideInterval,
+ timestamp1, timestamp2);
}
private <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine(
- CoWindowFunction<IN1, IN2, OUT> coWindowFunction, TypeWrapper<IN1> in1TypeWrapper,
- TypeWrapper<IN2> in2TypeWrapper, TypeWrapper<OUT> outTypeWrapper, long windowSize,
- long slideInterval, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+ CoWindowFunction<IN1, IN2, OUT> coWindowFunction, TypeInformation<OUT> outTypeInfo,
+ long windowSize, long slideInterval, TimeStamp<IN1> timestamp1,
+ TimeStamp<IN2> timestamp2) {
if (windowSize < 1) {
throw new IllegalArgumentException("Window size must be positive");
@@ -619,23 +593,22 @@ public class ConnectedDataStream<IN1, IN2> {
throw new IllegalArgumentException("Slide interval must be positive");
}
- return addCoFunction("coWindowReduce", coWindowFunction, in1TypeWrapper, in2TypeWrapper,
- outTypeWrapper, new CoWindowInvokable<IN1, IN2, OUT>(coWindowFunction, windowSize,
- slideInterval, timestamp1, timestamp2));
+ return addCoFunction("coWindowReduce", coWindowFunction, outTypeInfo,
+ new CoWindowInvokable<IN1, IN2, OUT>(coWindowFunction, windowSize, slideInterval,
+ timestamp1, timestamp2));
}
protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
- final Function function, TypeWrapper<IN1> in1TypeWrapper,
- TypeWrapper<IN2> in2TypeWrapper, TypeWrapper<OUT> outTypeWrapper,
+ final Function function, TypeInformation<OUT> outTypeInfo,
CoInvokable<IN1, IN2, OUT> functionInvokable) {
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
- environment, functionName, outTypeWrapper);
+ environment, functionName, outTypeInfo);
try {
dataStream1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable,
- in1TypeWrapper, in2TypeWrapper, outTypeWrapper, functionName,
+ getInputType1(), getInputType2(), outTypeInfo, functionName,
SerializationUtils.serialize((Serializable) function),
environment.getDegreeOfParallelism());
} catch (SerializationException e) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index f0e4309..978f5fa 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -38,6 +38,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
@@ -70,9 +71,6 @@ import org.apache.flink.streaming.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.keys.FieldsKeySelector;
import org.apache.flink.streaming.util.keys.PojoKeySelector;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
-import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
/**
* A DataStream represents a stream of elements of the same type. A DataStream
@@ -97,7 +95,7 @@ public class DataStream<OUT> {
protected List<String> userDefinedNames;
protected boolean selectAll;
protected StreamPartitioner<OUT> partitioner;
- protected final TypeWrapper<OUT> outTypeWrapper;
+ protected final TypeInformation<OUT> typeInfo;
protected List<DataStream<OUT>> mergedStreams;
protected final JobGraphBuilder jobGraphBuilder;
@@ -110,11 +108,11 @@ public class DataStream<OUT> {
* StreamExecutionEnvironment
* @param operatorType
* The type of the operator in the component
- * @param outTypeWrapper
- * Type of the output
+ * @param typeInfo
+ * Type of the datastream
*/
public DataStream(StreamExecutionEnvironment environment, String operatorType,
- TypeWrapper<OUT> outTypeWrapper) {
+ TypeInformation<OUT> typeInfo) {
if (environment == null) {
throw new NullPointerException("context is null");
}
@@ -127,7 +125,7 @@ public class DataStream<OUT> {
this.userDefinedNames = new ArrayList<String>();
this.selectAll = false;
this.partitioner = new DistributePartitioner<OUT>(true);
- this.outTypeWrapper = outTypeWrapper;
+ this.typeInfo = typeInfo;
this.mergedStreams = new ArrayList<DataStream<OUT>>();
this.mergedStreams.add(this);
}
@@ -146,7 +144,7 @@ public class DataStream<OUT> {
this.selectAll = dataStream.selectAll;
this.partitioner = dataStream.partitioner;
this.jobGraphBuilder = dataStream.jobGraphBuilder;
- this.outTypeWrapper = dataStream.outTypeWrapper;
+ this.typeInfo = dataStream.typeInfo;
this.mergedStreams = new ArrayList<DataStream<OUT>>();
this.mergedStreams.add(this);
if (dataStream.mergedStreams.size() > 1) {
@@ -176,12 +174,12 @@ public class DataStream<OUT> {
}
/**
- * Gets the output type.
+ * Gets the type of the stream.
*
- * @return The output type.
+ * @return The type of the datastream.
*/
- public TypeInformation<OUT> getOutputType() {
- return this.outTypeWrapper.getTypeInfo();
+ public TypeInformation<OUT> getType() {
+ return this.typeInfo;
}
/**
@@ -230,7 +228,7 @@ public class DataStream<OUT> {
*/
public GroupedDataStream<OUT> groupBy(int... fields) {
- return groupBy(FieldsKeySelector.getSelector(getOutputType(), fields));
+ return groupBy(FieldsKeySelector.getSelector(getType(), fields));
}
@@ -248,7 +246,7 @@ public class DataStream<OUT> {
**/
public GroupedDataStream<OUT> groupBy(String... fields) {
- return groupBy(new PojoKeySelector<OUT>(getOutputType(), fields));
+ return groupBy(new PojoKeySelector<OUT>(getType(), fields));
}
@@ -277,7 +275,7 @@ public class DataStream<OUT> {
public DataStream<OUT> partitionBy(int... fields) {
return setConnectionType(new FieldsPartitioner<OUT>(FieldsKeySelector.getSelector(
- getOutputType(), fields)));
+ getType(), fields)));
}
/**
@@ -290,8 +288,8 @@ public class DataStream<OUT> {
*/
public DataStream<OUT> partitionBy(String... fields) {
- return setConnectionType(new FieldsPartitioner<OUT>(new PojoKeySelector<OUT>(
- getOutputType(), fields)));
+ return setConnectionType(new FieldsPartitioner<OUT>(new PojoKeySelector<OUT>(getType(),
+ fields)));
}
/**
@@ -387,13 +385,10 @@ public class DataStream<OUT> {
* @return The transformed {@link DataStream}.
*/
public <R> SingleOutputStreamOperator<R, ?> map(MapFunction<OUT, R> mapper) {
- FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(mapper,
- MapFunction.class, 0);
- FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(mapper,
- MapFunction.class, 1);
- return addFunction("map", mapper, inTypeWrapper, outTypeWrapper, new MapInvokable<OUT, R>(
- mapper));
+ TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(mapper, getType());
+
+ return addFunction("map", mapper, getType(), outType, new MapInvokable<OUT, R>(mapper));
}
/**
@@ -413,13 +408,11 @@ public class DataStream<OUT> {
* @return The transformed {@link DataStream}.
*/
public <R> SingleOutputStreamOperator<R, ?> flatMap(FlatMapFunction<OUT, R> flatMapper) {
- FunctionTypeWrapper<OUT> inTypeWrapper = new FunctionTypeWrapper<OUT>(flatMapper,
- FlatMapFunction.class, 0);
- FunctionTypeWrapper<R> outTypeWrapper = new FunctionTypeWrapper<R>(flatMapper,
- FlatMapFunction.class, 1);
- return addFunction("flatMap", flatMapper, inTypeWrapper, outTypeWrapper,
- new FlatMapInvokable<OUT, R>(flatMapper));
+ TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType());
+
+ return addFunction("flatMap", flatMapper, getType(), outType, new FlatMapInvokable<OUT, R>(
+ flatMapper));
}
/**
@@ -434,9 +427,9 @@ public class DataStream<OUT> {
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
- return addFunction("reduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
- ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
- ReduceFunction.class, 0), new StreamReduceInvokable<OUT>(reducer));
+
+ return addFunction("reduce", reducer, getType(), getType(), new StreamReduceInvokable<OUT>(
+ reducer));
}
/**
@@ -454,11 +447,7 @@ public class DataStream<OUT> {
* @return The filtered DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> filter(FilterFunction<OUT> filter) {
- FunctionTypeWrapper<OUT> typeWrapper = new FunctionTypeWrapper<OUT>(filter,
- FilterFunction.class, 0);
-
- return addFunction("filter", filter, typeWrapper, typeWrapper, new FilterInvokable<OUT>(
- filter));
+ return addFunction("filter", filter, getType(), getType(), new FilterInvokable<OUT>(filter));
}
/**
@@ -543,7 +532,7 @@ public class DataStream<OUT> {
public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
checkFieldRange(positionToSum);
return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(positionToSum,
- getClassAtPos(positionToSum), getOutputType()));
+ getClassAtPos(positionToSum), getType()));
}
/**
@@ -559,8 +548,7 @@ public class DataStream<OUT> {
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> sum(String field) {
- return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field,
- getOutputType()));
+ return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType()));
}
/**
@@ -573,7 +561,7 @@ public class DataStream<OUT> {
*/
public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
checkFieldRange(positionToMin);
- return aggregate(ComparableAggregator.getAggregator(positionToMin, getOutputType(),
+ return aggregate(ComparableAggregator.getAggregator(positionToMin, getType(),
AggregationType.MIN));
}
@@ -590,8 +578,8 @@ public class DataStream<OUT> {
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> min(String field) {
- return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
- AggregationType.MIN, false));
+ return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MIN,
+ false));
}
/**
@@ -604,7 +592,7 @@ public class DataStream<OUT> {
*/
public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
checkFieldRange(positionToMax);
- return aggregate(ComparableAggregator.getAggregator(positionToMax, getOutputType(),
+ return aggregate(ComparableAggregator.getAggregator(positionToMax, getType(),
AggregationType.MAX));
}
@@ -621,8 +609,8 @@ public class DataStream<OUT> {
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> max(String field) {
- return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
- AggregationType.MAX, false));
+ return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MAX,
+ false));
}
/**
@@ -641,7 +629,7 @@ public class DataStream<OUT> {
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
- return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+ return aggregate(ComparableAggregator.getAggregator(field, getType(),
AggregationType.MINBY, first));
}
@@ -661,7 +649,7 @@ public class DataStream<OUT> {
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) {
- return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+ return aggregate(ComparableAggregator.getAggregator(field, getType(),
AggregationType.MAXBY, first));
}
@@ -694,7 +682,7 @@ public class DataStream<OUT> {
*/
public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) {
checkFieldRange(positionToMinBy);
- return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getOutputType(),
+ return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getType(),
AggregationType.MINBY, first));
}
@@ -727,7 +715,7 @@ public class DataStream<OUT> {
*/
public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
checkFieldRange(positionToMaxBy);
- return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getOutputType(),
+ return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getType(),
AggregationType.MAXBY, first));
}
@@ -737,11 +725,9 @@ public class DataStream<OUT> {
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<Long, ?> count() {
- TypeWrapper<OUT> inTypeWrapper = outTypeWrapper;
- TypeWrapper<Long> outTypeWrapper = new ObjectTypeWrapper<Long>(Long.valueOf(0));
+ TypeInformation<Long> outTypeInfo = TypeExtractor.getForObject(Long.valueOf(0));
- return addFunction("counter", null, inTypeWrapper, outTypeWrapper,
- new CounterInvokable<OUT>());
+ return addFunction("counter", null, getType(), outTypeInfo, new CounterInvokable<OUT>());
}
/**
@@ -803,7 +789,7 @@ public class DataStream<OUT> {
public DataStreamSink<OUT> print() {
DataStream<OUT> inputStream = this.copy();
PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>();
- DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, outTypeWrapper);
+ DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, getType());
return returnStream;
}
@@ -923,7 +909,7 @@ public class DataStream<OUT> {
private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path,
WriteFormatAsText<OUT> format, long millis, OUT endTuple) {
DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
- path, format, millis, endTuple), inputStream.outTypeWrapper);
+ path, format, millis, endTuple), inputStream.typeInfo);
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
@@ -951,7 +937,7 @@ public class DataStream<OUT> {
WriteFormatAsText<OUT> format, int batchSize, OUT endTuple) {
DataStreamSink<OUT> returnStream = addSink(inputStream,
new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple),
- inputStream.outTypeWrapper);
+ inputStream.typeInfo);
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
@@ -1074,7 +1060,7 @@ public class DataStream<OUT> {
private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
WriteFormatAsCsv<OUT> format, long millis, OUT endTuple) {
DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
- path, format, millis, endTuple), inputStream.outTypeWrapper);
+ path, format, millis, endTuple), inputStream.typeInfo);
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
@@ -1102,7 +1088,7 @@ public class DataStream<OUT> {
WriteFormatAsCsv<OUT> format, int batchSize, OUT endTuple) {
DataStreamSink<OUT> returnStream = addSink(inputStream,
new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple),
- inputStream.outTypeWrapper);
+ inputStream.typeInfo);
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
@@ -1112,7 +1098,7 @@ public class DataStream<OUT> {
StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate);
SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate,
- outTypeWrapper, outTypeWrapper, invokable);
+ typeInfo, typeInfo, invokable);
return returnStream;
}
@@ -1142,16 +1128,16 @@ public class DataStream<OUT> {
* @return the data stream constructed
*/
protected <R> SingleOutputStreamOperator<R, ?> addFunction(String functionName,
- final Function function, TypeWrapper<OUT> inTypeWrapper, TypeWrapper<R> outTypeWrapper,
- StreamInvokable<OUT, R> functionInvokable) {
+ final Function function, TypeInformation<OUT> inTypeInfo,
+ TypeInformation<R> outTypeInfo, StreamInvokable<OUT, R> functionInvokable) {
DataStream<OUT> inputStream = this.copy();
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment,
- functionName, outTypeWrapper);
+ functionName, outTypeInfo);
try {
- jobGraphBuilder.addStreamVertex(returnStream.getId(), functionInvokable, inTypeWrapper,
- outTypeWrapper, functionName,
+ jobGraphBuilder.addStreamVertex(returnStream.getId(), functionInvokable, inTypeInfo,
+ outTypeInfo, functionName,
SerializationUtils.serialize((Serializable) function), degreeOfParallelism);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize user defined function");
@@ -1220,18 +1206,16 @@ public class DataStream<OUT> {
}
private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream, SinkFunction<OUT> sinkFunction) {
- return addSink(inputStream, sinkFunction, new FunctionTypeWrapper<OUT>(sinkFunction,
- SinkFunction.class, 0));
+ return addSink(inputStream, sinkFunction, getType());
}
private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream,
- SinkFunction<OUT> sinkFunction, TypeWrapper<OUT> inTypeWrapper) {
- DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink",
- outTypeWrapper);
+ SinkFunction<OUT> sinkFunction, TypeInformation<OUT> inTypeInfo) {
+ DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", typeInfo);
try {
jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>(
- sinkFunction), inTypeWrapper, null, "sink", SerializationUtils
+ sinkFunction), inTypeInfo, null, "sink", SerializationUtils
.serialize(sinkFunction), degreeOfParallelism);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize SinkFunction");
@@ -1252,7 +1236,7 @@ public class DataStream<OUT> {
@SuppressWarnings("rawtypes")
protected Class<?> getClassAtPos(int pos) {
Class<?> type;
- TypeInformation<OUT> outTypeInfo = outTypeWrapper.getTypeInfo();
+ TypeInformation<OUT> outTypeInfo = getType();
if (outTypeInfo.isTupleType()) {
type = ((TupleTypeInfo) outTypeInfo).getTypeAt(pos).getTypeClass();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index 6bf6f43..369c3eb 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -1,44 +1,45 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
* limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
-
-/**
- * Represents the end of a DataStream.
- *
- * @param <IN>
- * The type of the DataStream closed by the sink.
- */
-public class DataStreamSink<IN> extends SingleOutputStreamOperator<IN, DataStreamSink<IN>> {
-
- protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType, TypeWrapper<IN> outTypeWrapper) {
- super(environment, operatorType, outTypeWrapper);
- }
-
- protected DataStreamSink(DataStream<IN> dataStream) {
- super(dataStream);
- }
-
- @Override
- protected DataStreamSink<IN> copy() {
- throw new RuntimeException("Data stream sinks cannot be copied");
- }
-
-}
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Represents the end of a DataStream.
+ *
+ * @param <IN>
+ * The type of the DataStream closed by the sink.
+ */
+public class DataStreamSink<IN> extends SingleOutputStreamOperator<IN, DataStreamSink<IN>> {
+
+ protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType,
+ TypeInformation<IN> outTypeInfo) {
+ super(environment, operatorType, outTypeInfo);
+ }
+
+ protected DataStreamSink(DataStream<IN> dataStream) {
+ super(dataStream);
+ }
+
+ @Override
+ protected DataStreamSink<IN> copy() {
+ throw new RuntimeException("Data stream sinks cannot be copied");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index 5b2747f..978ea42 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -17,8 +17,8 @@
package org.apache.flink.streaming.api.datastream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* The DataStreamSource represents the starting point of a DataStream.
@@ -28,8 +28,8 @@ import org.apache.flink.streaming.util.serialization.TypeWrapper;
*/
public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataStreamSource<OUT>> {
- public DataStreamSource(StreamExecutionEnvironment environment, String operatorType, TypeWrapper<OUT> outTypeWrapper) {
- super(environment, operatorType, outTypeWrapper);
+ public DataStreamSource(StreamExecutionEnvironment environment, String operatorType, TypeInformation<OUT> outTypeInfo) {
+ super(environment, operatorType, outTypeInfo);
}
public DataStreamSource(DataStream<OUT> dataStream) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 6b07cca..32f664f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
/**
* A GroupedDataStream represents a {@link DataStream} which has been
@@ -62,9 +61,8 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
- return addFunction("groupReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
- ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
- ReduceFunction.class, 0), new GroupedReduceInvokable<OUT>(reducer, keySelector));
+ return addFunction("groupReduce", reducer, getType(), getType(),
+ new GroupedReduceInvokable<OUT>(reducer, keySelector));
}
/**
@@ -184,7 +182,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
keySelector);
SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("groupReduce", aggregate,
- outTypeWrapper, outTypeWrapper, invokable);
+ typeInfo, typeInfo, invokable);
return returnStream;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 714807c..76da27c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -23,12 +23,12 @@ import java.util.Map.Entry;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
import org.apache.flink.streaming.state.OperatorState;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
/**
* The SingleOutputStreamOperator represents a user defined transformation
@@ -43,8 +43,8 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
DataStream<OUT> {
protected SingleOutputStreamOperator(StreamExecutionEnvironment environment,
- String operatorType, TypeWrapper<OUT> outTypeWrapper) {
- super(environment, operatorType, outTypeWrapper);
+ String operatorType, TypeInformation<OUT> outTypeInfo) {
+ super(environment, operatorType, outTypeInfo);
setBufferTimeout(environment.getBufferTimeout());
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index e1c091c..5a8f038 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -44,7 +44,7 @@ public class SplitDataStream<OUT> {
* @return The output type.
*/
public TypeInformation<OUT> getOutputType() {
- return dataStream.getOutputType();
+ return dataStream.getType();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
index ba6e75e..89c80ab 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamJoinOperator.java
@@ -60,7 +60,7 @@ public class StreamJoinOperator<I1, I2> extends
*/
public JoinPredicate<I1, I2> where(int... fields) {
return new JoinPredicate<I1, I2>(op, FieldsKeySelector.getSelector(
- op.input1.getOutputType(), fields));
+ op.input1.getType(), fields));
}
/**
@@ -76,7 +76,7 @@ public class StreamJoinOperator<I1, I2> extends
* {@link JoinPredicate#equalTo} to continue the Join.
*/
public JoinPredicate<I1, I2> where(String... fields) {
- return new JoinPredicate<I1, I2>(op, new PojoKeySelector<I1>(op.input1.getOutputType(),
+ return new JoinPredicate<I1, I2>(op, new PojoKeySelector<I1>(op.input1.getType(),
fields));
}
@@ -135,7 +135,7 @@ public class StreamJoinOperator<I1, I2> extends
* @return The joined data stream.
*/
public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> equalTo(int... fields) {
- return createJoinOperator(FieldsKeySelector.getSelector(op.input2.getOutputType(),
+ return createJoinOperator(FieldsKeySelector.getSelector(op.input2.getType(),
fields));
}
@@ -154,7 +154,7 @@ public class StreamJoinOperator<I1, I2> extends
* @return The joined data stream.
*/
public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> equalTo(String... fields) {
- return createJoinOperator(new PojoKeySelector<I2>(op.input2.getOutputType(), fields));
+ return createJoinOperator(new PojoKeySelector<I2>(op.input2.getType(), fields));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/51c1f677/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
index 265e033..cc5f66e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.api.datastream;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple10;
@@ -43,21 +44,20 @@ import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.invokable.operator.ProjectInvokable;
-import org.apache.flink.streaming.util.serialization.ProjectTypeWrapper;
-import org.apache.flink.streaming.util.serialization.TypeWrapper;
public class StreamProjection<IN> {
private DataStream<IN> dataStream;
private int[] fieldIndexes;
- private TypeWrapper<IN> inTypeWrapper;
+ private TypeInformation<IN> inTypeInfo;
protected StreamProjection(DataStream<IN> dataStream, int[] fieldIndexes) {
this.dataStream = dataStream;
this.fieldIndexes = fieldIndexes;
- this.inTypeWrapper = dataStream.outTypeWrapper;
- if (!inTypeWrapper.getTypeInfo().isTupleType()) {
+ this.inTypeInfo = dataStream.typeInfo;
+ if (!inTypeInfo.isTupleType()) {
throw new RuntimeException("Only Tuple DataStreams can be projected");
}
}
@@ -80,12 +80,11 @@ public class StreamProjection<IN> {
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple1<T0>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple1<T0>>(
- inTypeWrapper, fieldIndexes, types);
-
- return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
- new ProjectInvokable<IN, Tuple1<T0>>(fieldIndexes, outTypeWrapper));
-
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple1<T0>> outType = (TypeInformation<Tuple1<T0>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
+ return dataStream.addFunction("projection", null, inTypeInfo, outType,
+ new ProjectInvokable<IN, Tuple1<T0>>(fieldIndexes, outType));
}
/**
@@ -109,11 +108,11 @@ public class StreamProjection<IN> {
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple2<T0, T1>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple2<T0, T1>>(
- inTypeWrapper, fieldIndexes, types);
-
- return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
- new ProjectInvokable<IN, Tuple2<T0, T1>>(fieldIndexes, outTypeWrapper));
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple2<T0, T1>> outType = (TypeInformation<Tuple2<T0, T1>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
+ return dataStream.addFunction("projection", null, inTypeInfo, outType,
+ new ProjectInvokable<IN, Tuple2<T0, T1>>(fieldIndexes, outType));
}
/**
@@ -139,12 +138,11 @@ public class StreamProjection<IN> {
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple3<T0, T1, T2>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple3<T0, T1, T2>>(
- inTypeWrapper, fieldIndexes, types);
-
- return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
- new ProjectInvokable<IN, Tuple3<T0, T1, T2>>(fieldIndexes, outTypeWrapper));
-
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple3<T0, T1, T2>> outType = (TypeInformation<Tuple3<T0, T1, T2>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
+ return dataStream.addFunction("projection", null, inTypeInfo, outType,
+ new ProjectInvokable<IN, Tuple3<T0, T1, T2>>(fieldIndexes, outType));
}
/**
@@ -172,12 +170,11 @@ public class StreamProjection<IN> {
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple4<T0, T1, T2, T3>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple4<T0, T1, T2, T3>>(
- inTypeWrapper, fieldIndexes, types);
-
- return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
- new ProjectInvokable<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, outTypeWrapper));
-
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple4<T0, T1, T2, T3>> outType = (TypeInformation<Tuple4<T0, T1, T2, T3>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
+ return dataStream.addFunction("projection", null, inTypeInfo, outType,
+ new ProjectInvokable<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, outType));
}
/**
@@ -206,13 +203,11 @@ public class StreamProjection<IN> {
throw new IllegalArgumentException(
"Numbers of projected fields and types do not match.");
}
-
- TypeWrapper<Tuple5<T0, T1, T2, T3, T4>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple5<T0, T1, T2, T3, T4>>(
- inTypeWrapper, fieldIndexes, types);
-
- return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
- new ProjectInvokable<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, outTypeWrapper));
-
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple5<T0, T1, T2, T3, T4>> outType = (TypeInformation<Tuple5<T0, T1, T2, T3, T4>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
+ return dataStream.addFunction("projection", null, inTypeInfo, outType,
+ new ProjectInvokable<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, outType));
}
/**
@@ -245,12 +240,11 @@ public class StreamProjection<IN> {
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple6<T0, T1, T2, T3, T4, T5>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(
- inTypeWrapper, fieldIndexes, types);
-
- return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
- new ProjectInvokable<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes,
- outTypeWrapper));
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>> outType = (TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
+ return dataStream.addFunction("projection", null, inTypeInfo, outType,
+ new ProjectInvokable<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes, outType));
}
/**
@@ -284,12 +278,14 @@ public class StreamProjection<IN> {
throw new IllegalArgumentException(
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple7<T0, T1, T2, T3, T4, T5, T6>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(
- inTypeWrapper, fieldIndexes, types);
- return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
- new ProjectInvokable<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes,
- outTypeWrapper));
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple7<T0, T1, T2, T3, T4, T5, T6>> outType = (TypeInformation<Tuple7<T0, T1, T2, T3, T4, T5, T6>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
+ return dataStream
+ .addFunction("projection", null, inTypeInfo, outType,
+ new ProjectInvokable<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes,
+ outType));
}
/**
@@ -325,12 +321,13 @@ public class StreamProjection<IN> {
throw new IllegalArgumentException(
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(
- inTypeWrapper, fieldIndexes, types);
- return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
- new ProjectInvokable<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes,
- outTypeWrapper));
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> outType = (TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
+ return dataStream.addFunction("projection", null, inTypeInfo, outType,
+ new ProjectInvokable<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes,
+ outType));
}
/**
@@ -369,11 +366,12 @@ public class StreamProjection<IN> {
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(
- inTypeWrapper, fieldIndexes, types);
- return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> outType = (TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
+ return dataStream.addFunction("projection", null, inTypeInfo, outType,
new ProjectInvokable<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fieldIndexes,
- outTypeWrapper));
+ outType));
}
/**
@@ -414,11 +412,12 @@ public class StreamProjection<IN> {
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(
- inTypeWrapper, fieldIndexes, types);
- return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> outType = (TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
+ return dataStream.addFunction("projection", null, inTypeInfo, outType,
new ProjectInvokable<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(
- fieldIndexes, outTypeWrapper));
+ fieldIndexes, outType));
}
/**
@@ -462,12 +461,13 @@ public class StreamProjection<IN> {
throw new IllegalArgumentException(
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(
- inTypeWrapper, fieldIndexes, types);
- return dataStream.addFunction("projection", null, inTypeWrapper, outTypeWrapper,
- new ProjectInvokable<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(
- fieldIndexes, outTypeWrapper));
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> outType = (TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
+ return dataStream.addFunction("projection", null, inTypeInfo, outType,
+ new ProjectInvokable<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(
+ fieldIndexes, outType));
}
/**
@@ -513,17 +513,18 @@ public class StreamProjection<IN> {
throw new IllegalArgumentException(
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(
- inTypeWrapper, fieldIndexes, types);
+
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> outType = (TypeInformation<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
return dataStream
.addFunction(
"projection",
null,
- inTypeWrapper,
- outTypeWrapper,
+ inTypeInfo,
+ outType,
new ProjectInvokable<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(
- fieldIndexes, outTypeWrapper));
-
+ fieldIndexes, outType));
}
/**
@@ -572,16 +573,17 @@ public class StreamProjection<IN> {
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(
- inTypeWrapper, fieldIndexes, types);
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> outType = (TypeInformation<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
return dataStream
.addFunction(
"projection",
null,
- inTypeWrapper,
- outTypeWrapper,
+ inTypeInfo,
+ outType,
new ProjectInvokable<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(
- fieldIndexes, outTypeWrapper));
+ fieldIndexes, outType));
}
/**
@@ -632,17 +634,17 @@ public class StreamProjection<IN> {
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(
- inTypeWrapper, fieldIndexes, types);
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> outType = (TypeInformation<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
return dataStream
.addFunction(
"projection",
null,
- inTypeWrapper,
- outTypeWrapper,
+ inTypeInfo,
+ outType,
new ProjectInvokable<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(
- fieldIndexes, outTypeWrapper));
-
+ fieldIndexes, outType));
}
/**
@@ -696,17 +698,17 @@ public class StreamProjection<IN> {
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(
- inTypeWrapper, fieldIndexes, types);
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> outType = (TypeInformation<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
return dataStream
.addFunction(
"projection",
null,
- inTypeWrapper,
- outTypeWrapper,
+ inTypeInfo,
+ outType,
new ProjectInvokable<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(
- fieldIndexes, outTypeWrapper));
-
+ fieldIndexes, outType));
}
/**
@@ -762,17 +764,17 @@ public class StreamProjection<IN> {
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(
- inTypeWrapper, fieldIndexes, types);
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> outType = (TypeInformation<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
return dataStream
.addFunction(
"projection",
null,
- inTypeWrapper,
- outTypeWrapper,
+ inTypeInfo,
+ outType,
new ProjectInvokable<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(
- fieldIndexes, outTypeWrapper));
-
+ fieldIndexes, outType));
}
/**
@@ -830,16 +832,17 @@ public class StreamProjection<IN> {
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(
- inTypeWrapper, fieldIndexes, types);
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> outType = (TypeInformation<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
return dataStream
.addFunction(
"projection",
null,
- inTypeWrapper,
- outTypeWrapper,
+ inTypeInfo,
+ outType,
new ProjectInvokable<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(
- fieldIndexes, outTypeWrapper));
+ fieldIndexes, outType));
}
/**
@@ -899,16 +902,17 @@ public class StreamProjection<IN> {
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(
- inTypeWrapper, fieldIndexes, types);
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> outType = (TypeInformation<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
return dataStream
.addFunction(
"projection",
null,
- inTypeWrapper,
- outTypeWrapper,
+ inTypeInfo,
+ outType,
new ProjectInvokable<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(
- fieldIndexes, outTypeWrapper));
+ fieldIndexes, outType));
}
/**
@@ -971,16 +975,17 @@ public class StreamProjection<IN> {
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(
- inTypeWrapper, fieldIndexes, types);
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> outType = (TypeInformation<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
return dataStream
.addFunction(
"projection",
null,
- inTypeWrapper,
- outTypeWrapper,
+ inTypeInfo,
+ outType,
new ProjectInvokable<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(
- fieldIndexes, outTypeWrapper));
+ fieldIndexes, outType));
}
/**
@@ -1045,17 +1050,17 @@ public class StreamProjection<IN> {
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(
- inTypeWrapper, fieldIndexes, types);
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> outType = (TypeInformation<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
return dataStream
.addFunction(
"projection",
null,
- inTypeWrapper,
- outTypeWrapper,
+ inTypeInfo,
+ outType,
new ProjectInvokable<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(
- fieldIndexes, outTypeWrapper));
-
+ fieldIndexes, outType));
}
/**
@@ -1123,16 +1128,17 @@ public class StreamProjection<IN> {
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(
- inTypeWrapper, fieldIndexes, types);
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> outType = (TypeInformation<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
return dataStream
.addFunction(
"projection",
null,
- inTypeWrapper,
- outTypeWrapper,
+ inTypeInfo,
+ outType,
new ProjectInvokable<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(
- fieldIndexes, outTypeWrapper));
+ fieldIndexes, outType));
}
/**
@@ -1202,16 +1208,17 @@ public class StreamProjection<IN> {
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(
- inTypeWrapper, fieldIndexes, types);
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> outType = (TypeInformation<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
return dataStream
.addFunction(
"projection",
null,
- inTypeWrapper,
- outTypeWrapper,
+ inTypeInfo,
+ outType,
new ProjectInvokable<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(
- fieldIndexes, outTypeWrapper));
+ fieldIndexes, outType));
}
/**
@@ -1284,16 +1291,17 @@ public class StreamProjection<IN> {
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(
- inTypeWrapper, fieldIndexes, types);
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> outType = (TypeInformation<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
return dataStream
.addFunction(
"projection",
null,
- inTypeWrapper,
- outTypeWrapper,
+ inTypeInfo,
+ outType,
new ProjectInvokable<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(
- fieldIndexes, outTypeWrapper));
+ fieldIndexes, outType));
}
/**
@@ -1368,16 +1376,17 @@ public class StreamProjection<IN> {
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(
- inTypeWrapper, fieldIndexes, types);
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> outType = (TypeInformation<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
return dataStream
.addFunction(
"projection",
null,
- inTypeWrapper,
- outTypeWrapper,
+ inTypeInfo,
+ outType,
new ProjectInvokable<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(
- fieldIndexes, outTypeWrapper));
+ fieldIndexes, outType));
}
/**
@@ -1454,16 +1463,36 @@ public class StreamProjection<IN> {
"Numbers of projected fields and types do not match.");
}
- TypeWrapper<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> outTypeWrapper = new ProjectTypeWrapper<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(
- inTypeWrapper, fieldIndexes, types);
+ @SuppressWarnings("unchecked")
+ TypeInformation<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> outType = (TypeInformation<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>) extractFieldTypes(
+ fieldIndexes, types, inTypeInfo);
return dataStream
.addFunction(
"projection",
null,
- inTypeWrapper,
- outTypeWrapper,
+ inTypeInfo,
+ outType,
new ProjectInvokable<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(
- fieldIndexes, outTypeWrapper));
+ fieldIndexes, outType));
+ }
+
+ public static TypeInformation<?> extractFieldTypes(int[] fields, Class<?>[] givenTypes,
+ TypeInformation<?> inType) {
+
+ TupleTypeInfo<?> inTupleType = (TupleTypeInfo<?>) inType;
+ TypeInformation<?>[] fieldTypes = new TypeInformation[fields.length];
+
+ for (int i = 0; i < fields.length; i++) {
+
+ if (inTupleType.getTypeAt(fields[i]).getTypeClass() != givenTypes[i]) {
+ throw new IllegalArgumentException(
+ "Given types do not match types of input data set.");
+ }
+
+ fieldTypes[i] = inTupleType.getTypeAt(fields[i]);
+ }
+
+ return new TupleTypeInfo<Tuple>(fieldTypes);
}
}