You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:47 UTC
[10/51] [abbrv] git commit: [streaming] Updated operators for better
mutability handling
[streaming] Updated operators for better mutability handling
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/126a1cb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/126a1cb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/126a1cb7
Branch: refs/heads/master
Commit: 126a1cb7f02856feb11e653c74fd0b92880baa8f
Parents: 1edd031
Author: gyfora <gy...@gmail.com>
Authored: Wed Jul 23 12:44:29 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:19:17 2014 +0200
----------------------------------------------------------------------
.../operator/BatchReduceInvokable.java | 79 +++++++++++++++++---
.../api/invokable/operator/co/CoInvokable.java | 18 ++++-
.../invokable/operator/co/CoMapInvokable.java | 13 +++-
.../api/streamcomponent/CoStreamTask.java | 2 +-
4 files changed, 92 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/126a1cb7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index 2d2d890..7684f70 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -20,6 +20,8 @@
package org.apache.flink.streaming.api.invokable.operator;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.flink.api.java.functions.GroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
@@ -47,8 +49,73 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
this.window = true;
}
+ private StreamRecord<IN> loadNextRecord() {
+ try {
+ reuse = recordIterator.next(reuse);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return reuse;
+ }
+
@Override
public void invoke() throws Exception {
+ if (this.isMutable) {
+ mutableInvoke();
+ } else {
+ immutableInvoke();
+ }
+ }
+
+ private void immutableInvoke() throws Exception {
+ List<IN> tupleBatch = new ArrayList<IN>();
+ boolean batchStart;
+
+ if (window) {
+ long startTime = System.currentTimeMillis();
+ while (loadNextRecord() != null) {
+ batchStart = true;
+ do {
+ if (batchStart) {
+ batchStart = false;
+ } else {
+ reuse = loadNextRecord();
+ if (reuse == null) {
+ break;
+ }
+ }
+ tupleBatch.add(reuse.getTuple());
+ resetReuse();
+ } while (System.currentTimeMillis() - startTime < windowSize);
+ reducer.reduce(tupleBatch.iterator(), collector);
+ tupleBatch.clear();
+ startTime = System.currentTimeMillis();
+ }
+ } else {
+ int counter = 0;
+ while (loadNextRecord() != null) {
+ batchStart = true;
+ do {
+ if (batchStart) {
+ batchStart = false;
+ } else {
+ reuse = loadNextRecord();
+ if (reuse == null) {
+ break;
+ }
+ }
+ counter++;
+ tupleBatch.add(reuse.getTuple());
+ resetReuse();
+ } while (counter < batchSize);
+ reducer.reduce(tupleBatch.iterator(), collector);
+ tupleBatch.clear();
+ counter = 0;
+ }
+ }
+ }
+
+ private void mutableInvoke() throws Exception {
BatchIterator<IN> userIterator;
if (window) {
userIterator = new WindowIterator();
@@ -64,18 +131,6 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
} while (reuse != null);
}
- private StreamRecord<IN> loadNextRecord() {
- if (!isMutable) {
- resetReuse();
- }
- try {
- reuse = recordIterator.next(reuse);
- } catch (IOException e) {
- e.printStackTrace();
- }
- return reuse;
- }
-
public class CounterIterator implements BatchIterator<IN> {
private int counter;
private boolean loadedNext;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/126a1cb7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 733b61e..85086f9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -36,19 +36,31 @@ public abstract class CoInvokable<IN1 extends Tuple, IN2 extends Tuple, OUT exte
protected MutableObjectIterator<StreamRecord<IN2>> recordIterator2;
protected StreamRecord<IN1> reuse1;
protected StreamRecord<IN2> reuse2;
+ protected StreamRecordSerializer<IN1> serializer1;
+ protected StreamRecordSerializer<IN2> serializer2;
+ protected boolean isMutable;
public void initialize(Collector<OUT> collector,
MutableObjectIterator<StreamRecord<IN1>> recordIterator1,
StreamRecordSerializer<IN1> serializer1,
MutableObjectIterator<StreamRecord<IN2>> recordIterator2,
- StreamRecordSerializer<IN2> serializer2) {
+ StreamRecordSerializer<IN2> serializer2, boolean isMutable) {
this.collector = collector;
-
+
this.recordIterator1 = recordIterator1;
this.reuse1 = serializer1.createInstance();
-
+
this.recordIterator2 = recordIterator2;
this.reuse2 = serializer2.createInstance();
+
+ this.serializer1 = serializer1;
+ this.serializer2 = serializer2;
+ this.isMutable = isMutable;
+ }
+
+ public void resetReuse() {
+ this.reuse1 = serializer1.createInstance();
+ this.reuse2 = serializer2.createInstance();
}
public abstract void invoke() throws Exception;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/126a1cb7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
index 4a30425..be5c42f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
@@ -35,19 +35,24 @@ public class CoMapInvokable<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tu
// TODO rework this as UnionRecordReader
@Override
public void invoke() throws Exception {
- boolean noMoreRecordOnInput1;
- boolean noMoreRecordOnInput2;
-
+ boolean noMoreRecordOnInput1 = false;
+ boolean noMoreRecordOnInput2 = false;
+
do {
noMoreRecordOnInput1 = recordIterator1.next(reuse1) == null;
if (!noMoreRecordOnInput1) {
collector.collect(mapper.map1(reuse1.getTuple()));
}
-
+
noMoreRecordOnInput2 = recordIterator2.next(reuse2) == null;
if (!noMoreRecordOnInput2) {
collector.collect(mapper.map2(reuse2.getTuple()));
}
+
+ if (!this.isMutable) {
+ resetReuse();
+ }
} while (!noMoreRecordOnInput1 && !noMoreRecordOnInput2);
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/126a1cb7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
index 204bcfe..60a8152 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
@@ -135,7 +135,7 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
CoMapInvokable.class, CoInvokable.class);
userFunction = (CoInvokable<IN1, IN2, OUT>) getInvokable(userFunctionClass);
userFunction.initialize(collector, inputIter1, inTupleSerializer1, inputIter2,
- inTupleSerializer2);
+ inTupleSerializer2, isMutable);
}
protected void setConfigInputs() throws StreamComponentException {