You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/29 14:08:38 UTC

[7/8] flink git commit: [FLINK-2653] [runtime] Enable object reuse in MergeIterator

[FLINK-2653] [runtime] Enable object reuse in MergeIterator

This closes #1115


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a8df6d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a8df6d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a8df6d5

Branch: refs/heads/master
Commit: 0a8df6d513fa59d650ff875bdf3a1613d0f14af5
Parents: 6891212
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Sep 10 09:35:39 2015 -0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 29 12:21:34 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/operators/DataSinkTask.java   |  3 +-
 .../runtime/operators/RegularPactTask.java      |  4 +-
 .../sort/CombiningUnilateralSortMerger.java     |  9 ++--
 .../operators/sort/LargeRecordHandler.java      |  3 +-
 .../runtime/operators/sort/MergeIterator.java   | 47 +++++++++++++++++---
 .../operators/sort/UnilateralSortMerger.java    | 47 ++++++++++++++------
 .../operators/ReduceTaskExternalITCase.java     |  4 +-
 .../flink/runtime/operators/ReduceTaskTest.java |  2 +-
 .../CombiningUnilateralSortMergerITCase.java    |  6 +--
 .../operators/sort/ExternalSortITCase.java      | 10 ++---
 .../sort/ExternalSortLargeRecordsITCase.java    |  8 ++--
 .../testutils/BinaryOperatorTestBase.java       |  3 +-
 .../operators/testutils/DriverTestBase.java     |  2 +-
 .../testutils/UnaryOperatorTestBase.java        |  2 +-
 .../operators/util/HashVsSortMiniBenchmark.java |  4 +-
 .../flink/tez/runtime/DataSinkProcessor.java    |  2 +-
 .../org/apache/flink/tez/runtime/TezTask.java   |  4 +-
 .../flink/test/manual/MassiveStringSorting.java |  4 +-
 .../test/manual/MassiveStringValueSorting.java  |  4 +-
 .../manual/MassiveCaseClassSortingITCase.scala  |  2 +-
 20 files changed, 114 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 39a0a28..1002bae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -164,7 +164,8 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 							getEnvironment().getIOManager(),
 							this.reader, this, this.inputTypeSerializerFactory, compFact.createComparator(),
 							this.config.getRelativeMemoryInput(0), this.config.getFilehandlesInput(0),
-							this.config.getSpillingThresholdInput(0));
+							this.config.getSpillingThresholdInput(0),
+							this.getExecutionConfig().isObjectReuseEnabled());
 					
 					this.localStrategy = sorter;
 					input1 = sorter.getIterator();

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 6d35f92..89963af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -923,7 +923,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 				UnilateralSortMerger<?> sorter = new UnilateralSortMerger(getMemoryManager(), getIOManager(),
 					this.inputIterators[inputNum], this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
 					this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
-					this.config.getSpillingThresholdInput(inputNum));
+					this.config.getSpillingThresholdInput(inputNum), this.getExecutionConfig().isObjectReuseEnabled());
 				// set the input to null such that it will be lazily fetched from the input strategy
 				this.inputs[inputNum] = null;
 				this.localStrategies[inputNum] = sorter;
