Posted to commits@flink.apache.org by se...@apache.org on 2015/08/05 22:06:16 UTC
[1/2] flink git commit: [hotfix] Remove unused and outdated HashFunctionCollisionBenchmark
Repository: flink
Updated Branches:
refs/heads/master 3e73496c5 -> 925ac1f76
[hotfix] Remove unused and outdated HashFunctionCollisionBenchmark
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5226f0b4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5226f0b4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5226f0b4
Branch: refs/heads/master
Commit: 5226f0b4634da95f9cedfb092d6b736f0ad471b9
Parents: 3e73496
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Aug 4 15:35:00 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 5 17:40:37 2015 +0200
----------------------------------------------------------------------
.../hash/HashFunctionCollisionBenchmark.java | 448 -------------------
.../operators/hash/util/LastBitsToRange.java | 54 ---
.../operators/hash/util/RandomIterator.java | 65 ---
.../operators/hash/util/RangeCalculator.java | 46 --
.../operators/hash/util/RangeIterator.java | 61 ---
.../operators/hash/util/StepRangeIterator.java | 68 ---
6 files changed, 742 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5226f0b4/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashFunctionCollisionBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashFunctionCollisionBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashFunctionCollisionBenchmark.java
deleted file mode 100644
index d96cd46..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashFunctionCollisionBenchmark.java
+++ /dev/null
@@ -1,448 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import java.util.ArrayList;
-import java.util.Formatter;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.runtime.operators.hash.MutableHashTable;
-import org.apache.flink.runtime.operators.hash.MultiLevelHashTester.BucketBoundaries;
-import org.apache.flink.runtime.operators.hash.util.LastBitsToRange;
-import org.apache.flink.runtime.operators.hash.util.RandomIterator;
-import org.apache.flink.runtime.operators.hash.util.RangeCalculator;
-import org.apache.flink.runtime.operators.hash.util.RangeIterator;
-import org.apache.flink.runtime.operators.hash.util.StepRangeIterator;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test distribution of hash function for multiple levels
- *
- *
- */
-public class HashFunctionCollisionBenchmark {
-
- private static final Logger LOG = LoggerFactory.getLogger(HashFunctionCollisionBenchmark.class);
-
- private static final long SEED = 561349061987311L;
-
- @Test
- public void testStepSeventeen() {
-
- // Define numbers of buckets on each level
- RangeCalculator[] rangeCalculators = {
- new LastBitsToRange(10), // 2^10=1024 Buckets on level 0
- new LastBitsToRange(10), // 2^10=1024 Buckets on level 1
- new LastBitsToRange(10) }; // 2^10=1024 Buckets on level 2
-
- Iterator<Integer> importIterator = new StepRangeIterator(-30000000,
- 30000000, 17);
-
- MultiLevelHashTester ht = new MultiLevelHashTester(importIterator,
- rangeCalculators);
-
- BucketBoundaries[] boundaries = {
- new BucketBoundaries(3000, 3700, 5, 0.01),
- new BucketBoundaries(0, 20,
- BucketBoundaries.MAX_EMPTY_UNBOUNDED, 0.0001),
- new BucketBoundaries(0, 3,
- BucketBoundaries.MAX_EMPTY_UNBOUNDED, 0.000001) };
-
- LOG.debug("Start Step Seventeen hash test");
- ht.runTest(boundaries);
- LOG.debug("End Step Seventeen hash test");
- }
-
- @Test
- public void testThreeLevel() {
-
- // Define numbers of buckets on each level
- RangeCalculator[] rangeCalculators = {
- new LastBitsToRange(10), // 2^10=1024 Buckets on level 0
- new LastBitsToRange(10), // 2^10=1024 Buckets on level 1
- new LastBitsToRange(10) }; // 2^10=1024 Buckets on level 2
-
- Iterator<Integer> importIterator = new RangeIterator(-1000000, 1000000);
-
- MultiLevelHashTester ht = new MultiLevelHashTester(importIterator,
- rangeCalculators);
-
- BucketBoundaries[] boundaries = {
- new BucketBoundaries(1800, 2110, 5, 0.01),
- new BucketBoundaries(0, 15,
- BucketBoundaries.MAX_EMPTY_UNBOUNDED, 0.0001),
- new BucketBoundaries(0, 2,
- BucketBoundaries.MAX_EMPTY_UNBOUNDED, 0.000001) };
-
- LOG.debug("Start Three Level hash test");
- ht.runTest(boundaries);
- LOG.debug("End Three Level hash test");
- }
-
- @Test
- public void testRandom() {
-
- // Define numbers of buckets on each level
- RangeCalculator[] rangeCalculators = {
- new LastBitsToRange(10), // 2^10=1024 Buckets on level 0
- new LastBitsToRange(10), // 2^10=1024 Buckets on level 1
- new LastBitsToRange(10) }; // 2^10=1024 Buckets on level 2
-
- Iterator<Integer> importIterator = new RandomIterator(SEED, 2000000);
-
- MultiLevelHashTester ht = new MultiLevelHashTester(importIterator,
- rangeCalculators);
-
- BucketBoundaries[] boundaries = {
- new BucketBoundaries(1800, 2110, 5, 0.01),
- new BucketBoundaries(0, 15,
- BucketBoundaries.MAX_EMPTY_UNBOUNDED, 0.0001),
- new BucketBoundaries(0, 2,
- BucketBoundaries.MAX_EMPTY_UNBOUNDED, 0.000001) };
-
- LOG.debug("Start Random hash test");
- ht.runTest(boundaries);
- LOG.debug("End Random hash test");
- }
-
- @Test
- public void testTwoLevel() {
-
- // Define numbers of buckets on each level
- RangeCalculator[] rangeCalculators = {
- new LastBitsToRange(12), // 2^12=4096 Buckets on level 0
- new LastBitsToRange(12) }; // 2^12=4096 Buckets on level 1
-
- Iterator<Integer> importIterator = new RangeIterator(-1000000, 1000000);
-
- MultiLevelHashTester ht = new MultiLevelHashTester(importIterator,
- rangeCalculators);
-
- BucketBoundaries[] boundaries = {
- new BucketBoundaries(400, 600, 5, 0.01),
- new BucketBoundaries(0, 4,
- BucketBoundaries.MAX_EMPTY_UNBOUNDED, 0.0001) };
-
- LOG.debug("Start Two Level hash test");
- ht.runTest(boundaries);
- LOG.debug("End Two Level hash test");
- }
-
-}
-
-class MultiLevelHashTester {
-
- private static final Logger LOG = LoggerFactory.getLogger(MultiLevelHashTester.class);
-
- private final int maxLevel;
- private final Iterator<Integer> importIterator;
- private final RangeCalculator[] rangeCalculators;
- private final HashMap<Integer, Object> rootMap = new HashMap<Integer, Object>();
- private final ArrayList<SortedMap<Integer, Integer>> bucketSizesPerLevel;
-
- /**
- * @param importIterator
- * Iterator over values to be used in test run
- * @param rangeCalculators
- * For each level a range calculator which defines how to map
- * from hash to bucket
- */
- public MultiLevelHashTester(Iterator<Integer> importIterator,
- RangeCalculator[] rangeCalculators) {
- this.importIterator = importIterator;
- this.rangeCalculators = rangeCalculators;
- this.maxLevel = rangeCalculators.length;
- this.bucketSizesPerLevel = new ArrayList<SortedMap<Integer, Integer>>(
- maxLevel);
-
- for (int i = 0; i < maxLevel; i++) {
- bucketSizesPerLevel.add(i, new TreeMap<Integer, Integer>());
- }
- }
-
- /**
- * Run the test by: - Adding values from iterator to map - Creating
- * histogram over bucket sizes per level - Printing histogram informations
- *
- * @param boundaries
- * Expected results for each level
- */
- public void runTest(BucketBoundaries[] boundaries) {
- addValues();
- collectStatistics(rootMap, 0);
- if (LOG.isDebugEnabled() == true) {
- printStatistics();
- }
- checkBoundaries(boundaries);
- }
-
- private void checkBoundaries(BucketBoundaries[] boundaries) {
- for (int level = 0; level < boundaries.length; level++) {
- int lowerBound = boundaries[level].getLowerBound();
- int upperBound = boundaries[level].getUpperBound();
- int bucketCountInLevel = 0;
- int bucketCountOutOfRange = 0;
-
- SortedMap<Integer, Integer> levelMap = bucketSizesPerLevel
- .get(level);
- Iterator<Integer> bucketSizeIterator = levelMap.keySet().iterator();
-
- while (bucketSizeIterator.hasNext()) {
- int bucketSize = bucketSizeIterator.next();
- if (bucketSize != 0) {
- int countForBucketSize = levelMap.get(bucketSize);
- bucketCountInLevel += countForBucketSize;
- if (lowerBound > bucketSize || upperBound < bucketSize) {
- bucketCountOutOfRange += countForBucketSize;
- }
-
- }
- }
- double bucketsOutOfRange = (double) bucketCountOutOfRange
- / (double) bucketCountInLevel;
- double maxBucketsOutOfRange = boundaries[level]
- .getPercentOutOfRange();
- Assert.assertTrue("More than " + (maxBucketsOutOfRange * 100)
- + "% of buckets out of range in level " + level,
- bucketsOutOfRange <= maxBucketsOutOfRange);
-
- int maxEmpty = boundaries[level].getMaxEmpty();
- Assert.assertTrue(
- "More than " + maxEmpty + " empty buckets in level "
- + level,
- (maxEmpty == BucketBoundaries.MAX_EMPTY_UNBOUNDED)
- || (levelMap.get(0) <= boundaries[level]
- .getMaxEmpty()));
- }
- }
-
- /**
- * Find for each value the right bucket on the deepest level and increase
- * its count
- */
- @SuppressWarnings("unchecked")
- private void addValues() {
-
- while (importIterator.hasNext()) {
- int nextValue = importIterator.next();
-
- HashMap<Integer, Object> mapForCurrentLevel = rootMap;
-
- for (int i = 0; i < maxLevel - 1; i++) {
- int hashValue = MutableHashTable.hash(nextValue, i);
- int bucket = rangeCalculators[i].getBucket(hashValue);
- Object nextObject = mapForCurrentLevel.get(bucket);
- if (nextObject == null) {
- HashMap<Integer, Object> mapForNextLevel = new HashMap<Integer, Object>();
- mapForCurrentLevel.put(bucket, mapForNextLevel);
- mapForCurrentLevel = mapForNextLevel;
-
- } else {
- mapForCurrentLevel = (HashMap<Integer, Object>) nextObject;
- }
- }
-
- int lastHashValue = MutableHashTable.hash(nextValue, maxLevel - 1);
- int deepestBucketNr = rangeCalculators[maxLevel - 1]
- .getBucket(lastHashValue);
- Object countOnDeepestLevel = mapForCurrentLevel
- .get(deepestBucketNr);
- if (countOnDeepestLevel == null) {
- mapForCurrentLevel.put(deepestBucketNr, 1);
- } else {
- mapForCurrentLevel.put(deepestBucketNr,
- ((Integer) countOnDeepestLevel) + 1);
- }
-
- }
- }
-
- private void printStatistics() {
- for (int level = 0; level < maxLevel; level++) {
- int bucketCountInLevel = 0;
-
- SortedMap<Integer, Integer> levelMap = bucketSizesPerLevel
- .get(level);
- Iterator<Integer> bucketSizeIterator = levelMap.keySet().iterator();
-
- LOG.debug("Statistics for level: " + level);
- LOG.debug("----------------------------------------------");
- LOG.debug("");
- LOG.debug("Bucket Size | Count");
- LOG.debug("------------------------");
-
- int i = 0;
- while (bucketSizeIterator.hasNext()) {
- int bucketSize = bucketSizeIterator.next();
- if (bucketSize != 0) {
- int countForBucketSize = levelMap.get(bucketSize);
- bucketCountInLevel += countForBucketSize;
- Formatter formatter = new Formatter();
- formatter.format(" %10d | %10d", bucketSize, countForBucketSize);
-
- if (levelMap.size() < 20 || i < 3 || i >= (levelMap.size() - 3)) {
- LOG.debug(formatter.out().toString());
- } else if (levelMap.size() / 2 == i) {
- LOG.debug(" .. | ..");
- LOG.debug(formatter.out().toString());
- LOG.debug(" .. | ..");
- }
- i++;
- formatter.close();
- }
- }
-
- LOG.debug("");
- LOG.debug("Number of non-empty buckets in level: "
- + bucketCountInLevel);
- LOG.debug("Number of empty buckets in level : "
- + levelMap.get(0));
- LOG.debug("Number of different bucket sizes : "
- + (levelMap.size() - 1));
- LOG.debug("");
- LOG.debug("");
- LOG.debug("");
- }
- }
-
- /**
- * Create histogram over bucket sizes
- *
- * @param map
- * Map to be analyzed
- * @param level
- * Level on which the map is located in
- * @return The total count of hashed values in the map
- */
- private int collectStatistics(HashMap<Integer, Object> map, int level) {
- SortedMap<Integer, Integer> bucketSizesForLevel = bucketSizesPerLevel
- .get(level);
-
- Iterator<Object> bucketIterator = map.values().iterator();
- int bucketCount = 0;
- int totalValueCount = 0;
-
- while (bucketIterator.hasNext()) {
- bucketCount++;
-
- Integer hashValuesInBucket;
- // If we are already on the deepest level, get the count in the
- // bucket, otherwise
- // recursively examine the subtree
- if (level == maxLevel - 1) {
- hashValuesInBucket = (Integer) bucketIterator.next();
- } else {
- @SuppressWarnings("unchecked")
- HashMap<Integer, Object> nextMap = (HashMap<Integer, Object>) bucketIterator
- .next();
- hashValuesInBucket = collectStatistics(nextMap, level + 1);
- }
- totalValueCount += hashValuesInBucket;
- Integer countOfBucketSizes = bucketSizesForLevel
- .get(hashValuesInBucket);
- if (countOfBucketSizes == null) {
- countOfBucketSizes = 1;
- } else {
- countOfBucketSizes += 1;
- }
- bucketSizesForLevel.put(hashValuesInBucket, countOfBucketSizes);
- }
-
- Integer countOfEmptyBuckets = bucketSizesForLevel.get(0);
- if (countOfEmptyBuckets == null) {
- countOfEmptyBuckets = rangeCalculators[level].getBucketCount()
- - bucketCount;
- } else {
- countOfEmptyBuckets += rangeCalculators[level].getBucketCount()
- - bucketCount;
- }
- bucketSizesForLevel.put(0, countOfEmptyBuckets);
-
- return totalValueCount;
- }
-
- /**
- * Expected results for bucket sizes per level
- *
- *
- */
- static class BucketBoundaries {
-
- public static final int MAX_EMPTY_UNBOUNDED = -1;
- private int lowerBound;
- private int upperBound;
- private int maxEmpty;
- private double percentOutOfRange;
-
- /**
- *
- *
- * @param lowerBound Lower bound for bucket sizes
- * @param upperBound Upper bound for bucket sizes
- * @param maxEmpty Maximum number of empty buckets
- * @param percentOutOfRange Maximum percentage of buckets out of range
- */
- public BucketBoundaries(int lowerBound, int upperBound, int maxEmpty,
- double percentOutOfRange) {
- this.lowerBound = lowerBound;
- this.upperBound = upperBound;
- this.maxEmpty = maxEmpty;
- this.percentOutOfRange = percentOutOfRange;
- }
-
- /**
- *
- * @return Lower bound for bucket sizes
- */
- public int getLowerBound() {
- return lowerBound;
- }
-
- /**
- *
- * @return Upper bound for bucket sizes
- */
- public int getUpperBound() {
- return upperBound;
- }
-
- /**
- *
- * @return Maximum number of empty buckets
- */
- public int getMaxEmpty() {
- return maxEmpty;
- }
-
- /**
- *
- * @return Maximum percentage of buckets out of range
- */
- public double getPercentOutOfRange() {
- return percentOutOfRange;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5226f0b4/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/LastBitsToRange.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/LastBitsToRange.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/LastBitsToRange.java
deleted file mode 100644
index 6b7caa1..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/LastBitsToRange.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash.util;
-
-/**
- * Takes the last m bits of a hash as bucket number
- *
- *
- */
-public class LastBitsToRange implements RangeCalculator {
-
- private final int mask;
- private final int bucketCount;
-
- /**
- * Create object which calculates bucket number according to the last m bits
- *
- * @param numberOfLastBits
- * Number of bits to be used for bucket calculation
- */
- public LastBitsToRange(int numberOfLastBits) {
- bucketCount = (int) Math.pow(2, numberOfLastBits);
- mask = bucketCount - 1;
- }
-
-
- @Override
- public int getBucket(int hash) {
- return hash & mask;
- }
-
-
- public int getBucketCount() {
- return bucketCount;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5226f0b4/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RandomIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RandomIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RandomIterator.java
deleted file mode 100644
index 08e504b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RandomIterator.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash.util;
-
-import java.util.Iterator;
-import java.util.Random;
-
-/**
- * Gives a number of random integer values
- *
- *
- */
-public class RandomIterator implements Iterator<Integer> {
-
- private final Random randomGenerator;
- private final int count;
- private int currentCount;
-
- /**
- * Creates an iterator which gives random integer numbers
- *
- * @param seed
- * Seed for random generator
- * @param count
- * Number of values to be returned
- */
- public RandomIterator(long seed, int count) {
- this.randomGenerator = new Random(seed);
- this.count = count;
- this.currentCount = 0;
- }
-
- @Override
- public boolean hasNext() {
- return (currentCount < count);
- }
-
- @Override
- public Integer next() {
- currentCount++;
- return randomGenerator.nextInt();
- }
-
- @Override
- public void remove() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5226f0b4/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RangeCalculator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RangeCalculator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RangeCalculator.java
deleted file mode 100644
index 0f0162d..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RangeCalculator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash.util;
-
-/**
- * Interface for defining which hash code will be placed in which bucket
- *
- *
- */
-public interface RangeCalculator {
-
- /**
- * Calculates the number of the bucket in which the value should be placed
- * in
- *
- * @param hash
- * The hash of the value
- * @return The number of the bucket
- */
- public int getBucket(int hash);
-
- /**
- * Get the total bucket count
- *
- * @return The total bucket count
- */
- public int getBucketCount();
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5226f0b4/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RangeIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RangeIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RangeIterator.java
deleted file mode 100644
index 82bac8e..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/RangeIterator.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash.util;
-
-import java.util.Iterator;
-
-/**
- * Iterates over a range of integer values
- *
- *
- */
-public class RangeIterator implements Iterator<Integer> {
-
- private final int maxValue;
- private int currentValue;
-
- /**
- * Create an iterator over the range from minValue to maxValue
- *
- * @param minValue
- * Smallest value returned by the iterator
- * @param maxValue
- * Largest value returned by the iterator
- */
- public RangeIterator(int minValue, int maxValue) {
- this.maxValue = maxValue;
- currentValue = minValue;
- }
-
- @Override
- public boolean hasNext() {
- return (currentValue < maxValue);
- }
-
- @Override
- public Integer next() {
- return currentValue++;
- }
-
- @Override
- public void remove() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5226f0b4/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/StepRangeIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/StepRangeIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/StepRangeIterator.java
deleted file mode 100644
index 7355754..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/util/StepRangeIterator.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash.util;
-
-import java.util.Iterator;
-
-/**
- * Iterates over a range of integer values with a certain step width
- *
- *
- */
-public class StepRangeIterator implements Iterator<Integer> {
-
- private final int maxValue;
- private int currentValue;
- private final int step;
-
- /**
- * Create an iterator over the range from minValue to maxValue with a
- * certain step width
- *
- * @param minValue
- * Smallest value returned by the iterator
- * @param maxValue
- * Largest value returned by the iterator
- * @param step
- * Step width of the iterator
- */
- public StepRangeIterator(int minValue, int maxValue, int step) {
- this.maxValue = maxValue;
- currentValue = minValue;
- this.step = step;
- }
-
- @Override
- public boolean hasNext() {
- return (currentValue < maxValue);
- }
-
- @Override
- public Integer next() {
- int temp = currentValue;
- currentValue += step;
- return temp;
- }
-
- @Override
- public void remove() {
- }
-
-}
[2/2] flink git commit: [FLINK-1916] [FLINK-2361] [runtime] Fix EOFException and entry loss in CompactingHashTable
Posted by se...@apache.org.
[FLINK-1916] [FLINK-2361] [runtime] Fix EOFException and entry loss in CompactingHashTable
Also a lot of code cleanups in CompactingHashTable
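One API change visible in the diffs below: AbstractMutableHashTable.insertOrReplaceRecord() no longer takes a reusable temp-holder instance, and the SolutionSet*OutputCollector constructors no longer require a TypeSerializer. The following is a minimal sketch of the new call shape only; the helper class and method name are invented for illustration and are not part of this commit.

import java.io.IOException;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;

// Hypothetical helper, not part of this commit: illustrates the simplified call.
final class SolutionSetUpdateExample {

    // Before this change, callers passed a scratch instance:
    //   solutionSet.insertOrReplaceRecord(record, tmpHolder);
    // After it, the table deserializes the existing entry internally,
    // so only the record is passed.
    static <T> void upsert(CompactingHashTable<T> solutionSet, T record) throws IOException {
        solutionSet.insertOrReplaceRecord(record);
    }
}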
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/925ac1f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/925ac1f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/925ac1f7
Branch: refs/heads/master
Commit: 925ac1f76bb84986764495407049a77552169d84
Parents: 5226f0b
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 3 20:46:17 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 5 21:37:15 2015 +0200
----------------------------------------------------------------------
.../common/typeutils/GenericPairComparator.java | 3 -
.../SolutionSetFastUpdateOutputCollector.java | 12 +-
.../io/SolutionSetUpdateOutputCollector.java | 13 +-
.../task/AbstractIterativePactTask.java | 3 +-
.../hash/AbstractMutableHashTable.java | 4 +-
.../operators/hash/CompactingHashTable.java | 629 ++++++++-----------
.../operators/hash/InMemoryPartition.java | 6 +-
.../operators/hash/MutableHashTable.java | 2 +-
.../apache/flink/runtime/util/IntArrayList.java | 4 +-
.../operators/hash/CompactingHashTableTest.java | 254 ++++++++
.../hash/HashTablePerformanceComparison.java | 15 +-
.../operators/hash/MemoryHashTableTest.java | 28 +-
12 files changed, 564 insertions(+), 409 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericPairComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericPairComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericPairComparator.java
index 09d4aee..e3bf4ca 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericPairComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericPairComparator.java
@@ -20,9 +20,6 @@ package org.apache.flink.api.common.typeutils;
import java.io.Serializable;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-
public class GenericPairComparator<T1, T2> extends TypePairComparator<T1, T2>
implements Serializable {
http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
index 3b2eda5..f326d89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetFastUpdateOutputCollector.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.iterative.io;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.util.Collector;
@@ -38,23 +37,20 @@ public class SolutionSetFastUpdateOutputCollector<T> implements Collector<T> {
private final Collector<T> delegate;
private final CompactingHashTable<T> solutionSet;
-
- private final T tmpHolder;
- public SolutionSetFastUpdateOutputCollector(CompactingHashTable<T> solutionSet, TypeSerializer<T> serializer) {
- this(solutionSet, serializer, null);
+ public SolutionSetFastUpdateOutputCollector(CompactingHashTable<T> solutionSet) {
+ this(solutionSet, null);
}
- public SolutionSetFastUpdateOutputCollector(CompactingHashTable<T> solutionSet, TypeSerializer<T> serializer, Collector<T> delegate) {
+ public SolutionSetFastUpdateOutputCollector(CompactingHashTable<T> solutionSet, Collector<T> delegate) {
this.solutionSet = solutionSet;
this.delegate = delegate;
- this.tmpHolder = serializer.createInstance();
}
@Override
public void collect(T record) {
try {
- solutionSet.insertOrReplaceRecord(record, tmpHolder);
+ solutionSet.insertOrReplaceRecord(record);
if (delegate != null) {
delegate.collect(record);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
index f400b7e..c39efa5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/io/SolutionSetUpdateOutputCollector.java
@@ -16,12 +16,10 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.iterative.io;
import java.io.IOException;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.util.Collector;
@@ -40,23 +38,20 @@ public class SolutionSetUpdateOutputCollector<T> implements Collector<T> {
private final Collector<T> delegate;
private final CompactingHashTable<T> solutionSet;
-
- private final T tmpHolder;
- public SolutionSetUpdateOutputCollector(CompactingHashTable<T> solutionSet, TypeSerializer<T> serializer) {
- this(solutionSet, serializer, null);
+ public SolutionSetUpdateOutputCollector(CompactingHashTable<T> solutionSet) {
+ this(solutionSet, null);
}
- public SolutionSetUpdateOutputCollector(CompactingHashTable<T> solutionSet, TypeSerializer<T> serializer, Collector<T> delegate) {
+ public SolutionSetUpdateOutputCollector(CompactingHashTable<T> solutionSet, Collector<T> delegate) {
this.solutionSet = solutionSet;
this.delegate = delegate;
- this.tmpHolder = serializer.createInstance();
}
@Override
public void collect(T record) {
try {
- solutionSet.insertOrReplaceRecord(record, tmpHolder);
+ solutionSet.insertOrReplaceRecord(record);
if (delegate != null) {
delegate.collect(record);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
index 72434e6..67d8f56 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
@@ -329,8 +329,7 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
if (ss instanceof CompactingHashTable) {
@SuppressWarnings("unchecked")
CompactingHashTable<OT> solutionSet = (CompactingHashTable<OT>) ss;
- TypeSerializer<OT> serializer = getOutputSerializer();
- return new SolutionSetUpdateOutputCollector<OT>(solutionSet, serializer, delegate);
+ return new SolutionSetUpdateOutputCollector<OT>(solutionSet, delegate);
}
else if (ss instanceof JoinHashMap) {
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractMutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractMutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractMutableHashTable.java
index f3e8a22..86ce013 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractMutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractMutableHashTable.java
@@ -61,15 +61,13 @@ public abstract class AbstractMutableHashTable<T> {
public abstract void abort();
- public abstract void buildTable(final MutableObjectIterator<T> input) throws IOException;
-
public abstract List<MemorySegment> getFreeMemory();
// ------------- Modifier -------------
public abstract void insert(T record) throws IOException;
- public abstract void insertOrReplaceRecord(T record, T tempHolder) throws IOException;
+ public abstract void insertOrReplaceRecord(T record) throws IOException;
// ------------- Accessors -------------
http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index 6533e19..d07c7e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -22,10 +22,10 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -37,41 +37,20 @@ import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.util.MutableObjectIterator;
/**
- * An implementation of an in-memory Hash Table for variable-length records.
- * <p>
- * The design of this class follows on many parts the design presented in
- * "Hash joins and hash teams in Microsoft SQL Server", by Goetz Graefe et al..
- * <p>
- * <hr>
- * The layout of the buckets inside a memory segment is as follows:
+ * A hash table that uses Flink's managed memory and supports replacement of records or
+ * updates to records. For an overview of the general data structure of the hash table, please
+ * refer to the description of the {@link org.apache.flink.runtime.operators.hash.MutableHashTable}.
+ *
+ * <p>The hash table is internally divided into two parts: The hash index, and the partition buffers
+ * that store the actual records. When records are inserted or updated, the hash table appends the
+ * records to its corresponding partition, and inserts or updates the entry in the hash index.
+ * In the case that the hash table runs out of memory, it compacts a partition by walking through the
+ * hash index and copying all reachable elements into a fresh partition. After that, it releases the
+ * memory of the partition to compact.</p>
*
- * <pre>
- * +----------------------------- Bucket x ----------------------------
- * |Partition (1 byte) | reserved (3 bytes) | element count (4 bytes) |
- * | next-bucket-in-chain-pointer (8 bytes) |
- * |
- * |hashCode 1 (4 bytes) | hashCode 2 (4 bytes) | hashCode 3 (4 bytes) |
- * | ... hashCode n-1 (4 bytes) | hashCode n (4 bytes)
- * |
- * |pointer 1 (8 bytes) | pointer 2 (8 bytes) | pointer 3 (8 bytes) |
- * | ... pointer n-1 (8 bytes) | pointer n (8 bytes)
- * |
- * +---------------------------- Bucket x + 1--------------------------
- * |Partition (1 byte) | reserved (3 bytes) | element count (4 bytes) |
- * | next-bucket-in-chain-pointer (8 bytes) |
- * |
- * |hashCode 1 (4 bytes) | hashCode 2 (4 bytes) | hashCode 3 (4 bytes) |
- * | ... hashCode n-1 (4 bytes) | hashCode n (4 bytes)
- * |
- * |pointer 1 (8 bytes) | pointer 2 (8 bytes) | pointer 3 (8 bytes) |
- * | ... pointer n-1 (8 bytes) | pointer n (8 bytes)
- * +-------------------------------------------------------------------
- * | ...
- * |
- * </pre>
* @param <T> Record type stored in hash table
*/
-public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
+public class CompactingHashTable<T> extends AbstractMutableHashTable<T> {
private static final Logger LOG = LoggerFactory.getLogger(CompactingHashTable.class);
@@ -79,11 +58,10 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
// Internal Constants
// ------------------------------------------------------------------------
+ /** The minimum number of memory segments that the compacting hash table needs to work properly */
private static final int MIN_NUM_MEMORY_SEGMENTS = 33;
- /**
- * The maximum number of partitions
- */
+ /** The maximum number of partitions */
private static final int MAX_NUM_PARTITIONS = 32;
/**
@@ -91,7 +69,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
* used to determine the ratio of the number of memory segments intended for partition
* buffers and the number of memory segments in the hash-table structure.
*/
- private static final int DEFAULT_RECORD_LEN = 24; //FIXME maybe find a better default
+ private static final int DEFAULT_RECORD_LEN = 24;
/**
* The length of the hash code stored in the bucket.
@@ -155,14 +133,14 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
// Members
// ------------------------------------------------------------------------
- /**
- * The free memory segments currently available to the hash join.
- */
+ /** The lock to synchronize state changes on */
+ private final Object stateLock = new Object();
+
+ /** The free memory segments currently available to the hash join. */
private final ArrayList<MemorySegment> availableMemory;
- /**
- * The size of the segments used by the hash join buckets. All segments must be of equal size to ease offset computations.
- */
+ /** The size of the segments used by the hash join buckets.
+ * All segments must be of equal size to ease offset computations. */
private final int segmentSize;
/**
@@ -173,63 +151,59 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
*/
private final int bucketsPerSegmentMask;
- /**
- * The number of bits that describe the position of a bucket in a memory segment. Computed as log2(bucketsPerSegment).
- */
+ /** The number of bits that describe the position of a bucket in a memory segment.
+ * Computed as log2(bucketsPerSegment). */
private final int bucketsPerSegmentBits;
- /**
- * An estimate for the average record length.
- */
+ /** An estimate for the average record length. */
private final int avgRecordLen;
+
+ private final int pageSizeInBits;
// ------------------------------------------------------------------------
- /**
- * The partitions of the hash table.
- */
+ /** The partitions of the hash table. */
private final ArrayList<InMemoryPartition<T>> partitions;
- /**
- * The array of memory segments that contain the buckets which form the actual hash-table
- * of hash-codes and pointers to the elements.
- */
+ /** The array of memory segments that contain the buckets which form the actual hash-table
+ * of hash-codes and pointers to the elements. */
private MemorySegment[] buckets;
- /**
- * temporary storage for partition compaction (always attempts to allocate as many segments as the largest partition)
- */
+ /** Temporary storage for partition compaction (always attempts to allocate
+ * as many segments as the largest partition) */
private InMemoryPartition<T> compactionMemory;
- /**
- * The number of buckets in the current table. The bucket array is not necessarily fully
- * used, when not all buckets that would fit into the last segment are actually used.
- */
+ /** The number of buckets in the current table. The bucket array is not necessarily fully
+ * used, when not all buckets that would fit into the last segment are actually used. */
private int numBuckets;
- /**
- * flag necessary so a resize is never triggered during a resize since the code paths are interleaved
- */
- private boolean isResizing = false;
+ /** Flag to interrupt closed loops */
+ private boolean running = true;
+
+ /** Flag to mark the table as open / closed */
+ private boolean closed;
- private AtomicBoolean closed = new AtomicBoolean();
+ /** Flag necessary so a resize is never triggered during a resize since the code paths are interleaved */
+ private boolean isResizing;
- private boolean running = true;
-
- private int pageSizeInBits;
// ------------------------------------------------------------------------
// Construction and Teardown
// ------------------------------------------------------------------------
- public CompactingHashTable(TypeSerializer<T> buildSideSerializer, TypeComparator<T> buildSideComparator, List<MemorySegment> memorySegments)
- {
+ public CompactingHashTable(TypeSerializer<T> buildSideSerializer,
+ TypeComparator<T> buildSideComparator,
+ List<MemorySegment> memorySegments) {
this(buildSideSerializer, buildSideComparator, memorySegments, DEFAULT_RECORD_LEN);
}
- public CompactingHashTable(TypeSerializer<T> buildSideSerializer, TypeComparator<T> buildSideComparator, List<MemorySegment> memorySegments, int avgRecordLen)
- {
+ public CompactingHashTable(TypeSerializer<T> buildSideSerializer,
+ TypeComparator<T> buildSideComparator,
+ List<MemorySegment> memorySegments,
+ int avgRecordLen) {
+
super(buildSideSerializer, buildSideComparator);
+
// some sanity checks first
if (memorySegments == null) {
throw new NullPointerException();
@@ -252,6 +226,9 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
if ( (this.segmentSize & this.segmentSize - 1) != 0) {
throw new IllegalArgumentException("Hash Table requires buffers whose size is a power of 2.");
}
+
+ this.pageSizeInBits = MathUtils.log2strict(this.segmentSize);
+
int bucketsPerSegment = this.segmentSize >> NUM_INTRA_BUCKET_BITS;
if (bucketsPerSegment == 0) {
throw new IllegalArgumentException("Hash Table requires buffers of at least " + HASH_BUCKET_SIZE + " bytes.");
@@ -262,22 +239,26 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
this.partitions = new ArrayList<InMemoryPartition<T>>();
// because we allow to open and close multiple times, the state is initially closed
- this.closed.set(true);
+ this.closed = true;
+
// so far no partition has any MemorySegments
}
// ------------------------------------------------------------------------
- // Life-Cycle
+ // life cycle
// ------------------------------------------------------------------------
/**
- * Build the hash table
+ * Initialize the hash table
*/
+ @Override
public void open() {
- // sanity checks
- if (!this.closed.compareAndSet(true, false)) {
- throw new IllegalStateException("Hash Table cannot be opened, because it is currently not closed.");
+ synchronized (stateLock) {
+ if (!closed) {
+ throw new IllegalStateException("currently not closed.");
+ }
+ closed = false;
}
// create the partitions
@@ -290,7 +271,6 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
initTable(numBuckets, (byte) partitionFanOut);
}
-
/**
* Closes the hash table. This effectively releases all internal structures and closes all
@@ -299,10 +279,14 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
* all resources that are currently held by the hash join. If another process still access the hash
* table after close has been called no operations will be performed.
*/
+ @Override
public void close() {
// make sure that we close only once
- if (!this.closed.compareAndSet(false, true)) {
- return;
+ synchronized (this.stateLock) {
+ if (this.closed) {
+ return;
+ }
+ this.closed = true;
}
LOG.debug("Closing hash table and releasing resources.");
@@ -313,45 +297,41 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
// clear the memory in the partitions
clearPartitions();
}
-
+
+ @Override
public void abort() {
this.running = false;
-
LOG.debug("Cancelling hash table operations.");
}
-
+
+ @Override
public List<MemorySegment> getFreeMemory() {
- if (!this.closed.get()) {
+ if (!this.closed) {
throw new IllegalStateException("Cannot return memory while join is open.");
}
return this.availableMemory;
}
-
-
- public void buildTable(final MutableObjectIterator<T> input) throws IOException {
- T record = this.buildSideSerializer.createInstance();
-
- // go over the complete input and insert every element into the hash table
- while (this.running && ((record = input.next(record)) != null)) {
- insert(record);
- }
- }
+ // ------------------------------------------------------------------------
+ // adding data to the hash table
+ // ------------------------------------------------------------------------
+
public void buildTableWithUniqueKey(final MutableObjectIterator<T> input) throws IOException {
- T record = this.buildSideSerializer.createInstance();
- T tmp = this.buildSideSerializer.createInstance();
-
// go over the complete input and insert every element into the hash table
- while (this.running && ((record = input.next(record)) != null)) {
- insertOrReplaceRecord(record, tmp);
+
+ T value;
+ while (this.running && (value = input.next()) != null) {
+ insertOrReplaceRecord(value);
}
}
-
+
+ @Override
public final void insert(T record) throws IOException {
- if(this.closed.get()) {
+ if (this.closed) {
return;
}
+
final int hashCode = hash(this.buildSideComparator.hash(record));
final int posHashCode = hashCode % this.numBuckets;
@@ -364,60 +344,8 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
final int partitionNumber = bucket.get(bucketInSegmentPos + HEADER_PARTITION_OFFSET);
InMemoryPartition<T> partition = this.partitions.get(partitionNumber);
-
- long pointer;
- try {
- pointer = partition.appendRecord(record);
- if((pointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) {
- this.compactionMemory.allocateSegments((int)(pointer >> this.pageSizeInBits));
- }
- } catch (EOFException e) {
- try {
- compactPartition(partitionNumber);
- // retry append
- partition = this.partitions.get(partitionNumber); // compaction invalidates reference
- pointer = partition.appendRecord(record);
- } catch (EOFException ex) {
- throw new RuntimeException("Memory ran out. Compaction failed. " +
- getMemoryConsumptionString() +
- " Message: " + ex.getMessage());
- } catch (IndexOutOfBoundsException ex) {
- throw new RuntimeException("Memory ran out. Compaction failed. " +
- getMemoryConsumptionString() +
- " Message: " + ex.getMessage());
- }
- } catch (IndexOutOfBoundsException e1) {
- try {
- compactPartition(partitionNumber);
- // retry append
- partition = this.partitions.get(partitionNumber); // compaction invalidates reference
- pointer = partition.appendRecord(record);
- } catch (EOFException ex) {
- throw new RuntimeException("Memory ran out. Compaction failed. " +
- getMemoryConsumptionString() +
- " Message: " + ex.getMessage());
- } catch (IndexOutOfBoundsException ex) {
- throw new RuntimeException("Memory ran out. Compaction failed. " +
- getMemoryConsumptionString() +
- " Message: " + ex.getMessage());
- }
- }
- insertBucketEntryFromStart(partition, bucket, bucketInSegmentPos, hashCode, pointer);
- }
-
-
- @Override
- public <PT> HashTableProber<PT> getProber(TypeComparator<PT> probeSideComparator, TypePairComparator<PT, T> pairComparator) {
- return new HashTableProber<PT>(probeSideComparator, pairComparator);
- }
-
- /**
- *
- * @return Iterator over hash table
- * @see EntryIterator
- */
- public MutableObjectIterator<T> getEntryIterator() {
- return new EntryIterator(this);
+ long pointer = insertRecordIntoPartition(record, partition, false);
+ insertBucketEntryFromStart(bucket, bucketInSegmentPos, hashCode, pointer, partitionNumber);
}
/**
@@ -425,11 +353,10 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
* May trigger expensive compaction.
*
* @param record record to insert or replace
- * @param tempHolder instance of T that will be overwritten
* @throws IOException
*/
- public void insertOrReplaceRecord(T record, T tempHolder) throws IOException {
- if(this.closed.get()) {
+ public void insertOrReplaceRecord(T record) throws IOException {
+ if (this.closed) {
return;
}
@@ -437,14 +364,15 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
final int posHashCode = searchHashCode % this.numBuckets;
// get the bucket for the given hash code
- MemorySegment originalBucket = this.buckets[posHashCode >> this.bucketsPerSegmentBits];
- int originalBucketOffset = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
+ final MemorySegment originalBucket = this.buckets[posHashCode >> this.bucketsPerSegmentBits];
+ final int originalBucketOffset = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
+
MemorySegment bucket = originalBucket;
int bucketInSegmentOffset = originalBucketOffset;
// get the basic characteristics of the bucket
final int partitionNumber = bucket.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET);
- InMemoryPartition<T> partition = this.partitions.get(partitionNumber);
+ final InMemoryPartition<T> partition = this.partitions.get(partitionNumber);
final MemorySegment[] overflowSegments = partition.overflowSegments;
this.buildSideComparator.setReference(record);
@@ -471,58 +399,12 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
numInSegment++;
// deserialize the key to check whether it is really equal, or whether we had only a hash collision
- try {
- tempHolder = partition.readRecordAt(pointer, tempHolder);
- if (this.buildSideComparator.equalToReference(tempHolder)) {
- long newPointer = partition.appendRecord(record);
- bucket.putLong(pointerOffset, newPointer);
- partition.setCompaction(false);
- if((newPointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) {
- this.compactionMemory.allocateSegments((int)(newPointer >> this.pageSizeInBits));
- }
- return;
- }
- } catch (EOFException e) {
- // system is out of memory so we attempt to reclaim memory with a copy compact run
- long newPointer;
- try {
- compactPartition(partition.getPartitionNumber());
- // retry append
- partition = this.partitions.get(partitionNumber); // compaction invalidates reference
- newPointer = partition.appendRecord(record);
- } catch (EOFException ex) {
- throw new RuntimeException("Memory ran out. Compaction failed. " +
- getMemoryConsumptionString() +
- " Message: " + ex.getMessage());
- } catch (IndexOutOfBoundsException ex) {
- throw new RuntimeException("Memory ran out. Compaction failed. " +
- getMemoryConsumptionString() +
- " Message: " + ex.getMessage());
- }
- bucket.putLong(pointerOffset, newPointer);
- return;
- } catch (IndexOutOfBoundsException e) {
- // system is out of memory so we attempt to reclaim memory with a copy compact run
- long newPointer;
- try {
- compactPartition(partition.getPartitionNumber());
- // retry append
- partition = this.partitions.get(partitionNumber); // compaction invalidates reference
- newPointer = partition.appendRecord(record);
- } catch (EOFException ex) {
- throw new RuntimeException("Memory ran out. Compaction failed. " +
- getMemoryConsumptionString() +
- " Message: " + ex.getMessage());
- } catch (IndexOutOfBoundsException ex) {
- throw new RuntimeException("Memory ran out. Compaction failed. " +
- getMemoryConsumptionString() +
- " Message: " + ex.getMessage());
- }
+ T valueAtPosition = partition.readRecordAt(pointer);
+ if (this.buildSideComparator.equalToReference(valueAtPosition)) {
+ long newPointer = insertRecordIntoPartition(record, partition, true);
bucket.putLong(pointerOffset, newPointer);
return;
- } catch (IOException e) {
- throw new RuntimeException("Error deserializing record from the hashtable: " + e.getMessage(), e);
- }
+ }
}
else {
numInSegment++;
@@ -532,28 +414,89 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
// this segment is done. check if there is another chained bucket
long newForwardPointer = bucket.getLong(bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
if (newForwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
+
// nothing found. append and insert
- long pointer = partition.appendRecord(record);
- //insertBucketEntryFromStart(partition, originalBucket, originalBucketOffset, searchHashCode, pointer);
- insertBucketEntryFromSearch(partition, originalBucket, bucket, originalBucketOffset, bucketInSegmentOffset, countInSegment, currentForwardPointer, searchHashCode, pointer);
- if((pointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) {
- this.compactionMemory.allocateSegments((int)(pointer >> this.pageSizeInBits));
+ long pointer = insertRecordIntoPartition(record, partition, false);
+
+ if (countInSegment < NUM_ENTRIES_PER_BUCKET) {
+ // we are good in our current bucket, put the values
+ bucket.putInt(bucketInSegmentOffset + BUCKET_HEADER_LENGTH + (countInSegment * HASH_CODE_LEN), searchHashCode); // hash code
+ bucket.putLong(bucketInSegmentOffset + BUCKET_POINTER_START_OFFSET + (countInSegment * POINTER_LEN), pointer); // pointer
+ bucket.putInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET, countInSegment + 1); // update count
+ }
+ else {
+ insertBucketEntryFromStart(originalBucket, originalBucketOffset, searchHashCode, pointer, partitionNumber);
}
return;
}
final int overflowSegNum = (int) (newForwardPointer >>> 32);
bucket = overflowSegments[overflowSegNum];
- bucketInSegmentOffset = (int) (newForwardPointer & 0xffffffff);
+ bucketInSegmentOffset = (int) newForwardPointer;
countInSegment = bucket.getInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
posInSegment = bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
numInSegment = 0;
currentForwardPointer = newForwardPointer;
}
}
+
+ private long insertRecordIntoPartition(T record, InMemoryPartition<T> partition,
+ boolean fragments) throws IOException {
+ try {
+ long pointer = partition.appendRecord(record);
+ if (fragments) {
+ partition.setIsCompacted(false);
+ }
+ if ((pointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) {
+ this.compactionMemory.allocateSegments((int)(pointer >> this.pageSizeInBits));
+ }
+ return pointer;
+ }
+ catch (Exception e) {
+ if (e instanceof EOFException || e instanceof IndexOutOfBoundsException) {
+ // this indicates an out of memory situation
+ try {
+ final int partitionNumber = partition.getPartitionNumber();
+ compactPartition(partitionNumber);
+
+ // retry append
+ partition = this.partitions.get(partitionNumber); // compaction invalidates reference
+ long newPointer = partition.appendRecord(record);
+ if ((newPointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) {
+ this.compactionMemory.allocateSegments((int)(newPointer >> this.pageSizeInBits));
+ }
+ return newPointer;
+ }
+ catch (EOFException ex) {
+ throw new RuntimeException("Memory ran out. Compaction failed. " +
+ getMemoryConsumptionString() + " Message: " + ex.getMessage());
+ }
+ catch (IndexOutOfBoundsException ex) {
+ throw new RuntimeException("Memory ran out. Compaction failed. " +
+ getMemoryConsumptionString() + " Message: " + ex.getMessage());
+ }
+ }
+ else if (e instanceof IOException) {
+ throw (IOException) e;
+ }
+ else //noinspection ConstantConditions
+ if (e instanceof RuntimeException) {
+ throw (RuntimeException) e;
+ }
+ else {
+ throw new RuntimeException("Writing record to compacting hash table failed", e);
+ }
+ }
+ }
- private void insertBucketEntryFromStart(InMemoryPartition<T> p, MemorySegment bucket,
- int bucketInSegmentPos, int hashCode, long pointer)
+
+ /**
+ * IMPORTANT!!! We pass only the partition number, because we must make sure we get a fresh
+ * partition reference. The partition reference used during search for the key may have become
+ * invalid during the compaction.
+ */
+ private void insertBucketEntryFromStart(MemorySegment bucket, int bucketInSegmentPos,
+ int hashCode, long pointer, int partitionNumber)
throws IOException
{
boolean checkForResize = false;
@@ -564,8 +507,11 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
bucket.putInt(bucketInSegmentPos + BUCKET_HEADER_LENGTH + (count * HASH_CODE_LEN), hashCode); // hash code
bucket.putLong(bucketInSegmentPos + BUCKET_POINTER_START_OFFSET + (count * POINTER_LEN), pointer); // pointer
bucket.putInt(bucketInSegmentPos + HEADER_COUNT_OFFSET, count + 1); // update count
- } else {
+ }
+ else {
// we need to go to the overflow buckets
+ final InMemoryPartition<T> p = this.partitions.get(partitionNumber);
+
final long originalForwardPointer = bucket.getLong(bucketInSegmentPos + HEADER_FORWARD_OFFSET);
final long forwardForNewBucket;
@@ -573,7 +519,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
// forward pointer set
final int overflowSegNum = (int) (originalForwardPointer >>> 32);
- final int segOffset = (int) (originalForwardPointer & 0xffffffff);
+ final int segOffset = (int) originalForwardPointer;
final MemorySegment seg = p.overflowSegments[overflowSegNum];
final int obCount = seg.getInt(segOffset + HEADER_COUNT_OFFSET);
@@ -635,31 +581,42 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
bucket.putLong(bucketInSegmentPos + HEADER_FORWARD_OFFSET, pointerToNewBucket);
// finally, insert the values into the overflow buckets
- overflowSeg.putInt(overflowBucketOffset + BUCKET_HEADER_LENGTH, hashCode); // hash code
+ overflowSeg.putInt(overflowBucketOffset + BUCKET_HEADER_LENGTH, hashCode); // hash code
overflowSeg.putLong(overflowBucketOffset + BUCKET_POINTER_START_OFFSET, pointer); // pointer
// set the count to one
- overflowSeg.putInt(overflowBucketOffset + HEADER_COUNT_OFFSET, 1);
- if(checkForResize && !this.isResizing) {
+ overflowSeg.putInt(overflowBucketOffset + HEADER_COUNT_OFFSET, 1);
+
+ if (checkForResize && !this.isResizing) {
// check if we should resize buckets
- if(this.buckets.length <= getOverflowSegmentCount()) {
+ if (this.buckets.length <= getOverflowSegmentCount()) {
resizeHashTable();
}
}
}
}
-
- private void insertBucketEntryFromSearch(InMemoryPartition<T> partition, MemorySegment originalBucket, MemorySegment currentBucket, int originalBucketOffset, int currentBucketOffset, int countInCurrentBucket, long currentForwardPointer, int hashCode, long pointer) throws IOException {
+
+ /**
+ * IMPORTANT!!! We pass only the partition number, because we must make sure we get a fresh
+ * partition reference. The partition reference used during search for the key may have become
+ * invalid during the compaction.
+ */
+ private void insertBucketEntryFromSearch(MemorySegment originalBucket, MemorySegment currentBucket,
+ int originalBucketOffset, int currentBucketOffset,
+ int countInCurrentBucket, long originalForwardPointer,
+ int hashCode, long pointer, int partitionNumber) throws IOException {
boolean checkForResize = false;
if (countInCurrentBucket < NUM_ENTRIES_PER_BUCKET) {
// we are good in our current bucket, put the values
- currentBucket.putInt(currentBucketOffset + BUCKET_HEADER_LENGTH + (countInCurrentBucket * HASH_CODE_LEN), hashCode); // hash code
+ currentBucket.putInt(currentBucketOffset + BUCKET_HEADER_LENGTH + (countInCurrentBucket * HASH_CODE_LEN), hashCode); // hash code
currentBucket.putLong(currentBucketOffset + BUCKET_POINTER_START_OFFSET + (countInCurrentBucket * POINTER_LEN), pointer); // pointer
currentBucket.putInt(currentBucketOffset + HEADER_COUNT_OFFSET, countInCurrentBucket + 1); // update count
- } else {
- // we need a new overflow bucket
+ }
+ else {
+ // we go to a new overflow bucket
+ final InMemoryPartition<T> partition = this.partitions.get(partitionNumber);
MemorySegment overflowSeg;
- final int overflowBucketNum;
+ final int overflowSegmentNum;
final int overflowBucketOffset;
// first, see if there is space for an overflow bucket remaining in the last overflow segment
@@ -667,7 +624,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
// no space left in last bucket, or no bucket yet, so create an overflow segment
overflowSeg = getNextBuffer();
overflowBucketOffset = 0;
- overflowBucketNum = partition.numOverflowSegments;
+ overflowSegmentNum = partition.numOverflowSegments;
// add the new overflow segment
if (partition.overflowSegments.length <= partition.numOverflowSegments) {
@@ -678,10 +635,11 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
partition.overflowSegments[partition.numOverflowSegments] = overflowSeg;
partition.numOverflowSegments++;
checkForResize = true;
- } else {
+ }
+ else {
// there is space in the last overflow segment
- overflowBucketNum = partition.numOverflowSegments - 1;
- overflowSeg = partition.overflowSegments[overflowBucketNum];
+ overflowSegmentNum = partition.numOverflowSegments - 1;
+ overflowSeg = partition.overflowSegments[overflowSegmentNum];
overflowBucketOffset = partition.nextOverflowBucket << NUM_INTRA_BUCKET_BITS;
}
@@ -690,10 +648,11 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
partition.nextOverflowBucket = (partition.nextOverflowBucket == this.bucketsPerSegmentMask ? 0 : partition.nextOverflowBucket + 1);
// insert the new overflow bucket in the chain of buckets
+
// 1) set the old forward pointer
// 2) let the bucket in the main table point to this one
- overflowSeg.putLong(overflowBucketOffset + HEADER_FORWARD_OFFSET, currentForwardPointer);
- final long pointerToNewBucket = (((long) overflowBucketNum) << 32) | ((long) overflowBucketOffset);
+ overflowSeg.putLong(overflowBucketOffset + HEADER_FORWARD_OFFSET, originalForwardPointer);
+ final long pointerToNewBucket = (((long) overflowSegmentNum) << 32) | ((long) overflowBucketOffset);
originalBucket.putLong(originalBucketOffset + HEADER_FORWARD_OFFSET, pointerToNewBucket);
// finally, insert the values into the overflow buckets
@@ -710,16 +669,33 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
}
}
}
+
+ // --------------------------------------------------------------------------------------------
+ // Access to the entries
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public <PT> HashTableProber<PT> getProber(TypeComparator<PT> probeSideComparator, TypePairComparator<PT, T> pairComparator) {
+ return new HashTableProber<PT>(probeSideComparator, pairComparator);
+ }
+
+ /**
+ *
+ * @return Iterator over hash table
+ * @see EntryIterator
+ */
+ public MutableObjectIterator<T> getEntryIterator() {
+ return new EntryIterator(this);
+ }
// --------------------------------------------------------------------------------------------
- // Setup and Tear Down of Structures
+ // Setup and Tear Down of Structures
// --------------------------------------------------------------------------------------------
private void createPartitions(int numPartitions) {
this.partitions.clear();
ListMemorySegmentSource memSource = new ListMemorySegmentSource(this.availableMemory);
- this.pageSizeInBits = MathUtils.log2strict(this.segmentSize);
for (int i = 0; i < numPartitions; i++) {
this.partitions.add(new InMemoryPartition<T>(this.buildSideSerializer, i, memSource, this.segmentSize, pageSizeInBits));
@@ -728,8 +704,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
}
private void clearPartitions() {
- for (int i = 0; i < this.partitions.size(); i++) {
- InMemoryPartition<T> p = this.partitions.get(i);
+ for (InMemoryPartition<T> p : this.partitions) {
p.clearAllMemory(this.availableMemory);
}
this.partitions.clear();
@@ -768,14 +743,14 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
// set the counters back
this.numBuckets = 0;
if (this.buckets != null) {
- for (int i = 0; i < this.buckets.length; i++) {
- this.availableMemory.add(this.buckets[i]);
+ for (MemorySegment bucket : this.buckets) {
+ this.availableMemory.add(bucket);
}
this.buckets = null;
}
}
- private final MemorySegment getNextBuffer() {
+ private MemorySegment getNextBuffer() {
// check if the list directly offers memory
int s = this.availableMemory.size();
if (s > 0) {
@@ -799,7 +774,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
* @param numBuffers The number of buffers available.
* @return The number of partitions to use.
*/
- private static final int getPartitioningFanOutNoEstimates(int numBuffers) {
+ private static int getPartitioningFanOutNoEstimates(int numBuffers) {
return Math.max(10, Math.min(numBuffers / 10, MAX_NUM_PARTITIONS));
}
@@ -807,14 +782,13 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
* @return String containing a summary of the memory consumption for error messages
*/
private String getMemoryConsumptionString() {
- String result = new String("numPartitions: " + this.partitions.size() +
+ return "numPartitions: " + this.partitions.size() +
" minPartition: " + getMinPartition() +
" maxPartition: " + getMaxPartition() +
" number of overflow segments: " + getOverflowSegmentCount() +
" bucketSize: " + this.buckets.length +
- " Overall memory: " + getSize() +
- " Partition memory: " + getPartitionSize());
- return result;
+ " Overall memory: " + getSize() +
+ " Partition memory: " + getPartitionSize();
}
/**
@@ -878,7 +852,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
*/
private int getOverflowSegmentCount() {
int result = 0;
- for(InMemoryPartition<T> p : this.partitions) {
+ for (InMemoryPartition<T> p : this.partitions) {
result += p.numOverflowSegments;
}
return result;
@@ -890,7 +864,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
*
* @return number of buckets
*/
- private static final int getInitialTableSize(int numBuffers, int bufferSize, int numPartitions, int recordLenBytes) {
+ private static int getInitialTableSize(int numBuffers, int bufferSize, int numPartitions, int recordLenBytes) {
final long totalSize = ((long) bufferSize) * numBuffers;
final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_OVERHEAD_BYTES);
final long bucketBytes = numRecordsStorable * RECORD_OVERHEAD_BYTES;
@@ -908,7 +882,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
* @param numPartitions number of partitions
* @return The hash code for the integer.
*/
- private static final byte assignPartition(int bucket, byte numPartitions) {
+ private static byte assignPartition(int bucket, byte numPartitions) {
return (byte) (bucket % numPartitions);
}
@@ -924,26 +898,27 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
final int newNumSegments = (newNumBuckets + (bucketsPerSegment-1)) / bucketsPerSegment;
final int additionalSegments = newNumSegments-this.buckets.length;
final int numPartitions = this.partitions.size();
- if(this.availableMemory.size() < additionalSegments) {
- for(int i = 0; i < numPartitions; i++) {
+
+ if (this.availableMemory.size() < additionalSegments) {
+ for (int i = 0; i < numPartitions; i++) {
compactPartition(i);
if(this.availableMemory.size() >= additionalSegments) {
break;
}
}
}
- if(this.availableMemory.size() < additionalSegments || this.closed.get()) {
+
+ if (this.availableMemory.size() < additionalSegments || this.closed) {
return false;
- } else {
+ }
+ else {
this.isResizing = true;
// allocate new buckets
final int startOffset = (this.numBuckets * HASH_BUCKET_SIZE) % this.segmentSize;
- MemorySegment[] newBuckets = new MemorySegment[additionalSegments];
final int oldNumBuckets = this.numBuckets;
final int oldNumSegments = this.buckets.length;
MemorySegment[] mergedBuckets = new MemorySegment[newNumSegments];
System.arraycopy(this.buckets, 0, mergedBuckets, 0, this.buckets.length);
- System.arraycopy(newBuckets, 0, mergedBuckets, this.buckets.length, newBuckets.length);
this.buckets = mergedBuckets;
this.numBuckets = newNumBuckets;
// initialize all new buckets
@@ -951,7 +926,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
final int startSegment = oldSegment ? (oldNumSegments-1) : oldNumSegments;
for (int i = startSegment, bucket = oldNumBuckets; i < newNumSegments && bucket < this.numBuckets; i++) {
MemorySegment seg;
- int bucketOffset = 0;
+ int bucketOffset;
if(oldSegment) { // the first couple of new buckets may be located on an old segment
seg = this.buckets[i];
for (int k = (oldNumBuckets % bucketsPerSegment) ; k < bucketsPerSegment && bucket < this.numBuckets; k++, bucket++) {
@@ -965,7 +940,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
seg = getNextBuffer();
// go over all buckets in the segment
for (int k = 0; k < bucketsPerSegment && bucket < this.numBuckets; k++, bucket++) {
- bucketOffset = k * HASH_BUCKET_SIZE;
+ bucketOffset = k * HASH_BUCKET_SIZE;
// initialize the header fields
seg.put(bucketOffset + HEADER_PARTITION_OFFSET, assignPartition(bucket, (byte)numPartitions));
seg.putInt(bucketOffset + HEADER_COUNT_OFFSET, 0);
@@ -975,19 +950,21 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
this.buckets[i] = seg;
oldSegment = false; // we write on at most one old segment
}
- int hashOffset = 0;
- int hash = 0;
- int pointerOffset = 0;
- long pointer = 0;
+ int hashOffset;
+ int hash;
+ int pointerOffset;
+ long pointer;
IntArrayList hashList = new IntArrayList(NUM_ENTRIES_PER_BUCKET);
LongArrayList pointerList = new LongArrayList(NUM_ENTRIES_PER_BUCKET);
IntArrayList overflowHashes = new IntArrayList(64);
LongArrayList overflowPointers = new LongArrayList(64);
+
// go over all buckets and split them between old and new buckets
- for(int i = 0; i < numPartitions; i++) {
+ for (int i = 0; i < numPartitions; i++) {
InMemoryPartition<T> partition = this.partitions.get(i);
final MemorySegment[] overflowSegments = partition.overflowSegments;
- int posHashCode = 0;
+
+ int posHashCode;
for (int j = 0, bucket = i; j < this.buckets.length && bucket < oldNumBuckets; j++) {
MemorySegment segment = this.buckets[j];
// go over all buckets in the segment belonging to the partition
@@ -1021,7 +998,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
}
final int overflowSegNum = (int) (forwardPointer >>> 32);
segment = overflowSegments[overflowSegNum];
- bucketOffset = (int)(forwardPointer & 0xffffffff);
+ bucketOffset = (int) forwardPointer;
countInSegment = segment.getInt(bucketOffset + HEADER_COUNT_OFFSET);
pointerOffset = bucketOffset + BUCKET_POINTER_START_OFFSET;
hashOffset = bucketOffset + BUCKET_HEADER_LENGTH;
@@ -1038,25 +1015,29 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
}
int newSegmentIndex = (bucket + oldNumBuckets) / bucketsPerSegment;
MemorySegment newSegment = this.buckets[newSegmentIndex];
+
// we need to avoid overflows in the first run
int oldBucketCount = 0;
int newBucketCount = 0;
- while(!hashList.isEmpty()) {
+ while (!hashList.isEmpty()) {
hash = hashList.removeLast();
pointer = pointerList.removeLong(pointerList.size()-1);
posHashCode = hash % this.numBuckets;
- if(posHashCode == bucket && oldBucketCount < NUM_ENTRIES_PER_BUCKET) {
+ if (posHashCode == bucket && oldBucketCount < NUM_ENTRIES_PER_BUCKET) {
bucketOffset = (bucket % bucketsPerSegment) * HASH_BUCKET_SIZE;
- insertBucketEntryFromStart(partition, segment, bucketOffset, hash, pointer);
+ insertBucketEntryFromStart(segment, bucketOffset, hash, pointer, partition.getPartitionNumber());
oldBucketCount++;
- } else if(posHashCode == (bucket + oldNumBuckets) && newBucketCount < NUM_ENTRIES_PER_BUCKET) {
+ }
+ else if (posHashCode == (bucket + oldNumBuckets) && newBucketCount < NUM_ENTRIES_PER_BUCKET) {
bucketOffset = ((bucket + oldNumBuckets) % bucketsPerSegment) * HASH_BUCKET_SIZE;
- insertBucketEntryFromStart(partition, newSegment, bucketOffset, hash, pointer);
+ insertBucketEntryFromStart(newSegment, bucketOffset, hash, pointer, partition.getPartitionNumber());
newBucketCount++;
- } else if(posHashCode == (bucket + oldNumBuckets) || posHashCode == bucket) {
+ }
+ else if (posHashCode == (bucket + oldNumBuckets) || posHashCode == bucket) {
overflowHashes.add(hash);
overflowPointers.add(pointer);
- } else {
+ }
+ else {
throw new IOException("Accessed wrong bucket. Target: " + bucket + " or " + (bucket + oldNumBuckets) + " Hit: " + posHashCode);
}
}
@@ -1067,9 +1048,9 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
// reset partition's overflow buckets and reclaim their memory
this.availableMemory.addAll(partition.resetOverflowBuckets());
// clear overflow lists
- int bucketArrayPos = 0;
- int bucketInSegmentPos = 0;
- MemorySegment bucket = null;
+ int bucketArrayPos;
+ int bucketInSegmentPos;
+ MemorySegment bucket;
while(!overflowHashes.isEmpty()) {
hash = overflowHashes.removeLast();
pointer = overflowPointers.removeLong(overflowPointers.size()-1);
@@ -1077,7 +1058,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
bucketArrayPos = posHashCode >>> this.bucketsPerSegmentBits;
bucketInSegmentPos = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
bucket = this.buckets[bucketArrayPos];
- insertBucketEntryFromStart(partition, bucket, bucketInSegmentPos, hash, pointer);
+ insertBucketEntryFromStart(bucket, bucketInSegmentPos, hash, pointer, partition.getPartitionNumber());
}
overflowHashes.clear();
overflowPointers.clear();
@@ -1095,7 +1076,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
*/
private void compactPartition(final int partitionNumber) throws IOException {
// do nothing if table was closed, parameter is invalid or no garbage exists
- if(this.closed.get() || partitionNumber >= this.partitions.size() || this.partitions.get(partitionNumber).isCompacted()) {
+ if (this.closed || partitionNumber >= this.partitions.size() || this.partitions.get(partitionNumber).isCompacted()) {
return;
}
// release all segments owned by compaction partition
@@ -1106,9 +1087,9 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
final int numPartitions = this.partitions.size();
InMemoryPartition<T> partition = this.partitions.remove(partitionNumber);
MemorySegment[] overflowSegments = partition.overflowSegments;
- long pointer = 0L;
- int pointerOffset = 0;
- int bucketOffset = 0;
+ long pointer;
+ int pointerOffset;
+ int bucketOffset;
final int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
for (int i = 0, bucket = partitionNumber; i < this.buckets.length && bucket < this.numBuckets; i++) {
MemorySegment segment = this.buckets[i];
@@ -1138,7 +1119,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
}
final int overflowSegNum = (int) (forwardPointer >>> 32);
segment = overflowSegments[overflowSegNum];
- bucketOffset = (int)(forwardPointer & 0xffffffff);
+ bucketOffset = (int) forwardPointer;
countInSegment = segment.getInt(bucketOffset + HEADER_COUNT_OFFSET);
pointerOffset = bucketOffset + BUCKET_POINTER_START_OFFSET;
numInSegment = 0;
@@ -1152,7 +1133,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
this.partitions.get(partitionNumber).overflowSegments = partition.overflowSegments;
this.partitions.get(partitionNumber).numOverflowSegments = partition.numOverflowSegments;
this.partitions.get(partitionNumber).nextOverflowBucket = partition.nextOverflowBucket;
- this.partitions.get(partitionNumber).setCompaction(true);
+ this.partitions.get(partitionNumber).setIsCompacted(true);
//this.partitions.get(partitionNumber).pushDownPages();
this.compactionMemory = partition;
this.compactionMemory.resetRecordCounter();
@@ -1169,22 +1150,6 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
}
/**
- * Compacts partition but may not reclaim all garbage
- *
- * @param partitionNumber partition number
- * @throws IOException
- */
- @SuppressWarnings("unused")
- private void fastCompactPartition(int partitionNumber) throws IOException {
- // stop if no garbage exists
- if(this.partitions.get(partitionNumber).isCompacted()) {
- return;
- }
- //TODO IMPLEMENT ME
- return;
- }
-
- /**
* This function hashes an integer value. It is adapted from Bob Jenkins' website
* <a href="http://www.burtleburtle.net/bob/hash/integer.html">http://www.burtleburtle.net/bob/hash/integer.html</a>.
* The hash function has the <i>full avalanche</i> property, meaning that every bit of the value to be hashed
@@ -1193,7 +1158,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
* @param code The integer to be hashed.
* @return The hash code for the integer.
*/
- private static final int hash(int code) {
+ private static int hash(int code) {
code = (code + 0x7ed55d16) + (code << 12);
code = (code ^ 0xc761c23c) ^ (code >>> 19);
code = (code + 0x165667b1) + (code << 5);
@@ -1235,7 +1200,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
@Override
public T next() throws IOException {
- if(done || this.table.closed.get()) {
+ if (done || this.table.closed) {
return null;
} else if(!cache.isEmpty()) {
return cache.remove(cache.size()-1);
@@ -1294,7 +1259,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
}
final int overflowSegNum = (int) (forwardPointer >>> 32);
bucket = overflowSegments[overflowSegNum];
- bucketOffset = (int)(forwardPointer & 0xffffffff);
+ bucketOffset = (int) forwardPointer;
countInSegment = bucket.getInt(bucketOffset + HEADER_COUNT_OFFSET);
posInSegment = bucketOffset + BUCKET_POINTER_START_OFFSET;
numInSegment = 0;
@@ -1326,7 +1291,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
}
public T getMatchFor(PT probeSideRecord, T reuse) {
- if(closed.get()) {
+ if (closed) {
return null;
}
final int searchHashCode = hash(this.probeTypeComparator.hash(probeSideRecord));
@@ -1391,7 +1356,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
final int overflowSegNum = (int) (forwardPointer >>> 32);
bucket = overflowSegments[overflowSegNum];
- bucketInSegmentOffset = (int) (forwardPointer & 0xffffffff);
+ bucketInSegmentOffset = (int) forwardPointer;
countInSegment = bucket.getInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
posInSegment = bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
numInSegment = 0;
@@ -1399,7 +1364,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
}
public T getMatchFor(PT probeSideRecord) {
- if(closed.get()) {
+ if (closed) {
return null;
}
final int searchHashCode = hash(this.probeTypeComparator.hash(probeSideRecord));
@@ -1464,7 +1429,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
final int overflowSegNum = (int) (forwardPointer >>> 32);
bucket = overflowSegments[overflowSegNum];
- bucketInSegmentOffset = (int) (forwardPointer & 0xffffffff);
+ bucketInSegmentOffset = (int) forwardPointer;
countInSegment = bucket.getInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
posInSegment = bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
numInSegment = 0;
@@ -1472,49 +1437,11 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
}
public void updateMatch(T record) throws IOException {
- if(closed.get()) {
+ if (closed) {
return;
}
- long newPointer;
- try {
- newPointer = this.partition.appendRecord(record);
- } catch (EOFException e) {
- // system is out of memory so we attempt to reclaim memory with a copy compact run
- try {
- int partitionNumber = this.partition.getPartitionNumber();
- compactPartition(partitionNumber);
- // retry append
- this.partition = partitions.get(partitionNumber);
- newPointer = this.partition.appendRecord(record);
- } catch (EOFException ex) {
- throw new RuntimeException("Memory ran out. Compaction failed. " +
- getMemoryConsumptionString() +
- " Message: " + ex.getMessage());
- } catch (IndexOutOfBoundsException ex) {
- throw new RuntimeException("Memory ran out. Compaction failed. " +
- getMemoryConsumptionString() +
- " Message: " + ex.getMessage());
- }
- } catch (IndexOutOfBoundsException e) {
- // system is out of memory so we attempt to reclaim memory with a copy compact run
- try {
- int partitionNumber = this.partition.getPartitionNumber();
- compactPartition(partitionNumber);
- // retry append
- this.partition = partitions.get(partitionNumber);
- newPointer = this.partition.appendRecord(record);
- } catch (EOFException ex) {
- throw new RuntimeException("Memory ran out. Compaction failed. " +
- getMemoryConsumptionString() +
- " Message: " + ex.getMessage());
- } catch (IndexOutOfBoundsException ex) {
- throw new RuntimeException("Memory ran out. Compaction failed. " +
- getMemoryConsumptionString() +
- " Message: " + ex.getMessage());
- }
- }
+ long newPointer = insertRecordIntoPartition(record, this.partition, true);
this.bucket.putLong(this.pointerOffsetInBucket, newPointer);
- this.partition.setCompaction(false);
}
}
}
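
The consolidated error handling above replaces the two copies of the out-of-memory retry logic that previously lived in insertOrReplaceRecord() and updateMatch(). The flow is: append the record; if the partition signals that memory is exhausted (EOFException or IndexOutOfBoundsException), compact that partition, re-resolve the partition by its number (as the "IMPORTANT!!!" javadoc notes, compaction installs a fresh InMemoryPartition object, so any previously held reference is stale), and retry the append once; a second failure aborts with the memory-consumption summary. A minimal standalone sketch of that pattern (AppendRetrySketch and PartitionLike are hypothetical stand-ins, not Flink types; Java 7 multi-catch is used for brevity where the committed code uses instanceof checks):

import java.io.EOFException;
import java.io.IOException;
import java.util.List;

class AppendRetrySketch<T> {

    // hypothetical stand-in for the appendRecord() part of InMemoryPartition
    interface PartitionLike<T> {
        long appendRecord(T record) throws IOException;   // EOFException when the partition is full
    }

    private final List<PartitionLike<T>> partitions;

    AppendRetrySketch(List<PartitionLike<T>> partitions) {
        this.partitions = partitions;
    }

    // stand-in for compactPartition(int): copy-compacts the partition and installs a
    // fresh partition object under the same index (left empty in this sketch)
    void compactPartition(int partitionNumber) throws IOException {
    }

    long appendWithCompactionRetry(T record, int partitionNumber) throws IOException {
        try {
            return partitions.get(partitionNumber).appendRecord(record);
        }
        catch (EOFException | IndexOutOfBoundsException e) {
            // out of memory: reclaim space, then re-resolve the partition by number,
            // because compaction invalidates the previously held reference
            compactPartition(partitionNumber);
            try {
                return partitions.get(partitionNumber).appendRecord(record);
            }
            catch (EOFException | IndexOutOfBoundsException ex) {
                throw new RuntimeException("Memory ran out. Compaction failed: " + ex.getMessage(), ex);
            }
        }
    }
}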
http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
index 7fb997e..ffb66fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
@@ -199,7 +199,7 @@ public class InMemoryPartition<T> {
*
* @param compacted compaction status
*/
- public void setCompaction(boolean compacted) {
+ public void setIsCompacted(boolean compacted) {
this.compacted = compacted;
}
@@ -281,9 +281,9 @@ public class InMemoryPartition<T> {
* @param numberOfSegments allocation count
*/
public void allocateSegments(int numberOfSegments) {
- while(getBlockCount() < numberOfSegments) {
+ while (getBlockCount() < numberOfSegments) {
MemorySegment next = this.availableMemory.nextSegment();
- if(next != null) {
+ if (next != null) {
this.partitionPages.add(next);
} else {
return;
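
The growth of the spare compaction partition in CompactingHashTable is driven by the pointers returned from appendRecord(): the page index sits in the high bits of the pointer, so shifting by pageSizeInBits yields (under that assumed layout) the page the record landed on, and allocateSegments() is called whenever that page lies beyond what the compaction buffer currently covers. A hedged sketch of that bookkeeping, assuming same-package access to getBlockCount() and allocateSegments() as in the table code:

// The pointer layout (page index in the high bits, offset in the low bits) is an
// assumption derived from the checks in insertRecordIntoPartition() above.
static void ensureCompactionMemoryCovers(long pointer, int pageSizeInBits,
                                         InMemoryPartition<?> compactionMemory) {
    long pageIndex = pointer >> pageSizeInBits;             // page the record was written to
    if (pageIndex > compactionMemory.getBlockCount()) {
        compactionMemory.allocateSegments((int) pageIndex); // grow the spare partition to match
    }
}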
http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 9416796..7f07cfb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -886,7 +886,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
// forward pointer set
final int overflowSegNum = (int) (originalForwardPointer >>> 32);
- final int segOffset = (int) (originalForwardPointer & 0xffffffff);
+ final int segOffset = (int) originalForwardPointer;
final MemorySegment seg = p.overflowSegments[overflowSegNum];
final short obCount = seg.getShort(segOffset + HEADER_COUNT_OFFSET);
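
The simplification from "(int) (originalForwardPointer & 0xffffffff)" to "(int) originalForwardPointer" is behavior-preserving: 0xffffffff is an int literal with value -1, so it sign-extends to all ones when promoted to long and the mask was already a no-op; in both forms it is the narrowing cast to int that keeps the low 32 bits. Since the forward pointer packs the overflow segment number into the high 32 bits and the segment offset into the low 32 bits, the unpacking stays correct. A small self-contained check of that packing scheme:

public class ForwardPointerSketch {

    // high 32 bits = overflow segment number, low 32 bits = offset inside that segment
    static long pack(int segmentNum, int segmentOffset) {
        return (((long) segmentNum) << 32) | ((long) segmentOffset);
    }

    public static void main(String[] args) {
        long fp = pack(3, 0x2000);
        int segmentNum = (int) (fp >>> 32);        // 3
        int offsetNew  = (int) fp;                 // 0x2000: narrowing keeps the low 32 bits
        int offsetOld  = (int) (fp & 0xffffffff);  // identical value: the -1 mask changed nothing
        System.out.println(segmentNum + " " + offsetNew + " " + (offsetNew == offsetOld));
    }
}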
http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
index 999c4b0..27d958a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
@@ -71,10 +71,12 @@ public class IntArrayList {
public static final IntArrayList EMPTY = new IntArrayList(0) {
+ @Override
public boolean add(int number) {
throw new UnsupportedOperationException();
}
-
+
+ @Override
public int removeLast() {
throw new UnsupportedOperationException();
};
http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java
new file mode 100644
index 0000000..e3b697e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.util.MutableObjectIterator;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+public class CompactingHashTableTest {
+
+ private final TypeSerializer<Tuple2<Long, String>> serializer;
+ private final TypeComparator<Tuple2<Long, String>> comparator;
+
+ private final TypeComparator<Long> probeComparator;
+
+ private final TypePairComparator<Long, Tuple2<Long, String>> pairComparator;
+
+
+ public CompactingHashTableTest() {
+ TypeSerializer<?>[] fieldSerializers = { LongSerializer.INSTANCE, StringSerializer.INSTANCE };
+ @SuppressWarnings("unchecked")
+ Class<Tuple2<Long, String>> clazz = (Class<Tuple2<Long, String>>) (Class<?>) Tuple2.class;
+ this.serializer = new TupleSerializer<Tuple2<Long, String>>(clazz, fieldSerializers);
+
+ TypeComparator<?>[] comparators = { new LongComparator(true) };
+ TypeSerializer<?>[] comparatorSerializers = { LongSerializer.INSTANCE };
+
+ this.comparator = new TupleComparator<Tuple2<Long, String>>(new int[] {0}, comparators, comparatorSerializers);
+
+ this.probeComparator = new LongComparator(true);
+
+ this.pairComparator = new TypePairComparator<Long, Tuple2<Long, String>>() {
+
+ private long ref;
+
+ @Override
+ public void setReference(Long reference) {
+ ref = reference;
+ }
+
+ @Override
+ public boolean equalToReference(Tuple2<Long, String> candidate) {
+ //noinspection UnnecessaryUnboxing
+ return candidate.f0.longValue() == ref;
+ }
+
+ @Override
+ public int compareToReference(Tuple2<Long, String> candidate) {
+ long x = ref;
+ long y = candidate.f0;
+ return (x < y) ? -1 : ((x == y) ? 0 : 1);
+ }
+ };
+ }
+
+ // ------------------------------------------------------------------------
+ // tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testHashTableGrowthWithInsert() {
+ try {
+ final int numElements = 1000000;
+
+ List<MemorySegment> memory = getMemory(10000, 32 * 1024);
+
+ // we create a hash table that thinks the records are super large. that makes it choose initially
+ // a lot of memory for the partition buffers, and start with a smaller hash table. that way
+ // we trigger a hash table growth early.
+ CompactingHashTable<Tuple2<Long, String>> table = new CompactingHashTable<Tuple2<Long, String>>(
+ serializer, comparator, memory, 10000);
+ table.open();
+
+ for (long i = 0; i < numElements; i++) {
+ table.insert(new Tuple2<Long, String>(i, String.valueOf(i)));
+ }
+
+ // make sure that all elements are contained via the entry iterator
+ {
+ BitSet bitSet = new BitSet(numElements);
+ MutableObjectIterator<Tuple2<Long, String>> iter = table.getEntryIterator();
+ Tuple2<Long, String> next;
+ while ((next = iter.next()) != null) {
+ assertNotNull(next.f0);
+ assertNotNull(next.f1);
+ assertEquals(next.f0.longValue(), Long.parseLong(next.f1));
+
+ bitSet.set(next.f0.intValue());
+ }
+
+ assertEquals(numElements, bitSet.cardinality());
+ }
+
+ // make sure all entries are contained via the prober
+ {
+ CompactingHashTable<Tuple2<Long, String>>.HashTableProber<Long> proper =
+ table.getProber(probeComparator, pairComparator);
+
+ for (long i = 0; i < numElements; i++) {
+ assertNotNull(proper.getMatchFor(i));
+ assertNull(proper.getMatchFor(i + numElements));
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * This test validates that records are not lost via "insertOrReplace()" as in bug [FLINK-2361]
+ */
+ @Test
+ public void testHashTableGrowthWithInsertOrReplace() {
+ try {
+ final int numElements = 1000000;
+
+ List<MemorySegment> memory = getMemory(10000, 32 * 1024);
+
+ // we create a hash table that thinks the records are super large. that makes it choose initially
+ // a lot of memory for the partition buffers, and start with a smaller hash table. that way
+ // we trigger a hash table growth early.
+ CompactingHashTable<Tuple2<Long, String>> table = new CompactingHashTable<Tuple2<Long, String>>(
+ serializer, comparator, memory, 10000);
+ table.open();
+
+ for (long i = 0; i < numElements; i++) {
+ table.insertOrReplaceRecord(new Tuple2<Long, String>(i, String.valueOf(i)));
+ }
+
+ // make sure that all elements are contained via the entry iterator
+ {
+ BitSet bitSet = new BitSet(numElements);
+ MutableObjectIterator<Tuple2<Long, String>> iter = table.getEntryIterator();
+ Tuple2<Long, String> next;
+ while ((next = iter.next()) != null) {
+ assertNotNull(next.f0);
+ assertNotNull(next.f1);
+ assertEquals(next.f0.longValue(), Long.parseLong(next.f1));
+
+ bitSet.set(next.f0.intValue());
+ }
+
+ assertEquals(numElements, bitSet.cardinality());
+ }
+
+ // make sure all entries are contained via the prober
+ {
+ CompactingHashTable<Tuple2<Long, String>>.HashTableProber<Long> proper =
+ table.getProber(probeComparator, pairComparator);
+
+ for (long i = 0; i < numElements; i++) {
+ assertNotNull(proper.getMatchFor(i));
+ assertNull(proper.getMatchFor(i + numElements));
+ }
+ }
+
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * This test validates that new inserts (rather than updates) in "insertOrReplace()" properly
+ * react to out of memory conditions.
+ */
+ @Test
+ public void testInsertsWithInsertOrReplace() {
+ try {
+ final int numElements = 1000;
+
+ final String longString = getLongString(10000);
+ List<MemorySegment> memory = getMemory(1000, 32 * 1024);
+
+ // we create a hash table that thinks the records are super large. that makes it choose initially
+ // a lot of memory for the partition buffers, and start with a smaller hash table. that way
+ // we trigger a hash table growth early.
+ CompactingHashTable<Tuple2<Long, String>> table = new CompactingHashTable<Tuple2<Long, String>>(
+ serializer, comparator, memory, 100);
+ table.open();
+
+ // first, we insert some elements
+ for (long i = 0; i < numElements; i++) {
+ table.insertOrReplaceRecord(new Tuple2<Long, String>(i, longString));
+ }
+
+ // now, we replace the same elements, causing fragmentation
+ for (long i = 0; i < numElements; i++) {
+ table.insertOrReplaceRecord(new Tuple2<Long, String>(i, longString));
+ }
+
+ // now we insert an additional set of elements. without compaction during this insertion,
+ // the memory will run out
+ for (long i = 0; i < numElements; i++) {
+ table.insertOrReplaceRecord(new Tuple2<Long, String>(i + numElements, longString));
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static List<MemorySegment> getMemory(int numSegments, int segmentSize) {
+ ArrayList<MemorySegment> list = new ArrayList<MemorySegment>(numSegments);
+ for (int i = 0; i < numSegments; i++) {
+ list.add(new MemorySegment(new byte[segmentSize]));
+ }
+ return list;
+ }
+
+ private static String getLongString(int length) {
+ StringBuilder bld = new StringBuilder(length);
+ for (int i = 0; i < length; i++) {
+ bld.append('a');
+ }
+ return bld.toString();
+ }
+}
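
Besides guarding against FLINK-2361, the tests above double as usage examples for the read paths added in this commit: getEntryIterator() for a full scan and getProber(...) for point lookups. In user code the pattern looks roughly like the following sketch, which assumes 'memory', 'serializer', 'comparator', 'probeComparator' and 'pairComparator' are set up exactly as in the test constructor above:

CompactingHashTable<Tuple2<Long, String>> table =
        new CompactingHashTable<Tuple2<Long, String>>(serializer, comparator, memory, 100);
table.open();
table.insert(new Tuple2<Long, String>(42L, "forty-two"));

// full scan over all live records
MutableObjectIterator<Tuple2<Long, String>> it = table.getEntryIterator();
Tuple2<Long, String> rec;
while ((rec = it.next()) != null) {
    System.out.println(rec.f0 + " -> " + rec.f1);
}

// point lookup by key
CompactingHashTable<Tuple2<Long, String>>.HashTableProber<Long> prober =
        table.getProber(probeComparator, pairComparator);
Tuple2<Long, String> match = prober.getMatchFor(42L);   // null if the key is absent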
http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
index a8941a4..0c656d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
@@ -27,10 +27,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.operators.hash.AbstractHashTableProber;
-import org.apache.flink.runtime.operators.hash.AbstractMutableHashTable;
-import org.apache.flink.runtime.operators.hash.CompactingHashTable;
-import org.apache.flink.runtime.operators.hash.MutableHashTable;
import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator;
import org.apache.flink.runtime.operators.testutils.types.IntPair;
@@ -38,6 +34,7 @@ import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
import org.apache.flink.runtime.operators.testutils.types.IntPairPairComparator;
import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
import org.apache.flink.util.MutableObjectIterator;
+
import org.junit.Test;
import static org.junit.Assert.*;
@@ -72,8 +69,8 @@ public class HashTablePerformanceComparison {
MutableObjectIterator<IntPair> updateTester = new UniformIntPairGenerator(NUM_PAIRS, 1, false);
- long start = 0L;
- long end = 0L;
+ long start;
+ long end;
long first = System.currentTimeMillis();
@@ -105,7 +102,7 @@ public class HashTablePerformanceComparison {
start = System.currentTimeMillis();
while(updater.next(target) != null) {
target.setValue(target.getValue()*-1);
- table.insertOrReplaceRecord(target, temp);
+ table.insertOrReplaceRecord(target);
}
end = System.currentTimeMillis();
System.out.println("Update done. Time: " + (end-start) + " ms");
@@ -147,8 +144,8 @@ public class HashTablePerformanceComparison {
MutableObjectIterator<IntPair> updateTester = new UniformIntPairGenerator(NUM_PAIRS, 1, false);
- long start = 0L;
- long end = 0L;
+ long start;
+ long end;
long first = System.currentTimeMillis();
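
The benchmark (and the MemoryHashTableTest changes below) only needed call-site adjustments for the changed signature: insertOrReplaceRecord now takes the record alone instead of an additional reuse/holder object. Migration is a matter of dropping the second argument, e.g.:

// old two-argument form (removed):
//     table.insertOrReplaceRecord(target, temp);
// new single-argument form, as used in the updated benchmark loop:
table.insertOrReplaceRecord(target);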
http://git-wip-us.apache.org/repos/asf/flink/blob/925ac1f7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
index 2ebcd43..3dcf688 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
@@ -27,9 +27,6 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.operators.hash.AbstractHashTableProber;
-import org.apache.flink.runtime.operators.hash.AbstractMutableHashTable;
-import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.runtime.operators.testutils.UniformStringPairGenerator;
import org.apache.flink.runtime.operators.testutils.types.IntList;
import org.apache.flink.runtime.operators.testutils.types.IntListComparator;
@@ -45,7 +42,9 @@ import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
import org.apache.flink.util.MutableObjectIterator;
+
import org.junit.Test;
+
import org.powermock.reflect.Whitebox;
import static org.junit.Assert.*;
@@ -235,9 +234,8 @@ public class MemoryHashTableTest {
final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
// test replacing
- IntList tempHolder = new IntList();
for (int i = 0; i < NUM_LISTS; i++) {
- table.insertOrReplaceRecord(overwriteLists[i], tempHolder);
+ table.insertOrReplaceRecord(overwriteLists[i]);
}
for (int i = 0; i < NUM_LISTS; i++) {
@@ -278,10 +276,9 @@ public class MemoryHashTableTest {
final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
// test replacing
- IntList tempHolder = new IntList();
for (int i = 0; i < NUM_LISTS; i++) {
if( i % 100 != 0) {
- table.insertOrReplaceRecord(overwriteLists[i], tempHolder);
+ table.insertOrReplaceRecord(overwriteLists[i]);
lists[i] = overwriteLists[i];
}
}
@@ -327,10 +324,9 @@ public class MemoryHashTableTest {
final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS/STEP_SIZE, rnd);
// test replacing
- IntList tempHolder = new IntList();
for (int i = 0; i < NUM_LISTS; i += STEP_SIZE) {
overwriteLists[i/STEP_SIZE].setKey(overwriteLists[i/STEP_SIZE].getKey()*STEP_SIZE);
- table.insertOrReplaceRecord(overwriteLists[i/STEP_SIZE], tempHolder);
+ table.insertOrReplaceRecord(overwriteLists[i/STEP_SIZE]);
lists[i] = overwriteLists[i/STEP_SIZE];
}
@@ -379,9 +375,8 @@ public class MemoryHashTableTest {
for(int k = 0; k < NUM_REWRITES; k++) {
overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
// test replacing
- IntList tempHolder = new IntList();
for (int i = 0; i < NUM_LISTS; i++) {
- table.insertOrReplaceRecord(overwriteLists[i], tempHolder);
+ table.insertOrReplaceRecord(overwriteLists[i]);
}
for (int i = 0; i < NUM_LISTS; i++) {
@@ -409,11 +404,7 @@ public class MemoryHashTableTest {
table.open();
for (int i = 0; i < NUM_LISTS; i++) {
- try {
- table.insert(lists[i]);
- } catch (Exception e) {
- throw e;
- }
+ table.insert(lists[i]);
}
final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
@@ -630,9 +621,8 @@ public class MemoryHashTableTest {
final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
// test replacing
- IntList tempHolder = new IntList();
for (int i = 0; i < NUM_LISTS; i++) {
- table.insertOrReplaceRecord(overwriteLists[i], tempHolder);
+ table.insertOrReplaceRecord(overwriteLists[i]);
}
Field list = Whitebox.getField(CompactingHashTable.class, "partitions");
@@ -691,7 +681,7 @@ public class MemoryHashTableTest {
while(updater.next(target) != null) {
target.setValue(target.getValue());
- table.insertOrReplaceRecord(target, temp);
+ table.insertOrReplaceRecord(target);
}
while (updateTester.next(target) != null) {