You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/23 15:34:55 UTC

[5/5] flink git commit: [FLINK-2895] Duplicate immutable object creation

[FLINK-2895] Duplicate immutable object creation

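Operators defer object creation when object reuse is disabled: they now call
input.next() instead of input.next(serializer.createInstance()), so no instance
is pre-allocated by the driver for each record read.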

This closes #1288


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbb75c59
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbb75c59
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbb75c59

Branch: refs/heads/master
Commit: bbb75c599aba1fffa3f52b45af77ee9c7ece3ca0
Parents: d2e4a27
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Oct 22 09:31:09 2015 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Oct 23 13:06:29 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/operators/AllReduceDriver.java  | 8 +++-----
 .../java/org/apache/flink/runtime/operators/NoOpDriver.java  | 4 +---
 .../org/apache/flink/runtime/operators/ReduceDriver.java     | 4 ++--
 3 files changed, 6 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bbb75c59/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
index 06f22c5..1d35fdb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
@@ -108,7 +108,6 @@ public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 		final MutableObjectIterator<T> input = this.input;
 		final TypeSerializer<T> serializer = this.serializer;
 
-
 		if (objectReuseEnabled) {
 			T val1 = serializer.createInstance();
 
@@ -123,14 +122,13 @@ public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 
 			this.taskContext.getOutputCollector().collect(val1);
 		} else {
-			T val1 = serializer.createInstance();
-
-			if ((val1 = input.next(val1)) == null) {
+			T val1;
+			if ((val1 = input.next()) == null) {
 				return;
 			}
 
 			T val2;
-			while (running && (val2 = input.next(serializer.createInstance())) != null) {
+			while (running && (val2 = input.next()) != null) {
 				val1 = stub.reduce(val1, val2);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb75c59/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
index fcd2716..428cfe4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.slf4j.Logger;
@@ -87,8 +86,7 @@ public class NoOpDriver<T> implements Driver<AbstractRichFunction, T> {
 			}
 		} else {
 			T record;
-			TypeSerializer<T> serializer = this.taskContext.<T>getInputSerializer(0).getSerializer();
-			while (this.running && ((record = input.next(serializer.createInstance())) != null)) {
+			while (this.running && ((record = input.next()) != null)) {
 				output.collect(record);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb75c59/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
index 970441e..6a7c42c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
@@ -148,7 +148,7 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 				}
 			}
 		} else {
-			T value = input.next(serializer.createInstance());
+			T value = input.next();
 
 			// iterate over key groups
 			while (this.running && value != null) {
@@ -156,7 +156,7 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 				T res = value;
 
 				// iterate within a key group
-				while ((value = input.next(serializer.createInstance())) != null) {
+				while ((value = input.next()) != null) {
 					if (comparator.equalToReference(value)) {
 						// same group, reduce
 						res = function.reduce(res, value);