@@ -959,7 +959,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 					(GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
 					this, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
 					this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
-					this.config.getSpillingThresholdInput(inputNum));
+					this.config.getSpillingThresholdInput(inputNum), this.getExecutionConfig().isObjectReuseEnabled());
 				cSorter.setUdfConfiguration(this.config.getStubParameters());
 
 				// set the input to null such that it will be lazily fetched from the input strategy

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
index f662a7e..855ee21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMerger.java
@@ -103,11 +103,11 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 	public CombiningUnilateralSortMerger(GroupCombineFunction<E, E> combineStub, MemoryManager memoryManager, IOManager ioManager,
 			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
-			double memoryFraction, int maxNumFileHandles, float startSpillingFraction)
+			double memoryFraction, int maxNumFileHandles, float startSpillingFraction, boolean objectReuseEnabled)
 	throws IOException, MemoryAllocationException
 	{
 		this(combineStub, memoryManager, ioManager, input, parentTask, serializerFactory, comparator,
-			memoryFraction, -1, maxNumFileHandles, startSpillingFraction);
+			memoryFraction, -1, maxNumFileHandles, startSpillingFraction, objectReuseEnabled);
 	}
 	
 	/**
@@ -136,11 +136,12 @@ public class CombiningUnilateralSortMerger<E> extends UnilateralSortMerger<E> {
 			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
 			double memoryFraction, int numSortBuffers, int maxNumFileHandles,
-			float startSpillingFraction)
+			float startSpillingFraction, boolean objectReuseEnabled)
 	throws IOException, MemoryAllocationException
 	{
 		super(memoryManager, ioManager, input, parentTask, serializerFactory, comparator,
-			memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, true);
+			memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, true,
+			objectReuseEnabled);
 		
 		this.combineStub = combineStub;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
index e4a99fb..518f44c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java
@@ -254,7 +254,8 @@ public class LargeRecordHandler<T> {
 		InputViewIterator<Tuple> keyIterator = new InputViewIterator<Tuple>(keysReader, keySerializer);
 		
 		keySorter = new UnilateralSortMerger<Tuple>(memManager, memory, ioManager, 
-				keyIterator, memoryOwner, keySerializerFactory, keyComparator, 1, maxFilehandles, 1.0f, false);
+				keyIterator, memoryOwner, keySerializerFactory, keyComparator, 1, maxFilehandles, 1.0f, false,
+				this.executionConfig.isObjectReuseEnabled());
 
 		// wait for the sorter to sort the keys
 		MutableObjectIterator<Tuple> result;

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
index 9da429d..0792dbf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/MergeIterator.java
@@ -55,18 +55,43 @@ public class MergeIterator<E> implements MutableObjectIterator<E> {
 
 	/**
 	 * Gets the next smallest element, with respect to the definition of order implied by
-	 * the {@link TypeSerializer} provided to this iterator. This method does in fact not
-	 * reuse the given element (which would here imply potentially expensive copying), 
-	 * but always returns a new element.
+	 * the {@link TypeSerializer} provided to this iterator.
 	 * 
-	 * @param reuse Ignored.
-	 * @return The next smallest element, or null, if the iterator is exhausted. 
+	 * @param reuse Object that may be reused.
+	 * @return The next element if the iterator has another element, null otherwise.
 	 * 
 	 * @see org.apache.flink.util.MutableObjectIterator#next(java.lang.Object)
 	 */
 	@Override
 	public E next(E reuse) throws IOException {
-		return next();
+		/* There are three ways to handle object reuse:
+		 * 1) reuse and return the given object
+		 * 2) ignore the given object and return a new object
+		 * 3) exchange the given object for an existing object
+		 *
+		 * The first option is not available here as the return value has
+		 * already been deserialized from the heap's top iterator. The second
+		 * option avoids object reuse. The third option is implemented below
+		 * by passing the given object to the heap's top iterator into which
+		 * the next value will be deserialized.
+		 */
+
+		if (this.heap.size() > 0) {
+			// get the smallest element
+			final HeadStream<E> top = this.heap.peek();
+			E result = top.getHead();
+
+			// read an element
+			if (!top.nextHead(reuse)) {
+				this.heap.poll();
+			} else {
+				this.heap.adjustTop();
+			}
+			return result;
+		}
+		else {
+			return null;
+		}
 	}
 
 	/**
@@ -122,6 +147,16 @@ public class MergeIterator<E> implements MutableObjectIterator<E> {
 			return this.head;
 		}
 
+		public boolean nextHead(E reuse) throws IOException {
+			if ((this.head = this.iterator.next(reuse)) != null) {
+				this.comparator.setReference(this.head);
+				return true;
+			}
+			else {
+				return false;
+			}
+		}
+
 		public boolean nextHead() throws IOException {
 			if ((this.head = this.iterator.next()) != null) {
 				this.comparator.setReference(this.head);

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index 13159d9..0fa24f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -146,6 +146,11 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 	 */
 	protected volatile boolean closed;
 
+	/**
+	 * Whether to reuse objects during deserialization.
+	 */
+	protected final boolean objectReuseEnabled;
+
 	// ------------------------------------------------------------------------
 	//                         Constructor & Shutdown
 	// ------------------------------------------------------------------------
@@ -153,22 +158,24 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 	public UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager,
 			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
