Posted to commits@flink.apache.org by se...@apache.org on 2014/07/16 18:50:48 UTC
[3/3] git commit: [FLINK-955] ResizingHashTable: automatic resizing, IndexOutOfBoundsException fixed, pointers
[FLINK-955] ResizingHashTable: automatic resizing, IndexOutOfBoundsException fixed, pointers
This closes #57
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/c04c9bbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/c04c9bbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/c04c9bbb
Branch: refs/heads/master
Commit: c04c9bbb27905b1e3332aca7073a64fab24ade1c
Parents: 8ed2b76
Author: rwaury <ro...@googlemail.com>
Authored: Mon Jun 23 23:31:48 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 16 18:49:51 2014 +0200
----------------------------------------------------------------------
.../operators/hash/CompactingHashTable.java | 590 +++++++++++++------
.../operators/hash/InMemoryPartition.java | 60 +-
.../apache/flink/runtime/util/FileUtils.java | 2 -
.../apache/flink/runtime/util/IntArrayList.java | 74 +++
.../util/KeyGroupedMutableObjectIterator.java | 6 +-
.../flink/runtime/util/LongArrayList.java | 74 +++
.../operators/hash/MemoryHashTableTest.java | 383 ++++++++++--
7 files changed, 946 insertions(+), 243 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c04c9bbb/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index 890ffe3..239786d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -16,10 +16,8 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators.hash;
-
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
@@ -33,26 +31,24 @@ import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.memorymanager.ListMemorySegmentSource;
+import org.apache.flink.runtime.util.IntArrayList;
+import org.apache.flink.runtime.util.LongArrayList;
import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.util.MutableObjectIterator;
-
/**
* An implementation of an in-memory Hash Table for variable-length records.
* <p>
* The design of this class follows on many parts the design presented in
* "Hash joins and hash teams in Microsoft SQL Server", by Goetz Graefe et al..
- *<p>
- *
- *
+ * <p>
* <hr>
- *
* The layout of the buckets inside a memory segment is as follows:
*
* <pre>
* +----------------------------- Bucket x ----------------------------
- * |Partition (1 byte) | Status (1 byte) | element count (2 bytes) |
- * | next-bucket-in-chain-pointer (8 bytes) | reserved (4 bytes) |
+ * |Partition (1 byte) | reserved (3 bytes) | element count (4 bytes) |
+ * | next-bucket-in-chain-pointer (8 bytes) |
* |
* |hashCode 1 (4 bytes) | hashCode 2 (4 bytes) | hashCode 3 (4 bytes) |
* | ... hashCode n-1 (4 bytes) | hashCode n (4 bytes)
@@ -61,8 +57,8 @@ import org.apache.flink.util.MutableObjectIterator;
* | ... pointer n-1 (8 bytes) | pointer n (8 bytes)
* |
* +---------------------------- Bucket x + 1--------------------------
- * |Partition (1 byte) | Status (1 byte) | element count (2 bytes) |
- * | next-bucket-in-chain-pointer (8 bytes) | reserved (4 bytes) |
+ * |Partition (1 byte) | reserved (3 bytes) | element count (4 bytes) |
+ * | next-bucket-in-chain-pointer (8 bytes) |
* |
* |hashCode 1 (4 bytes) | hashCode 2 (4 bytes) | hashCode 3 (4 bytes) |
* | ... hashCode n-1 (4 bytes) | hashCode n (4 bytes)
@@ -73,10 +69,7 @@ import org.apache.flink.util.MutableObjectIterator;
* | ...
* |
* </pre>
- * @param <T>
- *
- * @param T record type stored in hash table
- *
+ * @param <T> Record type stored in hash table
*/
public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
@@ -121,7 +114,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
* actual hash table buckets, consisting of a 4 byte hash value and an 8 byte
* pointer, plus the overhead for the stored length field.
*/
- private static final int RECORD_OVERHEAD_BYTES = RECORD_TABLE_BYTES;
+ private static final int RECORD_OVERHEAD_BYTES = RECORD_TABLE_BYTES + 2;
// -------------------------- Bucket Size and Structure -------------------------------------
@@ -131,7 +124,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
private static final int BUCKET_HEADER_LENGTH = 16;
- private static final int NUM_ENTRIES_PER_BUCKET = (HASH_BUCKET_SIZE - BUCKET_HEADER_LENGTH) / RECORD_OVERHEAD_BYTES;
+ private static final int NUM_ENTRIES_PER_BUCKET = (HASH_BUCKET_SIZE - BUCKET_HEADER_LENGTH) / RECORD_TABLE_BYTES;
private static final int BUCKET_POINTER_START_OFFSET = BUCKET_HEADER_LENGTH + (NUM_ENTRIES_PER_BUCKET * HASH_CODE_LEN);
@@ -214,6 +207,11 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
*/
private int numBuckets;
+ /**
+ * Flag to prevent a resize from being triggered while a resize is already in progress, since the code paths are interleaved
+ */
+ private boolean isResizing = false;
+
private AtomicBoolean closed = new AtomicBoolean();
private boolean running = true;
@@ -237,7 +235,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
throw new NullPointerException();
}
if (memorySegments.size() < MIN_NUM_MEMORY_SEGMENTS) {
- throw new IllegalArgumentException("Too few memory segments provided. Hash Join needs at least " +
+ throw new IllegalArgumentException("Too few memory segments provided. Hash Table needs at least " +
MIN_NUM_MEMORY_SEGMENTS + " memory segments.");
}
@@ -275,8 +273,6 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
/**
* Build the hash table
- *
- * @throws IOException Thrown, if an I/O problem occurs while spilling a partition.
*/
public void open() {
// sanity checks
@@ -300,7 +296,8 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
* Closes the hash table. This effectively releases all internal structures and closes all
* open files and removes them. The call to this method is valid both as a cleanup after the
* complete inputs were properly processed, and as an cancellation call, which cleans up
- * all resources that are currently held by the hash join.
+ * all resources that are currently held by the hash join. If another process still accesses the hash
+ * table after close has been called, no operations will be performed.
*/
public void close() {
// make sure that we close only once
@@ -355,12 +352,12 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
// get the basic characteristics of the bucket
final int partitionNumber = bucket.get(bucketInSegmentPos + HEADER_PARTITION_OFFSET);
- final InMemoryPartition<T> p = this.partitions.get(partitionNumber);
+ InMemoryPartition<T> partition = this.partitions.get(partitionNumber);
long pointer;
try {
- pointer = p.appendRecord(record);
+ pointer = partition.appendRecord(record);
if((pointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) {
this.compactionMemory.allocateSegments((int)(pointer >> this.pageSizeInBits));
}
@@ -368,44 +365,34 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
try {
compactPartition(partitionNumber);
// retry append
- pointer = this.partitions.get(partitionNumber).appendRecord(record);
+ partition = this.partitions.get(partitionNumber); // compaction invalidates reference
+ pointer = partition.appendRecord(record);
} catch (EOFException ex) {
- throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() +
- " minPartition: " + getMinPartition() +
- " maxPartition: " + getMaxPartition() +
- " number of overflow segments: " + getOverflowSegmentCount() +
- " bucketSize: " + this.buckets.length +
- " Message: " + ex.getMessage());
+ throw new RuntimeException("Memory ran out. Compaction failed. " +
+ getMemoryConsumptionString() +
+ " Message: " + ex.getMessage());
} catch (IndexOutOfBoundsException ex) {
- throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() +
- " minPartition: " + getMinPartition() +
- " maxPartition: " + getMaxPartition() +
- " number of overflow segments: " + getOverflowSegmentCount() +
- " bucketSize: " + this.buckets.length +
- " Message: " + ex.getMessage());
+ throw new RuntimeException("Memory ran out. Compaction failed. " +
+ getMemoryConsumptionString() +
+ " Message: " + ex.getMessage());
}
} catch (IndexOutOfBoundsException e1) {
try {
compactPartition(partitionNumber);
// retry append
- pointer = this.partitions.get(partitionNumber).appendRecord(record);
+ partition = this.partitions.get(partitionNumber); // compaction invalidates reference
+ pointer = partition.appendRecord(record);
} catch (EOFException ex) {
- throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() +
- " minPartition: " + getMinPartition() +
- " maxPartition: " + getMaxPartition() +
- " number of overflow segments: " + getOverflowSegmentCount() +
- " bucketSize: " + this.buckets.length +
- " Message: " + ex.getMessage());
+ throw new RuntimeException("Memory ran out. Compaction failed. " +
+ getMemoryConsumptionString() +
+ " Message: " + ex.getMessage());
} catch (IndexOutOfBoundsException ex) {
- throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() +
- " minPartition: " + getMinPartition() +
- " maxPartition: " + getMaxPartition() +
- " number of overflow segments: " + getOverflowSegmentCount() +
- " bucketSize: " + this.buckets.length +
- " Message: " + ex.getMessage());
+ throw new RuntimeException("Memory ran out. Compaction failed. " +
+ getMemoryConsumptionString() +
+ " Message: " + ex.getMessage());
}
}
- insertBucketEntryFromStart(p, bucket, bucketInSegmentPos, hashCode, pointer);
+ insertBucketEntryFromStart(partition, bucket, bucketInSegmentPos, hashCode, pointer);
}
@@ -435,6 +422,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
if(this.closed.get()) {
return;
}
+
final int searchHashCode = hash(this.buildSideComparator.hash(record));
final int posHashCode = searchHashCode % this.numBuckets;
@@ -446,7 +434,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
// get the basic characteristics of the bucket
final int partitionNumber = bucket.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET);
- final InMemoryPartition<T> partition = this.partitions.get(partitionNumber);
+ InMemoryPartition<T> partition = this.partitions.get(partitionNumber);
final MemorySegment[] overflowSegments = partition.overflowSegments;
this.buildSideComparator.setReference(record);
@@ -490,21 +478,16 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
try {
compactPartition(partition.getPartitionNumber());
// retry append
- newPointer = this.partitions.get(partitionNumber).appendRecord(record);
+ partition = this.partitions.get(partitionNumber); // compaction invalidates reference
+ newPointer = partition.appendRecord(record);
} catch (EOFException ex) {
- throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() +
- " minPartition: " + getMinPartition() +
- " maxPartition: " + getMaxPartition() +
- " number of overflow segments: " + getOverflowSegmentCount() +
- " bucketSize: " + this.buckets.length +
- " Message: " + ex.getMessage());
+ throw new RuntimeException("Memory ran out. Compaction failed. " +
+ getMemoryConsumptionString() +
+ " Message: " + ex.getMessage());
} catch (IndexOutOfBoundsException ex) {
- throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() +
- " minPartition: " + getMinPartition() +
- " maxPartition: " + getMaxPartition() +
- " number of overflow segments: " + getOverflowSegmentCount() +
- " bucketSize: " + this.buckets.length +
- " Message: " + ex.getMessage());
+ throw new RuntimeException("Memory ran out. Compaction failed. " +
+ getMemoryConsumptionString() +
+ " Message: " + ex.getMessage());
}
bucket.putLong(pointerOffset, newPointer);
return;
@@ -514,21 +497,16 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
try {
compactPartition(partition.getPartitionNumber());
// retry append
- newPointer = this.partitions.get(partitionNumber).appendRecord(record);
+ partition = this.partitions.get(partitionNumber); // compaction invalidates reference
+ newPointer = partition.appendRecord(record);
} catch (EOFException ex) {
- throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() +
- " minPartition: " + getMinPartition() +
- " maxPartition: " + getMaxPartition() +
- " number of overflow segments: " + getOverflowSegmentCount() +
- " bucketSize: " + this.buckets.length +
- " Message: " + ex.getMessage());
+ throw new RuntimeException("Memory ran out. Compaction failed. " +
+ getMemoryConsumptionString() +
+ " Message: " + ex.getMessage());
} catch (IndexOutOfBoundsException ex) {
- throw new RuntimeException("Memory ran out. Compaction failed. numPartitions: " + this.partitions.size() +
- " minPartition: " + getMinPartition() +
- " maxPartition: " + getMaxPartition() +
- " number of overflow segments: " + getOverflowSegmentCount() +
- " bucketSize: " + this.buckets.length +
- " Message: " + ex.getMessage());
+ throw new RuntimeException("Memory ran out. Compaction failed. " +
+ getMemoryConsumptionString() +
+ " Message: " + ex.getMessage());
}
bucket.putLong(pointerOffset, newPointer);
return;
@@ -546,6 +524,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
if (newForwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
// nothing found. append and insert
long pointer = partition.appendRecord(record);
+ //insertBucketEntryFromStart(partition, originalBucket, originalBucketOffset, searchHashCode, pointer);
insertBucketEntryFromSearch(partition, originalBucket, bucket, originalBucketOffset, bucketInSegmentOffset, countInSegment, currentForwardPointer, searchHashCode, pointer);
if((pointer >> this.pageSizeInBits) > this.compactionMemory.getBlockCount()) {
this.compactionMemory.allocateSegments((int)(pointer >> this.pageSizeInBits));
@@ -567,6 +546,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
int bucketInSegmentPos, int hashCode, long pointer)
throws IOException
{
+ boolean checkForResize = false;
// find the position to put the hash code and pointer
final int count = bucket.getInt(bucketInSegmentPos + HEADER_COUNT_OFFSET);
if (count < NUM_ENTRIES_PER_BUCKET) {
@@ -574,8 +554,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
bucket.putInt(bucketInSegmentPos + BUCKET_HEADER_LENGTH + (count * HASH_CODE_LEN), hashCode); // hash code
bucket.putLong(bucketInSegmentPos + BUCKET_POINTER_START_OFFSET + (count * POINTER_LEN), pointer); // pointer
bucket.putInt(bucketInSegmentPos + HEADER_COUNT_OFFSET, count + 1); // update count
- }
- else {
+ } else {
// we need to go to the overflow buckets
final long originalForwardPointer = bucket.getLong(bucketInSegmentPos + HEADER_FORWARD_OFFSET);
final long forwardForNewBucket;
@@ -596,14 +575,12 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
seg.putLong(segOffset + BUCKET_POINTER_START_OFFSET + (obCount * POINTER_LEN), pointer); // pointer
seg.putInt(segOffset + HEADER_COUNT_OFFSET, obCount + 1); // update count
return;
- }
- else {
+ } else {
// no space here, we need a new bucket. this current overflow bucket will be the
// target of the new overflow bucket
forwardForNewBucket = originalForwardPointer;
}
- }
- else {
+ } else {
// no overflow bucket yet, so we need a first one
forwardForNewBucket = BUCKET_FORWARD_POINTER_NOT_SET;
}
@@ -628,8 +605,8 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
}
p.overflowSegments[p.numOverflowSegments] = overflowSeg;
p.numOverflowSegments++;
- }
- else {
+ checkForResize = true;
+ } else {
// there is space in the last overflow bucket
overflowBucketNum = p.numOverflowSegments - 1;
overflowSeg = p.overflowSegments[overflowBucketNum];
@@ -653,17 +630,23 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
// set the count to one
overflowSeg.putInt(overflowBucketOffset + HEADER_COUNT_OFFSET, 1);
+ if(checkForResize && !this.isResizing) {
+ // check if we should resize buckets
+ if(this.buckets.length <= getOverflowSegmentCount()) {
+ resizeHashTable();
+ }
+ }
}
}
- private final void insertBucketEntryFromSearch(InMemoryPartition<T> partition, MemorySegment originalBucket, MemorySegment currentBucket, int originalBucketOffset, int currentBucketOffset, int countInCurrentBucket, long currentForwardPointer, int hashCode, long pointer) {
+ private final void insertBucketEntryFromSearch(InMemoryPartition<T> partition, MemorySegment originalBucket, MemorySegment currentBucket, int originalBucketOffset, int currentBucketOffset, int countInCurrentBucket, long currentForwardPointer, int hashCode, long pointer) throws IOException {
+ boolean checkForResize = false;
if (countInCurrentBucket < NUM_ENTRIES_PER_BUCKET) {
// we are good in our current bucket, put the values
currentBucket.putInt(currentBucketOffset + BUCKET_HEADER_LENGTH + (countInCurrentBucket * HASH_CODE_LEN), hashCode); // hash code
currentBucket.putLong(currentBucketOffset + BUCKET_POINTER_START_OFFSET + (countInCurrentBucket * POINTER_LEN), pointer); // pointer
currentBucket.putInt(currentBucketOffset + HEADER_COUNT_OFFSET, countInCurrentBucket + 1); // update count
- }
- else {
+ } else {
// we need a new overflow bucket
MemorySegment overflowSeg;
final int overflowBucketNum;
@@ -684,8 +667,8 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
}
partition.overflowSegments[partition.numOverflowSegments] = overflowSeg;
partition.numOverflowSegments++;
- }
- else {
+ checkForResize = true;
+ } else {
// there is space in the last overflow segment
overflowBucketNum = partition.numOverflowSegments - 1;
overflowSeg = partition.overflowSegments[overflowBucketNum];
@@ -708,7 +691,13 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
overflowSeg.putLong(overflowBucketOffset + BUCKET_POINTER_START_OFFSET, pointer); // pointer
// set the count to one
- overflowSeg.putInt(overflowBucketOffset + HEADER_COUNT_OFFSET, 1);
+ overflowSeg.putInt(overflowBucketOffset + HEADER_COUNT_OFFSET, 1);
+ if(checkForResize && !this.isResizing) {
+ // check if we should resize buckets
+ if(this.buckets.length <= getOverflowSegmentCount()) {
+ resizeHashTable();
+ }
+ }
}
}
@@ -782,11 +771,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
if (s > 0) {
return this.availableMemory.remove(s-1);
} else {
- throw new RuntimeException("Memory ran out. numPartitions: " + this.partitions.size() +
- " minPartition: " + getMinPartition() +
- " maxPartition: " + getMaxPartition() +
- " number of overflow segments: " + getOverflowSegmentCount() +
- " bucketSize: " + this.buckets.length);
+ throw new RuntimeException("Memory ran out. " + getMemoryConsumptionString());
}
}
@@ -798,7 +783,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
* Gets the number of partitions to be used for an initial hash-table, when no estimates are
* available.
* <p>
- * The current logic makes sure that there are always between 10 and 127 partitions, and close
+ * The current logic makes sure that there are always between 10 and 32 partitions, and close
* to 0.1 of the number of buffers.
*
* @param numBuffers The number of buffers available.
@@ -808,6 +793,53 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
return Math.max(10, Math.min(numBuffers / 10, MAX_NUM_PARTITIONS));
}
+ /**
+ * @return String containing a summary of the memory consumption for error messages
+ */
+ private String getMemoryConsumptionString() {
+ String result = new String("numPartitions: " + this.partitions.size() +
+ " minPartition: " + getMinPartition() +
+ " maxPartition: " + getMaxPartition() +
+ " number of overflow segments: " + getOverflowSegmentCount() +
+ " bucketSize: " + this.buckets.length +
+ " Overall memory: " + getSize() +
+ " Partition memory: " + getPartitionSize());
+ return result;
+ }
+
+ /**
+ * Size of all memory segments owned by this hash table
+ *
+ * @return size in bytes
+ */
+ private long getSize() {
+ long numSegments = 0;
+ numSegments += this.availableMemory.size();
+ numSegments += this.buckets.length;
+ for(InMemoryPartition<T> p : this.partitions) {
+ numSegments += p.getBlockCount();
+ numSegments += p.numOverflowSegments;
+ }
+ numSegments += this.compactionMemory.getBlockCount();
+ return numSegments*this.segmentSize;
+ }
+
+ /**
+ * Size of all memory segments owned by the partitions of this hash table excluding the compaction partition
+ *
+ * @return size in bytes
+ */
+ private long getPartitionSize() {
+ long numSegments = 0;
+ for(InMemoryPartition<T> p : this.partitions) {
+ numSegments += p.getBlockCount();
+ }
+ return numSegments*this.segmentSize;
+ }
+
+ /**
+ * @return number of memory segments in the largest partition
+ */
private int getMaxPartition() {
int maxPartition = 0;
for(InMemoryPartition<T> p1 : this.partitions) {
@@ -818,6 +850,9 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
return maxPartition;
}
+ /**
+ * @return number of memory segments in the smallest partition
+ */
private int getMinPartition() {
int minPartition = Integer.MAX_VALUE;
for(InMemoryPartition<T> p1 : this.partitions) {
@@ -828,6 +863,9 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
return minPartition;
}
+ /**
+ * @return number of memory segments used in overflow buckets
+ */
private int getOverflowSegmentCount() {
int result = 0;
for(InMemoryPartition<T> p : this.partitions) {
@@ -836,29 +874,20 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
return result;
}
+ /**
+ * tries to find a good value for the number of buckets
+ * will ensure that the number of buckets is a multiple of numPartitions
+ *
+ * @return number of buckets
+ */
private static final int getInitialTableSize(int numBuffers, int bufferSize, int numPartitions, int recordLenBytes) {
- // ----------------------------------------------------------------------------------------
- // the following observations hold:
- // 1) If the records are assumed to be very large, then many buffers need to go to the partitions
- // and fewer to the table
- // 2) If the records are small, then comparatively many have to go to the buckets, and fewer to the
- // partitions
- // 3) If the bucket-table is chosen too small, we will eventually get many collisions and will grow the
- // hash table, incrementally adding buffers.
- // 4) If the bucket-table is chosen to be large and we actually need more buffers for the partitions, we
- // cannot subtract them afterwards from the table
- //
- // ==> We start with a comparatively small hash-table. We aim for a 200% utilization of the bucket table
- // when all the partition buffers are full. Most likely, that will cause some buckets to be re-hashed
- // and grab additional buffers away from the partitions.
- // NOTE: This decision may be subject to changes after conclusive experiments!
- // ----------------------------------------------------------------------------------------
-
final long totalSize = ((long) bufferSize) * numBuffers;
final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_OVERHEAD_BYTES);
- final long bucketBytes = numRecordsStorable * RECORD_TABLE_BYTES;
- final long numBuckets = bucketBytes / (2 * HASH_BUCKET_SIZE) + 1;
-
+ final long bucketBytes = numRecordsStorable * RECORD_OVERHEAD_BYTES;
+ long numBuckets = bucketBytes / (2 * HASH_BUCKET_SIZE) + 1;
+ while(numBuckets % numPartitions != 0) {
+ numBuckets++;
+ }
return numBuckets > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) numBuckets;
}
@@ -874,22 +903,199 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
}
/**
+ * Attempts to double the number of buckets
+ *
+ * @return true on success
+ * @throws IOException
+ */
+ private boolean resizeHashTable() throws IOException {
+ final int newNumBuckets = 2*this.numBuckets;
+ final int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
+ final int newNumSegments = (newNumBuckets + (bucketsPerSegment-1)) / bucketsPerSegment;
+ final int additionalSegments = newNumSegments-this.buckets.length;
+ final int numPartitions = this.partitions.size();
+ if(this.availableMemory.size() < additionalSegments) {
+ for(int i = 0; i < numPartitions; i++) {
+ compactPartition(i);
+ if(this.availableMemory.size() >= additionalSegments) {
+ break;
+ }
+ }
+ }
+ if(this.availableMemory.size() < additionalSegments || this.closed.get()) {
+ return false;
+ } else {
+ this.isResizing = true;
+ // allocate new buckets
+ final int startOffset = (this.numBuckets * HASH_BUCKET_SIZE) % this.segmentSize;
+ MemorySegment[] newBuckets = new MemorySegment[additionalSegments];
+ final int oldNumBuckets = this.numBuckets;
+ final int oldNumSegments = this.buckets.length;
+ MemorySegment[] mergedBuckets = new MemorySegment[newNumSegments];
+ System.arraycopy(this.buckets, 0, mergedBuckets, 0, this.buckets.length);
+ System.arraycopy(newBuckets, 0, mergedBuckets, this.buckets.length, newBuckets.length);
+ this.buckets = mergedBuckets;
+ this.numBuckets = newNumBuckets;
+ // initialize all new buckets
+ boolean oldSegment = (startOffset != 0);
+ final int startSegment = oldSegment ? (oldNumSegments-1) : oldNumSegments;
+ for (int i = startSegment, bucket = oldNumBuckets; i < newNumSegments && bucket < this.numBuckets; i++) {
+ MemorySegment seg;
+ int bucketOffset = 0;
+ if(oldSegment) { // the first couple of new buckets may be located on an old segment
+ seg = this.buckets[i];
+ for (int k = (oldNumBuckets % bucketsPerSegment) ; k < bucketsPerSegment && bucket < this.numBuckets; k++, bucket++) {
+ bucketOffset = k * HASH_BUCKET_SIZE;
+ // initialize the header fields
+ seg.put(bucketOffset + HEADER_PARTITION_OFFSET, assignPartition(bucket, (byte)numPartitions));
+ seg.putInt(bucketOffset + HEADER_COUNT_OFFSET, 0);
+ seg.putLong(bucketOffset + HEADER_FORWARD_OFFSET, BUCKET_FORWARD_POINTER_NOT_SET);
+ }
+ } else {
+ seg = getNextBuffer();
+ // go over all buckets in the segment
+ for (int k = 0; k < bucketsPerSegment && bucket < this.numBuckets; k++, bucket++) {
+ bucketOffset = k * HASH_BUCKET_SIZE;
+ // initialize the header fields
+ seg.put(bucketOffset + HEADER_PARTITION_OFFSET, assignPartition(bucket, (byte)numPartitions));
+ seg.putInt(bucketOffset + HEADER_COUNT_OFFSET, 0);
+ seg.putLong(bucketOffset + HEADER_FORWARD_OFFSET, BUCKET_FORWARD_POINTER_NOT_SET);
+ }
+ }
+ this.buckets[i] = seg;
+ oldSegment = false; // we write on at most one old segment
+ }
+ int hashOffset = 0;
+ int hash = 0;
+ int pointerOffset = 0;
+ long pointer = 0;
+ IntArrayList hashList = new IntArrayList(NUM_ENTRIES_PER_BUCKET);
+ LongArrayList pointerList = new LongArrayList(NUM_ENTRIES_PER_BUCKET);
+ IntArrayList overflowHashes = new IntArrayList(64);
+ LongArrayList overflowPointers = new LongArrayList(64);
+ // go over all buckets and split them between old and new buckets
+ for(int i = 0; i < numPartitions; i++) {
+ InMemoryPartition<T> partition = this.partitions.get(i);
+ final MemorySegment[] overflowSegments = partition.overflowSegments;
+ int posHashCode = 0;
+ for (int j = 0, bucket = i; j < this.buckets.length && bucket < oldNumBuckets; j++) {
+ MemorySegment segment = this.buckets[j];
+ // go over all buckets in the segment belonging to the partition
+ for (int k = bucket % bucketsPerSegment; k < bucketsPerSegment && bucket < oldNumBuckets; k += numPartitions, bucket += numPartitions) {
+ int bucketOffset = k * HASH_BUCKET_SIZE;
+ if((int)segment.get(bucketOffset + HEADER_PARTITION_OFFSET) != i) {
+ throw new IOException("Accessed wrong bucket! wanted: " + i + " got: " + segment.get(bucketOffset + HEADER_PARTITION_OFFSET));
+ }
+ // loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
+ int countInSegment = segment.getInt(bucketOffset + HEADER_COUNT_OFFSET);
+ int numInSegment = 0;
+ pointerOffset = bucketOffset + BUCKET_POINTER_START_OFFSET;
+ hashOffset = bucketOffset + BUCKET_HEADER_LENGTH;
+ while (true) {
+ while (numInSegment < countInSegment) {
+ hash = segment.getInt(hashOffset);
+ if((hash % this.numBuckets) != bucket && (hash % this.numBuckets) != (bucket+oldNumBuckets)) {
+ throw new IOException("wanted: " + bucket + " or " + (bucket + oldNumBuckets) + " got: " + hash%this.numBuckets);
+ }
+ pointer = segment.getLong(pointerOffset);
+ hashList.add(hash);
+ pointerList.add(pointer);
+ pointerOffset += POINTER_LEN;
+ hashOffset += HASH_CODE_LEN;
+ numInSegment++;
+ }
+ // this segment is done. check if there is another chained bucket
+ final long forwardPointer = segment.getLong(bucketOffset + HEADER_FORWARD_OFFSET);
+ if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
+ break;
+ }
+ final int overflowSegNum = (int) (forwardPointer >>> 32);
+ segment = overflowSegments[overflowSegNum];
+ bucketOffset = (int)(forwardPointer & 0xffffffff);
+ countInSegment = segment.getInt(bucketOffset + HEADER_COUNT_OFFSET);
+ pointerOffset = bucketOffset + BUCKET_POINTER_START_OFFSET;
+ hashOffset = bucketOffset + BUCKET_HEADER_LENGTH;
+ numInSegment = 0;
+ }
+ segment = this.buckets[j];
+ bucketOffset = k * HASH_BUCKET_SIZE;
+ // reset bucket for re-insertion
+ segment.putInt(bucketOffset + HEADER_COUNT_OFFSET, 0);
+ segment.putLong(bucketOffset + HEADER_FORWARD_OFFSET, BUCKET_FORWARD_POINTER_NOT_SET);
+ // refill table
+ if(hashList.size() != pointerList.size()) {
+ throw new IOException("Pointer and hash counts do not match. hashes: " + hashList.size() + " pointer: " + pointerList.size());
+ }
+ int newSegmentIndex = (bucket + oldNumBuckets) / bucketsPerSegment;
+ MemorySegment newSegment = this.buckets[newSegmentIndex];
+ // we need to avoid overflows in the first run
+ int oldBucketCount = 0;
+ int newBucketCount = 0;
+ while(!hashList.isEmpty()) {
+ hash = hashList.removeInt(hashList.size()-1);
+ pointer = pointerList.removeLong(pointerList.size()-1);
+ posHashCode = hash % this.numBuckets;
+ if(posHashCode == bucket && oldBucketCount < NUM_ENTRIES_PER_BUCKET) {
+ bucketOffset = (bucket % bucketsPerSegment) * HASH_BUCKET_SIZE;
+ insertBucketEntryFromStart(partition, segment, bucketOffset, hash, pointer);
+ oldBucketCount++;
+ } else if(posHashCode == (bucket + oldNumBuckets) && newBucketCount < NUM_ENTRIES_PER_BUCKET) {
+ bucketOffset = ((bucket + oldNumBuckets) % bucketsPerSegment) * HASH_BUCKET_SIZE;
+ insertBucketEntryFromStart(partition, newSegment, bucketOffset, hash, pointer);
+ newBucketCount++;
+ } else if(posHashCode == (bucket + oldNumBuckets) || posHashCode == bucket) {
+ overflowHashes.add(hash);
+ overflowPointers.add(pointer);
+ } else {
+ throw new IOException("Accessed wrong bucket. Target: " + bucket + " or " + (bucket + oldNumBuckets) + " Hit: " + posHashCode);
+ }
+ }
+ hashList.clear();
+ pointerList.clear();
+ }
+ }
+ // reset partition's overflow buckets and reclaim their memory
+ this.availableMemory.addAll(partition.resetOverflowBuckets());
+ // clear overflow lists
+ int bucketArrayPos = 0;
+ int bucketInSegmentPos = 0;
+ MemorySegment bucket = null;
+ while(!overflowHashes.isEmpty()) {
+ hash = overflowHashes.removeInt(overflowHashes.size()-1);
+ pointer = overflowPointers.removeLong(overflowPointers.size()-1);
+ posHashCode = hash % this.numBuckets;
+ bucketArrayPos = posHashCode >>> this.bucketsPerSegmentBits;
+ bucketInSegmentPos = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
+ bucket = this.buckets[bucketArrayPos];
+ insertBucketEntryFromStart(partition, bucket, bucketInSegmentPos, hash, pointer);
+ }
+ overflowHashes.clear();
+ overflowPointers.clear();
+ }
+ this.isResizing = false;
+ return true;
+ }
+ }
+
+ /**
* Compacts (garbage collects) partition with copy-compact strategy using compaction partition
*
- * @param partition partition number
+ * @param partitionNumber partition to compact
* @throws IOException
*/
- private void compactPartition(int partitionNumber) throws IOException {
- // stop if no garbage exists or table is closed
- if(this.partitions.get(partitionNumber).isCompacted() || this.closed.get()) {
+ private void compactPartition(final int partitionNumber) throws IOException {
+ // do nothing if table was closed, parameter is invalid or no garbage exists
+ if(this.closed.get() || partitionNumber >= this.partitions.size() || this.partitions.get(partitionNumber).isCompacted()) {
return;
}
// release all segments owned by compaction partition
this.compactionMemory.clearAllMemory(availableMemory);
this.compactionMemory.allocateSegments(1);
+ this.compactionMemory.pushDownPages();
T tempHolder = this.buildSideSerializer.createInstance();
+ final int numPartitions = this.partitions.size();
InMemoryPartition<T> partition = this.partitions.remove(partitionNumber);
- final int numPartitions = this.partitions.size() + 1; // dropped one earlier
+ MemorySegment[] overflowSegments = partition.overflowSegments;
long pointer = 0L;
int pointerOffset = 0;
int bucketOffset = 0;
@@ -900,73 +1106,62 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
for (int k = bucket % bucketsPerSegment; k < bucketsPerSegment && bucket < this.numBuckets; k += numPartitions, bucket += numPartitions) {
bucketOffset = k * HASH_BUCKET_SIZE;
if((int)segment.get(bucketOffset + HEADER_PARTITION_OFFSET) != partitionNumber) {
- throw new IOException("Accessed wrong bucket! ");
- }
- int count = segment.getInt(bucketOffset + HEADER_COUNT_OFFSET);
- for (int j = 0; j < NUM_ENTRIES_PER_BUCKET && j < count; j++) {
- pointerOffset = bucketOffset + BUCKET_POINTER_START_OFFSET + (j * POINTER_LEN);
- pointer = segment.getLong(pointerOffset);
- partition.readRecordAt(pointer, tempHolder);
- pointer = this.compactionMemory.appendRecord(tempHolder);
- segment.putLong(pointerOffset, pointer);
+ throw new IOException("Accessed wrong bucket! wanted: " + partitionNumber + " got: " + segment.get(bucketOffset + HEADER_PARTITION_OFFSET));
}
- long overflowPointer = segment.getLong(bucketOffset + HEADER_FORWARD_OFFSET);
- if(overflowPointer != BUCKET_FORWARD_POINTER_NOT_SET) {
- // scan overflow buckets
- int current = NUM_ENTRIES_PER_BUCKET;
- bucketOffset = (int) (overflowPointer & 0xffffffff);
- pointerOffset = ((int) (overflowPointer & 0xffffffff)) + BUCKET_POINTER_START_OFFSET;
- int overflowSegNum = (int) (overflowPointer >>> 32);
- count += partition.overflowSegments[overflowSegNum].getInt(bucketOffset + HEADER_COUNT_OFFSET);
- while(current < count) {
- pointer = partition.overflowSegments[overflowSegNum].getLong(pointerOffset);
+ // loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
+ int countInSegment = segment.getInt(bucketOffset + HEADER_COUNT_OFFSET);
+ int numInSegment = 0;
+ pointerOffset = bucketOffset + BUCKET_POINTER_START_OFFSET;
+ while (true) {
+ while (numInSegment < countInSegment) {
+ pointer = segment.getLong(pointerOffset);
partition.readRecordAt(pointer, tempHolder);
pointer = this.compactionMemory.appendRecord(tempHolder);
- partition.overflowSegments[overflowSegNum].putLong(pointerOffset, pointer);
- current++;
- if(current % NUM_ENTRIES_PER_BUCKET == 0) {
- count += partition.overflowSegments[overflowSegNum].getInt(bucketOffset + HEADER_COUNT_OFFSET);
- overflowPointer = partition.overflowSegments[overflowSegNum].getLong(bucketOffset + HEADER_FORWARD_OFFSET);
- if(overflowPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
- break;
- }
- overflowSegNum = (int) (overflowPointer >>> 32);
- bucketOffset = (int) (overflowPointer & 0xffffffff);
- pointerOffset = ((int) (overflowPointer & 0xffffffff)) + BUCKET_POINTER_START_OFFSET;
- } else {
- pointerOffset += POINTER_LEN;
- }
+ segment.putLong(pointerOffset, pointer);
+ pointerOffset += POINTER_LEN;
+ numInSegment++;
+ }
+ // this segment is done. check if there is another chained bucket
+ final long forwardPointer = segment.getLong(bucketOffset + HEADER_FORWARD_OFFSET);
+ if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
+ break;
}
+ final int overflowSegNum = (int) (forwardPointer >>> 32);
+ segment = overflowSegments[overflowSegNum];
+ bucketOffset = (int)(forwardPointer & 0xffffffff);
+ countInSegment = segment.getInt(bucketOffset + HEADER_COUNT_OFFSET);
+ pointerOffset = bucketOffset + BUCKET_POINTER_START_OFFSET;
+ numInSegment = 0;
}
+ segment = this.buckets[i];
}
}
// swap partition with compaction partition
this.compactionMemory.setPartitionNumber(partitionNumber);
this.partitions.add(partitionNumber, compactionMemory);
- this.compactionMemory = partition;
- this.partitions.get(partitionNumber).overflowSegments = this.compactionMemory.overflowSegments;
- this.partitions.get(partitionNumber).numOverflowSegments = this.compactionMemory.numOverflowSegments;
- this.partitions.get(partitionNumber).nextOverflowBucket = this.compactionMemory.nextOverflowBucket;
+ this.partitions.get(partitionNumber).overflowSegments = partition.overflowSegments;
+ this.partitions.get(partitionNumber).numOverflowSegments = partition.numOverflowSegments;
+ this.partitions.get(partitionNumber).nextOverflowBucket = partition.nextOverflowBucket;
this.partitions.get(partitionNumber).setCompaction(true);
+ //this.partitions.get(partitionNumber).pushDownPages();
+ this.compactionMemory = partition;
this.compactionMemory.resetRecordCounter();
this.compactionMemory.setPartitionNumber(-1);
+ this.compactionMemory.overflowSegments = null;
+ this.compactionMemory.numOverflowSegments = 0;
+ this.compactionMemory.nextOverflowBucket = 0;
// try to allocate maximum segment count
- int maxSegmentNumber = 0;
- for (InMemoryPartition<T> e : this.partitions) {
- if(e.getBlockCount() > maxSegmentNumber) {
- maxSegmentNumber = e.getBlockCount();
- }
- }
+ this.compactionMemory.clearAllMemory(this.availableMemory);
+ int maxSegmentNumber = this.getMaxPartition();
this.compactionMemory.allocateSegments(maxSegmentNumber);
- if(this.compactionMemory.getBlockCount() > maxSegmentNumber) {
- this.compactionMemory.releaseSegments(maxSegmentNumber, availableMemory);
- }
+ this.compactionMemory.resetRWViews();
+ this.compactionMemory.pushDownPages();
}
/**
* Compacts partition but may not reclaim all garbage
*
- * @param partition partition number
+ * @param partitionNumber partition number
* @throws IOException
*/
@SuppressWarnings("unused")
@@ -1043,6 +1238,12 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
}
}
+ /**
+ * utility function that inserts all entries from a bucket and its overflow buckets into the cache
+ *
+ * @return true if last bucket was not reached yet
+ * @throws IOException
+ */
private boolean fillCache() throws IOException {
if(currentBucketIndex >= table.numBuckets) {
return false;
@@ -1069,7 +1270,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
partition.readRecordAt(pointer, target);
cache.add(target);
} catch (IOException e) {
- throw new RuntimeException("Error deserializing record from the hashtable: " + e.getMessage(), e);
+ throw new RuntimeException("Error deserializing record from the Hash Table: " + e.getMessage(), e);
}
}
// this segment is done. check if there is another chained bucket
@@ -1124,8 +1325,8 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
// get the basic characteristics of the bucket
final int partitionNumber = bucket.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET);
- final InMemoryPartition<T> partition = partitions.get(partitionNumber);
- final MemorySegment[] overflowSegments = partition.overflowSegments;
+ final InMemoryPartition<T> p = partitions.get(partitionNumber);
+ final MemorySegment[] overflowSegments = p.overflowSegments;
this.pairComparator.setReference(probeSideRecord);
@@ -1150,10 +1351,10 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
// deserialize the key to check whether it is really equal, or whether we had only a hash collision
try {
- partition.readRecordAt(pointer, targetForMatch);
+ p.readRecordAt(pointer, targetForMatch);
if (this.pairComparator.equalToReference(targetForMatch)) {
- this.partition = partition;
+ this.partition = p;
this.bucket = bucket;
this.pointerOffsetInBucket = pointerOffset;
return true;
@@ -1187,9 +1388,46 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
if(closed.get()) {
return;
}
- long newPointer = this.partition.appendRecord(record);
+ long newPointer;
+ try {
+ newPointer = this.partition.appendRecord(record);
+ } catch (EOFException e) {
+ // system is out of memory so we attempt to reclaim memory with a copy compact run
+ try {
+ int partitionNumber = this.partition.getPartitionNumber();
+ compactPartition(partitionNumber);
+ // retry append
+ this.partition = partitions.get(partitionNumber);
+ newPointer = this.partition.appendRecord(record);
+ } catch (EOFException ex) {
+ throw new RuntimeException("Memory ran out. Compaction failed. " +
+ getMemoryConsumptionString() +
+ " Message: " + ex.getMessage());
+ } catch (IndexOutOfBoundsException ex) {
+ throw new RuntimeException("Memory ran out. Compaction failed. " +
+ getMemoryConsumptionString() +
+ " Message: " + ex.getMessage());
+ }
+ } catch (IndexOutOfBoundsException e) {
+ // system is out of memory so we attempt to reclaim memory with a copy compact run
+ try {
+ int partitionNumber = this.partition.getPartitionNumber();
+ compactPartition(partitionNumber);
+ // retry append
+ this.partition = partitions.get(partitionNumber);
+ newPointer = this.partition.appendRecord(record);
+ } catch (EOFException ex) {
+ throw new RuntimeException("Memory ran out. Compaction failed. " +
+ getMemoryConsumptionString() +
+ " Message: " + ex.getMessage());
+ } catch (IndexOutOfBoundsException ex) {
+ throw new RuntimeException("Memory ran out. Compaction failed. " +
+ getMemoryConsumptionString() +
+ " Message: " + ex.getMessage());
+ }
+ }
this.bucket.putLong(this.pointerOffsetInBucket, newPointer);
- this.partition.setCompaction(false); //FIXME Do we really create garbage here?
+ this.partition.setCompaction(false);
}
}
}
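
The resize path added above doubles the bucket count, so an entry whose cached hash code previously mapped to bucket b can only end up in bucket b or in bucket b + oldNumBuckets afterwards (because newNumBuckets = 2 * oldNumBuckets). A minimal sketch of that routing decision, assuming the hash code has already been made non-negative by hash(); the helper name is hypothetical and only illustrates the consistency check performed in resizeHashTable():

// Sketch: where an entry goes once the table has doubled from oldNumBuckets
// to 2 * oldNumBuckets. Mirrors the sanity check in resizeHashTable().
static int targetBucketAfterDoubling(int hash, int oldBucket, int oldNumBuckets) {
    final int newNumBuckets = 2 * oldNumBuckets;
    final int target = hash % newNumBuckets;      // position against the doubled table
    if (target != oldBucket && target != oldBucket + oldNumBuckets) {
        // the patch throws an IOException in this case ("Accessed wrong bucket")
        throw new IllegalStateException("entry hashed to unrelated bucket " + target);
    }
    return target;   // stays put, or moves up by exactly oldNumBuckets
}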
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c04c9bbb/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
index 3ccca9c..b90ca1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
@@ -57,9 +57,9 @@ public class InMemoryPartition<T> {
private final ListMemorySegmentSource availableMemory;
- private final WriteView writeView;
+ private WriteView writeView;
- private final ReadView readView;
+ private ReadView readView;
private long recordCounter; // number of records in this partition including garbage
@@ -69,6 +69,10 @@ public class InMemoryPartition<T> {
private boolean compacted; // overwritten records since allocation or last full compaction
+ private int pageSize; // segment size in bytes
+
+ private int pageSizeInBits;
+
// --------------------------------------------------------------------------------------------------
@@ -100,6 +104,10 @@ public class InMemoryPartition<T> {
// empty partitions have no garbage
this.compacted = true;
+ this.pageSize = pageSize;
+
+ this.pageSizeInBits = pageSizeInBits;
+
this.writeView = new WriteView(this.partitionPages, memSource, pageSize, pageSizeInBits);
this.readView = new ReadView(this.partitionPages, pageSize, pageSizeInBits);
}
@@ -148,6 +156,38 @@ public class InMemoryPartition<T> {
}
/**
+ * Resets the read and write views; should only be used on the compaction partition.
+ */
+ public void resetRWViews() {
+ this.writeView.resetTo(0L);
+ this.readView.setReadPosition(0L);
+ }
+
+ public void pushDownPages() {
+ this.writeView = new WriteView(this.partitionPages, availableMemory, pageSize, pageSizeInBits);
+ this.readView = new ReadView(this.partitionPages, pageSize, pageSizeInBits);
+ }
+
+ /**
+ * Resets the overflow bucket counters and returns the freed memory; should only be used when resizing the table.
+ *
+ * @return freed memory segments
+ */
+ public ArrayList<MemorySegment> resetOverflowBuckets() {
+ this.numOverflowSegments = 0;
+ this.nextOverflowBucket = 0;
+
+ ArrayList<MemorySegment> result = new ArrayList<MemorySegment>(this.overflowSegments.length);
+ for(int i = 0; i < this.overflowSegments.length; i++) {
+ if(this.overflowSegments[i] != null) {
+ result.add(this.overflowSegments[i]);
+ }
+ }
+ this.overflowSegments = new MemorySegment[2];
+ return result;
+ }
+
+ /**
* @return true if garbage exists in partition
*/
public boolean isCompacted() {
@@ -179,8 +219,7 @@ public class InMemoryPartition<T> {
this.serializer.serialize(record, this.writeView);
this.recordCounter++;
return pointer;
- }
- catch (EOFException e) {
+ } catch (EOFException e) {
// we ran out of pages.
// first, reset the pages and then we need to trigger a compaction
//int oldCurrentBuffer =
@@ -224,8 +263,7 @@ public class InMemoryPartition<T> {
for (int k = 0; k < this.numOverflowSegments; k++) {
target.add(this.overflowSegments[k]);
}
- }
-
+ }
// return the partition buffers
target.addAll(this.partitionPages);
this.partitionPages.clear();
@@ -248,12 +286,6 @@ public class InMemoryPartition<T> {
}
}
- public void releaseSegments(int maxSegmentNumber, ArrayList<MemorySegment> target) {
- while(getBlockCount() > maxSegmentNumber) {
- target.add(partitionPages.remove(partitionPages.size()-1));
- }
- }
-
@Override
public String toString() {
return String.format("Partition %d - %d records, %d partition blocks, %d bucket overflow blocks", getPartitionNumber(), getRecordCount(), getBlockCount(), this.numOverflowSegments);
@@ -285,6 +317,7 @@ public class InMemoryPartition<T> {
this.memSource = memSource;
this.sizeBits = pageSizeBits;
this.sizeMask = pageSize - 1;
+ this.segmentNumberOffset = 0;
}
@@ -326,7 +359,7 @@ public class InMemoryPartition<T> {
private static final class ReadView extends AbstractPagedInputView implements SeekableDataInputView {
private final ArrayList<MemorySegment> segments;
-
+
private final int segmentSizeBits;
private final int segmentSizeMask;
@@ -346,6 +379,7 @@ public class InMemoryPartition<T> {
this.segments = segments;
this.segmentSizeBits = segmentSizeBits;
this.segmentSizeMask = segmentSize - 1;
+ this.segmentNumberOffset = 0;
}
@Override
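
The InMemoryPartition changes above mostly exist so the dedicated compaction partition can be recycled after every copy-compact run. The recycle step, condensed from the CompactingHashTable hunks into one sequence, is sketched below; the method name is hypothetical, the fields are those of CompactingHashTable, and error handling plus overflow-segment bookkeeping are omitted:

// Sketch only: the recycle step at the end of compactPartition(), condensed from the diff.
private void recycleCompactionPartition(int partitionNumber) throws IOException {
    InMemoryPartition<T> old = this.partitions.remove(partitionNumber);   // garbage-laden partition
    this.compactionMemory.setPartitionNumber(partitionNumber);
    this.partitions.add(partitionNumber, this.compactionMemory);          // compacted copy takes its place
    this.compactionMemory = old;                                          // old partition becomes scratch space
    this.compactionMemory.resetRecordCounter();
    this.compactionMemory.setPartitionNumber(-1);
    this.compactionMemory.clearAllMemory(this.availableMemory);           // hand its pages back to the free list
    this.compactionMemory.allocateSegments(getMaxPartition());            // pre-size for the next run
    this.compactionMemory.resetRWViews();                                 // rewind read and write views
    this.compactionMemory.pushDownPages();                                // rebuild the views over the fresh page list
}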
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c04c9bbb/flink-runtime/src/main/java/org/apache/flink/runtime/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/FileUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/FileUtils.java
index 27142f7..e985d8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/FileUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/FileUtils.java
@@ -16,12 +16,10 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.util;
/**
* This is a utility class to deal with temporary files.
- *
*/
public final class FileUtils {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c04c9bbb/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
new file mode 100644
index 0000000..c359211
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+/**
+ * Minimal implementation of an array-backed list of ints
+ */
+public class IntArrayList {
+
+ private int size;
+
+ private int[] array;
+
+ public IntArrayList(final int capacity) {
+ this.size = 0;
+ this.array = new int[capacity];
+ }
+
+ public int size() {
+ return size;
+ }
+
+ public boolean add(final int number) {
+ grow(size+1);
+ array[size++] = number;
+ return true;
+ }
+
+ public int removeInt(int index) {
+ if(index >= size) {
+ throw new IndexOutOfBoundsException("Index (" + index + ") is greater than or equal to list size (" + size + ")");
+ }
+ final int old = array[ index ];
+ size--;
+ if(index != size) {
+ System.arraycopy(array, index+1, array, index, size-index );
+ }
+ return old;
+ }
+
+ public void clear() {
+ size = 0;
+ }
+
+ public boolean isEmpty() {
+ return (size==0);
+ }
+
+ private void grow(final int length) {
+ if(length > array.length) {
+ final int newLength = (int)Math.max(Math.min(2L * array.length, Integer.MAX_VALUE-8), length);
+ final int[] t = new int[newLength];
+ System.arraycopy(array, 0, t, 0, size);
+ array = t;
+ }
+ }
+
+}
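
IntArrayList above (and its LongArrayList counterpart below) is a minimal growable buffer used by resizeHashTable() to stage the hash codes and pointers of one bucket chain before re-inserting them. A small usage sketch following that pattern; the values are made up:

// Entries are appended while scanning a bucket chain, then drained from the tail.
IntArrayList hashes = new IntArrayList(64);
hashes.add(42);
hashes.add(7);
while (!hashes.isEmpty()) {
    int h = hashes.removeInt(hashes.size() - 1);   // pop the last element; no array shifting
    // ... decide which bucket the entry with hash code h belongs to and re-insert it ...
}
hashes.clear();   // ready to be reused for the next bucket chain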
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c04c9bbb/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java
index 24df4e3..1e5fead 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyGroupedMutableObjectIterator.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.util;
import java.io.IOException;
@@ -28,10 +27,9 @@ import org.apache.flink.util.MutableObjectIterator;
/**
* The KeyValueIterator returns a key and all values that belong to the key (share the same key).
* A sub-iterator over all values with the same key is provided.
- *
*/
-public final class KeyGroupedMutableObjectIterator<E>
-{
+public final class KeyGroupedMutableObjectIterator<E> {
+
private final MutableObjectIterator<E> iterator;
private final TypeSerializer<E> serializer;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c04c9bbb/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java
new file mode 100644
index 0000000..926a00c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+/**
+ * Minimal implementation of an array-backed list of longs
+ */
+public class LongArrayList {
+
+ private int size;
+
+ private long[] array;
+
+ public LongArrayList(int capacity) {
+ this.size = 0;
+ this.array = new long[capacity];
+ }
+
+ public int size() {
+ return size;
+ }
+
+ public boolean add(long number) {
+ grow(size+1);
+ array[size++] = number;
+ return true;
+ }
+
+ public long removeLong(int index) {
+ if(index >= size) {
+ throw new IndexOutOfBoundsException( "Index (" + index + ") is greater than or equal to list size (" + size + ")" );
+ }
+ final long old = array[index];
+ size--;
+ if(index != size) {
+ System.arraycopy(array, index+1, array, index, size-index );
+ }
+ return old;
+ }
+
+ public void clear() {
+ size = 0;
+ }
+
+ public boolean isEmpty() {
+ return (size==0);
+ }
+
+ private void grow(int length) {
+ if(length > array.length) {
+ final int newLength = (int)Math.max(Math.min(2L * array.length, Integer.MAX_VALUE-8), length);
+ final long[] t = new long[newLength];
+ System.arraycopy(array, 0, t, 0, size);
+ array = t;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/c04c9bbb/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
index fbc4f65..b644da1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
@@ -18,13 +18,13 @@
package org.apache.flink.runtime.operators.hash;
-
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparat
import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.Test;
+import org.powermock.reflect.Whitebox;
public class MemoryHashTableTest {
@@ -82,12 +83,16 @@ public class MemoryHashTableTest {
private final TypePairComparator<IntPair, IntList> pairComparatorPL =new IntPairListPairComparator();
- private final int SIZE = 80; //FIXME 75 triggers serialization bug in testVariableLengthBuildAndRetrieve
+ private final int SIZE = 75;
private final int NUM_PAIRS = 100000;
private final int NUM_LISTS = 100000;
+ private final int ADDITIONAL_MEM = 100;
+
+ private final int NUM_REWRITES = 10;
+
private final TypeSerializer<StringPair> serializerS = new StringPairSerializer();
@@ -96,7 +101,7 @@ public class MemoryHashTableTest {
private final TypePairComparator<StringPair, StringPair> pairComparatorS = new StringPairPairComparator();
-
+ @Test
public void testDifferentProbers() {
final int NUM_MEM_PAGES = 32 * NUM_PAIRS / PAGE_SIZE;
@@ -106,11 +111,13 @@ public class MemoryHashTableTest {
AbstractHashTableProber<IntPair, IntPair> prober2 = table.getProber(comparator, pairComparator);
assertFalse(prober1 == prober2);
+
+ table.close();
+ assertEquals("Memory lost", NUM_MEM_PAGES, table.getFreeMemory().size());
}
@Test
public void testBuildAndRetrieve() {
-
try {
final int NUM_MEM_PAGES = 32 * NUM_PAIRS / PAGE_SIZE;
@@ -134,8 +141,7 @@ public class MemoryHashTableTest {
table.close();
assertEquals("Memory lost", NUM_MEM_PAGES, table.getFreeMemory().size());
- }
- catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
fail("Error: " + e.getMessage());
}
@@ -143,7 +149,6 @@ public class MemoryHashTableTest {
@Test
public void testEntryIterator() {
-
try {
final int NUM_MEM_PAGES = SIZE * NUM_LISTS / PAGE_SIZE;
final IntList[] lists = getRandomizedIntLists(NUM_LISTS, rnd);
@@ -167,8 +172,7 @@ public class MemoryHashTableTest {
assertTrue(sum == result);
assertEquals("Memory lost", NUM_MEM_PAGES, table.getFreeMemory().size());
- }
- catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
fail("Error: " + e.getMessage());
}
@@ -197,7 +201,8 @@ public class MemoryHashTableTest {
assertTrue(listProber.getMatchFor(lists[i], target));
assertArrayEquals(lists[i].getValue(), target.getValue());
}
-
+ table.close();
+ assertEquals("Memory lost", NUM_MEM_PAGES, table.getFreeMemory().size());
} catch (Exception e) {
e.printStackTrace();
fail("Error: " + e.getMessage());
@@ -218,7 +223,6 @@ public class MemoryHashTableTest {
try {
table.insert(lists[i]);
} catch (Exception e) {
- //System.out.println("index: " + i + " ");
throw e;
}
}
@@ -241,14 +245,13 @@ public class MemoryHashTableTest {
}
for (int i = 0; i < NUM_LISTS; i++) {
- assertTrue(prober.getMatchFor(overwriteLists[i], target));
+ assertTrue("" + i, prober.getMatchFor(overwriteLists[i], target));
assertArrayEquals(overwriteLists[i].getValue(), target.getValue());
}
table.close();
assertEquals("Memory lost", NUM_MEM_PAGES, table.getFreeMemory().size());
- }
- catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
fail("Error: " + e.getMessage());
}
@@ -288,14 +291,13 @@ public class MemoryHashTableTest {
}
for (int i = 0; i < NUM_LISTS; i++) {
- assertTrue(prober.getMatchFor(lists[i], target));
+ assertTrue("" + i, prober.getMatchFor(lists[i], target));
assertArrayEquals(lists[i].getValue(), target.getValue());
}
table.close();
assertEquals("Memory lost", NUM_MEM_PAGES, table.getFreeMemory().size());
- }
- catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
fail("Error: " + e.getMessage());
}
@@ -343,8 +345,321 @@ public class MemoryHashTableTest {
table.close();
assertEquals("Memory lost", NUM_MEM_PAGES, table.getFreeMemory().size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Error: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testRepeatedBuildAndRetrieve() {
+ try {
+ final int NUM_MEM_PAGES = SIZE * NUM_LISTS / PAGE_SIZE;
+
+ final IntList[] lists = getRandomizedIntLists(NUM_LISTS, rnd);
+
+ AbstractMutableHashTable<IntList> table = new CompactingHashTable<IntList>(serializerV, comparatorV, getMemory(NUM_MEM_PAGES, PAGE_SIZE));
+ table.open();
+
+ for (int i = 0; i < NUM_LISTS; i++) {
+ table.insert(lists[i]);
+ }
+
+ AbstractHashTableProber<IntList, IntList> prober = table.getProber(comparatorV, pairComparatorV);
+ IntList target = new IntList();
+
+ for (int i = 0; i < NUM_LISTS; i++) {
+ assertTrue(prober.getMatchFor(lists[i], target));
+ assertArrayEquals(lists[i].getValue(), target.getValue());
+ }
+
+ IntList[] overwriteLists;
+
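+ // overwrite the full data set NUM_REWRITES times and verify the table after every pass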
+ for (int k = 0; k < NUM_REWRITES; k++) {
+ overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
+ // test replacing
+ IntList tempHolder = new IntList();
+ for (int i = 0; i < NUM_LISTS; i++) {
+ table.insertOrReplaceRecord(overwriteLists[i], tempHolder);
+ }
+
+ for (int i = 0; i < NUM_LISTS; i++) {
+ assertTrue("" + i, prober.getMatchFor(overwriteLists[i], target));
+ assertArrayEquals(overwriteLists[i].getValue(), target.getValue());
+ }
+ }
+
+ table.close();
+ assertEquals("Memory lost", NUM_MEM_PAGES, table.getFreeMemory().size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Error: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testProberUpdate() {
+ try {
+ final int NUM_MEM_PAGES = SIZE * NUM_LISTS / PAGE_SIZE;
+
+ final IntList[] lists = getRandomizedIntLists(NUM_LISTS, rnd);
+
+ AbstractMutableHashTable<IntList> table = new CompactingHashTable<IntList>(serializerV, comparatorV, getMemory(NUM_MEM_PAGES, PAGE_SIZE));
+ table.open();
+
+ for (int i = 0; i < NUM_LISTS; i++) {
+ table.insert(lists[i]);
+ }
+
+ final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
+
+ AbstractHashTableProber<IntList, IntList> prober = table.getProber(comparatorV, pairComparatorV);
+ IntList target = new IntList();
+
+ for (int i = 0; i < NUM_LISTS; i++) {
+ assertTrue("" + i, prober.getMatchFor(lists[i], target));
+ assertArrayEquals(lists[i].getValue(), target.getValue());
+ prober.updateMatch(overwriteLists[i]);
+ }
+
+ for (int i = 0; i < NUM_LISTS; i++) {
+ assertTrue("" + i, prober.getMatchFor(overwriteLists[i], target));
+ assertArrayEquals(overwriteLists[i].getValue(), target.getValue());
+ }
+
+ table.close();
+ assertEquals("Memory lost", NUM_MEM_PAGES, table.getFreeMemory().size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Error: " + e.getMessage());
}
- catch (Exception e) {
+ }
+
+ @Test
+ public void testResize() {
+ try {
+ final int NUM_MEM_PAGES = 30 * NUM_PAIRS / PAGE_SIZE;
+ final IntPair[] pairs = getRandomizedIntPairs(NUM_PAIRS, rnd);
+
+ List<MemorySegment> memory = getMemory(NUM_MEM_PAGES, PAGE_SIZE);
+ CompactingHashTable<IntPair> table = new CompactingHashTable<IntPair>(serializer, comparator, memory);
+ table.open();
+
+ for (int i = 0; i < NUM_PAIRS; i++) {
+ table.insert(pairs[i]);
+ }
+
+ AbstractHashTableProber<IntPair, IntPair> prober = table.getProber(comparator, pairComparator);
+ IntPair target = new IntPair();
+
+ for (int i = 0; i < NUM_PAIRS; i++) {
+ assertTrue(prober.getMatchFor(pairs[i], target));
+ assertEquals(pairs[i].getValue(), target.getValue());
+ }
+
+ // make sure there is enough memory for resize
+ memory.addAll(getMemory(ADDITIONAL_MEM, PAGE_SIZE));
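+ // resizeHashTable is internal, so the test invokes it reflectively via PowerMock's Whitebox helper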
+ Boolean b = Whitebox.<Boolean>invokeMethod(table, "resizeHashTable");
+ assertTrue(b.booleanValue());
+
+ for (int i = 0; i < NUM_PAIRS; i++) {
+ assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+ assertEquals(pairs[i].getValue(), target.getValue());
+ }
+
+ table.close();
+ assertEquals("Memory lost", NUM_MEM_PAGES + ADDITIONAL_MEM, table.getFreeMemory().size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Error: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDoubleResize() {
+ try {
+ final int NUM_MEM_PAGES = 30 * NUM_PAIRS / PAGE_SIZE;
+ final IntPair[] pairs = getRandomizedIntPairs(NUM_PAIRS, rnd);
+
+ List<MemorySegment> memory = getMemory(NUM_MEM_PAGES, PAGE_SIZE);
+ CompactingHashTable<IntPair> table = new CompactingHashTable<IntPair>(serializer, comparator, memory);
+ table.open();
+
+ for (int i = 0; i < NUM_PAIRS; i++) {
+ table.insert(pairs[i]);
+ }
+
+ AbstractHashTableProber<IntPair, IntPair> prober = table.getProber(comparator, pairComparator);
+ IntPair target = new IntPair();
+
+ for (int i = 0; i < NUM_PAIRS; i++) {
+ assertTrue(prober.getMatchFor(pairs[i], target));
+ assertEquals(pairs[i].getValue(), target.getValue());
+ }
+
+ // make sure there is enough memory for resize
+ memory.addAll(getMemory(ADDITIONAL_MEM, PAGE_SIZE));
+ Boolean b = Whitebox.<Boolean>invokeMethod(table, "resizeHashTable");
+ assertTrue(b.booleanValue());
+
+ for (int i = 0; i < NUM_PAIRS; i++) {
+ assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+ assertEquals(pairs[i].getValue(), target.getValue());
+ }
+
+ // make sure there is enough memory for resize
+ memory.addAll(getMemory(ADDITIONAL_MEM, PAGE_SIZE));
+ b = Whitebox.<Boolean>invokeMethod(table, "resizeHashTable");
+ assertTrue(b.booleanValue());
+
+ for (int i = 0; i < NUM_PAIRS; i++) {
+ assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+ assertEquals(pairs[i].getValue(), target.getValue());
+ }
+
+ table.close();
+ assertEquals("Memory lost", NUM_MEM_PAGES + ADDITIONAL_MEM + ADDITIONAL_MEM, table.getFreeMemory().size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Error: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTripleResize() {
+ try {
+ final int NUM_MEM_PAGES = 30 * NUM_PAIRS / PAGE_SIZE;
+ final IntPair[] pairs = getRandomizedIntPairs(NUM_PAIRS, rnd);
+
+ List<MemorySegment> memory = getMemory(NUM_MEM_PAGES, PAGE_SIZE);
+ CompactingHashTable<IntPair> table = new CompactingHashTable<IntPair>(serializer, comparator, memory);
+ table.open();
+
+ for (int i = 0; i < NUM_PAIRS; i++) {
+ table.insert(pairs[i]);
+ }
+
+ AbstractHashTableProber<IntPair, IntPair> prober = table.getProber(comparator, pairComparator);
+ IntPair target = new IntPair();
+
+ for (int i = 0; i < NUM_PAIRS; i++) {
+ assertTrue(prober.getMatchFor(pairs[i], target));
+ assertEquals(pairs[i].getValue(), target.getValue());
+ }
+
+ // make sure there is enough memory for resize
+ memory.addAll(getMemory(ADDITIONAL_MEM, PAGE_SIZE));
+ Boolean b = Whitebox.<Boolean>invokeMethod(table, "resizeHashTable");
+ assertTrue(b.booleanValue());
+
+ for (int i = 0; i < NUM_PAIRS; i++) {
+ assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+ assertEquals(pairs[i].getValue(), target.getValue());
+ }
+
+ // make sure there is enough memory for resize
+ memory.addAll(getMemory(ADDITIONAL_MEM, PAGE_SIZE));
+ b = Whitebox.<Boolean>invokeMethod(table, "resizeHashTable");
+ assertTrue(b.booleanValue());
+
+ for (int i = 0; i < NUM_PAIRS; i++) {
+ assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+ assertEquals(pairs[i].getValue(), target.getValue());
+ }
+
+ // make sure there is enough memory for resize
+ memory.addAll(getMemory(2*ADDITIONAL_MEM, PAGE_SIZE));
+ b = Whitebox.<Boolean>invokeMethod(table, "resizeHashTable");
+ assertTrue(b.booleanValue());
+
+ for (int i = 0; i < NUM_PAIRS; i++) {
+ assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+ assertEquals(pairs[i].getValue(), target.getValue());
+ }
+
+ table.close();
+ assertEquals("Memory lost", NUM_MEM_PAGES + 4*ADDITIONAL_MEM, table.getFreeMemory().size());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Error: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testResizeWithCompaction() {
+ try {
+ final int NUM_MEM_PAGES = (SIZE * NUM_LISTS / PAGE_SIZE);
+
+ final IntList[] lists = getRandomizedIntLists(NUM_LISTS, rnd);
+
+ List<MemorySegment> memory = getMemory(NUM_MEM_PAGES, PAGE_SIZE);
+ CompactingHashTable<IntList> table = new CompactingHashTable<IntList>(serializerV, comparatorV, memory);
+ table.open();
+
+ for (int i = 0; i < NUM_LISTS; i++) {
+ table.insert(lists[i]);
+ }
+
+ AbstractHashTableProber<IntList, IntList> prober = table.getProber(comparatorV, pairComparatorV);
+ IntList target = new IntList();
+
+ for (int i = 0; i < NUM_LISTS; i++) {
+ assertTrue(prober.getMatchFor(lists[i], target));
+ assertArrayEquals(lists[i].getValue(), target.getValue());
+ }
+
+ // make sure there is enough memory for resize
+ memory.addAll(getMemory(ADDITIONAL_MEM, PAGE_SIZE));
+ Boolean b = Whitebox.<Boolean>invokeMethod(table, "resizeHashTable");
+ assertTrue(b.booleanValue());
+
+ for (int i = 0; i < NUM_LISTS; i++) {
+ assertTrue(prober.getMatchFor(lists[i], target));
+ assertArrayEquals(lists[i].getValue(), target.getValue());
+ }
+
+ final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
+
+ // test replacing
+ IntList tempHolder = new IntList();
+ for (int i = 0; i < NUM_LISTS; i++) {
+ table.insertOrReplaceRecord(overwriteLists[i], tempHolder);
+ }
+
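+ // fetch the internal partition list reflectively and compact every partition before the next resize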
+ Field list = Whitebox.getField(CompactingHashTable.class, "partitions");
+ @SuppressWarnings("unchecked")
+ ArrayList<InMemoryPartition<IntList>> partitions = (ArrayList<InMemoryPartition<IntList>>) list.get(table);
+ int numPartitions = partitions.size();
+ for (int i = 0; i < numPartitions; i++) {
+ Whitebox.invokeMethod(table, "compactPartition", i);
+ }
+
+ // make sure there is enough memory for resize
+ memory.addAll(getMemory(2*ADDITIONAL_MEM, PAGE_SIZE));
+ b = Whitebox.<Boolean>invokeMethod(table, "resizeHashTable");
+ assertTrue(b.booleanValue());
+
+ for (int i = 0; i < NUM_LISTS; i++) {
+ assertTrue("" + i, prober.getMatchFor(overwriteLists[i], target));
+ assertArrayEquals(overwriteLists[i].getValue(), target.getValue());
+ }
+
+ table.close();
+ assertEquals("Memory lost", NUM_MEM_PAGES + 3*ADDITIONAL_MEM, table.getFreeMemory().size());
+ } catch (Exception e) {
e.printStackTrace();
fail("Error: " + e.getMessage());
}
@@ -352,7 +667,6 @@ public class MemoryHashTableTest {
@Test
public void testVariableLengthStringBuildAndRetrieve() {
-
try {
final int NUM_MEM_PAGES = 40 * NUM_PAIRS / PAGE_SIZE;
@@ -364,13 +678,6 @@ public class MemoryHashTableTest {
MutableObjectIterator<StringPair> updateTester = new UniformStringPairGenerator(NUM_PAIRS, 1, false);
- //long start = 0L;
- //long end = 0L;
-
- //long first = System.currentTimeMillis();
-
- //System.out.println("Creating and filling CompactingHashMap...");
- //start = System.currentTimeMillis();
AbstractMutableHashTable<StringPair> table = new CompactingHashTable<StringPair>(serializerS, comparatorS, getMemory(NUM_MEM_PAGES, PAGE_SIZE));
table.open();
@@ -378,47 +685,27 @@ public class MemoryHashTableTest {
while(buildInput.next(target) != null) {
table.insert(target);
}
- //end = System.currentTimeMillis();
- //System.out.println("HashMap ready. Time: " + (end-start) + " ms");
-
- //System.out.println("Starting first probing run...");
- //start = System.currentTimeMillis();
AbstractHashTableProber<StringPair, StringPair> prober = table.getProber(comparatorS, pairComparatorS);
StringPair temp = new StringPair();
while(probeTester.next(target) != null) {
- assertTrue(prober.getMatchFor(target, temp));
+ assertTrue("" + target.getKey(), prober.getMatchFor(target, temp));
assertEquals(temp.getValue(), target.getValue());
}
- //end = System.currentTimeMillis();
- //System.out.println("Probing done. Time: " + (end-start) + " ms");
- //System.out.println("Starting update...");
- //start = System.currentTimeMillis();
while(updater.next(target) != null) {
target.setValue(target.getValue());
table.insertOrReplaceRecord(target, temp);
}
- //end = System.currentTimeMillis();
- //System.out.println("Update done. Time: " + (end-start) + " ms");
- //System.out.println("Starting second probing run...");
- //start = System.currentTimeMillis();
while (updateTester.next(target) != null) {
assertTrue(prober.getMatchFor(target, temp));
assertEquals(target.getValue(), temp.getValue());
}
- //end = System.currentTimeMillis();
- //System.out.println("Probing done. Time: " + (end-start) + " ms");
table.close();
-
- //end = System.currentTimeMillis();
- //System.out.println("Overall time: " + (end-first) + " ms");
-
assertEquals("Memory lost", NUM_MEM_PAGES, table.getFreeMemory().size());
- }
- catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
fail("Error: " + e.getMessage());
}