You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/21 23:34:44 UTC
[3/4] flink git commit: [FLINK-2724] [runtime] Fix object reuse in GroupReduceCombineDriver and ReduceCombineDriver
[FLINK-2724] [runtime] Fix object reuse in GroupReduceCombineDriver and ReduceCombineDriver
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3809d82
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3809d82
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3809d82
Branch: refs/heads/master
Commit: f3809d8272e9b1f611770cdfb71fe17c3feb08e6
Parents: 435ee4e
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 21 21:27:38 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 21 23:18:04 2015 +0200
----------------------------------------------------------------------
.../operators/GroupReduceCombineDriver.java | 62 +++++++++++++-------
.../runtime/operators/ReduceCombineDriver.java | 56 ++++++++++++------
.../chaining/GroupCombineChainedDriver.java | 32 ++++------
3 files changed, 89 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f3809d82/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index ab46d95..c6a872c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -157,29 +157,29 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
final MutableObjectIterator<IN> in = this.taskContext.getInput(0);
final TypeSerializer<IN> serializer = this.serializer;
- IN value = serializer.createInstance();
-
- while (running && (value = in.next(value)) != null) {
-
- // try writing to the sorter first
- if (this.sorter.write(value)) {
- continue;
+ if (objectReuseEnabled) {
+ IN value = serializer.createInstance();
+
+ while (running && (value = in.next(value)) != null) {
+ // try writing to the sorter first
+ if (this.sorter.write(value)) {
+ continue;
+ }
+
+ // do the actual sorting, combining, and data writing
+ sortAndCombineAndRetryWrite(value);
}
-
- // do the actual sorting, combining, and data writing
- sortAndCombine();
- this.sorter.reset();
-
- // write the value again
- if (!this.sorter.write(value)) {
-
- ++oversizedRecordCount;
- LOG.debug("Cannot write record to fresh sort buffer, record is too large. " +
- "Oversized record count: {}", oversizedRecordCount);
-
- // simply forward the record. We need to pass it through the combine function to convert it
- Iterable<IN> input = Collections.singleton(value);
- this.combiner.combine(input, this.output);
+ }
+ else {
+ IN value;
+ while (running && (value = in.next()) != null) {
+ // try writing to the sorter first
+ if (this.sorter.write(value)) {
+ continue;
+ }
+
+ // do the actual sorting, combining, and data writing
+ sortAndCombineAndRetryWrite(value);
}
}
@@ -214,6 +214,24 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
}
}
}
+
+ private void sortAndCombineAndRetryWrite(IN value) throws Exception {
+ sortAndCombine();
+ this.sorter.reset();
+
+ // write the value again
+ if (!this.sorter.write(value)) {
+
+ ++oversizedRecordCount;
+ LOG.debug("Cannot write record to fresh sort buffer, record is too large. " +
+ "Oversized record count: {}", oversizedRecordCount);
+
+ // simply forward the record. We need to pass it through the combine function to convert it
+ Iterable<IN> input = Collections.singleton(value);
+ this.combiner.combine(input, this.output);
+ this.sorter.reset();
+ }
+ }
@Override
public void cleanup() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/f3809d82/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
index 26da0ab..f990156 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
@@ -143,22 +143,43 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
final MutableObjectIterator<T> in = this.taskContext.getInput(0);
final TypeSerializer<T> serializer = this.serializer;
- T value = serializer.createInstance();
+ if (objectReuseEnabled) {
+ T value = serializer.createInstance();
- while (running && (value = in.next(value)) != null) {
-
- // try writing to the sorter first
- if (this.sorter.write(value)) {
- continue;
+ while (running && (value = in.next(value)) != null) {
+
+ // try writing to the sorter first
+ if (this.sorter.write(value)) {
+ continue;
+ }
+
+ // do the actual sorting, combining, and data writing
+ sortAndCombine();
+ this.sorter.reset();
+
+ // write the value again
+ if (!this.sorter.write(value)) {
+ throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
+ }
}
-
- // do the actual sorting, combining, and data writing
- sortAndCombine();
- this.sorter.reset();
-
- // write the value again
- if (!this.sorter.write(value)) {
- throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
+ }
+ else {
+ T value;
+ while (running && (value = in.next()) != null) {
+
+ // try writing to the sorter first
+ if (this.sorter.write(value)) {
+ continue;
+ }
+
+ // do the actual sorting, combining, and data writing
+ sortAndCombine();
+ this.sorter.reset();
+
+ // write the value again
+ if (!this.sorter.write(value)) {
+ throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
+ }
}
}
@@ -174,11 +195,8 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
final TypeSerializer<T> serializer = this.serializer;
final TypeComparator<T> comparator = this.comparator;
-
final ReduceFunction<T> function = this.reducer;
-
final Collector<T> output = this.output;
-
final MutableObjectIterator<T> input = sorter.getIterator();
if (objectReuseEnabled) {
@@ -214,7 +232,7 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
}
}
} else {
- T value = input.next(serializer.createInstance());
+ T value = input.next();
// iterate over key groups
while (this.running && value != null) {
@@ -222,7 +240,7 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
T res = value;
// iterate within a key group
- while ((value = input.next(serializer.createInstance())) != null) {
+ while ((value = input.next()) != null) {
if (comparator.equalToReference(value)) {
// same group, reduce
res = function.reduce(res, value);
http://git-wip-us.apache.org/repos/asf/flink/blob/f3809d82/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
index 08ad25b..cf0fc85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
@@ -48,7 +48,7 @@ import java.util.List;
*
* Acts like a combiner with a custom output type OUT.
*
- * Sorting and reducing of the elements is performed invididually for each partition without data exchange. This may
+ * Sorting and reducing of the elements is performed individually for each partition without data exchange. This may
* lead to a partial group reduce.
*
* @param <IN> The data type consumed
@@ -58,9 +58,7 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
private static final Logger LOG = LoggerFactory.getLogger(GroupCombineChainedDriver.class);
- /**
- * Fix length records with a length below this threshold will be in-place sorted, if possible.
- */
+ /** Fix length records with a length below this threshold will be in-place sorted, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
// --------------------------------------------------------------------------------------------
@@ -174,23 +172,16 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
if (this.sorter.write(record)) {
return;
}
- } catch (IOException e) {
- throw new ExceptionInChainedStubException(this.taskName, e);
- }
- // do the actual sorting
- try {
+ // do the actual sorting
sortAndReduce();
- } catch (Exception e) {
- throw new ExceptionInChainedStubException(this.taskName, e);
- }
- this.sorter.reset();
-
- try {
+ this.sorter.reset();
+
if (!this.sorter.write(record)) {
throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
}
- } catch (IOException e) {
+ }
+ catch (Exception e) {
throw new ExceptionInChainedStubException(this.taskName, e);
}
}
@@ -215,9 +206,9 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
if (!sorter.isEmpty()) {
this.sortAlgo.sort(sorter);
// run the reducer
- final ReusingKeyGroupedIterator<IN> keyIter = new ReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.serializer, this.groupingComparator);
-
-
+ final ReusingKeyGroupedIterator<IN> keyIter = new ReusingKeyGroupedIterator<IN>(
+ sorter.getIterator(), this.serializer, this.groupingComparator);
+
// cache references on the stack
final GroupReduceFunction<IN, OUT> stub = this.reducer;
final Collector<OUT> output = this.outputCollector;
@@ -231,7 +222,8 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
if (!sorter.isEmpty()) {
this.sortAlgo.sort(sorter);
// run the reducer
- final NonReusingKeyGroupedIterator<IN> keyIter = new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
+ final NonReusingKeyGroupedIterator<IN> keyIter = new NonReusingKeyGroupedIterator<IN>(
+ sorter.getIterator(), this.groupingComparator);
// cache references on the stack