-			double memoryFraction, int maxNumFileHandles, float startSpillingFraction)
+			double memoryFraction, int maxNumFileHandles, float startSpillingFraction,
+			boolean objectReuseEnabled)
 	throws IOException, MemoryAllocationException
 	{
 		this(memoryManager, ioManager, input, parentTask, serializerFactory, comparator,
-			memoryFraction, -1, maxNumFileHandles, startSpillingFraction);
+			memoryFraction, -1, maxNumFileHandles, startSpillingFraction, objectReuseEnabled);
 	}
 	
 	public UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager,
 			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
 			double memoryFraction, int numSortBuffers, int maxNumFileHandles,
-			float startSpillingFraction)
+			float startSpillingFraction, boolean objectReuseEnabled)
 	throws IOException, MemoryAllocationException
 	{
 		this(memoryManager, ioManager, input, parentTask, serializerFactory, comparator,
-			memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, true);
+			memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, true,
+			objectReuseEnabled);
 	}
 	
 	public UnilateralSortMerger(MemoryManager memoryManager, List<MemorySegment> memory,
@@ -176,11 +183,12 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
 			int numSortBuffers, int maxNumFileHandles,
-			float startSpillingFraction, boolean handleLargeRecords)
+			float startSpillingFraction, boolean handleLargeRecords, boolean objectReuseEnabled)
 	throws IOException
 	{
 		this(memoryManager, memory, ioManager, input, parentTask, serializerFactory, comparator,
-			numSortBuffers, maxNumFileHandles, startSpillingFraction, false, handleLargeRecords);
+			numSortBuffers, maxNumFileHandles, startSpillingFraction, false, handleLargeRecords,
+			objectReuseEnabled);
 	}
 	
 	protected UnilateralSortMerger(MemoryManager memoryManager,
@@ -188,12 +196,14 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
 			double memoryFraction, int numSortBuffers, int maxNumFileHandles,
-			float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords)
+			float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords,
+			boolean objectReuseEnabled)
 	throws IOException, MemoryAllocationException
 	{
 		this(memoryManager, memoryManager.allocatePages(parentTask, memoryManager.computeNumberOfPages(memoryFraction)),
 				ioManager, input, parentTask, serializerFactory, comparator,
-				numSortBuffers, maxNumFileHandles, startSpillingFraction, noSpillingMemory, true);
+				numSortBuffers, maxNumFileHandles, startSpillingFraction, noSpillingMemory, true,
+				objectReuseEnabled);
 	}
 	
 	protected UnilateralSortMerger(MemoryManager memoryManager, List<MemorySegment> memory,
@@ -201,7 +211,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
 			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
 			int numSortBuffers, int maxNumFileHandles,
-			float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords)
+			float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords,
+			boolean objectReuseEnabled)
 	throws IOException
 	{
 		// sanity checks
@@ -216,7 +227,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		}
 		
 		this.memoryManager = memoryManager;
-		
+		this.objectReuseEnabled = objectReuseEnabled;
+
 		// adjust the memory quotas to the page size
 		final int numPagesTotal = memory.size();
 
@@ -1595,10 +1607,17 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 																			this.memManager.getPageSize());
 
 			// read the merged stream and write the data back
-			final TypeSerializer<E> serializer = this.serializer;
-			E rec = serializer.createInstance();
-			while ((rec = mergeIterator.next(rec)) != null) {
-				serializer.serialize(rec, output);
+			if (objectReuseEnabled) {
+				final TypeSerializer<E> serializer = this.serializer;
+				E rec = serializer.createInstance();
+				while ((rec = mergeIterator.next(rec)) != null) {
+					serializer.serialize(rec, output);
+				}
+			} else {
+				E rec;
+				while ((rec = mergeIterator.next()) != null) {
+					serializer.serialize(rec, output);
+				}
 			}
 			output.close();
 			final int numBlocksWritten = output.getBlockCount();

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
index d83e92e..f59c4a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
@@ -134,7 +134,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 				getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), 
 				getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(),
 					this.perSortFractionMem,
-					2, 0.8f);
+					2, 0.8f, true);
 			addInput(sorter.getIterator());
 			
 			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
@@ -180,7 +180,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 				getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), 
 				getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(),
 					this.perSortFractionMem,
