You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/23 15:34:55 UTC
[5/5] flink git commit: [FLINK-2895] Duplicate immutable object creation
[FLINK-2895] Duplicate immutable object creation
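When object reuse is disabled, operators no longer pre-create an instance with serializer.createInstance() for each read; they call input.next() instead of input.next(serializer.createInstance()).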
This closes #1288
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbb75c59
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbb75c59
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbb75c59
Branch: refs/heads/master
Commit: bbb75c599aba1fffa3f52b45af77ee9c7ece3ca0
Parents: d2e4a27
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Oct 22 09:31:09 2015 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Oct 23 13:06:29 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/runtime/operators/AllReduceDriver.java | 8 +++-----
.../java/org/apache/flink/runtime/operators/NoOpDriver.java | 4 +---
.../org/apache/flink/runtime/operators/ReduceDriver.java | 4 ++--
3 files changed, 6 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bbb75c59/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
index 06f22c5..1d35fdb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
@@ -108,7 +108,6 @@ public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
final MutableObjectIterator<T> input = this.input;
final TypeSerializer<T> serializer = this.serializer;
-
if (objectReuseEnabled) {
T val1 = serializer.createInstance();
@@ -123,14 +122,13 @@ public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
this.taskContext.getOutputCollector().collect(val1);
} else {
- T val1 = serializer.createInstance();
-
- if ((val1 = input.next(val1)) == null) {
+ T val1;
+ if ((val1 = input.next()) == null) {
return;
}
T val2;
- while (running && (val2 = input.next(serializer.createInstance())) != null) {
+ while (running && (val2 = input.next()) != null) {
val1 = stub.reduce(val1, val2);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bbb75c59/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
index fcd2716..428cfe4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
@@ -87,8 +86,7 @@ public class NoOpDriver<T> implements Driver<AbstractRichFunction, T> {
}
} else {
T record;
- TypeSerializer<T> serializer = this.taskContext.<T>getInputSerializer(0).getSerializer();
- while (this.running && ((record = input.next(serializer.createInstance())) != null)) {
+ while (this.running && ((record = input.next()) != null)) {
output.collect(record);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bbb75c59/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
index 970441e..6a7c42c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
@@ -148,7 +148,7 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
}
}
} else {
- T value = input.next(serializer.createInstance());
+ T value = input.next();
// iterate over key groups
while (this.running && value != null) {
@@ -156,7 +156,7 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
T res = value;
// iterate within a key group
- while ((value = input.next(serializer.createInstance())) != null) {
+ while ((value = input.next()) != null) {
if (comparator.equalToReference(value)) {
// same group, reduce
res = function.reduce(res, value);