You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/09/10 01:42:21 UTC

[4/9] git commit: [FLINK-1075] Removed the AsynchronousPartialSorter.

[FLINK-1075] Removed the AsynchronousPartialSorter.

This closes #104


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/64510b6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/64510b6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/64510b6a

Branch: refs/heads/release-0.6.1
Commit: 64510b6a5cb332b3c2b99471b6f6e8608854ef45
Parents: 8af40c3
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Aug 28 18:28:20 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Wed Sep 10 00:43:53 2014 +0200

----------------------------------------------------------------------
 .../operators/GroupReduceCombineDriver.java     | 125 +++++---
 .../runtime/operators/ReduceCombineDriver.java  |   1 -
 .../sort/AsynchronousPartialSorter.java         | 207 -------------
 .../AsynchronousPartialSorterCollector.java     | 101 ------
 .../sort/AsynchonousPartialSorterITCase.java    | 306 -------------------
 5 files changed, 89 insertions(+), 651 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64510b6a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index f786c56..0452ef7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -23,15 +23,21 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AsynchronousPartialSorter;
-import org.apache.flink.runtime.operators.util.CloseableInputProvider;
-import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+import org.apache.flink.runtime.operators.sort.QuickSort;
 import org.apache.flink.runtime.util.KeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
+import java.io.IOException;
+import java.util.List;
+
 /**
  * Combine operator, standalone (not chained)
  * <p>
@@ -43,16 +49,26 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 	
 	private static final Log LOG = LogFactory.getLog(GroupReduceCombineDriver.class);
 
-	
+	/** Fixed-length records with a length not exceeding this threshold will be sorted in place, if possible. */
+	private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
 	private PactTaskContext<FlatCombineFunction<T>, T> taskContext;
-	
-	private CloseableInputProvider<T> input;
 
-	private TypeSerializerFactory<T> serializerFactory;
+	private InMemorySorter<T> sorter;
+
+	private FlatCombineFunction<T> combiner;
+
+	private TypeSerializer<T> serializer;
 
 	private TypeComparator<T> comparator;
-	
-	private volatile boolean running;
+
+	private QuickSort sortAlgo = new QuickSort();
+
+	private MemoryManager memManager;
+
+	private Collector<T> output;
+
+	private volatile boolean running = true;
 
 	// ------------------------------------------------------------------------
 
@@ -81,55 +97,92 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 
 	@Override
 	public void prepare() throws Exception {
-		final TaskConfig config = this.taskContext.getTaskConfig();
-		final DriverStrategy ls = config.getDriverStrategy();
+		if(this.taskContext.getTaskConfig().getDriverStrategy() != DriverStrategy.SORTED_GROUP_COMBINE){
+			throw new Exception("Invalid strategy " + this.taskContext.getTaskConfig().getDriverStrategy() + " for " +
+					"group reduce combinder.");
+		}
 
-		final MemoryManager memoryManager = this.taskContext.getMemoryManager();
+		this.memManager = this.taskContext.getMemoryManager();
+		final int numMemoryPages = memManager.computeNumberOfPages(this.taskContext.getTaskConfig().getRelativeMemoryDriver());
 
-		final MutableObjectIterator<T> in = this.taskContext.getInput(0);
-		this.serializerFactory = this.taskContext.getInputSerializer(0);
+		final TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0);
+		this.serializer = serializerFactory.getSerializer();
 		this.comparator = this.taskContext.getInputComparator(0);
