Posted to commits@flink.apache.org by se...@apache.org on 2015/08/26 19:24:39 UTC
[1/2] flink git commit: [FLINK-2189] [runtime] Fix various issues in hash table
- check for memory availability before probing
- correctly compute memory required for recursive build fast path
- remove all temp files properly
Repository: flink
Updated Branches:
refs/heads/master e02c3019d -> 4a5fe4e63
[FLINK-2189] [runtime] Fix various issues in hash table
- check for memory availability before probing
- correctly compute memory required for recursive build fast path
- remove all temp files properly
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4a5fe4e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4a5fe4e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4a5fe4e6
Branch: refs/heads/master
Commit: 4a5fe4e631d53c0c940c2899978ee72e23690c86
Parents: ced6a19
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 26 17:14:09 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 26 18:48:17 2015 +0200
----------------------------------------------------------------------
.../iomanager/AsynchronousFileIOChannel.java | 4 +
.../runtime/operators/hash/HashPartition.java | 82 +++----
.../operators/hash/MutableHashTable.java | 143 ++++++------
.../hash/ReOpenableMutableHashTable.java | 2 +-
.../runtime/operators/hash/HashTableTest.java | 221 +++++++++++++++++++
.../hash/RecordsAndWidthsCombinationCheck.java | 199 +++++++++++++++++
6 files changed, 517 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4a5fe4e6/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
index cca04b7..aefeddb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
@@ -294,6 +294,10 @@ final class SegmentReadRequest implements ReadRequest {
private final MemorySegment segment;
protected SegmentReadRequest(AsynchronousFileIOChannel<MemorySegment, ReadRequest> targetChannel, MemorySegment segment) {
+ if (segment == null) {
+ throw new NullPointerException("Illegal read request with null memory segment.");
+ }
+
this.channel = targetChannel;
this.segment = segment;
}
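This guard makes a null segment fail fast at request construction, instead of surfacing later as an opaque NullPointerException inside the asynchronous I/O thread. For comparison only, an equivalent JDK idiom would look like this (a sketch, not what the commit uses):

    // same fail-fast effect via the Java 7 helper
    this.segment = java.util.Objects.requireNonNull(segment,
            "Illegal read request with null memory segment.");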
http://git-wip-us.apache.org/repos/asf/flink/blob/4a5fe4e6/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
index cee9ebb..b899acd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators.hash;
import java.io.EOFException;
@@ -41,14 +40,13 @@ import org.apache.flink.runtime.memorymanager.AbstractPagedOutputView;
import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.util.MutableObjectIterator;
-
/**
*
* @param <BT> The type of the build side records.
* @param <PT> The type of the probe side records.
*/
-public class HashPartition<BT, PT> extends AbstractPagedInputView implements SeekableDataInputView
-{
+public class HashPartition<BT, PT> extends AbstractPagedInputView implements SeekableDataInputView {
+
// --------------------------------- Table Structure Auxiliaries ------------------------------------
protected MemorySegment[] overflowSegments; // segments in which overflow buckets from the table structure are stored
@@ -71,7 +69,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
private int finalBufferLimit;
- private BuildSideBuffer<BT> buildSideWriteBuffer;
+ private BuildSideBuffer buildSideWriteBuffer;
protected ChannelWriterOutputView probeSideBuffer;
@@ -107,8 +105,6 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
// --------------------------------------------------------------------------------------------------
-
-
/**
* Creates a new partition, initially in memory, with one buffer for the build side. The partition is
* initialized to expect record insertions for the build side.
@@ -136,7 +132,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
this.numOverflowSegments = 0;
this.nextOverflowBucket = 0;
- this.buildSideWriteBuffer = new BuildSideBuffer<BT>(initialBuffer, memSource);
+ this.buildSideWriteBuffer = new BuildSideBuffer(initialBuffer, memSource);
}
/**
@@ -239,8 +235,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
* @return A pointer to the object in the partition, or <code>-1</code>, if the partition is spilled.
* @throws IOException Thrown, when this is a spilled partition and the write failed.
*/
- public final long insertIntoBuildBuffer(BT record) throws IOException
- {
+ public final long insertIntoBuildBuffer(BT record) throws IOException {
this.buildSideRecordCounter++;
if (isInMemory()) {
@@ -263,8 +258,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
* @param record The record to be inserted into the probe side buffers.
* @throws IOException Thrown, if the buffer is full, needs to be spilled, and spilling causes an error.
*/
- public final void insertIntoProbeBuffer(PT record) throws IOException
- {
+ public final void insertIntoProbeBuffer(PT record) throws IOException {
this.probeSideSerializer.serialize(record, this.probeSideBuffer);
this.probeSideRecordCounter++;
}
@@ -331,8 +325,6 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
}
/**
- * @param freeMemory
- * @param spilledPartitions
* @return The number of write-behind buffers reclaimable after this method call.
*
* @throws IOException
@@ -351,8 +343,8 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
this.numOverflowSegments = 0;
this.nextOverflowBucket = 0;
// return the partition buffers
- for (int i = 0; i < this.partitionBuffers.length; i++) {
- freeMemory.add(this.partitionBuffers[i]);
+ for (MemorySegment partitionBuffer : this.partitionBuffers) {
+ freeMemory.add(partitionBuffer);
}
this.partitionBuffers = null;
return 0;
@@ -366,7 +358,6 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
this.probeSideChannel.close();
this.buildSideChannel.deleteChannel();
this.probeSideChannel.deleteChannel();
-
return 0;
}
else {
@@ -378,10 +369,8 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
}
}
-
- public void clearAllMemory(List<MemorySegment> target)
- {
+ public void clearAllMemory(List<MemorySegment> target) {
// return current buffers from build side and probe side
if (this.buildSideWriteBuffer != null) {
if (this.buildSideWriteBuffer.getCurrentSegment() != null) {
@@ -405,8 +394,8 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
// return the partition buffers
if (this.partitionBuffers != null) {
- for (int k = 0; k < this.partitionBuffers.length; k++) {
- target.add(this.partitionBuffers[k]);
+ for (MemorySegment partitionBuffer : this.partitionBuffers) {
+ target.add(partitionBuffer);
}
this.partitionBuffers = null;
}
@@ -421,15 +410,13 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
this.probeSideChannel.close();
this.probeSideChannel.deleteChannel();
}
-
}
catch (IOException ioex) {
throw new RuntimeException("Error deleting the partition files. Some temporary files might not be removed.");
}
}
- final PartitionIterator getPartitionIterator(TypeComparator<BT> comparator) throws IOException
- {
+ final PartitionIterator getPartitionIterator(TypeComparator<BT> comparator) throws IOException {
return new PartitionIterator(comparator);
}
@@ -457,20 +444,13 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
this.probeSideChannel = ioAccess.createBlockChannelWriter(probeChannelEnumerator.next(), bufferReturnQueue);
this.probeSideBuffer = new ChannelWriterOutputView(this.probeSideChannel, this.memorySegmentSize);
}
-
-
-
-
-
-
// --------------------------------------------------------------------------------------------------
// Methods to provide input view abstraction for reading probe records
// --------------------------------------------------------------------------------------------------
- public void setReadPosition(long pointer)
- {
+ public void setReadPosition(long pointer) {
final int bufferNum = (int) (pointer >>> this.segmentSizeBits);
final int offset = (int) (pointer & (this.memorySegmentSize - 1));
@@ -480,8 +460,6 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
}
-
-
@Override
protected MemorySegment nextSegment(MemorySegment current) throws IOException {
this.currentBufferNum++;
@@ -500,8 +478,8 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
// ============================================================================================
- protected static final class BuildSideBuffer<BT> extends AbstractPagedOutputView
- {
+ protected static final class BuildSideBuffer extends AbstractPagedOutputView {
+
private final ArrayList<MemorySegment> targetList;
private final MemorySegmentSource memSource;
@@ -513,8 +491,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
private final int sizeBits;
- private BuildSideBuffer(MemorySegment initialSegment, MemorySegmentSource memSource)
- {
+ private BuildSideBuffer(MemorySegment initialSegment, MemorySegmentSource memSource) {
super(initialSegment, initialSegment.size(), 0);
this.targetList = new ArrayList<MemorySegment>();
@@ -524,8 +501,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
@Override
- protected MemorySegment nextSegment(MemorySegment current, int bytesUsed) throws IOException
- {
+ protected MemorySegment nextSegment(MemorySegment current, int bytesUsed) throws IOException {
finalizeSegment(current, bytesUsed);
final MemorySegment next;
@@ -553,8 +529,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
return this.currentBlockNumber + 1;
}
- int spill(BlockChannelWriter<MemorySegment> writer) throws IOException
- {
+ int spill(BlockChannelWriter<MemorySegment> writer) throws IOException {
this.writer = writer;
final int numSegments = this.targetList.size();
for (int i = 0; i < numSegments; i++) {
@@ -564,8 +539,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
return numSegments;
}
- MemorySegment[] close() throws IOException
- {
+ MemorySegment[] close() throws IOException {
final MemorySegment current = getCurrentSegment();
if (current == null) {
throw new IllegalStateException("Illegal State in HashPartition: No current buffer when finalizing build side.");
@@ -575,7 +549,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
if (this.writer == null) {
this.targetList.add(current);
- MemorySegment[] buffers = (MemorySegment[]) this.targetList.toArray(new MemorySegment[this.targetList.size()]);
+ MemorySegment[] buffers = this.targetList.toArray(new MemorySegment[this.targetList.size()]);
this.targetList.clear();
return buffers;
} else {
@@ -584,29 +558,26 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
}
}
- private void finalizeSegment(MemorySegment seg, int bytesUsed) {
- }
+ private void finalizeSegment(MemorySegment seg, int bytesUsed) {}
}
// ============================================================================================
- final class PartitionIterator implements MutableObjectIterator<BT>
- {
+ final class PartitionIterator implements MutableObjectIterator<BT> {
+
private final TypeComparator<BT> comparator;
private long currentPointer;
private int currentHashCode;
- private PartitionIterator(final TypeComparator<BT> comparator) throws IOException
- {
+ private PartitionIterator(final TypeComparator<BT> comparator) throws IOException {
this.comparator = comparator;
setReadPosition(0);
}
- public final BT next(BT reuse) throws IOException
- {
+ public final BT next(BT reuse) throws IOException {
final int pos = getCurrentPositionInSegment();
final int buffer = HashPartition.this.currentBufferNum;
@@ -621,8 +592,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
}
}
- public final BT next() throws IOException
- {
+ public final BT next() throws IOException {
final int pos = getCurrentPositionInSegment();
final int buffer = HashPartition.this.currentBufferNum;
http://git-wip-us.apache.org/repos/asf/flink/blob/4a5fe4e6/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index b0042fc..7661808 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.iterative.io.HashPartitionIterator;
import org.apache.flink.runtime.operators.util.BloomFilter;
import org.apache.flink.runtime.util.MathUtils;
import org.apache.flink.util.MutableObjectIterator;
@@ -83,6 +82,7 @@ import org.apache.flink.util.MutableObjectIterator;
* @param <BT> The type of records from the build side that are stored in the hash table.
* @param <PT> The type of records from the probe side that are stored in the hash table.
*/
+@SuppressWarnings("ForLoopReplaceableByForEach")
public class MutableHashTable<BT, PT> implements MemorySegmentSource {
private static final Logger LOG = LoggerFactory.getLogger(MutableHashTable.class);
@@ -125,23 +125,11 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
private static final int POINTER_LEN = 8;
/**
- * The number of bytes for the serialized record length in the partition buffers.
- */
- private static final int SERIALIZED_LENGTH_FIELD_BYTES = 0;
-
- /**
* The number of bytes that the entry in the hash structure occupies, in bytes.
* It corresponds to a 4 byte hash value and an 8 byte pointer.
*/
private static final int RECORD_TABLE_BYTES = HASH_CODE_LEN + POINTER_LEN;
- /**
- * The total storage overhead per record, in bytes. This corresponds to the space in the
- * actual hash table buckets, consisting of a 4 byte hash value and an 8 byte
- * pointer, plus the overhead for the stored length field.
- */
- private static final int RECORD_OVERHEAD_BYTES = RECORD_TABLE_BYTES + SERIALIZED_LENGTH_FIELD_BYTES;
-
// -------------------------- Bucket Size and Structure -------------------------------------
static final int NUM_INTRA_BUCKET_BITS = 7;
@@ -150,7 +138,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
static final int BUCKET_HEADER_LENGTH = 16;
- private static final int NUM_ENTRIES_PER_BUCKET = (HASH_BUCKET_SIZE - BUCKET_HEADER_LENGTH) / RECORD_OVERHEAD_BYTES;
+ private static final int NUM_ENTRIES_PER_BUCKET = (HASH_BUCKET_SIZE - BUCKET_HEADER_LENGTH) / RECORD_TABLE_BYTES;
private static final int BUCKET_POINTER_START_OFFSET = BUCKET_HEADER_LENGTH + (NUM_ENTRIES_PER_BUCKET * HASH_CODE_LEN);
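Since the removed SERIALIZED_LENGTH_FIELD_BYTES was 0, RECORD_OVERHEAD_BYTES was always equal to RECORD_TABLE_BYTES, so this hunk removes a dead abstraction without changing the constant's value. Worked out with the surrounding constants (assuming HASH_BUCKET_SIZE = 1 << NUM_INTRA_BUCKET_BITS = 128, which this diff does not show):

    // RECORD_TABLE_BYTES          = HASH_CODE_LEN + POINTER_LEN = 4 + 8 = 12
    // NUM_ENTRIES_PER_BUCKET      = (128 - 16) / 12             = 9    (integer division)
    // BUCKET_POINTER_START_OFFSET = 16 + 9 * 4                  = 52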
@@ -344,12 +332,13 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
/**
* If true, build side partitions are kept for multiple probe steps.
*/
- protected boolean keepBuildSidePartitions = false;
+ protected boolean keepBuildSidePartitions;
- protected boolean furtherPartitioning = false;
+ protected boolean furtherPartitioning;
private boolean running = true;
+
// ------------------------------------------------------------------------
// Construction and Teardown
// ------------------------------------------------------------------------
@@ -446,8 +435,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
}
// grab the write behind buffers first
- for (int i = this.numWriteBehindBuffers; i > 0; --i)
- {
+ for (int i = this.numWriteBehindBuffers; i > 0; --i) {
this.writeBehindBuffers.add(this.availableMemory.remove(this.availableMemory.size() - 1));
}
// open builds the initial table by consuming the build-side input
@@ -522,10 +510,9 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
}
// check if there are pending partitions
- if (!this.partitionsPending.isEmpty())
- {
+ if (!this.partitionsPending.isEmpty()) {
final HashPartition<BT, PT> p = this.partitionsPending.get(0);
-
+
// build the next table
buildTableFromSpilledPartition(p);
@@ -534,10 +521,16 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
this.currentSpilledProbeSide = this.ioManager.createBlockChannelReader(p.getProbeSideChannel().getChannelID(), returnQueue);
List<MemorySegment> memory = new ArrayList<MemorySegment>();
- memory.add(getNextBuffer());
- MemorySegment nextBuffer = getNextBuffer();
- if (nextBuffer != null) {
- memory.add(nextBuffer);
+ MemorySegment seg1 = getNextBuffer();
+ if (seg1 != null) {
+ memory.add(seg1);
+ MemorySegment seg2 = getNextBuffer();
+ if (seg2 != null) {
+ memory.add(seg2);
+ }
+ }
+ else {
+ throw new IllegalStateException("Attempting to begin probing of partition without any memory available");
}
ChannelReaderInputViewIterator<PT> probeReader = new ChannelReaderInputViewIterator<PT>(this.currentSpilledProbeSide,
@@ -595,10 +588,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
public HashBucketIterator<BT, PT> getBuildSideIterator() {
return this.bucketIterator;
}
-
- public MutableObjectIterator<BT> getPartitionEntryIterator() {
- return new HashPartitionIterator<BT, PT>(this.partitionsBeingBuilt.iterator(), this.buildSideSerializer);
- }
/**
* Closes the hash table. This effectively releases all internal structures and closes all
@@ -667,8 +656,12 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
// ------------------------------------------------------------------------
/**
- * @param input
- * @throws IOException
+ * Creates the initial hash table. This method sets up partitions, hash index, and inserts
+ * the data from the given iterator.
+ *
+ * @param input The iterator with the build side data.
+ * @throws IOException Thrown, if an element could not be fetched and deserialized from
+ * the iterator, or if serialization fails.
*/
protected void buildInitialTable(final MutableObjectIterator<BT> input) throws IOException {
// create the partitions
@@ -718,16 +711,13 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
private int getEstimatedMaxBucketEntries(int numBuffers, int bufferSize, int numBuckets, int recordLenBytes) {
final long totalSize = ((long) bufferSize) * numBuffers;
- final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_OVERHEAD_BYTES);
+ final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_TABLE_BYTES);
final long maxNumRecordsStorable = (MAX_RECURSION_DEPTH + 1) * numRecordsStorable;
final long maxNumRecordsPerBucket = maxNumRecordsStorable / numBuckets;
return (int) maxNumRecordsPerBucket;
}
- /**
- * @param p
- * @throws IOException
- */
+
protected void buildTableFromSpilledPartition(final HashPartition<BT, PT> p) throws IOException {
final int nextRecursionLevel = p.getRecursionLevel() + 1;
@@ -748,11 +738,11 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
throw new RuntimeException("Hash Join bug in memory management: Memory buffers leaked.");
}
- long numBuckets = (p.getBuildSideRecordCount() * RECORD_TABLE_BYTES) / (HASH_BUCKET_SIZE - BUCKET_HEADER_LENGTH) + 1;
+ long numBuckets = p.getBuildSideRecordCount() / NUM_ENTRIES_PER_BUCKET + 1;
// we need to consider the worst case where everything hashes to one bucket which needs to overflow by the same
- // number of total buckets again.
- final long totalBuffersNeeded = (numBuckets * 2) / (this.bucketsPerSegmentMask + 1) + p.getBuildSideBlockCount() + 1;
+ // number of total buckets again. Also, one buffer needs to remain for the probing
+ final long totalBuffersNeeded = 2 * (numBuckets / (this.bucketsPerSegmentMask + 1)) + p.getBuildSideBlockCount() + 2;
if (totalBuffersNeeded < totalBuffersAvailable) {
// we are guaranteed to stay in memory
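The old bucket estimate divided by the raw byte ratio (count * 12 / 112, roughly one bucket per 9.33 records) and therefore undercounted; dividing by NUM_ENTRIES_PER_BUCKET = 9 matches what a bucket actually holds, and the trailing + 2 (instead of + 1) reserves the extra buffer for probing that the new comment mentions. A worked illustration with assumed numbers (32 KB segments, hence 32768 / 128 = 256 buckets per segment, and a spilled partition of 3400 records in 28 blocks):

    // before: numBuckets = 3400 * 12 / 112 + 1 = 365;  buffers = (365 * 2) / 256 + 28 + 1 = 31
    // after:  numBuckets = 3400 / 9 + 1        = 378;  buffers = 2 * (378 / 256) + 28 + 2 = 32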
@@ -791,7 +781,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
final int bucketInSegmentPos = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
final MemorySegment bucket = this.buckets[bucketArrayPos];
- insertBucketEntry(newPart, bucket, bucketInSegmentPos, hashCode, pointer);
+ insertBucketEntry(newPart, bucket, bucketInSegmentPos, hashCode, pointer, false);
}
}
else {
@@ -824,11 +814,16 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
this.availableMemory, this.buildSideSerializer);
final TypeComparator<BT> btComparator = this.buildSideComparator;
BT rec = this.buildSideSerializer.createInstance();
- while ((rec = inIter.next(rec)) != null)
- {
+ while ((rec = inIter.next(rec)) != null) {
final int hashCode = hash(btComparator.hash(rec), nextRecursionLevel);
insertIntoTable(rec, hashCode);
}
+
+ if (keepBuildSidePartitions && p.recursionLevel == 0) {
+ inReader.close(); // keep the partitions
+ } else {
+ inReader.closeAndDelete();
+ }
// finalize the partitions
for (int i = 0; i < this.partitionsBeingBuilt.size(); i++) {
@@ -838,11 +833,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
}
}
- /**
- * @param record
- * @param hashCode
- * @throws IOException
- */
+
protected final void insertIntoTable(final BT record, final int hashCode) throws IOException {
final int posHashCode = hashCode % this.numBuckets;
@@ -865,7 +856,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
long pointer = p.insertIntoBuildBuffer(record);
if (pointer != -1) {
// record was inserted into an in-memory partition. a pointer must be inserted into the buckets
- insertBucketEntry(p, bucket, bucketInSegmentPos, hashCode, pointer);
+ insertBucketEntry(p, bucket, bucketInSegmentPos, hashCode, pointer, true);
} else {
byte status = bucket.get(bucketInSegmentPos + HEADER_STATUS_OFFSET);
if (status == BUCKET_STATUS_IN_FILTER) {
@@ -877,24 +868,17 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
}
}
- /**
- * @param p
- * @param bucket
- * @param bucketInSegmentPos
- * @param hashCode
- * @param pointer
- * @throws IOException
- */
+
final void insertBucketEntry(final HashPartition<BT, PT> p, final MemorySegment bucket,
- final int bucketInSegmentPos, final int hashCode, final long pointer)
- throws IOException
+ final int bucketInSegmentPos, final int hashCode, final long pointer, final boolean spillingAllowed)
+ throws IOException
{
// find the position to put the hash code and pointer
final int count = bucket.getShort(bucketInSegmentPos + HEADER_COUNT_OFFSET);
if (count < NUM_ENTRIES_PER_BUCKET)
{
// we are good in our current bucket, put the values
- bucket.putInt(bucketInSegmentPos + BUCKET_HEADER_LENGTH + (count * HASH_CODE_LEN), hashCode); // hash code
+ bucket.putInt(bucketInSegmentPos + BUCKET_HEADER_LENGTH + (count * HASH_CODE_LEN), hashCode); // hash code
bucket.putLong(bucketInSegmentPos + BUCKET_POINTER_START_OFFSET + (count * POINTER_LEN), pointer); // pointer
bucket.putShort(bucketInSegmentPos + HEADER_COUNT_OFFSET, (short) (count + 1)); // update count
}
@@ -943,6 +927,10 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
overflowSeg = getNextBuffer();
if (overflowSeg == null) {
// no memory available to create overflow bucket. we need to spill a partition
+ if (!spillingAllowed) {
+ throw new IOException("Hashtable memory ran out in a non-spillable situation. " +
+ "This is probably related to wrong size calculations.");
+ }
final int spilledPart = spillPartition();
if (spilledPart == p.getPartitionNumber()) {
// this bucket is no longer in-memory
@@ -1005,13 +993,12 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
number, recursionLevel, this.availableMemory.remove(this.availableMemory.size() - 1),
this, this.segmentSize);
}
- /**
- * @param numPartitions
- */
+
+
protected void createPartitions(int numPartitions, int recursionLevel) {
// sanity check
ensureNumBuffersReturned(numPartitions);
-
+
this.currentEnumerator = this.ioManager.createChannelEnumerator();
this.partitionsBeingBuilt.clear();
@@ -1122,7 +1109,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
this.currentEnumerator.next(), this.writeBehindBuffers);
this.writeBehindBuffersAvailable += numBuffersFreed;
// grab as many buffers as are available directly
- MemorySegment currBuff = null;
+ MemorySegment currBuff;
while (this.writeBehindBuffersAvailable > 0 && (currBuff = this.writeBehindBuffers.poll()) != null) {
this.availableMemory.add(currBuff);
this.writeBehindBuffersAvailable--;
@@ -1150,10 +1137,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
/**
* Set all the bucket memory except bucket header as the bit set of bloom filter, and use hash code of build records
* to build bloom filter.
- *
- * @param bucketInSegmentPos
- * @param bucket
- * @param p
*/
final void buildBloomFilterForBucket(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
final int count = bucket.getShort(bucketInSegmentPos + HEADER_COUNT_OFFSET);
@@ -1256,7 +1239,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
this.writeBehindBuffersAvailable--;
// grab as many more buffers as are available directly
- MemorySegment currBuff = null;
+ MemorySegment currBuff;
while (this.writeBehindBuffersAvailable > 0 && (currBuff = this.writeBehindBuffers.poll()) != null) {
this.availableMemory.add(currBuff);
this.writeBehindBuffersAvailable--;
@@ -1267,12 +1250,20 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
return null;
}
}
-
+
+ /**
+ * This is the method called by the partitions to request memory to serialize records.
+ * It automatically spills partitions, if memory runs out.
+ *
+ * @return The next available memory segment.
+ */
@Override
public MemorySegment nextSegment() {
final MemorySegment seg = getNextBuffer();
- if (seg == null) {
+ if (seg != null) {
+ return seg;
+ } else {
try {
spillPartition();
} catch (IOException ioex) {
@@ -1286,8 +1277,6 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
} else {
return fromSpill;
}
- } else {
- return seg;
}
}
@@ -1342,7 +1331,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
// ----------------------------------------------------------------------------------------
final long totalSize = ((long) bufferSize) * numBuffers;
- final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_OVERHEAD_BYTES);
+ final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_TABLE_BYTES);
final long bucketBytes = numRecordsStorable * RECORD_TABLE_BYTES;
final long numBuckets = bucketBytes / (2 * HASH_BUCKET_SIZE) + 1;
@@ -1369,7 +1358,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
* @param code The integer to be hashed.
* @return The hash code for the integer.
*/
- public static final int hash(int code, int level) {
+ public static int hash(int code, int level) {
final int rotation = level * 11;
code = (code << rotation) | (code >>> -rotation);
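A note for readers puzzled by the negative shift: Java masks an int shift distance to its low five bits, so code >>> -rotation reads as code >>> ((32 - rotation) & 31), and the two-term expression is simply a left rotation. The JDK helper expresses the same thing (an equivalent sketch; Integer.rotateLeft is itself defined as (i << distance) | (i >>> -distance)):

    code = Integer.rotateLeft(code, rotation);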
@@ -1487,7 +1476,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
final int overflowSegNum = (int) (forwardPointer >>> 32);
this.bucket = this.overflowSegments[overflowSegNum];
- this.bucketInSegmentOffset = (int) (forwardPointer & 0xffffffff);
+ this.bucketInSegmentOffset = (int) forwardPointer;
this.countInSegment = this.bucket.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET);
this.posInSegment = this.bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
this.numInSegment = 0;
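This simplification, here and in the identical hunk below, is behavior-preserving twice over: a narrowing cast to int keeps exactly the low 32 bits anyway, and the int literal 0xffffffff sign-extends to a long of all ones, so the old mask was a no-op to begin with. A minimal bit-level check (illustrative values only):

    long fwd   = (2L << 32) | 0x80000010L;  // overflow segment 2, high offset bit set
    int segNum = (int) (fwd >>> 32);        // 2
    int offset = (int) fwd;                 // 0x80000010 == (int) (fwd & 0xffffffffL)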
@@ -1537,7 +1526,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
final int overflowSegNum = (int) (forwardPointer >>> 32);
this.bucket = this.overflowSegments[overflowSegNum];
- this.bucketInSegmentOffset = (int) (forwardPointer & 0xffffffff);
+ this.bucketInSegmentOffset = (int) forwardPointer;
this.countInSegment = this.bucket.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET);
this.posInSegment = this.bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
this.numInSegment = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/4a5fe4e6/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
index fd5fcde..b7a7262 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java
@@ -101,7 +101,7 @@ public class ReOpenableMutableHashTable<BT, PT> extends MutableHashTable<BT, PT>
final int bucketArrayPos = posHashCode >> this.bucketsPerSegmentBits;
final int bucketInSegmentPos = (posHashCode & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS;
final MemorySegment bucket = this.buckets[bucketArrayPos];
- insertBucketEntry(part, bucket, bucketInSegmentPos, hashCode, pointer);
+ insertBucketEntry(part, bucket, bucketInSegmentPos, hashCode, pointer, true);
}
} else {
this.writeBehindBuffersAvailable--; // we are not in-memory, thus the probe side buffer will grab one wbb.
http://git-wip-us.apache.org/repos/asf/flink/blob/4a5fe4e6/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
new file mode 100644
index 0000000..0d8b81e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.util.MutableObjectIterator;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+public class HashTableTest {
+
+ private final TypeSerializer<Tuple2<Long, byte[]>> buildSerializer;
+ private final TypeSerializer<Long> probeSerializer;
+
+ private final TypeComparator<Tuple2<Long, byte[]>> buildComparator;
+ private final TypeComparator<Long> probeComparator;
+
+ private final TypePairComparator<Long, Tuple2<Long, byte[]>> pairComparator;
+
+
+ public HashTableTest() {
+ TypeSerializer<?>[] fieldSerializers = { LongSerializer.INSTANCE, BytePrimitiveArraySerializer.INSTANCE };
+ @SuppressWarnings("unchecked")
+ Class<Tuple2<Long, byte[]>> clazz = (Class<Tuple2<Long, byte[]>>) (Class<?>) Tuple2.class;
+ this.buildSerializer = new TupleSerializer<Tuple2<Long, byte[]>>(clazz, fieldSerializers);
+
+ this.probeSerializer = LongSerializer.INSTANCE;
+
+ TypeComparator<?>[] comparators = { new LongComparator(true) };
+ TypeSerializer<?>[] comparatorSerializers = { LongSerializer.INSTANCE };
+
+ this.buildComparator = new TupleComparator<Tuple2<Long, byte[]>>(new int[] {0}, comparators, comparatorSerializers);
+
+ this.probeComparator = new LongComparator(true);
+
+ this.pairComparator = new TypePairComparator<Long, Tuple2<Long, byte[]>>() {
+
+ private long ref;
+
+ @Override
+ public void setReference(Long reference) {
+ ref = reference;
+ }
+
+ @Override
+ public boolean equalToReference(Tuple2<Long, byte[]> candidate) {
+ //noinspection UnnecessaryUnboxing
+ return candidate.f0.longValue() == ref;
+ }
+
+ @Override
+ public int compareToReference(Tuple2<Long, byte[]> candidate) {
+ long x = ref;
+ long y = candidate.f0;
+ return (x < y) ? -1 : ((x == y) ? 0 : 1);
+ }
+ };
+ }
+
+ // ------------------------------------------------------------------------
+ // Tests
+ // ------------------------------------------------------------------------
+
+ /**
+ * This tests a combination of values that lead to a corner case situation where memory
+ * was missing and the computation deadlocked.
+ */
+ @Test
+ public void testBufferMissingForProbing() {
+
+ final IOManager ioMan = new IOManagerAsync();
+
+ try {
+ final int pageSize = 32*1024;
+ final int numSegments = 34;
+ final int numRecords = 3400;
+ final int recordLen = 270;
+
+ final byte[] payload = new byte[recordLen - 8 - 4];
+
+ List<MemorySegment> memory = getMemory(numSegments, pageSize);
+
+ MutableHashTable<Tuple2<Long, byte[]>, Long> table = new MutableHashTable<>(
+ buildSerializer, probeSerializer, buildComparator, probeComparator,
+ pairComparator, memory, ioMan, 16, false);
+
+ table.open(new TupleBytesIterator(payload, numRecords), new LongIterator(10000));
+
+ try {
+ while (table.nextRecord()) {
+ MutableHashTable.HashBucketIterator<Tuple2<Long, byte[]>, Long> matches = table.getBuildSideIterator();
+ while (matches.next() != null);
+ }
+ }
+ catch (RuntimeException e) {
+ if (!e.getMessage().contains("exceeded maximum number of recursions")) {
+ e.printStackTrace();
+ fail("Test failed with unexpected exception");
+ }
+ }
+ finally {
+ table.close();
+ }
+
+ checkNoTempFilesRemain(ioMan);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ ioMan.shutdown();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static List<MemorySegment> getMemory(int numSegments, int segmentSize) {
+ ArrayList<MemorySegment> list = new ArrayList<MemorySegment>(numSegments);
+ for (int i = 0; i < numSegments; i++) {
+ list.add(new MemorySegment(new byte[segmentSize]));
+ }
+ return list;
+ }
+
+ private static void checkNoTempFilesRemain(IOManager ioManager) {
+ for (File dir : ioManager.getSpillingDirectories()) {
+ for (String file : dir.list()) {
+ if (file != null && !(file.equals(".") || file.equals(".."))) {
+ fail("hash table did not clean up temp files. remaining file: " + file);
+ }
+ }
+ }
+ }
+
+ private static class TupleBytesIterator implements MutableObjectIterator<Tuple2<Long, byte[]>> {
+
+ private final byte[] payload;
+ private final int numRecords;
+
+ private int count = 0;
+
+ TupleBytesIterator(byte[] payload, int numRecords) {
+ this.payload = payload;
+ this.numRecords = numRecords;
+ }
+
+ @Override
+ public Tuple2<Long, byte[]> next(Tuple2<Long, byte[]> reuse) {
+ return next();
+ }
+
+ @Override
+ public Tuple2<Long, byte[]> next() {
+ if (count++ < numRecords) {
+ return new Tuple2<>(42L, payload);
+ } else {
+ return null;
+ }
+ }
+ }
+
+ private static class LongIterator implements MutableObjectIterator<Long> {
+
+ private final long numRecords;
+ private long value = 0;
+
+ LongIterator(long numRecords) {
+ this.numRecords = numRecords;
+ }
+
+ @Override
+ public Long next(Long aLong) {
+ return next();
+ }
+
+ @Override
+ public Long next() {
+ if (value < numRecords) {
+ return value++;
+ } else {
+ return null;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4a5fe4e6/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/RecordsAndWidthsCombinationCheck.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/RecordsAndWidthsCombinationCheck.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/RecordsAndWidthsCombinationCheck.java
new file mode 100644
index 0000000..56ee1da
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/RecordsAndWidthsCombinationCheck.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongComparator;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.fail;
+
+public class RecordsAndWidthsCombinationCheck {
+
+ public static void main(String[] args) throws Exception {
+
+ @SuppressWarnings("unchecked")
+ final TypeSerializer<Tuple2<Long, byte[]>> buildSerializer =
+ new TupleSerializer<Tuple2<Long, byte[]>>(
+ (Class<Tuple2<Long, byte[]>>) (Class<?>) Tuple2.class,
+ new TypeSerializer<?>[] { LongSerializer.INSTANCE, BytePrimitiveArraySerializer.INSTANCE });
+
+ final TypeSerializer<Long> probeSerializer = LongSerializer.INSTANCE;
+
+ final TypeComparator<Tuple2<Long, byte[]>> buildComparator = new TupleComparator<Tuple2<Long, byte[]>>(
+ new int[] {0},
+ new TypeComparator<?>[] { new LongComparator(true) },
+ new TypeSerializer<?>[] { LongSerializer.INSTANCE });
+
+ final TypeComparator<Long> probeComparator = new LongComparator(true);
+
+ final TypePairComparator<Long, Tuple2<Long, byte[]>> pairComparator = new TypePairComparator<Long, Tuple2<Long, byte[]>>() {
+
+ private long ref;
+
+ @Override
+ public void setReference(Long reference) {
+ ref = reference;
+ }
+
+ @Override
+ public boolean equalToReference(Tuple2<Long, byte[]> candidate) {
+ //noinspection UnnecessaryUnboxing
+ return candidate.f0.longValue() == ref;
+ }
+
+ @Override
+ public int compareToReference(Tuple2<Long, byte[]> candidate) {
+ long x = ref;
+ long y = candidate.f0;
+ return (x < y) ? -1 : ((x == y) ? 0 : 1);
+ }
+ };
+
+ final IOManager ioMan = new IOManagerAsync();
+
+ try {
+ final int pageSize = 32*1024;
+ final int numSegments = 34;
+
+ for (int num = 3400; num < 3550; num++) {
+ final int numRecords = num;
+
+ for (int recordLen = 270; recordLen < 320; recordLen++) {
+
+ final byte[] payload = new byte[recordLen - 8 - 4];
+
+ System.out.println("testing " + numRecords + " / " + recordLen);
+
+ List<MemorySegment> memory = getMemory(numSegments, pageSize);
+
+ // we create a hash table that thinks the records are super large. that makes it choose initially
+ // a lot of memory for the partition buffers, and start with a smaller hash table. that way
+ // we trigger a hash table growth early.
+ MutableHashTable<Tuple2<Long, byte[]>, Long> table = new MutableHashTable<>(
+ buildSerializer, probeSerializer, buildComparator, probeComparator,
+ pairComparator, memory, ioMan, 16, false);
+
+ final MutableObjectIterator<Tuple2<Long, byte[]>> buildInput = new MutableObjectIterator<Tuple2<Long, byte[]>>() {
+
+ private int count = 0;
+
+ @Override
+ public Tuple2<Long, byte[]> next(Tuple2<Long, byte[]> reuse) {
+ return next();
+ }
+
+ @Override
+ public Tuple2<Long, byte[]> next() {
+ if (count++ < numRecords) {
+ return new Tuple2<>(42L, payload);
+ } else {
+ return null;
+ }
+ }
+ };
+
+ // probe side
+ final MutableObjectIterator<Long> probeInput = new MutableObjectIterator<Long>() {
+
+ private final long numRecords = 10000;
+ private long value = 0;
+
+ @Override
+ public Long next(Long aLong) {
+ return next();
+ }
+
+ @Override
+ public Long next() {
+ if (value < numRecords) {
+ return value++;
+ } else {
+ return null;
+ }
+ }
+ };
+
+ table.open(buildInput, probeInput);
+
+ try {
+ while (table.nextRecord()) {
+ MutableHashTable.HashBucketIterator<Tuple2<Long, byte[]>, Long> matches = table.getBuildSideIterator();
+ while (matches.next() != null);
+ }
+ }
+ catch (RuntimeException e) {
+ if (!e.getMessage().contains("exceeded maximum number of recursions")) {
+ throw e;
+ }
+ }
+ finally {
+ table.close();
+ }
+
+ // make sure no temp files are left
+ checkNoTempFilesRemain(ioMan);
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ ioMan.shutdown();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static List<MemorySegment> getMemory(int numSegments, int segmentSize) {
+ ArrayList<MemorySegment> list = new ArrayList<MemorySegment>(numSegments);
+ for (int i = 0; i < numSegments; i++) {
+ list.add(new MemorySegment(new byte[segmentSize]));
+ }
+ return list;
+ }
+
+ private static void checkNoTempFilesRemain(IOManager ioManager) {
+ for (File dir : ioManager.getSpillingDirectories()) {
+ for (String file : dir.list()) {
+ if (file != null && !(file.equals(".") || file.equals(".."))) {
+ fail("hash table did not clean up temp files. remaining file: " + file);
+ }
+ }
+ }
+ }
+}
[2/2] flink git commit: [FLINK-2575] [runtime] Deactivate join bloom filters by default
Posted by se...@apache.org.
[FLINK-2575] [runtime] Deactivate join bloom filters by default
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ced6a199
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ced6a199
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ced6a199
Branch: refs/heads/master
Commit: ced6a1993815fc61fc0c4b68370d9af76de68c71
Parents: e02c301
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 26 12:29:58 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 26 18:48:17 2015 +0200
----------------------------------------------------------------------
.../main/java/org/apache/flink/configuration/ConfigConstants.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ced6a199/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index d145eb2..2ca7c36 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -550,7 +550,7 @@ public final class ConfigConstants {
/**
* Default setting for the switch for hash join bloom filters for spilled partitions.
*/
- public static final boolean DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS = true;
+ public static final boolean DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS = false;
/**
* The default value for the maximum spilling fan in/out.
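With the default flipped off, jobs that benefit from bloom filters on spilled partitions must opt back in. A usage sketch, under the assumption that the matching key constant (not shown in this diff) is named RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY:

    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;

    Configuration cfg = new Configuration();
    // re-enable hash-join bloom filters in this configuration
    cfg.setBoolean(ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY, true);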