Posted to commits@flink.apache.org by se...@apache.org on 2015/08/26 19:24:39 UTC

[1/2] flink git commit: [FLINK-2189] [runtime] Fix various issues in hash table - check for memory availability before probing - correctly compute memory required for recursive build fast path - remove all temp files properly

Repository: flink
Updated Branches:
  refs/heads/master e02c3019d -> 4a5fe4e63


[FLINK-2189] [runtime] Fix various issues in hash table
  - check for memory availability before probing
  - correctly compute memory required for recursive build fast path
  - remove all temp files properly
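In essence, the probing fix comes down to the following guard: at least one free buffer must be secured before probing of a spilled partition begins, and the table now fails fast instead of letting a null segment slip into the read memory and surface later as an obscure error or deadlock. The snippet below is a minimal, self-contained sketch with simplified names, not the exact MutableHashTable code; the real change is in the MutableHashTable.java diff further down. A second sketch after the file summary illustrates the revised memory estimate for the recursive build fast path.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class ProbeMemorySketch {

	/** Grabs one mandatory and one optional read buffer before probing a spilled partition. */
	static List<byte[]> grabProbeBuffers(Deque<byte[]> freeBuffers) {
		List<byte[]> memory = new ArrayList<>(2);

		byte[] first = freeBuffers.poll();
		if (first == null) {
			// fail fast: previously a null segment could be added to the list and
			// only cause a failure much later
			throw new IllegalStateException(
					"Attempting to begin probing of partition without any memory available");
		}
		memory.add(first);

		byte[] second = freeBuffers.poll();   // the second buffer is optional
		if (second != null) {
			memory.add(second);
		}
		return memory;
	}

	public static void main(String[] args) {
		Deque<byte[]> free = new ArrayDeque<>();
		free.add(new byte[32 * 1024]);
		System.out.println("buffers grabbed: " + grabProbeBuffers(free).size());  // prints 1
	}
}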


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4a5fe4e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4a5fe4e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4a5fe4e6

Branch: refs/heads/master
Commit: 4a5fe4e631d53c0c940c2899978ee72e23690c86
Parents: ced6a19
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 26 17:14:09 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 26 18:48:17 2015 +0200

----------------------------------------------------------------------
 .../iomanager/AsynchronousFileIOChannel.java    |   4 +
 .../runtime/operators/hash/HashPartition.java   |  82 +++----
 .../operators/hash/MutableHashTable.java        | 143 ++++++------
 .../hash/ReOpenableMutableHashTable.java        |   2 +-
 .../runtime/operators/hash/HashTableTest.java   | 221 +++++++++++++++++++
 .../hash/RecordsAndWidthsCombinationCheck.java  | 199 +++++++++++++++++
 6 files changed, 517 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
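The second fix changes how buildTableFromSpilledPartition decides whether a spilled partition can be re-built entirely in memory. Below is a small stand-alone sketch of the revised estimate; the constants are taken from MutableHashTable as shown in the diff, while the partition figures (3,400 build-side records in 30 blocks) are hypothetical and only illustrate the arithmetic.

/** Hedged sketch of the revised fast-path memory estimate, not Flink's actual class. */
public class RecursiveBuildMemoryEstimate {

	public static void main(String[] args) {
		// constants as defined in MutableHashTable
		final int HASH_BUCKET_SIZE = 1 << 7;                    // NUM_INTRA_BUCKET_BITS = 7
		final int BUCKET_HEADER_LENGTH = 16;
		final int RECORD_TABLE_BYTES = 4 + 8;                   // hash code + pointer
		final int NUM_ENTRIES_PER_BUCKET =
				(HASH_BUCKET_SIZE - BUCKET_HEADER_LENGTH) / RECORD_TABLE_BYTES;  // = 9

		final int segmentSize = 32 * 1024;
		final int bucketsPerSegment = segmentSize / HASH_BUCKET_SIZE;            // = 256

		// hypothetical spilled partition: 3,400 build-side records in 30 blocks
		final long buildSideRecordCount = 3_400;
		final long buildSideBlockCount = 30;

		// revised formula: buckets are sized by the entries that actually fit per bucket,
		// doubled for the worst case where all buckets overflow, and the trailing '+ 2'
		// (previously '+ 1') keeps one buffer free for the probe side
		long numBuckets = buildSideRecordCount / NUM_ENTRIES_PER_BUCKET + 1;     // 378
		long totalBuffersNeeded =
				2 * (numBuckets / bucketsPerSegment) + buildSideBlockCount + 2;  // 34

		System.out.println("buckets needed: " + numBuckets);
		System.out.println("buffers needed to stay in memory: " + totalBuffersNeeded);
	}
}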


http://git-wip-us.apache.org/repos/asf/flink/blob/4a5fe4e6/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
index cca04b7..aefeddb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
@@ -294,6 +294,10 @@ final class SegmentReadRequest implements ReadRequest {
 	private final MemorySegment segment;
 
 	protected SegmentReadRequest(AsynchronousFileIOChannel<MemorySegment, ReadRequest> targetChannel, MemorySegment segment) {
+		if (segment == null) {
+			throw new NullPointerException("Illegal read request with null memory segment.");
+		}
+		
 		this.channel = targetChannel;
 		this.segment = segment;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/4a5fe4e6/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
index cee9ebb..b899acd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.hash;
 
 import java.io.EOFException;
@@ -41,14 +40,13 @@ import org.apache.flink.runtime.memorymanager.AbstractPagedOutputView;
 import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.util.MutableObjectIterator;
 
-
 /**
  * 
  * @param <BT> The type of the build side records.
  * @param <PT> The type of the probe side records.
  */
-public class HashPartition<BT, PT> extends AbstractPagedInputView implements SeekableDataInputView
-{
+public class HashPartition<BT, PT> extends AbstractPagedInputView implements SeekableDataInputView {
+	
 	// --------------------------------- Table Structure Auxiliaries ------------------------------------
 	
 	protected MemorySegment[] overflowSegments;	// segments in which overflow buckets from the table structure are stored
@@ -71,7 +69,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 	
 	private int finalBufferLimit;
 	
-	private BuildSideBuffer<BT> buildSideWriteBuffer;
+	private BuildSideBuffer buildSideWriteBuffer;
 	
 	protected ChannelWriterOutputView probeSideBuffer;
 	
@@ -107,8 +105,6 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 	
 	// --------------------------------------------------------------------------------------------------
 	
-	
-	
 	/**
 	 * Creates a new partition, initially in memory, with one buffer for the build side. The partition is
 	 * initialized to expect record insertions for the build side.
@@ -136,7 +132,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 		this.numOverflowSegments = 0;
 		this.nextOverflowBucket = 0;
 		
-		this.buildSideWriteBuffer = new BuildSideBuffer<BT>(initialBuffer, memSource);
+		this.buildSideWriteBuffer = new BuildSideBuffer(initialBuffer, memSource);
 	}
 	
 	/**
@@ -239,8 +235,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 	 * @return A pointer to the object in the partition, or <code>-1</code>, if the partition is spilled.
 	 * @throws IOException Thrown, when this is a spilled partition and the write failed.
 	 */
-	public final long insertIntoBuildBuffer(BT record) throws IOException
-	{
+	public final long insertIntoBuildBuffer(BT record) throws IOException {
 		this.buildSideRecordCounter++;
 		
 		if (isInMemory()) {
@@ -263,8 +258,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 	 * @param record The record to be inserted into the probe side buffers.
 	 * @throws IOException Thrown, if the buffer is full, needs to be spilled, and spilling causes an error.
 	 */
-	public final void insertIntoProbeBuffer(PT record) throws IOException
-	{
+	public final void insertIntoProbeBuffer(PT record) throws IOException {
 		this.probeSideSerializer.serialize(record, this.probeSideBuffer);
 		this.probeSideRecordCounter++;
 	}
@@ -331,8 +325,6 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 	}
 	
 	/**
-	 * @param freeMemory
-	 * @param spilledPartitions
 	 * @return The number of write-behind buffers reclaimable after this method call.
 	 * 
 	 * @throws IOException
@@ -351,8 +343,8 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 			this.numOverflowSegments = 0;
 			this.nextOverflowBucket = 0;
 			// return the partition buffers
-			for (int i = 0; i < this.partitionBuffers.length; i++) {
-				freeMemory.add(this.partitionBuffers[i]);
+			for (MemorySegment partitionBuffer : this.partitionBuffers) {
+				freeMemory.add(partitionBuffer);
 			}
 			this.partitionBuffers = null;
 			return 0;
@@ -366,7 +358,6 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 			this.probeSideChannel.close();
 			this.buildSideChannel.deleteChannel();
 			this.probeSideChannel.deleteChannel();
-			
 			return 0;
 		}
 		else {
@@ -378,10 +369,8 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 		}
 	}
 	
-
 	
-	public void clearAllMemory(List<MemorySegment> target)
-	{
+	public void clearAllMemory(List<MemorySegment> target) {
 		// return current buffers from build side and probe side
 		if (this.buildSideWriteBuffer != null) {
 			if (this.buildSideWriteBuffer.getCurrentSegment() != null) {
@@ -405,8 +394,8 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 		
 		// return the partition buffers
 		if (this.partitionBuffers != null) {
-			for (int k = 0; k < this.partitionBuffers.length; k++) {
-				target.add(this.partitionBuffers[k]);
+			for (MemorySegment partitionBuffer : this.partitionBuffers) {
+				target.add(partitionBuffer);
 			}
 			this.partitionBuffers = null;
 		}
@@ -421,15 +410,13 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 				this.probeSideChannel.close();
 				this.probeSideChannel.deleteChannel();
 			}
-			
 		}
 		catch (IOException ioex) {
 			throw new RuntimeException("Error deleting the partition files. Some temporary files might not be removed.");
 		}
 	}
 	
-	final PartitionIterator getPartitionIterator(TypeComparator<BT> comparator) throws IOException
-	{
+	final PartitionIterator getPartitionIterator(TypeComparator<BT> comparator) throws IOException {
 		return new PartitionIterator(comparator);
 	}
 	
@@ -457,20 +444,13 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 		this.probeSideChannel = ioAccess.createBlockChannelWriter(probeChannelEnumerator.next(), bufferReturnQueue);
 		this.probeSideBuffer = new ChannelWriterOutputView(this.probeSideChannel, this.memorySegmentSize);
 	}
-		
-
-
-
-
-	
 
 	
 	// --------------------------------------------------------------------------------------------------
 	//                   Methods to provide input view abstraction for reading probe records
 	// --------------------------------------------------------------------------------------------------
 	
-	public void setReadPosition(long pointer)
-	{	
+	public void setReadPosition(long pointer) {
 		final int bufferNum = (int) (pointer >>> this.segmentSizeBits);
 		final int offset = (int) (pointer & (this.memorySegmentSize - 1));
 		
@@ -480,8 +460,6 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 		
 	}
 
-	
-
 	@Override
 	protected MemorySegment nextSegment(MemorySegment current) throws IOException {
 		this.currentBufferNum++;
@@ -500,8 +478,8 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 	
 	// ============================================================================================
 	
-	protected static final class BuildSideBuffer<BT> extends AbstractPagedOutputView
-	{
+	protected static final class BuildSideBuffer extends AbstractPagedOutputView {
+		
 		private final ArrayList<MemorySegment> targetList;
 		
 		private final MemorySegmentSource memSource;
@@ -513,8 +491,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 		private final int sizeBits;
 		
 		
-		private BuildSideBuffer(MemorySegment initialSegment, MemorySegmentSource memSource)
-		{
+		private BuildSideBuffer(MemorySegment initialSegment, MemorySegmentSource memSource) {
 			super(initialSegment, initialSegment.size(), 0);
 			
 			this.targetList = new ArrayList<MemorySegment>();
@@ -524,8 +501,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 		
 
 		@Override
-		protected MemorySegment nextSegment(MemorySegment current, int bytesUsed) throws IOException
-		{
+		protected MemorySegment nextSegment(MemorySegment current, int bytesUsed) throws IOException {
 			finalizeSegment(current, bytesUsed);
 			
 			final MemorySegment next;
@@ -553,8 +529,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 			return this.currentBlockNumber + 1;
 		}
 		
-		int spill(BlockChannelWriter<MemorySegment> writer) throws IOException
-		{
+		int spill(BlockChannelWriter<MemorySegment> writer) throws IOException {
 			this.writer = writer;
 			final int numSegments = this.targetList.size();
 			for (int i = 0; i < numSegments; i++) {
@@ -564,8 +539,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 			return numSegments;
 		}
 		
-		MemorySegment[] close() throws IOException
-		{
+		MemorySegment[] close() throws IOException {
 			final MemorySegment current = getCurrentSegment();
 			if (current == null) {
 				throw new IllegalStateException("Illegal State in HashPartition: No current buffer when finilizing build side.");
@@ -575,7 +549,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 			
 			if (this.writer == null) {
 				this.targetList.add(current);
-				MemorySegment[] buffers = (MemorySegment[]) this.targetList.toArray(new MemorySegment[this.targetList.size()]);
+				MemorySegment[] buffers = this.targetList.toArray(new MemorySegment[this.targetList.size()]);
 				this.targetList.clear();
 				return buffers;
 			} else {
@@ -584,29 +558,26 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 			}
 		}
 		
-		private void finalizeSegment(MemorySegment seg, int bytesUsed) {
-		}
+		private void finalizeSegment(MemorySegment seg, int bytesUsed) {}
 	}
 	
 	// ============================================================================================
 	
-	final class PartitionIterator implements MutableObjectIterator<BT>
-	{
+	final class PartitionIterator implements MutableObjectIterator<BT> {
+		
 		private final TypeComparator<BT> comparator;
 		
 		private long currentPointer;
 		
 		private int currentHashCode;
 		
-		private PartitionIterator(final TypeComparator<BT> comparator) throws IOException
-		{
+		private PartitionIterator(final TypeComparator<BT> comparator) throws IOException {
 			this.comparator = comparator;
 			setReadPosition(0);
 		}
 		
 		
-		public final BT next(BT reuse) throws IOException
-		{
+		public final BT next(BT reuse) throws IOException {
 			final int pos = getCurrentPositionInSegment();
 			final int buffer = HashPartition.this.currentBufferNum;
 			
@@ -621,8 +592,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
 			}
 		}
 
-		public final BT next() throws IOException
-		{
+		public final BT next() throws IOException {
 			final int pos = getCurrentPositionInSegment();
 			final int buffer = HashPartition.this.currentBufferNum;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4a5fe4e6/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index b0042fc..7661808 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.iterative.io.HashPartitionIterator;
 import org.apache.flink.runtime.operators.util.BloomFilter;
 import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.util.MutableObjectIterator;
@@ -83,6 +82,7 @@ import org.apache.flink.util.MutableObjectIterator;
  * @param <BT> The type of records from the build side that are stored in the hash table.
  * @param <PT> The type of records from the probe side that are stored in the hash table.
  */
+@SuppressWarnings("ForLoopReplaceableByForEach")
 public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(MutableHashTable.class);
@@ -125,23 +125,11 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	private static final int POINTER_LEN = 8;
 	
 	/**
-	 * The number of bytes for the serialized record length in the partition buffers.
-	 */
-	private static final int SERIALIZED_LENGTH_FIELD_BYTES = 0;
-	
-	/**
 	 * The number of bytes that the entry in the hash structure occupies, in bytes.
 	 * It corresponds to a 4 byte hash value and an 8 byte pointer.
 	 */
 	private static final int RECORD_TABLE_BYTES = HASH_CODE_LEN + POINTER_LEN;
 	
-	/**
-	 * The total storage overhead per record, in bytes. This corresponds to the space in the
-	 * actual hash table buckets, consisting of a 4 byte hash value and an 8 byte
-	 * pointer, plus the overhead for the stored length field.
-	 */
-	private static final int RECORD_OVERHEAD_BYTES = RECORD_TABLE_BYTES + SERIALIZED_LENGTH_FIELD_BYTES;
-	
 	// -------------------------- Bucket Size and Structure -------------------------------------
 	
 	static final int NUM_INTRA_BUCKET_BITS = 7;
@@ -150,7 +138,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	
 	static final int BUCKET_HEADER_LENGTH = 16;
 	
-	private static final int NUM_ENTRIES_PER_BUCKET = (HASH_BUCKET_SIZE - BUCKET_HEADER_LENGTH) / RECORD_OVERHEAD_BYTES;
+	private static final int NUM_ENTRIES_PER_BUCKET = (HASH_BUCKET_SIZE - BUCKET_HEADER_LENGTH) / RECORD_TABLE_BYTES;
 	
 	private static final int BUCKET_POINTER_START_OFFSET = BUCKET_HEADER_LENGTH + (NUM_ENTRIES_PER_BUCKET * HASH_CODE_LEN);
 	
@@ -344,12 +332,13 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	/**
 	 * If true, build side partitions are kept for multiple probe steps.
 	 */
-	protected boolean keepBuildSidePartitions = false;
+	protected boolean keepBuildSidePartitions;
 	
-	protected boolean furtherPartitioning = false;
+	protected boolean furtherPartitioning;
 	
 	private boolean running = true;
 	
+	
 	// ------------------------------------------------------------------------
 	//                         Construction and Teardown
 	// ------------------------------------------------------------------------
@@ -446,8 +435,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		}
 		
 		// grab the write behind buffers first
-		for (int i = this.numWriteBehindBuffers; i > 0; --i)
-		{
+		for (int i = this.numWriteBehindBuffers; i > 0; --i) {
 			this.writeBehindBuffers.add(this.availableMemory.remove(this.availableMemory.size() - 1));
 		}
 		// open builds the initial table by consuming the build-side input
@@ -522,10 +510,9 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		}
 
 		// check if there are pending partitions
-		if (!this.partitionsPending.isEmpty())
-		{
+		if (!this.partitionsPending.isEmpty()) {
 			final HashPartition<BT, PT> p = this.partitionsPending.get(0);
-
+			
 			// build the next table
 			buildTableFromSpilledPartition(p);
 
@@ -534,10 +521,16 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 			this.currentSpilledProbeSide = this.ioManager.createBlockChannelReader(p.getProbeSideChannel().getChannelID(), returnQueue);
 
 			List<MemorySegment> memory = new ArrayList<MemorySegment>();
-			memory.add(getNextBuffer());
-			MemorySegment nextBuffer = getNextBuffer();
-			if (nextBuffer != null) {
-				memory.add(nextBuffer);
+			MemorySegment seg1 = getNextBuffer();
+			if (seg1 != null) {
+				memory.add(seg1);
+				MemorySegment seg2 = getNextBuffer();
+				if (seg2 != null) {
+					memory.add(seg2);
+				}
+			}
+			else {
+				throw new IllegalStateException("Attempting to begin probing of partition without any memory available");
 			}
 
 			ChannelReaderInputViewIterator<PT> probeReader = new ChannelReaderInputViewIterator<PT>(this.currentSpilledProbeSide,
@@ -595,10 +588,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	public HashBucketIterator<BT, PT> getBuildSideIterator() {
 		return this.bucketIterator;
 	}
-
-	public MutableObjectIterator<BT> getPartitionEntryIterator() {
-		return new HashPartitionIterator<BT, PT>(this.partitionsBeingBuilt.iterator(), this.buildSideSerializer);
-	}
 	
 	/**
 	 * Closes the hash table. This effectively releases all internal structures and closes all
@@ -667,8 +656,12 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	// ------------------------------------------------------------------------
 	
 	/**
-	 * @param input
-	 * @throws IOException
+	 * Creates the initial hash table. This method sets up partitions, hash index, and inserts
+	 * the data from the given iterator.
+	 * 
+	 * @param input The iterator with the build side data.
+	 * @throws IOException Thrown, if an element could not be fetched and deserialized from
+	 *                     the iterator, or if serialization fails.
 	 */
 	protected void buildInitialTable(final MutableObjectIterator<BT> input) throws IOException {
 		// create the partitions
@@ -718,16 +711,13 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 
 	private int getEstimatedMaxBucketEntries(int numBuffers, int bufferSize, int numBuckets, int recordLenBytes) {
 		final long totalSize = ((long) bufferSize) * numBuffers;
-		final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_OVERHEAD_BYTES);
+		final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_TABLE_BYTES);
 		final long maxNumRecordsStorable = (MAX_RECURSION_DEPTH + 1) * numRecordsStorable;
 		final long maxNumRecordsPerBucket = maxNumRecordsStorable / numBuckets;
 		return (int) maxNumRecordsPerBucket;
 	}
 	
-	/**
-	 * @param p
-	 * @throws IOException
-	 */
+
 	protected void buildTableFromSpilledPartition(final HashPartition<BT, PT> p) throws IOException {
 		
 		final int nextRecursionLevel = p.getRecursionLevel() + 1;
@@ -748,11 +738,11 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 			throw new RuntimeException("Hash Join bug in memory management: Memory buffers leaked.");
 		}
 		
-		long numBuckets = (p.getBuildSideRecordCount() * RECORD_TABLE_BYTES) / (HASH_BUCKET_SIZE - BUCKET_HEADER_LENGTH) + 1;
+		long numBuckets = p.getBuildSideRecordCount() / NUM_ENTRIES_PER_BUCKET + 1;
 		
 		// we need to consider the worst case where everything hashes to one bucket which needs to overflow by the same
-		// number of total buckets again.
-		final long totalBuffersNeeded = (numBuckets * 2) / (this.bucketsPerSegmentMask + 1) + p.getBuildSideBlockCount() + 1;
+		// number of total buckets again. Also, one buffer needs to remain for the probing
+		final long totalBuffersNeeded = 2 * (numBuckets / (this.bucketsPerSegmentMask + 1)) + p.getBuildSideBlockCount() + 2;
 		
 		if (totalBuffersNeeded < totalBuffersAvailable) {
 			// we are guaranteed to stay in memory
@@ -791,7 +781,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 				final int bucketInSegmentPos = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
 				final MemorySegment bucket = this.buckets[bucketArrayPos];
 				
-				insertBucketEntry(newPart, bucket, bucketInSegmentPos, hashCode, pointer);
+				insertBucketEntry(newPart, bucket, bucketInSegmentPos, hashCode, pointer, false);
 			}
 		}
 		else {
@@ -824,11 +814,16 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 					this.availableMemory, this.buildSideSerializer);
 			final TypeComparator<BT> btComparator = this.buildSideComparator;
 			BT rec = this.buildSideSerializer.createInstance();
-			while ((rec = inIter.next(rec)) != null)
-			{	
+			while ((rec = inIter.next(rec)) != null) {
 				final int hashCode = hash(btComparator.hash(rec), nextRecursionLevel);
 				insertIntoTable(rec, hashCode);
 			}
+
+			if (keepBuildSidePartitions && p.recursionLevel == 0) {
+				inReader.close(); // keep the partitions
+			} else {
+				inReader.closeAndDelete();
+			}
 			
 			// finalize the partitions
 			for (int i = 0; i < this.partitionsBeingBuilt.size(); i++) {
@@ -838,11 +833,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		}
 	}
 	
-	/**
-	 * @param record
-	 * @param hashCode
-	 * @throws IOException
-	 */
+
 	protected final void insertIntoTable(final BT record, final int hashCode) throws IOException {
 		final int posHashCode = hashCode % this.numBuckets;
 		
@@ -865,7 +856,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		long pointer = p.insertIntoBuildBuffer(record);
 		if (pointer != -1) {
 			// record was inserted into an in-memory partition. a pointer must be inserted into the buckets
-			insertBucketEntry(p, bucket, bucketInSegmentPos, hashCode, pointer);
+			insertBucketEntry(p, bucket, bucketInSegmentPos, hashCode, pointer, true);
 		} else {
 			byte status = bucket.get(bucketInSegmentPos + HEADER_STATUS_OFFSET);
 			if (status == BUCKET_STATUS_IN_FILTER) {
@@ -877,24 +868,17 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		}
 	}
 	
-	/**
-	 * @param p
-	 * @param bucket
-	 * @param bucketInSegmentPos
-	 * @param hashCode
-	 * @param pointer
-	 * @throws IOException
-	 */
+
 	final void insertBucketEntry(final HashPartition<BT, PT> p, final MemorySegment bucket, 
-			final int bucketInSegmentPos, final int hashCode, final long pointer)
-	throws IOException
+			final int bucketInSegmentPos, final int hashCode, final long pointer, final boolean spillingAllowed)
+			throws IOException
 	{
 		// find the position to put the hash code and pointer
 		final int count = bucket.getShort(bucketInSegmentPos + HEADER_COUNT_OFFSET);
 		if (count < NUM_ENTRIES_PER_BUCKET)
 		{
 			// we are good in our current bucket, put the values
-			bucket.putInt(bucketInSegmentPos + BUCKET_HEADER_LENGTH + (count * HASH_CODE_LEN), hashCode);	// hash code
+			bucket.putInt(bucketInSegmentPos + BUCKET_HEADER_LENGTH + (count * HASH_CODE_LEN), hashCode);    // hash code
 			bucket.putLong(bucketInSegmentPos + BUCKET_POINTER_START_OFFSET + (count * POINTER_LEN), pointer); // pointer
 			bucket.putShort(bucketInSegmentPos + HEADER_COUNT_OFFSET, (short) (count + 1)); // update count
 		}
@@ -943,6 +927,10 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 				overflowSeg = getNextBuffer();
 				if (overflowSeg == null) {
 					// no memory available to create overflow bucket. we need to spill a partition
+					if (!spillingAllowed) {
+						throw new IOException("Hashtable memory ran out in a non-spillable situation. " +
+								"This is probably related to wrong size calculations.");
+					}
 					final int spilledPart = spillPartition();
 					if (spilledPart == p.getPartitionNumber()) {
 						// this bucket is no longer in-memory
@@ -1005,13 +993,12 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 				number, recursionLevel, this.availableMemory.remove(this.availableMemory.size() - 1),
 				this, this.segmentSize);
 	}
-	/**
-	 * @param numPartitions
-	 */
+
+	
 	protected void createPartitions(int numPartitions, int recursionLevel) {
 		// sanity check
 		ensureNumBuffersReturned(numPartitions);
-		
+
 		this.currentEnumerator = this.ioManager.createChannelEnumerator();
 		
 		this.partitionsBeingBuilt.clear();
@@ -1122,7 +1109,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 										this.currentEnumerator.next(), this.writeBehindBuffers);
 		this.writeBehindBuffersAvailable += numBuffersFreed;
 		// grab as many buffers as are available directly
-		MemorySegment currBuff = null;
+		MemorySegment currBuff;
 		while (this.writeBehindBuffersAvailable > 0 && (currBuff = this.writeBehindBuffers.poll()) != null) {
 			this.availableMemory.add(currBuff);
 			this.writeBehindBuffersAvailable--;
@@ -1150,10 +1137,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	/**
 	 * Set all the bucket memory except bucket header as the bit set of bloom filter, and use hash code of build records
 	 * to build bloom filter.
-	 *
-	 * @param bucketInSegmentPos
-	 * @param bucket
-	 * @param p
 	 */
 	final void buildBloomFilterForBucket(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
 		final int count = bucket.getShort(bucketInSegmentPos + HEADER_COUNT_OFFSET);
@@ -1256,7 +1239,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 			this.writeBehindBuffersAvailable--;
 			
 			// grab as many more buffers as are available directly
-			MemorySegment currBuff = null;
+			MemorySegment currBuff;
 			while (this.writeBehindBuffersAvailable > 0 && (currBuff = this.writeBehindBuffers.poll()) != null) {
 				this.availableMemory.add(currBuff);
 				this.writeBehindBuffersAvailable--;
@@ -1267,12 +1250,20 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 			return null;
 		}
 	}
-	
 
+
+	/**
+	 * This is the method called by the partitions to request memory to serialize records.
+	 * It automatically spills partitions, if memory runs out. 
+	 * 
+	 * @return The next available memory segment.
+	 */
 	@Override
 	public MemorySegment nextSegment() {
 		final MemorySegment seg = getNextBuffer();
-		if (seg == null) {
+		if (seg != null) {
+			return seg;
+		} else {
 			try {
 				spillPartition();
 			} catch (IOException ioex) {
@@ -1286,8 +1277,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 			} else {
 				return fromSpill;
 			}
-		} else {
-			return seg;
 		}
 	}
 
@@ -1342,7 +1331,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 		// ----------------------------------------------------------------------------------------
 		
 		final long totalSize = ((long) bufferSize) * numBuffers;
-		final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_OVERHEAD_BYTES);
+		final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_TABLE_BYTES);
 		final long bucketBytes = numRecordsStorable * RECORD_TABLE_BYTES;
 		final long numBuckets = bucketBytes / (2 * HASH_BUCKET_SIZE) + 1;
 		
@@ -1369,7 +1358,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 	 * @param code The integer to be hashed.
 	 * @return The hash code for the integer.
 	 */
-	public static final int hash(int code, int level) {
+	public static int hash(int code, int level) {
 		final int rotation = level * 11;
 		
 		code = (code << rotation) | (code >>> -rotation);
@@ -1487,7 +1476,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 				
 				final int overflowSegNum = (int) (forwardPointer >>> 32);
 				this.bucket = this.overflowSegments[overflowSegNum];
-				this.bucketInSegmentOffset = (int) (forwardPointer & 0xffffffff);
+				this.bucketInSegmentOffset = (int) forwardPointer;
 				this.countInSegment = this.bucket.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET);
 				this.posInSegment = this.bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
 				this.numInSegment = 0;
@@ -1537,7 +1526,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 
 				final int overflowSegNum = (int) (forwardPointer >>> 32);
 				this.bucket = this.overflowSegments[overflowSegNum];
-				this.bucketInSegmentOffset = (int) (forwardPointer & 0xffffffff);
+				this.bucketInSegmentOffset = (int) forwardPointer;
 				this.countInSegment = this.bucket.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET);
 				this.posInSegment = this.bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
 				this.numInSegment = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/4a5fe4e6/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
index fd5fcde..b7a7262 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
@@ -101,7 +101,7 @@ public class ReOpenableMutableHashTable<BT, PT> extends MutableHashTable<BT, PT>
 						final int bucketArrayPos = posHashCode >> this.bucketsPerSegmentBits;
 						final int bucketInSegmentPos = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
 						final MemorySegment bucket = this.buckets[bucketArrayPos];
-						insertBucketEntry(part, bucket, bucketInSegmentPos, hashCode, pointer);
+						insertBucketEntry(part, bucket, bucketInSegmentPos, hashCode, pointer, true);
 					}
 				} else {
 					this.writeBehindBuffersAvailable--; // we are not in-memory, thus the probe side buffer will grab one wbb.

http://git-wip-us.apache.org/repos/asf/flink/blob/4a5fe4e6/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
new file mode 100644
index 0000000..0d8b81e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.util.MutableObjectIterator;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class HashTableTest {
+
+	private final TypeSerializer<Tuple2<Long, byte[]>> buildSerializer;
+	private final TypeSerializer<Long> probeSerializer;
+	
+	private final TypeComparator<Tuple2<Long, byte[]>> buildComparator;
+	private final TypeComparator<Long> probeComparator;
+
+	private final TypePairComparator<Long, Tuple2<Long, byte[]>> pairComparator;
+
+
+	public HashTableTest() {
+		TypeSerializer<?>[] fieldSerializers = { LongSerializer.INSTANCE, BytePrimitiveArraySerializer.INSTANCE };
+		@SuppressWarnings("unchecked")
+		Class<Tuple2<Long, byte[]>> clazz = (Class<Tuple2<Long, byte[]>>) (Class<?>) Tuple2.class;
+		this.buildSerializer = new TupleSerializer<Tuple2<Long, byte[]>>(clazz, fieldSerializers);
+		
+		this.probeSerializer = LongSerializer.INSTANCE;
+
+		TypeComparator<?>[] comparators = { new LongComparator(true) };
+		TypeSerializer<?>[] comparatorSerializers = { LongSerializer.INSTANCE };
+
+		this.buildComparator = new TupleComparator<Tuple2<Long, byte[]>>(new int[] {0}, comparators, comparatorSerializers);
+
+		this.probeComparator = new LongComparator(true);
+
+		this.pairComparator = new TypePairComparator<Long, Tuple2<Long, byte[]>>() {
+
+			private long ref;
+
+			@Override
+			public void setReference(Long reference) {
+				ref = reference;
+			}
+
+			@Override
+			public boolean equalToReference(Tuple2<Long, byte[]> candidate) {
+				//noinspection UnnecessaryUnboxing
+				return candidate.f0.longValue() == ref;
+			}
+
+			@Override
+			public int compareToReference(Tuple2<Long, byte[]> candidate) {
+				long x = ref;
+				long y = candidate.f0;
+				return (x < y) ? -1 : ((x == y) ? 0 : 1);
+			}
+		};
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Tests
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This tests a combination of values that lead to a corner case situation where memory
+	 * was missing and the computation deadlocked.
+	 */
+	@Test
+	public void testBufferMissingForProbing() {
+
+		final IOManager ioMan = new IOManagerAsync();
+
+		try {
+			final int pageSize = 32*1024;
+			final int numSegments = 34;
+			final int numRecords = 3400;
+			final int recordLen = 270;
+
+			final byte[] payload = new byte[recordLen - 8 - 4];
+			
+			List<MemorySegment> memory = getMemory(numSegments, pageSize);
+			
+			MutableHashTable<Tuple2<Long, byte[]>, Long> table = new MutableHashTable<>(
+					buildSerializer, probeSerializer, buildComparator, probeComparator,
+					pairComparator, memory, ioMan, 16, false);
+			
+			table.open(new TupleBytesIterator(payload, numRecords), new LongIterator(10000));
+			
+			try {
+				while (table.nextRecord()) {
+					MutableHashTable.HashBucketIterator<Tuple2<Long, byte[]>, Long> matches = table.getBuildSideIterator();
+					while (matches.next() != null);
+				}
+			}
+			catch (RuntimeException e) {
+				if (!e.getMessage().contains("exceeded maximum number of recursions")) {
+					e.printStackTrace();
+					fail("Test failed with unexpected exception");
+				} 
+			}
+			finally {
+				table.close();
+			}
+			
+			checkNoTempFilesRemain(ioMan);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			ioMan.shutdown();
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+	
+	private static List<MemorySegment> getMemory(int numSegments, int segmentSize) {
+		ArrayList<MemorySegment> list = new ArrayList<MemorySegment>(numSegments);
+		for (int i = 0; i < numSegments; i++) {
+			list.add(new MemorySegment(new byte[segmentSize]));
+		}
+		return list;
+	}
+	
+	private static void checkNoTempFilesRemain(IOManager ioManager) {
+		for (File dir : ioManager.getSpillingDirectories()) {
+			for (String file : dir.list()) {
+				if (file != null && !(file.equals(".") || file.equals(".."))) {
+					fail("hash table did not clean up temp files. remaining file: " + file);
+				}
+			}
+		}
+	}
+
+	private static class TupleBytesIterator implements MutableObjectIterator<Tuple2<Long, byte[]>> {
+
+		private final byte[] payload;
+		private final int numRecords;
+		
+		private int count = 0;
+
+		TupleBytesIterator(byte[] payload, int numRecords) {
+			this.payload = payload;
+			this.numRecords = numRecords;
+		}
+
+		@Override
+		public Tuple2<Long, byte[]> next(Tuple2<Long, byte[]> reuse) {
+			return next();
+		}
+
+		@Override
+		public Tuple2<Long, byte[]> next() {
+			if (count++ < numRecords) {
+				return new Tuple2<>(42L, payload);
+			} else {
+				return null;
+			}
+		}
+	}
+
+	private static class LongIterator implements MutableObjectIterator<Long> {
+
+		private final long numRecords;
+		private long value = 0;
+
+		LongIterator(long numRecords) {
+			this.numRecords = numRecords;
+		}
+
+		@Override
+		public Long next(Long aLong) {
+			return next();
+		}
+
+		@Override
+		public Long next() {
+			if (value < numRecords) {
+				return value++;
+			} else {
+				return null;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4a5fe4e6/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/RecordsAndWidthsCombinationCheck.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/RecordsAndWidthsCombinationCheck.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/RecordsAndWidthsCombinationCheck.java
new file mode 100644
index 0000000..56ee1da
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/RecordsAndWidthsCombinationCheck.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.fail;
+
+public class RecordsAndWidthsCombinationCheck {
+
+	public static void main(String[] args) throws Exception {
+
+		@SuppressWarnings("unchecked")
+		final TypeSerializer<Tuple2<Long, byte[]>> buildSerializer = 
+				new TupleSerializer<Tuple2<Long, byte[]>>(
+						(Class<Tuple2<Long, byte[]>>) (Class<?>) Tuple2.class,
+						new TypeSerializer<?>[] { LongSerializer.INSTANCE, BytePrimitiveArraySerializer.INSTANCE });
+		
+		final TypeSerializer<Long> probeSerializer = LongSerializer.INSTANCE;
+
+		final TypeComparator<Tuple2<Long, byte[]>> buildComparator = new TupleComparator<Tuple2<Long, byte[]>>(
+				new int[] {0},
+				new TypeComparator<?>[] { new LongComparator(true) },
+				new TypeSerializer<?>[] { LongSerializer.INSTANCE });
+		
+		final TypeComparator<Long> probeComparator = new LongComparator(true);
+
+		final TypePairComparator<Long, Tuple2<Long, byte[]>> pairComparator = new TypePairComparator<Long, Tuple2<Long, byte[]>>() {
+
+			private long ref;
+
+			@Override
+			public void setReference(Long reference) {
+				ref = reference;
+			}
+
+			@Override
+			public boolean equalToReference(Tuple2<Long, byte[]> candidate) {
+				//noinspection UnnecessaryUnboxing
+				return candidate.f0.longValue() == ref;
+			}
+
+			@Override
+			public int compareToReference(Tuple2<Long, byte[]> candidate) {
+				long x = ref;
+				long y = candidate.f0;
+				return (x < y) ? -1 : ((x == y) ? 0 : 1);
+			}
+		};
+
+		final IOManager ioMan = new IOManagerAsync();
+
+		try {
+			final int pageSize = 32*1024;
+			final int numSegments = 34;
+
+			for (int num = 3400; num < 3550; num++) {
+				final int numRecords = num;
+
+				for (int recordLen = 270; recordLen < 320; recordLen++) {
+
+					final byte[] payload = new byte[recordLen - 8 - 4];
+
+					System.out.println("testing " + numRecords + " / " + recordLen);
+
+					List<MemorySegment> memory = getMemory(numSegments, pageSize);
+
+					// we create a hash table that thinks the records are super large. that makes it choose initially
+					// a lot of memory for the partition buffers, and start with a smaller hash table. that way
+					// we trigger a hash table growth early.
+					MutableHashTable<Tuple2<Long, byte[]>, Long> table = new MutableHashTable<>(
+							buildSerializer, probeSerializer, buildComparator, probeComparator,
+							pairComparator, memory, ioMan, 16, false);
+
+					final MutableObjectIterator<Tuple2<Long, byte[]>> buildInput = new MutableObjectIterator<Tuple2<Long, byte[]>>() {
+
+						private int count = 0;
+
+						@Override
+						public Tuple2<Long, byte[]> next(Tuple2<Long, byte[]> reuse) {
+							return next();
+						}
+
+						@Override
+						public Tuple2<Long, byte[]> next() {
+							if (count++ < numRecords) {
+								return new Tuple2<>(42L, payload);
+							} else {
+								return null;
+							}
+						}
+					};
+
+					// probe side
+					final MutableObjectIterator<Long> probeInput = new MutableObjectIterator<Long>() {
+
+						private final long numRecords = 10000;
+						private long value = 0;
+
+						@Override
+						public Long next(Long aLong) {
+							return next();
+						}
+
+						@Override
+						public Long next() {
+							if (value < numRecords) {
+								return value++;
+							} else {
+								return null;
+							}
+						}
+					};
+
+					table.open(buildInput, probeInput);
+
+					try {
+						while (table.nextRecord()) {
+							MutableHashTable.HashBucketIterator<Tuple2<Long, byte[]>, Long> matches = table.getBuildSideIterator();
+							while (matches.next() != null);
+						}
+					}
+					catch (RuntimeException e) {
+						if (!e.getMessage().contains("exceeded maximum number of recursions")) {
+							throw e;
+						}
+					}
+					finally {
+						table.close();
+					}
+
+					// make sure no temp files are left
+					checkNoTempFilesRemain(ioMan);
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			ioMan.shutdown();
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+	
+	private static List<MemorySegment> getMemory(int numSegments, int segmentSize) {
+		ArrayList<MemorySegment> list = new ArrayList<MemorySegment>(numSegments);
+		for (int i = 0; i < numSegments; i++) {
+			list.add(new MemorySegment(new byte[segmentSize]));
+		}
+		return list;
+	}
+	
+	private static void checkNoTempFilesRemain(IOManager ioManager) {
+		for (File dir : ioManager.getSpillingDirectories()) {
+			for (String file : dir.list()) {
+				if (file != null && !(file.equals(".") || file.equals(".."))) {
+					fail("hash table did not clean up temp files. remaining file: " + file);
+				}
+			}
+		}
+	}
+}


[2/2] flink git commit: [FLINK-2575] [runtime] Deactivate join bloom filters by default

Posted by se...@apache.org.
[FLINK-2575] [runtime] Deactivate join bloom filters by default
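The bloom filters for spilled hash-join partitions remain available; only their default changes. A hedged sketch of re-enabling them programmatically is below, assuming the switch is read under ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY; that key constant is an assumption, as only the default value DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS appears in the diff that follows.

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

public class EnableHashJoinBloomFilters {

	public static Configuration withBloomFiltersEnabled() {
		Configuration config = new Configuration();
		// Assumption: the boolean switch whose default this commit flips to 'false' is
		// looked up under ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY; the key
		// constant itself does not appear in this diff, only its default value.
		config.setBoolean(ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY, true);
		return config;
	}

	public static void main(String[] args) {
		System.out.println(withBloomFiltersEnabled());
	}
}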


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ced6a199
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ced6a199
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ced6a199

Branch: refs/heads/master
Commit: ced6a1993815fc61fc0c4b68370d9af76de68c71
Parents: e02c301
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 26 12:29:58 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 26 18:48:17 2015 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/configuration/ConfigConstants.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ced6a199/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index d145eb2..2ca7c36 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -550,7 +550,7 @@ public final class ConfigConstants {
 	/**
 	 * Default setting for the switch for hash join bloom filters for spilled partitions.
 	 */
-	public static final boolean DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS = true;
+	public static final boolean DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS = false;
 	
 	/**
 	 * The default value for the maximum spilling fan in/out.