You are viewing a plain-text version of this content; the link to its canonical version was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:14 UTC

[37/51] [abbrv] git commit: [streaming] Directed emit API updated to use split and select

[streaming] Directed emit API updated to use split and select


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/910f74d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/910f74d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/910f74d7

Branch: refs/heads/master
Commit: 910f74d7bbef96e2a258601666daca326cfcad62
Parents: 776bd3f
Author: gyfora <gy...@gmail.com>
Authored: Sun Aug 3 19:20:09 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:22:50 2014 +0200

----------------------------------------------------------------------
 .../apache/flink/streaming/api/DataStream.java  | 88 ++++++++++++--------
 .../flink/streaming/api/NamedDataStream.java    | 33 ++++++++
 .../flink/streaming/api/SplitDataStream.java    | 48 +++++++++++
 .../api/collector/DirectedOutputTest.java       |  7 +-
 4 files changed, 138 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/910f74d7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index f17dd1b..c648ab2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -124,7 +124,15 @@ public class DataStream<T> {
 		this.iterationID = dataStream.iterationID;
 		this.jobGraphBuilder = dataStream.jobGraphBuilder;
 	}
-	
+
+	/**
+	 * Creates a copy of this DataStream; subclasses override it so the copy keeps their runtime type.
+	 * 
+	 * @return A new stream built from this one via its copy constructor
+	 */
+	protected DataStream<T> copy() {
+		return new DataStream<T>(this);
+	}
 
 	/**
 	 * Partitioning strategy on the stream.
@@ -197,7 +205,7 @@ public class DataStream<T> {
 
 		jobGraphBuilder.setParallelism(id, degreeOfParallelism);
 
-		return new DataStream<T>(this);
+		return this;
 	}
 
 	/**
@@ -218,7 +226,7 @@ public class DataStream<T> {
 	 *            The name to set
 	 * @return The named DataStream.
 	 */
