You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this text export.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/21 23:34:44 UTC

[3/4] flink git commit: [FLINK-2724] [runtime] Fix object reuse in GroupReduceCombineDriver and ReduceCombineDriver

[FLINK-2724] [runtime] Fix object reuse in GroupReduceCombineDriver and ReduceCombineDriver


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3809d82
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3809d82
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3809d82

Branch: refs/heads/master
Commit: f3809d8272e9b1f611770cdfb71fe17c3feb08e6
Parents: 435ee4e
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 21 21:27:38 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 21 23:18:04 2015 +0200

----------------------------------------------------------------------
 .../operators/GroupReduceCombineDriver.java     | 62 +++++++++++++-------
 .../runtime/operators/ReduceCombineDriver.java  | 56 ++++++++++++------
 .../chaining/GroupCombineChainedDriver.java     | 32 ++++------
 3 files changed, 89 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f3809d82/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index ab46d95..c6a872c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -157,29 +157,29 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 		final MutableObjectIterator<IN> in = this.taskContext.getInput(0);
 		final TypeSerializer<IN> serializer = this.serializer;
 
-		IN value = serializer.createInstance();
-
-		while (running && (value = in.next(value)) != null) {
-
-			// try writing to the sorter first
-			if (this.sorter.write(value)) {
-				continue;
+		if (objectReuseEnabled) {
+			IN value = serializer.createInstance();
+	
+			while (running && (value = in.next(value)) != null) {
+				// try writing to the sorter first
+				if (this.sorter.write(value)) {
+					continue;
+				}
+	
+				// do the actual sorting, combining, and data writing
+				sortAndCombineAndRetryWrite(value);
 			}
-
-			// do the actual sorting, combining, and data writing
-			sortAndCombine();
-			this.sorter.reset();
-
-			// write the value again
-			if (!this.sorter.write(value)) {
-				
-				++oversizedRecordCount;
-				LOG.debug("Cannot write record to fresh sort buffer, record is too large. " +
-								"Oversized record count: {}", oversizedRecordCount);
-				
-				// simply forward the record. We need to pass it through the combine function to convert it
-				Iterable<IN> input = Collections.singleton(value);
-				this.combiner.combine(input, this.output);
+		}
+		else {
+			IN value;
+			while (running && (value = in.next()) != null) {
+				// try writing to the sorter first
+				if (this.sorter.write(value)) {
+					continue;
+				}
+
+				// do the actual sorting, combining, and data writing
+				sortAndCombineAndRetryWrite(value);
 			}
 		}
 
@@ -214,6 +214,24 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 			}
 		}
 	}
+	
+	private void sortAndCombineAndRetryWrite(IN value) throws Exception {
+		sortAndCombine();
+		this.sorter.reset();
+
+		// write the value again
+		if (!this.sorter.write(value)) {
+
+			++oversizedRecordCount;
+			LOG.debug("Cannot write record to fresh sort buffer, record is too large. " +
+					"Oversized record count: {}", oversizedRecordCount);
+
+			// simply forward the record. We need to pass it through the combine function to convert it
+			Iterable<IN> input = Collections.singleton(value);
+			this.combiner.combine(input, this.output);
+			this.sorter.reset();
+		}
+	}
 
 	@Override
 	public void cleanup() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/f3809d82/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
index 26da0ab..f990156 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
@@ -143,22 +143,43 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
 		final MutableObjectIterator<T> in = this.taskContext.getInput(0);
 		final TypeSerializer<T> serializer = this.serializer;
 		
