You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/24 21:51:43 UTC
[10/12] git commit: [streaming] added batchReduceGroup,
windowReduceGroup functionality to ConnectedDataStream and
GroupedConnectedDataStream
[streaming] added batchReduceGroup, windowReduceGroup functionality to ConnectedDataStream and GroupedConnectedDataStream
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ad983377
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ad983377
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ad983377
Branch: refs/heads/master
Commit: ad983377dcc297a137f74a5895b8336e446ef864
Parents: 7337110
Author: szape <ne...@gmail.com>
Authored: Fri Sep 12 11:26:18 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Wed Sep 24 19:54:39 2014 +0200
----------------------------------------------------------------------
.../api/datastream/ConnectedDataStream.java | 249 ++++++++++++++++-
.../datastream/GroupedConnectedDataStream.java | 268 ++++++++++++++++++-
.../co/CoBatchGroupReduceInvokable.java | 2 +-
.../flink/streaming/state/CircularFifoList.java | 15 +-
.../flink/streaming/state/StreamIterator.java | 5 +-
5 files changed, 511 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ad983377/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 0499c4c..256f470 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -27,13 +27,19 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
+import org.apache.flink.streaming.api.function.co.RichCoReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoBatchGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoStreamReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
@@ -59,7 +65,7 @@ public class ConnectedDataStream<IN1, IN2> {
this.jobGraphBuilder = jobGraphBuilder;
this.environment = environment;
this.input1 = input1.copy();
- this.input2 = input2.copy();
+ this.input2 = input2.copy();
}
/**
@@ -82,20 +88,22 @@ public class ConnectedDataStream<IN1, IN2> {
/**
* Gets the type of the first input
+ *
* @return The type of the first input
*/
public TypeInformation<IN1> getInputType1() {
return input1.getOutputType();
}
-
+
/**
* Gets the type of the second input
+ *
* @return The type of the second input
*/
public TypeInformation<IN2> getInputType2() {
return input2.getOutputType();
}
-
+
/**
* GroupBy operation for connected data stream. Groups the elements of
* input1 and input2 according to keyPosition1 and keyPosition2. Used for
@@ -174,12 +182,15 @@ public class ConnectedDataStream<IN1, IN2> {
}
/**
- * Applies a reduce transformation on both input of a
- * {@link ConnectedDataStream} and maps the output to a common type. The
- * transformation calls {@link CoReduceFunction#reduce1} and
- * {@link CoReduceFunction#map1} for each element of the first input and
- * {@link CoReduceFunction#reduce2} and {@link CoReduceFunction#map2} for
- * each element of the second input.
+ * Applies a reduce transformation on a {@link ConnectedDataStream} and maps
+ * the outputs to a common type. The transformation calls
+ * {@link CoReduceFunction#reduce1} and {@link CoReduceFunction#map1} for
+ * each element of the first input and {@link CoReduceFunction#reduce2} and
+ * {@link CoReduceFunction#map2} for each element of the second input. This
+ * type of reduce is much faster than reduceGroup since the reduce function
+ * can be applied incrementally. The user can also extend the
+ * {@link RichCoReduceFunction} to gain access to other features provided by
+ * the {@link RichFunction} interface.
*
* @param coReducer
* The {@link CoReduceFunction} that will be called for every
@@ -199,6 +210,226 @@ public class ConnectedDataStream<IN1, IN2> {
new CoStreamReduceInvokable<IN1, IN2, OUT>(coReducer));
}
+ /**
+ * Applies a reduceGroup transformation on the preset batches of the inputs
+ * of a {@link ConnectedDataStream}. The transformation calls
+ * {@link CoGroupReduceFunction#reduce1} for each batch of the first input
+ * and {@link CoGroupReduceFunction#reduce2} for each batch of the second
+ * input. Each {@link CoGroupReduceFunction} call can return any number of
+ * elements including none. When the reducer has run for all the values of a
+ * batch, the batch is slid forward. The user can also extend
+ * {@link RichCoGroupReduceFunction} to gain access to other features
+ * provided by the {@link RichFunction} interface.
+ *
+ * @param coReducer
+ * The {@link CoGroupReduceFunction} that will be called for
+ * every batch of each input.
+ * @param batchSize1
+ * The number of elements in a batch of the first input.
+ * @param batchSize2
+ * The number of elements in a batch of the second input.
+ * @return The transformed {@link DataStream}.
+ */
+ public <OUT> SingleOutputStreamOperator<OUT, ?> batchReduceGroup(
+ CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long batchSize1, long batchSize2) {
+ return batchReduceGroup(coReducer, batchSize1, batchSize2, batchSize1, batchSize2);
+ }
+
+ /**
+ * Applies a reduceGroup transformation on the preset batches of the inputs
+ * of a {@link ConnectedDataStream}. The transformation calls
+ * {@link CoGroupReduceFunction#reduce1} for each batch of the first input
+ * and {@link CoGroupReduceFunction#reduce2} for each batch of the second
+ * input. Each {@link CoGroupReduceFunction} call can return any number of
+ * elements including none. When the reducer has run for all the values of a
+ * batch, the batch is slid forward. The user can also extend
+ * {@link RichCoGroupReduceFunction} to gain access to other features
+ * provided by the {@link RichFunction} interface.
+ *
+ * @param coReducer
+ * The {@link CoGroupReduceFunction} that will be called for
+ * every batch of each input.
+ * @param batchSize1
+ * The number of elements in a batch of the first input.
+ * @param batchSize2
+ * The number of elements in a batch of the second input.
+ * @param slideSize1
+ * The number of elements a batch of the first input is slid by.
+ * @param slideSize2
+ * The number of elements a batch of the second input is slid by.
+ * @return The transformed {@link DataStream}.
+ */
+ public <OUT> SingleOutputStreamOperator<OUT, ?> batchReduceGroup(
+ CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long batchSize1, long batchSize2,
+ long slideSize1, long slideSize2) {
+
+ if (batchSize1 < 1 || batchSize2 < 1) {
+ throw new IllegalArgumentException("Batch size must be positive");
+ }
+ if (slideSize1 < 1 || slideSize2 < 1) {
+ throw new IllegalArgumentException("Slide size must be positive");
+ }
+ if (batchSize1 < slideSize1 || batchSize2 < slideSize2) {
+ throw new IllegalArgumentException("Batch size must be at least slide size");
+ }
+
+ FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
+ CoGroupReduceFunction.class, 0);
+ FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
+ CoGroupReduceFunction.class, 1);
+ FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
+ CoGroupReduceFunction.class, 2);
+
+ return addCoFunction("coBatchReduce", coReducer, in1TypeWrapper, in2TypeWrapper,
+ outTypeWrapper, new CoBatchGroupReduceInvokable<IN1, IN2, OUT>(coReducer,
+ batchSize1, batchSize2, slideSize1, slideSize2));
+ }
+
+ /**
+ * Applies a reduceGroup transformation on the preset time windows of the
+ * inputs of a {@link ConnectedDataStream}. The transformation calls
+ * {@link CoGroupReduceFunction#reduce1} for each window of the first input
+ * and {@link CoGroupReduceFunction#reduce2} for each window of the second
+ * input. Each {@link CoGroupReduceFunction} call can return any number of
+ * elements including none. When the reducer has run for all the values of a
+ * window, the window is slid forward. The user can also extend
+ * {@link RichCoGroupReduceFunction} to gain access to other features
+ * provided by the {@link RichFunction} interface.
+ *
+ * @param coReducer
+ * The {@link CoGroupReduceFunction} that will be called for
+ * every window of each input.
+ * @param windowSize1
+ * The size of the time window of the first input.
+ * @param windowSize2
+ * The size of the time window of the second input.
+ * @return The transformed {@link DataStream}.
+ */
+ public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
+ CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2) {
+ return windowReduceGroup(coReducer, windowSize1, windowSize2, windowSize1, windowSize2);
+ }
+
+ /**
+ * Applies a reduceGroup transformation on the preset time windows of the
+ * inputs of a {@link ConnectedDataStream}. The transformation calls
+ * {@link CoGroupReduceFunction#reduce1} for each window of the first input
+ * and {@link CoGroupReduceFunction#reduce2} for each window of the second
+ * input. Each {@link CoGroupReduceFunction} call can return any number of
+ * elements including none. When the reducer has run for all the values of a
+ * window, the window is slid forward. The user can also extend
+ * {@link RichCoGroupReduceFunction} to gain access to other features
+ * provided by the {@link RichFunction} interface.
+ *
+ * @param coReducer
+ * The {@link CoGroupReduceFunction} that will be called for
+ * every window of each input.
+ * @param windowSize1
+ * The size of the time window of the first input.
+ * @param windowSize2
+ * The size of the time window of the second input.
+ * @param slideInterval1
+ * The time interval a window of the first input is slid by.
+ * @param slideInterval2
+ * The time interval a window of the second input is slid by.
+ * @return The transformed {@link DataStream}.
+ */
+ public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
+ CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2,
+ long slideInterval1, long slideInterval2) {
+ return windowReduceGroup(coReducer, windowSize1, windowSize2, slideInterval1,
+ slideInterval2, new DefaultTimeStamp<IN1>(), new DefaultTimeStamp<IN2>());
+ }
+
+ /**
+ * Applies a reduceGroup transformation on the preset time windows of the
+ * inputs of a {@link ConnectedDataStream}, where the time is provided by
+ * timestamps. The transformation calls
+ * {@link CoGroupReduceFunction#reduce1} for each window of the first input
+ * and {@link CoGroupReduceFunction#reduce2} for each window of the second
+ * input. Each {@link CoGroupReduceFunction} call can return any number of
+ * elements including none. When the reducer has run for all the values of a
+ * window, the window is slid forward. The user can also extend
+ * {@link RichCoGroupReduceFunction} to gain access to other features
+ * provided by the {@link RichFunction} interface.
+ *
+ * @param coReducer
+ * The {@link CoGroupReduceFunction} that will be called for
+ * every window of each input.
+ * @param windowSize1
+ * The size of the time window of the first input.
+ * @param windowSize2
+ * The size of the time window of the second input.
+ * @param timestamp1
+ * The predefined timestamp function of the first input.
+ * @param timestamp2
+ * The predefined timestamp function of the second input.
+ * @return The transformed {@link DataStream}.
+ */
+ public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
+ CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2,
+ TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+ return windowReduceGroup(coReducer, windowSize1, windowSize2, windowSize1, windowSize2,
+ timestamp1, timestamp2);
+ }
+
+ /**
+ * Applies a reduceGroup transformation on the preset time windows of the
+ * inputs of a {@link ConnectedDataStream}, where the time is provided by
+ * timestamps. The transformation calls
+ * {@link CoGroupReduceFunction#reduce1} for each window of the first input
+ * and {@link CoGroupReduceFunction#reduce2} for each window of the second
+ * input. Each {@link CoGroupReduceFunction} call can return any number of
+ * elements including none. When the reducer has run for all the values of a
+ * window, the window is slid forward. The user can also extend
+ * {@link RichCoGroupReduceFunction} to gain access to other features
+ * provided by the {@link RichFunction} interface.
+ *
+ * @param coReducer
+ * The {@link CoGroupReduceFunction} that will be called for
+ * every window of each input.
+ * @param windowSize1
+ * The size of the time window of the first input.
+ * @param windowSize2
+ * The size of the time window of the second input.
+ * @param slideInterval1
+ * The time interval a window of the first input is slid by.
+ * @param slideInterval2
+ * The time interval a window of the second input is slid by.
+ * @param timestamp1
+ * The predefined timestamp function of the first input.
+ * @param timestamp2
+ * The predefined timestamp function of the second input.
+ * @return The transformed {@link DataStream}.
+ */
+ public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
+ CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2,
+ long slideInterval1, long slideInterval2, TimeStamp<IN1> timestamp1,
+ TimeStamp<IN2> timestamp2) {
+
+ if (windowSize1 < 1 || windowSize2 < 1) {
+ throw new IllegalArgumentException("Window size must be positive");
+ }
+ if (slideInterval1 < 1 || slideInterval2 < 1) {
+ throw new IllegalArgumentException("Slide interval must be positive");
+ }
+ if (windowSize1 < slideInterval1 || windowSize2 < slideInterval2) {
+ throw new IllegalArgumentException("Window size must be at least slide interval");
+ }
+
+ FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
+ CoGroupReduceFunction.class, 0);
+ FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
+ CoGroupReduceFunction.class, 1);
+ FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
+ CoGroupReduceFunction.class, 2);
+
+ return addCoFunction("coWindowReduce", coReducer, in1TypeWrapper, in2TypeWrapper,
+ outTypeWrapper, new CoWindowGroupReduceInvokable<IN1, IN2, OUT>(coReducer,
+ windowSize1, windowSize2, slideInterval1, slideInterval2, timestamp1,
+ timestamp2));
+ }
+
protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
final Function function, TypeSerializerWrapper<IN1> in1TypeWrapper,
TypeSerializerWrapper<IN2> in2TypeWrapper, TypeSerializerWrapper<OUT> outTypeWrapper,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ad983377/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java
index 59dd2fa..0b6a7b5 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java
@@ -19,8 +19,14 @@ package org.apache.flink.streaming.api.datastream;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+import org.apache.flink.streaming.api.function.co.RichCoReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
public class GroupedConnectedDataStream<IN1, IN2> extends ConnectedDataStream<IN1, IN2> {
@@ -37,20 +43,24 @@ public class GroupedConnectedDataStream<IN1, IN2> extends ConnectedDataStream<IN
}
/**
- * Applies a CoReduce transformation on a {@link ConnectedDataStream}
- * grouped by the given key position and maps the output to a common type.
- * The {@link CoReduceFunction} will receive input values based on the key
- * positions. The transformation calls {@link CoReduceFunction#reduce1} and
- * {@link CoReduceFunction#map1} for each element of the first input and
- * {@link CoReduceFunction#reduce2} and {@link CoReduceFunction#map2} for
- * each element of the second input. For each input, only values with the
- * same key will go to the same reducer.
+ * Applies a reduce transformation on a {@link GroupedConnectedDataStream},
+ * and maps the outputs to a common type. The transformation calls
+ * {@link CoReduceFunction#reduce1} and {@link CoReduceFunction#map1} for
+ * each element of the first input and {@link CoReduceFunction#reduce2} and
+ * {@link CoReduceFunction#map2} for each element of the second input. For
+ * both inputs, the reducer is applied on every group of elements sharing
+ * the same key at the respective position. This type of reduce is much
+ * faster than reduceGroup since the reduce function can be applied
+ * incrementally. The user can also extend the {@link RichCoReduceFunction}
+ * to gain access to other features provided by the {@link RichFunction}
+ * interface.
*
* @param coReducer
* The {@link CoReduceFunction} that will be called for every
- * element with the same key of each input DataStream.
- * @return The transformed DataStream.
+ * element of the inputs.
+ * @return The transformed {@link DataStream}.
*/
+ @Override
public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
@@ -64,4 +74,242 @@ public class GroupedConnectedDataStream<IN1, IN2> extends ConnectedDataStream<IN
new CoGroupedReduceInvokable<IN1, IN2, OUT>(coReducer, keyPosition1, keyPosition2));
}
+ /**
+ * Applies a reduceGroup transformation on the preset batches of the inputs
+ * of a {@link GroupedConnectedDataStream}. The transformation calls
+ * {@link CoGroupReduceFunction#reduce1} for each batch of the first input
+ * and {@link CoGroupReduceFunction#reduce2} for each batch of the second
+ * input. For both inputs, the reducer is applied on every group of elements
+ * of every batch sharing the same key at the respective position. Each
+ * {@link CoGroupReduceFunction} call can return any number of elements
+ * including none. When the reducer has run for all the values of a batch,
+ * the batch is slid forward. The user can also extend
+ * {@link RichCoGroupReduceFunction} to gain access to other features
+ * provided by the {@link RichFunction} interface.
+ *
+ * @param coReducer
+ * The {@link CoGroupReduceFunction} that will be called for
+ * every batch of each input.
+ * @param batchSize1
+ * The number of elements in a batch of the first input.
+ * @param batchSize2
+ * The number of elements in a batch of the second input.
+ * @return The transformed {@link DataStream}.
+ */
+ @Override
+ public <OUT> SingleOutputStreamOperator<OUT, ?> batchReduceGroup(
+ CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long batchSize1, long batchSize2) {
+ return batchReduceGroup(coReducer, batchSize1, batchSize2, batchSize1, batchSize2);
+ }
+
+ /**
+ * Applies a reduceGroup transformation on the preset batches of the inputs
+ * of a {@link GroupedConnectedDataStream}. The transformation calls
+ * {@link CoGroupReduceFunction#reduce1} for each batch of the first input
+ * and {@link CoGroupReduceFunction#reduce2} for each batch of the second
+ * input. For both inputs, the reducer is applied on every group of elements
+ * of every batch sharing the same key at the respective position. Each
+ * {@link CoGroupReduceFunction} call can return any number of elements
+ * including none. When the reducer has run for all the values of a batch,
+ * the batch is slid forward. The user can also extend
+ * {@link RichCoGroupReduceFunction} to gain access to other features
+ * provided by the {@link RichFunction} interface.
+ *
+ * @param coReducer
+ * The {@link CoGroupReduceFunction} that will be called for
+ * every batch of each input.
+ * @param batchSize1
+ * The number of elements in a batch of the first input.
+ * @param batchSize2
+ * The number of elements in a batch of the second input.
+ * @param slideSize1
+ * The number of elements a batch of the first input is slid by.
+ * @param slideSize2
+ * The number of elements a batch of the second input is slid by.
+ * @return The transformed {@link DataStream}.
+ */
+ @Override
+ public <OUT> SingleOutputStreamOperator<OUT, ?> batchReduceGroup(
+ CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long batchSize1, long batchSize2,
+ long slideSize1, long slideSize2) {
+
+ if (batchSize1 < 1 || batchSize2 < 1) {
+ throw new IllegalArgumentException("Batch size must be positive");
+ }
+ if (slideSize1 < 1 || slideSize2 < 1) {
+ throw new IllegalArgumentException("Slide size must be positive");
+ }
+ if (batchSize1 < slideSize1 || batchSize2 < slideSize2) {
+ throw new IllegalArgumentException("Batch size must be at least slide size");
+ }
+
+ FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
+ CoGroupReduceFunction.class, 0);
+ FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
+ CoGroupReduceFunction.class, 1);
+ FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
+ CoGroupReduceFunction.class, 2);
+
+ return addCoFunction("coBatchReduce", coReducer, in1TypeWrapper, in2TypeWrapper,
+ outTypeWrapper, new CoGroupedBatchGroupReduceInvokable<IN1, IN2, OUT>(coReducer,
+ batchSize1, batchSize2, slideSize1, slideSize2, keyPosition1, keyPosition2));
+ }
+
+ /**
+ * Applies a reduceGroup transformation on the preset time windows of the
+ * inputs of a {@link GroupedConnectedDataStream}. The transformation calls
+ * {@link CoGroupReduceFunction#reduce1} for each window of the first input
+ * and {@link CoGroupReduceFunction#reduce2} for each window of the second
+ * input. For both inputs, the reducer is applied on every group of elements
+ * of every window sharing the same key at the respective position. Each
+ * {@link CoGroupReduceFunction} call can return any number of elements
+ * including none. When the reducer has run for all the values of a window,
+ * the window is slid forward. The user can also extend
+ * {@link RichCoGroupReduceFunction} to gain access to other features
+ * provided by the {@link RichFunction} interface.
+ *
+ * @param coReducer
+ * The {@link CoGroupReduceFunction} that will be called for
+ * every window of each input.
+ * @param windowSize1
+ * The size of the time window of the first input.
+ * @param windowSize2
+ * The size of the time window of the second input.
+ * @return The transformed {@link DataStream}.
+ */
+ @Override
+ public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
+ CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2) {
+ return windowReduceGroup(coReducer, windowSize1, windowSize2, windowSize1, windowSize2);
+ }
+
+ /**
+ * Applies a reduceGroup transformation on the preset time windows of the
+ * inputs of a {@link GroupedConnectedDataStream}. The transformation calls
+ * {@link CoGroupReduceFunction#reduce1} for each window of the first input
+ * and {@link CoGroupReduceFunction#reduce2} for each window of the second
+ * input. For both inputs, the reducer is applied on every group of elements
+ * of every window sharing the same key at the respective position. Each
+ * {@link CoGroupReduceFunction} call can return any number of elements
+ * including none. When the reducer has run for all the values of a window,
+ * the window is slid forward. The user can also extend
+ * {@link RichCoGroupReduceFunction} to gain access to other features
+ * provided by the {@link RichFunction} interface.
+ *
+ * @param coReducer
+ * The {@link CoGroupReduceFunction} that will be called for
+ * every window of each input.
+ * @param windowSize1
+ * The size of the time window of the first input.
+ * @param windowSize2
+ * The size of the time window of the second input.
+ * @param slideInterval1
+ * The time interval a window of the first input is slid by.
+ * @param slideInterval2
+ * The time interval a window of the second input is slid by.
+ * @return The transformed {@link DataStream}.
+ */
+ @Override
+ public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
+ CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2,
+ long slideInterval1, long slideInterval2) {
+ return windowReduceGroup(coReducer, windowSize1, windowSize2, slideInterval1,
+ slideInterval2, new DefaultTimeStamp<IN1>(), new DefaultTimeStamp<IN2>());
+ }
+
+ /**
+ * Applies a reduceGroup transformation on the preset time windows of the
+ * inputs of a {@link GroupedConnectedDataStream}, where the time is
+ * provided by timestamps. The transformation calls
+ * {@link CoGroupReduceFunction#reduce1} for each window of the first input
+ * and {@link CoGroupReduceFunction#reduce2} for each window of the second
+ * input. For both inputs, the reducer is applied on every group of elements
+ * of every window sharing the same key at the respective position. Each
+ * {@link CoGroupReduceFunction} call can return any number of elements
+ * including none. When the reducer has run for all the values of a window,
+ * the window is slid forward. The user can also extend
+ * {@link RichCoGroupReduceFunction} to gain access to other features
+ * provided by the {@link RichFunction} interface.
+ *
+ * @param coReducer
+ * The {@link CoGroupReduceFunction} that will be called for
+ * every window of each input.
+ * @param windowSize1
+ * The size of the time window of the first input.
+ * @param windowSize2
+ * The size of the time window of the second input.
+ * @param timestamp1
+ * The predefined timestamp function of the first input.
+ * @param timestamp2
+ * The predefined timestamp function of the second input.
+ * @return The transformed {@link DataStream}.
+ */
+ @Override
+ public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
+ CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2,
+ TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+ return windowReduceGroup(coReducer, windowSize1, windowSize2, windowSize1, windowSize2,
+ timestamp1, timestamp2);
+ }
+
+ /**
+ * Applies a reduceGroup transformation on the preset time windows of the
+ * inputs of a {@link GroupedConnectedDataStream}, where the time is
+ * provided by timestamps. The transformation calls
+ * {@link CoGroupReduceFunction#reduce1} for each window of the first input
+ * and {@link CoGroupReduceFunction#reduce2} for each window of the second
+ * input. For both inputs, the reducer is applied on every group of elements
+ * of every window sharing the same key at the respective position. Each
+ * {@link CoGroupReduceFunction} call can return any number of elements
+ * including none. When the reducer has run for all the values of a window,
+ * the window is slid forward. The user can also extend
+ * {@link RichCoGroupReduceFunction} to gain access to other features
+ * provided by the {@link RichFunction} interface.
+ *
+ * @param coReducer
+ * The {@link CoGroupReduceFunction} that will be called for
+ * every window of each input.
+ * @param windowSize1
+ * The size of the time window of the first input.
+ * @param windowSize2
+ * The size of the time window of the second input.
+ * @param slideInterval1
+ * The time interval a window of the first input is slid by.
+ * @param slideInterval2
+ * The time interval a window of the second input is slid by.
+ * @param timestamp1
+ * The predefined timestamp function of the first input.
+ * @param timestamp2
+ * The predefined timestamp function of the second input.
+ * @return The transformed {@link DataStream}.
+ */
+ @Override
+ public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
+ CoGroupReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1, long windowSize2,
+ long slideInterval1, long slideInterval2, TimeStamp<IN1> timestamp1,
+ TimeStamp<IN2> timestamp2) {
+
+ if (windowSize1 < 1 || windowSize2 < 1) {
+ throw new IllegalArgumentException("Window size must be positive");
+ }
+ if (slideInterval1 < 1 || slideInterval2 < 1) {
+ throw new IllegalArgumentException("Slide interval must be positive");
+ }
+ if (windowSize1 < slideInterval1 || windowSize2 < slideInterval2) {
+ throw new IllegalArgumentException("Window size must be at least slide interval");
+ }
+
+ FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
+ CoGroupReduceFunction.class, 0);
+ FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
+ CoGroupReduceFunction.class, 1);
+ FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
+ CoGroupReduceFunction.class, 2);
+
+ return addCoFunction("coWindowReduce", coReducer, in1TypeWrapper, in2TypeWrapper,
+ outTypeWrapper, new CoGroupedWindowGroupReduceInvokable<IN1, IN2, OUT>(coReducer,
+ windowSize1, windowSize2, slideInterval1, slideInterval2, keyPosition1,
+ keyPosition2, timestamp1, timestamp2));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ad983377/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchGroupReduceInvokable.java
index 334caa7..07e3d6c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchGroupReduceInvokable.java
@@ -21,8 +21,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
public class CoBatchGroupReduceInvokable<IN1, IN2, OUT> extends CoGroupReduceInvokable<IN1, IN2, OUT> {
-
private static final long serialVersionUID = 1L;
+
protected long startCounter1;
protected long startCounter2;
protected long endCounter1;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ad983377/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java
index 1a6d678..23dcb57 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java
@@ -17,8 +17,7 @@
package org.apache.flink.streaming.state;
-import java.util.ArrayDeque;
-import java.util.Deque;
+import java.io.Serializable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
@@ -28,14 +27,16 @@ import java.util.Queue;
* queue if full and a new element is added, the elements that belong to the
* first sliding interval are removed.
*/
-public class CircularFifoList<T> {
+public class CircularFifoList<T> implements Serializable {
+ private static final long serialVersionUID = 1L;
+
private Queue<T> queue;
- private Deque<Long> slideSizes;
+ private Queue<Long> slideSizes;
private long counter;
public CircularFifoList() {
this.queue = new LinkedList<T>();
- this.slideSizes = new ArrayDeque<Long>();
+ this.slideSizes = new LinkedList<Long>();
this.counter = 0;
}
@@ -50,10 +51,10 @@ public class CircularFifoList<T> {
}
public void shiftWindow() {
- for (int i = 0; i < slideSizes.getFirst(); i++) {
+ Long firstSlideSize = slideSizes.remove();
+ for (int i = 0; i < firstSlideSize; i++) {
queue.remove();
}
- slideSizes.remove();
}
public Iterator<T> getIterator() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ad983377/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StreamIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StreamIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StreamIterator.java
index 7a7a07a..ed1cd80 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StreamIterator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StreamIterator.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.state;
+import java.io.Serializable;
import java.util.Iterator;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -25,7 +26,9 @@ import org.apache.flink.streaming.api.streamrecord.StreamRecord;
* Simple wrapper class to convert an Iterator<StreamRecord<T>> to an
* Iterator<T> iterator by invoking the getObject() method on every element.
*/
-public class StreamIterator<T> implements Iterator<T> {
+public class StreamIterator<T> implements Iterator<T>, Serializable {
+ private static final long serialVersionUID = 1L;
+
private Iterator<StreamRecord<T>> iterator = null;
public void load(Iterator<StreamRecord<T>> iterator) {