You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/20 15:10:59 UTC
[16/18] git commit: [streaming] Added CoBatchGroupReduceInvokable, CoWindowGroupReduceInvokable and grouped variants
[streaming] Added CoBatchGroupReduceInvokable, CoWindowGroupReduceInvokable and grouped variants
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b6ffdbad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b6ffdbad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b6ffdbad
Branch: refs/heads/master
Commit: b6ffdbad1e6779d8c02e1d0ac77ae6da4ad4a216
Parents: d0dd513
Author: szape <ne...@gmail.com>
Authored: Wed Sep 10 10:41:28 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Sat Sep 20 13:44:12 2014 +0200
----------------------------------------------------------------------
.../api/datastream/ConnectedDataStream.java | 4 +-
.../datastream/GroupedConnectedDataStream.java | 4 +-
.../api/datastream/GroupedDataStream.java | 6 +-
.../operator/GroupReduceInvokable.java | 56 ----
.../operator/GroupedReduceInvokable.java | 56 ++++
.../co/CoBatchGroupReduceInvokable.java | 117 +++++++++
.../operator/co/CoGroupReduceInvokable.java | 140 ++++++----
.../co/CoGroupedBatchGroupReduceInvokable.java | 99 +++++++
.../operator/co/CoGroupedReduceInvokable.java | 85 ++++++
.../co/CoGroupedWindowGroupReduceInvokable.java | 102 ++++++++
.../api/invokable/operator/co/CoInvokable.java | 14 +-
.../operator/co/CoReduceInvokable.java | 70 -----
.../operator/co/CoStreamReduceInvokable.java | 70 +++++
.../co/CoWindowGroupReduceInvokable.java | 98 +++++++
.../flink/streaming/state/CircularFifoList.java | 67 +++++
.../flink/streaming/state/StreamIterator.java | 55 ++++
.../streaming/api/AggregationFunctionTest.java | 8 +-
.../operator/CoBatchGroupReduceTest.java | 96 +++++++
.../api/invokable/operator/CoFlatMapTest.java | 4 +-
.../invokable/operator/CoGroupReduceTest.java | 96 -------
.../operator/CoGroupedBatchGroupReduceTest.java | 116 +++++++++
.../invokable/operator/CoGroupedReduceTest.java | 94 +++++++
.../CoGroupedWindowGroupReduceTest.java | 223 ++++++++++++++++
.../api/invokable/operator/CoReduceTest.java | 71 -----
.../invokable/operator/CoStreamReduceTest.java | 71 +++++
.../operator/CoWindowGroupReduceTest.java | 256 +++++++++++++++++++
.../operator/GroupReduceInvokableTest.java | 54 ----
.../operator/GroupedReduceInvokableTest.java | 54 ++++
28 files changed, 1782 insertions(+), 404 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index e00919f..9529dcd 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoStreamReduceInvokable;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
import org.apache.flink.types.TypeInformation;
@@ -196,7 +196,7 @@ public class ConnectedDataStream<IN1, IN2> {
CoReduceFunction.class, 2);
return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
- new CoReduceInvokable<IN1, IN2, OUT>(coReducer));
+ new CoStreamReduceInvokable<IN1, IN2, OUT>(coReducer));
}
protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java
index 8276561..8a16b98 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedConnectedDataStream.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.datastream;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
public class GroupedConnectedDataStream<IN1, IN2> extends ConnectedDataStream<IN1, IN2> {
@@ -61,7 +61,7 @@ public class GroupedConnectedDataStream<IN1, IN2> extends ConnectedDataStream<IN
CoReduceFunction.class, 2);
return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
- new CoGroupReduceInvokable<IN1, IN2, OUT>(coReducer, keyPosition1, keyPosition2));
+ new CoGroupedReduceInvokable<IN1, IN2, OUT>(coReducer, keyPosition1, keyPosition2));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 138a6f8..30826d3 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.functions.RichReduceFunction;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.invokable.operator.GroupedBatchGroupReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.GroupedWindowGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
@@ -69,7 +69,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
return addFunction("groupReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
- ReduceFunction.class, 0), new GroupReduceInvokable<OUT>(reducer, keyPosition));
+ ReduceFunction.class, 0), new GroupedReduceInvokable<OUT>(reducer, keyPosition));
}
/**
@@ -240,7 +240,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
@Override
protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
- GroupReduceInvokable<OUT> invokable = new GroupReduceInvokable<OUT>(aggregate, keyPosition);
+ GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(aggregate, keyPosition);
SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("groupReduce", aggregate,
null, null, invokable);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
deleted file mode 100755
index 73523d9..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupReduceInvokable.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.state.MutableTableState;
-
-public class GroupReduceInvokable<IN> extends StreamReduceInvokable<IN> {
- private static final long serialVersionUID = 1L;
-
- private int keyPosition;
- private MutableTableState<Object, IN> values;
- private IN reduced;
-
- public GroupReduceInvokable(ReduceFunction<IN> reducer, int keyPosition) {
- super(reducer);
- this.keyPosition = keyPosition;
- values = new MutableTableState<Object, IN>();
- }
-
- @Override
- protected void reduce() throws Exception {
- Object key = reuse.getField(keyPosition);
- currentValue = values.get(key);
- nextValue = reuse.getObject();
- if (currentValue != null) {
- callUserFunctionAndLogException();
- values.put(key, reduced);
- collector.collect(reduced);
- } else {
- values.put(key, nextValue);
- collector.collect(nextValue);
- }
- }
-
- @Override
- protected void callUserFunction() throws Exception {
- reduced = reducer.reduce(currentValue, nextValue);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
new file mode 100755
index 0000000..6423492
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.state.MutableTableState;
+/** Reduce invokable that keeps one running value per key; the key is read from field keyPosition of each incoming record. */
+public class GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> {
+ private static final long serialVersionUID = 1L;
+ // State: index of the key field, table holding the latest value per key, and the result of the most recent reducer call.
+ private int keyPosition;
+ private MutableTableState<Object, IN> values;
+ private IN reduced;
+ /** Hands the reducer to the parent class, stores the key field index and creates an empty per-key table. */
+ public GroupedReduceInvokable(ReduceFunction<IN> reducer, int keyPosition) {
+ super(reducer);
+ this.keyPosition = keyPosition;
+ values = new MutableTableState<Object, IN>();
+ }
+ /** Combines the record held in reuse with the value stored for its key (if any), then stores and collects the outcome. */
+ @Override
+ protected void reduce() throws Exception {
+ Object key = reuse.getField(keyPosition);
+ currentValue = values.get(key);
+ nextValue = reuse.getObject();
+ if (currentValue != null) {
+ callUserFunctionAndLogException(); // NOTE(review): inherited helper; presumably runs callUserFunction() below and logs failures - confirm in the parent class
+ values.put(key, reduced);
+ collector.collect(reduced);
+ } else { // nothing stored for this key: the incoming value is stored and emitted unchanged
+ values.put(key, nextValue);
+ collector.collect(nextValue);
+ }
+ }
+ /** Applies the reducer to currentValue and nextValue and keeps the result in the reduced field. */
+ @Override
+ protected void callUserFunction() throws Exception {
+ reduced = reducer.reduce(currentValue, nextValue);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchGroupReduceInvokable.java
new file mode 100644
index 0000000..1cdf8c7
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoBatchGroupReduceInvokable.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
+/** Count-based co-group-reduce invokable: for each input, counters compared against the slide interval and window size decide when a slide starts and when a window is reduced. */
+public class CoBatchGroupReduceInvokable<IN1, IN2, OUT> extends CoGroupReduceInvokable<IN1, IN2, OUT> {
+ // Per-input counters: start* is checked against the slide interval, end* against the window size.
+ private static final long serialVersionUID = 1L;
+ protected long startCounter1;
+ protected long startCounter2;
+ protected long endCounter1;
+ protected long endCounter2;
+ /** Forwards the reduce function, both window sizes and both slide intervals to the parent constructor. */
+ public CoBatchGroupReduceInvokable(CoGroupReduceFunction<IN1, IN2, OUT> reduceFunction,
+ long windowSize1, long windowSize2, long slideInterval1, long slideInterval2) {
+ super(reduceFunction, windowSize1, windowSize2, slideInterval1, slideInterval2);
+ }
+ /** Buffers the current record of input 1 first, then starts a new slide and/or reduces and shifts the window when the counter checks fire. */
+ @Override
+ protected void handleStream1() throws Exception {
+ circularList1.add(reuse1);
+ if (windowStart1()) {
+ circularList1.newSlide();
+ }
+ if (windowEnd1()) {
+ reduce1();
+ circularList1.shiftWindow();
+ }
+ }
+ /** Buffers the current record of input 2 first, then starts a new slide and/or reduces and shifts the window when the counter checks fire. */
+ @Override
+ protected void handleStream2() throws Exception {
+ circularList2.add(reuse2);
+ if (windowStart2()) {
+ circularList2.newSlide();
+ }
+ if (windowEnd2()) {
+ reduce2();
+ circularList2.shiftWindow();
+ }
+ }
+ /** Returns true and resets startCounter1 to zero once it equals slideInterval1; otherwise returns false. */
+ @Override
+ protected boolean windowStart1() throws Exception {
+ if (startCounter1 == slideInterval1) {
+ startCounter1 = 0;
+ return true;
+ }
+ return false;
+ }
+ /** Returns true and resets startCounter2 to zero once it equals slideInterval2; otherwise returns false. */
+ @Override
+ protected boolean windowStart2() throws Exception {
+ if (startCounter2 == slideInterval2) {
+ startCounter2 = 0;
+ return true;
+ }
+ return false;
+ }
+ /** Returns true once endCounter1 equals windowSize1, subtracting slideInterval1 from the counter; otherwise returns false. */
+ @Override
+ protected boolean windowEnd1() throws Exception {
+ if (endCounter1 == windowSize1) {
+ endCounter1 -= slideInterval1;
+ return true;
+ }
+ return false;
+ }
+ /** Returns true once endCounter2 equals windowSize2, subtracting slideInterval2 from the counter; otherwise returns false. */
+ @Override
+ protected boolean windowEnd2() throws Exception {
+ if (endCounter2 == windowSize2) {
+ endCounter2 -= slideInterval2;
+ return true;
+ }
+ return false;
+ }
+ /** Calls the parent open and resets all four counters to zero. */
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ startCounter1 = 0;
+ startCounter2 = 0;
+ endCounter1 = 0;
+ endCounter2 = 0;
+ }
+ /** Increments both counters of input 1. NOTE(review): the caller is not visible here; presumably invoked once per record of input 1 - confirm in CoInvokable. */
+ @Override
+ protected void initialize1() {
+ startCounter1++;
+ endCounter1++;
+ }
+ /** Increments both counters of input 2. NOTE(review): the caller is not visible here; presumably invoked once per record of input 2 - confirm in CoInvokable. */
+ @Override
+ protected void initialize2() {
+ startCounter2++;
+ endCounter2++;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
index ab827d8..9700c70 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
@@ -17,69 +17,123 @@
package org.apache.flink.streaming.api.invokable.operator.co;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.state.MutableTableState;
+import java.util.Iterator;
-public class CoGroupReduceInvokable<IN1, IN2, OUT> extends CoReduceInvokable<IN1, IN2, OUT> {
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.state.CircularFifoList;
+import org.apache.flink.streaming.state.StreamIterator;
+/** Base class for co-invokables that buffer each input in a CircularFifoList and pass the buffered window to a CoGroupReduceFunction; subclasses decide when slides start and windows end. */
+public abstract class CoGroupReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
- private int keyPosition1;
- private int keyPosition2;
- private MutableTableState<Object, IN1> values1;
- private MutableTableState<Object, IN2> values2;
- IN1 reduced1;
- IN2 reduced2;
-
- public CoGroupReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, int keyPosition1,
- int keyPosition2) {
- super(coReducer);
- this.coReducer = coReducer;
- this.keyPosition1 = keyPosition1;
- this.keyPosition2 = keyPosition2;
- values1 = new MutableTableState<Object, IN1>();
- values2 = new MutableTableState<Object, IN2>();
+ protected CoGroupReduceFunction<IN1, IN2, OUT> coReducer;
+ protected StreamIterator<IN1> userIterator1;
+ protected StreamIterator<IN2> userIterator2;
+ protected Iterable<IN1> userIterable1;
+ protected Iterable<IN2> userIterable2;
+ protected long windowSize1;
+ protected long windowSize2;
+ protected long slideInterval1;
+ protected long slideInterval2;
+ protected CircularFifoList<StreamRecord<IN1>> circularList1;
+ protected CircularFifoList<StreamRecord<IN2>> circularList2;
+ protected long WindowStartTime1;
+ protected long WindowStartTime2;
+ protected long WindowEndTime1;
+ protected long WindowEndTime2;
+ /** Stores the reduce function, window sizes and slide intervals, and creates the stream iterators and buffers for both inputs. */
+ public CoGroupReduceInvokable(CoGroupReduceFunction<IN1, IN2, OUT> reduceFunction,
+ long windowSize1, long windowSize2, long slideInterval1, long slideInterval2) {
+ super(reduceFunction);
+ this.coReducer = reduceFunction;
+ this.userIterator1 = new StreamIterator<IN1>();
+ this.userIterator2 = new StreamIterator<IN2>();
+ this.windowSize1 = windowSize1;
+ this.windowSize2 = windowSize2;
+ this.slideInterval1 = slideInterval1;
+ this.slideInterval2 = slideInterval2;
+ this.circularList1 = new CircularFifoList<StreamRecord<IN1>>();
+ this.circularList2 = new CircularFifoList<StreamRecord<IN2>>();
+ }
+ /** Always throws a RuntimeException: the mutable invocation path is not supported by this operator. */
+ @Override
+ protected void mutableInvoke() throws Exception {
+ throw new RuntimeException("Reducing mutable sliding batch is not supported.");
}
@Override
- public void handleStream1() throws Exception {
- Object key = reuse1.getField(keyPosition1);
- currentValue1 = values1.get(key);
- nextValue1 = reuse1.getObject();
- if (currentValue1 != null) {
- callUserFunctionAndLogException1();
- values1.put(key, reduced1);
- collector.collect(coReducer.map1(reduced1));
- } else {
- values1.put(key, nextValue1);
- collector.collect(coReducer.map1(nextValue1));
+ protected void handleStream1() throws Exception {
+ while (windowStart1()) {
+ circularList1.newSlide();
+ }
+ while (windowEnd1()) {
+ reduce1();
+ circularList1.shiftWindow();
}
+ circularList1.add(reuse1); // the current record is buffered only after the slide/window checks above
}
@Override
- public void handleStream2() throws Exception {
- Object key = reuse2.getField(keyPosition2);
- currentValue2 = values2.get(key);
- nextValue2 = reuse2.getObject();
- if (currentValue2 != null) {
- callUserFunctionAndLogException2();
- values2.put(key, reduced2);
- collector.collect(coReducer.map2(reduced2));
- } else {
- values2.put(key, nextValue2);
- collector.collect(coReducer.map2(nextValue2));
+ protected void handleStream2() throws Exception {
+ while (windowStart2()) {
+ circularList2.newSlide();
}
+ while (windowEnd2()) {
+ reduce2();
+ circularList2.shiftWindow();
+ }
+ circularList2.add(reuse2); // the current record is buffered only after the slide/window checks above
+ }
+ /** Loads the records buffered for input 1 into userIterator1 and then calls callUserFunctionAndLogException1(). */
+ protected void reduce1() throws Exception {
+ userIterator1.load(circularList1.getIterator());
+ callUserFunctionAndLogException1();
+ }
+ /** Loads the records buffered for input 2 into userIterator2 and then calls callUserFunctionAndLogException2(). */
+ protected void reduce2() throws Exception {
+ userIterator2.load(circularList2.getIterator());
+ callUserFunctionAndLogException2();
}
@Override
protected void callUserFunction1() throws Exception {
- reduced1 = coReducer.reduce1(currentValue1, nextValue1);
-
+ coReducer.reduce1(userIterable1, collector);
}
@Override
protected void callUserFunction2() throws Exception {
- reduced2 = coReducer.reduce2(currentValue2, nextValue2);
+ coReducer.reduce2(userIterable2, collector);
+ }
+ /** Returns true while a new slide should be started on the buffer of input 1 (polled in a loop by handleStream1). */
+ protected abstract boolean windowStart1() throws Exception;
+ /** Returns true while a new slide should be started on the buffer of input 2 (polled in a loop by handleStream2). */
+ protected abstract boolean windowStart2() throws Exception;
+ /** Returns true while the current window of input 1 should be reduced and shifted (polled in a loop by handleStream1). */
+ protected abstract boolean windowEnd1() throws Exception;
+ /** Returns true while the current window of input 2 should be reduced and shifted (polled in a loop by handleStream2). */
+ protected abstract boolean windowEnd2() throws Exception;
+ /** Calls the parent open and creates the Iterable wrappers that are handed to the user function. */
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ userIterable1 = new BatchIterable1();
+ userIterable2 = new BatchIterable2();
+ }
+ /** Iterable for input 1 whose iterator() always returns the shared userIterator1 instance. */
+ protected class BatchIterable1 implements Iterable<IN1> {
+ @Override
+ public Iterator<IN1> iterator() {
+ return userIterator1;
+ }
+ }
+ protected class BatchIterable2 implements Iterable<IN2> { // iterator() always returns the shared userIterator2 instance
+ @Override
+ public Iterator<IN2> iterator() {
+ return userIterator2;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchGroupReduceInvokable.java
new file mode 100644
index 0000000..20679dd
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedBatchGroupReduceInvokable.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.state.MutableTableState;
+/** Batch co-group-reduce invokable that splits each input's buffered window by key (field keyPosition1 / keyPosition2) and calls the user function once per key group. */
+public class CoGroupedBatchGroupReduceInvokable<IN1, IN2, OUT> extends
+ CoBatchGroupReduceInvokable<IN1, IN2, OUT> {
+ private static final long serialVersionUID = 1L;
+ // Per input: key field index, iterator over the buffered window, table of per-key groups, and a scratch slot for the value being grouped.
+ private int keyPosition1;
+ private int keyPosition2;
+ private Iterator<StreamRecord<IN1>> iterator1;
+ private Iterator<StreamRecord<IN2>> iterator2;
+ private MutableTableState<Object, List<IN1>> values1;
+ private MutableTableState<Object, List<IN2>> values2;
+ private IN1 nextValue1;
+ private IN2 nextValue2;
+ /** Forwards the reduce function, batch sizes and slide sizes to the parent, stores both key field indices and creates empty group tables. */
+ public CoGroupedBatchGroupReduceInvokable(
+ CoGroupReduceFunction<IN1, IN2, OUT> coReduceFunction, long batchSize1,
+ long batchSize2, long slideSize1, long slideSize2, int keyPosition1, int keyPosition2) {
+ super(coReduceFunction, batchSize1, batchSize2, slideSize1, slideSize2);
+ this.keyPosition1 = keyPosition1;
+ this.keyPosition2 = keyPosition2;
+ values1 = new MutableTableState<Object, List<IN1>>();
+ values2 = new MutableTableState<Object, List<IN2>>();
+ }
+ /** Groups the records currently in circularList1 by key, calls the user function once per group, then clears the group table. */
+ @Override
+ protected void reduce1() {
+ iterator1 = circularList1.getIterator();
+ while (iterator1.hasNext()) {
+ StreamRecord<IN1> nextRecord = iterator1.next();
+ Object key = nextRecord.getField(keyPosition1);
+ nextValue1 = nextRecord.getObject();
+ // Append to the group already collected for this key, or start a new group for it.
+ List<IN1> group = values1.get(key);
+ if (group != null) {
+ group.add(nextValue1);
+ } else {
+ group = new ArrayList<IN1>();
+ group.add(nextValue1);
+ values1.put(key, group);
+ }
+ }
+ for (List<IN1> group : values1.values()) {
+ userIterable1 = group; // the inherited callUserFunction1() passes userIterable1 to the reduce function
+ callUserFunctionAndLogException1();
+ }
+ values1.clear();
+ }
+ /** Groups the records currently in circularList2 by key, calls the user function once per group, then clears the group table. */
+ @Override
+ protected void reduce2() {
+ iterator2 = circularList2.getIterator();
+ while (iterator2.hasNext()) {
+ StreamRecord<IN2> nextRecord = iterator2.next();
+ Object key = nextRecord.getField(keyPosition2);
+ nextValue2 = nextRecord.getObject();
+ // Append to the group already collected for this key, or start a new group for it.
+ List<IN2> group = values2.get(key);
+ if (group != null) {
+ group.add(nextValue2);
+ } else {
+ group = new ArrayList<IN2>();
+ group.add(nextValue2);
+ values2.put(key, group);
+ }
+ }
+ for (List<IN2> group : values2.values()) {
+ userIterable2 = group; // the inherited callUserFunction2() passes userIterable2 to the reduce function
+ callUserFunctionAndLogException2();
+ }
+ values2.clear();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceInvokable.java
new file mode 100644
index 0000000..0daa1ea
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceInvokable.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+import org.apache.flink.streaming.state.MutableTableState;
+/** Co-reduce invokable that keeps one running value per key for each input and collects the mapped result (map1 / map2) after every record. */
+public class CoGroupedReduceInvokable<IN1, IN2, OUT> extends CoStreamReduceInvokable<IN1, IN2, OUT> {
+ private static final long serialVersionUID = 1L;
+ // Per input: key field index, table of the latest value per key, and the result of the most recent reduce call.
+ private int keyPosition1;
+ private int keyPosition2;
+ private MutableTableState<Object, IN1> values1;
+ private MutableTableState<Object, IN2> values2;
+ IN1 reduced1;
+ IN2 reduced2;
+ /** Hands the co-reducer to the parent class, stores both key field indices and creates empty per-key tables. */
+ public CoGroupedReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, int keyPosition1,
+ int keyPosition2) {
+ super(coReducer);
+ this.coReducer = coReducer; // NOTE(review): possibly redundant with super(coReducer) - confirm in the parent class
+ this.keyPosition1 = keyPosition1;
+ this.keyPosition2 = keyPosition2;
+ values1 = new MutableTableState<Object, IN1>();
+ values2 = new MutableTableState<Object, IN2>();
+ }
+ /** Combines the record in reuse1 with the value stored for its key (if any), stores the outcome and collects map1 of it. */
+ @Override
+ public void handleStream1() throws Exception {
+ Object key = reuse1.getField(keyPosition1);
+ currentValue1 = values1.get(key);
+ nextValue1 = reuse1.getObject();
+ if (currentValue1 != null) {
+ callUserFunctionAndLogException1();
+ values1.put(key, reduced1);
+ collector.collect(coReducer.map1(reduced1));
+ } else { // nothing stored for this key: the incoming value is stored and mapped unchanged
+ values1.put(key, nextValue1);
+ collector.collect(coReducer.map1(nextValue1));
+ }
+ }
+ /** Combines the record in reuse2 with the value stored for its key (if any), stores the outcome and collects map2 of it. */
+ @Override
+ public void handleStream2() throws Exception {
+ Object key = reuse2.getField(keyPosition2);
+ currentValue2 = values2.get(key);
+ nextValue2 = reuse2.getObject();
+ if (currentValue2 != null) {
+ callUserFunctionAndLogException2();
+ values2.put(key, reduced2);
+ collector.collect(coReducer.map2(reduced2));
+ } else { // nothing stored for this key: the incoming value is stored and mapped unchanged
+ values2.put(key, nextValue2);
+ collector.collect(coReducer.map2(nextValue2));
+ }
+ }
+ /** Reduces currentValue1 with nextValue1 through the co-reducer and keeps the result in reduced1. */
+ @Override
+ protected void callUserFunction1() throws Exception {
+ reduced1 = coReducer.reduce1(currentValue1, nextValue1);
+
+ }
+ /** Reduces currentValue2 with nextValue2 through the co-reducer and keeps the result in reduced2. */
+ @Override
+ protected void callUserFunction2() throws Exception {
+ reduced2 = coReducer.reduce2(currentValue2, nextValue2);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowGroupReduceInvokable.java
new file mode 100644
index 0000000..60cace6
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowGroupReduceInvokable.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.state.MutableTableState;
+
+public class CoGroupedWindowGroupReduceInvokable<IN1, IN2, OUT> extends
+ CoWindowGroupReduceInvokable<IN1, IN2, OUT> {
+ private static final long serialVersionUID = 1L;
+
+ private int keyPosition1;
+ private int keyPosition2;
+ private Iterator<StreamRecord<IN1>> iterator1;
+ private Iterator<StreamRecord<IN2>> iterator2;
+ private MutableTableState<Object, List<IN1>> values1;
+ private MutableTableState<Object, List<IN2>> values2;
+ private IN1 nextValue1;
+ private IN2 nextValue2;
+
+ public CoGroupedWindowGroupReduceInvokable(
+ CoGroupReduceFunction<IN1, IN2, OUT> coReduceFunction, long windowSize1,
+ long windowSize2, long slideInterval1, long slideInterval2, int keyPosition1,
+ int keyPosition2, TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+ super(coReduceFunction, windowSize1, windowSize2, slideInterval1, slideInterval2,
+ timestamp1, timestamp2);
+ this.keyPosition1 = keyPosition1;
+ this.keyPosition2 = keyPosition2;
+ values1 = new MutableTableState<Object, List<IN1>>();
+ values2 = new MutableTableState<Object, List<IN2>>();
+ }
+
+ @Override
+ protected void reduce1() {
+ iterator1 = circularList1.getIterator();
+ while (iterator1.hasNext()) {
+ StreamRecord<IN1> nextRecord = iterator1.next();
+ Object key = nextRecord.getField(keyPosition1);
+ nextValue1 = nextRecord.getObject();
+
+ List<IN1> group = values1.get(key);
+ if (group != null) {
+ group.add(nextValue1);
+ } else {
+ group = new ArrayList<IN1>();
+ group.add(nextValue1);
+ values1.put(key, group);
+ }
+ }
+ for (List<IN1> group : values1.values()) {
+ userIterable1 = group;
+ callUserFunctionAndLogException1();
+ }
+ values1.clear();
+ }
+
+ @Override
+ protected void reduce2() {
+ iterator2 = circularList2.getIterator();
+ while (iterator2.hasNext()) {
+ StreamRecord<IN2> nextRecord = iterator2.next();
+ Object key = nextRecord.getField(keyPosition2);
+ nextValue2 = nextRecord.getObject();
+
+ List<IN2> group = values2.get(key);
+ if (group != null) {
+ group.add(nextValue2);
+ } else {
+ group = new ArrayList<IN2>();
+ group.add(nextValue2);
+ values2.put(key, group);
+ }
+ }
+ for (List<IN2> group : values2.values()) {
+ userIterable2 = group;
+ callUserFunctionAndLogException2();
+ }
+ values2.clear();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index b064df7..0e1f2b0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -85,9 +85,11 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
if (next == 0) {
break;
} else if (next == 1) {
+ initialize1();
handleStream1();
resetReuse1();
} else {
+ initialize2();
handleStream2();
resetReuse2();
}
@@ -100,8 +102,10 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
if (next == 0) {
break;
} else if (next == 1) {
+ initialize1();
handleStream1();
} else {
+ initialize2();
handleStream2();
}
}
@@ -114,7 +118,15 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
protected abstract void callUserFunction1() throws Exception;
protected abstract void callUserFunction2() throws Exception;
-
+
+ protected void initialize1() {
+
+ };
+
+ protected void initialize2() {
+
+ };
+
protected void callUserFunctionAndLogException1() {
try {
callUserFunction1();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
deleted file mode 100644
index 407f217..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-
-public class CoReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
- private static final long serialVersionUID = 1L;
-
- protected CoReduceFunction<IN1, IN2, OUT> coReducer;
- protected IN1 currentValue1 = null;
- protected IN2 currentValue2 = null;
- protected IN1 nextValue1 = null;
- protected IN2 nextValue2 = null;
-
- public CoReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer) {
- super(coReducer);
- this.coReducer = coReducer;
- currentValue1 = null;
- currentValue2 = null;
- }
-
- @Override
- public void handleStream1() throws Exception {
- nextValue1 = reuse1.getObject();
- callUserFunctionAndLogException1();
- }
-
- @Override
- public void handleStream2() throws Exception {
- nextValue2 = reuse2.getObject();
- callUserFunctionAndLogException2();
- }
-
- @Override
- protected void callUserFunction1() throws Exception {
- if (currentValue1 != null) {
- currentValue1 = coReducer.reduce1(currentValue1, nextValue1);
- } else {
- currentValue1 = nextValue1;
- }
- collector.collect(coReducer.map1(currentValue1));
- }
-
- @Override
- protected void callUserFunction2() throws Exception {
- if (currentValue2 != null) {
- currentValue2 = coReducer.reduce2(currentValue2, nextValue2);
- } else {
- currentValue2 = nextValue2;
- }
- collector.collect(coReducer.map2(currentValue2));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceInvokable.java
new file mode 100644
index 0000000..81a6a23
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceInvokable.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+
+public class CoStreamReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
+ private static final long serialVersionUID = 1L;
+
+ protected CoReduceFunction<IN1, IN2, OUT> coReducer;
+ protected IN1 currentValue1 = null;
+ protected IN2 currentValue2 = null;
+ protected IN1 nextValue1 = null;
+ protected IN2 nextValue2 = null;
+
+ public CoStreamReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer) {
+ super(coReducer);
+ this.coReducer = coReducer;
+ currentValue1 = null;
+ currentValue2 = null;
+ }
+
+ @Override
+ public void handleStream1() throws Exception {
+ nextValue1 = reuse1.getObject();
+ callUserFunctionAndLogException1();
+ }
+
+ @Override
+ public void handleStream2() throws Exception {
+ nextValue2 = reuse2.getObject();
+ callUserFunctionAndLogException2();
+ }
+
+ @Override
+ protected void callUserFunction1() throws Exception {
+ if (currentValue1 != null) {
+ currentValue1 = coReducer.reduce1(currentValue1, nextValue1);
+ } else {
+ currentValue1 = nextValue1;
+ }
+ collector.collect(coReducer.map1(currentValue1));
+ }
+
+ @Override
+ protected void callUserFunction2() throws Exception {
+ if (currentValue2 != null) {
+ currentValue2 = coReducer.reduce2(currentValue2, nextValue2);
+ } else {
+ currentValue2 = nextValue2;
+ }
+ collector.collect(coReducer.map2(currentValue2));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowGroupReduceInvokable.java
new file mode 100644
index 0000000..49c73b5
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowGroupReduceInvokable.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.co;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+
+public class CoWindowGroupReduceInvokable<IN1, IN2, OUT> extends CoGroupReduceInvokable<IN1, IN2, OUT> {
+ private static final long serialVersionUID = 1L;
+
+ protected long startTime1;
+ protected long startTime2;
+ protected long endTime1;
+ protected long endTime2;
+ protected long currentTime;
+ protected TimeStamp<IN1> timestamp1;
+ protected TimeStamp<IN2> timestamp2;
+
+ public CoWindowGroupReduceInvokable(CoGroupReduceFunction<IN1, IN2, OUT> reduceFunction,
+ long windowSize1, long windowSize2, long slideInterval1, long slideInterval2,
+ TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+ super(reduceFunction, windowSize1, windowSize2, slideInterval1, slideInterval2);
+ this.timestamp1 = timestamp1;
+ this.timestamp2 = timestamp2;
+ startTime1 = timestamp1.getStartTime();
+ startTime2 = timestamp2.getStartTime();
+ endTime1 = startTime1 + windowSize1;
+ endTime2 = startTime2 + windowSize2;
+ }
+
+ @Override
+ protected boolean windowStart1() throws Exception {
+ if (currentTime - startTime1 >= slideInterval1) {
+ startTime1 += slideInterval1;
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ protected boolean windowStart2() throws Exception {
+ if (currentTime - startTime2 >= slideInterval2) {
+ startTime2 += slideInterval2;
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ protected boolean windowEnd1() throws Exception {
+ if (currentTime >= endTime1) {
+ endTime1 += slideInterval1;
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ protected boolean windowEnd2() throws Exception {
+ if (currentTime >= endTime2) {
+ endTime2 += slideInterval2;
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ protected void initialize1() {
+ currentTime = timestamp1.getTimestamp(reuse1.getObject());
+ }
+
+ @Override
+ protected void initialize2() {
+ currentTime = timestamp2.getTimestamp(reuse2.getObject());
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java
new file mode 100644
index 0000000..46ccce7
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/CircularFifoList.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.state;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * A simple class that manages a circular queue with a sliding interval. If the
+ * queue is full and a new element is added, the elements that belong to the
+ * first sliding interval are removed.
+ */
+public class CircularFifoList<T> {
+ private Queue<T> queue;
+ private Deque<Long> slideSizes;
+ private long counter;
+
+ public CircularFifoList() {
+ this.queue = new LinkedList<T>();
+ this.slideSizes = new ArrayDeque<Long>();
+ this.counter = 0;
+ }
+
+ public void add(T element) {
+ queue.add(element);
+ counter++;
+ }
+
+ public void newSlide() {
+ slideSizes.add(counter);
+ counter = 0;
+ }
+
+ public void shiftWindow() {
+ for (int i = 0; i < slideSizes.getFirst(); i++) {
+ queue.remove();
+ }
+ slideSizes.remove();
+ }
+
+ public Iterator<T> getIterator() {
+ return queue.iterator();
+ }
+
+ @Override
+ public String toString() {
+ return queue.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StreamIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StreamIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StreamIterator.java
new file mode 100644
index 0000000..e3ba4b5
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/StreamIterator.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.state;
+
+import java.util.Iterator;
+
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+
+/**
+ * Simple wrapper class to convert an Iterator<StreamRecord<T>> to an
+ * Iterator<T> by invoking the getObject() method on every element.
+ */
+public class StreamIterator<T> implements Iterator<T> {
+ private Iterator<StreamRecord<T>> iterator = null;
+
+ public void load(Iterator<StreamRecord<T>> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public T next() {
+ return iterator.next().getObject();
+ }
+
+ @Override
+ public void remove() {
+ iterator.remove();
+ }
+
+ @Override
+ public String toString() {
+ return iterator.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index 3861aab..54264fe 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
-import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
import org.apache.flink.streaming.util.MockInvokable;
import org.junit.Test;
@@ -107,13 +107,13 @@ public class AggregationFunctionTest {
new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxFunction), getInputList());
List<Tuple2<Integer, Integer>> groupedSumList = MockInvokable.createAndExecute(
- new GroupReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, 0), getInputList());
+ new GroupedReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, 0), getInputList());
List<Tuple2<Integer, Integer>> groupedMinList = MockInvokable.createAndExecute(
- new GroupReduceInvokable<Tuple2<Integer, Integer>>(minFunction, 0), getInputList());
+ new GroupedReduceInvokable<Tuple2<Integer, Integer>>(minFunction, 0), getInputList());
List<Tuple2<Integer, Integer>> groupedMaxList = MockInvokable.createAndExecute(
- new GroupReduceInvokable<Tuple2<Integer, Integer>>(maxFunction, 0), getInputList());
+ new GroupedReduceInvokable<Tuple2<Integer, Integer>>(maxFunction, 0), getInputList());
assertEquals(expectedSumList, sumList);
assertEquals(expectedMinList, minList);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchGroupReduceTest.java
new file mode 100644
index 0000000..470d956
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoBatchGroupReduceTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoBatchGroupReduceInvokable;
+import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class CoBatchGroupReduceTest {
+
+ public static final class MyCoGroupReduceFunction implements
+ CoGroupReduceFunction<Integer, String, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void reduce1(Iterable<Integer> values, Collector<String> out) throws Exception {
+ String gather = "";
+ for (Integer value : values) {
+ gather += value.toString();
+ }
+ out.collect(gather);
+ }
+
+ @Override
+ public void reduce2(Iterable<String> values, Collector<String> out) throws Exception {
+ String gather = "";
+ for (String value : values) {
+ gather += value;
+ }
+ out.collect(gather);
+ }
+ }
+
+ @Test
+ public void coBatchGroupReduceTest1() {
+
+ List<Integer> inputs1 = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+
+ List<String> inputs2 = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h");
+
+ CoBatchGroupReduceInvokable<Integer, String, String> invokable = new CoBatchGroupReduceInvokable<Integer, String, String>(
+ new MyCoGroupReduceFunction(), 4L, 2L, 4L, 2L);
+
+ List<String> expected = Arrays.asList("1234", "5678", "ab", "cd", "ef", "gh");
+
+ List<String> actualList = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+ Collections.sort(expected);
+ Collections.sort(actualList);
+
+ assertEquals(expected, actualList);
+ }
+
+ @Test
+ public void coBatchGroupReduceTest2() {
+
+ List<Integer> inputs1 = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+
+ List<String> inputs2 = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h");
+
+ CoBatchGroupReduceInvokable<Integer, String, String> invokable = new CoBatchGroupReduceInvokable<Integer, String, String>(
+ new MyCoGroupReduceFunction(), 4L, 2L, 3L, 1L);
+
+ List<String> expected = Arrays.asList("1234", "4567", "78910", "ab", "bc", "cd", "de",
+ "ef", "fg", "gh");
+
+ List<String> actualList = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+ Collections.sort(expected);
+ Collections.sort(actualList);
+
+ assertEquals(expected, actualList);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
index acf2f28..264b074 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
@@ -1,4 +1,5 @@
-/** Licensed to the Apache Software Foundation (ASF) under one or more
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
@@ -12,7 +13,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
package org.apache.flink.streaming.api.invokable.operator;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
deleted file mode 100644
index bf9a772..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/** Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoGroupReduceInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
-import org.junit.Test;
-
-public class CoGroupReduceTest {
-
- private final static class MyCoReduceFunction implements
- CoReduceFunction<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple3<String, String, String> reduce1(Tuple3<String, String, String> value1,
- Tuple3<String, String, String> value2) {
- return new Tuple3<String, String, String>(value1.f0, value1.f1 + value2.f1, value1.f2);
- }
-
- @Override
- public Tuple2<Integer, Integer> reduce2(Tuple2<Integer, Integer> value1,
- Tuple2<Integer, Integer> value2) {
- return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1);
- }
-
- @Override
- public String map1(Tuple3<String, String, String> value) {
- return value.f1;
- }
-
- @Override
- public String map2(Tuple2<Integer, Integer> value) {
- return value.f1.toString();
- }
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void coGroupReduceTest() {
- Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b");
- Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a");
- Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a");
- Tuple2<Integer, Integer> int1 = new Tuple2<Integer, Integer>(2, 1);
- Tuple2<Integer, Integer> int2 = new Tuple2<Integer, Integer>(1, 2);
- Tuple2<Integer, Integer> int3 = new Tuple2<Integer, Integer>(0, 3);
- Tuple2<Integer, Integer> int4 = new Tuple2<Integer, Integer>(2, 4);
- Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5);
-
- CoGroupReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> invokable = new CoGroupReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
- new MyCoReduceFunction(), 0, 0);
-
- List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5",
- "7");
-
- List<String> actualList = MockCoInvokable.createAndExecute(invokable,
- Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
-
- assertEquals(expected, actualList);
-
-
- invokable = new CoGroupReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
- new MyCoReduceFunction(), 2, 0);
-
- expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5",
- "7");
-
- actualList = MockCoInvokable.createAndExecute(invokable,
- Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
-
- assertEquals(expected, actualList);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchGroupReduceTest.java
new file mode 100644
index 0000000..f150d30
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedBatchGroupReduceTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedBatchGroupReduceInvokable;
+import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link CoGroupedBatchGroupReduceInvokable}. Two tuple inputs are pushed through
+ * {@link MockCoInvokable} and the collected strings are compared, ignoring order, with
+ * hand-computed group results.
+ */
+public class CoGroupedBatchGroupReduceTest {
+
+    /**
+     * Emits one string per call: the concatenation of field {@code f1} of every tuple it is
+     * handed, in iteration order. The same rule is applied to both inputs.
+     */
+    public static final class MyCoGroupReduceFunction implements
+            CoGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, String>, String> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void reduce1(Iterable<Tuple2<Integer, Integer>> values, Collector<String> out)
+                throws Exception {
+            StringBuilder joined = new StringBuilder();
+            for (Tuple2<Integer, Integer> tuple : values) {
+                joined.append(tuple.f1.toString());
+            }
+            out.collect(joined.toString());
+        }
+
+        @Override
+        public void reduce2(Iterable<Tuple2<Integer, String>> values, Collector<String> out)
+                throws Exception {
+            StringBuilder joined = new StringBuilder();
+            for (Tuple2<Integer, String> tuple : values) {
+                joined.append(tuple.f1);
+            }
+            out.collect(joined.toString());
+        }
+    }
+
+    // Handed to the invokable as the last two constructor arguments (one per input).
+    static final int KEY_POSITION1 = 0;
+    static final int KEY_POSITION2 = 0;
+
+    /**
+     * Runs the invokable with the numeric arguments 5, 4, 5, 4.
+     * NOTE(review): judging from the expected output these look like batch size for input 1 and
+     * 2 followed by slide size for input 1 and 2 -- confirm against the constructor.
+     */
+    @Test
+    public void coGroupedBatchGroupReduceTest1() {
+        CoGroupedBatchGroupReduceInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, String>, String> invokable = new CoGroupedBatchGroupReduceInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, String>, String>(
+                new MyCoGroupReduceFunction(), 5L, 4L, 5L, 4L, KEY_POSITION1, KEY_POSITION2);
+
+        List<String> actualList = MockCoInvokable.createAndExecute(invokable, numberInput(),
+                letterInput());
+
+        assertSameElements(Arrays.asList("14", "25", "3", "69", "710", "8", "ad", "b", "c",
+                "eh", "f", "g"), actualList);
+    }
+
+    /**
+     * Runs the invokable with the numeric arguments 6, 6, 3, 2 (see the note on the first test
+     * for their presumed meaning); "cf" is expected twice.
+     */
+    @Test
+    public void coGroupedBatchGroupReduceTest2() {
+        CoGroupedBatchGroupReduceInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, String>, String> invokable = new CoGroupedBatchGroupReduceInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, String>, String>(
+                new MyCoGroupReduceFunction(), 6L, 6L, 3L, 2L, KEY_POSITION1, KEY_POSITION2);
+
+        List<String> actualList = MockCoInvokable.createAndExecute(invokable, numberInput(),
+                letterInput());
+
+        assertSameElements(Arrays.asList("14", "25", "36", "47", "58", "69", "ad", "be", "cf",
+                "cf", "dg", "eh"), actualList);
+    }
+
+    /** Builds the first input: (i % 3, i) for i = 1..10. */
+    private static List<Tuple2<Integer, Integer>> numberInput() {
+        List<Tuple2<Integer, Integer>> tuples = new ArrayList<Tuple2<Integer, Integer>>();
+        for (int number = 1; number <= 10; number++) {
+            tuples.add(new Tuple2<Integer, Integer>(number % 3, number));
+        }
+        return tuples;
+    }
+
+    /** Builds the second input: (code point % 3, letter) for the letters 'a'..'h'. */
+    private static List<Tuple2<Integer, String>> letterInput() {
+        List<Tuple2<Integer, String>> tuples = new ArrayList<Tuple2<Integer, String>>();
+        for (char letter = 'a'; letter <= 'h'; letter++) {
+            tuples.add(new Tuple2<Integer, String>(letter % 3, String.valueOf(letter)));
+        }
+        return tuples;
+    }
+
+    /** Sorts both lists in place and asserts that they are equal. */
+    private static void assertSameElements(List<String> expected, List<String> actual) {
+        Collections.sort(expected);
+        Collections.sort(actual);
+        assertEquals(expected, actual);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
new file mode 100644
index 0000000..9842d1c
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedReduceTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.function.co.CoReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
+import org.apache.flink.streaming.util.MockCoInvokable;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link CoGroupedReduceInvokable}: the same two inputs are reduced twice, with
+ * different key positions for the first input, and the emitted strings are checked in order.
+ */
+public class CoGroupedReduceTest {
+
+    /**
+     * First input: keeps {@code f0} and {@code f2} of the left value and concatenates the
+     * {@code f1} strings. Second input: keeps {@code f0} of the left value and adds the
+     * {@code f1} integers. Both map functions expose {@code f1} as a string.
+     */
+    private static final class MyCoReduceFunction implements
+            CoReduceFunction<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Tuple3<String, String, String> reduce1(Tuple3<String, String, String> value1,
+                Tuple3<String, String, String> value2) {
+            String joined = value1.f1 + value2.f1;
+            return new Tuple3<String, String, String>(value1.f0, joined, value1.f2);
+        }
+
+        @Override
+        public Tuple2<Integer, Integer> reduce2(Tuple2<Integer, Integer> value1,
+                Tuple2<Integer, Integer> value2) {
+            int sum = value1.f1 + value2.f1;
+            return new Tuple2<Integer, Integer>(value1.f0, sum);
+        }
+
+        @Override
+        public String map1(Tuple3<String, String, String> value) {
+            return value.f1;
+        }
+
+        @Override
+        public String map2(Tuple2<Integer, Integer> value) {
+            return Integer.toString(value.f1);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void coGroupedReduceTest() {
+        // The fixtures are built once and shared by both runs.
+        List<Tuple3<String, String, String>> words = Arrays.asList(
+                new Tuple3<String, String, String>("a", "word1", "b"),
+                new Tuple3<String, String, String>("b", "word2", "a"),
+                new Tuple3<String, String, String>("a", "word3", "a"));
+        List<Tuple2<Integer, Integer>> numbers = Arrays.asList(
+                new Tuple2<Integer, Integer>(2, 1),
+                new Tuple2<Integer, Integer>(1, 2),
+                new Tuple2<Integer, Integer>(0, 3),
+                new Tuple2<Integer, Integer>(2, 4),
+                new Tuple2<Integer, Integer>(1, 5));
+
+        // First run: position 0 for both inputs, so "word1" and "word3" (both "a") are combined.
+        List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5",
+                "7");
+        assertEquals(expected, reduceWithKeys(0, 0, words, numbers));
+
+        // Second run: position 2 for the first input, so "word2" and "word3" are combined.
+        expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7");
+        assertEquals(expected, reduceWithKeys(2, 0, words, numbers));
+    }
+
+    /** Creates a fresh invokable for the given positions and returns what the mock collected. */
+    private static List<String> reduceWithKeys(int keyPosition1, int keyPosition2,
+            List<Tuple3<String, String, String>> words, List<Tuple2<Integer, Integer>> numbers) {
+        CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> invokable = new CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
+                new MyCoReduceFunction(), keyPosition1, keyPosition2);
+        return MockCoInvokable.createAndExecute(invokable, words, numbers);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowGroupReduceTest.java
new file mode 100644
index 0000000..b421284
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowGroupReduceTest.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.util.MockCoInvokable;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link CoGroupedWindowGroupReduceInvokable}. Each test feeds two inputs with
+ * scripted timestamps through {@link MockCoInvokable} and compares the collected strings,
+ * ignoring order, with hand-computed results.
+ */
+public class CoGroupedWindowGroupReduceTest {
+
+    /** Emits "first element:number of elements" for every group, on either input. */
+    public static final class MyCoGroupReduceFunction1 implements
+            CoGroupReduceFunction<Character, Character, String> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void reduce1(Iterable<Character> values, Collector<String> out) throws Exception {
+            out.collect(firstAndCount(values));
+        }
+
+        @Override
+        public void reduce2(Iterable<Character> values, Collector<String> out) throws Exception {
+            out.collect(firstAndCount(values));
+        }
+
+        /**
+         * Walks the group once, remembering the first element and counting all of them. Only a
+         * single iterator is requested, so this also works when the iterable can be traversed
+         * just once. An empty group makes {@code next()} throw.
+         */
+        private static String firstAndCount(Iterable<Character> values) {
+            Iterator<Character> elements = values.iterator();
+            Character first = elements.next();
+            int count = 1;
+            while (elements.hasNext()) {
+                elements.next();
+                count++;
+            }
+            return first + ":" + count;
+        }
+    }
+
+    /**
+     * First input: concatenates the {@code f0} strings of a group. Second input: sums the
+     * {@code f0} integers of a group and emits the sum as a string.
+     */
+    public static final class MyCoGroupReduceFunction2 implements
+            CoGroupReduceFunction<Tuple2<String, Integer>, Tuple2<Integer, Integer>, String> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void reduce1(Iterable<Tuple2<String, Integer>> values, Collector<String> out)
+                throws Exception {
+            StringBuilder joined = new StringBuilder();
+            for (Tuple2<String, Integer> value : values) {
+                joined.append(value.f0);
+            }
+            out.collect(joined.toString());
+        }
+
+        @Override
+        public void reduce2(Iterable<Tuple2<Integer, Integer>> values, Collector<String> out)
+                throws Exception {
+            int sum = 0;
+            for (Tuple2<Integer, Integer> value : values) {
+                sum += value.f0;
+            }
+            out.collect(Integer.toString(sum));
+        }
+    }
+
+    /**
+     * Timestamp source that ignores the element and replays a scripted sequence, one entry per
+     * call. The script is kept as a list plus a cursor rather than as an {@code Iterator},
+     * because collection iterators are not serializable.
+     * NOTE(review): the {@code serialVersionUID} suggests {@code TimeStamp} is serializable --
+     * confirm against the interface.
+     */
+    public static final class MyTimeStamp<T> implements TimeStamp<T> {
+        private static final long serialVersionUID = 1L;
+
+        private final List<Long> timestamps;
+        private final long start;
+        private int position;
+
+        /**
+         * @param timestamps
+         *            scripted timestamps in call order; must contain at least one entry, the
+         *            first of which is also reported as the start time
+         */
+        public MyTimeStamp(List<Long> timestamps) {
+            // Defensive copy: later changes to the caller's list must not alter the script.
+            this.timestamps = new ArrayList<Long>(timestamps);
+            this.start = this.timestamps.get(0);
+        }
+
+        /** Returns the next scripted timestamp; fails once the script is exhausted. */
+        @Override
+        public long getTimestamp(T value) {
+            return timestamps.get(position++);
+        }
+
+        @Override
+        public long getStartTime() {
+            return start;
+        }
+    }
+
+    /**
+     * Character inputs with the numeric arguments 5, 5, 3, 5.
+     * NOTE(review): these look like window size for input 1 and 2 followed by slide interval for
+     * input 1 and 2 -- confirm against the constructor.
+     */
+    @Test
+    public void coGroupedWindowGroupReduceTest1() {
+
+        List<Character> inputs1 = new ArrayList<Character>(Arrays.asList('a', 'b', 'c', 'a', 'a',
+                'c', 'b', 'c', 'a', 'a', 'x'));
+
+        List<Character> inputs2 = new ArrayList<Character>(Arrays.asList('a', 'd', 'd', 'e', 'd',
+                'e', 'e', 'a', 'a', 'x'));
+
+        List<Long> timestamps1 = Arrays.asList(0L, 1L, 1L, 1L, 2L, 2L, 2L, 3L, 8L, 10L, 11L);
+
+        List<Long> timestamps2 = Arrays.asList(0L, 5L, 5L, 6L, 6L, 7L, 7L, 8L, 8L, 10L);
+
+        CoGroupedWindowGroupReduceInvokable<Character, Character, String> invokable = new CoGroupedWindowGroupReduceInvokable<Character, Character, String>(
+                new MyCoGroupReduceFunction1(), 5L, 5L, 3L, 5L, 0, 0, new MyTimeStamp<Character>(
+                        timestamps1), new MyTimeStamp<Character>(timestamps2));
+
+        List<String> expected = new ArrayList<String>(Arrays.asList("a:3", "b:2", "c:3", "c:1",
+                "a:2", "a:1", "a:2", "d:3", "e:3"));
+
+        List<String> actualList = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+        Collections.sort(expected);
+        Collections.sort(actualList);
+
+        assertEquals(expected, actualList);
+    }
+
+    /**
+     * Tuple inputs with the numeric arguments 2, 4, 2, 2 and position 1 for both inputs (see the
+     * note on the first test for the presumed meaning of the numeric arguments).
+     */
+    @Test
+    public void coGroupedWindowGroupReduceTest2() {
+
+        List<Tuple2<String, Integer>> inputs1 = new ArrayList<Tuple2<String, Integer>>();
+        inputs1.add(new Tuple2<String, Integer>("a", 1));
+        inputs1.add(new Tuple2<String, Integer>("b", 1));
+        inputs1.add(new Tuple2<String, Integer>("c", 0));
+        inputs1.add(new Tuple2<String, Integer>("d", 0));
+        inputs1.add(new Tuple2<String, Integer>("e", 1));
+        inputs1.add(new Tuple2<String, Integer>("f", 1));
+        inputs1.add(new Tuple2<String, Integer>("g", 0));
+        inputs1.add(new Tuple2<String, Integer>("h", 0));
+        inputs1.add(new Tuple2<String, Integer>("i", 1));
+        inputs1.add(new Tuple2<String, Integer>("j", 1));
+
+        List<Tuple2<Integer, Integer>> inputs2 = new ArrayList<Tuple2<Integer, Integer>>();
+        inputs2.add(new Tuple2<Integer, Integer>(1, 1));
+        inputs2.add(new Tuple2<Integer, Integer>(2, 2));
+        inputs2.add(new Tuple2<Integer, Integer>(3, 1));
+        inputs2.add(new Tuple2<Integer, Integer>(4, 2));
+        inputs2.add(new Tuple2<Integer, Integer>(5, 1));
+        inputs2.add(new Tuple2<Integer, Integer>(6, 2));
+        inputs2.add(new Tuple2<Integer, Integer>(7, 1));
+        inputs2.add(new Tuple2<Integer, Integer>(8, 2));
+        inputs2.add(new Tuple2<Integer, Integer>(9, 1));
+        inputs2.add(new Tuple2<Integer, Integer>(10, 2));
+
+        List<Long> timestamps1 = Arrays.asList(0L, 1L, 1L, 1L, 2L, 2L, 2L, 3L, 4L, 7L);
+
+        List<Long> timestamps2 = Arrays.asList(0L, 5L, 5L, 6L, 6L, 7L, 7L, 8L, 8L, 10L);
+
+        CoGroupedWindowGroupReduceInvokable<Tuple2<String, Integer>, Tuple2<Integer, Integer>, String> invokable = new CoGroupedWindowGroupReduceInvokable<Tuple2<String, Integer>, Tuple2<Integer, Integer>, String>(
+                new MyCoGroupReduceFunction2(), 2L, 4L, 2L, 2L, 1, 1,
+                new MyTimeStamp<Tuple2<String, Integer>>(timestamps1),
+                new MyTimeStamp<Tuple2<Integer, Integer>>(timestamps2));
+
+        List<String> expected = new ArrayList<String>(Arrays.asList("ab", "cd", "ef", "gh", "i",
+                "1", "3", "2", "15", "12", "21", "18"));
+
+        List<String> actualList = MockCoInvokable.createAndExecute(invokable, inputs1, inputs2);
+        Collections.sort(expected);
+        Collections.sort(actualList);
+
+        assertEquals(expected, actualList);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6ffdbad/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoReduceTest.java
deleted file mode 100755
index 4f52444..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoReduceTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/** Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
-import org.apache.flink.streaming.util.MockCoInvokable;
-import org.junit.Test;
-
-public class CoReduceTest {
-
- public static class MyCoReduceFunction implements CoReduceFunction<Integer, String, Integer> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer reduce1(Integer value1, Integer value2) {
- return value1 * value2;
- }
-
- @Override
- public String reduce2(String value1, String value2) {
- return value1 + value2;
- }
-
- @Override
- public Integer map1(Integer value) {
- return value;
- }
-
- @Override
- public Integer map2(String value) {
- return Integer.parseInt(value);
- }
-
- }
-
- @Test
- public void coGroupReduceTest() {
-
- CoReduceInvokable<Integer, String, Integer> coReduce = new CoReduceInvokable<Integer, String, Integer>(
- new MyCoReduceFunction());
-
- List<Integer> expected1 = Arrays.asList(1, 9, 2, 99, 6, 998, 24);
-
- assertEquals(
- expected1,
- (MockCoInvokable.createAndExecute(coReduce, Arrays.asList(1, 2, 3, 4),
- Arrays.asList("9", "9", "8"))));
-
- }
-}