You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/21 23:34:43 UTC

[2/4] flink git commit: [FLINK-2648] [FLINK-2717] [runtime] Harden memory release in sorters against asynchronous canceling

[FLINK-2648] [FLINK-2717] [runtime] Harden memory release in sorters against asynchronous canceling


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/435ee4eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/435ee4eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/435ee4eb

Branch: refs/heads/master
Commit: 435ee4ebb5ec2ac0086e1a5136125ecc1d7c89a8
Parents: bd74bae
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 21 21:05:45 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 21 23:18:04 2015 +0200

----------------------------------------------------------------------
 .../operators/GroupReduceCombineDriver.java     | 28 ++++++---
 .../runtime/operators/ReduceCombineDriver.java  | 26 +++++---
 .../chaining/GroupCombineChainedDriver.java     | 45 ++++++++------
 .../SynchronousChainedCombineDriver.java        | 51 ++++++++--------
 .../operators/sort/FixedLengthRecordSorter.java | 37 +++---------
 .../runtime/operators/sort/InMemorySorter.java  | 22 ++++---
 .../operators/sort/NormalizedKeySorter.java     | 39 +++---------
 .../operators/sort/UnilateralSortMerger.java    | 18 +++---
 .../sort/FixedLengthRecordSorterTest.java       | 32 +++++-----
 .../operators/sort/NormalizedKeySorterTest.java | 63 ++++++++++----------
 10 files changed, 176 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/435ee4eb/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index 7115a4d..ab46d95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -74,10 +74,10 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 
 	private QuickSort sortAlgo = new QuickSort();
 
-	private MemoryManager memManager;
-
 	private Collector<OUT> output;
 
+	private List<MemorySegment> memory;
+
 	private long oversizedRecordCount;
 
 	private volatile boolean running = true;
@@ -115,9 +115,8 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 		if (driverStrategy != DriverStrategy.SORTED_GROUP_COMBINE){
 			throw new Exception("Invalid strategy " + driverStrategy + " for group reduce combiner.");
 		}
-
-		this.memManager = this.taskContext.getMemoryManager();
-		final int numMemoryPages = memManager.computeNumberOfPages(this.taskContext.getTaskConfig().getRelativeMemoryDriver());
+		
+		
 
 		final TypeSerializerFactory<IN> serializerFactory = this.taskContext.getInputSerializer(0);
 		this.serializer = serializerFactory.getSerializer();
@@ -128,8 +127,9 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 		this.combiner = this.taskContext.getStub();
 		this.output = this.taskContext.getOutputCollector();
 
-		final List<MemorySegment> memory = this.memManager.allocatePages(this.taskContext.getOwningNepheleTask(),
-				numMemoryPages);
+		MemoryManager memManager = this.taskContext.getMemoryManager();
+		final int numMemoryPages = memManager.computeNumberOfPages(this.taskContext.getTaskConfig().getRelativeMemoryDriver());
+		this.memory = memManager.allocatePages(this.taskContext.getOwningNepheleTask(), numMemoryPages);
 
 		// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
 		if (sortingComparator.supportsSerializationWithKeyNormalization() &&
@@ -218,16 +218,26 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 	@Override
 	public void cleanup() throws Exception {
 		if (this.sorter != null) {
-			this.memManager.release(this.sorter.dispose());
+			this.sorter.dispose();
 		}
+
+		this.taskContext.getMemoryManager().release(this.memory);
 	}
 
 	@Override
 	public void cancel() {
 		this.running = false;
+		
 		if (this.sorter != null) {
-			this.memManager.release(this.sorter.dispose());
+			try {
+				this.sorter.dispose();
+			}
+			catch (Exception e) {
+				// may happen during concurrent modification
+			}
 		}
+
+		this.taskContext.getMemoryManager().release(this.memory);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/435ee4eb/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
index 19557bc..26da0ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
@@ -63,12 +63,12 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
 	
 	private Collector<T> output;
 	
-	private MemoryManager memManager;
-	
 	private InMemorySorter<T> sorter;
 	
 	private QuickSort sortAlgo = new QuickSort();
 
+	private List<MemorySegment> memory;
+
 	private boolean running;
 
 	private boolean objectReuseEnabled = false;
@@ -105,10 +105,6 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
 			throw new Exception("Invalid strategy " + this.taskContext.getTaskConfig().getDriverStrategy() + " for reduce combiner.");
 		}
 		
-		this.memManager = this.taskContext.getMemoryManager();
-		final int numMemoryPages = memManager.computeNumberOfPages(this.taskContext.getTaskConfig()
-				.getRelativeMemoryDriver());
-		
 		// instantiate the serializer / comparator
 		final TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0);
 		this.comparator = this.taskContext.getDriverComparator(0);