-					2, 0.8f);
+					2, 0.8f, false);
 			addInput(sorter.getIterator());
 			
 			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
index 964f646..cc25c99 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
@@ -128,7 +128,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 			sorter = new CombiningUnilateralSortMerger<Record>(new MockCombiningReduceStub(), 
 				getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), 
 				getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortFractionMem,
-					4, 0.8f);
+					4, 0.8f, true);
 			addInput(sorter.getIterator());
 			
 			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index 75593b8..e1e2c0a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -115,7 +115,7 @@ public class CombiningUnilateralSortMergerITCase {
 		
 		Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb, 
 				this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator,
-				0.25, 64, 0.7f);
+				0.25, 64, 0.7f, false);
 
 		final Record rec = new Record();
 		rec.setField(1, new IntValue(1));
@@ -156,7 +156,7 @@ public class CombiningUnilateralSortMergerITCase {
 		
 		Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb, 
 				this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator,
-				0.01, 64, 0.005f);
+				0.01, 64, 0.005f, true);
 
 		final Record rec = new Record();
 		rec.setField(1, new IntValue(1));
@@ -205,7 +205,7 @@ public class CombiningUnilateralSortMergerITCase {
 		
 		Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb, 
 				this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator,
-				0.25, 2, 0.7f);
+				0.25, 2, 0.7f, false);
 
 		// emit data
 		LOG.debug("emitting data");

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
index 5aa9efb..9f0b3d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
@@ -119,7 +119,7 @@ public class ExternalSortITCase {
 			
 			Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, 
 				source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
-					(double)64/78, 2, 0.9f);
+					(double)64/78, 2, 0.9f, true);
 	
 			// emit data
 			LOG.debug("Reading and sorting data...");
@@ -172,7 +172,7 @@ public class ExternalSortITCase {
 			
 			Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, 
 					source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
-					(double)64/78, 10, 2, 0.9f);
+					(double)64/78, 10, 2, 0.9f, false);
 	
 			// emit data
 			LOG.debug("Reading and sorting data...");
@@ -225,7 +225,7 @@ public class ExternalSortITCase {
 			
 			Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, 
 					source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
-					(double)16/78, 64, 0.7f);
+					(double)16/78, 64, 0.7f, true);
 	
 			// emit data
 			LOG.debug("Reading and sorting data...");
@@ -281,7 +281,7 @@ public class ExternalSortITCase {
 			
 			Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager, 
 					source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
-					(double)64/78, 16, 0.7f);
+					(double)64/78, 16, 0.7f, false);
 			
 			// emit data
 			LOG.debug("Emitting data...");
@@ -341,7 +341,7 @@ public class ExternalSortITCase {
 			LOG.debug("Initializing sortmerger...");
 			
 			Sorter<IntPair> merger = new UnilateralSortMerger<IntPair>(this.memoryManager, this.ioManager, 
-					generator, this.parentTask, serializerFactory, comparator, (double)64/78, 4, 0.7f);
+					generator, this.parentTask, serializerFactory, comparator, (double)64/78, 4, 0.7f, true);
 	
 			// emit data
 			LOG.debug("Emitting data...");

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
index 951ce30..c806766 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
@@ -128,7 +128,7 @@ public class ExternalSortLargeRecordsITCase {
 					this.memoryManager, this.ioManager, 
 					source, this.parentTask,
 					new RuntimeSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, (Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class),
-					comparator, 1.0, 1, 128, 0.7f);
+					comparator, 1.0, 1, 128, 0.7f, false);
 			
 			// check order
 			MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> iterator = sorter.getIterator();
@@ -198,7 +198,7 @@ public class ExternalSortLargeRecordsITCase {
 					this.memoryManager, this.ioManager, 
 					source, this.parentTask,
 					new RuntimeSerializerFactory<Tuple2<Long, SomeMaybeLongValue>>(serializer, (Class<Tuple2<Long, SomeMaybeLongValue>>) (Class<?>) Tuple2.class),
-					comparator, 1.0, 1, 128, 0.7f);
+					comparator, 1.0, 1, 128, 0.7f, true);
 			
 			// check order
 			MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> iterator = sorter.getIterator();
