You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by GitBox <gi...@apache.org> on 2019/01/15 03:06:04 UTC
[drill] Diff for: [GitHub] Ben-Zvi closed pull request #1606: Drill 6845:
Semi-Hash-Join to skip incoming build duplicates,
automatically stop skipping if too few
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 811c4791e82..729b5e4021f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -157,8 +157,17 @@ private ExecConstants() {
public static final BooleanValidator HASHJOIN_ENABLE_RUNTIME_FILTER_WAITING = new BooleanValidator(HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY, null);
public static final String HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY = "exec.hashjoin.runtime_filter.max.waiting.time";
public static final PositiveLongValidator HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME = new PositiveLongValidator(HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY, Character.MAX_VALUE, null);
-
-
+ public static final String HASHJOIN_SEMI_SKIP_DUPLICATES_KEY = "exec.hashjoin.semi_skip_duplicates";
+ public static final BooleanValidator HASHJOIN_SEMI_SKIP_DUPLICATES_VALIDATOR = new BooleanValidator(HASHJOIN_SEMI_SKIP_DUPLICATES_KEY,
+ new OptionDescription("When TRUE, make Semi Hash Join check for incoming duplicated and skip those (use more cpu, less memory)"));
+ public static final String HASHJOIN_SEMI_PERCENT_DUPLICATES_TO_SKIP_KEY = "exec.hashjoin.semi_percent_duplicates_to_skip";
+ public static final IntegerValidator HASHJOIN_SEMI_PERCENT_DUPLICATES_TO_SKIP_VALIDATOR = new IntegerValidator(HASHJOIN_SEMI_PERCENT_DUPLICATES_TO_SKIP_KEY,
+ 0, 100,
+ new OptionDescription("Semi join to skip duplicates only if initial check finds duplicates in incoming as no less than this percentage"));
+ public static final String HASHJOIN_MIN_BATCHES_IN_AVAILABLE_MEMORY_KEY = "exec.hashjoin.min_batches_in_available_memory";
+ public static final IntegerValidator HASHJOIN_MIN_BATCHES_IN_AVAILABLE_MEMORY_VALIDATOR = new IntegerValidator(HASHJOIN_MIN_BATCHES_IN_AVAILABLE_MEMORY_KEY,
+ 1, Integer.MAX_VALUE,
+ new OptionDescription("Threshold: Start spilling if available memory is less than this number of batches (only for semi join skipping duplicates"));
// Hash Aggregate Options
public static final String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index 275cf16572d..69ea3f34a66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -66,7 +66,7 @@
* After all the build/inner data is read for this partition - if all its data is in memory, then
* a hash table and a helper are created, and later this data would be probed.
* If all this partition's build/inner data was spilled, then it begins to work as an outer
- * partition (see the flag "processingOuter") -- reusing some of the fields (e.g., currentBatch,
+ * partition (see the flag "processingOuter") -- reusing some of the fields (e.g., currentVectorContainer,
* currHVVector, writer, spillFile, partitionBatchesCount) for the outer.
* </p>
*/
@@ -94,8 +94,8 @@
// incoming batches, per each partition (these may be spilled at some point)
private List<VectorContainer> tmpBatchesList;
// A batch and HV vector to hold incoming rows - per each partition
- private VectorContainer currentBatch; // The current (newest) batch
- private IntVector currHVVector; // The HV vectors for the currentBatches
+ private VectorContainer currentVectorContainer; // The current (newest) container
+ private IntVector currHVVector; // The HV vectors for the currentVectorContainers
/* Helper class
* Maintains linked list of build side records with the same key
@@ -126,9 +126,10 @@
private long numInMemoryRecords;
private boolean updatedRecordsPerBatch = false;
private boolean semiJoin;
+ private boolean skipDuplicates; // only for semi
public HashPartition(FragmentContext context, BufferAllocator allocator, ChainedHashTable baseHashTable,
- RecordBatch buildBatch, RecordBatch probeBatch, boolean semiJoin,
+ RecordBatch buildBatch, RecordBatch probeBatch, boolean semiJoin, boolean skipDuplicates,
int recordsPerBatch, SpillSet spillSet, int partNum, int cycleNum, int numPartitions) {
this.allocator = allocator;
this.buildBatch = buildBatch;
@@ -139,6 +140,7 @@ public HashPartition(FragmentContext context, BufferAllocator allocator, Chained
this.cycleNum = cycleNum;
this.numPartitions = numPartitions;
this.semiJoin = semiJoin;
+ this.skipDuplicates = semiJoin && skipDuplicates;
try {
this.hashTable = baseHashTable.createAndSetupHashTable(null);
@@ -156,7 +158,7 @@ public HashPartition(FragmentContext context, BufferAllocator allocator, Chained
this.hjHelper = semiJoin ? null : new HashJoinHelper(context, allocator);
tmpBatchesList = new ArrayList<>();
if ( numPartitions > 1 ) {
- allocateNewCurrentBatchAndHV();
+ allocateNewCurrentVectorContainerAndHV();
}
}
@@ -218,19 +220,47 @@ private VectorContainer allocateNewVectorContainer(RecordBatch rb) {
/**
* Allocate a new current Vector Container and current HV vector
*/
- public void allocateNewCurrentBatchAndHV() {
+ public void allocateNewCurrentVectorContainerAndHV() {
if (outerBatchAllocNotNeeded) { return; } // skip when the inner is whole in memory
- currentBatch = allocateNewVectorContainer(processingOuter ? probeBatch : buildBatch);
+ currentVectorContainer = allocateNewVectorContainer(processingOuter ? probeBatch : buildBatch);
currHVVector = new IntVector(MaterializedField.create(HASH_VALUE_COLUMN_NAME, HVtype), allocator);
currHVVector.allocateNew(recordsPerBatch);
}
+ /**
+ * This method is only used for semi-join, when trying to skip incoming rows with duplicate keys.
+ * It adds the given row's key to the hash table, if needed, and returns true only if that
+ * key already existed in the hash table.
+ *
+ * @param buildContainer The container with the current row
+ * @param ind The index of the current row in the container
+ * @param hashCode The hash code for the key of this row
+ * @return True iff that key already exists in the hash table
+ */
+ public boolean insertKeyIntoHashTable(VectorContainer buildContainer, int ind, int hashCode) throws SchemaChangeException {
+ hashTable.updateIncoming(buildContainer, probeBatch );
+ final IndexPointer htIndex = new IndexPointer();
+ HashTable.PutStatus status;
+
+ try {
+ status = hashTable.put(ind, htIndex, hashCode, BATCH_SIZE);
+ } catch (RetryAfterSpillException RE) {
+ if ( numPartitions == 1 ) { // if cannot spill
+ throw new OutOfMemoryException(RE);
+ }
+ spillThisPartition(); // free some memory
+ return false;
+ }
+
+ return status == HashTable.PutStatus.KEY_PRESENT;
+ }
+
/**
* Spills if needed
*/
- public void appendInnerRow(VectorContainer buildContainer, int ind, int hashCode, HashJoinMemoryCalculator.BuildSidePartitioning calc) {
+ public void appendInnerRow(VectorContainer buildContainer, int ind, int hashCode, HashJoinMemoryCalculator.HashJoinSpillControl calc) {
- int pos = currentBatch.appendRow(buildContainer,ind);
+ int pos = currentVectorContainer.appendRow(buildContainer,ind);
currHVVector.getMutator().set(pos - 1, hashCode); // store the hash value in the new column
if ( pos == recordsPerBatch ) {
boolean needsSpill = isSpilled || calc.shouldSpill();
@@ -243,7 +273,7 @@ public void appendInnerRow(VectorContainer buildContainer, int ind, int hashCode
*
*/
public void appendOuterRow(int hashCode, int recordsProcessed) {
- int pos = currentBatch.appendRow(probeBatch.getContainer(),recordsProcessed);
+ int pos = currentVectorContainer.appendRow(probeBatch.getContainer(),recordsProcessed);
currHVVector.getMutator().set(pos - 1, hashCode); // store the hash value in the new column
if ( pos == recordsPerBatch ) {
completeAnOuterBatch(true);
@@ -262,27 +292,27 @@ public void completeAnInnerBatch(boolean toInitialize, boolean needsSpill) {
* (that is, more rows are coming) - initialize with a new current batch for that partition
* */
private void completeABatch(boolean toInitialize, boolean needsSpill) {
- if ( currentBatch.hasRecordCount() && currentBatch.getRecordCount() > 0) {
- currentBatch.add(currHVVector);
- currentBatch.buildSchema(BatchSchema.SelectionVectorMode.NONE);
- tmpBatchesList.add(currentBatch);
+ if ( currentVectorContainer.hasRecordCount() && currentVectorContainer.getRecordCount() > 0) {
+ currentVectorContainer.add(currHVVector);
+ currentVectorContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ tmpBatchesList.add(currentVectorContainer);
partitionBatchesCount++;
- long batchSize = new RecordBatchSizer(currentBatch).getActualSize();
- inMemoryBatchStats.add(new HashJoinMemoryCalculator.BatchStat(currentBatch.getRecordCount(), batchSize));
+ long batchSize = new RecordBatchSizer(currentVectorContainer).getActualSize();
+ inMemoryBatchStats.add(new HashJoinMemoryCalculator.BatchStat(currentVectorContainer.getRecordCount(), batchSize));
partitionInMemorySize += batchSize;
- numInMemoryRecords += currentBatch.getRecordCount();
+ numInMemoryRecords += currentVectorContainer.getRecordCount();
} else {
- freeCurrentBatchAndHVVector();
+ freeCurrentVectorContainerAndHVVector();
}
if ( needsSpill ) { // spill this batch/partition and free its memory
spillThisPartition();
}
if ( toInitialize ) { // allocate a new batch and HV vector
- allocateNewCurrentBatchAndHV();
+ allocateNewCurrentVectorContainerAndHV();
} else {
- currentBatch = null;
+ currentVectorContainer = null;
currHVVector = null;
}
}
@@ -327,6 +357,10 @@ public void spillThisPartition() {
if ( tmpBatchesList.size() == 0 ) { return; } // in case empty - nothing to spill
logger.debug("HashJoin: Spilling partition {}, current cycle {}, part size {} batches", partitionNum, cycleNum, tmpBatchesList.size());
+ if ( skipDuplicates ) {
+ hashTable.reset();
+ } // deallocate and reinit the hash table in case of a semi skipping dupl
+
// If this is the first spill for this partition, create an output stream
if ( writer == null ) {
final String side = processingOuter ? "outer" : "inner";
@@ -492,6 +526,31 @@ private void closeWriterInternal(boolean doDeleteFile) {
partitionBatchesCount = 0;
}
+ /**
+ * Stop skipping duplicates (when there are too few of them)
+ * thus not maintaining the hash table as new rows arrive
+ */
+ public void stopSkippingDuplicates() {
+ assert skipDuplicates;
+ hashTable.reset();
+ skipDuplicates = false;
+ }
+
+ /**
+ * Builds the containers only, not the hash table nor the helper
+ * To be used in case of skipping duplicates (when the hash table already exists)
+ */
+ public void buildContainers() {
+ assert skipDuplicates;
+ if ( isSpilled ) { return; } // no building for spilled partitions
+ containers = new ArrayList<>();
+ for (int curr = 0; curr < partitionBatchesCount; curr++) {
+ VectorContainer nextBatch = tmpBatchesList.get(curr);
+ containers.add(nextBatch);
+ }
+ outerBatchAllocNotNeeded = true; // the inner is whole in memory, no need for an outer batch
+ }
+
/**
* Creates the hash table and join helper for this partition.
* This method should only be called after all the build side records
@@ -514,7 +573,7 @@ public void buildContainersHashTableAndHelper() throws SchemaChangeException {
assert nextBatch != null;
assert probeBatch != null;
- hashTable.updateIncoming(nextBatch, probeBatch );
+ hashTable.updateIncoming(nextBatch, probeBatch);
IntVector HV_vector = (IntVector) nextBatch.getLast();
@@ -531,7 +590,6 @@ public void buildContainersHashTableAndHelper() throws SchemaChangeException {
*/
if ( ! semiJoin ) { hjHelper.setCurrentIndex(htIndex.value, curr /* buildBatchIndex */, recInd); }
}
-
containers.add(nextBatch);
}
outerBatchAllocNotNeeded = true; // the inner is whole in memory, no need for an outer batch
@@ -555,10 +613,10 @@ private void clearHashTableAndHelper() {
}
}
- private void freeCurrentBatchAndHVVector() {
- if ( currentBatch != null ) {
- currentBatch.clear();
- currentBatch = null;
+ private void freeCurrentVectorContainerAndHVVector() {
+ if ( currentVectorContainer != null ) {
+ currentVectorContainer.clear();
+ currentVectorContainer = null;
}
if ( currHVVector != null ) {
currHVVector.clear();
@@ -571,7 +629,7 @@ private void freeCurrentBatchAndHVVector() {
* @param deleteFile - whether to delete the spill file or not
*/
public void cleanup(boolean deleteFile) {
- freeCurrentBatchAndHVVector();
+ freeCurrentVectorContainerAndHVVector();
if (containers != null && !containers.isEmpty()) {
for (VectorContainer vc : containers) {
vc.clear();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
index fe329cc5e51..5ecf0447cc5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
@@ -70,7 +70,7 @@ public HashTableConfig(
@JsonCreator
public HashTableConfig(@JsonProperty("initialCapacity") int initialCapacity,
- @JsonProperty("initialCapacity") boolean initialSizeIsFinal,
+ @JsonProperty("initialSizeIsFinal") boolean initialSizeIsFinal,
@JsonProperty("loadFactor") float loadFactor,
@JsonProperty("keyExprsBuild") List<NamedExpression> keyExprsBuild,
@JsonProperty("keyExprsProbe") List<NamedExpression> keyExprsProbe,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 0ac0809d8f7..2761fff4f1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -214,6 +214,12 @@
private Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>();
private List<BloomFilter> bloomFilters = new ArrayList<>();
+ private long semiCountTotal;
+ private long semiCountDuplicates;
+ private long semiDupDecisionPoint; // The number of incoming at which to stop and make the "continue skip" decision
+ private boolean semiSkipDuplicates; // optional, for semi join
+ private int semiSkipDuplicatesMinPercentage;
+
/**
* This holds information about the spilled partitions for the build and probe side.
*/
@@ -320,7 +326,7 @@ public boolean hasPartitionLimit() {
AVG_OUTPUT_ROW_BYTES,
OUTPUT_RECORD_COUNT;
- // duplicate for hash ag
+ // duplicate for hash agg
@Override
public int metricId() { return ordinal(); }
@@ -719,7 +725,7 @@ private void setupHashTable() throws SchemaChangeException {
}
final HashTableConfig htConfig = new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
- true, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators, joinControl.asInt());
+ !semiSkipDuplicates, HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators, joinControl.asInt());
// Create the chained hash table
baseHashTable =
@@ -789,18 +795,16 @@ private void setupHash64(HashTableConfig htConfig) throws SchemaChangeException
}
/**
- * Call only after num partitions is known
+ * Call only after the final 'num partitions' is known (See partitionNumTuning())
*/
private void delayedSetup() {
- //
- // Find out the estimated max batch size, etc
- // and compute the max numPartitions possible
- // See partitionNumTuning()
- //
-
spilledState.initialize(numPartitions);
// Create array for the partitions
partitions = new HashPartition[numPartitions];
+ // Runtime stats for semi-join: Help decide early whether too few duplicates were seen to keep skipping them (based on initial data, about 32K per partition)
+ semiCountTotal = semiCountDuplicates = 0;
+ semiDupDecisionPoint = // average each partition's hash table half full ( + 1 to avoid zero in case numPartitions == 1 )
+ ((numPartitions + 1) / 2) * context.getOptions().getLong(ExecConstants.MIN_HASH_TABLE_SIZE_KEY);
}
/**
@@ -810,8 +814,8 @@ private void initializeBuild() {
baseHashTable.updateIncoming(buildBatch, probeBatch); // in case we process the spilled files
// Recreate the partitions every time build is initialized
for (int part = 0; part < numPartitions; part++ ) {
- partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch, semiJoin,
- RECORDS_PER_BATCH, spillSet, part, spilledState.getCycle(), numPartitions);
+ partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch, semiJoin, semiSkipDuplicates, RECORDS_PER_BATCH, spillSet, part, spilledState.getCycle(),
+ numPartitions);
}
spilledInners = new HashJoinSpilledPartition[numPartitions];
@@ -939,16 +943,17 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
}
HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
+ HashJoinMemoryCalculator.BuildSidePartitioning currentCalc; // may be either a spill control calc, or buildCalc
{
// Initializing build calculator
// Limit scope of these variables to this block
- int maxBatchSize = spilledState.isFirstCycle()? RecordBatch.MAX_BATCH_ROW_COUNT: RECORDS_PER_BATCH;
+ int maxBatchRowCount = spilledState.isFirstCycle()? RecordBatch.MAX_BATCH_ROW_COUNT: RECORDS_PER_BATCH;
boolean doMemoryCalculation = canSpill && !probeSideIsEmpty.booleanValue();
HashJoinMemoryCalculator calc = getCalculatorImpl();
calc.initialize(doMemoryCalculation);
- buildCalc = calc.next();
+ currentCalc = buildCalc = calc.next();
buildCalc.initialize(spilledState.isFirstCycle(), true, // TODO Fix after growing hash values bug fixed
buildBatch,
@@ -959,14 +964,40 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
numPartitions,
RECORDS_PER_BATCH,
RECORDS_PER_BATCH,
- maxBatchSize,
- maxBatchSize,
+ maxBatchRowCount,
+ maxBatchRowCount,
batchMemoryManager.getOutputBatchSize(),
HashTable.DEFAULT_LOAD_FACTOR);
if (spilledState.isFirstCycle() && doMemoryCalculation) {
// Do auto tuning
- buildCalc = partitionNumTuning(maxBatchSize, buildCalc);
+ buildCalc = partitionNumTuning(maxBatchRowCount, buildCalc);
+ }
+ if ( semiSkipDuplicates ) {
+ // in case of a Semi Join skipping duplicates, use a "spill control" calc
+ // (may revert back to the buildCalc if the code decides to stop skipping)
+ currentCalc = new HashJoinSpillControlImpl(allocator, RECORDS_PER_BATCH,
+ (int) context.getOptions().getOption(ExecConstants.HASHJOIN_MIN_BATCHES_IN_AVAILABLE_MEMORY_VALIDATOR),
+ batchMemoryManager, context);
+
+ // calculates the max number of partitions possible
+ if ( spilledState.isFirstCycle() && doMemoryCalculation ) {
+ currentCalc.initialize(spilledState.isFirstCycle(), true, // TODO Fix after growing hash values bug fixed
+ buildBatch,
+ probeBatch,
+ buildJoinColumns,
+ probeSideIsEmpty.booleanValue(),
+ allocator.getLimit(),
+ numPartitions,
+ RECORDS_PER_BATCH,
+ RECORDS_PER_BATCH,
+ maxBatchRowCount,
+ maxBatchRowCount,
+ batchMemoryManager.getOutputBatchSize(),
+ HashTable.DEFAULT_LOAD_FACTOR);
+
+ numPartitions = currentCalc.getNumPartitions();
+ }
}
}
@@ -981,7 +1012,7 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
// Make the calculator aware of our partitions
final HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = new HashJoinMemoryCalculator.PartitionStatSet(partitions);
- buildCalc.setPartitionStatSet(partitionStatSet);
+ currentCalc.setPartitionStatSet(partitionStatSet);
boolean moreData = true;
while (moreData) {
@@ -1029,12 +1060,31 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
: read_right_HV_vector.getAccessor().get(ind); // get the hash value from the HV column
int currPart = hashCode & spilledState.getPartitionMask();
hashCode >>>= spilledState.getBitsInMask();
- // semi-join skips join-key-duplicate rows
- if ( semiJoin ) {
-
+ // semi-join builds the hash-table, and skips join-key-duplicate rows
+ if (semiSkipDuplicates) {
+ semiCountTotal++;
+ boolean aDuplicate = partitions[currPart].insertKeyIntoHashTable(buildBatch.getContainer(), ind, hashCode);
+ // A heuristic: Make a decision once the threshold was met - either continue skipping duplicates, or stop
+ // (skipping duplicates carries a cost, so better avoid if duplicates are too few)
+ if ( semiCountTotal == semiDupDecisionPoint) { // got enough incoming rows to decide ?
+ long threshold = semiCountTotal * semiSkipDuplicatesMinPercentage / 100;
+ if ( semiCountDuplicates < threshold ) { // when duplicates found were less than the percentage threshold, stop skipping
+ for (HashPartition partn : partitions) {
+ partn.stopSkippingDuplicates();
+ }
+ semiSkipDuplicates = false;
+ currentCalc = buildCalc; // back to using the regular calc
+ }
+ logger.debug("Semi {} skipping duplicates after receiving {} rows with {} percent duplicates",
+ semiSkipDuplicates ? "to continue" : "stopped", semiCountTotal, (100 * semiCountDuplicates) / semiCountTotal);
+ }
+ if ( aDuplicate ) {
+ semiCountDuplicates++;
+ continue;
+ }
}
// Append the new inner row to the appropriate partition; spill (that partition) if needed
- partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, buildCalc); // may spill if needed
+ partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, currentCalc); // may spill if needed
}
if ( read_right_HV_vector != null ) {
@@ -1054,12 +1104,15 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
}
}
+ int numPartitionsSpilled = 0;
+
// Move the remaining current batches into their temp lists, or spill
// them if the partition is spilled. Add the spilled partitions into
// the spilled partitions list
if ( numPartitions > 1 ) { // a single partition needs no completion
for (HashPartition partn : partitions) {
partn.completeAnInnerBatch(false, partn.isSpilled());
+ if ( partn.isSpilled() ) { numPartitionsSpilled++; }
}
}
@@ -1071,8 +1124,8 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
return leftUpstream;
}
- HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc.next();
- postBuildCalc.initialize(probeSideIsEmpty.booleanValue()); // probeEmpty
+ HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = currentCalc.next();
+ postBuildCalc.initialize(probeSideIsEmpty.booleanValue());
//
// Traverse all the in-memory partitions' incoming batches, and build their hash tables
@@ -1090,6 +1143,10 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
if (postBuildCalc.shouldSpill()) {
// Spill this partition if we need to make room
partn.spillThisPartition();
+ } else if (semiSkipDuplicates) {
+ // All in memory, and already got the Hash Table - just build the containers
+ // (No additional memory is needed, hence no need for any new spill)
+ partn.buildContainers();
} else {
// Only build hash tables for partitions that are not spilled
partn.buildContainersHashTableAndHelper();
@@ -1196,6 +1253,10 @@ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
this.probeBatch = left;
joinType = popConfig.getJoinType();
semiJoin = popConfig.isSemiJoin();
+ semiSkipDuplicatesMinPercentage = (int) context.getOptions().getOption(ExecConstants.HASHJOIN_SEMI_PERCENT_DUPLICATES_TO_SKIP_VALIDATOR);
+ semiSkipDuplicates = semiJoin &&
+ semiSkipDuplicatesMinPercentage < 100 && // can't have 100 percent, at least the first key is not a duplicate
+ context.getOptions().getBoolean(ExecConstants.HASHJOIN_SEMI_SKIP_DUPLICATES_KEY);
joinIsLeftOrFull = joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL;
joinIsRightOrFull = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL;
conditions = popConfig.getConditions();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
index c262e3c8c27..d6108c6ce3d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
@@ -95,7 +95,7 @@
* </li>
* </ul>
*/
- interface BuildSidePartitioning extends HashJoinStateCalculator<PostBuildCalculations> {
+ interface BuildSidePartitioning extends HashJoinStateCalculator<PostBuildCalculations>, HashJoinSpillControl {
void initialize(boolean firstCycle,
boolean reserveHash,
RecordBatch buildSideBatch,
@@ -119,8 +119,6 @@ void initialize(boolean firstCycle,
long getMaxReservedMemory();
- boolean shouldSpill();
-
String makeDebugString();
}
@@ -128,17 +126,16 @@ void initialize(boolean firstCycle,
* The interface representing the {@link HashJoinStateCalculator} corresponding to the
* {@link HashJoinState#POST_BUILD_CALCULATIONS} state.
*/
- interface PostBuildCalculations extends HashJoinStateCalculator<HashJoinMemoryCalculator> {
+ interface PostBuildCalculations extends HashJoinStateCalculator<HashJoinMemoryCalculator>, HashJoinSpillControl {
/**
* Initializes the calculator with additional information needed.
* @param probeEmty True if the probe is empty. False otherwise.
+ *
*/
void initialize(boolean probeEmty);
int getProbeRecordsPerBatch();
- boolean shouldSpill();
-
String makeDebugString();
}
@@ -154,6 +151,9 @@ void initialize(boolean firstCycle,
long getInMemorySize();
}
+ interface HashJoinSpillControl {
+ boolean shouldSpill();
+ }
/**
* This class represents the memory size statistics for an entire set of partitions.
*/
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
index 88f3ddc21de..dd1ede2e15a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
@@ -174,7 +174,7 @@ public HashJoinState getState() {
* <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean, RecordBatch, RecordBatch, Set, boolean, long, int, int, int, int, int, int, double)}.
* This will initialize the StateCalculate with the additional information it needs.</li>
* <li><b>Step 1:</b> Call {@link #getNumPartitions()} to see the number of partitions that fit in memory.</li>
- * <li><b>Step 2:</b> Call {@link #shouldSpill()} To determine if spilling needs to occurr.</li>
+ * <li><b>Step 2:</b> Call {@link HashJoinSpillControl#shouldSpill()} to determine if spilling needs to occur.</li>
* <li><b>Step 3:</b> Call {@link #next()} and get the next memory calculator associated with your next state.</li>
* </ul>
* </p>
@@ -555,9 +555,9 @@ public String makeDebugString() {
* <h1>Lifecycle</h1>
* <p>
* <ul>
- * <li><b>Step 1:</b> Call {@link #initialize(boolean)}. This
+ * <li><b>Step 1:</b> Call {@link PostBuildCalculations#initialize(boolean)}. This
* gives the {@link HashJoinStateCalculator} additional information it needs to compute memory requirements.</li>
- * <li><b>Step 2:</b> Call {@link #shouldSpill()}. This tells
+ * <li><b>Step 2:</b> Call {@link HashJoinSpillControl#shouldSpill()}. This tells
* you which build side partitions need to be spilled in order to make room for probing.</li>
* <li><b>Step 3:</b> Call {@link #next()}. After you are done probing
* and partitioning the probe side, get the next calculator.</li>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index c549143d5f3..44e191d96d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -153,7 +153,7 @@ public void setupHashJoinProbe(RecordBatch probeBatch, HashJoinBatch outgoing, J
// for those outer partitions that need spilling (cause their matching inners spilled)
// initialize those partitions' current batches and hash-value vectors
for ( HashPartition partn : this.partitions) {
- partn.allocateNewCurrentBatchAndHV();
+ partn.allocateNewCurrentVectorContainerAndHV();
}
currRightPartition = 0; // In case it's a Right/Full outer join
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinSpillControlImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinSpillControlImpl.java
new file mode 100644
index 00000000000..d4f22ac8a8e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinSpillControlImpl.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.join;
+
+ import org.apache.drill.exec.ExecConstants;
+ import org.apache.drill.exec.memory.BufferAllocator;
+ import org.apache.drill.exec.ops.FragmentContext;
+ import org.apache.drill.exec.record.RecordBatch;
+ import org.apache.drill.exec.record.RecordBatchMemoryManager;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import javax.annotation.Nullable;
+ import java.util.Set;
+
+ import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
+ import static org.apache.drill.exec.record.JoinBatchMemoryManager.RIGHT_INDEX;
+
+/**
+ * Memory-based spill control for the build side of a hash join. This class is currently used only for the
+ * Semi-Hash-Join that avoids duplicates by the use of a hash table.
+ * <p>
+ * The method {@link HashJoinMemoryCalculator.HashJoinSpillControl#shouldSpill()} returns true if the memory
+ * available now to the allocator is not enough to hold (a multiple of, for safety) a newly allocated batch.
+ */
+public class HashJoinSpillControlImpl implements HashJoinMemoryCalculator.BuildSidePartitioning {
+  private static final Logger logger = LoggerFactory.getLogger(HashJoinSpillControlImpl.class);
+
+  /** Bytes added to each row's width for its entry in the Hash Values vector. */
+  private static final int HASH_VALUE_BYTES = 4;
+
+  private final BufferAllocator allocator;
+  private final int recordsPerBatch;
+  private final int minBatchesInAvailableMemory;
+  private final RecordBatchMemoryManager batchMemoryManager;
+  private final FragmentContext context;
+  private int initialPartitions;
+  private int numPartitions;
+  private int recordsPerPartitionBatchProbe;
+  HashJoinMemoryCalculator.PartitionStatSet partitionStatSet;
+
+  /**
+   * @param allocator allocator whose limit and current allocation define the memory available
+   * @param recordsPerBatch number of records assumed for each newly allocated batch
+   * @param minBatchesInAvailableMemory number of batches that must still fit in the available memory
+   * @param batchMemoryManager provides the row widths and the size of the outgoing batch
+   * @param context provides the options (read for the minimum hash table size)
+   */
+  HashJoinSpillControlImpl(BufferAllocator allocator, int recordsPerBatch, int minBatchesInAvailableMemory,
+      RecordBatchMemoryManager batchMemoryManager, FragmentContext context) {
+    this.allocator = allocator;
+    this.recordsPerBatch = recordsPerBatch;
+    this.minBatchesInAvailableMemory = minBatchesInAvailableMemory;
+    this.batchMemoryManager = batchMemoryManager;
+    this.context = context;
+  }
+
+  /**
+   * Expected size of a new batch for the given side: (row width + hash value) * records per batch.
+   * Computed in long arithmetic, as the product of two ints can exceed the int range.
+   */
+  private static long estimateBatchSize(RecordBatchMemoryManager batchMemoryManager, int inputIndex, int recordsPerBatch) {
+    long rowWidth = (long) batchMemoryManager.getRecordBatchSizer(inputIndex).getRowAllocWidth() + HASH_VALUE_BYTES;
+    return rowWidth * recordsPerBatch;
+  }
+
+  /** Memory still available to the allocator after setting aside the reserve for the outgoing batch. */
+  private static long availableMemory(BufferAllocator allocator, long reserveForOutgoing) {
+    return allocator.getLimit() - allocator.getAllocatedMemory() - reserveForOutgoing;
+  }
+
+  /**
+   * @return true if the memory available now cannot hold {@code minBatchesInAvailableMemory} new build side batches
+   */
+  @Override
+  public boolean shouldSpill() {
+    // Expected new batch size like the current, plus the Hash Values vector (4 bytes per HV)
+    long batchSize = estimateBatchSize(batchMemoryManager, RIGHT_INDEX, recordsPerBatch);
+    long reserveForOutgoing = batchMemoryManager.getOutputBatchSize();
+    long memoryAvailableNow = availableMemory(allocator, reserveForOutgoing);
+    boolean needsSpill = minBatchesInAvailableMemory * batchSize > memoryAvailableNow;
+    if (needsSpill) {
+      logger.debug("should spill now - batch size {}, mem avail {}, reserved for outgoing {}", batchSize, memoryAvailableNow, reserveForOutgoing);
+    }
+    return needsSpill; // go spill if too little memory is available
+  }
+
+  /**
+   * Keeps the initial number of partitions and the probe side records per batch, then picks the number of
+   * partitions that fits in the memory available now. The other arguments are not used by this implementation.
+   */
+  @Override
+  public void initialize(boolean firstCycle,
+                         boolean reserveHash,
+                         RecordBatch buildSideBatch,
+                         RecordBatch probeSideBatch,
+                         Set<String> joinColumns,
+                         boolean probeEmpty,
+                         long memoryAvailable,
+                         int initialPartitions,
+                         int recordsPerPartitionBatchBuild,
+                         int recordsPerPartitionBatchProbe,
+                         int maxBatchNumRecordsBuild,
+                         int maxBatchNumRecordsProbe,
+                         int outputBatchSize,
+                         double loadFactor) {
+    this.initialPartitions = initialPartitions;
+    this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
+
+    calculateMemoryUsage();
+  }
+
+  @Override
+  public void setPartitionStatSet(HashJoinMemoryCalculator.PartitionStatSet partitionStatSet) {
+    this.partitionStatSet = partitionStatSet;
+  }
+
+  /** @return the number of partitions chosen by the last call to {@code initialize} */
+  @Override
+  public int getNumPartitions() {
+    return numPartitions;
+  }
+
+  /**
+   * Calculate the number of partitions possible for the given available memory:
+   * start at initialPartitions and adjust down (in powers of 2) as needed.
+   */
+  private void calculateMemoryUsage() {
+    long memoryAvailableNow = availableMemory(allocator, batchMemoryManager.getOutputBatchSize());
+
+    // Expected new batch size like the current, plus the Hash Values vector (4 bytes per HV)
+    long buildBatchSize = estimateBatchSize(batchMemoryManager, RIGHT_INDEX, recordsPerBatch);
+    long hashTableSize = buildBatchSize /* the keys in the HT */ +
+      4 * context.getOptions().getLong(ExecConstants.MIN_HASH_TABLE_SIZE_KEY) /* the initial hash table buckets */ +
+      (2 + 2) * (long) recordsPerBatch; /* the hash-values and the links */
+    long probeBatchSize = estimateBatchSize(batchMemoryManager, LEFT_INDEX, recordsPerBatch);
+
+    long memoryNeededPerPartition = Math.max(buildBatchSize + hashTableSize, probeBatchSize);
+
+    for (numPartitions = initialPartitions; numPartitions > 2; numPartitions /= 2) { // need at least 2
+      // each partition needs at least one internal build batch, and a minimum hash-table
+      // ( or an internal probe batch, for a spilled partition during probe phase )
+      // adding "min batches" to create some safety slack
+      if (memoryAvailableNow > (numPartitions + minBatchesInAvailableMemory) * memoryNeededPerPartition) {
+        break; // got enough memory
+      }
+    }
+    logger.debug("Spill control chosen to use {} partitions", numPartitions);
+  }
+
+  @Override
+  public long getBuildReservedMemory() {
+    return 0;
+  }
+
+  @Override
+  public long getMaxReservedMemory() {
+    return 0;
+  }
+
+  @Override
+  public String makeDebugString() {
+    return "Spill Control " + HashJoinMemoryCalculatorImpl.HashJoinSpillControl.class.getCanonicalName();
+  }
+
+  /** @return the calculator for the post build phase, sharing this calculator's settings and partition stats */
+  @Nullable
+  @Override
+  public HashJoinMemoryCalculator.PostBuildCalculations next() {
+    return new SpillControlPostBuildCalculationsImpl(recordsPerPartitionBatchProbe,
+      allocator, recordsPerBatch, minBatchesInAvailableMemory, batchMemoryManager, partitionStatSet);
+  }
+
+  @Override
+  public HashJoinState getState() {
+    return HashJoinState.BUILD_SIDE_PARTITIONING;
+  }
+
+  /**
+   * The purpose of this class is to provide the method {@link #shouldSpill} that ensures that enough memory is available to
+   * hold all the probe incoming batches for those partitions that spilled (else need to spill more of them, for more memory).
+   */
+  public static class SpillControlPostBuildCalculationsImpl implements HashJoinMemoryCalculator.PostBuildCalculations {
+    private static final Logger logger = LoggerFactory.getLogger(SpillControlPostBuildCalculationsImpl.class);
+
+    private final int recordsPerPartitionBatchProbe;
+    private final BufferAllocator allocator;
+    private final int recordsPerBatch;
+    private final int minBatchesInAvailableMemory;
+    private final RecordBatchMemoryManager batchMemoryManager;
+    private final HashJoinMemoryCalculator.PartitionStatSet buildPartitionStatSet;
+    private boolean probeEmpty;
+
+    public SpillControlPostBuildCalculationsImpl(final int recordsPerPartitionBatchProbe,
+        BufferAllocator allocator, int recordsPerBatch, int minBatchesInAvailableMemory,
+        RecordBatchMemoryManager batchMemoryManager,
+        final HashJoinMemoryCalculator.PartitionStatSet buildPartitionStatSet) {
+      this.allocator = allocator;
+      this.recordsPerBatch = recordsPerBatch;
+      this.minBatchesInAvailableMemory = minBatchesInAvailableMemory;
+      this.batchMemoryManager = batchMemoryManager;
+      this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
+      this.buildPartitionStatSet = buildPartitionStatSet;
+    }
+
+    /**
+     * @param probeEmpty stored unchanged as the "probe side is empty" flag that short-circuits {@link #shouldSpill()}.
+     *     NOTE(review): formerly named hasProbeData, contradicting its use - confirm callers pass true for an empty probe.
+     */
+    @Override
+    public void initialize(boolean probeEmpty) {
+      this.probeEmpty = probeEmpty;
+    }
+
+    @Override
+    public int getProbeRecordsPerBatch() {
+      return recordsPerPartitionBatchProbe;
+    }
+
+    @Override
+    public boolean shouldSpill() {
+      int numPartitionsSpilled = buildPartitionStatSet.getNumSpilledPartitions();
+      if (numPartitionsSpilled == 0) { return false; } // no extra memory is needed if all the build side is in memory
+      if (probeEmpty) { return false; } // no probe side data
+      // Expected new batch size like the current, plus the Hash Values vector (4 bytes per HV)
+      long batchSize = estimateBatchSize(batchMemoryManager, LEFT_INDEX, recordsPerBatch);
+      long reserveForOutgoing = batchMemoryManager.getOutputBatchSize();
+      long memoryAvailableNow = availableMemory(allocator, reserveForOutgoing);
+      boolean needsSpill = (numPartitionsSpilled + minBatchesInAvailableMemory) * batchSize > memoryAvailableNow;
+      if (needsSpill) {
+        logger.debug("Post build should spill now - batch size {}, mem avail {}, reserved for outgoing {}, num partn spilled {}", batchSize,
+          memoryAvailableNow, reserveForOutgoing, numPartitionsSpilled);
+      }
+      return needsSpill; // go spill if too little memory is available
+    }
+
+    @Nullable
+    @Override
+    public HashJoinMemoryCalculator next() {
+      return null;
+    }
+
+    @Override
+    public HashJoinState getState() {
+      return HashJoinState.POST_BUILD_CALCULATIONS;
+    }
+
+    @Override
+    public String makeDebugString() {
+      return "Spill Control " + HashJoinMemoryCalculatorImpl.HashJoinSpillControl.class.getCanonicalName() + " calculator.";
+    }
+  }
+}
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index c97220cef98..aeba391c992 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -124,13 +124,16 @@
new OptionDefinition(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR),
new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
new OptionDefinition(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR),
- new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, false, true)),
+ new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.ALL, false, true)),
new OptionDefinition(ExecConstants.HASHJOIN_FALLBACK_ENABLED_VALIDATOR), // for enable/disable unbounded HashJoin
new OptionDefinition(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER),
new OptionDefinition(ExecConstants.HASHJOIN_BLOOM_FILTER_MAX_SIZE),
new OptionDefinition(ExecConstants.HASHJOIN_BLOOM_FILTER_FPP_VALIDATOR),
new OptionDefinition(ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME),
new OptionDefinition(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_WAITING),
+ new OptionDefinition(ExecConstants.HASHJOIN_SEMI_SKIP_DUPLICATES_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.ALL, false, true)),
+ new OptionDefinition(ExecConstants.HASHJOIN_SEMI_PERCENT_DUPLICATES_TO_SKIP_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.ALL, false, true)),
+ new OptionDefinition(ExecConstants.HASHJOIN_MIN_BATCHES_IN_AVAILABLE_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.ALL, false, true)),
// ------------------------------------------- Index planning related options BEGIN --------------------------------------------------------------
new OptionDefinition(PlannerSettings.USE_SIMPLE_OPTIMIZER),
new OptionDefinition(PlannerSettings.INDEX_PLANNING),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
index 3f8f8a4b4c9..fa8a0702ec0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java
@@ -158,14 +158,21 @@ public DoubleValidator(String name, OptionDescription description) {
}
public static class IntegerValidator extends LongValidator {
+ private int iMin = Integer.MIN_VALUE;
+ private int iMax = Integer.MAX_VALUE;
public IntegerValidator(String name, OptionDescription description) {
super(name, description);
}
+ public IntegerValidator(String name, int min, int max, OptionDescription description) {
+ super(name, description);
+ iMin = min;
+ iMax = max;
+ }
@Override
public void validate(final OptionValue v, final OptionMetaData metaData, final OptionSet manager) {
super.validate(v, metaData, manager);
- if (v.num_val > Integer.MAX_VALUE || v.num_val < Integer.MIN_VALUE) {
+ if (v.num_val > iMax || v.num_val < iMin) {
throw UserException.validationError()
.message(String.format("Option %s does not have a valid integer value", getOptionName()))
.build(logger);
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 4a5f07549ea..3d25b80d326 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -490,6 +490,9 @@ drill.exec.options: {
exec.hashjoin.bloom_filter.max.size: 33554432, #32 MB
exec.hashjoin.runtime_filter.waiting.enable: true,
exec.hashjoin.runtime_filter.max.waiting.time: 300, #400 ms
+ exec.hashjoin.semi_skip_duplicates: true,
+ exec.hashjoin.semi_percent_duplicates_to_skip: 20,
+ exec.hashjoin.min_batches_in_available_memory: 3,
exec.hashagg.mem_limit: 0,
exec.hashagg.min_batches_per_partition: 2,
exec.hashagg.num_partitions: 32,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
index 93475acf6d4..a4377cb43ca 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
@@ -110,7 +110,7 @@ public void run(SpillSet spillSet,
context.getAllocator(),
baseHashTable,
buildBatch,
- probeBatch, false, 10,
+ probeBatch, false, false, 10,
spillSet,
0,
0,
@@ -209,7 +209,7 @@ public void run(SpillSet spillSet,
context.getAllocator(),
baseHashTable,
buildBatch,
- probeBatch, false, 10,
+ probeBatch, false, false, 10,
spillSet,
0,
0,
With regards,
Apache Git Services