You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:01 UTC
[24/51] [abbrv] git commit: [streaming] Wrapped serializers to make
component construction simpler
[streaming] Wrapped serializers to make component construction simpler
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/799424d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/799424d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/799424d1
Branch: refs/heads/master
Commit: 799424d1a3237a8d1b32451f4b3ad23b342cf59e
Parents: 2f704ae
Author: ghermann <re...@gmail.com>
Authored: Tue Jul 29 20:00:11 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:22:09 2014 +0200
----------------------------------------------------------------------
.../apache/flink/streaming/api/DataStream.java | 331 +++++++++++++---
.../streaming/api/IterativeDataStream.java | 21 +-
.../flink/streaming/api/StreamConfig.java | 87 +++--
.../api/StreamExecutionEnvironment.java | 380 ++-----------------
.../api/function/source/FileSourceFunction.java | 6 +-
.../api/function/source/FileStreamFunction.java | 10 +-
.../api/function/source/SourceFunction.java | 7 +-
.../api/invokable/SourceInvokable.java | 44 +++
.../api/invokable/StreamComponentInvokable.java | 10 +-
.../api/invokable/StreamRecordInvokable.java | 7 +-
.../api/invokable/UserSourceInvokable.java | 34 --
.../operator/StreamReduceInvokable.java | 1 -
.../api/invokable/operator/co/CoInvokable.java | 2 +-
.../AbstractStreamComponent.java | 78 ++--
.../streamcomponent/BlockingQueueBroker.java | 5 +-
.../api/streamcomponent/CoStreamTask.java | 36 +-
.../SingleInputAbstractStreamComponent.java | 50 +--
.../streamcomponent/StreamIterationSink.java | 8 +-
.../streamcomponent/StreamIterationSource.java | 6 +-
.../api/streamcomponent/StreamSink.java | 16 +-
.../api/streamcomponent/StreamSource.java | 17 +-
.../api/streamcomponent/StreamTask.java | 17 +-
.../util/serialization/FunctionTypeWrapper.java | 74 ++++
.../util/serialization/ObjectTypeWrapper.java | 61 +++
.../serialization/TypeSerializerWrapper.java | 57 +++
.../flink/streaming/api/WriteAsTextTest.java | 18 +-
.../api/invokable/operator/FilterTest.java | 2 +-
.../serialization/TypeSerializationTest.java | 91 +++++
28 files changed, 846 insertions(+), 630 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index d965bf2..1bde6a6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -22,6 +22,9 @@ package org.apache.flink.streaming.api;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.functions.AbstractFunction;
import org.apache.flink.api.java.functions.FilterFunction;
import org.apache.flink.api.java.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.GroupReduceFunction;
@@ -29,14 +32,20 @@ import org.apache.flink.api.java.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
+import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches;
+import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
+import org.apache.flink.streaming.api.invokable.SinkInvokable;
+import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.partitioner.DistributePartitioner;
@@ -44,6 +53,8 @@ import org.apache.flink.streaming.partitioner.FieldsPartitioner;
import org.apache.flink.streaming.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
/**
* A DataStream represents a stream of elements of the same type. A DataStream
@@ -66,12 +77,13 @@ public class DataStream<T extends Tuple> {
protected String id;
protected int degreeOfParallelism;
protected String userDefinedName;
- protected OutputSelector<T> outputSelector;
protected List<String> connectIDs;
protected List<StreamPartitioner<T>> partitioners;
protected boolean iterationflag;
protected Integer iterationID;
+ protected JobGraphBuilder jobGraphBuilder;
+
/**
* Create a new {@link DataStream} in the given execution environment with
* partitioning set to shuffle by default.
@@ -91,8 +103,8 @@ public class DataStream<T extends Tuple> {
this.id = operatorType + "-" + counter.toString();
this.environment = environment;
this.degreeOfParallelism = environment.getDegreeOfParallelism();
+ this.jobGraphBuilder = environment.getJobGraphBuilder();
initConnections();
-
}
/**
@@ -106,11 +118,11 @@ public class DataStream<T extends Tuple> {
this.id = dataStream.id;
this.degreeOfParallelism = dataStream.degreeOfParallelism;
this.userDefinedName = dataStream.userDefinedName;
- this.outputSelector = dataStream.outputSelector;
this.connectIDs = new ArrayList<String>(dataStream.connectIDs);
this.partitioners = new ArrayList<StreamPartitioner<T>>(dataStream.partitioners);
this.iterationflag = dataStream.iterationflag;
this.iterationID = dataStream.iterationID;
+ this.jobGraphBuilder = dataStream.jobGraphBuilder;
}
/**
@@ -144,7 +156,7 @@ public class DataStream<T extends Tuple> {
* @return The DataStream with mutability set.
*/
public DataStream<T> setMutability(boolean isMutable) {
- environment.setMutability(this, isMutable);
+ jobGraphBuilder.setMutability(id, isMutable);
return this;
}
@@ -157,7 +169,7 @@ public class DataStream<T extends Tuple> {
* @return The DataStream with buffer timeout set.
*/
public DataStream<T> setBufferTimeout(long timeoutMillis) {
- environment.setBufferTimeout(this, timeoutMillis);
+ jobGraphBuilder.setBufferTimeout(id, timeoutMillis);
return this;
}
@@ -175,10 +187,9 @@ public class DataStream<T extends Tuple> {
}
this.degreeOfParallelism = dop;
- environment.setOperatorParallelism(this);
+ jobGraphBuilder.setParallelism(id, degreeOfParallelism);
return new DataStream<T>(this);
-
}
/**
@@ -200,13 +211,14 @@ public class DataStream<T extends Tuple> {
* @return The named DataStream.
*/
public DataStream<T> name(String name) {
- // copy?
+ // TODO copy DataStream?
if (name == "") {
throw new IllegalArgumentException("User defined name must not be empty string");
}
userDefinedName = name;
- environment.setName(this, name);
+ jobGraphBuilder.setUserDefinedName(id, name);
+
return this;
}
@@ -236,13 +248,10 @@ public class DataStream<T extends Tuple> {
* The other DataStream will connected to this
* @param stream
* This DataStream will be connected to returnStream
- * @return Connected DataStream
*/
- private DataStream<T> addConnection(DataStream<T> returnStream, DataStream<T> stream) {
+ private void addConnection(DataStream<T> returnStream, DataStream<T> stream) {
returnStream.connectIDs.addAll(stream.connectIDs);
returnStream.partitioners.addAll(stream.partitioners);
-
- return returnStream;
}
/**
@@ -256,8 +265,12 @@ public class DataStream<T extends Tuple> {
* @return The directed DataStream.
*/
public DataStream<T> directTo(OutputSelector<T> outputSelector) {
- this.outputSelector = outputSelector;
- environment.addDirectedEmit(id, outputSelector);
+ try {
+ jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
+ } catch (SerializationException e) {
+ throw new RuntimeException("Cannot serialize OutputSelector", e);
+ }
+
return this;
}
@@ -323,10 +336,10 @@ public class DataStream<T extends Tuple> {
for (int i = 0; i < returnStream.partitioners.size(); i++) {
returnStream.partitioners.set(i, partitioner);
}
-
+
return returnStream;
}
-
+
/**
* Applies a Map transformation on a {@link DataStream}. The transformation
* calls a {@link MapFunction} for each element of the DataStream. Each
@@ -340,9 +353,8 @@ public class DataStream<T extends Tuple> {
* @return The transformed DataStream.
*/
public <R extends Tuple> StreamOperator<T, R> map(MapFunction<T, R> mapper) {
- return environment.addFunction("map", new DataStream<T>(this), mapper,
- new MapInvokable<T, R>(mapper));
-
+ return addFunction("map", mapper, new FunctionTypeWrapper<T, Tuple, R>(mapper,
+ MapFunction.class, 0, -1, 1), new MapInvokable<T, R>(mapper));
}
/**
@@ -362,8 +374,10 @@ public class DataStream<T extends Tuple> {
*/
public <T2 extends Tuple, R extends Tuple> DataStream<R> coMapWith(
CoMapFunction<T, T2, R> coMapper, DataStream<T2> otherStream) {
- return environment.addCoFunction("coMap", new DataStream<T>(this), new DataStream<T2>(
- otherStream), coMapper, new CoMapInvokable<T, T2, R>(coMapper));
+ return addCoFunction("coMap", new DataStream<T>(this), new DataStream<T2>(otherStream),
+ coMapper,
+ new FunctionTypeWrapper<T, T2, R>(coMapper, CoMapFunction.class, 0, 1, 2),
+ new CoMapInvokable<T, T2, R>(coMapper));
}
/**
@@ -381,8 +395,8 @@ public class DataStream<T extends Tuple> {
* @return The transformed DataStream.
*/
public <R extends Tuple> StreamOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
- return environment.addFunction("flatMap", new DataStream<T>(this), flatMapper,
- new FlatMapInvokable<T, R>(flatMapper));
+ return addFunction("flatMap", flatMapper, new FunctionTypeWrapper<T, Tuple, R>(flatMapper,
+ FlatMapFunction.class, 0, -1, 1), new FlatMapInvokable<T, R>(flatMapper));
}
/**
@@ -397,8 +411,8 @@ public class DataStream<T extends Tuple> {
* @return The filtered DataStream.
*/
public StreamOperator<T, T> filter(FilterFunction<T> filter) {
- return environment.addFunction("filter", new DataStream<T>(this), filter,
- new FilterInvokable<T>(filter));
+ return addFunction("filter", filter, new FunctionTypeWrapper<T, Tuple, T>(filter,
+ FilterFunction.class, 0, -1, 0), new FilterInvokable<T>(filter));
}
/**
@@ -418,8 +432,9 @@ public class DataStream<T extends Tuple> {
*/
public <R extends Tuple> StreamOperator<T, R> batchReduce(GroupReduceFunction<T, R> reducer,
int batchSize) {
- return environment.addFunction("batchReduce", new DataStream<T>(this), reducer,
- new BatchReduceInvokable<T, R>(reducer, batchSize));
+ return addFunction("batchReduce", reducer, new FunctionTypeWrapper<T, Tuple, R>(reducer,
+ GroupReduceFunction.class, 0, -1, 1), new BatchReduceInvokable<T, R>(reducer,
+ batchSize));
}
/**
@@ -440,8 +455,93 @@ public class DataStream<T extends Tuple> {
*/
public <R extends Tuple> StreamOperator<T, R> windowReduce(GroupReduceFunction<T, R> reducer,
long windowSize) {
- return environment.addFunction("batchReduce", new DataStream<T>(this), reducer,
- new WindowReduceInvokable<T, R>(reducer, windowSize));
+ return addFunction("windowReduce", reducer, new FunctionTypeWrapper<T, Tuple, R>(reducer,
+ GroupReduceFunction.class, 0, -1, 1), new WindowReduceInvokable<T, R>(reducer,
+ windowSize));
+ }
+
+ /**
+ * Internal function for passing the user defined functions to the JobGraph
+ * of the job.
+ *
+ * @param functionName
+ * name of the function
+ * @param function
+ * the user defined function
+ * @param typeWrapper
+ * wrapper providing the serializers of the input (T) and output (R) types
+ * @param functionInvokable
+ * the invokable wrapping the user defined function
+ * @param <R>
+ * type of the return stream
+ * @return the data stream constructed
+ */
+ private <R extends Tuple> StreamOperator<T, R> addFunction(String functionName,
+ final AbstractFunction function, TypeSerializerWrapper<T, Tuple, R> typeWrapper,
+ UserTaskInvokable<T, R> functionInvokable) {
+
+ DataStream<T> inputStream = new DataStream<T>(this);
+ StreamOperator<T, R> returnStream = new StreamOperator<T, R>(environment, functionName);
+
+ try {
+ jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, typeWrapper,
+ functionName, SerializationUtils.serialize(function), degreeOfParallelism);
+ } catch (SerializationException e) {
+ throw new RuntimeException("Cannot serialize user defined function", e);
+ }
+
+ connectGraph(inputStream, returnStream.getId(), 0);
+
+ if (inputStream.iterationflag) {
+ returnStream.addIterationSource(inputStream.iterationID.toString());
+ // NOTE(review): inputStream is a fresh copy of this stream, so this reset does not
+ // clear this.iterationflag -- confirm that is intended.
+ inputStream.iterationflag = false;
+ }
+
+ return returnStream;
+ }
+
+ /**
+ * Internal function for registering a user defined function with two
+ * inputs (a co-function) in the JobGraph of the job. The task is added
+ * with the degree of parallelism of this stream.
+ *
+ * @param functionName
+ * name of the function
+ * @param inputStream1
+ * first input stream, connected with type number 1
+ * @param inputStream2
+ * second input stream, connected with type number 2
+ * @param function
+ * the user defined function
+ * @param typeWrapper
+ * wrapper providing the serializers of the two input types and the
+ * output type
+ * @param functionInvokable
+ * the invokable wrapping the co-function
+ * @return the data stream constructed
+ */
+ protected <T1 extends Tuple, T2 extends Tuple, R extends Tuple> DataStream<R> addCoFunction(
+ String functionName, DataStream<T1> inputStream1, DataStream<T2> inputStream2,
+ final AbstractFunction function, TypeSerializerWrapper<T1, T2, R> typeWrapper,
+ CoInvokable<T1, T2, R> functionInvokable) {
+
+ DataStream<R> returnStream = new DataStream<R>(environment, functionName);
+
+ try {
+ jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable, typeWrapper,
+ functionName, SerializationUtils.serialize(function), degreeOfParallelism);
+ } catch (SerializationException e) {
+ throw new RuntimeException("Cannot serialize user defined function", e);
+ }
+
+ connectGraph(inputStream1, returnStream.getId(), 1);
+ connectGraph(inputStream2, returnStream.getId(), 2);
+
+ // TODO consider iteration
+
+ return returnStream;
+ }
+
+ /**
+ * Internal function for assembling the underlying
+ * {@link org.apache.flink.nephele.jobgraph.JobGraph} of the job. Connects
+ * the outputs of the given input stream to the specified output stream
+ * given by the outputID.
+ *
+ * @param inputStream
+ * input data stream
+ * @param outputID
+ * ID of the output
+ * @param typeNumber
+ * Number of the type (used at co-functions)
+ */
+ <X extends Tuple> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) {
+ for (int i = 0; i < inputStream.connectIDs.size(); i++) {
+ String inputID = inputStream.connectIDs.get(i);
+ StreamPartitioner<X> partitioner = inputStream.partitioners.get(i);
+
+ jobGraphBuilder.setEdge(inputID, outputID, partitioner, typeNumber);
+ }
}
/**
@@ -454,7 +554,7 @@ public class DataStream<T extends Tuple> {
* @return The modified DataStream.
*/
public DataStream<T> addSink(SinkFunction<T> sinkFunction) {
- return environment.addSink(new DataStream<T>(this), sinkFunction);
+ return addSink(new DataStream<T>(this), sinkFunction);
}
/**
@@ -465,7 +565,35 @@ public class DataStream<T extends Tuple> {
* @return The closed DataStream.
*/
public DataStream<T> print() {
- return environment.print(new DataStream<T>(this));
+ DataStream<T> inputStream = new DataStream<T>(this);
+ PrintSinkFunction<T> printFunction = new PrintSinkFunction<T>();
+ DataStream<T> returnStream = addSink(inputStream, printFunction, null);
+
+ jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+
+ return returnStream;
+ }
+
+ private DataStream<T> addSink(DataStream<T> inputStream, SinkFunction<T> sinkFunction) {
+ return addSink(inputStream, sinkFunction, new FunctionTypeWrapper<T, Tuple, T>(
+ sinkFunction, SinkFunction.class, 0, -1, 0));
+ }
+
+ /**
+ * Internal function for adding a sink that consumes the given input stream
+ * to the JobGraph of the job.
+ *
+ * @param inputStream
+ * the stream whose outputs are connected to the sink
+ * @param sinkFunction
+ * the user defined sink function
+ * @param typeWrapper
+ * serializer wrapper of the sink's input type; the callers in this
+ * class that pass {@code null} call
+ * {@code jobGraphBuilder.setBytesFrom} with the input and sink ids
+ * afterwards
+ * @return the data stream representing the sink
+ */
+ private DataStream<T> addSink(DataStream<T> inputStream, SinkFunction<T> sinkFunction,
+ TypeSerializerWrapper<T, Tuple, T> typeWrapper) {
+ DataStream<T> returnStream = new DataStream<T>(environment, "sink");
+
+ try {
+ jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction),
+ typeWrapper, "sink", SerializationUtils.serialize(sinkFunction),
+ degreeOfParallelism);
+ } catch (SerializationException e) {
+ throw new RuntimeException("Cannot serialize SinkFunction", e);
+ }
+
+ connectGraph(inputStream, returnStream.getId(), 0);
+
+ return returnStream;
}
/**
@@ -479,7 +607,7 @@ public class DataStream<T extends Tuple> {
* @return The closed DataStream
*/
public DataStream<T> writeAsText(String path) {
- environment.writeAsText(this, path, new WriteFormatAsText<T>(), 1, null);
+ writeAsText(this, path, new WriteFormatAsText<T>(), 1, null);
return new DataStream<T>(this);
}
@@ -497,7 +625,7 @@ public class DataStream<T extends Tuple> {
* @return The closed DataStream
*/
public DataStream<T> writeAsText(String path, long millis) {
- environment.writeAsText(this, path, new WriteFormatAsText<T>(), millis, null);
+ writeAsText(this, path, new WriteFormatAsText<T>(), millis, null);
return new DataStream<T>(this);
}
@@ -516,7 +644,7 @@ public class DataStream<T extends Tuple> {
* @return The closed DataStream
*/
public DataStream<T> writeAsText(String path, int batchSize) {
- environment.writeAsText(this, path, new WriteFormatAsText<T>(), batchSize, null);
+ writeAsText(this, path, new WriteFormatAsText<T>(), batchSize, null);
return new DataStream<T>(this);
}
@@ -539,7 +667,7 @@ public class DataStream<T extends Tuple> {
* @return The closed DataStream
*/
public DataStream<T> writeAsText(String path, long millis, T endTuple) {
- environment.writeAsText(this, path, new WriteFormatAsText<T>(), millis, endTuple);
+ writeAsText(this, path, new WriteFormatAsText<T>(), millis, endTuple);
return new DataStream<T>(this);
}
@@ -563,11 +691,66 @@ public class DataStream<T extends Tuple> {
* @return The closed DataStream
*/
public DataStream<T> writeAsText(String path, int batchSize, T endTuple) {
- environment.writeAsText(this, path, new WriteFormatAsText<T>(), batchSize, endTuple);
+ writeAsText(this, path, new WriteFormatAsText<T>(), batchSize, endTuple);
return new DataStream<T>(this);
}
/**
+ * Writes a DataStream to the file specified by path in text format. The
+ * writing is performed periodically, in every millis milliseconds. For
+ * every element of the DataStream the result of {@link Object#toString()}
+ * is written.
+ *
+ * @param path
+ * is the path to the location where the tuples are written
+ * @param millis
+ * is the file update frequency
+ * @param endTuple
+ * is a special tuple indicating the end of the stream. If an
+ * endTuple is caught, the last pending batch of tuples will be
+ * immediately appended to the target file regardless of the
+ * system time.
+ *
+ * @return the data stream constructed
+ */
+ private DataStream<T> writeAsText(DataStream<T> inputStream, String path,
+ WriteFormatAsText<T> format, long millis, T endTuple) {
+ DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<T>(path,
+ format, millis, endTuple), null);
+ jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+ jobGraphBuilder.setMutability(returnStream.getId(), false);
+ return returnStream;
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in text format. The
+ * writing is performed periodically in equally sized batches. For every
+ * element of the DataStream the result of {@link Object#toString()} is
+ * written.
+ *
+ * @param path
+ * is the path to the location where the tuples are written
+ * @param batchSize
+ * is the size of the batches, i.e. the number of tuples written
+ * to the file at a time
+ * @param endTuple
+ * is a special tuple indicating the end of the stream. If an
+ * endTuple is caught, the last pending batch of tuples will be
+ * immediately appended to the target file regardless of the
+ * batchSize.
+ *
+ * @return the data stream constructed
+ */
+ private DataStream<T> writeAsText(DataStream<T> inputStream, String path,
+ WriteFormatAsText<T> format, int batchSize, T endTuple) {
+ DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<T>(path,
+ format, batchSize, endTuple), null);
+ jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+ jobGraphBuilder.setMutability(returnStream.getId(), false);
+ return returnStream;
+ }
+
+ /**
* Writes a DataStream to the file specified by path in text format. For
* every element of the DataStream the result of {@link Object#toString()}
* is written.
@@ -578,7 +761,7 @@ public class DataStream<T extends Tuple> {
* @return The closed DataStream
*/
public DataStream<T> writeAsCsv(String path) {
- environment.writeAsCsv(this, path, new WriteFormatAsCsv<T>(), 1, null);
+ writeAsCsv(this, path, new WriteFormatAsCsv<T>(), 1, null);
return new DataStream<T>(this);
}
@@ -596,7 +779,7 @@ public class DataStream<T extends Tuple> {
* @return The closed DataStream
*/
public DataStream<T> writeAsCsv(String path, long millis) {
- environment.writeAsCsv(this, path, new WriteFormatAsCsv<T>(), millis, null);
+ writeAsCsv(this, path, new WriteFormatAsCsv<T>(), millis, null);
return new DataStream<T>(this);
}
@@ -615,7 +798,7 @@ public class DataStream<T extends Tuple> {
* @return The closed DataStream
*/
public DataStream<T> writeAsCsv(String path, int batchSize) {
- environment.writeAsCsv(this, path, new WriteFormatAsCsv<T>(), batchSize, null);
+ writeAsCsv(this, path, new WriteFormatAsCsv<T>(), batchSize, null);
return new DataStream<T>(this);
}
@@ -638,7 +821,7 @@ public class DataStream<T extends Tuple> {
* @return The closed DataStream
*/
public DataStream<T> writeAsCsv(String path, long millis, T endTuple) {
- environment.writeAsCsv(this, path, new WriteFormatAsCsv<T>(), millis, endTuple);
+ writeAsCsv(this, path, new WriteFormatAsCsv<T>(), millis, endTuple);
return new DataStream<T>(this);
}
@@ -663,11 +846,66 @@ public class DataStream<T extends Tuple> {
*/
public DataStream<T> writeAsCsv(String path, int batchSize, T endTuple) {
setMutability(false);
- environment.writeAsCsv(this, path, new WriteFormatAsCsv<T>(), batchSize, endTuple);
+ writeAsCsv(this, path, new WriteFormatAsCsv<T>(), batchSize, endTuple);
return new DataStream<T>(this);
}
/**
+ * Writes a DataStream to the file specified by path in csv format. The
+ * writing is performed periodically, in every millis milliseconds. For
+ * every element of the DataStream the result of {@link Object#toString()}
+ * is written.
+ *
+ * @param inputStream
+ * the stream whose elements are written
+ * @param path
+ * is the path to the location where the tuples are written
+ * @param format
+ * the csv format used for writing the tuples
+ * @param millis
+ * is the file update frequency
+ * @param endTuple
+ * is a special tuple indicating the end of the stream. If an
+ * endTuple is caught, the last pending batch of tuples will be
+ * immediately appended to the target file regardless of the
+ * system time.
+ *
+ * @return the data stream constructed
+ */
+ private DataStream<T> writeAsCsv(DataStream<T> inputStream, String path,
+ WriteFormatAsCsv<T> format, long millis, T endTuple) {
+ // Like print() and the other writeAs* helpers, pass no type wrapper and
+ // call setBytesFrom with the input and sink ids instead.
+ DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<T>(path,
+ format, millis, endTuple), null);
+ jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+ jobGraphBuilder.setMutability(returnStream.getId(), false);
+ return returnStream;
+ }
+
+ /**
+ * Writes a DataStream to the file specified by path in csv format. The
+ * writing is performed periodically in equally sized batches. For every
+ * element of the DataStream the result of {@link Object#toString()} is
+ * written.
+ *
+ * @param path
+ * is the path to the location where the tuples are written
+ * @param batchSize
+ * is the size of the batches, i.e. the number of tuples written
+ * to the file at a time
+ * @param endTuple
+ * is a special tuple indicating the end of the stream. If an
+ * endTuple is caught, the last pending batch of tuples will be
+ * immediately appended to the target file regardless of the
+ * batchSize.
+ *
+ * @return the data stream constructed
+ */
+ private DataStream<T> writeAsCsv(DataStream<T> inputStream, String path,
+ WriteFormatAsCsv<T> format, int batchSize, T endTuple) {
+ DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<T>(path,
+ format, batchSize, endTuple), null);
+ jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
+ jobGraphBuilder.setMutability(returnStream.getId(), false);
+ return returnStream;
+ }
+
+ /**
* Initiates an iterative part of the program that executes multiple times
* and feeds back data streams. The iterative part needs to be closed by
* calling {@link IterativeDataStream#closeWith(DataStream)}. The data
@@ -688,9 +926,12 @@ public class DataStream<T extends Tuple> {
return new IterativeDataStream<T>(this);
}
- protected DataStream<T> addIterationSource(String iterationID) {
- environment.addIterationSource(this, iterationID);
+ protected <R extends Tuple> DataStream<T> addIterationSource(String iterationID) {
+ DataStream<R> returnStream = new DataStream<R>(environment, "iterationSource");
+
+ jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(), iterationID,
+ degreeOfParallelism);
+
return new DataStream<T>(this);
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
index c179f83..1cfb625 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
@@ -20,6 +20,7 @@
package org.apache.flink.streaming.api;
import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.partitioner.ForwardPartitioner;
/**
* The iterative data stream represents the start of an iteration in a
@@ -63,14 +64,26 @@ public class IterativeDataStream<T extends Tuple> extends StreamOperator<T, T> {
*
* @param iterationResult
* The data stream that can be fed back to the next iteration.
- * @param directName
+ * @param iterationName
* Name of the iteration edge (backward edge to iteration head)
* when used with directed emits
*
*/
- public DataStream<T> closeWith(DataStream<T> iterationResult, String directName) {
- environment.addIterationSink(iterationResult, iterationID.toString(), directName);
+ public <R extends Tuple> DataStream<T> closeWith(DataStream<T> iterationResult,
+ String iterationName) {
+ DataStream<R> returnStream = new DataStream<R>(environment, "iterationSink");
+
+ jobGraphBuilder.addIterationSink(returnStream.getId(), iterationResult.getId(),
+ iterationID.toString(), iterationResult.getParallelism(), iterationName);
+
+ jobGraphBuilder.setIterationSourceParallelism(iterationID.toString(),
+ iterationResult.getParallelism());
+
+ for (int i = 0; i < iterationResult.connectIDs.size(); i++) {
+ String inputID = iterationResult.connectIDs.get(i);
+ jobGraphBuilder.setEdge(inputID, returnStream.getId(), new ForwardPartitioner<T>(), 0);
+ }
+
return iterationResult;
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index fc4a1dd..3d49928 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.functions.AbstractFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.collector.OutputSelector;
@@ -30,6 +31,7 @@ import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
import org.apache.flink.streaming.api.streamcomponent.StreamComponentException;
import org.apache.flink.streaming.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
public class StreamConfig {
private static final String INPUT_TYPE = "inputType_";
@@ -71,6 +73,23 @@ public class StreamConfig {
// CONFIGS
+ /**
+ * Stores the given type wrapper in the configuration, serialized with
+ * {@code SerializationUtils.serialize} under the key "typeWrapper".
+ *
+ * @param typeWrapper
+ * the type serializer wrapper of the component
+ */
+ public void setTypeWrapper(
+ TypeSerializerWrapper<? extends Tuple, ? extends Tuple, ? extends Tuple> typeWrapper) {
+ config.setBytes("typeWrapper", SerializationUtils.serialize(typeWrapper));
+ }
+
+ /**
+ * Reads back the type wrapper stored under the key "typeWrapper" by
+ * {@link #setTypeWrapper(TypeSerializerWrapper)}.
+ *
+ * @return the deserialized type wrapper
+ * @throws RuntimeException
+ * if no type wrapper has been stored in the configuration
+ */
+ @SuppressWarnings("unchecked")
+ public <IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> TypeSerializerWrapper<IN1, IN2, OUT> getTypeWrapper() {
+ byte[] serializedWrapper = config.getBytes("typeWrapper", null);
+
+ if (serializedWrapper == null) {
+ throw new RuntimeException("TypeSerializerWrapper must be set");
+ }
+
+ return (TypeSerializerWrapper<IN1, IN2, OUT>) SerializationUtils
+ .deserialize(serializedWrapper);
+ }
+
public void setMutability(boolean isMutable) {
config.setBoolean(MUTABILITY, isMutable);
}
@@ -87,25 +106,26 @@ public class StreamConfig {
return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
}
- public void setUserInvokableClass(Class<? extends StreamComponentInvokable> clazz) {
- config.setClass(USER_FUNCTION, clazz);
- }
+ public void setUserInvokable(StreamComponentInvokable<? extends Tuple> invokableObject) {
+ if (invokableObject != null) {
+ config.setClass(USER_FUNCTION, invokableObject.getClass());
- @SuppressWarnings("unchecked")
- public <T extends StreamComponentInvokable> Class<? extends T> getUserInvokableClass() {
- return (Class<? extends T>) config.getClass(USER_FUNCTION, null);
- }
-
- public void setUserInvokableObject(StreamComponentInvokable invokableObject) {
- try {
- config.setBytes(SERIALIZEDUDF, SerializationUtils.serialize(invokableObject));
- } catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize invokable object "
- + invokableObject.getClass(), e);
+ try {
+ config.setBytes(SERIALIZEDUDF, SerializationUtils.serialize(invokableObject));
+ } catch (SerializationException e) {
+ throw new RuntimeException("Cannot serialize invokable object "
+ + invokableObject.getClass(), e);
+ }
}
}
- public <T extends StreamComponentInvokable> T getUserInvokableObject() {
+ // @SuppressWarnings("unchecked")
+ // public <T extends StreamComponentInvokable> Class<? extends T>
+ // getUserInvokableClass() {
+ // return (Class<? extends T>) config.getClass(USER_FUNCTION, null);
+ // }
+
+ public <T extends Tuple> StreamComponentInvokable<T> getUserInvokableObject() {
try {
return deserializeObject(config.getBytes(SERIALIZEDUDF, null));
} catch (Exception e) {
@@ -122,27 +142,29 @@ public class StreamConfig {
return config.getString(COMPONENT_NAME, null);
}
- public void setFunction(byte[] serializedFunction) {
- config.setBytes(FUNCTION, serializedFunction);
+ /**
+ * Stores the serialized user function under FUNCTION and its name under
+ * FUNCTION_NAME. When {@code serializedFunction} is null nothing is stored,
+ * so the name is not recorded either.
+ *
+ * @param serializedFunction
+ *            the serialized user function, may be null
+ * @param functionName
+ *            the name stored alongside a non-null function
+ */
+ public void setFunction(byte[] serializedFunction, String functionName) {
+ if (serializedFunction != null) {
+ config.setBytes(FUNCTION, serializedFunction);
+ config.setString(FUNCTION_NAME, functionName);
+ }
}
+ /**
+ * Deserializes the user function stored under FUNCTION (see setFunction).
+ *
+ * @return the deserialized function object
+ * @throws RuntimeException
+ *             wrapping the SerializationException if the stored bytes
+ *             cannot be deserialized
+ */
public Object getFunction() {
try {
- return SerializationUtils.deserialize(config
- .getBytes(FUNCTION, null));
+ return SerializationUtils.deserialize(config.getBytes(FUNCTION, null));
} catch (SerializationException e) {
- throw new RuntimeException("Cannot deserialize invokable object", e);
+ throw new RuntimeException("Cannot deserialize function object", e);
}
}
- public void setFunctionName(String functionName) {
- config.setString(FUNCTION_NAME, functionName);
- }
+ // public void setFunctionName(String functionName) {
+ // config.setString(FUNCTION_NAME, functionName);
+ // }
public String getFunctionName() {
return config.getString(FUNCTION_NAME, "");
}
-
+
public void setUserDefinedName(String userDefinedName) {
if (userDefinedName != null) {
config.setString(USER_DEFINED_NAME, userDefinedName);
@@ -158,8 +180,10 @@ public class StreamConfig {
}
+ /**
+ * Stores the serialized OutputSelector and enables directed emit. A null
+ * selector is ignored: nothing is stored and directed emit is left as is.
+ *
+ * @param outputSelector
+ *            the serialized OutputSelector, may be null
+ */
public void setOutputSelector(byte[] outputSelector) {
- config.setBytes(OUTPUT_SELECTOR, outputSelector);
-
+ if (outputSelector != null) {
+ setDirectedEmit(true);
+ config.setBytes(OUTPUT_SELECTOR, outputSelector);
+ }
}
public <T extends Tuple> OutputSelector<T> getOutputSelector() {
@@ -174,7 +198,7 @@ public class StreamConfig {
public void setIterationId(String iterationId) {
config.setString(ITERATION_ID, iterationId);
}
-
+
public String getIterationId() {
return config.getString(ITERATION_ID, "iteration-0");
}
@@ -233,7 +257,16 @@ public class StreamConfig {
public int getInputType(int inputNumber) {
return config.getInteger(INPUT_TYPE + inputNumber, 0);
}
-
+
+ /** Configuration key under which the user function's class is stored. */
+ private static final String FUNCTION_CLASS = "functionClass";
+
+ /**
+ * Stores the class of the user function.
+ *
+ * @param functionClass
+ *            the class of the user function
+ */
+ public void setFunctionClass(Class<? extends AbstractFunction> functionClass) {
+ config.setClass(FUNCTION_CLASS, functionClass);
+ }
+
+ /**
+ * Returns the function class stored by {@link #setFunctionClass(Class)}.
+ * The default passed to the configuration lookup is null, so this is
+ * expected to return null when no class was stored.
+ *
+ * @return the stored function class, or null
+ */
+ @SuppressWarnings("unchecked")
+ public Class<? extends AbstractFunction> getFunctionClass() {
+ return (Class<? extends AbstractFunction>) config.getClass(FUNCTION_CLASS, null);
+ }
+
@SuppressWarnings("unchecked")
protected static <T> T deserializeObject(byte[] serializedObject) throws IOException,
ClassNotFoundException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
index f56614d..4539126 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamExecutionEnvironment.java
@@ -24,27 +24,17 @@ import java.util.Collection;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.common.functions.AbstractFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
-import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
-import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches;
-import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
import org.apache.flink.streaming.api.function.source.FileSourceFunction;
import org.apache.flink.streaming.api.function.source.FileStreamFunction;
import org.apache.flink.streaming.api.function.source.FromElementsFunction;
import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.invokable.SinkInvokable;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.partitioner.ForwardPartitioner;
-import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.api.invokable.SourceInvokable;
+import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
/**
* {@link ExecutionEnvironment} for streaming jobs. An instance of it is
@@ -124,13 +114,14 @@ public abstract class StreamExecutionEnvironment {
this.degreeOfParallelism = degreeOfParallelism;
}
- protected void setMutability(DataStream<?> stream, boolean isMutable) {
- jobGraphBuilder.setMutability(stream.getId(), isMutable);
- }
-
- protected void setBufferTimeout(DataStream<?> stream, long bufferTimeout) {
- jobGraphBuilder.setBufferTimeout(stream.getId(), bufferTimeout);
- }
+ // protected void setMutability(DataStream<?> stream, boolean isMutable) {
+ // jobGraphBuilder.setMutability(stream.getId(), isMutable);
+ // }
+ //
+ // protected void setBufferTimeout(DataStream<?> stream, long bufferTimeout)
+ // {
+ // jobGraphBuilder.setBufferTimeout(stream.getId(), bufferTimeout);
+ // }
/**
* Sets the number of hardware contexts (CPU cores / threads) used when
@@ -204,8 +195,11 @@ public abstract class StreamExecutionEnvironment {
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
try {
- jobGraphBuilder.addSource(returnStream.getId(), new FromElementsFunction<X>(data),
- "elements", SerializationUtils.serialize(data[0]), 1);
+ SourceFunction<Tuple1<X>> function = new FromElementsFunction<X>(data);
+ jobGraphBuilder.addSource(returnStream.getId(),
+ new SourceInvokable<Tuple1<X>>(function),
+ new ObjectTypeWrapper<Tuple1<X>, Tuple, Tuple1<X>>(data[0], null, data[0]),
+ "source", SerializationUtils.serialize(function), 1);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize elements");
}
@@ -232,8 +226,14 @@ public abstract class StreamExecutionEnvironment {
}
try {
- jobGraphBuilder.addSource(returnStream.getId(), new FromElementsFunction<X>(data),
- "elements", SerializationUtils.serialize((Serializable) data.toArray()[0]), 1);
+ SourceFunction<Tuple1<X>> function = new FromElementsFunction<X>(data);
+ // The first element is passed to the type wrapper for both the first
+ // input and the output slot, as fromElements does with data[0]. It is
+ // read once through the iterator instead of copying the collection twice.
+ Object firstElement = data.iterator().next();
+
+ // Wrap the same function instance that is serialized below, so the
+ // invokable and the stored function cannot diverge.
+ jobGraphBuilder.addSource(returnStream.getId(),
+ new SourceInvokable<Tuple1<X>>(function),
+ new ObjectTypeWrapper<Tuple1<X>, Tuple, Tuple1<X>>(firstElement, null, firstElement),
+ "source", SerializationUtils.serialize(function), 1);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize collection");
}
@@ -257,7 +257,7 @@ public abstract class StreamExecutionEnvironment {
/**
- * Ads a data source thus opening a {@link DataStream}.
+ * Adds a data source, thus opening a {@link DataStream}.
*
- * @param sourceFunction
+ * @param function
* the user defined function
* @param parallelism
* number of parallel instances of the function
@@ -265,13 +265,13 @@ public abstract class StreamExecutionEnvironment {
* type of the returned stream
* @return the data stream constructed
*/
- public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> sourceFunction,
- int parallelism) {
+ public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> function, int parallelism) {
DataStream<T> returnStream = new DataStream<T>(this, "source");
try {
- jobGraphBuilder.addSource(returnStream.getId(), sourceFunction, "source",
- SerializationUtils.serialize(sourceFunction), parallelism);
+ jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<T>(function),
+ new FunctionTypeWrapper<T, Tuple, T>(function, SourceFunction.class, 0, -1, 0),
+ "source", SerializationUtils.serialize(function), parallelism);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize SourceFunction");
}
@@ -284,326 +284,6 @@ public abstract class StreamExecutionEnvironment {
}
// --------------------------------------------------------------------------------------------
- // Data stream operators and sinks
- // --------------------------------------------------------------------------------------------
-
- /**
- * Internal function for passing the user defined functions to the JobGraph
- * of the job.
- *
- * @param functionName
- * name of the function
- * @param inputStream
- * input data stream
- * @param function
- * the user defined function
- * @param functionInvokable
- * the wrapping JobVertex instance
- * @param <T>
- * type of the input stream
- * @param <R>
- * type of the return stream
- * @return the data stream constructed
- */
- protected <T extends Tuple, R extends Tuple> StreamOperator<T, R> addFunction(
- String functionName, DataStream<T> inputStream, final AbstractFunction function,
- UserTaskInvokable<T, R> functionInvokable) {
- StreamOperator<T, R> returnStream = new StreamOperator<T, R>(this, functionName);
-
- try {
- jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, functionName,
- SerializationUtils.serialize(function), degreeOfParallelism);
- } catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize user defined function");
- }
-
- connectGraph(inputStream, returnStream.getId(), 0);
-
- if (inputStream.iterationflag) {
- returnStream.addIterationSource(inputStream.iterationID.toString());
- inputStream.iterationflag = false;
- }
-
- return returnStream;
- }
-
- protected <T1 extends Tuple, T2 extends Tuple, R extends Tuple> DataStream<R> addCoFunction(String functionName, DataStream<T1> inputStream1, DataStream<T2> inputStream2, final AbstractFunction function,
- CoInvokable<T1, T2, R> functionInvokable) {
-
- DataStream<R> returnStream = new DataStream<R>(this, functionName);
-
- try {
- jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable, functionName,
- SerializationUtils.serialize(function), degreeOfParallelism);
- } catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize user defined function");
- }
-
- connectGraph(inputStream1, returnStream.getId(), 1);
- connectGraph(inputStream2, returnStream.getId(), 2);
-
- // TODO consider iteration
-// if (inputStream.iterationflag) {
-// returnStream.addIterationSource(inputStream.iterationID.toString());
-// inputStream.iterationflag = false;
-// }
-
- return returnStream;
- }
-
- protected <T extends Tuple, R extends Tuple> void addIterationSource(DataStream<T> inputStream,
- String iterationID) {
- DataStream<R> returnStream = new DataStream<R>(this, "iterationSource");
-
- jobGraphBuilder.addIterationSource(returnStream.getId(), inputStream.getId(), iterationID,
- degreeOfParallelism);
-
- }
-
- protected <T extends Tuple, R extends Tuple> void addIterationSink(DataStream<T> inputStream,
- String iterationID, String iterationName) {
- DataStream<R> returnStream = new DataStream<R>(this, "iterationSink");
-
- jobGraphBuilder.addIterationSink(returnStream.getId(), inputStream.getId(), iterationID,
- inputStream.getParallelism(), iterationName);
-
- jobGraphBuilder.setIterationSourceParallelism(iterationID, inputStream.getParallelism());
-
- for (int i = 0; i < inputStream.connectIDs.size(); i++) {
- String inputID = inputStream.connectIDs.get(i);
- jobGraphBuilder.setEdge(inputID, returnStream.getId(), new ForwardPartitioner<T>(), 0);
- }
- }
-
- /**
- * Adds the given sink to this environment. Only streams with sinks added
- * will be executed once the {@link #execute()} method is called.
- *
- * @param inputStream
- * input data stream
- * @param sinkFunction
- * the user defined function
- * @param <T>
- * type of the returned stream
- * @return the data stream constructed
- */
- protected <T extends Tuple> DataStream<T> addSink(DataStream<T> inputStream,
- SinkFunction<T> sinkFunction) {
- DataStream<T> returnStream = new DataStream<T>(this, "sink");
-
- try {
- jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction),
- "sink", SerializationUtils.serialize(sinkFunction), degreeOfParallelism);
- } catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize SinkFunction");
- }
-
- connectGraph(inputStream, returnStream.getId(), 0);
-
- return returnStream;
- }
-
- <T extends Tuple> void addDirectedEmit(String id, OutputSelector<T> outputSelector) {
- try {
- jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
- } catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize OutputSelector");
- }
- }
-
- /**
- * Writes a DataStream to the standard output stream (stdout). For each
- * element of the DataStream the result of {@link Object#toString()} is
- * written.
- *
- * @param inputStream
- * the input data stream
- *
- * @param <T>
- * type of the returned stream
- * @return the data stream constructed
- */
- protected <T extends Tuple> DataStream<T> print(DataStream<T> inputStream) {
- DataStream<T> returnStream = addSink(inputStream, new PrintSinkFunction<T>());
-
- jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
-
- return returnStream;
- }
-
- /**
- * Writes a DataStream to the file specified by path in text format. The
- * writing is performed periodically, in every millis milliseconds. For
- * every element of the DataStream the result of {@link Object#toString()}
- * is written.
- *
- * @param path
- * is the path to the location where the tuples are written
- * @param millis
- * is the file update frequency
- * @param endTuple
- * is a special tuple indicating the end of the stream. If an
- * endTuple is caught, the last pending batch of tuples will be
- * immediately appended to the target file regardless of the
- * system time.
- *
- * @return the data stream constructed
- */
- protected <T extends Tuple> DataStream<T> writeAsText(DataStream<T> inputStream, String path,
- WriteFormatAsText<T> format, long millis, T endTuple) {
- DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<T>(path,
- format, millis, endTuple));
- jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
- jobGraphBuilder.setMutability(returnStream.getId(), false);
- return returnStream;
- }
-
- /**
- * Writes a DataStream to the file specified by path in text format. The
- * writing is performed periodically in equally sized batches. For every
- * element of the DataStream the result of {@link Object#toString()} is
- * written.
- *
- * @param path
- * is the path to the location where the tuples are written
- * @param batchSize
- * is the size of the batches, i.e. the number of tuples written
- * to the file at a time
- * @param endTuple
- * is a special tuple indicating the end of the stream. If an
- * endTuple is caught, the last pending batch of tuples will be
- * immediately appended to the target file regardless of the
- * batchSize.
- *
- * @return the data stream constructed
- */
- protected <T extends Tuple> DataStream<T> writeAsText(DataStream<T> inputStream, String path,
- WriteFormatAsText<T> format, int batchSize, T endTuple) {
- DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<T>(path,
- format, batchSize, endTuple));
- jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
- jobGraphBuilder.setMutability(returnStream.getId(), false);
- return returnStream;
- }
-
- /**
- * Writes a DataStream to the file specified by path in csv format. The
- * writing is performed periodically, in every millis milliseconds. For
- * every element of the DataStream the result of {@link Object#toString()}
- * is written.
- *
- * @param path
- * is the path to the location where the tuples are written
- * @param millis
- * is the file update frequency
- * @param endTuple
- * is a special tuple indicating the end of the stream. If an
- * endTuple is caught, the last pending batch of tuples will be
- * immediately appended to the target file regardless of the
- * system time.
- *
- * @return the data stream constructed
- */
- protected <T extends Tuple> DataStream<T> writeAsCsv(DataStream<T> inputStream, String path,
- WriteFormatAsCsv<T> format, long millis, T endTuple) {
- DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<T>(path,
- format, millis, endTuple));
- jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
- jobGraphBuilder.setMutability(returnStream.getId(), false);
- return returnStream;
- }
-
- /**
- * Writes a DataStream to the file specified by path in csv format. The
- * writing is performed periodically in equally sized batches. For every
- * element of the DataStream the result of {@link Object#toString()} is
- * written.
- *
- * @param path
- * is the path to the location where the tuples are written
- * @param batchSize
- * is the size of the batches, i.e. the number of tuples written
- * to the file at a time
- * @param endTuple
- * is a special tuple indicating the end of the stream. If an
- * endTuple is caught, the last pending batch of tuples will be
- * immediately appended to the target file regardless of the
- * batchSize.
- *
- * @return the data stream constructed
- */
- protected <T extends Tuple> DataStream<T> writeAsCsv(DataStream<T> inputStream, String path,
- WriteFormatAsCsv<T> format, int batchSize, T endTuple) {
- DataStream<T> returnStream = addSink(inputStream, new WriteSinkFunctionByBatches<T>(path,
- format, batchSize, endTuple));
- jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
- jobGraphBuilder.setMutability(returnStream.getId(), false);
- return returnStream;
- }
-
- /**
- * Internal function for assembling the underlying
- * {@link org.apache.flink.nephele.jobgraph.JobGraph} of the job. Connects
- * the outputs of the given input stream to the specified output stream
- * given by the outputID.
- *
- * @param inputStream
- * input data stream
- * @param outputID
- * ID of the output
- * @param <T>
- * type of the input stream
- * @param typeNumber
- * Number of the type (used at co-functions)
- */
- private <T extends Tuple> void connectGraph(DataStream<T> inputStream, String outputID, int typeNumber) {
-
- for (int i = 0; i < inputStream.connectIDs.size(); i++) {
- String inputID = inputStream.connectIDs.get(i);
- StreamPartitioner<T> partitioner = inputStream.partitioners.get(i);
-
- jobGraphBuilder.setEdge(inputID, outputID, partitioner,
- typeNumber);
- }
- }
-
- protected <T extends Tuple> void setName(DataStream<T> stream, String name) {
- jobGraphBuilder.setUserDefinedName(stream.getId(), name);
- }
-
- /**
- * Sets the proper parallelism for the given operator in the JobGraph
- *
- * @param inputStream
- * DataStream corresponding to the operator
- * @param <T>
- * type of the operator
- */
- protected <T extends Tuple> void setOperatorParallelism(DataStream<T> inputStream) {
- jobGraphBuilder.setParallelism(inputStream.getId(), inputStream.degreeOfParallelism);
- }
-
- // /**
- // * Converts object to byte array using default java serialization
- // *
- // * @param object
- // * Object to be serialized
- // * @return Serialized object
- // */
- // static byte[] serializeToByteArray(Serializable object) {
- // SerializationUtils.serialize(object);
- // ByteArrayOutputStream baos = new ByteArrayOutputStream();
- // ObjectOutputStream oos;
- // try {
- // oos = new ObjectOutputStream(baos);
- // oos.writeObject(object);
- // } catch (IOException e) {
- // throw new RuntimeException("Cannot serialize object: " + object);
- // }
- // return baos.toByteArray();
- // }
-
- // --------------------------------------------------------------------------------------------
// Instantiation of Execution Contexts
// --------------------------------------------------------------------------------------------
@@ -741,7 +421,7 @@ public abstract class StreamExecutionEnvironment {
*
* @return jobgraph
*/
- public JobGraphBuilder jobGB() {
+ public JobGraphBuilder getJobGraphBuilder() {
return jobGraphBuilder;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index f016cbc..f6c2c72 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -28,14 +28,14 @@ import org.apache.flink.util.Collector;
public class FileSourceFunction extends SourceFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
-
+
private final String path;
private Tuple1<String> outTuple = new Tuple1<String>();
-
+
public FileSourceFunction(String path) {
this.path = path;
}
-
+
@Override
public void invoke(Collector<Tuple1<String>> collector) throws IOException {
BufferedReader br = new BufferedReader(new FileReader(path));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
index c3ccedf..edadfc3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
@@ -26,19 +26,19 @@ import java.io.IOException;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.util.Collector;
-public class FileStreamFunction extends SourceFunction<Tuple1<String>>{
+public class FileStreamFunction extends SourceFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
-
+
private final String path;
private Tuple1<String> outTuple = new Tuple1<String>();
-
+
public FileStreamFunction(String path) {
this.path = path;
}
-
+
@Override
public void invoke(Collector<Tuple1<String>> collector) throws IOException {
- while(true){
+ while (true) {
BufferedReader br = new BufferedReader(new FileReader(path));
String line = br.readLine();
while (line != null) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
index 70553bf..971533f 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -19,12 +19,13 @@
package org.apache.flink.streaming.api.function.source;
-import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
-
+import org.apache.flink.api.common.functions.AbstractFunction;
import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.util.Collector;
-public abstract class SourceFunction<OUT extends Tuple> extends UserSourceInvokable<OUT> {
+/**
+ * Base class for user-defined stream sources. A source produces its records
+ * by emitting them into the collector passed to {@link #invoke(Collector)}.
+ *
+ * @param <OUT>
+ *            type of the records produced by the source
+ */
+public abstract class SourceFunction<OUT extends Tuple> extends AbstractFunction {
private static final long serialVersionUID = 1L;
+ /**
+ * Runs the source, emitting its output records into {@code collector}.
+ *
+ * @param collector
+ *            the collector the produced records are emitted to
+ * @throws Exception
+ *             if producing the records fails
+ */
+ public abstract void invoke(Collector<OUT> collector) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
new file mode 100644
index 0000000..992a25e
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -0,0 +1,44 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.invokable;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+
+/**
+ * Invokable that runs a user-defined {@link SourceFunction}, handing it the
+ * collector set on this invokable.
+ *
+ * @param <OUT>
+ *            type of the records produced by the source
+ */
+public class SourceInvokable<OUT extends Tuple> extends StreamComponentInvokable<OUT> implements
+ Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private SourceFunction<OUT> sourceFunction;
+
+ /**
+ * Creates an invokable without a source function. Calling
+ * {@link #invoke()} on such an instance throws
+ * {@link IllegalStateException}.
+ */
+ public SourceInvokable() {
+ }
+
+ /**
+ * Creates an invokable that runs the given source function.
+ *
+ * @param sourceFunction
+ *            the user-defined source to run
+ */
+ public SourceInvokable(SourceFunction<OUT> sourceFunction) {
+ this.sourceFunction = sourceFunction;
+ }
+
+ /**
+ * Runs the source function, passing it this invokable's collector.
+ *
+ * @throws IllegalStateException
+ *             if no source function was set
+ * @throws Exception
+ *             any exception thrown by the source function
+ */
+ public void invoke() throws Exception {
+ // Fail fast with a clear message instead of an NPE when the no-arg
+ // constructor was used.
+ if (sourceFunction == null) {
+ throw new IllegalStateException("SourceInvokable has no SourceFunction to invoke");
+ }
+ sourceFunction.invoke(collector);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
index 0e8ea98..daa7378 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamComponentInvokable.java
@@ -21,7 +21,10 @@ package org.apache.flink.streaming.api.invokable;
import java.io.Serializable;
-public abstract class StreamComponentInvokable implements Serializable {
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.util.Collector;
+
+public abstract class StreamComponentInvokable<OUT extends Tuple> implements Serializable {
private static final long serialVersionUID = 1L;
@@ -29,6 +32,11 @@ public abstract class StreamComponentInvokable implements Serializable {
private String componentName;
@SuppressWarnings("unused")
private int channelID;
+ /** Collector that output records are emitted to; assigned via {@link #setCollector(Collector)}. */
+ protected Collector<OUT> collector;
+
+ /**
+ * Sets the collector that this invokable emits its output records to.
+ *
+ * @param collector
+ *            the output collector
+ */
+ public void setCollector(Collector<OUT> collector) {
+ this.collector = collector;
+ }
public void setAttributes(String componentName, int channelID) {
this.componentName = componentName;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
index 6beec27..5be3c30 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
@@ -28,11 +28,10 @@ import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
public abstract class StreamRecordInvokable<IN extends Tuple, OUT extends Tuple> extends
- StreamComponentInvokable {
+ StreamComponentInvokable<OUT> {
private static final long serialVersionUID = 1L;
- protected Collector<OUT> collector;
protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
StreamRecordSerializer<IN> serializer;
protected StreamRecord<IN> reuse;
@@ -41,7 +40,7 @@ public abstract class StreamRecordInvokable<IN extends Tuple, OUT extends Tuple>
public void initialize(Collector<OUT> collector,
MutableObjectIterator<StreamRecord<IN>> recordIterator,
StreamRecordSerializer<IN> serializer, boolean isMutable) {
- this.collector = collector;
+ setCollector(collector);
this.recordIterator = recordIterator;
this.serializer = serializer;
this.reuse = serializer.createInstance();
@@ -51,7 +50,7 @@ public abstract class StreamRecordInvokable<IN extends Tuple, OUT extends Tuple>
protected void resetReuse() {
this.reuse = serializer.createInstance();
}
-
+
protected StreamRecord<IN> loadNextRecord() {
try {
reuse = recordIterator.next(reuse);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserSourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserSourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserSourceInvokable.java
deleted file mode 100644
index e85b563..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/UserSourceInvokable.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.invokable;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.util.Collector;
-
-public abstract class UserSourceInvokable<OUT extends Tuple> extends StreamComponentInvokable implements
- Serializable {
-
- private static final long serialVersionUID = 1L;
-
- public abstract void invoke(Collector<OUT> collector) throws Exception;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index e881d57..f7ea566 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -28,5 +28,4 @@ public abstract class StreamReduceInvokable<IN extends Tuple, OUT extends Tuple>
private static final long serialVersionUID = 1L;
protected GroupReduceFunction<IN, OUT> reducer;
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 85086f9..884e361 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -27,7 +27,7 @@ import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
public abstract class CoInvokable<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
- StreamComponentInvokable {
+ StreamComponentInvokable<OUT> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index 22c079c..fcf87e2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -26,13 +26,8 @@ import org.apache.commons.lang.SerializationUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.FilterFunction;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.io.network.api.MutableReader;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -43,17 +38,18 @@ import org.apache.flink.streaming.api.StreamConfig;
import org.apache.flink.streaming.api.collector.DirectedStreamCollector;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.collector.StreamCollector;
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
-import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
public abstract class AbstractStreamComponent<OUT extends Tuple> extends AbstractInvokable {
+ protected static final String SOURCE = "source";
+
private static final Log LOG = LogFactory.getLog(AbstractStreamComponent.class);
protected TupleTypeInfo<OUT> outTupleTypeInfo = null;
@@ -61,6 +57,7 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
protected SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null;
protected StreamConfig configuration;
+ protected TypeSerializerWrapper<? extends Tuple, ? extends Tuple, OUT> typeWrapper;
protected StreamCollector<OUT> collector;
protected int instanceID;
protected String name;
@@ -68,19 +65,27 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
protected boolean isMutable;
protected Object function;
protected String functionName;
-
+
protected static int newComponent() {
numComponents++;
return numComponents;
}
+ @Override
+ public void registerInputOutput() {
+ initialize();
+ setInputsOutputs();
+ setInvokable();
+ setCollector();
+ }
+
protected void initialize() {
this.configuration = new StreamConfig(getTaskConfiguration());
this.name = configuration.getComponentName();
this.isMutable = configuration.getMutability();
this.functionName = configuration.getFunctionName();
this.function = configuration.getFunction();
-
+ this.typeWrapper = configuration.getTypeWrapper();
}
protected Collector<OUT> setCollector() {
@@ -96,40 +101,12 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
}
protected void setSerializers() {
- try {
- if (functionName.equals("flatMap")) {
- setSerializer(function, FlatMapFunction.class, 1);
- } else if (functionName.equals("map")) {
- setSerializer(function, MapFunction.class, 1);
- } else if (functionName.equals("batchReduce")) {
- setSerializer(function, GroupReduceFunction.class, 1);
- } else if (functionName.equals("filter")) {
- setSerializer(function, FilterFunction.class, 0);
- } else if (functionName.equals("source")) {
- setSerializer(function, UserSourceInvokable.class, 0);
- } else if (functionName.equals("coMap")) {
- setSerializer(function, CoMapFunction.class, 2);
- } else if (functionName.equals("elements")) {
- outTupleTypeInfo = new TupleTypeInfo<OUT>(TypeExtractor.getForObject(function));
-
- outTupleSerializer = new StreamRecordSerializer<OUT>(outTupleTypeInfo.createSerializer());
- outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(
- outTupleSerializer);
- } else {
- throw new Exception("Wrong operator name: " + functionName);
- }
- } catch (Exception e) {
- throw new StreamComponentException(e);
- }
-
+ setSerializer();
}
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- protected void setSerializer(Object function, Class<?> clazz, int typeParameter) {
- outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
- typeParameter, null, null);
- outTupleSerializer = new StreamRecordSerializer(outTupleTypeInfo.createSerializer());
+ protected void setSerializer() {
+ outTupleTypeInfo = typeWrapper.getOutputTupleTypeInfo();
+ outTupleSerializer = new StreamRecordSerializer<OUT>(outTupleTypeInfo.createSerializer());
outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outTupleSerializer);
}
@@ -137,7 +114,7 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
setSerializers();
setCollector();
-
+
int numberOfOutputs = configuration.getNumberOfOutputs();
for (int i = 0; i < numberOfOutputs; i++) {
@@ -148,7 +125,7 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
private void setPartitioner(int outputNumber,
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) {
StreamPartitioner<OUT> outputPartitioner = null;
-
+
try {
outputPartitioner = configuration.getPartitioner(outputNumber);
@@ -189,8 +166,9 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
* Class of the invokable function
* @return The StreamComponent object
*/
- protected <T extends StreamComponentInvokable> T getInvokable() {
- return configuration.getUserInvokableObject();
+ @SuppressWarnings("unchecked")
+ protected <T extends StreamComponentInvokable<OUT>> T getInvokable() {
+ return (T) configuration.getUserInvokableObject();
}
protected <IN extends Tuple> MutableObjectIterator<StreamRecord<IN>> createInputIterator(
@@ -210,16 +188,8 @@ public abstract class AbstractStreamComponent<OUT extends Tuple> extends Abstrac
return (T) SerializationUtils.deserialize(serializedObject);
}
-
- @Override
- public void registerInputOutput() {
- initialize();
- setInputsOutputs();
- setInvokable();
- }
-
protected abstract void setInputsOutputs();
-
+
protected abstract void setInvokable();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/BlockingQueueBroker.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/BlockingQueueBroker.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/BlockingQueueBroker.java
index c8eca70..52de3d1 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/BlockingQueueBroker.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/BlockingQueueBroker.java
@@ -25,13 +25,14 @@ import org.apache.flink.runtime.iterative.concurrent.Broker;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@SuppressWarnings("rawtypes")
-public class BlockingQueueBroker extends Broker<BlockingQueue<StreamRecord>>{
+public class BlockingQueueBroker extends Broker<BlockingQueue<StreamRecord>> {
/**
* Singleton instance
*/
private static final BlockingQueueBroker INSTANCE = new BlockingQueueBroker();
- private BlockingQueueBroker() {}
+ private BlockingQueueBroker() {
+ }
/**
* retrieve singleton instance
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
index 4d531f7..0c02c16 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
@@ -28,7 +28,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.functions.AbstractFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.MutableReader;
import org.apache.flink.runtime.io.network.api.MutableRecordReader;
@@ -45,8 +44,8 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
AbstractStreamComponent<OUT> {
private static final Log LOG = LogFactory.getLog(CoStreamTask.class);
- protected StreamRecordSerializer<IN1> inTupleSerializer1 = null;
- protected StreamRecordSerializer<IN2> inTupleSerializer2 = null;
+ protected StreamRecordSerializer<IN1> inTupleDeserializer1 = null;
+ protected StreamRecordSerializer<IN2> inTupleDeserializer2 = null;
private MutableReader<IOReadableWritable> inputs1;
private MutableReader<IOReadableWritable> inputs2;
@@ -54,24 +53,25 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
MutableObjectIterator<StreamRecord<IN2>> inputIter2;
private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
- private CoInvokable<IN1, IN2, OUT> userFunction;
+ private CoInvokable<IN1, IN2, OUT> userInvokable;
private static int numTasks;
public CoStreamTask() {
outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
- userFunction = null;
+ userInvokable = null;
numTasks = newComponent();
instanceID = numTasks;
}
+ @Override
protected void setSerializers() {
String operatorName = configuration.getFunctionName();
Object function = configuration.getFunction();
try {
if (operatorName.equals("coMap")) {
- setSerializer(function, CoMapFunction.class, 2);
+ setSerializer();
setDeserializers(function, CoMapFunction.class);
} else {
throw new Exception("Wrong operator name!");
@@ -83,13 +83,11 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
@SuppressWarnings({ "unchecked", "rawtypes" })
private void setDeserializers(Object function, Class<? extends AbstractFunction> clazz) {
- TupleTypeInfo<IN1> inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz,
- function.getClass(), 0, null, null);
- inTupleSerializer1 = new StreamRecordSerializer(inTupleTypeInfo.createSerializer());
+ TupleTypeInfo<IN1> inTupleTypeInfo = (TupleTypeInfo<IN1>) typeWrapper.getInputTupleTypeInfo1();
+ inTupleDeserializer1 = new StreamRecordSerializer<IN1>(inTupleTypeInfo.createSerializer());
- inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
- 1, null, null);
- inTupleSerializer2 = new StreamRecordSerializer(inTupleTypeInfo.createSerializer());
+ inTupleTypeInfo = (TupleTypeInfo<IN1>) typeWrapper.getInputTupleTypeInfo2();
+ inTupleDeserializer2 = new StreamRecordSerializer(inTupleTypeInfo.createSerializer());
}
@Override
@@ -97,15 +95,15 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
setConfigOutputs(outputs);
setConfigInputs();
- inputIter1 = createInputIterator(inputs1, inTupleSerializer1);
- inputIter2 = createInputIterator(inputs2, inTupleSerializer2);
+ inputIter1 = createInputIterator(inputs1, inTupleDeserializer1);
+ inputIter2 = createInputIterator(inputs2, inTupleDeserializer2);
}
-
+
@Override
protected void setInvokable() {
- userFunction = getInvokable();
- userFunction.initialize(collector, inputIter1, inTupleSerializer1, inputIter2,
- inTupleSerializer2, isMutable);
+ userInvokable = getInvokable();
+ userInvokable.initialize(collector, inputIter1, inTupleDeserializer1, inputIter2,
+ inTupleDeserializer2, isMutable);
}
protected void setConfigInputs() throws StreamComponentException {
@@ -156,7 +154,7 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
output.initializeSerializers();
}
- userFunction.invoke();
+ userInvokable.invoke();
if (LOG.isDebugEnabled()) {
LOG.debug("TASK " + name + " invoke finished with instance id " + instanceID);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799424d1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
index 0b5b377..8355b78 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/SingleInputAbstractStreamComponent.java
@@ -19,62 +19,38 @@
package org.apache.flink.streaming.api.streamcomponent;
-import org.apache.flink.api.common.functions.AbstractFunction;
-import org.apache.flink.api.java.functions.FilterFunction;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.GroupReduceFunction;
-import org.apache.flink.api.java.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.MutableReader;
import org.apache.flink.runtime.io.network.api.MutableRecordReader;
import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.api.invokable.UserSourceInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.MutableObjectIterator;
-public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT extends Tuple> extends
- AbstractStreamComponent<OUT> {
+public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT extends Tuple>
+ extends AbstractStreamComponent<OUT> {
protected StreamRecordSerializer<IN> inTupleSerializer = null;
protected MutableObjectIterator<StreamRecord<IN>> inputIter;
protected MutableReader<IOReadableWritable> inputs;
protected void setDeserializers() {
- try {
- if (functionName.equals("flatMap")) {
- setDeserializer(function, FlatMapFunction.class);
- } else if (functionName.equals("map")) {
- setDeserializer(function, MapFunction.class);
- } else if (functionName.equals("batchReduce")) {
- setDeserializer(function, GroupReduceFunction.class);
- } else if (functionName.equals("filter")) {
- setDeserializer(function, FilterFunction.class);
- } else if (functionName.equals("source")) {
- setSerializer(function, UserSourceInvokable.class, 0);
- } else if (functionName.equals("sink")) {
- setDeserializer(function, SinkFunction.class);
- } else {
- throw new Exception("Wrong operator name: " + functionName);
- }
-
- } catch (Exception e) {
- throw new StreamComponentException(e);
+ if (functionName.equals(SOURCE)) {
+ setSerializer();
+ } else {
+ setDeserializer();
}
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
- private void setDeserializer(Object function, Class<? extends AbstractFunction> clazz) {
- TupleTypeInfo<IN> inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
- 0, null, null);
-
- inTupleSerializer = new StreamRecordSerializer(inTupleTypeInfo.createSerializer());
+ @SuppressWarnings("unchecked")
+ private void setDeserializer() {
+ TupleTypeInfo<IN> inTupleTypeInfo = (TupleTypeInfo<IN>) typeWrapper
+ .getInputTupleTypeInfo1();
+ inTupleSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo.createSerializer());
}
-
+
@SuppressWarnings("unchecked")
protected void setSinkSerializer() {
if (outSerializationDelegate != null) {
@@ -87,7 +63,7 @@ public abstract class SingleInputAbstractStreamComponent<IN extends Tuple, OUT e
@SuppressWarnings("unchecked")
protected void setConfigInputs() throws StreamComponentException {
setDeserializers();
-
+
int numberOfInputs = configuration.getNumberOfInputs();
if (numberOfInputs < 2) {