You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:47 UTC

[10/51] [abbrv] git commit: [streaming] Updated operators for better mutability handling

[streaming] Updated operators for better mutability handling


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/126a1cb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/126a1cb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/126a1cb7

Branch: refs/heads/master
Commit: 126a1cb7f02856feb11e653c74fd0b92880baa8f
Parents: 1edd031
Author: gyfora <gy...@gmail.com>
Authored: Wed Jul 23 12:44:29 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:19:17 2014 +0200

----------------------------------------------------------------------
 .../operator/BatchReduceInvokable.java          | 79 +++++++++++++++++---
 .../api/invokable/operator/co/CoInvokable.java  | 18 ++++-
 .../invokable/operator/co/CoMapInvokable.java   | 13 +++-
 .../api/streamcomponent/CoStreamTask.java       |  2 +-
 4 files changed, 92 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/126a1cb7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
index 2d2d890..7684f70 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceInvokable.java
@@ -20,6 +20,8 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.flink.api.java.functions.GroupReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple;
@@ -47,8 +49,73 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
 		this.window = true;
 	}
 
+	/**
+	 * Reads the next input record into {@code reuse} and returns it.
+	 * 
+	 * @return the record that was read (also stored in {@code reuse}), or
+	 *         {@code null} when the input iterator has no more records
+	 * @throws RuntimeException
+	 *             wrapping the {@link IOException} if reading fails. The error
+	 *             is propagated rather than printed and ignored: otherwise the
+	 *             previous (or partially filled) {@code reuse} would be returned
+	 *             to the caller as if it were a new record.
+	 */
+	private StreamRecord<IN> loadNextRecord() {
+		try {
+			reuse = recordIterator.next(reuse);
+		} catch (IOException e) {
+			throw new RuntimeException("Could not read the next input record.", e);
+		}
+		return reuse;
+	}
+
 	@Override
 	public void invoke() throws Exception {
+		if (this.isMutable) {
+			mutableInvoke();
+		} else {
+			immutableInvoke();
+		}
+	}
+
+	/**
+	 * Batch reduce for immutable mode. After each tuple is added to the batch,
+	 * {@code resetReuse()} is called, so the next record is read into a new
+	 * instance instead of the one whose tuple is already in the batch.
+	 * <p>
+	 * A batch is closed when {@code windowSize} milliseconds have elapsed since
+	 * the window started (window mode) or when it holds {@code batchSize}
+	 * tuples (count mode). When the input ends, the last (possibly partial)
+	 * batch is reduced. The reducer is never called with an empty batch.
+	 */
+	private void immutableInvoke() throws Exception {
+		List<IN> tupleBatch = new ArrayList<IN>();
+		long batchStartTime = System.currentTimeMillis();
+
+		while (loadNextRecord() != null) {
+			int batchCount = 0;
+			do {
+				tupleBatch.add(reuse.getTuple());
+				batchCount++;
+				// the tuple now belongs to the batch; read the next record into a new instance
+				resetReuse();
+			} while (!isBatchComplete(batchStartTime, batchCount) && loadNextRecord() != null);
+
+			reducer.reduce(tupleBatch.iterator(), collector);
+			tupleBatch.clear();
+			batchStartTime = System.currentTimeMillis();
+
+			if (reuse == null) {
+				// The input ended inside this batch: stop instead of calling
+				// next() again (with a null target) on the exhausted iterator.
+				break;
+			}
+		}
+	}
+
+	/**
+	 * Decides whether the batch that is currently being built is complete.
+	 * 
+	 * @param batchStartTime
+	 *            time in milliseconds at which the current window started;
+	 *            only used in window mode
+	 * @param batchCount
+	 *            number of tuples in the current batch; only used in count
+	 *            mode
+	 * @return {@code true} if no more tuples should be added to the batch
+	 */
+	private boolean isBatchComplete(long batchStartTime, int batchCount) {
+		if (window) {
+			return System.currentTimeMillis() - batchStartTime >= windowSize;
+		}
+		return batchCount >= batchSize;
+	}
+
+	private void mutableInvoke() throws Exception {
 		BatchIterator<IN> userIterator;
 		if (window) {
 			userIterator = new WindowIterator();
@@ -64,18 +131,6 @@ public class BatchReduceInvokable<IN extends Tuple, OUT extends Tuple> extends
 		} while (reuse != null);
 	}
 