@@ -283,7 +283,7 @@ public class ExternalSortLargeRecordsITCase {
 					this.memoryManager, this.ioManager, 
 					source, this.parentTask,
 					new RuntimeSerializerFactory<Tuple2<Long, SmallOrMediumOrLargeValue>>(serializer, (Class<Tuple2<Long, SmallOrMediumOrLargeValue>>) (Class<?>) Tuple2.class),
-					comparator, 1.0, 1, 128, 0.7f);
+					comparator, 1.0, 1, 128, 0.7f, false);
 			
 			// check order
 			MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> iterator = sorter.getIterator();
@@ -354,7 +354,7 @@ public class ExternalSortLargeRecordsITCase {
 					this.memoryManager, this.ioManager, 
 					source, this.parentTask,
 					new RuntimeSerializerFactory<Tuple2<Long, SmallOrMediumOrLargeValue>>(serializer, (Class<Tuple2<Long, SmallOrMediumOrLargeValue>>) (Class<?>) Tuple2.class),
-					comparator, 1.0, 1, 128, 0.7f);
+					comparator, 1.0, 1, 128, 0.7f, true);
 			
 			// check order
 			MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> iterator = sorter.getIterator();

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index ece20ff..5136aea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -147,7 +147,8 @@ public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends TestLog
 				comp,
 				this.perSortFractionMem,
 				32,
-				0.8f
+				0.8f,
+				false
 		);
 		this.sorters.add(sorter);
 		this.inputs.add(null);

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 1737349..116fdec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -142,7 +142,7 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa
 	public void addInputSorted(MutableObjectIterator<Record> input, RecordComparator comp) throws Exception {
 		UnilateralSortMerger<Record> sorter = new UnilateralSortMerger<Record>(
 				this.memManager, this.ioManager, input, this.owner, RecordSerializerFactory.get(), comp,
-				this.perSortFractionMem, 32, 0.8f);
+				this.perSortFractionMem, 32, 0.8f, true);
 		this.sorters.add(sorter);
 		this.inputs.add(null);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index 924a16b..e2b2430 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -146,7 +146,7 @@ public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends TestLogg
 				this.memManager, this.ioManager, input, this.owner,
 				this.<IN>getInputSerializer(0),
 				comp,
-				this.perSortFractionMem, 32, 0.8f);
+				this.perSortFractionMem, 32, 0.8f, false);
 	}
 	
 	public void addDriverComparator(TypeComparator<IN> comparator) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 1060e55..f112ff8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -133,11 +133,11 @@ public class HashVsSortMiniBenchmark {
 			
 			final UnilateralSortMerger<Record> sorter1 = new UnilateralSortMerger<Record>(
 					this.memoryManager, this.ioManager, input1, this.parentTask, this.serializer1, 
-					this.comparator1.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f);
+					this.comparator1.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true);
 			
 			final UnilateralSortMerger<Record> sorter2 = new UnilateralSortMerger<Record>(
 					this.memoryManager, this.ioManager, input2, this.parentTask, this.serializer2, 
-					this.comparator2.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f);
+					this.comparator2.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true);
 			
 			final MutableObjectIterator<Record> sortedInput1 = sorter1.getIterator();
 			final MutableObjectIterator<Record> sortedInput2 = sorter2.getIterator();

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
index 01dbbc5..8011d21 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java
@@ -146,7 +146,7 @@ public class DataSinkProcessor<IT> extends AbstractLogicalIOProcessor {
 								this.runtimeEnvironment.getIOManager(),
 								this.reader, this.invokable, this.inputTypeSerializerFactory, compFact.createComparator(),
 								this.config.getRelativeMemoryInput(0), this.config.getFilehandlesInput(0),
-								this.config.getSpillingThresholdInput(0));
+								this.config.getSpillingThresholdInput(0), false);
 
 						this.localStrategy = sorter;
 						this.input = sorter.getIterator();

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
index a745177..b7cbfb4 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
@@ -378,7 +378,7 @@ public class TezTask<S extends Function,OT>  implements PactTaskContext<S, OT> {
 					UnilateralSortMerger<?> sorter = new UnilateralSortMerger(getMemoryManager(), getIOManager(),
 							this.inputIterators[inputNum], this.invokable, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
 							this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
-							this.config.getSpillingThresholdInput(inputNum));
+							this.config.getSpillingThresholdInput(inputNum), this.executionConfig.isObjectReuseEnabled());
 					// set the input to null such that it will be lazily fetched from the input strategy
 					this.inputs[inputNum] = null;
 					this.localStrategies[inputNum] = sorter;
