You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/18 23:22:52 UTC
[5/5] incubator-flink git commit: [streaming] StreamInvokable rework for simpler logic and easier use
[streaming] StreamInvokable rework for simpler logic and easier use
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c5e9a512
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c5e9a512
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c5e9a512
Branch: refs/heads/master
Commit: c5e9a512242e050b71635cacaaca7890fadc6b67
Parents: 88e64fc
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Dec 17 23:34:26 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Thu Dec 18 20:07:27 2014 +0100
----------------------------------------------------------------------
docs/streaming_guide.md | 10 -
.../flink/streaming/api/JobGraphBuilder.java | 8 -
.../flink/streaming/api/StreamConfig.java | 13 +-
.../streaming/api/datastream/DataStream.java | 9 +-
.../datastream/SingleOutputStreamOperator.java | 18 --
.../streaming/api/invokable/SinkInvokable.java | 14 +-
.../api/invokable/SourceInvokable.java | 22 +-
.../api/invokable/StreamInvokable.java | 87 +++-----
.../invokable/operator/CounterInvokable.java | 23 +-
.../api/invokable/operator/FilterInvokable.java | 24 +-
.../invokable/operator/FlatMapInvokable.java | 14 +-
.../operator/GroupedReduceInvokable.java | 4 +-
.../operator/GroupedWindowInvokable.java | 76 +++----
.../api/invokable/operator/MapInvokable.java | 14 +-
.../invokable/operator/ProjectInvokable.java | 11 +-
.../operator/StreamReduceInvokable.java | 14 +-
.../api/invokable/operator/WindowInvokable.java | 29 +--
.../operator/co/CoBatchReduceInvokable.java | 9 +-
.../api/invokable/operator/co/CoInvokable.java | 45 +---
.../operator/co/CoWindowInvokable.java | 5 -
.../api/streamvertex/CoStreamVertex.java | 37 +++-
.../api/streamvertex/StreamTaskContext.java | 40 ++++
.../api/streamvertex/StreamVertex.java | 48 +++-
.../streaming/api/AggregationFunctionTest.java | 36 +--
.../invokable/operator/CoBatchReduceTest.java | 6 +-
.../api/invokable/operator/CoFlatMapTest.java | 4 +-
.../operator/CoGroupedBatchReduceTest.java | 6 +-
.../invokable/operator/CoGroupedReduceTest.java | 6 +-
.../operator/CoGroupedWindowReduceTest.java | 6 +-
.../api/invokable/operator/CoMapTest.java | 4 +-
.../invokable/operator/CoStreamReduceTest.java | 4 +-
.../invokable/operator/CoWindowReduceTest.java | 6 +-
.../api/invokable/operator/CoWindowTest.java | 6 +-
.../operator/CounterInvokableTest.java | 4 +-
.../api/invokable/operator/FilterTest.java | 4 +-
.../api/invokable/operator/FlatMapTest.java | 4 +-
.../operator/GroupedReduceInvokableTest.java | 4 +-
.../operator/GroupedWindowInvokableTest.java | 16 +-
.../api/invokable/operator/MapTest.java | 4 +-
.../api/invokable/operator/ProjectTest.java | 4 +-
.../invokable/operator/StreamReduceTest.java | 4 +-
.../invokable/operator/WindowInvokableTest.java | 10 +-
.../flink/streaming/util/MockCoContext.java | 217 +++++++++++++++++++
.../flink/streaming/util/MockCoInvokable.java | 169 ---------------
.../flink/streaming/util/MockContext.java | 148 +++++++++++++
.../flink/streaming/util/MockInvokable.java | 105 ---------
46 files changed, 664 insertions(+), 687 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 6e7f932..c51afbc 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -609,16 +609,6 @@ env.genereateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
To maximise the throughput the user can call `.setBufferTimeout(-1)` which will remove the timeout and buffers will only be flushed when they are full.
To minimise latency, set the timeout to a value close to 0 (for example 5 or 10 ms). Theoretically a buffer timeout of 0 will cause all outputs to be flushed when produced, but this setting should be avoided because it can cause severe performance degradation.
-### Mutability
-
-This is currently a beta feature and it is only supported for a subset of the available operators.
-
-Most operators allow setting mutability for reading input data. If the operator is set mutable then the variable used to store input data for operators will be reused in a mutable fashion to avoid excessive object creation. By default, all operators are set to immutable.
-Usage:
-
-~~~java
-operator.setMutability(isMutable)
-~~~
[Back to top](#top)
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index d63042a..d66e388 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -64,7 +64,6 @@ public class JobGraphBuilder {
private Map<String, List<Integer>> outEdgeType;
private Map<String, List<List<String>>> outEdgeNames;
private Map<String, List<Boolean>> outEdgeSelectAll;
- private Map<String, Boolean> mutability;
private Map<String, List<String>> inEdgeList;
private Map<String, List<StreamPartitioner<?>>> connectionTypes;
private Map<String, String> operatorNames;
@@ -97,7 +96,6 @@ public class JobGraphBuilder {
outEdgeType = new HashMap<String, List<Integer>>();
outEdgeNames = new HashMap<String, List<List<String>>>();
outEdgeSelectAll = new HashMap<String, List<Boolean>>();
- mutability = new HashMap<String, Boolean>();
inEdgeList = new HashMap<String, List<String>>();
connectionTypes = new HashMap<String, List<StreamPartitioner<?>>>();
operatorNames = new HashMap<String, String>();
@@ -302,7 +300,6 @@ public class JobGraphBuilder {
vertexClasses.put(vertexName, vertexClass);
setParallelism(vertexName, parallelism);
- mutability.put(vertexName, false);
invokableObjects.put(vertexName, invokableObject);
operatorNames.put(vertexName, operatorName);
serializedFunctions.put(vertexName, serializedFunction);
@@ -355,7 +352,6 @@ public class JobGraphBuilder {
StreamConfig config = new StreamConfig(vertex.getConfiguration());
- config.setMutability(mutability.get(vertexName));
config.setBufferTimeout(bufferTimeout.get(vertexName));
config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName));
@@ -447,10 +443,6 @@ public class JobGraphBuilder {
inputFormatList.put(vertexName, inputFormat);
}
- public void setMutability(String vertexName, boolean isMutable) {
- mutability.put(vertexName, isMutable);
- }
-
public void setBufferTimeout(String vertexName, long bufferTimeout) {
this.bufferTimeout.put(vertexName, bufferTimeout);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 1d863a7..9800b63 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -57,14 +57,11 @@ public class StreamConfig {
private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1";
private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2";
- private static final String MUTABILITY = "isMutable";
private static final String ITERATON_WAIT = "iterationWait";
// DEFAULT VALUES
- private static final boolean DEFAULT_IS_MUTABLE = false;
-
- private static final long DEFAULT_TIMEOUT = 0;
+ private static final long DEFAULT_TIMEOUT = 100;
// CONFIG METHODS
@@ -138,14 +135,6 @@ public class StreamConfig {
config.setBytes(key, SerializationUtils.serialize(typeWrapper));
}
- public void setMutability(boolean isMutable) {
- config.setBoolean(MUTABILITY, isMutable);
- }
-
- public boolean getMutability() {
- return config.getBoolean(MUTABILITY, DEFAULT_IS_MUTABLE);
- }
-
public void setBufferTimeout(long timeout) {
config.setLong(BUFFER_TIMEOUT, timeout);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 474d57b..6e8da0a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -366,7 +366,7 @@ public class DataStream<OUT> {
* the data stream that will be fed back and used as the input for the
* iteration head. A common usage pattern for streaming iterations is to use
* output splitting to send a part of the closing data stream to the head.
- * Refer to {@link SingleOutputStreamOperator#split(OutputSelector)} for
+ * Refer to {@link SingleOutputStreamOperator#split(outputSelector)} for
* more information.
* <p>
* The iteration edge will be partitioned the same way as the first input of
@@ -940,7 +940,6 @@ public class DataStream<OUT> {
WriteFormatAsText<OUT> format, long millis, OUT endTuple) {
DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
path, format, millis, endTuple), inputStream.typeInfo);
- jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
@@ -968,7 +967,6 @@ public class DataStream<OUT> {
DataStreamSink<OUT> returnStream = addSink(inputStream,
new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple),
inputStream.typeInfo);
- jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
@@ -1063,9 +1061,6 @@ public class DataStream<OUT> {
* @return The closed DataStream
*/
public DataStreamSink<OUT> writeAsCsv(String path, int batchSize, OUT endTuple) {
- if (this instanceof SingleOutputStreamOperator) {
- ((SingleOutputStreamOperator<?, ?>) this).setMutability(false);
- }
return writeAsCsv(this, path, new WriteFormatAsCsv<OUT>(), batchSize, endTuple);
}
@@ -1091,7 +1086,6 @@ public class DataStream<OUT> {
WriteFormatAsCsv<OUT> format, long millis, OUT endTuple) {
DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
path, format, millis, endTuple), inputStream.typeInfo);
- jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
@@ -1119,7 +1113,6 @@ public class DataStream<OUT> {
DataStreamSink<OUT> returnStream = addSink(inputStream,
new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple),
inputStream.typeInfo);
- jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 3e1c940..016322b 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -71,24 +71,6 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
return this;
}
- /**
- * This is a beta feature, use with care </br><br/>
- * Sets the mutability of the operator. If the operator is set to mutable,
- * the tuples received in the user defined functions, will be reused after
- * the function call. Setting an operator to mutable reduces garbage
- * collection overhead and thus increases scalability. Please note that if a
- * {@link DataStream#batchReduce} or {@link DataStream#windowReduce} is used
- * as mutable, the user can only iterate through the iterator once in every
- * invoke.
- *
- * @param isMutable
- * The mutability of the operator.
- * @return The operator with mutability set.
- */
- public SingleOutputStreamOperator<OUT, O> setMutability(boolean isMutable) {
- jobGraphBuilder.setMutability(id, isMutable);
- return this;
- }
/**
* Sets the maximum time frequency (ms) for the flushing of the output
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index ec33224..74591a8 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -30,23 +30,15 @@ public class SinkInvokable<IN> extends StreamInvokable<IN, IN> {
}
@Override
- protected void immutableInvoke() throws Exception {
- while ((reuse = recordIterator.next(reuse)) != null) {
- callUserFunctionAndLogException();
- resetReuse();
- }
- }
-
- @Override
- protected void mutableInvoke() throws Exception {
- while ((reuse = recordIterator.next(reuse)) != null) {
+ public void invoke() throws Exception {
+ while (readNext() != null) {
callUserFunctionAndLogException();
}
}
@Override
protected void callUserFunction() throws Exception {
- sinkFunction.invoke((IN) reuse.getObject());
+ sinkFunction.invoke((IN) nextRecord.getObject());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index 0cfe028..f1cf2c5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -21,38 +21,24 @@ import java.io.Serializable;
import org.apache.flink.streaming.api.function.source.SourceFunction;
-public class SourceInvokable<OUT> extends StreamInvokable<OUT,OUT> implements Serializable {
+public class SourceInvokable<OUT> extends StreamInvokable<OUT, OUT> implements Serializable {
private static final long serialVersionUID = 1L;
private SourceFunction<OUT> sourceFunction;
-
public SourceInvokable(SourceFunction<OUT> sourceFunction) {
super(sourceFunction);
this.sourceFunction = sourceFunction;
}
@Override
- public void invoke() throws Exception {
- sourceFunction.invoke(collector);
- }
-
- @Override
- protected void immutableInvoke() throws Exception {
- }
-
- @Override
- protected void mutableInvoke() throws Exception {
+ public void invoke() {
+ callUserFunctionAndLogException();
}
@Override
protected void callUserFunction() throws Exception {
+ sourceFunction.invoke(collector);
}
-
- @Override
- public SourceFunction<OUT> getSourceFunction(){
- return sourceFunction;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index e587b93..d19d7ad 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -17,16 +17,16 @@
package org.apache.flink.streaming.api.invokable;
+import java.io.IOException;
import java.io.Serializable;
import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.StringUtils;
@@ -45,9 +45,11 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(StreamInvokable.class);
+ protected StreamTaskContext<OUT> taskContext;
+
protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
protected StreamRecordSerializer<IN> inSerializer;
- protected StreamRecord<IN> reuse;
+ protected StreamRecord<IN> nextRecord;
protected boolean isMutable;
protected Collector<OUT> collector;
@@ -61,48 +63,43 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
/**
* Initializes the {@link StreamInvokable} for input and output handling
*
- * @param collector
- * Collector object for collecting the outputs for the operator
- * @param recordIterator
- * Iterator for reading in the input records
- * @param serializer
- * Serializer used to deserialize inputs
- * @param isMutable
- * Mutability setting for the operator
+ * @param taskContext
+ * StreamTaskContext representing the vertex
*/
- public void initialize(Collector<OUT> collector,
- MutableObjectIterator<StreamRecord<IN>> recordIterator,
- StreamRecordSerializer<IN> serializer, boolean isMutable) {
- this.collector = collector;
- this.recordIterator = recordIterator;
- this.inSerializer = serializer;
+ public void setup(StreamTaskContext<OUT> taskContext) {
+ this.collector = taskContext.getOutputCollector();
+ this.recordIterator = taskContext.getInput(0);
+ this.inSerializer = taskContext.getInputSerializer(0);
if (this.inSerializer != null) {
- this.reuse = serializer.createInstance();
+ this.nextRecord = inSerializer.createInstance();
}
- this.isMutable = isMutable;
- }
-
- /**
- * Re-initializes the object in which the next input record will be read in
- */
- protected void resetReuse() {
- this.reuse = inSerializer.createInstance();
+ this.taskContext = taskContext;
}
/**
- * Method that will be called if the mutability setting is set to immutable
+ * Method that will be called when the operator starts, should encode the
+ * processing logic
*/
- protected abstract void immutableInvoke() throws Exception;
+ public abstract void invoke() throws Exception;
- /**
- * Method that will be called if the mutability setting is set to mutable
+ /*
+ * Reads the next record from the reader iterator and stores it in the
+ * nextRecord variable
*/
- protected abstract void mutableInvoke() throws Exception;
+ protected StreamRecord<IN> readNext() {
+ this.nextRecord = inSerializer.createInstance();
+ try {
+ return nextRecord = recordIterator.next(nextRecord);
+ } catch (IOException e) {
+ throw new RuntimeException("Could not read next record.");
+ }
+ }
/**
* The call of the user implemented function should be implemented here
*/
- protected abstract void callUserFunction() throws Exception;
+ protected void callUserFunction() throws Exception {
+ }
/**
* Method for logging exceptions thrown during the user function call
@@ -119,20 +116,6 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
}
/**
- * Method that will be called when the stream starts. The user should encode
- * the processing functionality in {@link #mutableInvoke()} and
- * {@link #immutableInvoke()}
- *
- */
- public void invoke() throws Exception {
- if (this.isMutable) {
- mutableInvoke();
- } else {
- immutableInvoke();
- }
- }
-
- /**
* Open method to be used if the user defined function extends the
* RichFunction class
*
@@ -141,9 +124,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
*/
public void open(Configuration parameters) throws Exception {
isRunning = true;
- if (userFunction instanceof RichFunction) {
- ((RichFunction) userFunction).open(parameters);
- }
+ FunctionUtils.openFunction(userFunction, parameters);
}
/**
@@ -154,16 +135,10 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
public void close() throws Exception {
isRunning = false;
collector.close();
- if (userFunction instanceof RichFunction) {
- ((RichFunction) userFunction).close();
- }
+ FunctionUtils.closeFunction(userFunction);
}
public void setRuntimeContext(RuntimeContext t) {
FunctionUtils.setFunctionRuntimeContext(userFunction, t);
}
-
- public SourceFunction<OUT> getSourceFunction() {
- return null;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
index 7924595..0267253 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
@@ -21,30 +21,17 @@ import org.apache.flink.streaming.api.invokable.StreamInvokable;
public class CounterInvokable<IN> extends StreamInvokable<IN, Long> {
private static final long serialVersionUID = 1L;
-
+
Long count = 0L;
-
+
public CounterInvokable() {
super(null);
}
-
- @Override
- protected void immutableInvoke() throws Exception {
- while ((reuse = recordIterator.next(reuse)) != null) {
- callUserFunctionAndLogException();
- resetReuse();
- }
- }
@Override
- protected void mutableInvoke() throws Exception {
- while ((reuse = recordIterator.next(reuse)) != null) {
- callUserFunctionAndLogException();
+ public void invoke() throws Exception {
+ while (readNext() != null) {
+ collector.collect(++count);
}
}
-
- @Override
- protected void callUserFunction() throws Exception {
- collector.collect(++count);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index a54b6ad..796196d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -25,37 +25,25 @@ public class FilterInvokable<IN> extends StreamInvokable<IN, IN> {
private static final long serialVersionUID = 1L;
FilterFunction<IN> filterFunction;
+ private boolean collect;
public FilterInvokable(FilterFunction<IN> filterFunction) {
super(filterFunction);
this.filterFunction = filterFunction;
}
- private boolean canCollect;
-
@Override
- protected void immutableInvoke() throws Exception {
- while ((reuse = recordIterator.next(reuse)) != null) {
+ public void invoke() throws Exception {
+ while (readNext() != null) {
callUserFunctionAndLogException();
- if (canCollect) {
- collector.collect(reuse.getObject());
- }
- resetReuse();
- }
- }
-
- @Override
- protected void mutableInvoke() throws Exception {
- while ((reuse = recordIterator.next(reuse)) != null) {
- callUserFunctionAndLogException();
- if (canCollect) {
- collector.collect(reuse.getObject());
+ if (collect) {
+ collector.collect(nextRecord.getObject());
}
}
}
@Override
protected void callUserFunction() throws Exception {
- canCollect = filterFunction.filter(reuse.getObject());
+ collect = filterFunction.filter(nextRecord.getObject());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 3452a82..8ff78eb 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -31,23 +31,15 @@ public class FlatMapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
}
@Override
- protected void immutableInvoke() throws Exception {
- while ((reuse = recordIterator.next(reuse)) != null) {
- callUserFunctionAndLogException();
- resetReuse();
- }
- }
-
- @Override
- protected void mutableInvoke() throws Exception {
- while ((reuse = recordIterator.next(reuse)) != null) {
+ public void invoke() throws Exception {
+ while (readNext() != null) {
callUserFunctionAndLogException();
}
}
@Override
protected void callUserFunction() throws Exception {
- flatMapper.flatMap(reuse.getObject(), collector);
+ flatMapper.flatMap(nextRecord.getObject(), collector);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
index d64fb6f..fdcf520 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
@@ -38,9 +38,9 @@ public class GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> {
@Override
protected void reduce() throws Exception {
- Object key = reuse.getKey(keySelector);
+ Object key = nextRecord.getKey(keySelector);
currentValue = values.get(key);
- nextValue = reuse.getObject();
+ nextValue = nextRecord.getObject();
if (currentValue != null) {
callUserFunctionAndLogException();
values.put(key, reduced);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
index ae16be0..33348e4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
@@ -37,8 +37,6 @@ import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This invokable allows windowing based on {@link TriggerPolicy} and
@@ -80,8 +78,6 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
*/
private static final long serialVersionUID = -3469545957144404137L;
- private static final Logger LOG = LoggerFactory.getLogger(GroupedWindowInvokable.class);
-
private KeySelector<IN, ?> keySelector;
private Configuration parameters;
private LinkedList<ActiveTriggerPolicy<IN>> activeCentralTriggerPolicies;
@@ -226,23 +222,23 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
}
@Override
- protected void immutableInvoke() throws Exception {
+ public void invoke() throws Exception {
// Prevent empty data streams
- if ((reuse = recordIterator.next(reuse)) == null) {
+ if (readNext() == null) {
throw new RuntimeException("DataStream must not be empty");
}
// Continuously run
- while (reuse != null) {
- WindowInvokable<IN, OUT> groupInvokable = windowingGroups.get(keySelector.getKey(reuse
- .getObject()));
+ while (nextRecord != null) {
+ WindowInvokable<IN, OUT> groupInvokable = windowingGroups.get(keySelector
+ .getKey(nextRecord.getObject()));
if (groupInvokable == null) {
- groupInvokable = makeNewGroup(reuse);
+ groupInvokable = makeNewGroup(nextRecord);
}
// Run the precalls for central active triggers
for (ActiveTriggerPolicy<IN> trigger : activeCentralTriggerPolicies) {
- Object[] result = trigger.preNotifyTrigger(reuse.getObject());
+ Object[] result = trigger.preNotifyTrigger(nextRecord.getObject());
for (Object in : result) {
// If central eviction is used, handle it here
@@ -260,7 +256,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
// Process non-active central triggers
for (TriggerPolicy<IN> triggerPolicy : centralTriggerPolicies) {
- if (triggerPolicy.notifyTrigger(reuse.getObject())) {
+ if (triggerPolicy.notifyTrigger(nextRecord.getObject())) {
currentTriggerPolicies.add(triggerPolicy);
}
}
@@ -268,12 +264,12 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
if (currentTriggerPolicies.isEmpty()) {
// only add the element to its group
- groupInvokable.processRealElement(reuse.getObject());
+ groupInvokable.processRealElement(nextRecord.getObject());
checkForEmptyGroupBuffer(groupInvokable);
// If central eviction is used, handle it here
if (!centralEvictionPolicies.isEmpty()) {
- evictElements(centralEviction(reuse.getObject(), false));
+ evictElements(centralEviction(nextRecord.getObject(), false));
deleteOrderForCentralEviction.add(groupInvokable);
}
@@ -283,20 +279,21 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
if (group == groupInvokable) {
// process real with initialized policies
- group.processRealElement(reuse.getObject(), currentTriggerPolicies);
+ group.processRealElement(nextRecord.getObject(), currentTriggerPolicies);
} else {
// process like a fake but also initialized with
// policies
- group.externalTriggerFakeElement(reuse.getObject(), currentTriggerPolicies);
+ group.externalTriggerFakeElement(nextRecord.getObject(),
+ currentTriggerPolicies);
}
-
- //remove group in case it has an empty buffer
- //checkForEmptyGroupBuffer(group);
+
+ // remove group in case it has an empty buffer
+ // checkForEmptyGroupBuffer(group);
}
// If central eviction is used, handle it here
if (!centralEvictionPolicies.isEmpty()) {
- evictElements(centralEviction(reuse.getObject(), true));
+ evictElements(centralEviction(nextRecord.getObject(), true));
deleteOrderForCentralEviction.add(groupInvokable);
}
}
@@ -304,9 +301,8 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
// clear current trigger list
currentTriggerPolicies.clear();
- // Recreate the reuse-StremRecord object and load next StreamRecord
- resetReuse();
- reuse = recordIterator.next(reuse);
+ // read next record
+ readNext();
}
// Stop all remaining threads from policies
@@ -358,7 +354,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
clonedDistributedEvictionPolicies);
}
- groupInvokable.initialize(collector, recordIterator, inSerializer, isMutable);
+ groupInvokable.setup(taskContext);
groupInvokable.open(this.parameters);
windowingGroups.put(keySelector.getKey(element.getObject()), groupInvokable);
@@ -366,21 +362,6 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
}
@Override
- protected void mutableInvoke() throws Exception {
- if (LOG.isInfoEnabled()) {
- LOG.info("There is currently no mutable implementation of this operator. Immutable version is used.");
- }
- immutableInvoke();
- }
-
- @Override
- protected void callUserFunction() throws Exception {
- // This method gets never called directly. The user function calls are
- // all delegated to the invokable instanced which handle/represent the
- // groups.
- }
-
- @Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.parameters = parameters;
@@ -456,12 +437,13 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
* buffer.
*/
private void evictElements(int numToEvict) {
- HashSet<WindowInvokable<IN, OUT>> usedGroups=new HashSet<WindowInvokable<IN,OUT>>();
+ HashSet<WindowInvokable<IN, OUT>> usedGroups = new HashSet<WindowInvokable<IN, OUT>>();
for (; numToEvict > 0; numToEvict--) {
- WindowInvokable<IN, OUT> currentGroup=deleteOrderForCentralEviction.getFirst();
- //Do the eviction
+ WindowInvokable<IN, OUT> currentGroup = deleteOrderForCentralEviction.getFirst();
+ // Do the eviction
currentGroup.evictFirst();
- //Remember groups which possibly have an empty buffer after the eviction
+ // Remember groups which possibly have an empty buffer after the
+ // eviction
usedGroups.add(currentGroup);
try {
deleteOrderForCentralEviction.removeFirst();
@@ -471,13 +453,13 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
}
}
-
- //Remove groups with empty buffer
- for (WindowInvokable<IN, OUT> group:usedGroups){
+
+ // Remove groups with empty buffer
+ for (WindowInvokable<IN, OUT> group : usedGroups) {
checkForEmptyGroupBuffer(group);
}
}
-
+
/**
* Checks if the element buffer of a given windowing group is empty. If so,
* the group will be deleted.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 4feb4f3..6be96ec 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -31,22 +31,14 @@ public class MapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
}
@Override
- protected void immutableInvoke() throws Exception {
- while ((reuse = recordIterator.next(reuse)) != null) {
- callUserFunctionAndLogException();
- resetReuse();
- }
- }
-
- @Override
- protected void mutableInvoke() throws Exception {
- while ((reuse = recordIterator.next(reuse)) != null) {
+ public void invoke() throws Exception {
+ while (readNext() != null) {
callUserFunctionAndLogException();
}
}
@Override
protected void callUserFunction() throws Exception {
- collector.collect(mapper.map(reuse.getObject()));
+ collector.collect(mapper.map(nextRecord.getObject()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
index 4666a85..c9d9e5a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
@@ -39,13 +39,8 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN,
}
@Override
- protected void immutableInvoke() throws Exception {
- mutableInvoke();
- }
-
- @Override
- protected void mutableInvoke() throws Exception {
- while ((reuse = recordIterator.next(reuse)) != null) {
+ public void invoke() throws Exception {
+ while (readNext() != null) {
callUserFunctionAndLogException();
}
}
@@ -53,7 +48,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN,
@Override
protected void callUserFunction() throws Exception {
for (int i = 0; i < this.numFields; i++) {
- outTuple.setField(reuse.getField(fields[i]), i);
+ outTuple.setField(nextRecord.getField(fields[i]), i);
}
collector.collect(outTuple);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index d327c76..4bb78b8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -34,22 +34,14 @@ public class StreamReduceInvokable<IN> extends StreamInvokable<IN, IN> {
}
@Override
- protected void immutableInvoke() throws Exception {
- while ((reuse = recordIterator.next(reuse)) != null) {
- reduce();
- resetReuse();
- }
- }
-
- @Override
- protected void mutableInvoke() throws Exception {
- while ((reuse = recordIterator.next(reuse)) != null) {
+ public void invoke() throws Exception {
+ while (readNext() != null) {
reduce();
}
}
protected void reduce() throws Exception {
- nextValue = reuse.getObject();
+ nextValue = nextRecord.getObject();
callUserFunctionAndLogException();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
index 0e740bb..ea891c9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
@@ -29,8 +29,6 @@ import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
@@ -39,8 +37,6 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
*/
private static final long serialVersionUID = -8038984294071650730L;
- private static final Logger LOG = LoggerFactory.getLogger(WindowInvokable.class);
-
private LinkedList<TriggerPolicy<IN>> triggerPolicies;
private LinkedList<EvictionPolicy<IN>> evictionPolicies;
private LinkedList<ActiveTriggerPolicy<IN>> activeTriggerPolicies;
@@ -120,20 +116,19 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
}
@Override
- protected void immutableInvoke() throws Exception {
+ public void invoke() throws Exception {
// Prevent empty data streams
- if ((reuse = recordIterator.next(reuse)) == null) {
+ if (readNext() == null) {
throw new RuntimeException("DataStream must not be empty");
}
// Continuously run
- while (reuse != null) {
- processRealElement(reuse.getObject());
+ while (nextRecord != null) {
+ processRealElement(nextRecord.getObject());
- // Recreate the reuse-StremRecord object and load next StreamRecord
- resetReuse();
- reuse = recordIterator.next(reuse);
+ // Load next StreamRecord
+ readNext();
}
// Stop all remaining threads from policies
@@ -146,14 +141,6 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
}
- @Override
- protected void mutableInvoke() throws Exception {
- if (LOG.isInfoEnabled()) {
- LOG.info("There is currently no mutable implementation of this operator. Immutable version is used.");
- }
- immutableInvoke();
- }
-
/**
* This method gets called in case of an grouped windowing in case central
* trigger occurred and the arriving element causing the trigger is not part
@@ -363,10 +350,10 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
*
* @return true in case the buffer is empty otherwise false.
*/
- protected boolean isBufferEmpty(){
+ protected boolean isBufferEmpty() {
return buffer.isEmpty();
}
-
+
/**
* This method does the final reduce at the end of the stream and emits the
* result.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
index edf5a8f..4ed49fd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchReduceInvokable.java
@@ -63,7 +63,7 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
}
@Override
- public void immutableInvoke() throws Exception {
+ public void invoke() throws Exception {
while (true) {
int next = recordIterator.next(reuse1, reuse2);
if (next == 0) {
@@ -100,13 +100,6 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
return batch2;
}
- @Override
- // TODO: implement mutableInvoke for reduce
- protected void mutableInvoke() throws Exception {
- System.out.println("Immutable setting is used");
- immutableInvoke();
- }
-
protected void reduce1(StreamBatch<IN1> batch) {
this.currentBatch1 = batch;
callUserFunctionAndLogException1();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 25ed62c..604873e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,19 +45,18 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
protected TypeSerializer<IN1> serializer1;
protected TypeSerializer<IN2> serializer2;
- public void initialize(Collector<OUT> collector,
- CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator,
- StreamRecordSerializer<IN1> serializer1, StreamRecordSerializer<IN2> serializer2,
- boolean isMutable) {
- this.collector = collector;
+ @Override
+ public void setup(StreamTaskContext<OUT> taskContext) {
+ this.collector = taskContext.getOutputCollector();
+
+ this.recordIterator = taskContext.getCoReader();
+
+ this.srSerializer1 = taskContext.getInputSerializer(0);
+ this.srSerializer2 = taskContext.getInputSerializer(1);
- this.recordIterator = recordIterator;
- this.reuse1 = serializer1.createInstance();
- this.reuse2 = serializer2.createInstance();
+ this.reuse1 = srSerializer1.createInstance();
+ this.reuse2 = srSerializer2.createInstance();
- this.srSerializer1 = serializer1;
- this.srSerializer2 = serializer2;
- this.isMutable = isMutable;
this.serializer1 = srSerializer1.getObjectSerializer();
this.serializer2 = srSerializer2.getObjectSerializer();
}
@@ -76,7 +75,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
}
@Override
- protected void immutableInvoke() throws Exception {
+ public void invoke() throws Exception {
while (true) {
int next = recordIterator.next(reuse1, reuse2);
if (next == 0) {
@@ -93,22 +92,6 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
}
}
- @Override
- protected void mutableInvoke() throws Exception {
- while (true) {
- int next = recordIterator.next(reuse1, reuse2);
- if (next == 0) {
- break;
- } else if (next == 1) {
- initialize1();
- handleStream1();
- } else {
- initialize2();
- handleStream2();
- }
- }
- }
-
protected abstract void handleStream1() throws Exception;
protected abstract void handleStream2() throws Exception;
@@ -147,8 +130,4 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
}
}
- @Override
- protected void callUserFunction() throws Exception {
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
index be3f578..7df5668 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
@@ -59,11 +59,6 @@ public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT>
}
@Override
- protected void mutableInvoke() throws Exception {
- throw new RuntimeException("Reducing mutable sliding batch is not supported.");
- }
-
- @Override
protected void handleStream1() throws Exception {
window.addToBuffer1(reuse1.getObject());
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index 0058c66..9321bc7 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -30,8 +30,6 @@ import org.apache.flink.util.MutableObjectIterator;
public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
- private OutputHandler<OUT> outputHandler;
-
protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
protected StreamRecordSerializer<IN2> inputDeserializer2 = null;
@@ -68,8 +66,7 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
@Override
protected void setInvokable() {
userInvokable = configuration.getUserInvokable(userClassLoader);
- userInvokable.initialize(outputHandler.getCollector(), coIter, inputDeserializer1,
- inputDeserializer2, isMutable);
+ userInvokable.setup(this);
}
protected void setConfigInputs() throws StreamVertexException {
@@ -105,4 +102,36 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
outputHandler.invokeUserFunction("CO-TASK", userInvokable);
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public <X> MutableObjectIterator<X> getInput(int index) {
+ switch (index) {
+ case 0:
+ return (MutableObjectIterator<X>) inputIter1;
+ case 1:
+ return (MutableObjectIterator<X>) inputIter2;
+ default:
+ throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
+ switch (index) {
+ case 0:
+ return (StreamRecordSerializer<X>) inputDeserializer1;
+ case 1:
+ return (StreamRecordSerializer<X>) inputDeserializer2;
+ default:
+ throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <X, Y> CoReaderIterator<X, Y> getCoReader() {
+ return (CoReaderIterator<X, Y>) coIter;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
new file mode 100644
index 0000000..7fbab3b
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamTaskContext.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.streamvertex;
+
+import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.io.CoReaderIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+public interface StreamTaskContext<OUT> {
+
+ StreamConfig getConfig();
+
+ ClassLoader getUserCodeClassLoader();
+
+ <X> MutableObjectIterator<X> getInput(int index);
+
+ <X> StreamRecordSerializer<X> getInputSerializer(int index);
+
+ Collector<OUT> getOutputCollector();
+
+ <X, Y> CoReaderIterator<X, Y> getCoReader();
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 13e6c9f..7504efd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -23,9 +23,13 @@ import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.api.StreamConfig;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.io.CoReaderIterator;
import org.apache.flink.streaming.state.OperatorState;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
-public class StreamVertex<IN, OUT> extends AbstractInvokable {
+public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT> {
private static int numTasks;
@@ -33,12 +37,11 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
protected int instanceID;
protected String name;
private static int numVertices = 0;
- protected boolean isMutable;
protected Object function;
protected String functionName;
private InputHandler<IN> inputHandler;
- private OutputHandler<OUT> outputHandler;
+ protected OutputHandler<OUT> outputHandler;
private StreamInvokable<IN, OUT> userInvokable;
private StreamingRuntimeContext context;
@@ -68,7 +71,6 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
this.userClassLoader = getUserCodeClassLoader();
this.configuration = new StreamConfig(getTaskConfiguration());
this.name = configuration.getVertexName();
- this.isMutable = configuration.getMutability();
this.functionName = configuration.getFunctionName();
this.function = configuration.getFunction(userClassLoader);
this.states = configuration.getOperatorStates(userClassLoader);
@@ -89,8 +91,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
protected void setInvokable() {
userInvokable = configuration.getUserInvokable(userClassLoader);
- userInvokable.initialize(outputHandler.getCollector(), inputHandler.getInputIter(),
- inputHandler.getInputSerializer(), isMutable);
+ userInvokable.setup(this);
}
public String getName() {
@@ -111,4 +112,39 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable {
public void invoke() throws Exception {
outputHandler.invokeUserFunction("TASK", userInvokable);
}
+
+ @Override
+ public StreamConfig getConfig() {
+ return configuration;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <X> MutableObjectIterator<X> getInput(int index) {
+ if (index == 0) {
+ return (MutableObjectIterator<X>) inputHandler.getInputIter();
+ } else {
+ throw new IllegalArgumentException("There is only 1 input");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
+ if (index == 0) {
+ return (StreamRecordSerializer<X>) inputHandler.getInputSerializer();
+ } else {
+ throw new IllegalArgumentException("There is only 1 input");
+ }
+ }
+
+ @Override
+ public Collector<OUT> getOutputCollector() {
+ return outputHandler.getCollector();
+ }
+
+ @Override
+ public <X, Y> CoReaderIterator<X, Y> getCoReader() {
+ throw new IllegalArgumentException("CoReader not available");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index 0fbf72a..9376166 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.junit.Test;
@@ -102,24 +102,24 @@ public class AggregationFunctionTest {
.getAggregator(1, type1, AggregationType.MAX);
ReduceFunction<Integer> maxFunction0 = ComparableAggregator.getAggregator(0,
type2, AggregationType.MAX);
- List<Tuple2<Integer, Integer>> sumList = MockInvokable.createAndExecute(
+ List<Tuple2<Integer, Integer>> sumList = MockContext.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer, Integer>>(sumFunction), getInputList());
- List<Tuple2<Integer, Integer>> minList = MockInvokable.createAndExecute(
+ List<Tuple2<Integer, Integer>> minList = MockContext.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer, Integer>>(minFunction), getInputList());
- List<Tuple2<Integer, Integer>> maxList = MockInvokable.createAndExecute(
+ List<Tuple2<Integer, Integer>> maxList = MockContext.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxFunction), getInputList());
- List<Tuple2<Integer, Integer>> groupedSumList = MockInvokable.createAndExecute(
+ List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute(
new GroupedReduceInvokable<Tuple2<Integer, Integer>>(sumFunction,
new TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList());
- List<Tuple2<Integer, Integer>> groupedMinList = MockInvokable.createAndExecute(
+ List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecute(
new GroupedReduceInvokable<Tuple2<Integer, Integer>>(minFunction,
new TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList());
- List<Tuple2<Integer, Integer>> groupedMaxList = MockInvokable.createAndExecute(
+ List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecute(
new GroupedReduceInvokable<Tuple2<Integer, Integer>>(maxFunction,
new TupleKeySelector<Tuple2<Integer, Integer>>(0)), getInputList());
@@ -129,11 +129,11 @@ public class AggregationFunctionTest {
assertEquals(expectedGroupSumList, groupedSumList);
assertEquals(expectedGroupMinList, groupedMinList);
assertEquals(expectedGroupMaxList, groupedMaxList);
- assertEquals(expectedSumList0, MockInvokable.createAndExecute(
+ assertEquals(expectedSumList0, MockContext.createAndExecute(
new StreamReduceInvokable<Integer>(sumFunction0), simpleInput));
- assertEquals(expectedMinList0, MockInvokable.createAndExecute(
+ assertEquals(expectedMinList0, MockContext.createAndExecute(
new StreamReduceInvokable<Integer>(minFunction0), simpleInput));
- assertEquals(expectedMaxList0, MockInvokable.createAndExecute(
+ assertEquals(expectedMaxList0, MockContext.createAndExecute(
new StreamReduceInvokable<Integer>(maxFunction0), simpleInput));
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
@@ -210,16 +210,16 @@ public class AggregationFunctionTest {
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
- assertEquals(maxByFirstExpected, MockInvokable.createAndExecute(
+ assertEquals(maxByFirstExpected, MockContext.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst),
getInputList()));
- assertEquals(maxByLastExpected, MockInvokable.createAndExecute(
+ assertEquals(maxByLastExpected, MockContext.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast),
getInputList()));
- assertEquals(minByLastExpected, MockInvokable.createAndExecute(
+ assertEquals(minByLastExpected, MockContext.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast),
getInputList()));
- assertEquals(minByFirstExpected, MockInvokable.createAndExecute(
+ assertEquals(minByFirstExpected, MockContext.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst),
getInputList()));
@@ -284,16 +284,16 @@ public class AggregationFunctionTest {
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
- assertEquals(maxByFirstExpected, MockInvokable.createAndExecute(
+ assertEquals(maxByFirstExpected, MockContext.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst),
getInputList()));
- assertEquals(maxByLastExpected, MockInvokable.createAndExecute(
+ assertEquals(maxByLastExpected, MockContext.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast),
getInputList()));
- assertEquals(minByLastExpected, MockInvokable.createAndExecute(
+ assertEquals(minByLastExpected, MockContext.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast),
getInputList()));
- assertEquals(minByFirstExpected, MockInvokable.createAndExecute(
+ assertEquals(minByFirstExpected, MockContext.createAndExecute(
new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst),
getInputList()));
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
index 44b0513..1db286c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchReduceTest.java
@@ -25,7 +25,7 @@ import java.util.List;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoBatchReduceInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
import org.junit.Test;
public class CoBatchReduceTest {
@@ -84,7 +84,7 @@ public class CoBatchReduceTest {
expected.add("def");
expected.add("ghi");
- List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2);
+ List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2);
Collections.sort(result);
Collections.sort(expected);
@@ -125,7 +125,7 @@ public class CoBatchReduceTest {
expected.add("efg");
expected.add("ghi");
- List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2);
+ List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2);
Collections.sort(result);
Collections.sort(expected);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
index 5009496..a91bd0c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
import org.apache.flink.util.Collector;
import org.junit.Test;
@@ -59,7 +59,7 @@ public class CoFlatMapTest implements Serializable {
List<String> expectedList = Arrays.asList("a", "b", "c", "1", "d", "e", "f", "2", "g", "h",
"e", "3", "4", "5");
- List<String> actualList = MockCoInvokable.createAndExecute(invokable,
+ List<String> actualList = MockCoContext.createAndExecute(invokable,
Arrays.asList("abc", "def", "ghe"), Arrays.asList(1, 2, 3, 4, 5));
assertEquals(expectedList, actualList);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
index ce01a7d..bc19a89 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchReduceTest.java
@@ -26,7 +26,7 @@ import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchReduceInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.junit.Test;
@@ -99,7 +99,7 @@ public class CoGroupedBatchReduceTest {
new MyCoReduceFunction(), 4L, 3L, 4L, 3L, new TupleKeySelector(0),
new TupleKeySelector(0));
- List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+ List<String> result = MockCoContext.createAndExecute(invokable, inputs1, inputs2);
Collections.sort(result);
Collections.sort(expected);
@@ -146,7 +146,7 @@ public class CoGroupedBatchReduceTest {
new MyCoReduceFunction(), 4L, 3L, 2L, 2L, new TupleKeySelector(0),
new TupleKeySelector(0));
- List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+ List<String> result = MockCoContext.createAndExecute(invokable, inputs1, inputs2);
Collections.sort(result);
Collections.sort(expected);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
index 4570e23..15d42a4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.junit.Test;
@@ -77,7 +77,7 @@ public class CoGroupedReduceTest {
List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5",
"7");
- List<String> actualList = MockCoInvokable.createAndExecute(invokable,
+ List<String> actualList = MockCoContext.createAndExecute(invokable,
Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
assertEquals(expected, actualList);
@@ -87,7 +87,7 @@ public class CoGroupedReduceTest {
expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7");
- actualList = MockCoInvokable.createAndExecute(invokable,
+ actualList = MockCoContext.createAndExecute(invokable,
Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
assertEquals(expected, actualList);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
index f36a7b5..b5a7e8d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.junit.Test;
@@ -129,7 +129,7 @@ public class CoGroupedWindowReduceTest {
new TupleKeySelector( 0), new MyTimeStamp<Tuple2<String, Integer>>(
timestamps1), new MyTimeStamp<Tuple2<String, String>>(timestamps2));
- List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+ List<String> result = MockCoContext.createAndExecute(invokable, inputs1, inputs2);
Collections.sort(result);
Collections.sort(expected);
@@ -182,7 +182,7 @@ public class CoGroupedWindowReduceTest {
new TupleKeySelector( 0), new MyTimeStamp<Tuple2<String, Integer>>(
timestamps1), new MyTimeStamp<Tuple2<String, String>>(timestamps2));
- List<String> result = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+ List<String> result = MockCoContext.createAndExecute(invokable, inputs1, inputs2);
Collections.sort(result);
Collections.sort(expected);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
index 9c62aec..93d1741 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
@@ -25,7 +25,7 @@ import java.util.List;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
import org.junit.Test;
public class CoMapTest implements Serializable {
@@ -50,7 +50,7 @@ public class CoMapTest implements Serializable {
CoMapInvokable<Double, Integer, String> invokable = new CoMapInvokable<Double, Integer, String>(new MyCoMap());
List<String> expectedList = Arrays.asList("1.1", "1", "1.2", "2", "1.3", "3", "1.4", "1.5");
- List<String> actualList = MockCoInvokable.createAndExecute(invokable, Arrays.asList(1.1, 1.2, 1.3, 1.4, 1.5), Arrays.asList(1, 2, 3));
+ List<String> actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(1.1, 1.2, 1.3, 1.4, 1.5), Arrays.asList(1, 2, 3));
assertEquals(expectedList, actualList);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
index 996320a..3343ba0 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoStreamReduceTest.java
@@ -24,7 +24,7 @@ import java.util.List;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
import org.junit.Test;
public class CoStreamReduceTest {
@@ -62,7 +62,7 @@ public class CoStreamReduceTest {
new MyCoReduceFunction());
List<Integer> expected1 = Arrays.asList(1, 9, 2, 99, 6, 998, 24);
- List<Integer> result = MockCoInvokable.createAndExecute(coReduce,
+ List<Integer> result = MockCoContext.createAndExecute(coReduce,
Arrays.asList(1, 2, 3, 4), Arrays.asList("9", "9", "8"));
assertEquals(expected1, result);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
index 4604b27..90ad483 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
@@ -28,7 +28,7 @@ import java.util.List;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
import org.junit.Test;
public class CoWindowReduceTest {
@@ -114,7 +114,7 @@ public class CoWindowReduceTest {
expected.add("abcde");
expected.add("fghi");
- List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2);
+ List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2);
Collections.sort(result);
Collections.sort(expected);
@@ -160,7 +160,7 @@ public class CoWindowReduceTest {
expected.add("fgh");
expected.add("hi");
- List<String> result = MockCoInvokable.createAndExecute(invokable, inputs, inputs2);
+ List<String> result = MockCoContext.createAndExecute(invokable, inputs, inputs2);
Collections.sort(result);
Collections.sort(expected);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
index ebdd963..c6d446a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.function.co.CoWindowFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.streaming.util.MockCoContext;
import org.apache.flink.util.Collector;
import org.junit.Test;
@@ -148,7 +148,7 @@ public class CoWindowTest {
expected1.add(0);
expected1.add(1);
- List<Integer> actual1 = MockCoInvokable.createAndExecute(invokable1, input11, input12);
+ List<Integer> actual1 = MockCoContext.createAndExecute(invokable1, input11, input12);
assertEquals(expected1, actual1);
CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> invokable2 = new CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer>(
@@ -183,7 +183,7 @@ public class CoWindowTest {
expected2.add(8);
expected2.add(7);
- List<Integer> actual2 = MockCoInvokable.createAndExecute(invokable2, input21, input22);
+ List<Integer> actual2 = MockCoContext.createAndExecute(invokable2, input21, input22);
assertEquals(expected2, actual2);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c5e9a512/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
index 70b9d7e..969a06b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
import org.junit.Test;
public class CounterInvokableTest {
@@ -32,7 +32,7 @@ public class CounterInvokableTest {
CounterInvokable<String> invokable = new CounterInvokable<String>();
List<Long> expected = Arrays.asList(1L, 2L, 3L);
- List<Long> actual = MockInvokable.createAndExecute(invokable, Arrays.asList("one", "two", "three"));
+ List<Long> actual = MockContext.createAndExecute(invokable, Arrays.asList("one", "two", "three"));
assertEquals(expected, actual);
}