You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/09/29 15:22:31 UTC
flink git commit: [FLINK-2763] [runtime] Fix hash table spilling
partition selection.
Repository: flink
Updated Branches:
refs/heads/master 16afb8ec6 -> af477563e
[FLINK-2763] [runtime] Fix hash table spilling partition selection.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af477563
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af477563
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af477563
Branch: refs/heads/master
Commit: af477563eb1acaab74da1a508c7e5fa37339c206
Parents: 16afb8e
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 29 14:07:01 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 29 14:07:01 2015 +0200
----------------------------------------------------------------------
.../runtime/operators/hash/HashPartition.java | 15 ++++-
.../operators/hash/MutableHashTable.java | 4 +-
.../runtime/operators/hash/HashTableTest.java | 69 ++++++++++++++++++++
3 files changed, 85 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/af477563/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
index 7baaee7..32fd74a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
@@ -198,6 +198,19 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
public final boolean isInMemory() {
return this.buildSideChannel == null;
}
+
+ /**
+ * Gets the number of memory segments used by this partition, which includes build side
+ * memory buffers and overflow memory segments.
+ *
+ * @return The number of occupied memory segments.
+ */
+ public int getNumOccupiedMemorySegments() {
+ // count the in-memory partition buffers if present; otherwise the partition is
+ // being written/spilled and occupies one buffer
+ final int numPartitionBuffers = this.partitionBuffers != null ? this.partitionBuffers.length : 1;
+ return numPartitionBuffers + numOverflowSegments;
+ }
+
public int getBuildSideBlockCount() {
return this.partitionBuffers == null ? this.buildSideWriteBuffer.getBlockCount() : this.partitionBuffers.length;
@@ -284,7 +297,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
throw new RuntimeException("Bug in Hybrid Hash Join: " +
"Request to spill a partition that has already been spilled.");
}
- if (getBuildSideBlockCount() + this.numOverflowSegments < 2) {
+ if (getNumOccupiedMemorySegments() < 2) {
throw new RuntimeException("Bug in Hybrid Hash Join: " +
"Request to spill a partition with less than two buffers.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/af477563/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 2ad01aa..efaceea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -1093,8 +1093,8 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
for (int i = 0; i < partitions.size(); i++) {
HashPartition<BT, PT> p = partitions.get(i);
- if (p.isInMemory() && p.getBuildSideBlockCount() > largestNumBlocks) {
- largestNumBlocks = p.getBuildSideBlockCount();
+ if (p.isInMemory() && p.getNumOccupiedMemorySegments() > largestNumBlocks) {
+ largestNumBlocks = p.getNumOccupiedMemorySegments();
largestPartNum = i;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/af477563/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
index 0bca22a..92adc2a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
@@ -21,19 +21,23 @@ package org.apache.flink.runtime.operators.hash;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteValueSerializer;
import org.apache.flink.api.common.typeutils.base.LongComparator;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.api.java.typeutils.runtime.ValueComparator;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.types.ByteValue;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.Test;
+import org.mockito.Mockito;
import java.io.File;
import java.util.ArrayList;
@@ -146,6 +150,47 @@ public class HashTableTest {
ioMan.shutdown();
}
}
+
+ /**
+ * This tests the case where no additional partition buffers are used at the point when spilling
+ * is triggered, testing that overflow bucket buffers are taken into account when deciding which
+ * partition to spill.
+ */
+ @Test
+ public void testSpillingFreesOnlyOverflowSegments() {
+ final IOManager ioMan = new IOManagerAsync();
+
+ final TypeSerializer<ByteValue> serializer = ByteValueSerializer.INSTANCE;
+ final TypeComparator<ByteValue> buildComparator = new ValueComparator<>(true, ByteValue.class);
+ final TypeComparator<ByteValue> probeComparator = new ValueComparator<>(true, ByteValue.class);
+
+ @SuppressWarnings("unchecked")
+ final TypePairComparator<ByteValue, ByteValue> pairComparator = Mockito.mock(TypePairComparator.class);
+
+ try {
+ final int pageSize = 32*1024;
+ final int numSegments = 34;
+
+ List<MemorySegment> memory = getMemory(numSegments, pageSize);
+
+ MutableHashTable<ByteValue, ByteValue> table = new MutableHashTable<>(
+ serializer, serializer, buildComparator, probeComparator,
+ pairComparator, memory, ioMan, 1, false);
+
+ table.open(new ByteValueIterator(100000000), new ByteValueIterator(1));
+
+ table.close();
+
+ checkNoTempFilesRemain(ioMan);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ ioMan.shutdown();
+ }
+ }
// ------------------------------------------------------------------------
// Utilities
@@ -219,4 +264,28 @@ public class HashTableTest {
}
}
}
+
+ private static class ByteValueIterator implements MutableObjectIterator<ByteValue> {
+
+ private final long numRecords;
+ private long value = 0;
+
+ ByteValueIterator(long numRecords) {
+ this.numRecords = numRecords;
+ }
+
+ @Override
+ public ByteValue next(ByteValue aLong) {
+ return next();
+ }
+
+ @Override
+ public ByteValue next() {
+ if (value++ < numRecords) {
+ return new ByteValue((byte) 0);
+ } else {
+ return null;
+ }
+ }
+ }
}