@@ -414,7 +414,7 @@ public class TezTask<S extends Function,OT>  implements PactTaskContext<S, OT> {
 							(GroupCombineFunction) localStub, getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
 							this.invokable, this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
 							this.config.getRelativeMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
-							this.config.getSpillingThresholdInput(inputNum));
+							this.config.getSpillingThresholdInput(inputNum), this.executionConfig.isObjectReuseEnabled());
 					cSorter.setUdfConfiguration(this.config.getStubParameters());
 
 					// set the input to null such that it will be lazily fetched from the input strategy

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
index c11b93c..c9bd56b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
@@ -91,7 +91,7 @@ public class MassiveStringSorting {
 				MutableObjectIterator<String> inputIterator = new StringReaderMutableObjectIterator(reader);
 				
 				sorter = new UnilateralSortMerger<String>(mm, ioMan, inputIterator, new DummyInvokable(),
-						new RuntimeSerializerFactory<String>(serializer, String.class), comparator, 1.0, 4, 0.8f);
+						new RuntimeSerializerFactory<String>(serializer, String.class), comparator, 1.0, 4, 0.8f, false);
 
 				MutableObjectIterator<String> sortedData = sorter.getIterator();
 				
@@ -184,7 +184,7 @@ public class MassiveStringSorting {
 				MutableObjectIterator<Tuple2<String, String[]>> inputIterator = new StringTupleReaderMutableObjectIterator(reader);
 				
 				sorter = new UnilateralSortMerger<Tuple2<String, String[]>>(mm, ioMan, inputIterator, new DummyInvokable(),
-						new RuntimeSerializerFactory<Tuple2<String, String[]>>(serializer, (Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f);
+						new RuntimeSerializerFactory<Tuple2<String, String[]>>(serializer, (Class<Tuple2<String, String[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f, false);
 
 				
 				

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
index 7a484e7..9a016cc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
@@ -91,7 +91,7 @@ public class MassiveStringValueSorting {
 				MutableObjectIterator<StringValue> inputIterator = new StringValueReaderMutableObjectIterator(reader);
 				
 				sorter = new UnilateralSortMerger<StringValue>(mm, ioMan, inputIterator, new DummyInvokable(),
-						new RuntimeSerializerFactory<StringValue>(serializer, StringValue.class), comparator, 1.0, 4, 0.8f);
+						new RuntimeSerializerFactory<StringValue>(serializer, StringValue.class), comparator, 1.0, 4, 0.8f, true);
 
 				MutableObjectIterator<StringValue> sortedData = sorter.getIterator();
 				
@@ -187,7 +187,7 @@ public class MassiveStringValueSorting {
 				MutableObjectIterator<Tuple2<StringValue, StringValue[]>> inputIterator = new StringValueTupleReaderMutableObjectIterator(reader);
 				
 				sorter = new UnilateralSortMerger<Tuple2<StringValue, StringValue[]>>(mm, ioMan, inputIterator, new DummyInvokable(),
-						new RuntimeSerializerFactory<Tuple2<StringValue, StringValue[]>>(serializer, (Class<Tuple2<StringValue, StringValue[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f);
+						new RuntimeSerializerFactory<Tuple2<StringValue, StringValue[]>>(serializer, (Class<Tuple2<StringValue, StringValue[]>>) (Class<?>) Tuple2.class), comparator, 1.0, 4, 0.8f, false);
 
 				
 				

http://git-wip-us.apache.org/repos/asf/flink/blob/0a8df6d5/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala
index 7385fa2..a38a19b 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/manual/MassiveCaseClassSortingITCase.scala
@@ -98,7 +98,7 @@ class MassiveCaseClassSortingITCase {
         sorter = new UnilateralSortMerger[StringTuple](mm, ioMan, inputIterator,
               new DummyInvokable(), 
               new RuntimeSerializerFactory[StringTuple](serializer, classOf[StringTuple]),
-              comparator, 1.0, 4, 0.8f)
+              comparator, 1.0, 4, 0.8f, false)
             
         val sortedData = sorter.getIterator
         reader.close()