-		T value = serializer.createInstance();
+		if (objectReuseEnabled) {
+			T value = serializer.createInstance();
 		
-		while (running && (value = in.next(value)) != null) {
-			
-			// try writing to the sorter first
-			if (this.sorter.write(value)) {
-				continue;
+			while (running && (value = in.next(value)) != null) {
+				
+				// try writing to the sorter first
+				if (this.sorter.write(value)) {
+					continue;
+				}
+		
+				// do the actual sorting, combining, and data writing
+				sortAndCombine();
+				this.sorter.reset();
+				
+				// write the value again
+				if (!this.sorter.write(value)) {
+					throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
+				}
 			}
-	
-			// do the actual sorting, combining, and data writing
-			sortAndCombine();
-			this.sorter.reset();
-			
-			// write the value again
-			if (!this.sorter.write(value)) {
-				throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
+		}
+		else {
+			T value;
+			while (running && (value = in.next()) != null) {
+
+				// try writing to the sorter first
+				if (this.sorter.write(value)) {
+					continue;
+				}
+
+				// do the actual sorting, combining, and data writing
+				sortAndCombine();
+				this.sorter.reset();
+
+				// write the value again
+				if (!this.sorter.write(value)) {
+					throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
+				}
 			}
 		}
 		
@@ -174,11 +195,8 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
 			
 			final TypeSerializer<T> serializer = this.serializer;
 			final TypeComparator<T> comparator = this.comparator;
-			
 			final ReduceFunction<T> function = this.reducer;
-			
 			final Collector<T> output = this.output;
-			
 			final MutableObjectIterator<T> input = sorter.getIterator();
 
 			if (objectReuseEnabled) {
@@ -214,7 +232,7 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
 					}
 				}
 			} else {
-				T value = input.next(serializer.createInstance());
+				T value = input.next();
 
 				// iterate over key groups
 				while (this.running && value != null) {
@@ -222,7 +240,7 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
 					T res = value;
 
 					// iterate within a key group
-					while ((value = input.next(serializer.createInstance())) != null) {
+					while ((value = input.next()) != null) {
 						if (comparator.equalToReference(value)) {
 							// same group, reduce
 							res = function.reduce(res, value);

http://git-wip-us.apache.org/repos/asf/flink/blob/f3809d82/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
index 08ad25b..cf0fc85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
@@ -48,7 +48,7 @@ import java.util.List;
  * 
  * Acts like a combiner with a custom output type OUT.
  *
- * Sorting and reducing of the elements is performed invididually for each partition without data exchange. This may
+ * Sorting and reducing of the elements is performed individually for each partition without data exchange. This may
  * lead to a partial group reduce.
  *  
  * @param <IN> The data type consumed
@@ -58,9 +58,7 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(GroupCombineChainedDriver.class);
 
-	/**
-	 * Fix length records with a length below this threshold will be in-place sorted, if possible.
-	 */
+	/** Fix length records with a length below this threshold will be in-place sorted, if possible. */
 	private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
 
 	// --------------------------------------------------------------------------------------------
@@ -174,23 +172,16 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
 			if (this.sorter.write(record)) {
 				return;
 			}
-		} catch (IOException e) {
-			throw new ExceptionInChainedStubException(this.taskName, e);
-		}
 
-		// do the actual sorting
-		try {
+			// do the actual sorting
 			sortAndReduce();
-		} catch (Exception e) {
-			throw new ExceptionInChainedStubException(this.taskName, e);
-		}
-		this.sorter.reset();
-
-		try {
+			this.sorter.reset();
+			
 			if (!this.sorter.write(record)) {
 				throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
 			}
-		} catch (IOException e) {
+		}
+		catch (Exception e) {
 			throw new ExceptionInChainedStubException(this.taskName, e);
 		}
 	}
@@ -215,9 +206,9 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
 			if (!sorter.isEmpty()) {
 				this.sortAlgo.sort(sorter);
 				// run the reducer
-				final ReusingKeyGroupedIterator<IN> keyIter = new ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, this.groupingComparator);
-
-
+				final ReusingKeyGroupedIterator<IN> keyIter = new ReusingKeyGroupedIterator<IN>(
+						sorter.getIterator(), this.serializer, this.groupingComparator);
+				
 				// cache references on the stack
 				final GroupReduceFunction<IN, OUT> stub = this.reducer;
 				final Collector<OUT> output = this.outputCollector;
@@ -231,7 +222,8 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
 			if (!sorter.isEmpty()) {
 				this.sortAlgo.sort(sorter);
 				// run the reducer
-				final NonReusingKeyGroupedIterator<IN> keyIter = new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
+				final NonReusingKeyGroupedIterator<IN> keyIter = new NonReusingKeyGroupedIterator<IN>(
+						sorter.getIterator(), this.groupingComparator);
 
 
 				// cache references on the stack