@@ -116,7 +112,10 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
 		this.reducer = this.taskContext.getStub();
 		this.output = this.taskContext.getOutputCollector();
 
-		final List<MemorySegment> memory = this.memManager.allocatePages(this.taskContext.getOwningNepheleTask(), numMemoryPages);
+		MemoryManager memManager = this.taskContext.getMemoryManager();
+		final int numMemoryPages = memManager.computeNumberOfPages(
+				this.taskContext.getTaskConfig().getRelativeMemoryDriver());
+		this.memory = memManager.allocatePages(this.taskContext.getOwningNepheleTask(), numMemoryPages);
 
 		// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
 		if (this.comparator.supportsSerializationWithKeyNormalization() &&
@@ -241,12 +240,21 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
 
 	@Override
 	public void cleanup() {
-		this.memManager.release(this.sorter.dispose());
+		this.sorter.dispose();
+		this.taskContext.getMemoryManager().release(this.memory);
 	}
 
 	@Override
 	public void cancel() {
 		this.running = false;
-		this.memManager.release(this.sorter.dispose());
+		
+		try {
+			this.sorter.dispose();
+		}
+		catch (Exception e) {
+			// may happen during concurrent modifications
+		}
+
+		this.taskContext.getMemoryManager().release(this.memory);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/435ee4eb/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
index 09f4288..08ad25b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
@@ -71,15 +71,13 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
 
 	private TypeSerializer<IN> serializer;
 
-	private TypeComparator<IN> sortingComparator;
-
 	private TypeComparator<IN> groupingComparator;
 
 	private AbstractInvokable parent;
 
 	private QuickSort sortAlgo = new QuickSort();
 
-	private MemoryManager memManager;
+	private List<MemorySegment> memory;
 
 	private volatile boolean running = true;
 
@@ -102,28 +100,28 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
 		final Configuration stubConfig = this.config.getStubParameters();
 		RegularPactTask.openUserCode(this.reducer, stubConfig);
 
-		// ----------------- Set up the asynchronous sorter -------------------------
-
-		this.memManager = this.parent.getEnvironment().getMemoryManager();
-		final int numMemoryPages = memManager.computeNumberOfPages(this.config.getRelativeMemoryDriver());
+		// ----------------- Set up the sorter -------------------------
 
 		// instantiate the serializer / comparator
 		final TypeSerializerFactory<IN> serializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
 		final TypeComparatorFactory<IN> sortingComparatorFactory = this.config.getDriverComparator(0, this.userCodeClassLoader);
 		final TypeComparatorFactory<IN> groupingComparatorFactory = this.config.getDriverComparator(1, this.userCodeClassLoader);
 		this.serializer = serializerFactory.getSerializer();
-		this.sortingComparator = sortingComparatorFactory.createComparator();
+		
+		TypeComparator<IN> sortingComparator = sortingComparatorFactory.createComparator();
 		this.groupingComparator = groupingComparatorFactory.createComparator();
 
-		final List<MemorySegment> memory = this.memManager.allocatePages(this.parent, numMemoryPages);
+		MemoryManager memManager = this.parent.getEnvironment().getMemoryManager();
+		final int numMemoryPages = memManager.computeNumberOfPages(this.config.getRelativeMemoryDriver());
+		this.memory = memManager.allocatePages(this.parent, numMemoryPages);
 
 		// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
-		if (this.sortingComparator.supportsSerializationWithKeyNormalization() &&
+		if (sortingComparator.supportsSerializationWithKeyNormalization() &&
 			this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
 		{
-			this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, this.sortingComparator, memory);
+			this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, sortingComparator, memory);
 		} else {
-			this.sorter = new NormalizedKeySorter<IN>(this.serializer, this.sortingComparator.duplicate(), memory);
+			this.sorter = new NormalizedKeySorter<IN>(this.serializer, sortingComparator.duplicate(), memory);
 		}
 
 		if (LOG.isDebugEnabled()) {
@@ -133,19 +131,30 @@ public class GroupCombineChainedDriver<IN, OUT> extends ChainedDriver<IN, OUT> {
 
 	@Override
 	public void closeTask() throws Exception {
-		this.memManager.release(this.sorter.dispose());
-
-		if (!this.running) {
-			return;
+		if (this.sorter != null) {
+			this.sorter.dispose();
 		}
+		this.parent.getEnvironment().getMemoryManager().release(this.memory);
 
-		RegularPactTask.closeUserCode(this.reducer);
+		if (this.running) {
+			RegularPactTask.closeUserCode(this.reducer);
+		}
 	}
 
 	@Override
 	public void cancelTask() {
 		this.running = false;
-		this.memManager.release(this.sorter.dispose());
+
+		if (this.sorter != null) {
+			try {
+				this.sorter.dispose();
+			}
+			catch (Exception e) {
+				// may happen during concurrent modification
+			}
+		}
+		
+		this.parent.getEnvironment().getMemoryManager().release(this.memory);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/435ee4eb/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
index 75c1eed..da9698c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
@@ -58,9 +58,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
 	private static final Logger LOG = LoggerFactory.getLogger(SynchronousChainedCombineDriver.class);
 
 
-	/**
-	 * Fix length records with a length below this threshold will be in-place sorted, if possible.
-	 */
+	/** Fix length records with a length below this threshold will be in-place sorted, if possible. */
 	private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
 
 	// --------------------------------------------------------------------------------------------
@@ -71,16 +69,14 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
 
 	private TypeSerializer<IN> serializer;
 
-	private TypeComparator<IN> sortingComparator;
-
 	private TypeComparator<IN> groupingComparator;
 
 	private AbstractInvokable parent;
 
-	private QuickSort sortAlgo = new QuickSort();
-
-	private MemoryManager memManager;
+	private final QuickSort sortAlgo = new QuickSort();
 
+	private List<MemorySegment> memory;
+	
 	private volatile boolean running = true;
 
 	// --------------------------------------------------------------------------------------------
@@ -102,28 +98,29 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
 		final Configuration stubConfig = this.config.getStubParameters();
 		RegularPactTask.openUserCode(this.combiner, stubConfig);
 
-		// ----------------- Set up the asynchronous sorter -------------------------
-
-		this.memManager = this.parent.getEnvironment().getMemoryManager();
-		final int numMemoryPages = memManager.computeNumberOfPages(this.config.getRelativeMemoryDriver());
+		// ----------------- Set up the sorter -------------------------
 
 		// instantiate the serializer / comparator
 		final TypeSerializerFactory<IN> serializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
 		final TypeComparatorFactory<IN> sortingComparatorFactory = this.config.getDriverComparator(0, this.userCodeClassLoader);
 		final TypeComparatorFactory<IN> groupingComparatorFactory = this.config.getDriverComparator(1, this.userCodeClassLoader);
+		
 		this.serializer = serializerFactory.getSerializer();
-		this.sortingComparator = sortingComparatorFactory.createComparator();
-		this.groupingComparator = groupingComparatorFactory.createComparator();
 
-		final List<MemorySegment> memory = this.memManager.allocatePages(this.parent, numMemoryPages);
+		TypeComparator<IN> sortingComparator = sortingComparatorFactory.createComparator();
+		this.groupingComparator = groupingComparatorFactory.createComparator();
+		
+		MemoryManager memManager = this.parent.getEnvironment().getMemoryManager();
+		final int numMemoryPages = memManager.computeNumberOfPages(this.config.getRelativeMemoryDriver());
+		this.memory = memManager.allocatePages(this.parent, numMemoryPages);
 
 		// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
-		if (this.sortingComparator.supportsSerializationWithKeyNormalization() &&
+		if (sortingComparator.supportsSerializationWithKeyNormalization() &&
 			this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
 		{
-			this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, this.sortingComparator, memory);
+			this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, sortingComparator, this.memory);
 		} else {
-			this.sorter = new NormalizedKeySorter<IN>(this.serializer, this.sortingComparator.duplicate(), memory);
+			this.sorter = new NormalizedKeySorter<IN>(this.serializer, sortingComparator.duplicate(), this.memory);
 		}
 
 		if (LOG.isDebugEnabled()) {
@@ -133,19 +130,25 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends ChainedDriver<IN,
 
 	@Override
 	public void closeTask() throws Exception {
-		this.memManager.release(this.sorter.dispose());
+		this.sorter.dispose();
+		this.parent.getEnvironment().getMemoryManager().release(this.memory);
 
-		if (!this.running) {
-			return;
+		if (this.running) {
+			RegularPactTask.closeUserCode(this.combiner);
 		}
-
-		RegularPactTask.closeUserCode(this.combiner);
 	}
 
 	@Override
 	public void cancelTask() {
 		this.running = false;
-		this.memManager.release(this.sorter.dispose());
+		try {
+			this.sorter.dispose();
+		}
+		catch (Exception e) {
+			// may happen during concurrent modification when canceling
+		}
+		
+		this.parent.getEnvironment().getMemoryManager().release(this.memory);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/435ee4eb/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
index e932592..da96b17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorter.java
@@ -121,13 +121,7 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> {
 		this.lastEntryOffset = (this.recordsPerSegment - 1) * this.recordSize;
 		this.swapBuffer = new byte[this.recordSize];
 		
-		if (memory instanceof ArrayList<?>) {
-			this.freeMemory = (ArrayList<MemorySegment>) memory;
-		}
-		else {
-			this.freeMemory = new ArrayList<MemorySegment>(memory.size());
-			this.freeMemory.addAll(memory);
-		}
+		this.freeMemory = new ArrayList<MemorySegment>(memory);
 		
 		// create the buffer collections
 		this.sortBuffer = new ArrayList<MemorySegment>(16);
@@ -174,16 +168,10 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> {
 		return this.numRecords == 0;
 	}
 	
-	/**
-	 * Collects all memory segments from this sorter.
-	 * 
-	 * @return All memory segments from this sorter.
-	 */
 	@Override
-	public List<MemorySegment> dispose() {
-		this.freeMemory.addAll(this.sortBuffer);
+	public void dispose() {
+		this.freeMemory.clear();
 		this.sortBuffer.clear();
-		return this.freeMemory;
 	}
 	
 	@Override
@@ -195,23 +183,16 @@ public final class FixedLengthRecordSorter<T> implements InMemorySorter<T> {
 	public long getOccupancy() {
 		return this.sortBufferBytes;
 	}
-	
-	@Override
-	public long getNumRecordBytes() {
-		return this.sortBufferBytes;
-	}
 
 	// -------------------------------------------------------------------------
 	// Retrieving and Writing
 	// -------------------------------------------------------------------------
-
-	/**
-	 * Gets the record at the given logical position.
-	 * 
-	 * @param reuse The reuse object to deserialize the record into.
-	 * @param logicalPosition The logical position of the record.
-	 * @throws IOException Thrown, if an exception occurred during deserialization.
-	 */
+	
+	@Override
+	public T getRecord(int logicalPosition) throws IOException {
+		return getRecord(serializer.createInstance(), logicalPosition);
+	}
+	
 	@Override
 	public T getRecord(T reuse, int logicalPosition) throws IOException {
 		final int buffer = logicalPosition / this.recordsPerSegment;

http://git-wip-us.apache.org/repos/asf/flink/blob/435ee4eb/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
index 633ec70..a47041b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/InMemorySorter.java
@@ -19,9 +19,7 @@
 package org.apache.flink.runtime.operators.sort;
 
 import java.io.IOException;
-import java.util.List;
 
-import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -43,11 +41,10 @@ public interface InMemorySorter<T> extends IndexedSortable {
 	boolean isEmpty();
 	
 	/**
-	 * Collects all memory segments from this sorter.
-	 * 
-	 * @return All memory segments from this sorter.
+	 * Disposes the sorter.
+	 * This method does not release the memory segments used by the sorter.
 	 */
-	List<MemorySegment> dispose();
+	void dispose();
 	
 	/**
 	 * Gets the total capacity of this sorter, in bytes.
@@ -62,13 +59,14 @@ public interface InMemorySorter<T> extends IndexedSortable {
 	 * @return The number of bytes occupied.
 	 */
 	long getOccupancy();
-	
+
 	/**
-	 * Gets the number of bytes occupied by records only.
-	 * 
-	 * @return The number of bytes occupied by records.
+	 * Gets the record at the given logical position.
+	 *
+	 * @param logicalPosition The logical position of the record.
+	 * @throws IOException Thrown, if an exception occurred during deserialization.
 	 */
-	long getNumRecordBytes();
+	T getRecord(int logicalPosition) throws IOException;
 	
 	/**
 	 * Gets the record at the given logical position.
@@ -114,5 +112,5 @@ public interface InMemorySorter<T> extends IndexedSortable {
 	 * @param num The number of elements to write.
 	 * @throws IOException Thrown, if an I/O exception occurred writing to the output view.
 	 */
-	public void writeToOutput(final ChannelWriterOutputView output, final int start, int num) throws IOException;
+	public void writeToOutput(ChannelWriterOutputView output, int start, int num) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/435ee4eb/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
index c05e518..9e1882c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java
@@ -133,14 +133,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
 			throw new IllegalArgumentException("Normalized-Key sorter requires at least " + MIN_REQUIRED_BUFFERS + " memory buffers.");
 		}
 		this.segmentSize = memory.get(0).size();
-		
-		if (memory instanceof ArrayList<?>) {
-			this.freeMemory = (ArrayList<MemorySegment>) memory;
-		}
-		else {
-			this.freeMemory = new ArrayList<MemorySegment>(memory.size());
-			this.freeMemory.addAll(memory);
-		}
+		this.freeMemory = new ArrayList<MemorySegment>(memory);
 		
 		// create the buffer collections
 		this.sortIndex = new ArrayList<MemorySegment>(16);
@@ -220,20 +213,11 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
 		return this.numRecords == 0;
 	}
 	
-	/**
-	 * Collects all memory segments from this sorter.
-	 * 
-	 * @return All memory segments from this sorter.
-	 */
 	@Override
-	public List<MemorySegment> dispose() {
-		this.freeMemory.addAll(this.sortIndex);
-		this.freeMemory.addAll(this.recordBufferSegments);
-		
+	public void dispose() {
+		this.freeMemory.clear();
 		this.recordBufferSegments.clear();
 		this.sortIndex.clear();
-		
-		return this.freeMemory;
 	}
 	
 	@Override
@@ -245,23 +229,16 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T> {
 	public long getOccupancy() {
 		return this.currentDataBufferOffset + this.sortIndexBytes;
 	}
-	
-	@Override
-	public long getNumRecordBytes() {
-		return this.currentDataBufferOffset;
-	}
 
 	// -------------------------------------------------------------------------
 	// Retrieving and Writing
 	// -------------------------------------------------------------------------
 
-	/**
-	 * Gets the record at the given logical position.
-	 * 
-	 * @param reuse The target object to deserialize the record into.
-	 * @param logicalPosition The logical position of the record.
-	 * @throws IOException Thrown, if an exception occurred during deserialization.
-	 */
+	@Override
+	public T getRecord(int logicalPosition) throws IOException {
+		return getRecordFromBuffer(readPointer(logicalPosition));
+	}
+	
 	@Override
 	public T getRecord(T reuse, int logicalPosition) throws IOException {
 		return getRecordFromBuffer(reuse, readPointer(logicalPosition));

http://git-wip-us.apache.org/repos/asf/flink/blob/435ee4eb/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index 32fbb52..13159d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -342,7 +342,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 			}
 
 			// add to empty queue
-			CircularElement<E> element = new CircularElement<E>(i, buffer);
+			CircularElement<E> element = new CircularElement<E>(i, buffer, sortSegments);
 			circularQueues.empty.add(element);
 		}
 
@@ -686,15 +686,18 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		
 		final int id;
 		final InMemorySorter<E> buffer;
+		final List<MemorySegment> memory;
 
 		public CircularElement() {
 			this.id = -1;
 			this.buffer = null;
+			this.memory = null;
 		}
 
-		public CircularElement(int id, InMemorySorter<E> buffer) {
+		public CircularElement(int id, InMemorySorter<E> buffer, List<MemorySegment> memory) {
 			this.id = id;
 			this.buffer = buffer;
+			this.memory = memory;
 		}
 	}
 
@@ -1199,7 +1202,7 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		public void go() throws IOException {
 			
 			final Queue<CircularElement<E>> cache = new ArrayDeque<CircularElement<E>>();
-			CircularElement<E> element = null;
+			CircularElement<E> element;
 			boolean cacheOnly = false;
 			
 			// ------------------- In-Memory Cache ------------------------
@@ -1236,7 +1239,8 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 				
 				CircularElement<E> circElement;
 				while ((circElement = this.queues.empty.poll()) != null) {
-					memoryForLargeRecordSorting.addAll(circElement.buffer.dispose());
+					circElement.buffer.dispose();
+					memoryForLargeRecordSorting.addAll(circElement.memory);
 				}
 				
 				if (memoryForLargeRecordSorting.isEmpty()) {
@@ -1440,10 +1444,10 @@ public class UnilateralSortMerger<E> implements Sorter<E> {
 		protected final void disposeSortBuffers(boolean releaseMemory) {
 			while (!this.queues.empty.isEmpty()) {
 				try {
-					final InMemorySorter<?> sorter = this.queues.empty.take().buffer;
-					final List<MemorySegment> sorterMem = sorter.dispose();
+					CircularElement<E> element = this.queues.empty.take();
+					element.buffer.dispose();
 					if (releaseMemory) {
-						this.memManager.release(sorterMem);
+						this.memManager.release(element.memory);
 					}
 				}
 				catch (InterruptedException iex) {

http://git-wip-us.apache.org/repos/asf/flink/blob/435ee4eb/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
index f40171a..517bec3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
@@ -33,14 +33,12 @@ import org.apache.flink.runtime.operators.testutils.types.IntPair;
 import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-/**
- * 
- */
 public class FixedLengthRecordSorterTest {
 	
 	private static final long SEED = 649180756312423613L;
@@ -125,7 +123,8 @@ public class FixedLengthRecordSorterTest {
 //		System.out.println("RECORDS " + num);
 		
 		// release the memory occupied by the buffers
-		this.memoryManager.release(sorter.dispose());
+		sorter.dispose();
+		this.memoryManager.release(memory);
 	}
 	
 	@Test
@@ -170,7 +169,8 @@ public class FixedLengthRecordSorterTest {
 		Assert.assertEquals("Incorrect number of records", num, count);
 		
 		// release the memory occupied by the buffers
-		this.memoryManager.release(sorter.dispose());
+		sorter.dispose();
+		this.memoryManager.release(memory);
 	}
 	
 	@Test
@@ -225,7 +225,8 @@ public class FixedLengthRecordSorterTest {
 		}
 		
 		// release the memory occupied by the buffers
-		this.memoryManager.release(sorter.dispose());
+		sorter.dispose();
+		this.memoryManager.release(memory);
 	}
 	
 	/**
@@ -276,7 +277,8 @@ public class FixedLengthRecordSorterTest {
 		}
 		
 		// release the memory occupied by the buffers
-		this.memoryManager.release(sorter.dispose());
+		sorter.dispose();
+		this.memoryManager.release(memory);
 	}
 	
 	/**
@@ -318,7 +320,8 @@ public class FixedLengthRecordSorterTest {
 		}
 		
 		// release the memory occupied by the buffers
-		this.memoryManager.release(sorter.dispose());
+		sorter.dispose();
+		this.memoryManager.release(memory);
 	}
 	
 	@Test
@@ -345,11 +348,10 @@ public class FixedLengthRecordSorterTest {
 		MutableObjectIterator<IntPair> iter = sorter.getIterator();
 		IntPair readTarget = new IntPair();
 		
-		int current = 0;
-		int last = 0;
+		int current;
+		int last;
 		
 		iter.next(readTarget);
-		//readTarget.getFieldInto(0, last);
 		last = readTarget.getKey();
 		
 		while ((readTarget = iter.next(readTarget)) != null) {
@@ -359,13 +361,11 @@ public class FixedLengthRecordSorterTest {
 			if (cmp > 0) {
 				Assert.fail("Next key is not larger or equal to previous key.");
 			}
-			
-			int tmp = current;
-			current = last;
-			last = tmp;
+			last = current;
 		}
 		
 		// release the memory occupied by the buffers
-		this.memoryManager.release(sorter.dispose());
+		sorter.dispose();
+		this.memoryManager.release(memory);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/435ee4eb/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
index 3e941dd..f8a8f11 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
@@ -35,15 +35,15 @@ import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
 import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-/**
- */
-public class NormalizedKeySorterTest
-{
+
+public class NormalizedKeySorterTest {
+	
 	private static final long SEED = 649180756312423613L;
 	
 	private static final long SEED2 = 97652436586326573L;
@@ -76,16 +76,14 @@ public class NormalizedKeySorterTest
 		}
 	}
 
-	private NormalizedKeySorter<Record> newSortBuffer(List<MemorySegment> memory) throws Exception
-	{
-		@SuppressWarnings("unchecked")
-		RecordComparator accessors = new RecordComparator(new int[] {0}, new Class[]{Key.class});
+	private NormalizedKeySorter<Record> newSortBuffer(List<MemorySegment> memory) throws Exception {
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		RecordComparator accessors = new RecordComparator(new int[] {0}, new Class[]{ Key.class });
 		return new NormalizedKeySorter<Record>(RecordSerializer.get(), accessors, memory);
 	}
 
 	@Test
-	public void testWriteAndRead() throws Exception
-	{
+	public void testWriteAndRead() throws Exception {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
@@ -122,12 +120,12 @@ public class NormalizedKeySorterTest
 		}
 		
 		// release the memory occupied by the buffers
-		this.memoryManager.release(sorter.dispose());
+		sorter.dispose();
+		this.memoryManager.release(memory);
 	}
 	
 	@Test
-	public void testWriteAndIterator() throws Exception
-	{
+	public void testWriteAndIterator() throws Exception {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
@@ -161,12 +159,12 @@ public class NormalizedKeySorterTest
 		}
 		
 		// release the memory occupied by the buffers
-		this.memoryManager.release(sorter.dispose());
+		sorter.dispose();
+		this.memoryManager.release(memory);
 	}
 	
 	@Test
-	public void testReset() throws Exception
-	{
+	public void testReset() throws Exception {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
@@ -217,7 +215,8 @@ public class NormalizedKeySorterTest
 		}
 		
 		// release the memory occupied by the buffers
-		this.memoryManager.release(sorter.dispose());
+		sorter.dispose();
+		this.memoryManager.release(memory);
 	}
 	
 	/**
@@ -226,8 +225,7 @@ public class NormalizedKeySorterTest
 	 * and compares for equality.
 	 */
 	@Test
-	public void testSwap() throws Exception
-	{
+	public void testSwap() throws Exception {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
@@ -270,7 +268,8 @@ public class NormalizedKeySorterTest
 		}
 		
 		// release the memory occupied by the buffers
-		this.memoryManager.release(sorter.dispose());
+		sorter.dispose();
+		this.memoryManager.release(memory);
 	}
 	
 	/**
@@ -279,8 +278,7 @@ public class NormalizedKeySorterTest
 	 * ones.
 	 */
 	@Test
-	public void testCompare() throws Exception
-	{
+	public void testCompare() throws Exception {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
@@ -314,12 +312,12 @@ public class NormalizedKeySorterTest
 		}
 		
 		// release the memory occupied by the buffers
-		this.memoryManager.release(sorter.dispose());
+		sorter.dispose();
+		this.memoryManager.release(memory);
 	}
 	
 	@Test
-	public void testSort() throws Exception
-	{
+	public void testSort() throws Exception {
 		final int NUM_RECORDS = 559273;
 		
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
@@ -364,15 +362,16 @@ public class NormalizedKeySorterTest
 		}
 		
 		// release the memory occupied by the buffers
-		this.memoryManager.release(sorter.dispose());
+		sorter.dispose();
+		this.memoryManager.release(memory);
 	}
 	
 	@Test
 	public void testSortShortStringKeys() throws Exception {
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
-		
-		@SuppressWarnings("unchecked")
+
+		@SuppressWarnings({"unchecked", "rawtypes"})
 		RecordComparator accessors = new RecordComparator(new int[] {1}, new Class[]{Value.class});
 		NormalizedKeySorter<Record> sorter = new NormalizedKeySorter<Record>(RecordSerializer.get(), accessors, memory);
 		
@@ -412,7 +411,8 @@ public class NormalizedKeySorterTest
 		}
 		
 		// release the memory occupied by the buffers
-		this.memoryManager.release(sorter.dispose());
+		sorter.dispose();
+		this.memoryManager.release(memory);
 	}
 	
 	@Test
@@ -420,7 +420,7 @@ public class NormalizedKeySorterTest
 		final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
 		final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
 		
-		@SuppressWarnings("unchecked")
+		@SuppressWarnings({"unchecked", "rawtypes"})
 		RecordComparator accessors = new RecordComparator(new int[] {1}, new Class[]{Value.class});
 		NormalizedKeySorter<Record> sorter = new NormalizedKeySorter<Record>(RecordSerializer.get(), accessors, memory);
 		
@@ -460,6 +460,7 @@ public class NormalizedKeySorterTest
 		}
 		
 		// release the memory occupied by the buffers
-		this.memoryManager.release(sorter.dispose());
+		sorter.dispose();
+		this.memoryManager.release(memory);
 	}
 }