You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:52 UTC
[15/51] [abbrv] git commit: [streaming] Operator invokable refactor
[streaming] Operator invokable refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/be459aec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/be459aec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/be459aec
Branch: refs/heads/master
Commit: be459aece580ea0f4ea9e028cf29b15d7d7f33f4
Parents: 1fccb10
Author: gyfora <gy...@gmail.com>
Authored: Wed Jul 23 14:53:27 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:19:18 2014 +0200
----------------------------------------------------------------------
.../apache/flink/streaming/api/DataStream.java | 31 +++-
.../streaming/api/invokable/SinkInvokable.java | 21 +--
.../api/invokable/StreamRecordInvokable.java | 23 ++-
.../operator/BatchReduceInvokable.java | 156 ++++---------------
.../api/invokable/operator/FilterInvokable.java | 26 ++--
.../invokable/operator/FlatMapInvokable.java | 22 +--
.../api/invokable/operator/MapInvokable.java | 21 +--
.../operator/StreamReduceInvokable.java | 32 ++++
.../operator/WindowReduceInvokable.java | 125 +++++++++++++++
9 files changed, 276 insertions(+), 181 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/be459aec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
index d32aa18..27e4d89 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.partitioner.DistributePartitioner;
@@ -292,7 +293,7 @@ public class DataStream<T extends Tuple> {
public DataStream<T> forward() {
return setConnectionType(new ForwardPartitioner<T>());
}
-
+
/**
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are distributed evenly to the next component.
@@ -332,6 +333,27 @@ public class DataStream<T extends Tuple> {
}
/**
+ * Applies a CoMap transformation on two separate {@link DataStream}s. The
+ * transformation calls a {@link CoMapFunction#map1(Tuple)} for each element
+ * of the first DataStream (on which .coMapWith was called) and
+ * {@link CoMapFunction#map2(Tuple)} for each element of the second
+ * DataStream. Each CoMapFunction call returns exactly one element.
+ *
+ * @param coMapper
+ * The CoMapFunction used to jointly transform the two input
+ * DataStreams
+ * @param otherStream
+ * The DataStream that will be transformed with
+ * {@link CoMapFunction#map2(Tuple)}
+ * @return The transformed DataStream
+ */
+ public <T2 extends Tuple, R extends Tuple> DataStream<R> coMapWith(
+ CoMapFunction<T, T2, R> coMapper, DataStream<T2> otherStream) {
+ return environment.addCoFunction("coMap", new DataStream<T>(this), new DataStream<T2>(
+ otherStream), coMapper, new CoMapInvokable<T, T2, R>(coMapper));
+ }
+
+ /**
* Applies a FlatMap transformation on a {@link DataStream}. The
* transformation calls a FlatMapFunction for each element of the
* DataStream. Each FlatMapFunction call can return any number of elements
@@ -387,11 +409,6 @@ public class DataStream<T extends Tuple> {
new BatchReduceInvokable<T, R>(reducer, batchSize));
}
- public <T2 extends Tuple, R extends Tuple> DataStream<R> coMapWith(CoMapFunction<T, T2, R> coMapper, DataStream<T2> otherStream) {
- return environment.addCoFunction("coMap", new DataStream<T>(this), new DataStream<T2>(otherStream), coMapper, new CoMapInvokable<T, T2, R>(coMapper));
- }
-
-
/**
* Applies a reduce transformation on preset "time" chunks of the
* DataStream. The transformation calls a {@link GroupReduceFunction} on
@@ -411,7 +428,7 @@ public class DataStream<T extends Tuple> {
public <R extends Tuple> StreamOperator<T, R> windowReduce(GroupReduceFunction<T, R> reducer,
long windowSize) {
return environment.addFunction("batchReduce", new DataStream<T>(this), reducer,
- new BatchReduceInvokable<T, R>(reducer, windowSize));
+ new WindowReduceInvokable<T, R>(reducer, windowSize));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/be459aec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 3c14490..81cfa81 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -32,16 +32,17 @@ public class SinkInvokable<IN extends Tuple> extends StreamRecordInvokable<IN, I
}
@Override
- public void invoke() throws Exception {
- if (this.isMutable) {
- while (recordIterator.next(reuse) != null) {
- sinkFunction.invoke((IN) reuse.getTuple());
- }
- } else {
- while (recordIterator.next(reuse) != null) {
- sinkFunction.invoke((IN) reuse.getTuple());
- resetReuse();
- }
+ protected void immutableInvoke() throws Exception {
+ while (recordIterator.next(reuse) != null) {
+ sinkFunction.invoke((IN) reuse.getTuple());
+ resetReuse();
+ }
+ }
+
+ @Override
+ protected void mutableInvoke() throws Exception {
+ while (recordIterator.next(reuse) != null) {
+ sinkFunction.invoke((IN) reuse.getTuple());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/be459aec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
index 903372b..6beec27 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamRecordInvokable.java
@@ -19,6 +19,8 @@
package org.apache.flink.streaming.api.invokable;
+import java.io.IOException;
+
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
@@ -49,6 +51,25 @@ public abstract class StreamRecordInvokable<IN extends Tuple, OUT extends Tuple>
protected void resetReuse() {
this.reuse = serializer.createInstance();
}
+
+ protected StreamRecord<IN> loadNextRecord() {
+ try {
+ reuse = recordIterator.next(reuse);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return reuse;
+ }
+
+ protected abstract void immutableInvoke() throws Exception;
- public abstract void invoke() throws Exception;
+ protected abstract void mutableInvoke() throws Exception;
+
+ public void invoke() throws Exception {
+ if (this.isMutable) {
+ mutableInvoke();
+ } else {
+ immutableInvoke();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/be459aec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
old mode 100644
new mode 100755
index 7684f70..4aa540c
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -19,109 +19,53 @@
package org.apache.flink.streaming.api.invokable.operator;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
- UserTaskInvokable<IN, OUT> {
+ StreamReduceInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
private int batchSize;
- private long windowSize;
- volatile boolean isRunning;
- boolean window;
-
- private GroupReduceFunction<IN, OUT> reducer;
public BatchReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, int batchSize) {
this.reducer = reduceFunction;
this.batchSize = batchSize;
}
- public BatchReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize) {
- this.reducer = reduceFunction;
- this.windowSize = windowSize;
- this.window = true;
- }
-
- private StreamRecord<IN> loadNextRecord() {
- try {
- reuse = recordIterator.next(reuse);
- } catch (IOException e) {
- e.printStackTrace();
- }
- return reuse;
- }
-
@Override
- public void invoke() throws Exception {
- if (this.isMutable) {
- mutableInvoke();
- } else {
- immutableInvoke();
- }
- }
-
- private void immutableInvoke() throws Exception {
+ protected void immutableInvoke() throws Exception {
List<IN> tupleBatch = new ArrayList<IN>();
boolean batchStart;
-
- if (window) {
- long startTime = System.currentTimeMillis();
- while (loadNextRecord() != null) {
- batchStart = true;
- do {
- if (batchStart) {
- batchStart = false;
- } else {
- reuse = loadNextRecord();
- if (reuse == null) {
- break;
- }
+ int counter = 0;
+
+ while (loadNextRecord() != null) {
+ batchStart = true;
+ do {
+ if (batchStart) {
+ batchStart = false;
+ } else {
+ reuse = loadNextRecord();
+ if (reuse == null) {
+ break;
}
- tupleBatch.add(reuse.getTuple());
- resetReuse();
- } while (System.currentTimeMillis() - startTime < windowSize);
- reducer.reduce(tupleBatch.iterator(), collector);
- tupleBatch.clear();
- startTime = System.currentTimeMillis();
- }
- } else {
- int counter = 0;
- while (loadNextRecord() != null) {
- batchStart = true;
- do {
- if (batchStart) {
- batchStart = false;
- } else {
- reuse = loadNextRecord();
- if (reuse == null) {
- break;
- }
- }
- counter++;
- tupleBatch.add(reuse.getTuple());
- resetReuse();
- } while (counter < batchSize);
- reducer.reduce(tupleBatch.iterator(), collector);
- tupleBatch.clear();
- counter = 0;
- }
+ }
+ counter++;
+ tupleBatch.add(reuse.getTuple());
+ resetReuse();
+ } while (counter < batchSize);
+ reducer.reduce(tupleBatch.iterator(), collector);
+ tupleBatch.clear();
+ counter = 0;
}
+
}
- private void mutableInvoke() throws Exception {
- BatchIterator<IN> userIterator;
- if (window) {
- userIterator = new WindowIterator();
- } else {
- userIterator = new CounterIterator();
- }
+ @Override
+ protected void mutableInvoke() throws Exception {
+ BatchIterator<IN> userIterator = new CounterIterator();
do {
if (userIterator.hasNext()) {
@@ -131,7 +75,7 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
} while (reuse != null);
}
- public class CounterIterator implements BatchIterator<IN> {
+ private class CounterIterator implements BatchIterator<IN> {
private int counter;
private boolean loadedNext;
@@ -179,52 +123,4 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
}
- public class WindowIterator implements BatchIterator<IN> {
-
- volatile boolean iterate;
- private boolean loadedNext;
- private long startTime;
-
- public WindowIterator() {
- startTime = System.currentTimeMillis();
- }
-
- @Override
- public boolean hasNext() {
- if (System.currentTimeMillis() - startTime > windowSize) {
- return false;
- } else if (!loadedNext) {
- loadNextRecord();
- loadedNext = true;
- }
- return (reuse != null);
- }
-
- @Override
- public IN next() {
- if (hasNext()) {
- loadedNext = false;
- return reuse.getTuple();
- } else {
- loadedNext = false;
- return reuse.getTuple();
- }
- }
-
- public void reset() {
- while (System.currentTimeMillis() - startTime < windowSize) {
- loadNextRecord();
- }
- loadNextRecord();
- loadedNext = true;
- startTime = System.currentTimeMillis();
- }
-
- @Override
- public void remove() {
-
- }
-
- }
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/be459aec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index ac79764..edeb79a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -34,21 +34,21 @@ public class FilterInvokable<IN extends Tuple> extends UserTaskInvokable<IN, IN>
}
@Override
- public void invoke() throws Exception {
- if (this.isMutable) {
- while (recordIterator.next(reuse) != null) {
- if (filterFunction.filter(reuse.getTuple())) {
- collector.collect(reuse.getTuple());
- }
- }
- } else {
- while (recordIterator.next(reuse) != null) {
- if (filterFunction.filter(reuse.getTuple())) {
- collector.collect(reuse.getTuple());
- }
- resetReuse();
+ protected void immutableInvoke() throws Exception {
+ while (recordIterator.next(reuse) != null) {
+ if (filterFunction.filter(reuse.getTuple())) {
+ collector.collect(reuse.getTuple());
}
+ resetReuse();
}
+ }
+ @Override
+ protected void mutableInvoke() throws Exception {
+ while (recordIterator.next(reuse) != null) {
+ if (filterFunction.filter(reuse.getTuple())) {
+ collector.collect(reuse.getTuple());
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/be459aec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 33bda80..279b160 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -33,16 +33,18 @@ public class FlatMapInvokable<IN extends Tuple, OUT extends Tuple> extends
this.flatMapper = flatMapper;
}
- public void invoke() throws Exception {
- if (this.isMutable) {
- while (recordIterator.next(reuse) != null) {
- flatMapper.flatMap(reuse.getTuple(), collector);
- }
- } else {
- while (recordIterator.next(reuse) != null) {
- flatMapper.flatMap(reuse.getTuple(), collector);
- resetReuse();
- }
+ @Override
+ protected void immutableInvoke() throws Exception {
+ while (recordIterator.next(reuse) != null) {
+ flatMapper.flatMap(reuse.getTuple(), collector);
+ resetReuse();
+ }
+ }
+
+ @Override
+ protected void mutableInvoke() throws Exception {
+ while (recordIterator.next(reuse) != null) {
+ flatMapper.flatMap(reuse.getTuple(), collector);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/be459aec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index ff29d15..3c56b6f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -33,16 +33,17 @@ public class MapInvokable<IN extends Tuple, OUT extends Tuple> extends UserTaskI
}
@Override
- public void invoke() throws Exception {
- if (this.isMutable) {
- while (recordIterator.next(reuse) != null) {
- collector.collect(mapper.map(reuse.getTuple()));
- }
- } else {
- while (recordIterator.next(reuse) != null) {
- collector.collect(mapper.map(reuse.getTuple()));
- resetReuse();
- }
+ protected void immutableInvoke() throws Exception {
+ while (recordIterator.next(reuse) != null) {
+ collector.collect(mapper.map(reuse.getTuple()));
+ resetReuse();
+ }
+ }
+
+ @Override
+ protected void mutableInvoke() throws Exception {
+ while (recordIterator.next(reuse) != null) {
+ collector.collect(mapper.map(reuse.getTuple()));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/be459aec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
new file mode 100644
index 0000000..e881d57
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -0,0 +1,32 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+
+public abstract class StreamReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
+ UserTaskInvokable<IN, OUT> {
+ private static final long serialVersionUID = 1L;
+ protected GroupReduceFunction<IN, OUT> reducer;
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/be459aec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
new file mode 100755
index 0000000..67c15dc
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java
@@ -0,0 +1,125 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.invokable.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+
+public class WindowReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
+ StreamReduceInvokable<IN, OUT> {
+ private static final long serialVersionUID = 1L;
+ private long windowSize;
+ volatile boolean isRunning;
+ boolean window;
+
+ public WindowReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long windowSize) {
+ this.reducer = reduceFunction;
+ this.windowSize = windowSize;
+ this.window = true;
+ }
+
+ protected void immutableInvoke() throws Exception {
+ List<IN> tupleBatch = new ArrayList<IN>();
+ boolean batchStart;
+
+ long startTime = System.currentTimeMillis();
+ while (loadNextRecord() != null) {
+ batchStart = true;
+ do {
+ if (batchStart) {
+ batchStart = false;
+ } else {
+ reuse = loadNextRecord();
+ if (reuse == null) {
+ break;
+ }
+ }
+ tupleBatch.add(reuse.getTuple());
+ resetReuse();
+ } while (System.currentTimeMillis() - startTime < windowSize);
+ reducer.reduce(tupleBatch.iterator(), collector);
+ tupleBatch.clear();
+ startTime = System.currentTimeMillis();
+ }
+
+ }
+
+ protected void mutableInvoke() throws Exception {
+ BatchIterator<IN> userIterator = new WindowIterator();
+
+ do {
+ if (userIterator.hasNext()) {
+ reducer.reduce(userIterator, collector);
+ userIterator.reset();
+ }
+ } while (reuse != null);
+ }
+
+ private class WindowIterator implements BatchIterator<IN> {
+
+ private boolean loadedNext;
+ private long startTime;
+
+ public WindowIterator() {
+ startTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (System.currentTimeMillis() - startTime > windowSize) {
+ return false;
+ } else if (!loadedNext) {
+ loadNextRecord();
+ loadedNext = true;
+ }
+ return (reuse != null);
+ }
+
+ @Override
+ public IN next() {
+ if (hasNext()) {
+ loadedNext = false;
+ return reuse.getTuple();
+ } else {
+ loadedNext = false;
+ return reuse.getTuple();
+ }
+ }
+
+ public void reset() {
+ while (System.currentTimeMillis() - startTime < windowSize) {
+ loadNextRecord();
+ }
+ loadNextRecord();
+ loadedNext = true;
+ startTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public void remove() {
+
+ }
+
+ }
+
+}
\ No newline at end of file