You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:07 UTC
[30/51] [abbrv] git commit: [streaming] DataStream type refactor
[streaming] DataStream type refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f932700a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f932700a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f932700a
Branch: refs/heads/master
Commit: f932700ae4e7e6110b729afa8530ac029ed0b700
Parents: 910f74d
Author: gyfora <gy...@gmail.com>
Authored: Sun Aug 3 22:47:23 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:22:50 2014 +0200
----------------------------------------------------------------------
.../apache/flink/streaming/api/DataStream.java | 509 +++++++++----------
.../streaming/api/IterativeDataStream.java | 19 +-
.../flink/streaming/api/SplitDataStream.java | 7 +-
.../flink/streaming/api/StreamOperator.java | 8 +-
.../api/collector/DirectedOutputTest.java | 59 ++-
5 files changed, 318 insertions(+), 284 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f932700a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index c648ab2..10f8114 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -77,16 +77,14 @@ public class DataStream<T> {
protected String id;
protected int degreeOfParallelism;
protected String userDefinedName;
- protected List<String> connectIDs;
- protected List<StreamPartitioner<T>> partitioners;
- protected boolean iterationflag;
- protected Integer iterationID;
+ protected StreamPartitioner<T> partitioner;
+ protected List<DataStream<T>> connectedStreams;
protected JobGraphBuilder jobGraphBuilder;
/**
* Create a new {@link DataStream} in the given execution environment with
- * partitioning set to shuffle by default.
+ * partitioning set to forward by default.
*
* @param environment
* StreamExecutionEnvironment
@@ -104,7 +102,9 @@ public class DataStream<T> {
this.environment = environment;
this.degreeOfParallelism = environment.getDegreeOfParallelism();
this.jobGraphBuilder = environment.getJobGraphBuilder();
- initConnections();
+ this.partitioner = new ForwardPartitioner<T>();
+ this.connectedStreams = new ArrayList<DataStream<T>>();
+ this.connectedStreams.add(this.copy());
}
/**
@@ -118,11 +118,13 @@ public class DataStream<T> {
this.id = dataStream.id;
this.degreeOfParallelism = dataStream.degreeOfParallelism;
this.userDefinedName = dataStream.userDefinedName;
- this.connectIDs = new ArrayList<String>(dataStream.connectIDs);
- this.partitioners = new ArrayList<StreamPartitioner<T>>(dataStream.partitioners);
- this.iterationflag = dataStream.iterationflag;
- this.iterationID = dataStream.iterationID;
+ this.partitioner = dataStream.partitioner;
this.jobGraphBuilder = dataStream.jobGraphBuilder;
+ this.connectedStreams = new ArrayList<DataStream<T>>();
+ for (DataStream<T> stream : dataStream.connectedStreams) {
+ this.connectedStreams.add(stream.copy());
+ }
+
}
/**
@@ -142,17 +144,6 @@ public class DataStream<T> {
}
/**
- * Initialize the connection and partitioning among the connected
- * {@link DataStream}s.
- */
- private void initConnections() {
- connectIDs = new ArrayList<String>();
- connectIDs.add(getId());
- partitioners = new ArrayList<StreamPartitioner<T>>();
- partitioners.add(new ForwardPartitioner<T>());
- }
-
- /**
* Returns the ID of the {@link DataStream}.
*
* @return ID of the DataStream
@@ -162,31 +153,12 @@ public class DataStream<T> {
}
/**
- * Sets the mutability of the operator represented by the DataStream. If the
- * operator is set to mutable, the tuples received in the user defined
- * functions, will be reused after the function call. Setting an operator to
- * mutable greatly reduces garbage collection overhead and thus scalability.
- *
- * @param isMutable
- * The mutability of the operator.
- * @return The DataStream with mutability set.
- */
- public DataStream<T> setMutability(boolean isMutable) {
- jobGraphBuilder.setMutability(id, isMutable);
- return this;
- }
-
- /**
- * Sets the maximum time frequency (ms) for the flushing of the output
- * buffer. By default the output buffers flush only when they are full.
+ * Gets the degree of parallelism for this operator.
*
- * @param timeoutMillis
- * The maximum time between two output flushes.
- * @return The DataStream with buffer timeout set.
+ * @return The parallelism set for this operator.
*/
- public DataStream<T> setBufferTimeout(long timeoutMillis) {
- jobGraphBuilder.setBufferTimeout(id, timeoutMillis);
- return this;
+ public int getParallelism() {
+ return this.degreeOfParallelism;
}
/**
@@ -202,39 +174,37 @@ public class DataStream<T> {
throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
}
this.degreeOfParallelism = dop;
-
+
jobGraphBuilder.setParallelism(id, degreeOfParallelism);
-
+
return this;
}
/**
- * Gets the degree of parallelism for this operator.
+ * Sets the mutability of the operator represented by the DataStream. If the
+ * operator is set to mutable, the tuples received in the user defined
+ * functions, will be reused after the function call. Setting an operator to
+ * mutable greatly reduces garbage collection overhead and thus scalability.
*
- * @return The parallelism set for this operator.
+ * @param isMutable
+ * The mutability of the operator.
+ * @return The DataStream with mutability set.
*/
- public int getParallelism() {
- return this.degreeOfParallelism;
+ public DataStream<T> setMutability(boolean isMutable) {
+ jobGraphBuilder.setMutability(id, isMutable);
+ return this;
}
/**
- * Gives the data transformation(vertex) a user defined name in order to use
- * with directed outputs. The {@link OutputSelector} of the input vertex
- * should use this name for directed emits.
+ * Sets the maximum time frequency (ms) for the flushing of the output
+ * buffer. By default the output buffers flush only when they are full.
*
- * @param name
- * The name to set
- * @return The named DataStream.
+ * @param timeoutMillis
+ * The maximum time between two output flushes.
+ * @return The DataStream with buffer timeout set.
*/
- protected DataStream<T> name(String name) {
- // TODO copy DataStream?
- if (name == "") {
- throw new IllegalArgumentException("User defined name must not be empty string");
- }
-
- userDefinedName = name;
- jobGraphBuilder.setUserDefinedName(id, name);
-
+ public DataStream<T> setBufferTimeout(long timeoutMillis) {
+ jobGraphBuilder.setBufferTimeout(id, timeoutMillis);
return this;
}
@@ -258,19 +228,6 @@ public class DataStream<T> {
}
/**
- * Connects two DataStreams
- *
- * @param returnStream
- * The other DataStream will connected to this
- * @param stream
- * This DataStream will be connected to returnStream
- */
- private void addConnection(DataStream<T> returnStream, DataStream<T> stream) {
- returnStream.connectIDs.addAll(stream.connectIDs);
- returnStream.partitioners.addAll(stream.partitioners);
- }
-
- /**
* Operator used for directing tuples to specific named outputs. Sets an
* {@link OutputSelector} for the vertex. The tuples emitted from this
* vertex will be sent to the output names selected by the OutputSelector.
@@ -282,8 +239,9 @@ public class DataStream<T> {
*/
public SplitDataStream<T> split(OutputSelector<T> outputSelector) {
try {
- for (String id : connectIDs) {
- jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
+ for (DataStream<T> stream : connectedStreams) {
+ jobGraphBuilder.setOutputSelector(stream.id,
+ SerializationUtils.serialize(outputSelector));
}
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize OutputSelector");
@@ -349,16 +307,6 @@ public class DataStream<T> {
return setConnectionType(new DistributePartitioner<T>());
}
- private DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
- DataStream<T> returnStream = this.copy();
-
- for (int i = 0; i < returnStream.partitioners.size(); i++) {
- returnStream.partitioners.set(i, partitioner);
- }
-
- return returnStream;
- }
-
/**
* Applies a Map transformation on a {@link DataStream}. The transformation
* calls a {@link RichMapFunction} for each element of the DataStream. Each
@@ -377,28 +325,6 @@ public class DataStream<T> {
}
/**
- * Applies a CoMap transformation on two separate {@link DataStream}s. The
- * transformation calls a {@link CoMapFunction#map1(Tuple)} for each element
- * of the first DataStream (on which .coMapWith was called) and
- * {@link CoMapFunction#map2(Tuple)} for each element of the second
- * DataStream. Each CoMapFunction call returns exactly one element.
- *
- * @param coMapper
- * The CoMapFunction used to jointly transform the two input
- * DataStreams
- * @param otherStream
- * The DataStream that will be transformed with
- * {@link CoMapFunction#map2(Tuple)}
- * @return The transformed DataStream
- */
- public <T2, R> DataStream<R> coMapWith(CoMapFunction<T, T2, R> coMapper,
- DataStream<T2> otherStream) {
- return addCoFunction("coMap", this.copy(), otherStream.copy(), coMapper,
- new FunctionTypeWrapper<T, T2, R>(coMapper, CoMapFunction.class, 0, 1, 2),
- new CoMapInvokable<T, T2, R>(coMapper));
- }
-
- /**
* Applies a FlatMap transformation on a {@link DataStream}. The
* transformation calls a {@link RichFlatMapFunction} for each element of
* the DataStream. Each RichFlatMapFunction call can return any number of
@@ -418,19 +344,25 @@ public class DataStream<T> {
}
/**
- * Applies a Filter transformation on a {@link DataStream}. The
- * transformation calls a {@link RichFilterFunction} for each element of the
- * DataStream and retains only those element for which the function returns
- * true. Elements for which the function returns false are filtered.
+ * Applies a CoMap transformation on two separate {@link DataStream}s. The
+ * transformation calls a {@link CoMapFunction#map1(Tuple)} for each element
+ * of the first DataStream (on which .coMapWith was called) and
+ * {@link CoMapFunction#map2(Tuple)} for each element of the second
+ * DataStream. Each CoMapFunction call returns exactly one element.
*
- * @param filter
- * The RichFilterFunction that is called for each element of the
- * DataSet.
- * @return The filtered DataStream.
+ * @param coMapper
+ * The CoMapFunction used to jointly transform the two input
+ * DataStreams
+ * @param otherStream
+ * The DataStream that will be transformed with
+ * {@link CoMapFunction#map2(Tuple)}
+ * @return The transformed DataStream
*/
- public StreamOperator<T, T> filter(RichFilterFunction<T> filter) {
- return addFunction("filter", filter, new FunctionTypeWrapper<T, Tuple, T>(filter,
- RichFilterFunction.class, 0, -1, 0), new FilterInvokable<T>(filter));
+ public <T2, R> DataStream<R> coMapWith(CoMapFunction<T, T2, R> coMapper,
+ DataStream<T2> otherStream) {
+ return addCoFunction("coMap", this.copy(), otherStream.copy(), coMapper,
+ new FunctionTypeWrapper<T, T2, R>(coMapper, CoMapFunction.class, 0, 1, 2),
+ new CoMapInvokable<T, T2, R>(coMapper));
}
/**
@@ -480,115 +412,19 @@ public class DataStream<T> {
}
/**
- * Internal function for passing the user defined functions to the JobGraph
- * of the job.
- *
- * @param functionName
- * name of the function
- * @param function
- * the user defined function
- * @param functionInvokable
- * the wrapping JobVertex instance
- * @param <T>
- * type of the input stream
- * @param <R>
- * type of the return stream
- * @return the data stream constructed
- */
- private <R> StreamOperator<T, R> addFunction(String functionName,
- final AbstractRichFunction function, TypeSerializerWrapper<T, Tuple, R> typeWrapper,
- UserTaskInvokable<T, R> functionInvokable) {
-
- DataStream<T> inputStream = this.copy();
- StreamOperator<T, R> returnStream = new StreamOperator<T, R>(environment, functionName);
-
- try {
- jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, typeWrapper,
- functionName, SerializationUtils.serialize(function), degreeOfParallelism);
- } catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize user defined function");
- }
-
- connectGraph(inputStream, returnStream.getId(), 0);
-
- if (inputStream.iterationflag) {
- returnStream.addIterationSource(inputStream.iterationID.toString());
- inputStream.iterationflag = false;
- }
-
- if (inputStream instanceof NamedDataStream) {
- returnStream.name(inputStream.userDefinedName);
- }
-
- return returnStream;
- }
-
- protected <T1, T2, R> DataStream<R> addCoFunction(String functionName,
- DataStream<T1> inputStream1, DataStream<T2> inputStream2,
- final AbstractRichFunction function, TypeSerializerWrapper<T1, T2, R> typeWrapper,
- CoInvokable<T1, T2, R> functionInvokable) {
-
- DataStream<R> returnStream = new DataStream<R>(environment, functionName);
-
- try {
- jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable, typeWrapper,
- functionName, SerializationUtils.serialize(function), degreeOfParallelism);
- } catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize user defined function");
- }
-
- connectGraph(inputStream1, returnStream.getId(), 1);
- connectGraph(inputStream2, returnStream.getId(), 2);
-
- if ((inputStream1 instanceof NamedDataStream) && (inputStream2 instanceof NamedDataStream)) {
- throw new RuntimeException("An operator cannot have two names");
- } else {
- if (inputStream1 instanceof NamedDataStream) {
- returnStream.name(inputStream1.userDefinedName);
- }
-
- if (inputStream2 instanceof NamedDataStream) {
- returnStream.name(inputStream2.userDefinedName);
- }
- }
- // TODO consider iteration
-
- return returnStream;
- }
-
- /**
- * Internal function for assembling the underlying
- * {@link org.apache.flink.nephele.jobgraph.JobGraph} of the job. Connects
- * the outputs of the given input stream to the specified output stream
- * given by the outputID.
- *
- * @param inputStream
- * input data stream
- * @param outputID
- * ID of the output
- * @param typeNumber
- * Number of the type (used at co-functions)
- */
- private <X> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) {
- for (int i = 0; i < inputStream.connectIDs.size(); i++) {
- String inputID = inputStream.connectIDs.get(i);
- StreamPartitioner<X> partitioner = inputStream.partitioners.get(i);
-
- jobGraphBuilder.setEdge(inputID, outputID, partitioner, typeNumber);
- }
- }
-
- /**
- * Adds the given sink to this DataStream. Only streams with sinks added
- * will be executed once the {@link StreamExecutionEnvironment#execute()}
- * method is called.
+ * Applies a Filter transformation on a {@link DataStream}. The
+ * transformation calls a {@link RichFilterFunction} for each element of the
+ * DataStream and retains only those element for which the function returns
+ * true. Elements for which the function returns false are filtered.
*
- * @param sinkFunction
- * The object containing the sink's invoke function.
- * @return The closed DataStream.
+ * @param filter
+ * The RichFilterFunction that is called for each element of the
+ * DataSet.
+ * @return The filtered DataStream.
*/
- public DataStream<T> addSink(SinkFunction<T> sinkFunction) {
- return addSink(this.copy(), sinkFunction);
+ public StreamOperator<T, T> filter(RichFilterFunction<T> filter) {
+ return addFunction("filter", filter, new FunctionTypeWrapper<T, Tuple, T>(filter,
+ RichFilterFunction.class, 0, -1, 0), new FilterInvokable<T>(filter));
}
/**
@@ -608,32 +444,6 @@ public class DataStream<T> {
return returnStream;
}
- private DataStream<T> addSink(DataStream<T> inputStream, SinkFunction<T> sinkFunction) {
- return addSink(inputStream, sinkFunction, new FunctionTypeWrapper<T, Tuple, T>(
- sinkFunction, SinkFunction.class, 0, -1, 0));
- }
-
- private DataStream<T> addSink(DataStream<T> inputStream, SinkFunction<T> sinkFunction,
- TypeSerializerWrapper<T, Tuple, T> typeWrapper) {
- DataStream<T> returnStream = new DataStream<T>(environment, "sink");
-
- try {
- jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction),
- typeWrapper, "sink", SerializationUtils.serialize(sinkFunction),
- degreeOfParallelism);
- } catch (SerializationException e) {
- throw new RuntimeException("Cannot serialize SinkFunction");
- }
-
- inputStream.connectGraph(inputStream, returnStream.getId(), 0);
-
- if (this.copy() instanceof NamedDataStream) {
- returnStream.name(inputStream.userDefinedName);
- }
-
- return returnStream;
- }
-
/**
* Writes a DataStream to the file specified by path in text format. For
* every element of the DataStream the result of {@link Object#toString()}
@@ -956,10 +766,193 @@ public class DataStream<T> {
protected <R> DataStream<T> addIterationSource(String iterationID) {
DataStream<R> returnStream = new DataStream<R>(environment, "iterationSource");
-
+
jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(), iterationID,
degreeOfParallelism);
-
+
return this.copy();
}
+
+ /**
+ * Internal function for passing the user defined functions to the JobGraph
+ * of the job.
+ *
+ * @param functionName
+ * name of the function
+ * @param function
+ * the user defined function
+ * @param typeWrapper
+ * serializer wrapper for the input (T) and output (R) types
+ * @param functionInvokable
+ * the wrapping JobVertex instance
+ * @param <R>
+ * type of the return stream
+ * @return the data stream constructed
+ */
+ private <R> StreamOperator<T, R> addFunction(String functionName,
+ final AbstractRichFunction function, TypeSerializerWrapper<T, Tuple, R> typeWrapper,
+ UserTaskInvokable<T, R> functionInvokable) {
+
+ DataStream<T> inputStream = this.copy();
+ StreamOperator<T, R> returnStream = new StreamOperator<T, R>(environment, functionName);
+
+ try {
+ jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, typeWrapper,
+ functionName, SerializationUtils.serialize(function), degreeOfParallelism);
+ } catch (SerializationException e) {
+ throw new RuntimeException("Cannot serialize user defined function", e);
+ }
+
+ connectGraph(inputStream, returnStream.getId(), 0);
+ // an iterative input also registers an iteration source on the new operator
+ if (inputStream instanceof IterativeDataStream) {
+ returnStream.addIterationSource(((IterativeDataStream<T>) inputStream).iterationID
+ .toString());
+ }
+ // an operator fed by a NamedDataStream takes over its user defined name
+ if (inputStream instanceof NamedDataStream) {
+ returnStream.name(inputStream.userDefinedName);
+ }
+
+ return returnStream;
+ }
+ /** Internal function for adding a task with two input streams (co-function) to the JobGraph. */
+ protected <T1, T2, R> DataStream<R> addCoFunction(String functionName,
+ DataStream<T1> inputStream1, DataStream<T2> inputStream2,
+ final AbstractRichFunction function, TypeSerializerWrapper<T1, T2, R> typeWrapper,
+ CoInvokable<T1, T2, R> functionInvokable) {
+
+ DataStream<R> returnStream = new DataStream<R>(environment, functionName);
+
+ try {
+ jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable, typeWrapper,
+ functionName, SerializationUtils.serialize(function), degreeOfParallelism);
+ } catch (SerializationException e) {
+ throw new RuntimeException("Cannot serialize user defined function", e);
+ }
+ // type numbers 1 and 2 distinguish the two inputs of the co-task
+ connectGraph(inputStream1, returnStream.getId(), 1);
+ connectGraph(inputStream2, returnStream.getId(), 2);
+
+ if ((inputStream1 instanceof NamedDataStream) && (inputStream2 instanceof NamedDataStream)) {
+ throw new RuntimeException("An operator cannot have two names");
+ } else {
+ if (inputStream1 instanceof NamedDataStream) {
+ returnStream.name(inputStream1.userDefinedName);
+ }
+
+ if (inputStream2 instanceof NamedDataStream) {
+ returnStream.name(inputStream2.userDefinedName);
+ }
+ }
+ // TODO consider iteration: unlike addFunction, iterative inputs are not handled here
+
+ return returnStream;
+ }
+
+ /**
+ * Gives the data transformation(vertex) a user defined name in order to use
+ * with directed outputs. The {@link OutputSelector} of the input vertex
+ * should use this name for directed emits.
+ *
+ * @param name
+ * The name to set
+ * @return The named DataStream.
+ * @throws IllegalArgumentException
+ * if name is the empty string
+ */
+ protected DataStream<T> name(String name) {
+ // TODO copy DataStream?
+ if ("".equals(name)) { // compare by value; == only tests reference identity
+ throw new IllegalArgumentException("User defined name must not be empty string");
+ }
+ userDefinedName = name;
+ jobGraphBuilder.setUserDefinedName(id, name);
+ return this;
+ }
+
+ /**
+ * Connects two DataStreams. If either is a {@link NamedDataStream}, their user defined names must be equal.
+ * @param returnStream
+ * The DataStream that stream will be connected to
+ * @param stream
+ * This DataStream will be connected to returnStream
+ */
+ private void addConnection(DataStream<T> returnStream, DataStream<T> stream) {
+ if ((stream instanceof NamedDataStream) || (returnStream instanceof NamedDataStream)) {
+ // null-safe comparison: a stream without a user defined name must not trigger a NullPointerException
+ if (returnStream.userDefinedName == null ? stream.userDefinedName != null : !returnStream.userDefinedName.equals(stream.userDefinedName)) {
+ throw new RuntimeException("Error: Connected NamedDataStreams must have same names");
+ }
+ }
+ returnStream.connectedStreams.add(stream.copy());
+ }
+ /** Returns a copy of this stream in which every connected stream uses the given partitioner. */
+ private DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
+ DataStream<T> returnStream = this.copy();
+ // connectGraph reads the partitioner of each connected stream, so every entry is updated
+ for (DataStream<T> stream : returnStream.connectedStreams) {
+ stream.partitioner = partitioner;
+ }
+
+ return returnStream;
+ }
+
+ /**
+ * Internal function for assembling the underlying
+ * {@link org.apache.flink.nephele.jobgraph.JobGraph} of the job. Adds an
+ * edge from every stream in the connected streams of inputStream to the
+ * output vertex given by outputID, using each stream's own partitioner.
+ *
+ * @param inputStream
+ * input data stream
+ * @param outputID
+ * ID of the output
+ * @param typeNumber
+ * Input number passed to setEdge: 0 for single-input operators, 1 or 2 for co-functions
+ */
+ private <X> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) {
+ for (DataStream<X> stream : inputStream.connectedStreams) {
+ jobGraphBuilder.setEdge(stream.getId(), outputID, stream.partitioner, typeNumber);
+ }
+ }
+
+ /**
+ * Adds the given sink to this DataStream. Only streams with sinks added
+ * will be executed once the {@link StreamExecutionEnvironment#execute()}
+ * method is called.
+ *
+ * @param sinkFunction
+ * The object containing the sink's invoke function.
+ * @return The closed DataStream.
+ */
+ public DataStream<T> addSink(SinkFunction<T> sinkFunction) {
+ return addSink(this.copy(), sinkFunction);
+ }
+
+ private DataStream<T> addSink(DataStream<T> inputStream, SinkFunction<T> sinkFunction) {
+ return addSink(inputStream, sinkFunction, new FunctionTypeWrapper<T, Tuple, T>(
+ sinkFunction, SinkFunction.class, 0, -1, 0));
+ }
+
+ private DataStream<T> addSink(DataStream<T> inputStream, SinkFunction<T> sinkFunction,
+ TypeSerializerWrapper<T, Tuple, T> typeWrapper) {
+ DataStream<T> returnStream = new DataStream<T>(environment, "sink");
+
+ try {
+ jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction),
+ typeWrapper, "sink", SerializationUtils.serialize(sinkFunction),
+ degreeOfParallelism);
+ } catch (SerializationException e) {
+ throw new RuntimeException("Cannot serialize SinkFunction", e);
+ }
+
+ connectGraph(inputStream, returnStream.getId(), 0);
+ // type-test the stream being connected instead of building another copy of this one
+ if (inputStream instanceof NamedDataStream) {
+ returnStream.name(inputStream.userDefinedName);
+ }
+
+ return returnStream;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f932700a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
index bfce834..d5a5f78 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/IterativeDataStream.java
@@ -31,12 +31,17 @@ import org.apache.flink.streaming.partitioner.ForwardPartitioner;
public class IterativeDataStream<T> extends StreamOperator<T, T> {
static Integer iterationCount = 0;
+ protected Integer iterationID;
protected IterativeDataStream(DataStream<T> dataStream) {
super(dataStream);
iterationID = iterationCount;
iterationCount++;
- iterationflag = true;
+ }
+
+ protected IterativeDataStream(DataStream<T> dataStream, Integer iterationID) {
+ super(dataStream);
+ this.iterationID = iterationID;
}
/**
@@ -68,8 +73,7 @@ public class IterativeDataStream<T> extends StreamOperator<T, T> {
* when used with directed emits
*
*/
- public <R> DataStream<T> closeWith(DataStream<T> iterationResult,
- String iterationName) {
+ public <R> DataStream<T> closeWith(DataStream<T> iterationResult, String iterationName) {
DataStream<R> returnStream = new DataStream<R>(environment, "iterationSink");
jobGraphBuilder.addIterationSink(returnStream.getId(), iterationResult.getId(),
@@ -78,11 +82,16 @@ public class IterativeDataStream<T> extends StreamOperator<T, T> {
jobGraphBuilder.setIterationSourceParallelism(iterationID.toString(),
iterationResult.getParallelism());
- for (int i = 0; i < iterationResult.connectIDs.size(); i++) {
- String inputID = iterationResult.connectIDs.get(i);
+ for (DataStream<T> stream : iterationResult.connectedStreams) {
+ String inputID = stream.getId();
jobGraphBuilder.setEdge(inputID, returnStream.getId(), new ForwardPartitioner<T>(), 0);
}
return iterationResult;
}
+ /** Copies this stream with the same iteration ID; iterationCount is not incremented. */
+ @Override
+ protected DataStream<T> copy() {
+ return new IterativeDataStream<T>(this, iterationID);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f932700a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/SplitDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/SplitDataStream.java
index b4bbe52..627aa42 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/SplitDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/SplitDataStream.java
@@ -34,10 +34,9 @@ public class SplitDataStream<T> extends DataStream<T> {
* @return Returns the modified DataStream
*/
public NamedDataStream<T> select(String outputName) {
-
- userDefinedName = outputName;
-
- return new NamedDataStream<T>(this);
+ NamedDataStream<T> returnStream = new NamedDataStream<T>(this);
+ returnStream.userDefinedName = outputName;
+ return returnStream;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f932700a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
index 7edde1c..5ecd930 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamOperator.java
@@ -19,8 +19,7 @@
package org.apache.flink.streaming.api;
-
-public class StreamOperator<IN, OUT > extends DataStream<OUT> {
+public class StreamOperator<IN, OUT> extends DataStream<OUT> {
protected StreamOperator(StreamExecutionEnvironment environment, String operatorType) {
super(environment, operatorType);
@@ -30,4 +29,9 @@ public class StreamOperator<IN, OUT > extends DataStream<OUT> {
super(dataStream);
}
+ /** Copies this operator as a StreamOperator so that the runtime type is kept. */
+ @Override
+ protected DataStream<OUT> copy() {
+ return new StreamOperator<IN, OUT>(this);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f932700a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 074992b..e2991b4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -20,6 +20,7 @@
package org.apache.flink.streaming.api.collector;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import java.util.Arrays;
import java.util.Collection;
@@ -38,9 +39,9 @@ public class DirectedOutputTest {
static HashSet<Long> evenSet = new HashSet<Long>();
static HashSet<Long> oddSet = new HashSet<Long>();
-
+
private static class PlusTwo extends RichMapFunction<Long, Long> {
-
+
private static final long serialVersionUID = 1L;
@Override
@@ -59,7 +60,7 @@ public class DirectedOutputTest {
evenSet.add(tuple);
}
}
-
+
private static class OddSink extends SinkFunction<Long> {
private static final long serialVersionUID = 1L;
@@ -69,26 +70,24 @@ public class DirectedOutputTest {
oddSet.add(tuple);
}
}
-
-
+
private static class MySelector extends OutputSelector<Long> {
-
+
private static final long serialVersionUID = 1L;
@Override
public void select(Long tuple, Collection<String> outputs) {
int mod = (int) (tuple % 2);
switch (mod) {
- case 0:
- outputs.add("ds1");
- break;
- case 1:
- outputs.add("ds2");
- break;
+ case 0:
+ outputs.add("ds1");
+ break;
+ case 1:
+ outputs.add("ds2");
+ break;
}
}
}
-
@SuppressWarnings("unused")
@Test
@@ -102,11 +101,41 @@ public class DirectedOutputTest {
DataStream<Long> ds3 = s.map(new PlusTwo()).addSink(new OddSink());
env.execute();
-
+
HashSet<Long> expectedEven = new HashSet<Long>(Arrays.asList(4L, 6L, 8L));
HashSet<Long> expectedOdd = new HashSet<Long>(Arrays.asList(3L, 5L, 7L));
-
+
assertEquals(expectedEven, evenSet);
assertEquals(expectedOdd, oddSet);
}
+
+ @SuppressWarnings({ "unchecked" })
+ @Test
+ public void directNamingTest() {
+ LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+ SplitDataStream<Long> s = env.generateSequence(1, 10).split(new MySelector());
+ try {
+ s.select("ds2").connectWith(s.select("ds1"));
+ fail();
+ } catch (Exception e) {
+ // expected: streams selected with different names cannot be connected
+ }
+ try {
+ s.shuffle().connectWith(s.select("ds1"));
+ fail();
+ } catch (Exception e) {
+ // expected: the unnamed split stream cannot be connected to the stream named "ds1"
+ }
+ try {
+ s.select("ds2").connectWith(s);
+ fail();
+ } catch (Exception e) {
+ // expected: the stream named "ds2" cannot be connected to the unnamed split stream
+ }
+ // connecting a stream to itself, or streams selected with the same name, must succeed
+ s.connectWith(s);
+ s.select("ds2").connectWith(s.select("ds2"));
+ }
}