You are viewing a plain-text version of this content. The canonical link for it (originally a hyperlink labeled "here") was not preserved in this copy.
Posted to commits@drill.apache.org by bo...@apache.org on 2018/11/20 20:31:41 UTC

[drill] branch master updated: DRILL-6861: Hash-Join should not exit after an empty probe-side spilled partition

This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ecaed7  DRILL-6861: Hash-Join should not exit after an empty probe-side spilled partition
6ecaed7 is described below

commit 6ecaed7e76228bfefdd5dc5c94aff01c16d1e249
Author: Ben-Zvi <bb...@mapr.com>
AuthorDate: Mon Nov 19 18:55:45 2018 -0800

    DRILL-6861: Hash-Join should not exit after an empty probe-side spilled partition
---
 .../exec/physical/impl/join/HashJoinBatch.java     | 54 +++++++++++++---------
 .../exec/server/options/SystemOptionManager.java   |  2 +-
 2 files changed, 32 insertions(+), 24 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index f1c6181..88eadf2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -614,33 +614,41 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
         //
         //  (recursively) Handle the spilled partitions, if any
         //
-        if (!buildSideIsEmpty.booleanValue() && !spilledState.isEmpty()) {
-          // Get the next (previously) spilled partition to handle as incoming
-          HashJoinSpilledPartition currSp = spilledState.getNextSpilledPartition();
-
-          // Create a BUILD-side "incoming" out of the inner spill file of that partition
-          buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
-          // The above ctor call also got the first batch; need to update the outcome
-          rightUpstream = ((SpilledRecordbatch) buildBatch).getInitialOutcome();
-
-          if (currSp.outerSpilledBatches > 0) {
-            // Create a PROBE-side "incoming" out of the outer spill file of that partition
-            probeBatch = new SpilledRecordbatch(currSp.outerSpillFile, currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet);
+        if (!buildSideIsEmpty.booleanValue()) {
+          while (!spilledState.isEmpty()) {  // "while" is only used for skipping; see "continue" below
+
+            // Get the next (previously) spilled partition to handle as incoming
+            HashJoinSpilledPartition currSp = spilledState.getNextSpilledPartition();
+
+            // If the outer is empty (and it's not a right/full join) - try the next spilled partition
+            if (currSp.outerSpilledBatches == 0 && !joinIsRightOrFull) {
+              continue;
+            }
+
+            // Create a BUILD-side "incoming" out of the inner spill file of that partition
+            buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
             // The above ctor call also got the first batch; need to update the outcome
-            leftUpstream = ((SpilledRecordbatch) probeBatch).getInitialOutcome();
-          } else {
-            probeBatch = left; // if no outer batch then reuse left - needed for updateIncoming()
-            leftUpstream = IterOutcome.NONE;
-            hashJoinProbe.changeToFinalProbeState();
-          }
+            rightUpstream = ((SpilledRecordbatch) buildBatch).getInitialOutcome();
+
+            if (currSp.outerSpilledBatches > 0) {
+              // Create a PROBE-side "incoming" out of the outer spill file of that partition
+              probeBatch = new SpilledRecordbatch(currSp.outerSpillFile, currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet);
+              // The above ctor call also got the first batch; need to update the outcome
+              leftUpstream = ((SpilledRecordbatch) probeBatch).getInitialOutcome();
+            } else {
+              probeBatch = left; // if no outer batch then reuse left - needed for updateIncoming()
+              leftUpstream = IterOutcome.NONE;
+              hashJoinProbe.changeToFinalProbeState();
+            }
 
-          spilledState.updateCycle(stats, currSp, spilledStateUpdater);
-          state = BatchState.FIRST;  // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this
+            spilledState.updateCycle(stats, currSp, spilledStateUpdater);
+            state = BatchState.FIRST;  // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this
 
-          prefetchedBuild.setValue(false);
-          prefetchedProbe.setValue(false);
+            prefetchedBuild.setValue(false);
+            prefetchedProbe.setValue(false);
 
-          return innerNext(); // start processing the next spilled partition "recursively"
+            return innerNext(); // start processing the next spilled partition "recursively"
+          }
         }
 
       } else {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 87e5286..7897c3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -124,7 +124,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR),
       new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
       new OptionDefinition(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR),
-      new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
+      new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, false, true)),
       new OptionDefinition(ExecConstants.HASHJOIN_FALLBACK_ENABLED_VALIDATOR), // for enable/disable unbounded HashJoin
       new OptionDefinition(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER),
       new OptionDefinition(ExecConstants.HASHJOIN_BLOOM_FILTER_MAX_SIZE),