You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/09/17 15:38:32 UTC
[3/3] flink git commit: [FLINK-2659] [runtime] Fix object reuse in UnionWithTempOperator
[FLINK-2659] [runtime] Fix object reuse in UnionWithTempOperator
This closes #1130
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad09721f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad09721f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad09721f
Branch: refs/heads/release-0.9
Commit: ad09721fbf40e1136009a81db3f8a458d5d4a6ac
Parents: 43e23ba
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Sep 14 11:29:49 2015 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Sep 17 11:58:10 2015 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/operators/UnionWithTempOperator.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ad09721f/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
index d8437a9..098686c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
@@ -60,15 +60,16 @@ public class UnionWithTempOperator<T> implements PactDriver<Function, T> {
public void run() throws Exception {
final Collector<T> output = this.taskContext.getOutputCollector();
- T record = this.taskContext.<T>getInputSerializer(STREAMED_INPUT).getSerializer().createInstance();
+ T reuse = this.taskContext.<T>getInputSerializer(STREAMED_INPUT).getSerializer().createInstance();
+ T record;
final MutableObjectIterator<T> input = this.taskContext.getInput(STREAMED_INPUT);
- while (this.running && ((record = input.next(record)) != null)) {
+ while (this.running && ((record = input.next(reuse)) != null)) {
output.collect(record);
}
final MutableObjectIterator<T> cache = this.taskContext.getInput(CACHED_INPUT);
- while (this.running && ((record = cache.next(record)) != null)) {
+ while (this.running && ((record = cache.next(reuse)) != null)) {
output.collect(record);
}
}