You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:14 UTC
[37/51] [abbrv] git commit: [streaming] Directed emit API updated to use split and select
[streaming] Directed emit API updated to use split and select
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/910f74d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/910f74d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/910f74d7
Branch: refs/heads/master
Commit: 910f74d7bbef96e2a258601666daca326cfcad62
Parents: 776bd3f
Author: gyfora <gy...@gmail.com>
Authored: Sun Aug 3 19:20:09 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:22:50 2014 +0200
----------------------------------------------------------------------
.../apache/flink/streaming/api/DataStream.java | 88 ++++++++++++--------
.../flink/streaming/api/NamedDataStream.java | 33 ++++++++
.../flink/streaming/api/SplitDataStream.java | 48 +++++++++++
.../api/collector/DirectedOutputTest.java | 7 +-
4 files changed, 138 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/910f74d7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index f17dd1b..c648ab2 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -124,7 +124,15 @@ public class DataStream<T> {
this.iterationID = dataStream.iterationID;
this.jobGraphBuilder = dataStream.jobGraphBuilder;
}
-
+
+ /**
+ * Creates a copy of the DataStream
+ *
+ * @return The copy
+ */
+ protected DataStream<T> copy() {
+ return new DataStream<T>(this);
+ }
/**
* Partitioning strategy on the stream.
@@ -197,7 +205,7 @@ public class DataStream<T> {
jobGraphBuilder.setParallelism(id, degreeOfParallelism);
- return new DataStream<T>(this);
+ return this;
}
/**
@@ -218,7 +226,7 @@ public class DataStream<T> {
* The name to set
* @return The named DataStream.
*/
- public DataStream<T> name(String name) {
+ protected DataStream<T> name(String name) {
// TODO copy DataStream?
if (name == "") {
throw new IllegalArgumentException("User defined name must not be empty string");
@@ -241,7 +249,7 @@ public class DataStream<T> {
* @return The connected DataStream.
*/
public DataStream<T> connectWith(DataStream<T>... streams) {
- DataStream<T> returnStream = new DataStream<T>(this);
+ DataStream<T> returnStream = this.copy();
for (DataStream<T> stream : streams) {
addConnection(returnStream, stream);
@@ -270,16 +278,18 @@ public class DataStream<T> {
*
* @param outputSelector
* The user defined OutputSelector for directing the tuples.
- * @return The directed DataStream.
+ * @return The {@link SplitDataStream}
*/
- public DataStream<T> directTo(OutputSelector<T> outputSelector) {
+ public SplitDataStream<T> split(OutputSelector<T> outputSelector) {
try {
- jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
+ for (String id : connectIDs) {
+ jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
+ }
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize OutputSelector");
}
- return this;
+ return new SplitDataStream<T>(this);
}
/**
@@ -340,7 +350,7 @@ public class DataStream<T> {
}
private DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
- DataStream<T> returnStream = new DataStream<T>(this);
+ DataStream<T> returnStream = this.copy();
for (int i = 0; i < returnStream.partitioners.size(); i++) {
returnStream.partitioners.set(i, partitioner);
@@ -383,8 +393,7 @@ public class DataStream<T> {
*/
public <T2, R> DataStream<R> coMapWith(CoMapFunction<T, T2, R> coMapper,
DataStream<T2> otherStream) {
- return addCoFunction("coMap", new DataStream<T>(this), new DataStream<T2>(otherStream),
- coMapper,
+ return addCoFunction("coMap", this.copy(), otherStream.copy(), coMapper,
new FunctionTypeWrapper<T, T2, R>(coMapper, CoMapFunction.class, 0, 1, 2),
new CoMapInvokable<T, T2, R>(coMapper));
}
@@ -490,7 +499,7 @@ public class DataStream<T> {
final AbstractRichFunction function, TypeSerializerWrapper<T, Tuple, R> typeWrapper,
UserTaskInvokable<T, R> functionInvokable) {
- DataStream<T> inputStream = new DataStream<T>(this);
+ DataStream<T> inputStream = this.copy();
StreamOperator<T, R> returnStream = new StreamOperator<T, R>(environment, functionName);
try {
@@ -507,6 +516,10 @@ public class DataStream<T> {
inputStream.iterationflag = false;
}
+ if (inputStream instanceof NamedDataStream) {
+ returnStream.name(inputStream.userDefinedName);
+ }
+
return returnStream;
}
@@ -527,6 +540,17 @@ public class DataStream<T> {
connectGraph(inputStream1, returnStream.getId(), 1);
connectGraph(inputStream2, returnStream.getId(), 2);
+ if ((inputStream1 instanceof NamedDataStream) && (inputStream2 instanceof NamedDataStream)) {
+ throw new RuntimeException("An operator cannot have two names");
+ } else {
+ if (inputStream1 instanceof NamedDataStream) {
+ returnStream.name(inputStream1.userDefinedName);
+ }
+
+ if (inputStream2 instanceof NamedDataStream) {
+ returnStream.name(inputStream2.userDefinedName);
+ }
+ }
// TODO consider iteration
return returnStream;
@@ -564,7 +588,7 @@ public class DataStream<T> {
* @return The closed DataStream.
*/
public DataStream<T> addSink(SinkFunction<T> sinkFunction) {
- return addSink(new DataStream<T>(this), sinkFunction);
+ return addSink(this.copy(), sinkFunction);
}
/**
@@ -575,7 +599,7 @@ public class DataStream<T> {
* @return The closed DataStream.
*/
public DataStream<T> print() {
- DataStream<T> inputStream = new DataStream<T>(this);
+ DataStream<T> inputStream = this.copy();
PrintSinkFunction<T> printFunction = new PrintSinkFunction<T>();
DataStream<T> returnStream = addSink(inputStream, printFunction, null);
@@ -603,6 +627,10 @@ public class DataStream<T> {
inputStream.connectGraph(inputStream, returnStream.getId(), 0);
+ if (this.copy() instanceof NamedDataStream) {
+ returnStream.name(inputStream.userDefinedName);
+ }
+
return returnStream;
}
@@ -617,8 +645,7 @@ public class DataStream<T> {
* @return The closed DataStream
*/
public DataStream<T> writeAsText(String path) {
- writeAsText(this, path, new WriteFormatAsText<T>(), 1, null);
- return new DataStream<T>(this);
+ return writeAsText(this, path, new WriteFormatAsText<T>(), 1, null);
}
/**
@@ -635,8 +662,7 @@ public class DataStream<T> {
* @return The closed DataStream
*/
public DataStream<T> writeAsText(String path, long millis) {
- writeAsText(this, path, new WriteFormatAsText<T>(), millis, null);
- return new DataStream<T>(this);
+ return writeAsText(this, path, new WriteFormatAsText<T>(), millis, null);
}
/**
@@ -654,8 +680,7 @@ public class DataStream<T> {
* @return The closed DataStream
*/
public DataStream<T> writeAsText(String path, int batchSize) {
- writeAsText(this, path, new WriteFormatAsText<T>(), batchSize, null);
- return new DataStream<T>(this);
+ return writeAsText(this, path, new WriteFormatAsText<T>(), batchSize, null);
}
/**
@@ -677,8 +702,7 @@ public class DataStream<T> {
* @return The closed DataStream
*/
public DataStream<T> writeAsText(String path, long millis, T endTuple) {
- writeAsText(this, path, new WriteFormatAsText<T>(), millis, endTuple);
- return new DataStream<T>(this);
+ return writeAsText(this, path, new WriteFormatAsText<T>(), millis, endTuple);
}
/**
@@ -701,8 +725,7 @@ public class DataStream<T> {
* @return The closed DataStream
*/
public DataStream<T> writeAsText(String path, int batchSize, T endTuple) {
- writeAsText(this, path, new WriteFormatAsText<T>(), batchSize, endTuple);
- return new DataStream<T>(this);
+ return writeAsText(this, path, new WriteFormatAsText<T>(), batchSize, endTuple);
}
/**
@@ -771,8 +794,7 @@ public class DataStream<T> {
* @return The closed DataStream
*/
public DataStream<T> writeAsCsv(String path) {
- writeAsCsv(this, path, new WriteFormatAsCsv<T>(), 1, null);
- return new DataStream<T>(this);
+ return writeAsCsv(this, path, new WriteFormatAsCsv<T>(), 1, null);
}
/**
@@ -789,8 +811,7 @@ public class DataStream<T> {
* @return The closed DataStream
*/
public DataStream<T> writeAsCsv(String path, long millis) {
- writeAsCsv(this, path, new WriteFormatAsCsv<T>(), millis, null);
- return new DataStream<T>(this);
+ return writeAsCsv(this, path, new WriteFormatAsCsv<T>(), millis, null);
}
/**
@@ -808,8 +829,7 @@ public class DataStream<T> {
* @return The closed DataStream
*/
public DataStream<T> writeAsCsv(String path, int batchSize) {
- writeAsCsv(this, path, new WriteFormatAsCsv<T>(), batchSize, null);
- return new DataStream<T>(this);
+ return writeAsCsv(this, path, new WriteFormatAsCsv<T>(), batchSize, null);
}
/**
@@ -831,8 +851,7 @@ public class DataStream<T> {
* @return The closed DataStream
*/
public DataStream<T> writeAsCsv(String path, long millis, T endTuple) {
- writeAsCsv(this, path, new WriteFormatAsCsv<T>(), millis, endTuple);
- return new DataStream<T>(this);
+ return writeAsCsv(this, path, new WriteFormatAsCsv<T>(), millis, endTuple);
}
/**
@@ -856,8 +875,7 @@ public class DataStream<T> {
*/
public DataStream<T> writeAsCsv(String path, int batchSize, T endTuple) {
setMutability(false);
- writeAsCsv(this, path, new WriteFormatAsCsv<T>(), batchSize, endTuple);
- return new DataStream<T>(this);
+ return writeAsCsv(this, path, new WriteFormatAsCsv<T>(), batchSize, endTuple);
}
/**
@@ -942,6 +960,6 @@ public class DataStream<T> {
jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(), iterationID,
degreeOfParallelism);
- return new DataStream<T>(this);
+ return this.copy();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/910f74d7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/NamedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/NamedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/NamedDataStream.java
new file mode 100755
index 0000000..1edfa6f
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/NamedDataStream.java
@@ -0,0 +1,33 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api;
+
+public class NamedDataStream<T> extends DataStream<T> {
+
+ protected NamedDataStream(DataStream<T> dataStream) {
+ super(dataStream);
+ }
+
+ @Override
+ protected DataStream<T> copy() {
+ return new NamedDataStream<T>(this);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/910f74d7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/SplitDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/SplitDataStream.java
new file mode 100755
index 0000000..b4bbe52
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/SplitDataStream.java
@@ -0,0 +1,48 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api;
+
+public class SplitDataStream<T> extends DataStream<T> {
+
+ protected SplitDataStream(DataStream<T> dataStream) {
+ super(dataStream);
+ }
+
+ /**
+ * Sets the output name for which the vertex will receive tuples from the
+ * preceding Directed stream
+ *
+ * @param outputName
+ * The output name for which the operator will receive the input.
+ * @return Returns the modified DataStream
+ */
+ public NamedDataStream<T> select(String outputName) {
+
+ userDefinedName = outputName;
+
+ return new NamedDataStream<T>(this);
+ }
+
+ @Override
+ protected DataStream<T> copy() {
+ return new SplitDataStream<T>(this);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/910f74d7/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index e0da783..074992b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.SplitDataStream;
import org.apache.flink.streaming.api.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.util.LogUtils;
@@ -95,9 +96,9 @@ public class DirectedOutputTest {
LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
- DataStream<Long> s = env.generateSequence(1, 6).directTo(new MySelector());
- DataStream<Long> ds1 = s.map(new PlusTwo()).name("ds1").addSink(new EvenSink());
- DataStream<Long> ds2 = s.map(new PlusTwo()).name("ds2").addSink(new OddSink());
+ SplitDataStream<Long> s = env.generateSequence(1, 6).split(new MySelector());
+ DataStream<Long> ds1 = s.select("ds1").shuffle().map(new PlusTwo()).addSink(new EvenSink());
+ DataStream<Long> ds2 = s.select("ds2").map(new PlusTwo()).addSink(new OddSink());
DataStream<Long> ds3 = s.map(new PlusTwo()).addSink(new OddSink());
env.execute();