Posted to commits@flink.apache.org by se...@apache.org on 2015/08/05 22:06:16 UTC

[1/2] flink git commit: [hotfix] Remove unused and outdated HashFunctionCollisionBenchmark

Repository: flink
Updated Branches:
  refs/heads/master 3e73496c5 -> 925ac1f76


[hotfix] Remove unused and outdated HashFunctionCollisionBenchmark


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5226f0b4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5226f0b4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5226f0b4

Branch: refs/heads/master
Commit: 5226f0b4634da95f9cedfb092d6b736f0ad471b9
Parents: 3e73496
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Aug 4 15:35:00 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 5 17:40:37 2015 +0200

----------------------------------------------------------------------
 .../hash/HashFunctionCollisionBenchmark.java    | 448 -------------------
 .../operators/hash/util/LastBitsToRange.java    |  54 ---
 .../operators/hash/util/RandomIterator.java     |  65 ---
 .../operators/hash/util/RangeCalculator.java    |  46 --
 .../operators/hash/util/RangeIterator.java      |  61 ---
 .../operators/hash/util/StepRangeIterator.java  |  68 ---
 6 files changed, 742 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5226f0b4/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashFunctionCollisionBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashFunctionCollisionBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashFunctionCollisionBenchmark.java
deleted file mode 100644
index d96cd46..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashFunctionCollisionBenchmark.java
+++ /dev/null
@@ -1,448 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import java.util.ArrayList;
-import java.util.Formatter;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.runtime.operators.hash.MutableHashTable;
-import org.apache.flink.runtime.operators.hash.MultiLevelHashTester.BucketBoundaries;
-import org.apache.flink.runtime.operators.hash.util.LastBitsToRange;
-import org.apache.flink.runtime.operators.hash.util.RandomIterator;
-import org.apache.flink.runtime.operators.hash.util.RangeCalculator;
-import org.apache.flink.runtime.operators.hash.util.RangeIterator;
-import org.apache.flink.runtime.operators.hash.util.StepRangeIterator;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test distribution of hash function for multiple levels
- * 
- * 
- */
-public class HashFunctionCollisionBenchmark {
-
-	private static final Logger LOG = LoggerFactory.getLogger(HashFunctionCollisionBenchmark.class);
-
-	private static final long SEED = 561349061987311L;
-
-	@Test
-	public void testStepSeventeen() {
-
-		// Define numbers of buckets on each level
-		RangeCalculator[] rangeCalculators = { 
-				new LastBitsToRange(10), // 2^10=1024 Buckets on level 0
-				new LastBitsToRange(10), // 2^10=1024 Buckets on level 1
-				new LastBitsToRange(10) }; // 2^10=1024 Buckets on level 2
-
-		Iterator<Integer> importIterator = new StepRangeIterator(-30000000,
-				30000000, 17);
-
-		MultiLevelHashTester ht = new MultiLevelHashTester(importIterator,
-				rangeCalculators);
-
-		BucketBoundaries[] boundaries = {
-				new BucketBoundaries(3000, 3700, 5, 0.01),
-				new BucketBoundaries(0, 20,
-						BucketBoundaries.MAX_EMPTY_UNBOUNDED, 0.0001),
-				new BucketBoundaries(0, 3,
-						BucketBoundaries.MAX_EMPTY_UNBOUNDED, 0.000001) };
-
-		LOG.debug("Start Step Seventeen hash test");
-		ht.runTest(boundaries);
-		LOG.debug("End Step Seventeen hash test");
-	}
-
-	@Test
-	public void testThreeLevel() {
-
-		// Define numbers of buckets on each level
-		RangeCalculator[] rangeCalculators = { 
-				new LastBitsToRange(10), // 2^10=1024 Buckets on level 0
-				new LastBitsToRange(10), // 2^10=1024 Buckets on level 1
-				new LastBitsToRange(10) }; // 2^10=1024 Buckets on level 2
-
-		Iterator<Integer> importIterator = new RangeIterator(-1000000, 1000000);
-
-		MultiLevelHashTester ht = new MultiLevelHashTester(importIterator,
-				rangeCalculators);
-
-		BucketBoundaries[] boundaries = {
-				new BucketBoundaries(1800, 2110, 5, 0.01),
-				new BucketBoundaries(0, 15,
-						BucketBoundaries.MAX_EMPTY_UNBOUNDED, 0.0001),
-				new BucketBoundaries(0, 2,
-						BucketBoundaries.MAX_EMPTY_UNBOUNDED, 0.000001) };
-
-		LOG.debug("Start Three Level hash test");
-		ht.runTest(boundaries);
-		LOG.debug("End Three Level hash test");
-	}
-
-	@Test
-	public void testRandom() {
-
-		// Define numbers of buckets on each level
-		RangeCalculator[] rangeCalculators = { 
-				new LastBitsToRange(10), // 2^10=1024 Buckets on level 0
-				new LastBitsToRange(10), // 2^10=1024 Buckets on level 1
-				new LastBitsToRange(10) }; // 2^10=1024 Buckets on level 2
-
-		Iterator<Integer> importIterator = new RandomIterator(SEED, 2000000);
-
-		MultiLevelHashTester ht = new MultiLevelHashTester(importIterator,
-				rangeCalculators);
-
-		BucketBoundaries[] boundaries = {
-				new BucketBoundaries(1800, 2110, 5, 0.01),
-				new BucketBoundaries(0, 15,
-						BucketBoundaries.MAX_EMPTY_UNBOUNDED, 0.0001),
-				new BucketBoundaries(0, 2,
-						BucketBoundaries.MAX_EMPTY_UNBOUNDED, 0.000001) };
-
-		LOG.debug("Start Random hash test");
-		ht.runTest(boundaries);
-		LOG.debug("End Random hash test");
-	}
-
-	@Test
-	public void testTwoLevel() {
-
-		// Define numbers of buckets on each level
-		RangeCalculator[] rangeCalculators = { 
-				new LastBitsToRange(12),	// 2^12=4096 Buckets on level 0
-				new LastBitsToRange(12) };	// 2^12=4096 Buckets on level 1
-
-		Iterator<Integer> importIterator = new RangeIterator(-1000000, 1000000);
-
-		MultiLevelHashTester ht = new MultiLevelHashTester(importIterator,
-				rangeCalculators);
-
-		BucketBoundaries[] boundaries = {
-				new BucketBoundaries(400, 600, 5, 0.01),
-				new BucketBoundaries(0, 4,
-						BucketBoundaries.MAX_EMPTY_UNBOUNDED, 0.0001) };
-
-		LOG.debug("Start Two Level hash test");
-		ht.runTest(boundaries);
-		LOG.debug("End Two Level hash test");
-	}
-
-}
-
-class MultiLevelHashTester {
-
-	private static final Logger LOG = LoggerFactory.getLogger(MultiLevelHashTester.class);
-
-	private final int maxLevel;
-	private final Iterator<Integer> importIterator;
-	private final RangeCalculator[] rangeCalculators;
-	private final HashMap<Integer, Object> rootMap = new HashMap<Integer, Object>();
-	private final ArrayList<SortedMap<Integer, Integer>> bucketSizesPerLevel;
-
-	/**
-	 * @param importIterator
-	 *            Iterator over values to be used in test run
-	 * @param rangeCalculators
-	 *            For each level a range calculator which defines how to map
-	 *            from hash to bucket
-	 */
-	public MultiLevelHashTester(Iterator<Integer> importIterator,
-			RangeCalculator[] rangeCalculators) {
-		this.importIterator = importIterator;
-		this.rangeCalculators = rangeCalculators;
-		this.maxLevel = rangeCalculators.length;
-		this.bucketSizesPerLevel = new ArrayList<SortedMap<Integer, Integer>>(
-				maxLevel);
-
-		for (int i = 0; i < maxLevel; i++) {
-			bucketSizesPerLevel.add(i, new TreeMap<Integer, Integer>());
-		}
-	}
-
-	/**
-	 * Runs the test by adding values from the iterator to the map, creating a
-	 * histogram of bucket sizes per level, and printing the histogram information.
-	 * 
-	 * @param boundaries
-	 *            Expected results for each level
-	 */
-	public void runTest(BucketBoundaries[] boundaries) {
-		addValues();
-		collectStatistics(rootMap, 0);
-		if (LOG.isDebugEnabled() == true) {
-			printStatistics();
-		}
-		checkBoundaries(boundaries);
-	}
-
-	private void checkBoundaries(BucketBoundaries[] boundaries) {
-		for (int level = 0; level < boundaries.length; level++) {
-			int lowerBound = boundaries[level].getLowerBound();
-			int upperBound = boundaries[level].getUpperBound();
-			int bucketCountInLevel = 0;
-			int bucketCountOutOfRange = 0;
-
-			SortedMap<Integer, Integer> levelMap = bucketSizesPerLevel
-					.get(level);
-			Iterator<Integer> bucketSizeIterator = levelMap.keySet().iterator();
-
-			while (bucketSizeIterator.hasNext()) {
-				int bucketSize = bucketSizeIterator.next();
-				if (bucketSize != 0) {
-					int countForBucketSize = levelMap.get(bucketSize);
-					bucketCountInLevel += countForBucketSize;
-					if (lowerBound > bucketSize || upperBound < bucketSize) {
-						bucketCountOutOfRange += countForBucketSize;
-					}
-
-				}
-			}
-			double bucketsOutOfRange = (double) bucketCountOutOfRange
-					/ (double) bucketCountInLevel;
-			double maxBucketsOutOfRange = boundaries[level]
-					.getPercentOutOfRange();
-			Assert.assertTrue("More than " + (maxBucketsOutOfRange * 100)
-					+ "% of buckets out of range in level " + level,
-					bucketsOutOfRange <= maxBucketsOutOfRange);
-
-			int maxEmpty = boundaries[level].getMaxEmpty();
-			Assert.assertTrue(
-					"More than " + maxEmpty + " empty buckets in level "
-							+ level,
-					(maxEmpty == BucketBoundaries.MAX_EMPTY_UNBOUNDED)
-							|| (levelMap.get(0) <= boundaries[level]
-									.getMaxEmpty()));
-		}
-	}
-
-	/**
-	 * Find for each value the right bucket on the deepest level and increase
-	 * its count
-	 */
-	@SuppressWarnings("unchecked")
-	private void addValues() {
-
-		while (importIterator.hasNext()) {
-			int nextValue = importIterator.next();
-
-			HashMap<Integer, Object> mapForCurrentLevel = rootMap;
-
-			for (int i = 0; i < maxLevel - 1; i++) {
-				int hashValue = MutableHashTable.hash(nextValue, i);
-				int bucket = rangeCalculators[i].getBucket(hashValue);
-				Object nextObject = mapForCurrentLevel.get(bucket);
-				if (nextObject == null) {
-					HashMap<Integer, Object> mapForNextLevel = new HashMap<Integer, Object>();
-					mapForCurrentLevel.put(bucket, mapForNextLevel);
-					mapForCurrentLevel = mapForNextLevel;
-
-				} else {
-					mapForCurrentLevel = (HashMap<Integer, Object>) nextObject;
-				}
-			}
-
-			int lastHashValue = MutableHashTable.hash(nextValue, maxLevel - 1);
-			int deepestBucketNr = rangeCalculators[maxLevel - 1]
-					.getBucket(lastHashValue);
-			Object countOnDeepestLevel = mapForCurrentLevel
-					.get(deepestBucketNr);
-			if (countOnDeepestLevel == null) {
-				mapForCurrentLevel.put(deepestBucketNr, 1);
-			} else {
-				mapForCurrentLevel.put(deepestBucketNr,
-						((Integer) countOnDeepestLevel) + 1);
-			}
-
-		}
-	}
-
-	private void printStatistics() {
-		for (int level = 0; level < maxLevel; level++) {
-			int bucketCountInLevel = 0;
-
-			SortedMap<Integer, Integer> levelMap = bucketSizesPerLevel
-					.get(level);
-			Iterator<Integer> bucketSizeIterator = levelMap.keySet().iterator();
-
-			LOG.debug("Statistics for level: " + level);
-			LOG.debug("----------------------------------------------");
-			LOG.debug("");
-			LOG.debug("Bucket Size |      Count");
-			LOG.debug("------------------------");
-
-			int i = 0;
-			while (bucketSizeIterator.hasNext()) {
-				int bucketSize = bucketSizeIterator.next();
-				if (bucketSize != 0) {
-					int countForBucketSize = levelMap.get(bucketSize);
-					bucketCountInLevel += countForBucketSize;
-					Formatter formatter = new Formatter();
-					formatter.format(" %10d | %10d", bucketSize, countForBucketSize);
-
-					if (levelMap.size() < 20 || i < 3 || i >= (levelMap.size() - 3)) {
-						LOG.debug(formatter.out().toString());
-					} else if (levelMap.size() / 2 == i) {
-						LOG.debug("         .. |         ..");
-						LOG.debug(formatter.out().toString());
-						LOG.debug("         .. |         ..");
-					}
-					i++;
-					formatter.close();
-				}
-			}
-
-			LOG.debug("");
-			LOG.debug("Number of non-empty buckets in level: "
-					+ bucketCountInLevel);
-			LOG.debug("Number of empty buckets in level    : "
-					+ levelMap.get(0));
-			LOG.debug("Number of different bucket sizes    : "
-					+ (levelMap.size() - 1));
-			LOG.debug("");
-			LOG.debug("");
-			LOG.debug("");
-		}
-	}
-
-	/**
-	 * Create histogram over bucket sizes
-	 * 
-	 * @param map
-	 *            Map to be analyzed
-	 * @param level
-	 *            Level on which the map is located
-	 * @return The total count of hashed values in the map
-	 */
-	private int collectStatistics(HashMap<Integer, Object> map, int level) {
-		SortedMap<Integer, Integer> bucketSizesForLevel = bucketSizesPerLevel
-				.get(level);
-
-		Iterator<Object> bucketIterator = map.values().iterator();
-		int bucketCount = 0;
-		int totalValueCount = 0;
-
-		while (bucketIterator.hasNext()) {
-			bucketCount++;
-
-			Integer hashValuesInBucket;
-			// If we are already on the deepest level, get the count in the
-			// bucket, otherwise
-			// recursively examine the subtree
-			if (level == maxLevel - 1) {
-				hashValuesInBucket = (Integer) bucketIterator.next();
-			} else {
-				@SuppressWarnings("unchecked")
-				HashMap<Integer, Object> nextMap = (HashMap<Integer, Object>) bucketIterator
-						.next();
-				hashValuesInBucket = collectStatistics(nextMap, level + 1);
-			}
-			totalValueCount += hashValuesInBucket;
-			Integer countOfBucketSizes = bucketSizesForLevel
-					.get(hashValuesInBucket);
-			if (countOfBucketSizes == null) {
-				countOfBucketSizes = 1;
-			} else {
-				countOfBucketSizes += 1;
-			}
-			bucketSizesForLevel.put(hashValuesInBucket, countOfBucketSizes);
-		}
-
-		Integer countOfEmptyBuckets = bucketSizesForLevel.get(0);
-		if (countOfEmptyBuckets == null) {
-			countOfEmptyBuckets = rangeCalculators[level].getBucketCount()
-					- bucketCount;
-		} else {
-			countOfEmptyBuckets += rangeCalculators[level].getBucketCount()
-					- bucketCount;
-		}
-		bucketSizesForLevel.put(0, countOfEmptyBuckets);
-
-		return totalValueCount;
-	}
-
-	/**
-	 * Expected results for bucket sizes per level
-	 * 
-	 *
-	 */
-	static class BucketBoundaries {
-
-		public static final int MAX_EMPTY_UNBOUNDED = -1;
-		private int lowerBound;
-		private int upperBound;
-		private int maxEmpty;
-		private double percentOutOfRange;
-
-		/**
-		 * 
-		 * 
-		 * @param lowerBound Lower bound for bucket sizes
-		 * @param upperBound Upper bound for bucket sizes
-		 * @param maxEmpty Maximum number of empty buckets
-		 * @param percentOutOfRange Maximum percentage of buckets out of range
-		 */
-		public BucketBoundaries(int lowerBound, int upperBound, int maxEmpty,
-				double percentOutOfRange) {
-			this.lowerBound = lowerBound;
-			this.upperBound = upperBound;
-			this.maxEmpty = maxEmpty;
-			this.percentOutOfRange = percentOutOfRange;
-		}
-
-		/**
-		 * 
-		 * @return Lower bound for bucket sizes
-		 */
-		public int getLowerBound() {
-			return lowerBound;
-		}
-		
-		/**
-		 * 
-		 * @return Upper bound for bucket sizes
-		 */
-		public int getUpperBound() {
-			return upperBound;
-		}
-
-		/**
-		 * 
-		 * @return Maximum number of empty buckets
-		 */
-		public int getMaxEmpty() {
-			return maxEmpty;
-		}
-
-		/**
-		 * 
-		 * @return Maximum percentage of buckets out of range
-		 */
-		public double getPercentOutOfRange() {
-			return percentOutOfRange;
-		}
-	}
-}

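For context, the core measurement the removed benchmark performed (hash each value, pick a bucket from the low bits of the hash, and histogram the bucket occupancies) can be reproduced in isolation. The following standalone sketch uses a generic bit-mixing function as a stand-in for Flink's MutableHashTable.hash, so its numbers will not match the removed tests exactly:

import java.util.Map;
import java.util.TreeMap;

public class BucketDistributionSketch {

    /** Generic bit-mixing stand-in for a hash finalizer (not Flink's MutableHashTable.hash). */
    static int mix(int x) {
        x ^= (x >>> 16);
        x *= 0x85ebca6b;
        x ^= (x >>> 13);
        return x;
    }

    public static void main(String[] args) {
        final int bits = 10;                  // 2^10 = 1024 buckets, as in the removed tests
        final int bucketCount = 1 << bits;
        final int mask = bucketCount - 1;     // "last m bits" mapping, as in LastBitsToRange

        // count how many values land in each bucket
        int[] bucketSizes = new int[bucketCount];
        for (int value = -1_000_000; value < 1_000_000; value++) {
            bucketSizes[mix(value) & mask]++;
        }

        // histogram: bucket size -> number of buckets with that size
        TreeMap<Integer, Integer> histogram = new TreeMap<Integer, Integer>();
        for (int size : bucketSizes) {
            histogram.merge(size, 1, Integer::sum);
        }
        for (Map.Entry<Integer, Integer> e : histogram.entrySet()) {
            System.out.printf(" %10d | %10d%n", e.getKey(), e.getValue());
        }
    }
}
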
http://git-wip-us.apache.org/repos/asf/flink/blob/5226f0b4/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/LastBitsToRange.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/LastBitsToRange.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/LastBitsToRange.java
deleted file mode 100644
index 6b7caa1..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/LastBitsToRange.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash.util;
-
-/**
- * Takes the last m bits of a hash as bucket number
- * 
- * 
- */
-public class LastBitsToRange implements RangeCalculator {
-
-	private final int mask;
-	private final int bucketCount;
-
-	/**
-	 * Create object which calculates bucket number according to the last m bits
-	 * 
-	 * @param numberOfLastBits
-	 *            Number of bits to be used for bucket calculation
-	 */
-	public LastBitsToRange(int numberOfLastBits) {
-		bucketCount = (int) Math.pow(2, numberOfLastBits);
-		mask = bucketCount - 1;
-	}
-
-
-	@Override
-	public int getBucket(int hash) {
-		return hash & mask;
-	}
-
-
-	public int getBucketCount() {
-		return bucketCount;
-	}
-
-}

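As a concrete example of the removed LastBitsToRange mapping: with numberOfLastBits = 10, bucketCount = 2^10 = 1024 and mask = 0x3FF, so getBucket(0x00012345) = 0x12345 & 0x3FF = 0x345 = 837, and getBucketCount() returns 1024.
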
http://git-wip-us.apache.org/repos/asf/flink/blob/5226f0b4/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RandomIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RandomIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RandomIterator.java
deleted file mode 100644
index 08e504b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RandomIterator.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash.util;
-
-import java.util.Iterator;
-import java.util.Random;
-
-/**
- * Returns a given number of random integer values
- * 
- * 
- */
-public class RandomIterator implements Iterator<Integer> {
-
-	private final Random randomGenerator;
-	private final int count;
-	private int currentCount;
-
-	/**
-	 * Creates an iterator which gives random integer numbers
-	 * 
-	 * @param seed
-	 *            Seed for random generator
-	 * @param count
-	 *            Number of values to be returned
-	 */
-	public RandomIterator(long seed, int count) {
-		this.randomGenerator = new Random(seed);
-		this.count = count;
-		this.currentCount = 0;
-	}
-
-	@Override
-	public boolean hasNext() {
-		return (currentCount < count);
-	}
-
-	@Override
-	public Integer next() {
-		currentCount++;
-		return randomGenerator.nextInt();
-	}
-
-	@Override
-	public void remove() {
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5226f0b4/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RangeCalculator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RangeCalculator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RangeCalculator.java
deleted file mode 100644
index 0f0162d..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RangeCalculator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash.util;
-
-/**
- * Interface for defining which hash code will be placed in which bucket
- * 
- * 
- */
-public interface RangeCalculator {
-
-	/**
-	 * Calculates the number of the bucket in which the value should be
-	 * placed
-	 * 
-	 * @param hash
-	 *            The hash of the value
-	 * @return The number of the bucket
-	 */
-	public int getBucket(int hash);
-
-	/**
-	 * Get the total bucket count
-	 * 
-	 * @return The total bucket count
-	 */
-	public int getBucketCount();
-
-}

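To illustrate the contract of the removed interface, a hypothetical second implementation could map hash codes to buckets by modulo instead of bit masking. The ModuloToRange class below is a sketch only, not part of the code base being removed, and assumes the RangeCalculator interface shown above:

package org.apache.flink.runtime.operators.hash.util;

/** Hypothetical RangeCalculator that maps a hash code to a bucket by modulo (illustration only). */
public class ModuloToRange implements RangeCalculator {

    private final int bucketCount;

    public ModuloToRange(int bucketCount) {
        this.bucketCount = bucketCount;
    }

    @Override
    public int getBucket(int hash) {
        // mask the sign bit so negative hash codes still map to a valid bucket
        return (hash & 0x7fffffff) % bucketCount;
    }

    @Override
    public int getBucketCount() {
        return bucketCount;
    }
}
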
http://git-wip-us.apache.org/repos/asf/flink/blob/5226f0b4/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RangeIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RangeIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RangeIterator.java
deleted file mode 100644
index 82bac8e..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RangeIterator.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash.util;
-
-import java.util.Iterator;
-
-/**
- * Iterates over a range of integer values
- * 
- * 
- */
-public class RangeIterator implements Iterator<Integer> {
-
-	private final int maxValue;
-	private int currentValue;
-
-	/**
-	 * Create an iterator over the range from minValue to maxValue
-	 * 
-	 * @param minValue
-	 *            Smallest value returned by the iterator
-	 * @param maxValue
-	 *            Exclusive upper bound of the range; never returned by the iterator
-	 */
-	public RangeIterator(int minValue, int maxValue) {
-		this.maxValue = maxValue;
-		currentValue = minValue;
-	}
-
-	@Override
-	public boolean hasNext() {
-		return (currentValue < maxValue);
-	}
-
-	@Override
-	public Integer next() {
-		return currentValue++;
-	}
-
-	@Override
-	public void remove() {
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5226f0b4/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/StepRangeIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/StepRangeIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/StepRangeIterator.java
deleted file mode 100644
index 7355754..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/StepRangeIterator.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash.util;
-
-import java.util.Iterator;
-
-/**
- * Iterates over a range of integer values with a certain step width
- * 
- * 
- */
-public class StepRangeIterator implements Iterator<Integer> {
-
-	private final int maxValue;
-	private int currentValue;
-	private final int step;
-
-	/**
-	 * Create an iterator over the range from minValue to maxValue with a
-	 * certain step width
-	 * 
-	 * @param minValue
-	 *            Smallest value returned by the iterator
-	 * @param maxValue
-	 *            Exclusive upper bound of the range; never returned by the iterator
-	 * @param step
-	 *            Step width of the iterator
-	 */
-	public StepRangeIterator(int minValue, int maxValue, int step) {
-		this.maxValue = maxValue;
-		currentValue = minValue;
-		this.step = step;
-	}
-
-	@Override
-	public boolean hasNext() {
-		return (currentValue < maxValue);
-	}
-
-	@Override
-	public Integer next() {
-		int temp = currentValue;
-		currentValue += step;
-		return temp;
-	}
-
-	@Override
-	public void remove() {
-	}
-
-}

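For reference, the expected bucket-size ranges in the removed tests follow from simple arithmetic over these iterators: StepRangeIterator(-30000000, 30000000, 17) emits ceil(60,000,000 / 17) = 3,529,412 values, which spread over 2^10 = 1024 level-0 buckets is roughly 3,447 values per bucket (testStepSeventeen expected 3000 to 3700). RangeIterator(-1000000, 1000000) emits 2,000,000 values, roughly 1,953 per bucket for 1024 buckets (testThreeLevel expected 1800 to 2110) and roughly 488 per bucket for 4096 buckets (testTwoLevel expected 400 to 600).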

[2/2] flink git commit: [FLINK-1916] [FLINK-2361] [runtime] Fix EOFException and entry loss in CompactingHashTable

Posted by se...@apache.org.
[FLINK-1916] [FLINK-2361] [runtime] Fix EOFException and entry loss in CompactingHashTable

Also a lot of code cleanups in CompactingHashTable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/925ac1f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/925ac1f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/925ac1f7

Branch: refs/heads/master
Commit: 925ac1f76bb84986764495407049a77552169d84
Parents: 5226f0b
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 3 20:46:17 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 5 21:37:15 2015 +0200

----------------------------------------------------------------------
 .../common/typeutils/GenericPairComparator.java |   3 -
 .../SolutionSetFastUpdateOutputCollector.java   |  12 +-
 .../io/SolutionSetUpdateOutputCollector.java    |  13 +-
 .../task/AbstractIterativePactTask.java         |   3 +-
 .../hash/AbstractMutableHashTable.java          |   4 +-
 .../operators/hash/CompactingHashTable.java     | 629 ++++++++-----------
 .../operators/hash/InMemoryPartition.java       |   6 +-
 .../operators/hash/MutableHashTable.java        |   2 +-
 .../apache/flink/runtime/util/IntArrayList.java |   4 +-
 .../operators/hash/CompactingHashTableTest.java | 254 ++++++++
 .../hash/HashTablePerformanceComparison.java    |  15 +-
 .../operators/hash/MemoryHashTableTest.java     |  28 +-
 12 files changed, 564 insertions(+), 409 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericPairComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericPairComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericPairComparator.java
index 09d4aee..e3bf4ca 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericPairComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericPairComparator.java
@@ -20,9 +20,6 @@ package org.apache.flink.api.common.typeutils;
 
 import java.io.Serializable;
 
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-
 
 public class GenericPairComparator<T1, T2> extends TypePairComparator<T1, T2>
 		implements Serializable {

http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
index 3b2eda5..f326d89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.iterative.io;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.operators.hash.CompactingHashTable;
 import org.apache.flink.util.Collector;
 
@@ -38,23 +37,20 @@ public class SolutionSetFastUpdateOutputCollector<T> implements Collector<T> {
 	private final Collector<T> delegate;
 
 	private final CompactingHashTable<T> solutionSet;
-	
-	private final T tmpHolder;
 
-	public SolutionSetFastUpdateOutputCollector(CompactingHashTable<T> solutionSet, TypeSerializer<T> serializer) {
-		this(solutionSet, serializer, null);
+	public SolutionSetFastUpdateOutputCollector(CompactingHashTable<T> solutionSet) {
+		this(solutionSet, null);
 	}
 
-	public SolutionSetFastUpdateOutputCollector(CompactingHashTable<T> solutionSet, TypeSerializer<T> serializer, Collector<T> delegate) {
+	public SolutionSetFastUpdateOutputCollector(CompactingHashTable<T> solutionSet, Collector<T> delegate) {
 		this.solutionSet = solutionSet;
 		this.delegate = delegate;
-		this.tmpHolder = serializer.createInstance();
 	}
 
 	@Override
 	public void collect(T record) {
 		try {
-			solutionSet.insertOrReplaceRecord(record, tmpHolder);
+			solutionSet.insertOrReplaceRecord(record);
 			if (delegate != null) {
 				delegate.collect(record);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
index f400b7e..c39efa5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
@@ -16,12 +16,10 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.iterative.io;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.operators.hash.CompactingHashTable;
 import org.apache.flink.util.Collector;
 
@@ -40,23 +38,20 @@ public class SolutionSetUpdateOutputCollector<T> implements Collector<T> {
 	private final Collector<T> delegate;
 
 	private final CompactingHashTable<T> solutionSet;
-	
-	private final T tmpHolder;
 
-	public SolutionSetUpdateOutputCollector(CompactingHashTable<T> solutionSet, TypeSerializer<T> serializer) {
-		this(solutionSet, serializer, null);
+	public SolutionSetUpdateOutputCollector(CompactingHashTable<T> solutionSet) {
+		this(solutionSet, null);
 	}
 
-	public SolutionSetUpdateOutputCollector(CompactingHashTable<T> solutionSet, TypeSerializer<T> serializer, Collector<T> delegate) {
+	public SolutionSetUpdateOutputCollector(CompactingHashTable<T> solutionSet, Collector<T> delegate) {
 		this.solutionSet = solutionSet;
 		this.delegate = delegate;
-		this.tmpHolder = serializer.createInstance();
 	}
 
 	@Override
 	public void collect(T record) {
 		try {
-			solutionSet.insertOrReplaceRecord(record, tmpHolder);
+			solutionSet.insertOrReplaceRecord(record);
 			if (delegate != null) {
 				delegate.collect(record);
 			}

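With the serializer parameter gone, call sites now pass only the solution set and an optional delegate. A minimal wiring sketch, assuming the post-commit classes shown above are on the classpath; the wrapper class and method names below are illustrative only, and solutionSet and delegate stand in for objects supplied by the iteration runtime:

import org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.util.Collector;

class SolutionSetWiringSketch {

    /** Wraps a solution set so that every collected record is inserted or replaced in the hash table. */
    static <T> Collector<T> wrapSolutionSet(CompactingHashTable<T> solutionSet, Collector<T> delegate) {
        // the serializer argument was removed by this commit; collect() now calls insertOrReplaceRecord(record)
        return new SolutionSetUpdateOutputCollector<T>(solutionSet, delegate);
    }
}
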
http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
index 72434e6..67d8f56 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
@@ -329,8 +329,7 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
 		if (ss instanceof CompactingHashTable) {
 			@SuppressWarnings("unchecked")
 			CompactingHashTable<OT> solutionSet = (CompactingHashTable<OT>) ss;
-			TypeSerializer<OT> serializer = getOutputSerializer();
-			return new SolutionSetUpdateOutputCollector<OT>(solutionSet, serializer, delegate);
+			return new SolutionSetUpdateOutputCollector<OT>(solutionSet, delegate);
 		}
 		else if (ss instanceof JoinHashMap) {
 			@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractMutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractMutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractMutableHashTable.java
index f3e8a22..86ce013 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractMutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractMutableHashTable.java
@@ -61,15 +61,13 @@ public abstract class AbstractMutableHashTable<T> {
 	
 	public abstract void abort();
 	
-	public abstract void buildTable(final MutableObjectIterator<T> input) throws IOException;
-	
 	public abstract List<MemorySegment> getFreeMemory();
 	
 	// ------------- Modifier -------------
 	
 	public abstract void insert(T record) throws IOException;
 	
-	public abstract void insertOrReplaceRecord(T record, T tempHolder) throws IOException;
+	public abstract void insertOrReplaceRecord(T record) throws IOException;
 	
 	// ------------- Accessors -------------
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index 6533e19..d07c7e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -22,10 +22,10 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -37,41 +37,20 @@ import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.util.MutableObjectIterator;
 
 /**
- * An implementation of an in-memory Hash Table for variable-length records. 
- * <p>
- * The design of this class follows on many parts the design presented in
- * "Hash joins and hash teams in Microsoft SQL Server", by Goetz Graefe et al..
- * <p>
- * <hr>
- * The layout of the buckets inside a memory segment is as follows:
+ * A hash table that uses Flink's managed memory and supports replacement of records or
+ * updates to records. For an overview of the general data structure of the hash table, please
+ * refer to the description of the {@link org.apache.flink.runtime.operators.hash.MutableHashTable}. 
+ * 
+ * <p>The hash table is internally divided into two parts: The hash index, and the partition buffers
+ * that store the actual records. When records are inserted or updated, the hash table appends the
+ * records to its corresponding partition, and inserts or updates the entry in the hash index.
+ * In the case that the hash table runs out of memory, it compacts a partition by walking through the
+ * hash index and copying all reachable elements into a fresh partition. After that, it releases the
+ * memory of the partition to compact.</p>
  * 
- * <pre>
- * +----------------------------- Bucket x ----------------------------
- * |Partition (1 byte) | reserved (3 bytes) | element count (4 bytes) |
- * | next-bucket-in-chain-pointer (8 bytes) |
- * |
- * |hashCode 1 (4 bytes) | hashCode 2 (4 bytes) | hashCode 3 (4 bytes) |
- * | ... hashCode n-1 (4 bytes) | hashCode n (4 bytes)
- * |
- * |pointer 1 (8 bytes) | pointer 2 (8 bytes) | pointer 3 (8 bytes) |
- * | ... pointer n-1 (8 bytes) | pointer n (8 bytes)
- * |
- * +---------------------------- Bucket x + 1--------------------------
- * |Partition (1 byte) | reserved (3 bytes) | element count (4 bytes) |
- * | next-bucket-in-chain-pointer (8 bytes) |
- * |
- * |hashCode 1 (4 bytes) | hashCode 2 (4 bytes) | hashCode 3 (4 bytes) |
- * | ... hashCode n-1 (4 bytes) | hashCode n (4 bytes)
- * |
- * |pointer 1 (8 bytes) | pointer 2 (8 bytes) | pointer 3 (8 bytes) |
- * | ... pointer n-1 (8 bytes) | pointer n (8 bytes)
- * +-------------------------------------------------------------------
- * | ...
- * |
- * </pre>
  * @param <T> Record type stored in hash table
  */
-public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
+public class CompactingHashTable<T> extends AbstractMutableHashTable<T> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(CompactingHashTable.class);
 	
@@ -79,11 +58,10 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	//                         Internal Constants
 	// ------------------------------------------------------------------------
 	
+	/** The minimum number of memory segments that the compacting hash table needs to work properly */
 	private static final int MIN_NUM_MEMORY_SEGMENTS = 33;
 	
-	/**
-	 * The maximum number of partitions
-	 */
+	/** The maximum number of partitions */
 	private static final int MAX_NUM_PARTITIONS = 32;
 	
 	/**
@@ -91,7 +69,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	 * used to determine the ratio of the number of memory segments intended for partition
 	 * buffers and the number of memory segments in the hash-table structure. 
 	 */
-	private static final int DEFAULT_RECORD_LEN = 24; //FIXME maybe find a better default
+	private static final int DEFAULT_RECORD_LEN = 24;
 	
 	/**
 	 * The length of the hash code stored in the bucket.
@@ -155,14 +133,14 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	//                              Members
 	// ------------------------------------------------------------------------
 
-	/**
-	 * The free memory segments currently available to the hash join.
-	 */
+	/** The lock to synchronize state changes on */
+	private final Object stateLock = new Object();
+	
+	/** The free memory segments currently available to the hash join. */
 	private final ArrayList<MemorySegment> availableMemory;
 	
-	/**
-	 * The size of the segments used by the hash join buckets. All segments must be of equal size to ease offset computations.
-	 */
+	/** The size of the segments used by the hash join buckets.
+	 * All segments must be of equal size to ease offset computations. */
 	private final int segmentSize;
 	
 	/**
@@ -173,63 +151,59 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	 */
 	private final int bucketsPerSegmentMask;
 	
-	/**
-	 * The number of bits that describe the position of a bucket in a memory segment. Computed as log2(bucketsPerSegment).
-	 */
+	/** The number of bits that describe the position of a bucket in a memory segment.
+	 * Computed as log2(bucketsPerSegment). */
 	private final int bucketsPerSegmentBits;
 	
-	/**
-	 * An estimate for the average record length.
-	 */
+	/** An estimate for the average record length. */
 	private final int avgRecordLen;
+
+	private final int pageSizeInBits;
 	
 	// ------------------------------------------------------------------------
 	
-	/**
-	 * The partitions of the hash table.
-	 */
+	/** The partitions of the hash table. */
 	private final ArrayList<InMemoryPartition<T>> partitions;
 	
-	/**
-	 * The array of memory segments that contain the buckets which form the actual hash-table
-	 * of hash-codes and pointers to the elements.
-	 */
+	/** The array of memory segments that contain the buckets which form the actual hash-table
+	 * of hash-codes and pointers to the elements. */
 	private MemorySegment[] buckets;
 	
-	/**
-	 * temporary storage for partition compaction (always attempts to allocate as many segments as the largest partition)
-	 */
+	/** Temporary storage for partition compaction (always attempts to allocate
+	 * as many segments as the largest partition) */
 	private InMemoryPartition<T> compactionMemory;
 	
-	/**
-	 * The number of buckets in the current table. The bucket array is not necessarily fully
-	 * used, when not all buckets that would fit into the last segment are actually used.
-	 */
+	/** The number of buckets in the current table. The bucket array is not necessarily fully
+	 * used, when not all buckets that would fit into the last segment are actually used. */
 	private int numBuckets;
 	
-	/**
-	 * flag necessary so a resize is never triggered during a resize since the code paths are interleaved
-	 */
-	private boolean isResizing = false;
+	/** Flag to interrupt closed loops */
+	private boolean running = true;
+
+	/** Flag to mark the table as open / closed */
+	private boolean closed;
 	
-	private AtomicBoolean closed = new AtomicBoolean();
+	/** Flag necessary so a resize is never triggered during a resize since the code paths are interleaved */
+	private boolean isResizing;
 	
-	private boolean running = true;
-		
-	private int pageSizeInBits;
 
 	// ------------------------------------------------------------------------
 	//                         Construction and Teardown
 	// ------------------------------------------------------------------------
 	
-	public CompactingHashTable(TypeSerializer<T> buildSideSerializer, TypeComparator<T> buildSideComparator, List<MemorySegment> memorySegments)
-	{
+	public CompactingHashTable(TypeSerializer<T> buildSideSerializer,
+								TypeComparator<T> buildSideComparator,
+								List<MemorySegment> memorySegments) {
 		this(buildSideSerializer, buildSideComparator, memorySegments, DEFAULT_RECORD_LEN);
 	}
 	
-	public CompactingHashTable(TypeSerializer<T> buildSideSerializer, TypeComparator<T> buildSideComparator, List<MemorySegment> memorySegments, int avgRecordLen)
-	{
+	public CompactingHashTable(TypeSerializer<T> buildSideSerializer,
+								TypeComparator<T> buildSideComparator,
+								List<MemorySegment> memorySegments,
+								int avgRecordLen) {
+		
 		super(buildSideSerializer, buildSideComparator);
+		
 		// some sanity checks first
 		if (memorySegments == null) {
 			throw new NullPointerException();
@@ -252,6 +226,9 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		if ( (this.segmentSize & this.segmentSize - 1) != 0) {
 			throw new IllegalArgumentException("Hash Table requires buffers whose size is a power of 2.");
 		}
+		
+		this.pageSizeInBits = MathUtils.log2strict(this.segmentSize);
+		
 		int bucketsPerSegment = this.segmentSize >> NUM_INTRA_BUCKET_BITS;
 		if (bucketsPerSegment == 0) {
 			throw new IllegalArgumentException("Hash Table requires buffers of at least " + HASH_BUCKET_SIZE + " bytes.");
@@ -262,22 +239,26 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		this.partitions = new ArrayList<InMemoryPartition<T>>();
 		
 		// because we allow to open and close multiple times, the state is initially closed
-		this.closed.set(true);
+		this.closed = true;
+		
 		// so far no partition has any MemorySegments
 	}
 	
 	
 	// ------------------------------------------------------------------------
-	//                              Life-Cycle
+	//  life cycle
 	// ------------------------------------------------------------------------
 	
 	/**
-	 * Build the hash table
+	 * Initialize the hash table
 	 */
+	@Override
 	public void open() {
-		// sanity checks
-		if (!this.closed.compareAndSet(true, false)) {
-			throw new IllegalStateException("Hash Table cannot be opened, because it is currently not closed.");
+		synchronized (stateLock) {
+			if (!closed) {
+				throw new IllegalStateException("currently not closed.");
+			}
+			closed = false;
 		}
 		
 		// create the partitions
@@ -290,7 +271,6 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		
 		initTable(numBuckets, (byte) partitionFanOut);
 	}
-
 	
 	/**
 	 * Closes the hash table. This effectively releases all internal structures and closes all
@@ -299,10 +279,14 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	 * all resources that are currently held by the hash join. If another process still access the hash 
 	 * table after close has been called no operations will be performed.
 	 */
+	@Override
 	public void close() {
 		// make sure that we close only once
-		if (!this.closed.compareAndSet(false, true)) {
-			return;
+		synchronized (this.stateLock) {
+			if (this.closed) {
+				return;
+			}
+			this.closed = true;
 		}
 		
 		LOG.debug("Closing hash table and releasing resources.");
@@ -313,45 +297,41 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		// clear the memory in the partitions
 		clearPartitions();
 	}
-	
+
+	@Override
 	public void abort() {
 		this.running = false;
-		
 		LOG.debug("Cancelling hash table operations.");
 	}
-	
+
+	@Override
 	public List<MemorySegment> getFreeMemory() {
-		if (!this.closed.get()) {
+		if (!this.closed) {
 			throw new IllegalStateException("Cannot return memory while join is open.");
 		}
 		
 		return this.availableMemory;
 	}
-	
-	
-	public void buildTable(final MutableObjectIterator<T> input) throws IOException {
-		T record = this.buildSideSerializer.createInstance();
-		
-		// go over the complete input and insert every element into the hash table
-		while (this.running && ((record = input.next(record)) != null)) {
-			insert(record);
-		}
-	}
 
+	// ------------------------------------------------------------------------
+	//  adding data to the hash table
+	// ------------------------------------------------------------------------
+	
 	public void buildTableWithUniqueKey(final MutableObjectIterator<T> input) throws IOException {
-		T record = this.buildSideSerializer.createInstance();
-		T tmp = this.buildSideSerializer.createInstance();
-
 		// go over the complete input and insert every element into the hash table
-		while (this.running && ((record = input.next(record)) != null)) {
-			insertOrReplaceRecord(record, tmp);
+		
+		T value;
+		while (this.running && (value = input.next()) != null) {
+			insertOrReplaceRecord(value);
 		}
 	}
-	
+
+	@Override
 	public final void insert(T record) throws IOException {
-		if(this.closed.get()) {
+		if (this.closed) {
 			return;
 		}
+		
 		final int hashCode = hash(this.buildSideComparator.hash(record));
 		final int posHashCode = hashCode % this.numBuckets;
 		
@@ -364,60 +344,8 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		final int partitionNumber = bucket.get(bucketInSegmentPos + HEADER_PARTITION_OFFSET);
 		InMemoryPartition<T> partition = this.partitions.get(partitionNumber);
 		
-		
-		long pointer;
-		try {
-			pointer = partition.appendRecord(record);
-			if((pointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) {
-				this.compactionMemory.allocateSegments((int)(pointer >> this.pageSizeInBits));
-			}
-		} catch (EOFException e) {
-			try {
-				compactPartition(partitionNumber);
-				// retry append
-				partition = this.partitions.get(partitionNumber); // compaction invalidates reference
-				pointer = partition.appendRecord(record);
-			} catch (EOFException ex) {
-				throw new RuntimeException("Memory ran out. Compaction failed. " + 
-											getMemoryConsumptionString() +
-											" Message: " + ex.getMessage());
-			} catch (IndexOutOfBoundsException ex) {
-				throw new RuntimeException("Memory ran out. Compaction failed. " + 
-											getMemoryConsumptionString() +
-											" Message: " + ex.getMessage());
-			}
-		} catch (IndexOutOfBoundsException e1) {
-			try {
-				compactPartition(partitionNumber);
-				// retry append
-				partition = this.partitions.get(partitionNumber); // compaction invalidates reference
-				pointer = partition.appendRecord(record);
-			} catch (EOFException ex) {
-				throw new RuntimeException("Memory ran out. Compaction failed. " + 
-											getMemoryConsumptionString() +
-											" Message: " + ex.getMessage());
-			} catch (IndexOutOfBoundsException ex) {
-				throw new RuntimeException("Memory ran out. Compaction failed. " + 
-											getMemoryConsumptionString() +
-											" Message: " + ex.getMessage());
-			}
-		}
-		insertBucketEntryFromStart(partition, bucket, bucketInSegmentPos, hashCode, pointer);
-	}
-	
-	
-	@Override
-	public <PT> HashTableProber<PT> getProber(TypeComparator<PT> probeSideComparator, TypePairComparator<PT, T> pairComparator) {
-		return new HashTableProber<PT>(probeSideComparator, pairComparator);
-	}
-	
-	/**
-	 * 
-	 * @return Iterator over hash table
-	 * @see EntryIterator
-	 */
-	public MutableObjectIterator<T> getEntryIterator() {
-		return new EntryIterator(this);
+		long pointer = insertRecordIntoPartition(record, partition, false);
+		insertBucketEntryFromStart(bucket, bucketInSegmentPos, hashCode, pointer, partitionNumber);
 	}
 	
 	/**
@@ -425,11 +353,10 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	 * May trigger expensive compaction.
 	 * 
 	 * @param record record to insert or replace
-	 * @param tempHolder instance of T that will be overwritten
 	 * @throws IOException
 	 */
-	public void insertOrReplaceRecord(T record, T tempHolder) throws IOException {
-		if(this.closed.get()) {
+	public void insertOrReplaceRecord(T record) throws IOException {
+		if (this.closed) {
 			return;
 		}
 		
@@ -437,14 +364,15 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		final int posHashCode = searchHashCode % this.numBuckets;
 		
 		// get the bucket for the given hash code
-		MemorySegment originalBucket = this.buckets[posHashCode >> this.bucketsPerSegmentBits];
-		int originalBucketOffset = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
+		final MemorySegment originalBucket = this.buckets[posHashCode >> this.bucketsPerSegmentBits];
+		final int originalBucketOffset = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
+		
 		MemorySegment bucket = originalBucket;
 		int bucketInSegmentOffset = originalBucketOffset;
 		
 		// get the basic characteristics of the bucket
 		final int partitionNumber = bucket.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET);
-		InMemoryPartition<T> partition = this.partitions.get(partitionNumber);
+		final InMemoryPartition<T> partition = this.partitions.get(partitionNumber);
 		final MemorySegment[] overflowSegments = partition.overflowSegments;
 		
 		this.buildSideComparator.setReference(record);
@@ -471,58 +399,12 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 					numInSegment++;
 					
 					// deserialize the key to check whether it is really equal, or whether we had only a hash collision
-					try {
-						tempHolder = partition.readRecordAt(pointer, tempHolder);
-						if (this.buildSideComparator.equalToReference(tempHolder)) {
-							long newPointer = partition.appendRecord(record);
-							bucket.putLong(pointerOffset, newPointer);
-							partition.setCompaction(false);
-							if((newPointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) {
-								this.compactionMemory.allocateSegments((int)(newPointer >> this.pageSizeInBits));
-							}
-							return;
-						}
-					} catch (EOFException e) {
-						// system is out of memory so we attempt to reclaim memory with a copy compact run
-						long newPointer;
-						try {
-							compactPartition(partition.getPartitionNumber());
-							// retry append
-							partition = this.partitions.get(partitionNumber); // compaction invalidates reference
-							newPointer = partition.appendRecord(record);
-						} catch (EOFException ex) {
-							throw new RuntimeException("Memory ran out. Compaction failed. " + 
-														getMemoryConsumptionString() +
-														" Message: " + ex.getMessage());
-						} catch (IndexOutOfBoundsException ex) {
-							throw new RuntimeException("Memory ran out. Compaction failed. " + 
-														getMemoryConsumptionString() +
-														" Message: " + ex.getMessage());
-						}
-						bucket.putLong(pointerOffset, newPointer);
-						return;
-					} catch (IndexOutOfBoundsException e) {
-						// system is out of memory so we attempt to reclaim memory with a copy compact run
-						long newPointer;
-						try {
-							compactPartition(partition.getPartitionNumber());
-							// retry append
-							partition = this.partitions.get(partitionNumber); // compaction invalidates reference
-							newPointer = partition.appendRecord(record);
-						} catch (EOFException ex) {
-							throw new RuntimeException("Memory ran out. Compaction failed. " + 
-														getMemoryConsumptionString() +
-														" Message: " + ex.getMessage());
-						} catch (IndexOutOfBoundsException ex) {
-							throw new RuntimeException("Memory ran out. Compaction failed. " + 
-														getMemoryConsumptionString() +
-														" Message: " + ex.getMessage());
-						}
+					T valueAtPosition = partition.readRecordAt(pointer);
+					if (this.buildSideComparator.equalToReference(valueAtPosition)) {
+						long newPointer = insertRecordIntoPartition(record, partition, true);
 						bucket.putLong(pointerOffset, newPointer);
 						return;
-					} catch (IOException e) {
-						throw new RuntimeException("Error deserializing record from the hashtable: " + e.getMessage(), e);
-					} 
+					}
 				}
 				else {
 					numInSegment++;
@@ -532,28 +414,89 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			// this segment is done. check if there is another chained bucket
 			long newForwardPointer = bucket.getLong(bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
 			if (newForwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
+				
 				// nothing found. append and insert
-				long pointer = partition.appendRecord(record);
-				//insertBucketEntryFromStart(partition, originalBucket, originalBucketOffset, searchHashCode, pointer);
-				insertBucketEntryFromSearch(partition, originalBucket, bucket, originalBucketOffset, bucketInSegmentOffset, countInSegment, currentForwardPointer, searchHashCode, pointer);
-				if((pointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) {
-					this.compactionMemory.allocateSegments((int)(pointer >> this.pageSizeInBits));
+				long pointer = insertRecordIntoPartition(record, partition, false);
+
+				if (countInSegment < NUM_ENTRIES_PER_BUCKET) {
+					// we are good in our current bucket, put the values
+					bucket.putInt(bucketInSegmentOffset + BUCKET_HEADER_LENGTH + (countInSegment * HASH_CODE_LEN), searchHashCode); // hash code
+					bucket.putLong(bucketInSegmentOffset + BUCKET_POINTER_START_OFFSET + (countInSegment * POINTER_LEN), pointer); // pointer
+					bucket.putInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET, countInSegment + 1); // update count
+				}
+				else {
+					insertBucketEntryFromStart(originalBucket, originalBucketOffset, searchHashCode, pointer, partitionNumber);
 				}
 				return;
 			}
 			
 			final int overflowSegNum = (int) (newForwardPointer >>> 32);
 			bucket = overflowSegments[overflowSegNum];
-			bucketInSegmentOffset = (int) (newForwardPointer & 0xffffffff);
+			bucketInSegmentOffset = (int) newForwardPointer;
 			countInSegment = bucket.getInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
 			posInSegment = bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
 			numInSegment = 0;
 			currentForwardPointer = newForwardPointer;
 		}
 	}
+	
+	private long insertRecordIntoPartition(T record, InMemoryPartition<T> partition,
+											boolean fragments) throws IOException {
+		try {
+			long pointer = partition.appendRecord(record);
+			if (fragments) {
+				partition.setIsCompacted(false);
+			}
+			if ((pointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) {
+				this.compactionMemory.allocateSegments((int)(pointer >> this.pageSizeInBits));
+			}
+			return pointer;
+		}
+		catch (Exception e) {
+			if (e instanceof EOFException || e instanceof IndexOutOfBoundsException) {
+				// this indicates an out of memory situation
+				try {
+					final int partitionNumber = partition.getPartitionNumber();
+					compactPartition(partitionNumber);
+					
+					// retry append
+					partition = this.partitions.get(partitionNumber); // compaction invalidates reference
+					long newPointer = partition.appendRecord(record);
+					if ((newPointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) {
+						this.compactionMemory.allocateSegments((int)(newPointer >> this.pageSizeInBits));
+					}
+					return newPointer;
+				}
+				catch (EOFException ex) {
+					throw new RuntimeException("Memory ran out. Compaction failed. " +
+							getMemoryConsumptionString() + " Message: " + ex.getMessage());
+				}
+				catch (IndexOutOfBoundsException ex) {
+					throw new RuntimeException("Memory ran out. Compaction failed. " +
+							getMemoryConsumptionString() + " Message: " + ex.getMessage());
+				}
+			}
+			else if (e instanceof IOException) {
+				throw (IOException) e;
+			}
+			else //noinspection ConstantConditions
+				if (e instanceof RuntimeException) {
+					throw (RuntimeException) e;
+			}
+			else {
+				throw new RuntimeException("Writing record to compacting hash table failed", e);
+			}
+		}
+	}
 
-	private void insertBucketEntryFromStart(InMemoryPartition<T> p, MemorySegment bucket,
-			int bucketInSegmentPos, int hashCode, long pointer)
+
+	/**
+	 * IMPORTANT!!! We pass only the partition number, because we must make sure we get a fresh
+	 * partition reference. The partition reference used during search for the key may have become
+	 * invalid during the compaction.
+	 */
+	private void insertBucketEntryFromStart(MemorySegment bucket, int bucketInSegmentPos,
+											int hashCode, long pointer, int partitionNumber)
 	throws IOException
 	{
 		boolean checkForResize = false;
@@ -564,8 +507,11 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			bucket.putInt(bucketInSegmentPos + BUCKET_HEADER_LENGTH + (count * HASH_CODE_LEN), hashCode);	// hash code
 			bucket.putLong(bucketInSegmentPos + BUCKET_POINTER_START_OFFSET + (count * POINTER_LEN), pointer); // pointer
 			bucket.putInt(bucketInSegmentPos + HEADER_COUNT_OFFSET, count + 1); // update count
-		} else {
+		}
+		else {
 			// we need to go to the overflow buckets
+			final InMemoryPartition<T> p = this.partitions.get(partitionNumber);
+			
 			final long originalForwardPointer = bucket.getLong(bucketInSegmentPos + HEADER_FORWARD_OFFSET);
 			final long forwardForNewBucket;
 			
@@ -573,7 +519,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 				
 				// forward pointer set
 				final int overflowSegNum = (int) (originalForwardPointer >>> 32);
-				final int segOffset = (int) (originalForwardPointer & 0xffffffff);
+				final int segOffset = (int) originalForwardPointer;
 				final MemorySegment seg = p.overflowSegments[overflowSegNum];
 				
 				final int obCount = seg.getInt(segOffset + HEADER_COUNT_OFFSET);
@@ -635,31 +581,42 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			bucket.putLong(bucketInSegmentPos + HEADER_FORWARD_OFFSET, pointerToNewBucket);
 			
 			// finally, insert the values into the overflow buckets
-			overflowSeg.putInt(overflowBucketOffset + BUCKET_HEADER_LENGTH, hashCode);	// hash code
+			overflowSeg.putInt(overflowBucketOffset + BUCKET_HEADER_LENGTH, hashCode); // hash code
 			overflowSeg.putLong(overflowBucketOffset + BUCKET_POINTER_START_OFFSET, pointer); // pointer
 			
 			// set the count to one
-			overflowSeg.putInt(overflowBucketOffset + HEADER_COUNT_OFFSET, 1); 
-			if(checkForResize && !this.isResizing) {
+			overflowSeg.putInt(overflowBucketOffset + HEADER_COUNT_OFFSET, 1);
+			
+			if (checkForResize && !this.isResizing) {
 				// check if we should resize buckets
-				if(this.buckets.length <= getOverflowSegmentCount()) {
+				if (this.buckets.length <= getOverflowSegmentCount()) {
 					resizeHashTable();
 				}
 			}
 		}
 	}
-	
-	private void insertBucketEntryFromSearch(InMemoryPartition<T> partition, MemorySegment originalBucket, MemorySegment currentBucket, int originalBucketOffset, int currentBucketOffset, int countInCurrentBucket, long currentForwardPointer, int hashCode, long pointer) throws IOException {
+
+	/**
+	 * IMPORTANT!!! We pass only the partition number, because we must make sure we get a fresh
+	 * partition reference. The partition reference used during search for the key may have become
+	 * invalid during the compaction.
+	 */
+	private void insertBucketEntryFromSearch(MemorySegment originalBucket, MemorySegment currentBucket,
+												int originalBucketOffset, int currentBucketOffset,
+												int countInCurrentBucket, long originalForwardPointer,
+												int hashCode, long pointer, int partitionNumber) throws IOException {
 		boolean checkForResize = false;
 		if (countInCurrentBucket < NUM_ENTRIES_PER_BUCKET) {
 			// we are good in our current bucket, put the values
-			currentBucket.putInt(currentBucketOffset + BUCKET_HEADER_LENGTH + (countInCurrentBucket * HASH_CODE_LEN), hashCode);	// hash code
+			currentBucket.putInt(currentBucketOffset + BUCKET_HEADER_LENGTH + (countInCurrentBucket * HASH_CODE_LEN), hashCode); // hash code
 			currentBucket.putLong(currentBucketOffset + BUCKET_POINTER_START_OFFSET + (countInCurrentBucket * POINTER_LEN), pointer); // pointer
 			currentBucket.putInt(currentBucketOffset + HEADER_COUNT_OFFSET, countInCurrentBucket + 1); // update count
-		} else {
-			// we need a new overflow bucket
+		}
+		else {
+			// we go to a new overflow bucket
+			final InMemoryPartition<T> partition = this.partitions.get(partitionNumber);
 			MemorySegment overflowSeg;
-			final int overflowBucketNum;
+			final int overflowSegmentNum;
 			final int overflowBucketOffset;
 			
 			// first, see if there is space for an overflow bucket remaining in the last overflow segment
@@ -667,7 +624,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 				// no space left in last bucket, or no bucket yet, so create an overflow segment
 				overflowSeg = getNextBuffer();
 				overflowBucketOffset = 0;
-				overflowBucketNum = partition.numOverflowSegments;
+				overflowSegmentNum = partition.numOverflowSegments;
 				
 				// add the new overflow segment
 				if (partition.overflowSegments.length <= partition.numOverflowSegments) {
@@ -678,10 +635,11 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 				partition.overflowSegments[partition.numOverflowSegments] = overflowSeg;
 				partition.numOverflowSegments++;
 				checkForResize = true;
-			} else {
+			}
+			else {
 				// there is space in the last overflow segment
-				overflowBucketNum = partition.numOverflowSegments - 1;
-				overflowSeg = partition.overflowSegments[overflowBucketNum];
+				overflowSegmentNum = partition.numOverflowSegments - 1;
+				overflowSeg = partition.overflowSegments[overflowSegmentNum];
 				overflowBucketOffset = partition.nextOverflowBucket << NUM_INTRA_BUCKET_BITS;
 			}
 			
@@ -690,10 +648,11 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			partition.nextOverflowBucket = (partition.nextOverflowBucket == this.bucketsPerSegmentMask ? 0 : partition.nextOverflowBucket + 1);
 			
 			// insert the new overflow bucket in the chain of buckets
+			
 			// 1) set the old forward pointer
 			// 2) let the bucket in the main table point to this one
-			overflowSeg.putLong(overflowBucketOffset + HEADER_FORWARD_OFFSET, currentForwardPointer);
-			final long pointerToNewBucket = (((long) overflowBucketNum) << 32) | ((long) overflowBucketOffset);
+			overflowSeg.putLong(overflowBucketOffset + HEADER_FORWARD_OFFSET, originalForwardPointer);
+			final long pointerToNewBucket = (((long) overflowSegmentNum) << 32) | ((long) overflowBucketOffset);
 			originalBucket.putLong(originalBucketOffset + HEADER_FORWARD_OFFSET, pointerToNewBucket);
 			
 			// finally, insert the values into the overflow buckets
@@ -710,16 +669,33 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			}
 		}
 	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Access to the entries
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public <PT> HashTableProber<PT> getProber(TypeComparator<PT> probeSideComparator, TypePairComparator<PT, T> pairComparator) {
+		return new HashTableProber<PT>(probeSideComparator, pairComparator);
+	}
+
+	/**
+	 * Gets an iterator over all records currently stored in the hash table.
+	 * @return Iterator over hash table
+	 * @see EntryIterator
+	 */
+	public MutableObjectIterator<T> getEntryIterator() {
+		return new EntryIterator(this);
+	}
 	
 	// --------------------------------------------------------------------------------------------
-	//                          Setup and Tear Down of Structures
+	//  Setup and Tear Down of Structures
 	// --------------------------------------------------------------------------------------------
 
 	private void createPartitions(int numPartitions) {
 		this.partitions.clear();
 		
 		ListMemorySegmentSource memSource = new ListMemorySegmentSource(this.availableMemory);
-		this.pageSizeInBits = MathUtils.log2strict(this.segmentSize);
 		
 		for (int i = 0; i < numPartitions; i++) {
 			this.partitions.add(new InMemoryPartition<T>(this.buildSideSerializer, i, memSource, this.segmentSize, pageSizeInBits));
@@ -728,8 +704,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	}
 	
 	private void clearPartitions() {
-		for (int i = 0; i < this.partitions.size(); i++) {
-			InMemoryPartition<T> p = this.partitions.get(i);
+		for (InMemoryPartition<T> p : this.partitions) {
 			p.clearAllMemory(this.availableMemory);
 		}
 		this.partitions.clear();
@@ -768,14 +743,14 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		// set the counters back
 		this.numBuckets = 0;
 		if (this.buckets != null) {
-			for (int i = 0; i < this.buckets.length; i++) {
-				this.availableMemory.add(this.buckets[i]);
+			for (MemorySegment bucket : this.buckets) {
+				this.availableMemory.add(bucket);
 			}
 			this.buckets = null;
 		}
 	}
 	
-	private final MemorySegment getNextBuffer() {
+	private MemorySegment getNextBuffer() {
 		// check if the list directly offers memory
 		int s = this.availableMemory.size();
 		if (s > 0) {
@@ -799,7 +774,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	 * @param numBuffers The number of buffers available.
 	 * @return The number of partitions to use.
 	 */
-	private static final int getPartitioningFanOutNoEstimates(int numBuffers) {
+	private static int getPartitioningFanOutNoEstimates(int numBuffers) {
 		return Math.max(10, Math.min(numBuffers / 10, MAX_NUM_PARTITIONS));
 	}
 	
@@ -807,14 +782,13 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	 * @return String containing a summary of the memory consumption for error messages
 	 */
 	private String getMemoryConsumptionString() {
-		String result = new String("numPartitions: " + this.partitions.size() + 
+		return "numPartitions: " + this.partitions.size() +
 				" minPartition: " + getMinPartition() +
 				" maxPartition: " + getMaxPartition() +
 				" number of overflow segments: " + getOverflowSegmentCount() +
 				" bucketSize: " + this.buckets.length +
-				" Overall memory: " + getSize() + 
-				" Partition memory: " + getPartitionSize());
-		return result;
+				" Overall memory: " + getSize() +
+				" Partition memory: " + getPartitionSize();
 	}
 	
 	/**
@@ -878,7 +852,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	 */
 	private int getOverflowSegmentCount() {
 		int result = 0;
-		for(InMemoryPartition<T> p : this.partitions) {
+		for (InMemoryPartition<T> p : this.partitions) {
 			result += p.numOverflowSegments;
 		}
 		return result;
@@ -890,7 +864,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	 * 
 	 * @return number of buckets
 	 */
-	private static final int getInitialTableSize(int numBuffers, int bufferSize, int numPartitions, int recordLenBytes) {
+	private static int getInitialTableSize(int numBuffers, int bufferSize, int numPartitions, int recordLenBytes) {
 		final long totalSize = ((long) bufferSize) * numBuffers;
 		final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_OVERHEAD_BYTES);
 		final long bucketBytes = numRecordsStorable * RECORD_OVERHEAD_BYTES;
@@ -908,7 +882,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	 * @param numPartitions number of partitions
 	 * @return The hash code for the integer.
 	 */
-	private static final byte assignPartition(int bucket, byte numPartitions) {
+	private static byte assignPartition(int bucket, byte numPartitions) {
 		return (byte) (bucket % numPartitions);
 	}
 	
@@ -924,26 +898,27 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		final int newNumSegments = (newNumBuckets + (bucketsPerSegment-1)) / bucketsPerSegment;
 		final int additionalSegments = newNumSegments-this.buckets.length;
 		final int numPartitions = this.partitions.size();
-		if(this.availableMemory.size() < additionalSegments) {
-			for(int i = 0; i < numPartitions; i++) {
+		
+		if (this.availableMemory.size() < additionalSegments) {
+			for (int i = 0; i < numPartitions; i++) {
 				compactPartition(i);
 				if(this.availableMemory.size() >= additionalSegments) {
 					break;
 				}
 			}
 		}
-		if(this.availableMemory.size() < additionalSegments || this.closed.get()) {
+		
+		if (this.availableMemory.size() < additionalSegments || this.closed) {
 			return false;
-		} else {
+		}
+		else {
 			this.isResizing = true;
 			// allocate new buckets
 			final int startOffset = (this.numBuckets * HASH_BUCKET_SIZE) % this.segmentSize;
-			MemorySegment[] newBuckets = new MemorySegment[additionalSegments];
 			final int oldNumBuckets = this.numBuckets;
 			final int oldNumSegments = this.buckets.length;
 			MemorySegment[] mergedBuckets = new MemorySegment[newNumSegments];
 			System.arraycopy(this.buckets, 0, mergedBuckets, 0, this.buckets.length);
-			System.arraycopy(newBuckets, 0, mergedBuckets, this.buckets.length, newBuckets.length);
 			this.buckets = mergedBuckets;
 			this.numBuckets = newNumBuckets;
 			// initialize all new buckets
@@ -951,7 +926,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			final int startSegment = oldSegment ? (oldNumSegments-1) : oldNumSegments;
 			for (int i = startSegment, bucket = oldNumBuckets; i < newNumSegments && bucket < this.numBuckets; i++) {
 				MemorySegment seg;
-				int bucketOffset = 0;
+				int bucketOffset;
 				if(oldSegment) { // the first couple of new buckets may be located on an old segment
 					seg = this.buckets[i];
 					for (int k = (oldNumBuckets % bucketsPerSegment) ; k < bucketsPerSegment && bucket < this.numBuckets; k++, bucket++) {
@@ -965,7 +940,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 					seg = getNextBuffer();
 					// go over all buckets in the segment
 					for (int k = 0; k < bucketsPerSegment && bucket < this.numBuckets; k++, bucket++) {
-						bucketOffset = k * HASH_BUCKET_SIZE;	
+						bucketOffset = k * HASH_BUCKET_SIZE;
 						// initialize the header fields
 						seg.put(bucketOffset + HEADER_PARTITION_OFFSET, assignPartition(bucket, (byte)numPartitions));
 						seg.putInt(bucketOffset + HEADER_COUNT_OFFSET, 0);
@@ -975,19 +950,21 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 				this.buckets[i] = seg;
 				oldSegment = false; // we write on at most one old segment
 			}
-			int hashOffset = 0;
-			int hash = 0;
-			int pointerOffset = 0;
-			long pointer = 0;
+			int hashOffset;
+			int hash;
+			int pointerOffset;
+			long pointer;
 			IntArrayList hashList = new IntArrayList(NUM_ENTRIES_PER_BUCKET);
 			LongArrayList pointerList = new LongArrayList(NUM_ENTRIES_PER_BUCKET);
 			IntArrayList overflowHashes = new IntArrayList(64);
 			LongArrayList overflowPointers = new LongArrayList(64);
+			
 			// go over all buckets and split them between old and new buckets
-			for(int i = 0; i < numPartitions; i++) {
+			for (int i = 0; i < numPartitions; i++) {
 				InMemoryPartition<T> partition = this.partitions.get(i);
 				final MemorySegment[] overflowSegments = partition.overflowSegments;
-				int posHashCode = 0;
+				
+				int posHashCode;
 				for (int j = 0, bucket = i; j < this.buckets.length && bucket < oldNumBuckets; j++) {
 					MemorySegment segment = this.buckets[j];
 					// go over all buckets in the segment belonging to the partition
@@ -1021,7 +998,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 							}
 							final int overflowSegNum = (int) (forwardPointer >>> 32);
 							segment = overflowSegments[overflowSegNum];
-							bucketOffset = (int)(forwardPointer & 0xffffffff);
+							bucketOffset = (int) forwardPointer;
 							countInSegment = segment.getInt(bucketOffset + HEADER_COUNT_OFFSET);
 							pointerOffset = bucketOffset + BUCKET_POINTER_START_OFFSET;
 							hashOffset = bucketOffset + BUCKET_HEADER_LENGTH;
@@ -1038,25 +1015,29 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 						}
 						int newSegmentIndex = (bucket + oldNumBuckets) / bucketsPerSegment;
 						MemorySegment newSegment = this.buckets[newSegmentIndex];
+						
 						// we need to avoid overflows in the first run
 						int oldBucketCount = 0;
 						int newBucketCount = 0;
-						while(!hashList.isEmpty()) {
+						while (!hashList.isEmpty()) {
 							hash = hashList.removeLast();
 							pointer = pointerList.removeLong(pointerList.size()-1);
 							posHashCode = hash % this.numBuckets;
-							if(posHashCode == bucket && oldBucketCount < NUM_ENTRIES_PER_BUCKET) {
+							if (posHashCode == bucket && oldBucketCount < NUM_ENTRIES_PER_BUCKET) {
 								bucketOffset = (bucket % bucketsPerSegment) * HASH_BUCKET_SIZE;
-								insertBucketEntryFromStart(partition, segment, bucketOffset, hash, pointer);
+								insertBucketEntryFromStart(segment, bucketOffset, hash, pointer, partition.getPartitionNumber());
 								oldBucketCount++;
-							} else if(posHashCode == (bucket + oldNumBuckets) && newBucketCount < NUM_ENTRIES_PER_BUCKET) {
+							}
+							else if (posHashCode == (bucket + oldNumBuckets) && newBucketCount < NUM_ENTRIES_PER_BUCKET) {
 								bucketOffset = ((bucket + oldNumBuckets) % bucketsPerSegment) * HASH_BUCKET_SIZE;
-								insertBucketEntryFromStart(partition, newSegment, bucketOffset, hash, pointer);
+								insertBucketEntryFromStart(newSegment, bucketOffset, hash, pointer, partition.getPartitionNumber());
 								newBucketCount++;
-							} else if(posHashCode == (bucket + oldNumBuckets) || posHashCode == bucket) {
+							}
+							else if (posHashCode == (bucket + oldNumBuckets) || posHashCode == bucket) {
 								overflowHashes.add(hash);
 								overflowPointers.add(pointer);
-							} else {
+							}
+							else {
 								throw new IOException("Accessed wrong bucket. Target: " + bucket + " or " + (bucket + oldNumBuckets) + " Hit: " + posHashCode);
 							}
 						}
@@ -1067,9 +1048,9 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 				// reset partition's overflow buckets and reclaim their memory
 				this.availableMemory.addAll(partition.resetOverflowBuckets());
 				// clear overflow lists
-				int bucketArrayPos = 0;
-				int bucketInSegmentPos = 0;
-				MemorySegment bucket = null;
+				int bucketArrayPos;
+				int bucketInSegmentPos;
+				MemorySegment bucket;
 				while(!overflowHashes.isEmpty()) {
 					hash = overflowHashes.removeLast();
 					pointer = overflowPointers.removeLong(overflowPointers.size()-1);
@@ -1077,7 +1058,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 					bucketArrayPos = posHashCode >>> this.bucketsPerSegmentBits;
 					bucketInSegmentPos = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
 					bucket = this.buckets[bucketArrayPos];
-					insertBucketEntryFromStart(partition, bucket, bucketInSegmentPos, hash, pointer);
+					insertBucketEntryFromStart(bucket, bucketInSegmentPos, hash, pointer, partition.getPartitionNumber());
 				}
 				overflowHashes.clear();
 				overflowPointers.clear();
@@ -1095,7 +1076,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	 */
 	private void compactPartition(final int partitionNumber) throws IOException {
 		// do nothing if table was closed, parameter is invalid or no garbage exists
-		if(this.closed.get() || partitionNumber >= this.partitions.size() || this.partitions.get(partitionNumber).isCompacted()) {
+		if (this.closed || partitionNumber >= this.partitions.size() || this.partitions.get(partitionNumber).isCompacted()) {
 			return;
 		}
 		// release all segments owned by compaction partition
@@ -1106,9 +1087,9 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		final int numPartitions = this.partitions.size();
 		InMemoryPartition<T> partition = this.partitions.remove(partitionNumber);
 		MemorySegment[] overflowSegments = partition.overflowSegments;
-		long pointer = 0L;
-		int pointerOffset = 0;
-		int bucketOffset = 0;
+		long pointer;
+		int pointerOffset;
+		int bucketOffset;
 		final int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
 		for (int i = 0, bucket = partitionNumber; i < this.buckets.length && bucket < this.numBuckets; i++) {
 			MemorySegment segment = this.buckets[i];
@@ -1138,7 +1119,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 					}
 					final int overflowSegNum = (int) (forwardPointer >>> 32);
 					segment = overflowSegments[overflowSegNum];
-					bucketOffset = (int)(forwardPointer & 0xffffffff);
+					bucketOffset = (int) forwardPointer;
 					countInSegment = segment.getInt(bucketOffset + HEADER_COUNT_OFFSET);
 					pointerOffset = bucketOffset + BUCKET_POINTER_START_OFFSET;
 					numInSegment = 0;
@@ -1152,7 +1133,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		this.partitions.get(partitionNumber).overflowSegments = partition.overflowSegments;
 		this.partitions.get(partitionNumber).numOverflowSegments = partition.numOverflowSegments;
 		this.partitions.get(partitionNumber).nextOverflowBucket = partition.nextOverflowBucket;
-		this.partitions.get(partitionNumber).setCompaction(true);
+		this.partitions.get(partitionNumber).setIsCompacted(true);
 		//this.partitions.get(partitionNumber).pushDownPages();
 		this.compactionMemory = partition;
 		this.compactionMemory.resetRecordCounter();
@@ -1169,22 +1150,6 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	}
 	
 	/**
-	 * Compacts partition but may not reclaim all garbage
-	 * 
-	 * @param partitionNumber partition number
-	 * @throws IOException 
-	 */
-	@SuppressWarnings("unused")
-	private void fastCompactPartition(int partitionNumber) throws IOException {
-		// stop if no garbage exists
-		if(this.partitions.get(partitionNumber).isCompacted()) {
-			return;
-		}
-		//TODO IMPLEMENT ME
-		return;
-	}
-	
-	/**
 	 * This function hashes an integer value. It is adapted from Bob Jenkins' website
 	 * <a href="http://www.burtleburtle.net/bob/hash/integer.html">http://www.burtleburtle.net/bob/hash/integer.html</a>.
 	 * The hash function has the <i>full avalanche</i> property, meaning that every bit of the value to be hashed
@@ -1193,7 +1158,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 	 * @param code The integer to be hashed.
 	 * @return The hash code for the integer.
 	 */
-	private static final int hash(int code) {
+	private static int hash(int code) {
 		code = (code + 0x7ed55d16) + (code << 12);
 		code = (code ^ 0xc761c23c) ^ (code >>> 19);
 		code = (code + 0x165667b1) + (code << 5);
@@ -1235,7 +1200,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 
 		@Override
 		public T next() throws IOException {
-			if(done || this.table.closed.get()) {
+			if (done || this.table.closed) {
 				return null;
 			} else if(!cache.isEmpty()) {
 				return cache.remove(cache.size()-1);
@@ -1294,7 +1259,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 				}
 				final int overflowSegNum = (int) (forwardPointer >>> 32);
 				bucket = overflowSegments[overflowSegNum];
-				bucketOffset = (int)(forwardPointer & 0xffffffff);
+				bucketOffset = (int) forwardPointer;
 				countInSegment = bucket.getInt(bucketOffset + HEADER_COUNT_OFFSET);
 				posInSegment = bucketOffset + BUCKET_POINTER_START_OFFSET;
 				numInSegment = 0;
@@ -1326,7 +1291,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		}
 		
 		public T getMatchFor(PT probeSideRecord, T reuse) {
-			if(closed.get()) {
+			if (closed) {
 				return null;
 			}
 			final int searchHashCode = hash(this.probeTypeComparator.hash(probeSideRecord));
@@ -1391,7 +1356,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 				
 				final int overflowSegNum = (int) (forwardPointer >>> 32);
 				bucket = overflowSegments[overflowSegNum];
-				bucketInSegmentOffset = (int) (forwardPointer & 0xffffffff);
+				bucketInSegmentOffset = (int) forwardPointer;
 				countInSegment = bucket.getInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
 				posInSegment = bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
 				numInSegment = 0;
@@ -1399,7 +1364,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		}
 
 		public T getMatchFor(PT probeSideRecord) {
-			if(closed.get()) {
+			if (closed) {
 				return null;
 			}
 			final int searchHashCode = hash(this.probeTypeComparator.hash(probeSideRecord));
@@ -1464,7 +1429,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 
 				final int overflowSegNum = (int) (forwardPointer >>> 32);
 				bucket = overflowSegments[overflowSegNum];
-				bucketInSegmentOffset = (int) (forwardPointer & 0xffffffff);
+				bucketInSegmentOffset = (int) forwardPointer;
 				countInSegment = bucket.getInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
 				posInSegment = bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
 				numInSegment = 0;
@@ -1472,49 +1437,11 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 		}
 
 		public void updateMatch(T record) throws IOException {
-			if(closed.get()) {
+			if (closed) {
 				return;
 			}
-			long newPointer;
-			try {
-				newPointer = this.partition.appendRecord(record);
-			} catch (EOFException e) {
-				// system is out of memory so we attempt to reclaim memory with a copy compact run
-				try {
-					int partitionNumber = this.partition.getPartitionNumber();
-					compactPartition(partitionNumber);
-					// retry append
-					this.partition = partitions.get(partitionNumber);
-					newPointer = this.partition.appendRecord(record);
-				} catch (EOFException ex) {
-					throw new RuntimeException("Memory ran out. Compaction failed. " + 
-												getMemoryConsumptionString() +
-												" Message: " + ex.getMessage());
-				} catch (IndexOutOfBoundsException ex) {
-					throw new RuntimeException("Memory ran out. Compaction failed. " + 
-												getMemoryConsumptionString() +
-												" Message: " + ex.getMessage());
-				}
-			} catch (IndexOutOfBoundsException e) {
-				// system is out of memory so we attempt to reclaim memory with a copy compact run
-				try {
-					int partitionNumber = this.partition.getPartitionNumber();
-					compactPartition(partitionNumber);
-					// retry append
-					this.partition = partitions.get(partitionNumber);
-					newPointer = this.partition.appendRecord(record);
-				} catch (EOFException ex) {
-					throw new RuntimeException("Memory ran out. Compaction failed. " + 
-												getMemoryConsumptionString() +
-												" Message: " + ex.getMessage());
-				} catch (IndexOutOfBoundsException ex) {
-					throw new RuntimeException("Memory ran out. Compaction failed. " + 
-												getMemoryConsumptionString() +
-												" Message: " + ex.getMessage());
-				}
-			}
+			long newPointer = insertRecordIntoPartition(record, this.partition, true);
 			this.bucket.putLong(this.pointerOffsetInBucket, newPointer);
-			this.partition.setCompaction(false);
 		}
 	}
 }
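
The central change in CompactingHashTable above is that the duplicated EOFException / IndexOutOfBoundsException handling from insertOrReplaceRecord() and updateMatch() now lives in a single insertRecordIntoPartition() helper: append the record, mark the partition as fragmented when an existing slot is being replaced, and on an out-of-memory signal run a copy-compaction and retry the append once, re-fetching the partition because the compaction invalidates the old reference. The code below is only a simplified sketch of that compact-and-retry control flow; Partition, append(), compact() and getPartition() are hypothetical stand-ins, not the real Flink types.

import java.io.EOFException;
import java.io.IOException;

/** Simplified sketch of the compact-and-retry pattern; Partition is a hypothetical stand-in. */
class CompactAndRetrySketch<T> {

	interface Partition<R> {
		long append(R record) throws EOFException;   // throws when the partition's memory is exhausted
		int getNumber();
	}

	Partition<T> getPartition(int partitionNumber) {
		throw new UnsupportedOperationException("stand-in for this.partitions.get(...)");
	}

	void compact(int partitionNumber) {
		// a copy-compaction would rewrite the partition's live records here and reclaim fragmented space
	}

	long appendWithCompaction(T record, Partition<T> partition) throws IOException {
		try {
			return partition.append(record);                  // normal case: enough memory left
		}
		catch (EOFException | IndexOutOfBoundsException e) {  // both signal an out-of-memory situation
			final int partitionNumber = partition.getNumber();
			compact(partitionNumber);                         // reclaim space
			partition = getPartition(partitionNumber);        // compaction invalidates the old reference
			try {
				return partition.append(record);              // single retry
			}
			catch (EOFException | IndexOutOfBoundsException ex) {
				throw new RuntimeException("Memory ran out. Compaction failed.", ex);
			}
		}
	}
}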

http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
index 7fb997e..ffb66fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
@@ -199,7 +199,7 @@ public class InMemoryPartition<T> {
 	 * 
 	 * @param compacted compaction status
 	 */
-	public void setCompaction(boolean compacted) {
+	public void setIsCompacted(boolean compacted) {
 		this.compacted = compacted;
 	}
 	
@@ -281,9 +281,9 @@ public class InMemoryPartition<T> {
 	 * @param numberOfSegments allocation count
 	 */
 	public void allocateSegments(int numberOfSegments) {
-		while(getBlockCount() < numberOfSegments) {
+		while (getBlockCount() < numberOfSegments) {
 			MemorySegment next = this.availableMemory.nextSegment();
-			if(next != null) {
+			if (next != null) {
 				this.partitionPages.add(next);
 			} else {
 				return;

http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 9416796..7f07cfb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -886,7 +886,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
 				
 				// forward pointer set
 				final int overflowSegNum = (int) (originalForwardPointer >>> 32);
-				final int segOffset = (int) (originalForwardPointer & 0xffffffff);
+				final int segOffset = (int) originalForwardPointer;
 				final MemorySegment seg = p.overflowSegments[overflowSegNum];
 				
 				final short obCount = seg.getShort(segOffset + HEADER_COUNT_OFFSET);
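
The recurring change from "(int) (forwardPointer & 0xffffffff)" to "(int) forwardPointer" in this file and in CompactingHashTable above does not alter behaviour: narrowing a long to an int already keeps only the low 32 bits, and since 0xffffffff is an int literal (-1) that is sign-extended to an all-ones long, the old mask was a no-op anyway. A tiny standalone check of that equivalence (the class name is mine, not part of the commit):

public class ForwardPointerBits {
	public static void main(String[] args) {
		// overflow-bucket pointer layout used by the hash tables:
		// segment number in the high 32 bits, byte offset within the segment in the low 32 bits
		long forwardPointer = (5L << 32) | 0x1234L;

		int oldStyle = (int) (forwardPointer & 0xffffffff); // mask sign-extends to all ones -> no-op
		int newStyle = (int) forwardPointer;                // narrowing cast keeps the low 32 bits
		int segment  = (int) (forwardPointer >>> 32);       // high 32 bits: overflow segment number

		System.out.println(oldStyle == newStyle);           // true
		System.out.println(segment);                        // 5
		System.out.println(Integer.toHexString(newStyle));  // 1234
	}
}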

http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
index 999c4b0..27d958a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
@@ -71,10 +71,12 @@ public class IntArrayList {
 
 	public static final IntArrayList EMPTY = new IntArrayList(0) {
 		
+		@Override
 		public boolean add(int number) {
 			throw new UnsupportedOperationException();
 		}
-		
+
+		@Override
 		public int removeLast() {
 			throw new UnsupportedOperationException();
 		};

http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java
new file mode 100644
index 0000000..e3b697e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.util.MutableObjectIterator;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+public class CompactingHashTableTest {
+	
+	private final TypeSerializer<Tuple2<Long, String>> serializer;
+	private final TypeComparator<Tuple2<Long, String>> comparator;
+	
+	private final TypeComparator<Long> probeComparator;
+
+	private final TypePairComparator<Long, Tuple2<Long, String>> pairComparator;
+	
+	
+	public CompactingHashTableTest() {
+		TypeSerializer<?>[] fieldSerializers = { LongSerializer.INSTANCE, StringSerializer.INSTANCE };
+		@SuppressWarnings("unchecked")
+		Class<Tuple2<Long, String>> clazz = (Class<Tuple2<Long, String>>) (Class<?>) Tuple2.class;
+		this.serializer = new TupleSerializer<Tuple2<Long, String>>(clazz, fieldSerializers);
+		
+		TypeComparator<?>[] comparators = { new LongComparator(true) };
+		TypeSerializer<?>[] comparatorSerializers = { LongSerializer.INSTANCE };
+		
+		this.comparator = new TupleComparator<Tuple2<Long, String>>(new int[] {0}, comparators, comparatorSerializers);
+		
+		this.probeComparator = new LongComparator(true);
+		
+		this.pairComparator = new TypePairComparator<Long, Tuple2<Long, String>>() {
+			
+			private long ref;
+			
+			@Override
+			public void setReference(Long reference) {
+				ref = reference;
+			}
+
+			@Override
+			public boolean equalToReference(Tuple2<Long, String> candidate) {
+				//noinspection UnnecessaryUnboxing
+				return candidate.f0.longValue() == ref;
+			}
+
+			@Override
+			public int compareToReference(Tuple2<Long, String> candidate) {
+				long x = ref;
+				long y = candidate.f0;
+				return (x < y) ? -1 : ((x == y) ? 0 : 1);
+			}
+		};
+	}
+	
+	// ------------------------------------------------------------------------
+	//  tests
+	// ------------------------------------------------------------------------
+	
+	@Test
+	public void testHashTableGrowthWithInsert() {
+		try {
+			final int numElements = 1000000;
+			
+			List<MemorySegment> memory = getMemory(10000, 32 * 1024);
+			
+			// we create a hash table that thinks the records are super large. that makes it choose initially
+			// a lot of memory for the partition buffers, and start with a smaller hash table. that way
+			// we trigger a hash table growth early.
+			CompactingHashTable<Tuple2<Long, String>> table = new CompactingHashTable<Tuple2<Long, String>>(
+					serializer, comparator, memory, 10000);
+			table.open();
+			
+			for (long i = 0; i < numElements; i++) {
+				table.insert(new Tuple2<Long, String>(i, String.valueOf(i)));
+			}
+			
+			// make sure that all elements are contained via the entry iterator
+			{
+				BitSet bitSet = new BitSet(numElements);
+				MutableObjectIterator<Tuple2<Long, String>> iter = table.getEntryIterator();
+				Tuple2<Long, String> next;
+				while ((next = iter.next()) != null) {
+					assertNotNull(next.f0);
+					assertNotNull(next.f1);
+					assertEquals(next.f0.longValue(), Long.parseLong(next.f1));
+					
+					bitSet.set(next.f0.intValue());
+				}
+				
+				assertEquals(numElements, bitSet.cardinality());
+			}
+			
+			// make sure all entries are contained via the prober
+			{
+				CompactingHashTable<Tuple2<Long, String>>.HashTableProber<Long> prober =
+						table.getProber(probeComparator, pairComparator);
+				
+				for (long i = 0; i < numElements; i++) {
+					assertNotNull(prober.getMatchFor(i));
+					assertNull(prober.getMatchFor(i + numElements));
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * This test validates that records are not lost via "insertOrReplaceRecord()", as happened in bug [FLINK-2361].
+	 */
+	@Test
+	public void testHashTableGrowthWithInsertOrReplace() {
+		try {
+			final int numElements = 1000000;
+
+			List<MemorySegment> memory = getMemory(10000, 32 * 1024);
+
+			// we create a hash table that thinks the records are super large. that makes it choose initially
+			// a lot of memory for the partition buffers, and start with a smaller hash table. that way
+			// we trigger a hash table growth early.
+			CompactingHashTable<Tuple2<Long, String>> table = new CompactingHashTable<Tuple2<Long, String>>(
+					serializer, comparator, memory, 10000);
+			table.open();
+			
+			for (long i = 0; i < numElements; i++) {
+				table.insertOrReplaceRecord(new Tuple2<Long, String>(i, String.valueOf(i)));
+			}
+
+			// make sure that all elements are contained via the entry iterator
+			{
+				BitSet bitSet = new BitSet(numElements);
+				MutableObjectIterator<Tuple2<Long, String>> iter = table.getEntryIterator();
+				Tuple2<Long, String> next;
+				while ((next = iter.next()) != null) {
+					assertNotNull(next.f0);
+					assertNotNull(next.f1);
+					assertEquals(next.f0.longValue(), Long.parseLong(next.f1));
+
+					bitSet.set(next.f0.intValue());
+				}
+
+				assertEquals(numElements, bitSet.cardinality());
+			}
+
+			// make sure all entries are contained via the prober
+			{
+				CompactingHashTable<Tuple2<Long, String>>.HashTableProber<Long> prober =
+						table.getProber(probeComparator, pairComparator);
+
+				for (long i = 0; i < numElements; i++) {
+					assertNotNull(prober.getMatchFor(i));
+					assertNull(prober.getMatchFor(i + numElements));
+				}
+			}
+
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * This test validates that new inserts (rather than updates) via "insertOrReplaceRecord()" properly
+	 * react to out-of-memory conditions.
+	 */
+	@Test
+	public void testInsertsWithInsertOrReplace() {
+		try {
+			final int numElements = 1000;
+
+			final String longString = getLongString(10000);
+			List<MemorySegment> memory = getMemory(1000, 32 * 1024);
+
+			// here the table is given a small record size estimate, while the actual records (each with a
+			// 10000 character string) are much larger. replacing and re-inserting them fragments the
+			// partitions, so the table has to compact in order not to run out of memory.
+			CompactingHashTable<Tuple2<Long, String>> table = new CompactingHashTable<Tuple2<Long, String>>(
+					serializer, comparator, memory, 100);
+			table.open();
+
+			// first, we insert some elements
+			for (long i = 0; i < numElements; i++) {
+				table.insertOrReplaceRecord(new Tuple2<Long, String>(i, longString));
+			}
+
+			// now, we replace the same elements, causing fragmentation
+			for (long i = 0; i < numElements; i++) {
+				table.insertOrReplaceRecord(new Tuple2<Long, String>(i, longString));
+			}
+			
+			// now we insert an additional set of elements. without compaction during this insertion,
+			// the memory will run out
+			for (long i = 0; i < numElements; i++) {
+				table.insertOrReplaceRecord(new Tuple2<Long, String>(i + numElements, longString));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private static List<MemorySegment> getMemory(int numSegments, int segmentSize) {
+		ArrayList<MemorySegment> list = new ArrayList<MemorySegment>(numSegments);
+		for (int i = 0; i < numSegments; i++) {
+			list.add(new MemorySegment(new byte[segmentSize]));
+		}
+		return list;
+	}
+
+	private static String getLongString(int length) {
+		StringBuilder bld = new StringBuilder(length);
+		for (int i = 0; i < length; i++) {
+			bld.append('a');
+		}
+		return bld.toString();
+	}
+}
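
For reference, the API surface exercised by this new test after the refactoring: insertOrReplaceRecord() now takes only the record (the reusable temp-record argument is gone, as the MemoryHashTableTest and HashTablePerformanceComparison changes below show), and point lookups go through the prober obtained from getProber(). A minimal usage sketch, assuming the serializer, comparator, probeComparator, pairComparator and memory values set up in the test above:

CompactingHashTable<Tuple2<Long, String>> table =
		new CompactingHashTable<Tuple2<Long, String>>(serializer, comparator, memory, 100);
table.open();

table.insertOrReplaceRecord(new Tuple2<Long, String>(42L, "first"));   // plain insert
table.insertOrReplaceRecord(new Tuple2<Long, String>(42L, "second"));  // replaces the record for key 42

CompactingHashTable<Tuple2<Long, String>>.HashTableProber<Long> prober =
		table.getProber(probeComparator, pairComparator);
Tuple2<Long, String> match = prober.getMatchFor(42L);                  // expected: (42, "second")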

http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
index a8941a4..0c656d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
@@ -27,10 +27,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.operators.hash.AbstractHashTableProber;
-import org.apache.flink.runtime.operators.hash.AbstractMutableHashTable;
-import org.apache.flink.runtime.operators.hash.CompactingHashTable;
-import org.apache.flink.runtime.operators.hash.MutableHashTable;
 import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
 import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator;
 import org.apache.flink.runtime.operators.testutils.types.IntPair;
@@ -38,6 +34,7 @@ import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.IntPairPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -72,8 +69,8 @@ public class HashTablePerformanceComparison {
 
 			MutableObjectIterator<IntPair> updateTester = new UniformIntPairGenerator(NUM_PAIRS, 1, false);
 			
-			long start = 0L;
-			long end = 0L;
+			long start;
+			long end;
 			
 			long first = System.currentTimeMillis();
 			
@@ -105,7 +102,7 @@ public class HashTablePerformanceComparison {
 			start = System.currentTimeMillis();
 			while(updater.next(target) != null) {
 				target.setValue(target.getValue()*-1);
-				table.insertOrReplaceRecord(target, temp);
+				table.insertOrReplaceRecord(target);
 			}
 			end = System.currentTimeMillis();
 			System.out.println("Update done. Time: " + (end-start) + " ms");
@@ -147,8 +144,8 @@ public class HashTablePerformanceComparison {
 
 			MutableObjectIterator<IntPair> updateTester = new UniformIntPairGenerator(NUM_PAIRS, 1, false);
 			
-			long start = 0L;
-			long end = 0L;
+			long start;
+			long end;
 			
 			long first = System.currentTimeMillis();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
index 2ebcd43..3dcf688 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
@@ -27,9 +27,6 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.operators.hash.AbstractHashTableProber;
-import org.apache.flink.runtime.operators.hash.AbstractMutableHashTable;
-import org.apache.flink.runtime.operators.hash.CompactingHashTable;
 import org.apache.flink.runtime.operators.testutils.UniformStringPairGenerator;
 import org.apache.flink.runtime.operators.testutils.types.IntList;
 import org.apache.flink.runtime.operators.testutils.types.IntListComparator;
@@ -45,7 +42,9 @@ import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.junit.Test;
+
 import org.powermock.reflect.Whitebox;
 
 import static org.junit.Assert.*;
@@ -235,9 +234,8 @@ public class MemoryHashTableTest {
 			final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
 			
 			// test replacing
-			IntList tempHolder = new IntList();
 			for (int i = 0; i < NUM_LISTS; i++) {
-				table.insertOrReplaceRecord(overwriteLists[i], tempHolder);
+				table.insertOrReplaceRecord(overwriteLists[i]);
 			}
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
@@ -278,10 +276,9 @@ public class MemoryHashTableTest {
 			final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
 			
 			// test replacing
-			IntList tempHolder = new IntList();
 			for (int i = 0; i < NUM_LISTS; i++) {
 				if( i % 100 != 0) {
-					table.insertOrReplaceRecord(overwriteLists[i], tempHolder);
+					table.insertOrReplaceRecord(overwriteLists[i]);
 					lists[i] = overwriteLists[i];
 				}
 			}
@@ -327,10 +324,9 @@ public class MemoryHashTableTest {
 			final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS/STEP_SIZE, rnd);
 			
 			// test replacing
-			IntList tempHolder = new IntList();
 			for (int i = 0; i < NUM_LISTS; i += STEP_SIZE) {
 				overwriteLists[i/STEP_SIZE].setKey(overwriteLists[i/STEP_SIZE].getKey()*STEP_SIZE);
-				table.insertOrReplaceRecord(overwriteLists[i/STEP_SIZE], tempHolder);
+				table.insertOrReplaceRecord(overwriteLists[i/STEP_SIZE]);
 				lists[i] = overwriteLists[i/STEP_SIZE];
 			}
 			
@@ -379,9 +375,8 @@ public class MemoryHashTableTest {
 			for(int k = 0; k < NUM_REWRITES; k++) {
 				overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
 				// test replacing
-				IntList tempHolder = new IntList();
 				for (int i = 0; i < NUM_LISTS; i++) {
-					table.insertOrReplaceRecord(overwriteLists[i], tempHolder);
+					table.insertOrReplaceRecord(overwriteLists[i]);
 				}
 			
 				for (int i = 0; i < NUM_LISTS; i++) {
@@ -409,11 +404,7 @@ public class MemoryHashTableTest {
 			table.open();
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				try {
-					table.insert(lists[i]);
-				} catch (Exception e) {
-					throw e;
-				}
+				table.insert(lists[i]);
 			}
 
 			final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
@@ -630,9 +621,8 @@ public class MemoryHashTableTest {
 			final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
 			
 			// test replacing
-			IntList tempHolder = new IntList();
 			for (int i = 0; i < NUM_LISTS; i++) {
-				table.insertOrReplaceRecord(overwriteLists[i], tempHolder);
+				table.insertOrReplaceRecord(overwriteLists[i]);
 			}
 			
 			Field list = Whitebox.getField(CompactingHashTable.class, "partitions");
@@ -691,7 +681,7 @@ public class MemoryHashTableTest {
 			
 			while(updater.next(target) != null) {
 				target.setValue(target.getValue());
-				table.insertOrReplaceRecord(target, temp);
+				table.insertOrReplaceRecord(target);
 			}
 			
 			while (updateTester.next(target) != null) {