-	public DataStream<T> name(String name) {
+	protected DataStream<T> name(String name) { // NOTE(review): the empty-name check below uses == on Strings (reference equality); name.isEmpty() was likely intended
 		// TODO copy DataStream?
 		if (name == "") {
 			throw new IllegalArgumentException("User defined name must not be empty string");
@@ -241,7 +249,7 @@ public class DataStream<T> {
 	 * @return The connected DataStream.
 	 */
 	public DataStream<T> connectWith(DataStream<T>... streams) {
-		DataStream<T> returnStream = new DataStream<T>(this);
+		DataStream<T> returnStream = this.copy();
 
 		for (DataStream<T> stream : streams) {
 			addConnection(returnStream, stream);
@@ -270,16 +278,18 @@ public class DataStream<T> {
 	 * 
 	 * @param outputSelector
 	 *            The user defined OutputSelector for directing the tuples.
-	 * @return The directed DataStream.
+	 * @return A {@link SplitDataStream}; use {@link SplitDataStream#select(String)} to pick an output
 	 */
-	public DataStream<T> directTo(OutputSelector<T> outputSelector) {
+	public SplitDataStream<T> split(OutputSelector<T> outputSelector) {
 		try {
-			jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
+			for (String connectID : connectIDs) { // one selector per connected stream; not named 'id' to avoid shadowing the field
+				jobGraphBuilder.setOutputSelector(connectID, SerializationUtils.serialize(outputSelector));
+			}
 		} catch (SerializationException e) {
 			throw new RuntimeException("Cannot serialize OutputSelector");
 		}
 
-		return this;
+		return new SplitDataStream<T>(this);
 	}
 
 	/**
@@ -340,7 +350,7 @@ public class DataStream<T> {
 	}
 
 	private DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
-		DataStream<T> returnStream = new DataStream<T>(this);
+		DataStream<T> returnStream = this.copy();
 
 		for (int i = 0; i < returnStream.partitioners.size(); i++) {
 			returnStream.partitioners.set(i, partitioner);
@@ -383,8 +393,7 @@ public class DataStream<T> {
 	 */
 	public <T2, R> DataStream<R> coMapWith(CoMapFunction<T, T2, R> coMapper,
 			DataStream<T2> otherStream) {
-		return addCoFunction("coMap", new DataStream<T>(this), new DataStream<T2>(otherStream),
-				coMapper,
+		return addCoFunction("coMap", this.copy(), otherStream.copy(), coMapper,
 				new FunctionTypeWrapper<T, T2, R>(coMapper, CoMapFunction.class, 0, 1, 2),
 				new CoMapInvokable<T, T2, R>(coMapper));
 	}
@@ -490,7 +499,7 @@ public class DataStream<T> {
 			final AbstractRichFunction function, TypeSerializerWrapper<T, Tuple, R> typeWrapper,
 			UserTaskInvokable<T, R> functionInvokable) {
 
-		DataStream<T> inputStream = new DataStream<T>(this);
+		DataStream<T> inputStream = this.copy();
 		StreamOperator<T, R> returnStream = new StreamOperator<T, R>(environment, functionName);
 
 		try {
@@ -507,6 +516,10 @@ public class DataStream<T> {
 			inputStream.iterationflag = false;
 		}
 
+		if (inputStream instanceof NamedDataStream) { // carry the name chosen by SplitDataStream.select() over to the new operator
+			returnStream.name(inputStream.userDefinedName);
+		}
+
 		return returnStream;
 	}
 
@@ -527,6 +540,17 @@ public class DataStream<T> {
 		connectGraph(inputStream1, returnStream.getId(), 1);
 		connectGraph(inputStream2, returnStream.getId(), 2);
 
+		if ((inputStream1 instanceof NamedDataStream) && (inputStream2 instanceof NamedDataStream)) {
+			throw new IllegalArgumentException("An operator cannot have two names: " + inputStream1.userDefinedName + " and " + inputStream2.userDefinedName);
+		} else {
+			if (inputStream1 instanceof NamedDataStream) {
+				returnStream.name(inputStream1.userDefinedName);
+			}
+
+			if (inputStream2 instanceof NamedDataStream) {
+				returnStream.name(inputStream2.userDefinedName);
+			}
+		}
 		// TODO consider iteration
 
 		return returnStream;
@@ -564,7 +588,7 @@ public class DataStream<T> {
 	 * @return The closed DataStream.
 	 */
 	public DataStream<T> addSink(SinkFunction<T> sinkFunction) {
-		return addSink(new DataStream<T>(this), sinkFunction);
+		return addSink(this.copy(), sinkFunction);
 	}
 
 	/**
@@ -575,7 +599,7 @@ public class DataStream<T> {
 	 * @return The closed DataStream.
 	 */
 	public DataStream<T> print() {
-		DataStream<T> inputStream = new DataStream<T>(this);
+		DataStream<T> inputStream = this.copy();
 		PrintSinkFunction<T> printFunction = new PrintSinkFunction<T>();
 		DataStream<T> returnStream = addSink(inputStream, printFunction, null);
 
@@ -603,6 +627,10 @@ public class DataStream<T> {
 
 		inputStream.connectGraph(inputStream, returnStream.getId(), 0);
 
+		if (inputStream instanceof NamedDataStream) { // test the stream actually connected (as addFunction does), not a throwaway copy of this
+			returnStream.name(inputStream.userDefinedName);
+		}
+
 		return returnStream;
 	}
 
@@ -617,8 +645,7 @@ public class DataStream<T> {
 	 * @return The closed DataStream
 	 */
 	public DataStream<T> writeAsText(String path) {
-		writeAsText(this, path, new WriteFormatAsText<T>(), 1, null);
-		return new DataStream<T>(this);
+		return writeAsText(this, path, new WriteFormatAsText<T>(), 1, null);
 	}
 
 	/**
@@ -635,8 +662,7 @@ public class DataStream<T> {
 	 * @return The closed DataStream
 	 */
 	public DataStream<T> writeAsText(String path, long millis) {
-		writeAsText(this, path, new WriteFormatAsText<T>(), millis, null);
-		return new DataStream<T>(this);
+		return writeAsText(this, path, new WriteFormatAsText<T>(), millis, null);
 	}
 
 	/**
@@ -654,8 +680,7 @@ public class DataStream<T> {
 	 * @return The closed DataStream
 	 */
 	public DataStream<T> writeAsText(String path, int batchSize) {
-		writeAsText(this, path, new WriteFormatAsText<T>(), batchSize, null);
-		return new DataStream<T>(this);
+		return writeAsText(this, path, new WriteFormatAsText<T>(), batchSize, null);
 	}
 
 	/**
@@ -677,8 +702,7 @@ public class DataStream<T> {
 	 * @return The closed DataStream
 	 */
 	public DataStream<T> writeAsText(String path, long millis, T endTuple) {
-		writeAsText(this, path, new WriteFormatAsText<T>(), millis, endTuple);
-		return new DataStream<T>(this);
+		return writeAsText(this, path, new WriteFormatAsText<T>(), millis, endTuple);
 	}
 
 	/**
@@ -701,8 +725,7 @@ public class DataStream<T> {
 	 * @return The closed DataStream
 	 */
 	public DataStream<T> writeAsText(String path, int batchSize, T endTuple) {
-		writeAsText(this, path, new WriteFormatAsText<T>(), batchSize, endTuple);
-		return new DataStream<T>(this);
+		return writeAsText(this, path, new WriteFormatAsText<T>(), batchSize, endTuple);
 	}
 
 	/**
@@ -771,8 +794,7 @@ public class DataStream<T> {
 	 * @return The closed DataStream
 	 */
 	public DataStream<T> writeAsCsv(String path) {
-		writeAsCsv(this, path, new WriteFormatAsCsv<T>(), 1, null);
-		return new DataStream<T>(this);
+		return writeAsCsv(this, path, new WriteFormatAsCsv<T>(), 1, null);
 	}
 
 	/**
@@ -789,8 +811,7 @@ public class DataStream<T> {
 	 * @return The closed DataStream
 	 */
 	public DataStream<T> writeAsCsv(String path, long millis) {
-		writeAsCsv(this, path, new WriteFormatAsCsv<T>(), millis, null);
-		return new DataStream<T>(this);
+		return writeAsCsv(this, path, new WriteFormatAsCsv<T>(), millis, null);
 	}
 
 	/**
@@ -808,8 +829,7 @@ public class DataStream<T> {
 	 * @return The closed DataStream
 	 */
 	public DataStream<T> writeAsCsv(String path, int batchSize) {
-		writeAsCsv(this, path, new WriteFormatAsCsv<T>(), batchSize, null);
-		return new DataStream<T>(this);
+		return writeAsCsv(this, path, new WriteFormatAsCsv<T>(), batchSize, null);
 	}
 
 	/**
@@ -831,8 +851,7 @@ public class DataStream<T> {
 	 * @return The closed DataStream
 	 */
 	public DataStream<T> writeAsCsv(String path, long millis, T endTuple) {
-		writeAsCsv(this, path, new WriteFormatAsCsv<T>(), millis, endTuple);
-		return new DataStream<T>(this);
+		return writeAsCsv(this, path, new WriteFormatAsCsv<T>(), millis, endTuple);
 	}
 
 	/**
@@ -856,8 +875,7 @@ public class DataStream<T> {
 	 */
 	public DataStream<T> writeAsCsv(String path, int batchSize, T endTuple) {
 		setMutability(false);
-		writeAsCsv(this, path, new WriteFormatAsCsv<T>(), batchSize, endTuple);
-		return new DataStream<T>(this);
+		return writeAsCsv(this, path, new WriteFormatAsCsv<T>(), batchSize, endTuple); // NOTE(review): unlike the other writeAs* overloads in this patch, only this one calls setMutability(false) above — confirm intended
 	}
 
 	/**
@@ -942,6 +960,6 @@ public class DataStream<T> {
 		jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(), iterationID,
 				degreeOfParallelism);
 
-		return new DataStream<T>(this);
+		return this.copy(); // copy() keeps the runtime subclass (NamedDataStream/SplitDataStream override it)
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/910f74d7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/NamedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/NamedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/NamedDataStream.java
new file mode 100755
index 0000000..1edfa6f
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/NamedDataStream.java
@@ -0,0 +1,33 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api;
+/** A DataStream tagged with a user-defined output name, e.g. the result of {@link SplitDataStream#select(String)}. */
+public class NamedDataStream<T> extends DataStream<T> {
+	/** Creates a NamedDataStream from {@code dataStream} via the DataStream copy constructor. */
+	protected NamedDataStream(DataStream<T> dataStream) {
+		super(dataStream);
+	}
+	/** Keeps copies made by DataStream operations typed as NamedDataStream, which DataStream detects with instanceof. */
+	@Override
+	protected DataStream<T> copy() {
+		return new NamedDataStream<T>(this);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/910f74d7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/SplitDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/SplitDataStream.java
new file mode 100755
index 0000000..b4bbe52
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/SplitDataStream.java
@@ -0,0 +1,48 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api;
+/** A DataStream whose tuples are routed by an OutputSelector; use {@link #select(String)} to pick one output. */
+public class SplitDataStream<T> extends DataStream<T> {
+
+	protected SplitDataStream(DataStream<T> dataStream) {
+		super(dataStream);
+	}
+
+	/**
+	 * Selects one named output of this split stream. Operators applied to the
+	 * returned stream are intended to take {@code outputName} as their name.
+	 * 
+	 * @param outputName
+	 *            The output name for which the operator will receive the input.
+	 * @return A new {@link NamedDataStream}; this SplitDataStream is not modified
+	 */
+	public NamedDataStream<T> select(String outputName) {
+
+		NamedDataStream<T> selected = new NamedDataStream<T>(this);
+		selected.userDefinedName = outputName; // set on the copy so repeated select() calls do not mutate this stream
+		return selected;
+	}
+
+	@Override
+	protected DataStream<T> copy() {
+		return new SplitDataStream<T>(this);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/910f74d7/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index e0da783..074992b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 
 import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.SplitDataStream;
 import org.apache.flink.streaming.api.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.util.LogUtils;
@@ -95,9 +96,9 @@ public class DirectedOutputTest {
 		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-		DataStream<Long> s = env.generateSequence(1, 6).directTo(new MySelector());
-		DataStream<Long> ds1 = s.map(new PlusTwo()).name("ds1").addSink(new EvenSink());
-		DataStream<Long> ds2 = s.map(new PlusTwo()).name("ds2").addSink(new OddSink());
+		SplitDataStream<Long> s = env.generateSequence(1, 6).split(new MySelector());
+		DataStream<Long> ds1 = s.select("ds1").shuffle().map(new PlusTwo()).addSink(new EvenSink()); // NOTE(review): shuffle() here presumably checks that the selected name survives a repartitioning step
+		DataStream<Long> ds2 = s.select("ds2").map(new PlusTwo()).addSink(new OddSink());
 		DataStream<Long> ds3 = s.map(new PlusTwo()).addSink(new OddSink());
 
 		env.execute();