You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by bo...@apache.org on 2018/11/20 20:31:41 UTC
[drill] branch master updated: DRILL-6861: Hash-Join should not exit after an empty probe-side spilled partition
This is an automated email from the ASF dual-hosted git repository.
boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 6ecaed7 DRILL-6861: Hash-Join should not exit after an empty probe-side spilled partition
6ecaed7 is described below
commit 6ecaed7e76228bfefdd5dc5c94aff01c16d1e249
Author: Ben-Zvi <bb...@mapr.com>
AuthorDate: Mon Nov 19 18:55:45 2018 -0800
DRILL-6861: Hash-Join should not exit after an empty probe-side spilled partition
---
.../exec/physical/impl/join/HashJoinBatch.java | 54 +++++++++++++---------
.../exec/server/options/SystemOptionManager.java | 2 +-
2 files changed, 32 insertions(+), 24 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index f1c6181..88eadf2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -614,33 +614,41 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
//
// (recursively) Handle the spilled partitions, if any
//
- if (!buildSideIsEmpty.booleanValue() && !spilledState.isEmpty()) {
- // Get the next (previously) spilled partition to handle as incoming
- HashJoinSpilledPartition currSp = spilledState.getNextSpilledPartition();
-
- // Create a BUILD-side "incoming" out of the inner spill file of that partition
- buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
- // The above ctor call also got the first batch; need to update the outcome
- rightUpstream = ((SpilledRecordbatch) buildBatch).getInitialOutcome();
-
- if (currSp.outerSpilledBatches > 0) {
- // Create a PROBE-side "incoming" out of the outer spill file of that partition
- probeBatch = new SpilledRecordbatch(currSp.outerSpillFile, currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet);
+ if (!buildSideIsEmpty.booleanValue()) {
+ while (!spilledState.isEmpty()) { // "while" is only used for skipping; see "continue" below
+
+ // Get the next (previously) spilled partition to handle as incoming
+ HashJoinSpilledPartition currSp = spilledState.getNextSpilledPartition();
+
+ // If the outer is empty (and it's not a right/full join) - try the next spilled partition
+ if (currSp.outerSpilledBatches == 0 && !joinIsRightOrFull) {
+ continue;
+ }
+
+ // Create a BUILD-side "incoming" out of the inner spill file of that partition
+ buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
// The above ctor call also got the first batch; need to update the outcome
- leftUpstream = ((SpilledRecordbatch) probeBatch).getInitialOutcome();
- } else {
- probeBatch = left; // if no outer batch then reuse left - needed for updateIncoming()
- leftUpstream = IterOutcome.NONE;
- hashJoinProbe.changeToFinalProbeState();
- }
+ rightUpstream = ((SpilledRecordbatch) buildBatch).getInitialOutcome();
+
+ if (currSp.outerSpilledBatches > 0) {
+ // Create a PROBE-side "incoming" out of the outer spill file of that partition
+ probeBatch = new SpilledRecordbatch(currSp.outerSpillFile, currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet);
+ // The above ctor call also got the first batch; need to update the outcome
+ leftUpstream = ((SpilledRecordbatch) probeBatch).getInitialOutcome();
+ } else {
+ probeBatch = left; // if no outer batch then reuse left - needed for updateIncoming()
+ leftUpstream = IterOutcome.NONE;
+ hashJoinProbe.changeToFinalProbeState();
+ }
- spilledState.updateCycle(stats, currSp, spilledStateUpdater);
- state = BatchState.FIRST; // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this
+ spilledState.updateCycle(stats, currSp, spilledStateUpdater);
+ state = BatchState.FIRST; // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this
- prefetchedBuild.setValue(false);
- prefetchedProbe.setValue(false);
+ prefetchedBuild.setValue(false);
+ prefetchedProbe.setValue(false);
- return innerNext(); // start processing the next spilled partition "recursively"
+ return innerNext(); // start processing the next spilled partition "recursively"
+ }
}
} else {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 87e5286..7897c3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -124,7 +124,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR),
new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
new OptionDefinition(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR),
- new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
+ new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, false, true)),
new OptionDefinition(ExecConstants.HASHJOIN_FALLBACK_ENABLED_VALIDATOR), // for enable/disable unbounded HashJoin
new OptionDefinition(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER),
new OptionDefinition(ExecConstants.HASHJOIN_BLOOM_FILTER_MAX_SIZE),