-
-		switch (ls) {
-		case SORTED_GROUP_COMBINE:
-			this.input = new AsynchronousPartialSorter<T>(memoryManager, in, this.taskContext.getOwningNepheleTask(),
-						this.serializerFactory, this.comparator.duplicate(), config.getRelativeMemoryDriver());
-			break;
-		// obtain and return a grouped iterator from the combining sort-merger
-		default:
-			throw new RuntimeException("Invalid local strategy provided for CombineTask.");
+		this.combiner = this.taskContext.getStub();
+		this.output = this.taskContext.getOutputCollector();
+
+		final List<MemorySegment> memory = this.memManager.allocatePages(this.taskContext.getOwningNepheleTask(),
+				numMemoryPages);
+
+		// instantiate a fixed-length in-place sorter, if possible, otherwise the out-of-place sorter
+		if (this.comparator.supportsSerializationWithKeyNormalization() &&
+				this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
+		{
+			this.sorter = new FixedLengthRecordSorter<T>(this.serializer, this.comparator, memory);
+		} else {
+			this.sorter = new NormalizedKeySorter<T>(this.serializer, this.comparator.duplicate(), memory);
 		}
 	}
 
 	@Override
 	public void run() throws Exception {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug(this.taskContext.formatLogString("Preprocessing done, iterator obtained."));
+			LOG.debug("Combiner starting.");
 		}
 
-		final KeyGroupedIterator<T> iter = new KeyGroupedIterator<T>(this.input.getIterator(),
-				this.serializerFactory.getSerializer(), this.comparator);
+		final MutableObjectIterator<T> in = this.taskContext.getInput(0);
+		final TypeSerializer<T> serializer = this.serializer;
+
+		T value = serializer.createInstance();
+
+		while (running && (value = in.next(value)) != null) {
+
+			// try writing to the sorter first
+			if (this.sorter.write(value)) {
+				continue;
+			}
 
-		// cache references on the stack
-		final FlatCombineFunction<T> stub = this.taskContext.getStub();
-		final Collector<T> output = this.taskContext.getOutputCollector();
+			// do the actual sorting, combining, and data writing
+			sortAndCombine();
+			this.sorter.reset();
 
-		// run stub implementation
-		while (this.running && iter.nextKey()) {
-			stub.combine(iter.getValues(), output);
+			// write the value again
+			if (!this.sorter.write(value)) {
+				throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
+			}
+		}
+
+		// sort, combine, and send the final batch
+		sortAndCombine();
+	}
+
+	private void sortAndCombine() throws Exception {
+		final InMemorySorter<T> sorter = this.sorter;
+
+		if (!sorter.isEmpty()) {
+			this.sortAlgo.sort(sorter);
+
+			final KeyGroupedIterator<T> keyIter = new KeyGroupedIterator<T>(sorter.getIterator(), this.serializer,
+					this.comparator);
+
+			final FlatCombineFunction<T> combiner = this.combiner;
+			final Collector<T> output = this.output;
+
+			// iterate over key groups
+			while (this.running && keyIter.nextKey()) {
+				combiner.combine(keyIter.getValues(), output);
+			}
 		}
 	}
 
 	@Override
 	public void cleanup() throws Exception {
-		if (this.input != null) {
-			this.input.close();
-			this.input = null;
-		}
+		this.memManager.release(this.sorter.dispose());
 	}
 
 	@Override
 	public void cancel() {
 		this.running = false;
+		this.memManager.release(this.sorter.dispose());
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64510b6a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
index 87cea30..6b18bb1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
@@ -109,7 +109,6 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
 		
 		// instantiate the serializer / comparator
 		final TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0);
-		this.serializer = serializerFactory.getSerializer();
 		this.comparator = this.taskContext.getInputComparator(0);
 		this.serializer = serializerFactory.getSerializer();
 		this.reducer = this.taskContext.getStub();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64510b6a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorter.java
deleted file mode 100644
index 03794ff..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorter.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.sort;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.util.MutableObjectIterator;
-
-
-/**
- * The {@link AsynchronousPartialSorter} is a simple sort implementation that sorts
- * bulks inside its buffers, and returns them directly, without merging them. Therefore,
- * it establishes an order within certain windows, but not across them.
- */
-public class AsynchronousPartialSorter<E> extends UnilateralSortMerger<E> {
-	
-	private BufferQueueIterator bufferIterator;
-	
-	// ------------------------------------------------------------------------
-	// Constructor
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * 
-	 * 
-	 * @param memoryManager The memory manager from which to allocate the memory.
-	 * @param input The input that is sorted by this sorter.
-	 * @param parentTask The parent task, which owns all resources used by this sorter.
-	 * @param serializerFactory The type serializer.
-	 * @param comparator The type comparator establishing the order relation.
-	 * @param memoryFraction The fraction of memory dedicated to sorting.
-	 * 
-	 * @throws IOException Thrown, if an error occurs initializing the resources for external sorting.
-	 * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
-	 *                                   perform the sort.
-	 */
-	public AsynchronousPartialSorter(MemoryManager memoryManager,
-			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
-			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
-			double memoryFraction)
-	throws IOException, MemoryAllocationException
-	{
-		super(memoryManager, null, input, parentTask, serializerFactory, comparator, memoryFraction, 1, 2, 0.0f, true);
-	}
-	
-
-	public void close() {
-		// make a best effort to close the buffer iterator
-		try {
-			if (this.bufferIterator != null) {
-				this.bufferIterator.close();
-				this.bufferIterator = null;
-			}
-		}
-		finally {
-			super.close();
-		}
-	}
-	
-	/* 
-	 * This method does not actually create a spilling thread, but grabs the circular queues and creates the
-	 * iterator that reads from the sort buffers in turn.
-	 */
-	@Override
-	protected ThreadBase<E> getSpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues,
-			AbstractInvokable parentTask, MemoryManager memoryManager, IOManager ioManager, 
-			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
-			List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxFileHandles)
-	{
-		this.bufferIterator = new BufferQueueIterator(queues);
-		setResultIterator(this.bufferIterator);
-		
-		return null;
-	}
-
-	// ------------------------------------------------------------------------
-
-	private final class BufferQueueIterator implements MutableObjectIterator<E> {
-		
-		private final CircularQueues<E> queues;
-		
-		private CircularElement<E> currentElement;
-		
-		private MutableObjectIterator<E> currentIterator;
-		
-		private volatile boolean closed = false;
-
-
-		protected BufferQueueIterator(CircularQueues<E> queues) {
-			this.queues = queues;
-		}
-
-
-		@Override
-		public E next(final E reuse) throws IOException {
-			E result;
-			if (this.currentIterator != null && ((result = this.currentIterator.next(reuse)) != null)) {
-				return result;
-			}
-			else if (this.closed) {
-				throw new IllegalStateException("The sorter has been closed.");
-			}
-			else {
-				if (AsynchronousPartialSorter.this.iteratorException != null) {
-					throw new IOException("The sorter has ancountered an error.", AsynchronousPartialSorter.this.iteratorException);
-				}
-				
-				while (true) {
-					if (this.currentElement == endMarker()) {
-						return null;
-					}
-					else if (this.currentElement != null) {
-						// return the current element to the empty queue
-						this.currentElement.buffer.reset();
-						this.queues.empty.add(this.currentElement);
-					}
-					
-					// get a new element
-					try {
-						this.currentElement = null;
-						while (!this.closed && this.currentElement == null) {
-							this.currentElement = this.queues.spill.poll(1000, TimeUnit.MILLISECONDS);
-						}
-						if (AsynchronousPartialSorter.this.iteratorException != null) {
-							throw new IOException("The sorter has ancountered an error.", AsynchronousPartialSorter.this.iteratorException);
-						}
-						
-						if (this.currentElement == endMarker()) {
-							// signals the end, no more buffers will come
-							// release the memory first before returning
-							releaseSortBuffers();
-							return null;
-						}
-						if (this.currentElement == spillingMarker()) {
-							this.currentElement = null;
-							continue;
-						}
-					}
-					catch (InterruptedException e) {
-						throw new RuntimeException("Iterator was interrupted getting the next sortedBuffer.");
-					}
-					
-					this.currentIterator = this.currentElement.buffer.getIterator();
-					if ((result = this.currentIterator.next(reuse)) != null) {
-						return result;
-					}
-					this.currentIterator = null;
-				}
-			}
-		}
-		
-		public void close() {
-			synchronized (this) {
-				if (this.closed) {
-					return;
-				}
-				this.closed = true;
-			}
-			
-			if (this.currentElement != null) {
-				this.queues.empty.add(this.currentElement);
-				this.currentElement = null;
-			}
-			if (this.currentIterator != null) {
-				this.currentIterator = null;
-			}
-		}
-		
-		private final void releaseSortBuffers() 	{
-			while (!this.queues.empty.isEmpty()) {
-				final CircularElement<E> elem = this.queues.empty.poll();
-				if (elem != null) {
-					final InMemorySorter<E> sorter = elem.buffer;
-					final List<MemorySegment> segments = sorter.dispose();
-					AsynchronousPartialSorter.this.memoryManager.release(segments);
-				}
-			}
-		}
-
-	};
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64510b6a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorterCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorterCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorterCollector.java
deleted file mode 100644
index a41dbf1..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorterCollector.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.sort;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.util.MutableObjectIterator;
-
-/**
- * The {@link AsynchronousPartialSorterCollector} is a simple sort implementation that sorts
- * bulks inside its buffers, and returns them directly, without merging them. Therefore,
- * it establishes an order within certain windows, but not across them.
- * <p>
- * In contract to the {@link AsynchronousPartialSorter}, this class has no dedicated reading thread that
- * pulls records from an iterator, but offers a collector into which data to be sorted is pushed.
- * 
- */
-public class AsynchronousPartialSorterCollector<E> extends AsynchronousPartialSorter<E> {
-	
-	private InputDataCollector<E> collector;
-	
-	// ------------------------------------------------------------------------
-	// Constructor
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * @param memoryManager The memory manager from which to allocate the memory.
-	 * @param parentTask The parent task, which owns all resources used by this sorter.
-	 * @param serializerFactory The type serializer.
-	 * @param comparator The type comparator establishing the order relation.
-	 * @param memoryFraction The fraction of memory dedicated to sorting.
-	 * 
-	 * @throws IOException Thrown, if an error occurs initializing the resources for external sorting.
-	 * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
-	 *                                   perform the sort.
-	 */
-	public AsynchronousPartialSorterCollector(MemoryManager memoryManager,
-			AbstractInvokable parentTask, 
-			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
-			double memoryFraction)
-	throws IOException, MemoryAllocationException
-	{
-		super(memoryManager, null, parentTask, serializerFactory, comparator,
-				memoryFraction);
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Gets the collector that writes into the sort buffers.
-	 * 
-	 * @return The collector that writes into the sort buffers.
-	 */
-	public InputDataCollector<E> getInputCollector() {
-		return this.collector;
-	}
-
-	@Override
-	protected ThreadBase<E> getReadingThread(ExceptionHandler<IOException> exceptionHandler,
-		MutableObjectIterator<E> reader, CircularQueues<E> queues, AbstractInvokable parentTask,
-		TypeSerializer<E> serializer, long startSpillingBytes)
-	{
-		this.collector = new InputDataCollector<E>(queues, startSpillingBytes);
-		return null;
-	}
-	
-
-	public void close() {
-		try {
-			if (this.collector != null) {
-				this.collector.close();
-			}
-		}
-		finally {
-			super.close();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64510b6a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AsynchonousPartialSorterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AsynchonousPartialSorterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AsynchonousPartialSorterITCase.java
deleted file mode 100644
index 90fe59f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AsynchonousPartialSorterITCase.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.sort;
-
-import java.io.IOException;
-
-import org.junit.Assert;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
-import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AsynchronousPartialSorter;
-import org.apache.flink.runtime.operators.sort.ExceptionHandler;
-import org.apache.flink.runtime.operators.sort.Sorter;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public class AsynchonousPartialSorterITCase {
-	
-	private static final Log LOG = LogFactory.getLog(AsynchonousPartialSorterITCase.class);
-
-	private static final long SEED = 649180756312423613L;
-
-	private static final int KEY_MAX = Integer.MAX_VALUE;
-
-	private static final Value VAL = new Value("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
-	
-	private static final int VALUE_LENGTH = 114;
-
-	public static final int MEMORY_SIZE = 1024 * 1024 * 32;
-	
-	private final AbstractInvokable parentTask = new DummyInvokable();
-
-	private IOManager ioManager;
-
-	private MemoryManager memoryManager;
-	
-	private TypeSerializerFactory<Record> serializer;
-	
-	private TypeComparator<Record> comparator;
-
-
-	@SuppressWarnings("unchecked")
-	@Before
-	public void beforeTest()
-	{
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1);
-		this.ioManager = new IOManager();
-		this.serializer = RecordSerializerFactory.get();
-		this.comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
-	}
-
-	@After
-	public void afterTest()
-	{
-		this.ioManager.shutdown();
-		if (!this.ioManager.isProperlyShutDown()) {
-			Assert.fail("I/O Manager was not properly shut down.");
-		}
-		
-		if (this.memoryManager != null) {
-			Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", 
-				this.memoryManager.verifyEmpty());
-			this.memoryManager.shutdown();
-			this.memoryManager = null;
-		}
-	}
-
-	@Test
-	public void testSmallSortInOneWindow() throws Exception
-	{
-		try {
-			final int NUM_RECORDS = 1000;
-			
-			// reader
-			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_RECORDS);
-			
-			// merge iterator
-			LOG.debug("Initializing sortmerger...");
-			Sorter<Record> sorter = new AsynchronousPartialSorter<Record>(this.memoryManager, source,
-				this.parentTask, this.serializer, this.comparator, 1.0);
-	
-			runPartialSorter(sorter, NUM_RECORDS, 0);
-		}
-		catch (Exception t) {
-			t.printStackTrace();
-			Assert.fail("Test failed due to an uncaught exception: " + t.getMessage());
-		}
-	}
-	
-	@Test
-	public void testLargeSortAcrossTwoWindows() throws Exception
-	{
-		try {
-			final int NUM_RECORDS = 100000;
-			
-			// reader
-			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_RECORDS);
-			
-			// merge iterator
-			LOG.debug("Initializing sortmerger...");
-			Sorter<Record> sorter = new AsynchronousPartialSorter<Record>(this.memoryManager, source,
-				this.parentTask, this.serializer, this.comparator, 0.2);
-	
-			runPartialSorter(sorter, NUM_RECORDS, 2);
-		}
-		catch (Exception t) {
-			t.printStackTrace();
-			Assert.fail("Test failed due to an uncaught exception: " + t.getMessage());
-		}
-	}
-	
-	@Test
-	public void testLargeSortAcrossMultipleWindows() throws Exception
-	{
-		try {
-			final int NUM_RECORDS = 1000000;
-			
-			// reader
-			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_RECORDS);
-			
-			// merge iterator
-			LOG.debug("Initializing sortmerger...");
-			Sorter<Record> sorter = new AsynchronousPartialSorter<Record>(this.memoryManager, source,
-				this.parentTask, this.serializer, this.comparator, 0.15);
-	
-			runPartialSorter(sorter, NUM_RECORDS, 27);
-		}
-		catch (Exception t) {
-			t.printStackTrace();
-			Assert.fail("Test failed due to an uncaught exception: " + t.getMessage());
-		}
-	}
-	
-	@Test
-	public void testExceptionForwarding() throws IOException
-	{
-		try {
-			Sorter<Record> sorter = null;
-			try	{
-				final int NUM_RECORDS = 100;
-
-				// reader
-				final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-				final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_RECORDS);
-				
-				// merge iterator
-				LOG.debug("Initializing sortmerger...");
-				sorter = new ExceptionThrowingAsynchronousPartialSorter<Record>(this.memoryManager, source,
-						this.parentTask, this.serializer, this.comparator, 1.0);
-		
-				runPartialSorter(sorter, NUM_RECORDS, 0);
-				
-				Assert.fail("Expected Test Exception not thrown.");
-			} catch(Exception e) {
-				if (!containsTriggerException(e)) {
-					throw e;
-				}
-			} finally {
-				if (sorter != null) {
-					sorter.close();
-				}
-			}
-		}
-		catch (Exception t) {
-			t.printStackTrace();
-			Assert.fail("Test failed due to an uncaught exception: " + t.getMessage());
-		}
-	}
-	
-	private static void runPartialSorter(Sorter<Record> sorter, 
-								int expectedNumResultRecords, int expectedNumWindowTransitions)
-	throws Exception
-	{
-		// check order
-		final MutableObjectIterator<Record> iterator = sorter.getIterator();
-		int pairsEmitted = 1;
-		int windowTransitions = 0;
-		
-		Record rec1 = new Record();
-		Record rec2 = new Record();
-		
-		LOG.debug("Checking results...");
-		Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
-		while ((rec2 = iterator.next(rec2)) != null)
-		{
-			final TestData.Key k1 = rec1.getField(0, TestData.Key.class);
-			final TestData.Key k2 = rec2.getField(0, TestData.Key.class);
-			pairsEmitted++;
-			
-			// if the next key is smaller again, we have a new window
-			if (k1.compareTo(k2) > 0) {
-				windowTransitions++;
-			}
-			
-			Record tmp = rec1;
-			rec1 = rec2;
-			k1.setKey(k2.getKey());
-			
-			rec2 = tmp;
-		}
-		
-		sorter.close();
-		
-		Assert.assertEquals("Sorter did not return the expected number of result records.",
-			expectedNumResultRecords, pairsEmitted);
-		Assert.assertEquals("The partial sorter made an unexpected number of window transitions.",
-			expectedNumWindowTransitions, windowTransitions); 
-	}
-	
-	private static boolean containsTriggerException(Throwable exception)
-	{
-		while (exception != null) {
-			if (exception.getClass().equals(TriggeredException.class)) {
-				return true;
-			}
-			exception = exception.getCause();
-		}
-		return false;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//              					 Internal classes
-	// --------------------------------------------------------------------------------------------
-	
-	/*
-	 * Mock exception thrown on purpose.
-	 */
-	@SuppressWarnings("serial")
-	private static class TriggeredException extends IOException {}
-	
-	/*
-	 * Mocked sorter that throws an exception in the sorting thread.
-	 */
-	private static class ExceptionThrowingAsynchronousPartialSorter<E> extends AsynchronousPartialSorter<E>
-	{	
-		protected static class ExceptionThrowingSorterThread<E> extends SortingThread<E> {
-				
-			public ExceptionThrowingSorterThread(ExceptionHandler<IOException> exceptionHandler,
-						org.apache.flink.runtime.operators.sort.UnilateralSortMerger.CircularQueues<E> queues,
-						AbstractInvokable parentTask)
-			{
-				super(exceptionHandler, queues, parentTask);
-			}
-	
-			@Override
-			public void go() throws IOException {
-				throw new TriggeredException();
-			}
-		}
-
-		public ExceptionThrowingAsynchronousPartialSorter(MemoryManager memoryManager,
-				MutableObjectIterator<E> input, AbstractInvokable parentTask, 
-				TypeSerializerFactory<E> serializer, TypeComparator<E> comparator,
-				double memoryFraction)
-		throws IOException, MemoryAllocationException
-		{
-			super(memoryManager, input, parentTask, serializer, comparator, memoryFraction);
-		}
-
-
-		@Override
-		protected ThreadBase<E> getSortingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues,
-				AbstractInvokable parentTask)
-		{
-			return new ExceptionThrowingSorterThread<E>(exceptionHandler, queues, parentTask);
-		}		
-	}
-}