You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/29 14:08:38 UTC
[7/8] flink git commit: [FLINK-2653] [runtime] Enable object reuse in MergeIterator
[FLINK-2653] [runtime] Enable object reuse in MergeIterator
This closes #1115
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a8df6d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a8df6d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a8df6d5
Branch: refs/heads/master
Commit: 0a8df6d513fa59d650ff875bdf3a1613d0f14af5
Parents: 6891212
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Sep 10 09:35:39 2015 -0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 29 12:21:34 2015 +0200
----------------------------------------------------------------------
.../flink/runtime/operators/DataSinkTask.java | 3 +-
.../runtime/operators/RegularPactTask.java | 4 +-
.../sort/CombiningUnilateralSortMerger.java | 9 ++--
.../operators/sort/LargeRecordHandler.java | 3 +-
.../runtime/operators/sort/MergeIterator.java | 47 +++++++++++++++++---
.../operators/sort/UnilateralSortMerger.java | 47 ++++++++++++++------
.../operators/ReduceTaskExternalITCase.java | 4 +-
.../flink/runtime/operators/ReduceTaskTest.java | 2 +-
.../CombiningUnilateralSortMergerITCase.java | 6 +--
.../operators/sort/ExternalSortITCase.java | 10 ++---
.../sort/ExternalSortLargeRecordsITCase.java | 8 ++--
.../testutils/BinaryOperatorTestBase.java | 3 +-
.../operators/testutils/DriverTestBase.java | 2 +-
.../testutils/UnaryOperatorTestBase.java | 2 +-
.../operators/util/HashVsSortMiniBenchmark.java | 4 +-
.../flink/tez/runtime/DataSinkProcessor.java | 2 +-
.../org/apache/flink/tez/runtime/TezTask.java | 4 +-
.../flink/test/manual/MassiveStringSorting.java | 4 +-
.../test/manual/MassiveStringValueSorting.java | 4 +-
.../manual/MassiveCaseClassSortingITCase.scala | 2 +-
20 files changed, 114 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 39a0a28..1002bae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -164,7 +164,8 @@ public class DataSinkTask<IT> extends AbstractInvokable {
getEnvironment().getIOManager(),
this.reader, this, this.inputTypeSerializerFactory, compFact.createComparator(),
this.config.getRelativeMemoryInput(0), this.config.getFilehandlesInput(0),
- this.config.getSpillingThresholdInput(0));
+ this.config.getSpillingThresholdInput(0),
+ this.getExecutionConfig().isObjectReuseEnabled());
this.localStrategy = sorter;
input1 = sorter.getIterator();
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 6d35f92..89963af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -923,7 +923,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
UnilateralSortMerger<?> sorter = new UnilateralSortMerger(getMemoryManager(), getIOManager(),
this.inputIterators[inputNum], this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
- this.config.getSpillingThresholdInput(inputNum));
+ this.config.getSpillingThresholdInput(inputNum), this.getExecutionConfig().isObjectReuseEnabled());
// set the input to null such that it will be lazily fetched from the input strategy
this.inputs[inputNum] = null;
this.localStrategies[inputNum] = sorter;
@@ -959,7 +959,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
(GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
- this.config.getSpillingThresholdInput(inputNum));
+ this.config.getSpillingThresholdInput(inputNum), this.getExecutionConfig().isObjectReuseEnabled());
cSorter.setUdfConfiguration(this.config.getStubParameters());
// set the input to null such that it will be lazily fetched from the input strategy
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index f662a7e..855ee21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -103,11 +103,11 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
public CombiningUnilateralSortMerger(GroupCombineFunction<E, E> combineStub, MemoryManager memoryManager, IOManager ioManager,
MutableObjectIterator<E> input, AbstractInvokable parentTask,
TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
- double memoryFraction, int maxNumFileHandles, float startSpillingFraction)
+ double memoryFraction, int maxNumFileHandles, float startSpillingFraction, boolean objectReuseEnabled)
throws IOException, MemoryAllocationException
{
this(combineStub, memoryManager, ioManager, input, parentTask, serializerFactory, comparator,
- memoryFraction, -1, maxNumFileHandles, startSpillingFraction);
+ memoryFraction, -1, maxNumFileHandles, startSpillingFraction, objectReuseEnabled);
}
/**
@@ -136,11 +136,12 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
MutableObjectIterator<E> input, AbstractInvokable parentTask,
TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
double memoryFraction, int numSortBuffers, int maxNumFileHandles,
- float startSpillingFraction)
+ float startSpillingFraction, boolean objectReuseEnabled)
throws IOException, MemoryAllocationException
{
super(memoryManager, ioManager, input, parentTask, serializerFactory, comparator,
- memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, true);
+ memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, true,
+ objectReuseEnabled);
this.combineStub = combineStub;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
index e4a99fb..518f44c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
@@ -254,7 +254,8 @@ public class LargeRecordHandler<T> {
InputViewIterator<Tuple> keyIterator = new InputViewIterator<Tuple>(keysReader, keySerializer);
keySorter = new UnilateralSortMerger<Tuple>(memManager, memory, ioManager,
- keyIterator, memoryOwner, keySerializerFactory, keyComparator, 1, maxFilehandles, 1.0f, false);
+ keyIterator, memoryOwner, keySerializerFactory, keyComparator, 1, maxFilehandles, 1.0f, false,
+ this.executionConfig.isObjectReuseEnabled());
// wait for the sorter to sort the keys
MutableObjectIterator<Tuple> result;
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
index 9da429d..0792dbf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
@@ -55,18 +55,43 @@ public class MergeIterator<E> implements MutableObjectIterator<E> {
/**
* Gets the next smallest element, with respect to the definition of order implied by
- * the {@link TypeSerializer} provided to this iterator. This method does in fact not
- * reuse the given element (which would here imply potentially expensive copying),
- * but always returns a new element.
+ * the {@link TypeSerializer} provided to this iterator.
*
- * @param reuse Ignored.
- * @return The next smallest element, or null, if the iterator is exhausted.
+ * @param reuse Object that may be reused.
+ * @return The next element if the iterator has another element, null otherwise.
*
* @see org.apache.flink.util.MutableObjectIterator#next(java.lang.Object)
*/
@Override
public E next(E reuse) throws IOException {
- return next();
+ /* There are three ways to handle object reuse:
+ * 1) reuse and return the given object
+ * 2) ignore the given object and return a new object
+ * 3) exchange the given object for an existing object
+ *
+ * The first option is not available here as the return value has
+ * already been deserialized from the heap's top iterator. The second
+ * option avoids object reuse. The third option is implemented below
+ * by passing the given object to the heap's top iterator into which
+ * the next value will be deserialized.
+ */
+
+ if (this.heap.size() > 0) {
+ // get the smallest element
+ final HeadStream<E> top = this.heap.peek();
+ E result = top.getHead();
+
+ // read an element
+ if (!top.nextHead(reuse)) {
+ this.heap.poll();
+ } else {
+ this.heap.adjustTop();
+ }
+ return result;
+ }
+ else {
+ return null;
+ }
}
/**
@@ -122,6 +147,16 @@ public class MergeIterator<E> implements MutableObjectIterator<E> {
return this.head;
}
+ public boolean nextHead(E reuse) throws IOException {
+ if ((this.head = this.iterator.next(reuse)) != null) {
+ this.comparator.setReference(this.head);
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+
public boolean nextHead() throws IOException {
if ((this.head = this.iterator.next()) != null) {
this.comparator.setReference(this.head);
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index 13159d9..0fa24f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -146,6 +146,11 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
*/
protected volatile boolean closed;
+ /**
+ * Whether to reuse objects during deserialization.
+ */
+ protected final boolean objectReuseEnabled;
+
// ------------------------------------------------------------------------
// Constructor & Shutdown
// ------------------------------------------------------------------------
@@ -153,22 +158,24 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
public UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager,
MutableObjectIterator<E> input, AbstractInvokable parentTask,
TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
- double memoryFraction, int maxNumFileHandles, float startSpillingFraction)
+ double memoryFraction, int maxNumFileHandles, float startSpillingFraction,
+ boolean objectReuseEnabled)
throws IOException, MemoryAllocationException
{
this(memoryManager, ioManager, input, parentTask, serializerFactory, comparator,
- memoryFraction, -1, maxNumFileHandles, startSpillingFraction);
+ memoryFraction, -1, maxNumFileHandles, startSpillingFraction, objectReuseEnabled);
}
public UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager,
MutableObjectIterator<E> input, AbstractInvokable parentTask,
TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
double memoryFraction, int numSortBuffers, int maxNumFileHandles,
- float startSpillingFraction)
+ float startSpillingFraction, boolean objectReuseEnabled)
throws IOException, MemoryAllocationException
{
this(memoryManager, ioManager, input, parentTask, serializerFactory, comparator,
- memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, true);
+ memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, true,
+ objectReuseEnabled);
}
public UnilateralSortMerger(MemoryManager memoryManager, List<MemorySegment> memory,
@@ -176,11 +183,12 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
MutableObjectIterator<E> input, AbstractInvokable parentTask,
TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
int numSortBuffers, int maxNumFileHandles,
- float startSpillingFraction, boolean handleLargeRecords)
+ float startSpillingFraction, boolean handleLargeRecords, boolean objectReuseEnabled)
throws IOException
{
this(memoryManager, memory, ioManager, input, parentTask, serializerFactory, comparator,
- numSortBuffers, maxNumFileHandles, startSpillingFraction, false, handleLargeRecords);
+ numSortBuffers, maxNumFileHandles, startSpillingFraction, false, handleLargeRecords,
+ objectReuseEnabled);
}
protected UnilateralSortMerger(MemoryManager memoryManager,
@@ -188,12 +196,14 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
MutableObjectIterator<E> input, AbstractInvokable parentTask,
TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
double memoryFraction, int numSortBuffers, int maxNumFileHandles,
- float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords)
+ float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords,
+ boolean objectReuseEnabled)
throws IOException, MemoryAllocationException
{
this(memoryManager, memoryManager.allocatePages(parentTask, memoryManager.computeNumberOfPages(memoryFraction)),
ioManager, input, parentTask, serializerFactory, comparator,
- numSortBuffers, maxNumFileHandles, startSpillingFraction, noSpillingMemory, true);
+ numSortBuffers, maxNumFileHandles, startSpillingFraction, noSpillingMemory, true,
+ objectReuseEnabled);
}
protected UnilateralSortMerger(MemoryManager memoryManager, List<MemorySegment> memory,
@@ -201,7 +211,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
MutableObjectIterator<E> input, AbstractInvokable parentTask,
TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
int numSortBuffers, int maxNumFileHandles,
- float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords)
+ float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords,
+ boolean objectReuseEnabled)
throws IOException
{
// sanity checks
@@ -216,7 +227,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
}
this.memoryManager = memoryManager;
-
+ this.objectReuseEnabled = objectReuseEnabled;
+
// adjust the memory quotas to the page size
final int numPagesTotal = memory.size();
@@ -1595,10 +1607,17 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
this.memManager.getPageSize());
// read the merged stream and write the data back
- final TypeSerializer<E> serializer = this.serializer;
- E rec = serializer.createInstance();
- while ((rec = mergeIterator.next(rec)) != null) {
- serializer.serialize(rec, output);
+ if (objectReuseEnabled) {
+ final TypeSerializer<E> serializer = this.serializer;
+ E rec = serializer.createInstance();
+ while ((rec = mergeIterator.next(rec)) != null) {
+ serializer.serialize(rec, output);
+ }
+ } else {
+ E rec;
+ while ((rec = mergeIterator.next()) != null) {
+ serializer.serialize(rec, output);
+ }
}
output.close();
final int numBlocksWritten = output.getBlockCount();
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
index d83e92e..f59c4a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
@@ -134,7 +134,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false),
getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(),
this.perSortFractionMem,
- 2, 0.8f);
+ 2, 0.8f, true);
addInput(sorter.getIterator());
GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
@@ -180,7 +180,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false),
getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(),
this.perSortFractionMem,
- 2, 0.8f);
+ 2, 0.8f, false);
addInput(sorter.getIterator());
GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
index 964f646..cc25c99 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
@@ -128,7 +128,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
sorter = new CombiningUnilateralSortMerger<Record>(new MockCombiningReduceStub(),
getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false),
getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortFractionMem,
- 4, 0.8f);
+ 4, 0.8f, true);
addInput(sorter.getIterator());
GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index 75593b8..e1e2c0a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -115,7 +115,7 @@ public class CombiningUnilateralSortMergerITCase {
Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb,
this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator,
- 0.25, 64, 0.7f);
+ 0.25, 64, 0.7f, false);
final Record rec = new Record();
rec.setField(1, new IntValue(1));
@@ -156,7 +156,7 @@ public class CombiningUnilateralSortMergerITCase {
Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb,
this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator,
- 0.01, 64, 0.005f);
+ 0.01, 64, 0.005f, true);
final Record rec = new Record();
rec.setField(1, new IntValue(1));
@@ -205,7 +205,7 @@ public class CombiningUnilateralSortMergerITCase {
Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb,
this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator,
- 0.25, 2, 0.7f);
+ 0.25, 2, 0.7f, false);
// emit data
LOG.debug("emitting data");
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
index 5aa9efb..9f0b3d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
@@ -119,7 +119,7 @@ public class ExternalSortITCase {
Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager,
source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
- (double)64/78, 2, 0.9f);
+ (double)64/78, 2, 0.9f, true);
// emit data
LOG.debug("Reading and sorting data...");
@@ -172,7 +172,7 @@ public class ExternalSortITCase {
Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager,
source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
- (double)64/78, 10, 2, 0.9f);
+ (double)64/78, 10, 2, 0.9f, false);
// emit data
LOG.debug("Reading and sorting data...");
@@ -225,7 +225,7 @@ public class ExternalSortITCase {
Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager,
source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
- (double)16/78, 64, 0.7f);
+ (double)16/78, 64, 0.7f, true);
// emit data
LOG.debug("Reading and sorting data...");
@@ -281,7 +281,7 @@ public class ExternalSortITCase {
Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager,
source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
- (double)64/78, 16, 0.7f);
+ (double)64/78, 16, 0.7f, false);
// emit data
LOG.debug("Emitting data...");
@@ -341,7 +341,7 @@ public class ExternalSortITCase {
LOG.debug("Initializing sortmerger...");
Sorter<IntPair> merger = new UnilateralSortMerger<IntPair>(this.memoryManager, this.ioManager,
- generator, this.parentTask, serializerFactory, comparator, (double)64/78, 4, 0.7f);
+ generator, this.parentTask, serializerFactory, comparator, (double)64/78, 4, 0.7f, true);
// emit data
LOG.debug("Emitting data...");
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
index 951ce30..c806766 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
@@ -128,7 +128,7 @@ public class ExternalSortLargeRecordsITCase {
this.memoryManager, this.ioManager,
source, this.parentTask,
new RuntimeSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, (Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class),
- comparator, 1.0, 1, 128, 0.7f);
+ comparator, 1.0, 1, 128, 0.7f, false);
// check order
MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> iterator = sorter.getIterator();
@@ -198,7 +198,7 @@ public class ExternalSortLargeRecordsITCase {
this.memoryManager, this.ioManager,
source, this.parentTask,
new RuntimeSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, (Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class),
- comparator, 1.0, 1, 128, 0.7f);
+ comparator, 1.0, 1, 128, 0.7f, true);
// check order
MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> iterator = sorter.getIterator();
@@ -283,7 +283,7 @@ public class ExternalSortLargeRecordsITCase {
this.memoryManager, this.ioManager,
source, this.parentTask,
new RuntimeSerializerFactory<Tuple2<Long, SmallOrMediumOrLargeValue>>(serializer, (Class<Tuple2<Long, SmallOrMediumOrLargeValue>>) (Class<?>) Tuple2.class),
- comparator, 1.0, 1, 128, 0.7f);
+ comparator, 1.0, 1, 128, 0.7f, false);
// check order
MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> iterator = sorter.getIterator();
@@ -354,7 +354,7 @@ public class ExternalSortLargeRecordsITCase {
this.memoryManager, this.ioManager,
source, this.parentTask,
new RuntimeSerializerFactory<Tuple2<Long, SmallOrMediumOrLargeValue>>(serializer, (Class<Tuple2<Long, SmallOrMediumOrLargeValue>>) (Class<?>) Tuple2.class),
- comparator, 1.0, 1, 128, 0.7f);
+ comparator, 1.0, 1, 128, 0.7f, true);
// check order
MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> iterator = sorter.getIterator();
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index ece20ff..5136aea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -147,7 +147,8 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog
comp,
this.perSortFractionMem,
32,
- 0.8f
+ 0.8f,
+ false
);
this.sorters.add(sorter);
this.inputs.add(null);
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 1737349..116fdec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -142,7 +142,7 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa
public void addInputSorted(MutableObjectIterator<Record> input, RecordComparator comp) throws Exception {
UnilateralSortMerger<Record> sorter = new UnilateralSortMerger<Record>(
this.memManager, this.ioManager, input, this.owner, RecordSerializerFactory.get(), comp,
- this.perSortFractionMem, 32, 0.8f);
+ this.perSortFractionMem, 32, 0.8f, true);
this.sorters.add(sorter);
this.inputs.add(null);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index 924a16b..e2b2430 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -146,7 +146,7 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogg
this.memManager, this.ioManager, input, this.owner,
this.<IN>getInputSerializer(0),
comp,
- this.perSortFractionMem, 32, 0.8f);
+ this.perSortFractionMem, 32, 0.8f, false);
}
public void addDriverComparator(TypeComparator<IN> comparator) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 1060e55..f112ff8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -133,11 +133,11 @@ public class HashVsSortMiniBenchmark {
final UnilateralSortMerger<Record> sorter1 = new UnilateralSortMerger<Record>(
this.memoryManager, this.ioManager, input1, this.parentTask, this.serializer1,
- this.comparator1.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f);
+ this.comparator1.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true);
final UnilateralSortMerger<Record> sorter2 = new UnilateralSortMerger<Record>(
this.memoryManager, this.ioManager, input2, this.parentTask, this.serializer2,
- this.comparator2.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f);
+ this.comparator2.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true);
final MutableObjectIterator<Record> sortedInput1 = sorter1.getIterator();
final MutableObjectIterator<Record> sortedInput2 = sorter2.getIterator();
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
index 01dbbc5..8011d21 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
@@ -146,7 +146,7 @@ public class DataSinkProcessor<IT> extends AbstractLogicalIOProcessor {
this.runtimeEnvironment.getIOManager(),
this.reader, this.invokable, this.inputTypeSerializerFactory, compFact.createComparator(),
this.config.getRelativeMemoryInput(0), this.config.getFilehandlesInput(0),
- this.config.getSpillingThresholdInput(0));
+ this.config.getSpillingThresholdInput(0), false);
this.localStrategy = sorter;
this.input = sorter.getIterator();
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
index a745177..b7cbfb4 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
@@ -378,7 +378,7 @@ public class TezTask<S extends Function,OT> implements PactTaskContext<S, OT> {
UnilateralSortMerger<?> sorter = new UnilateralSortMerger(getMemoryManager(), getIOManager(),
this.inputIterators[inputNum], this.invokable, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
- this.config.getSpillingThresholdInput(inputNum));
+ this.config.getSpillingThresholdInput(inputNum), this.executionConfig.isObjectReuseEnabled());
// set the input to null such that it will be lazily fetched from the input strategy
this.inputs[inputNum] = null;
this.localStrategies[inputNum] = sorter;
@@ -414,7 +414,7 @@ public class TezTask<S extends Function,OT> implements PactTaskContext<S, OT> {
(GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
this.invokable, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
- this.config.getSpillingThresholdInput(inputNum));
+ this.config.getSpillingThresholdInput(inputNum), this.executionConfig.isObjectReuseEnabled());
cSorter.setUdfConfiguration(this.config.getStubParameters());
// set the input to null such that it will be lazily fetched from the input strategy
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
index c11b93c..c9bd56b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
@@ -91,7 +91,7 @@ public class MassiveStringSorting {
MutableObjectIterator<String> inputIterator = new StringReaderMutableObjectIterator(reader);
sorter = new UnilateralSortMerger<String>(mm, ioMan, inputIterator, new DummyInvokable(),
- new RuntimeSerializerFactory<String>(serializer, String.class), comparator, 1.0, 4, 0.8f);
+ new RuntimeSerializerFactory<String>(serializer, String.class), comparator, 1.0, 4, 0.8f, false);
MutableObjectIterator<String> sortedData = sorter.getIterator();
@@ -184,7 +184,7 @@ public class MassiveStringSorting {
MutableObjectIterator<Tuple2<String, String[]>> inputIterator = new StringTupleReaderMutableObjectIterator(reader);
sorter = new UnilateralSortMerger<Tuple2<String, String[]>>(mm, ioMan, inputIterator, new DummyInvokable(),
- new RuntimeSerializerFactory<Tuple2<String, String[]>>(serializer, (Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f);
+ new RuntimeSerializerFactory<Tuple2<String, String[]>>(serializer, (Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f, false);
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
index 7a484e7..9a016cc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
@@ -91,7 +91,7 @@ public class MassiveStringValueSorting {
MutableObjectIterator<StringValue> inputIterator = new StringValueReaderMutableObjectIterator(reader);
sorter = new UnilateralSortMerger<StringValue>(mm, ioMan, inputIterator, new DummyInvokable(),
- new RuntimeSerializerFactory<StringValue>(serializer, StringValue.class), comparator, 1.0, 4, 0.8f);
+ new RuntimeSerializerFactory<StringValue>(serializer, StringValue.class), comparator, 1.0, 4, 0.8f, true);
MutableObjectIterator<StringValue> sortedData = sorter.getIterator();
@@ -187,7 +187,7 @@ public class MassiveStringValueSorting {
MutableObjectIterator<Tuple2<StringValue, StringValue[]>> inputIterator = new StringValueTupleReaderMutableObjectIterator(reader);
sorter = new UnilateralSortMerger<Tuple2<StringValue, StringValue[]>>(mm, ioMan, inputIterator, new DummyInvokable(),
- new RuntimeSerializerFactory<Tuple2<StringValue, StringValue[]>>(serializer, (Class<Tuple2<StringValue, StringValue[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f);
+ new RuntimeSerializerFactory<Tuple2<StringValue, StringValue[]>>(serializer, (Class<Tuple2<StringValue, StringValue[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f, false);
http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala
index 7385fa2..a38a19b 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala
@@ -98,7 +98,7 @@ class MassiveCaseClassSortingITCase {
sorter = new UnilateralSortMerger[StringTuple](mm, ioMan, inputIterator,
new DummyInvokable(),
new RuntimeSerializerFactory[StringTuple](serializer, classOf[StringTuple]),
- comparator, 1.0, 4, 0.8f)
+ comparator, 1.0, 4, 0.8f, false)
val sortedData = sorter.getIterator
reader.close()