-	private StreamRecord<IN> loadNextRecord() {
-		if (!isMutable) {
-			resetReuse();
-		}
-		try {
-			reuse = recordIterator.next(reuse);
-		} catch (IOException e) {
-			e.printStackTrace();
-		}
-		return reuse;
-	}
-
 	public class CounterIterator implements BatchIterator<IN> {
 		private int counter;
 		private boolean loadedNext;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/126a1cb7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 733b61e..85086f9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -36,19 +36,31 @@ public abstract class CoInvokable<IN1 extends Tuple, IN2 extends Tuple, OUT exte
 	protected MutableObjectIterator<StreamRecord<IN2>> recordIterator2;
 	protected StreamRecord<IN1> reuse1;
 	protected StreamRecord<IN2> reuse2;
+	// kept after initialize() so that resetReuse() can create new record instances
 	protected StreamRecordSerializer<IN1> serializer1;
 	protected StreamRecordSerializer<IN2> serializer2;
+	// when false, subclasses call resetReuse() instead of reading into the same records again
 	protected boolean isMutable;
 
+	/**
+	 * Wires this invokable to its output collector and its two inputs, and
+	 * creates the initial reusable record for each input.
+	 * 
+	 * @param collector
+	 *            output collector used by the subclass's {@code invoke()}
+	 * @param recordIterator1
+	 *            iterator over the records of the first input
+	 * @param serializer1
+	 *            creates record instances for the first input (now and in
+	 *            {@link #resetReuse()})
+	 * @param recordIterator2
+	 *            iterator over the records of the second input
+	 * @param serializer2
+	 *            creates record instances for the second input (now and in
+	 *            {@link #resetReuse()})
+	 * @param isMutable
+	 *            whether record instances may be reused between reads; when
+	 *            {@code false}, subclasses are expected to call
+	 *            {@link #resetReuse()} to obtain fresh instances
+	 */
 	public void initialize(Collector<OUT> collector,
 			MutableObjectIterator<StreamRecord<IN1>> recordIterator1,
 			StreamRecordSerializer<IN1> serializer1,
 			MutableObjectIterator<StreamRecord<IN2>> recordIterator2,
-			StreamRecordSerializer<IN2> serializer2) {
+			StreamRecordSerializer<IN2> serializer2, boolean isMutable) {
 		this.collector = collector;
-	
+
 		this.recordIterator1 = recordIterator1;
 		this.reuse1 = serializer1.createInstance();
-		
+
 		this.recordIterator2 = recordIterator2;
 		this.reuse2 = serializer2.createInstance();
+
+		// the serializers are stored so resetReuse() can allocate new records later
 		this.serializer1 = serializer1;
 		this.serializer2 = serializer2;
 		this.isMutable = isMutable;
+	}
+
+	/**
+	 * Replaces both reusable input records with newly created instances, so
+	 * that subsequent reads go into new objects instead of the records that
+	 * were read previously.
+	 */
+	public void resetReuse() {
+		reuse1 = serializer1.createInstance();
+		reuse2 = serializer2.createInstance();
 	}
 
+	/**
+	 * Processes the two inputs set up by {@code initialize(...)} and emits
+	 * results through {@code collector}; implemented by subclasses.
+	 */
 	public abstract void invoke() throws Exception;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/126a1cb7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
index 4a30425..be5c42f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
@@ -35,19 +35,24 @@ public class CoMapInvokable<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tu
 	// TODO rework this as UnionRecordReader
+	/**
+	 * Alternates between the two inputs: each round reads at most one record
+	 * from each input and emits {@code mapper.map1} / {@code mapper.map2}
+	 * results through the collector.
+	 */
 	@Override
 	public void invoke() throws Exception {
-		boolean noMoreRecordOnInput1;
-		boolean noMoreRecordOnInput2;
-		
+		boolean noMoreRecordOnInput1 = false;
+		boolean noMoreRecordOnInput2 = false;
+
 		do {
 			noMoreRecordOnInput1 = recordIterator1.next(reuse1) == null;
 			if (!noMoreRecordOnInput1) {
 				collector.collect(mapper.map1(reuse1.getTuple()));
 			}
-			
+
 			noMoreRecordOnInput2 = recordIterator2.next(reuse2) == null;
 			if (!noMoreRecordOnInput2) {
 				collector.collect(mapper.map2(reuse2.getTuple()));
 			}
+
+			// immutable mode: read the next round into fresh records, presumably so
+			// the tuples just handed to the mapper are not reused (TODO confirm)
+			if (!this.isMutable) {
+				resetReuse();
+			}
+			// NOTE(review): the loop ends as soon as EITHER input returns null, so
+			// records still pending on the other input are never read.
 		} while (!noMoreRecordOnInput1 && !noMoreRecordOnInput2);
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/126a1cb7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
index 204bcfe..60a8152 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/CoStreamTask.java
@@ -135,7 +135,7 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
 				CoMapInvokable.class, CoInvokable.class);
 		userFunction = (CoInvokable<IN1, IN2, OUT>) getInvokable(userFunctionClass);
 		userFunction.initialize(collector, inputIter1, inTupleSerializer1, inputIter2,
-				inTupleSerializer2);
+				inTupleSerializer2, isMutable);
 	}
 
 	protected void setConfigInputs() throws StreamComponentException {