You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/11/20 20:31:38 UTC

[GitHub] Ben-Zvi closed pull request #1546: DRILL-6861: Hash-Join should not exit after an empty probe-side spilled partition

Ben-Zvi closed pull request #1546: DRILL-6861: Hash-Join should not exit after an empty probe-side spilled partition
URL: https://github.com/apache/drill/pull/1546
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not otherwise show it once the pull request is merged):

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index f1c61816584..88eadf29115 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -614,33 +614,41 @@ public IterOutcome innerNext() {
         //
         //  (recursively) Handle the spilled partitions, if any
         //
-        if (!buildSideIsEmpty.booleanValue() && !spilledState.isEmpty()) {
-          // Get the next (previously) spilled partition to handle as incoming
-          HashJoinSpilledPartition currSp = spilledState.getNextSpilledPartition();
-
-          // Create a BUILD-side "incoming" out of the inner spill file of that partition
-          buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
-          // The above ctor call also got the first batch; need to update the outcome
-          rightUpstream = ((SpilledRecordbatch) buildBatch).getInitialOutcome();
-
-          if (currSp.outerSpilledBatches > 0) {
-            // Create a PROBE-side "incoming" out of the outer spill file of that partition
-            probeBatch = new SpilledRecordbatch(currSp.outerSpillFile, currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet);
+        if (!buildSideIsEmpty.booleanValue()) {
+          while (!spilledState.isEmpty()) {  // "while" is only used for skipping; see "continue" below
+
+            // Get the next (previously) spilled partition to handle as incoming
+            HashJoinSpilledPartition currSp = spilledState.getNextSpilledPartition();
+
+            // If the outer is empty (and it's not a right/full join) - try the next spilled partition
+            if (currSp.outerSpilledBatches == 0 && !joinIsRightOrFull) {
+              continue;
+            }
+
+            // Create a BUILD-side "incoming" out of the inner spill file of that partition
+            buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
             // The above ctor call also got the first batch; need to update the outcome
-            leftUpstream = ((SpilledRecordbatch) probeBatch).getInitialOutcome();
-          } else {
-            probeBatch = left; // if no outer batch then reuse left - needed for updateIncoming()
-            leftUpstream = IterOutcome.NONE;
-            hashJoinProbe.changeToFinalProbeState();
-          }
+            rightUpstream = ((SpilledRecordbatch) buildBatch).getInitialOutcome();
+
+            if (currSp.outerSpilledBatches > 0) {
+              // Create a PROBE-side "incoming" out of the outer spill file of that partition
+              probeBatch = new SpilledRecordbatch(currSp.outerSpillFile, currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet);
+              // The above ctor call also got the first batch; need to update the outcome
+              leftUpstream = ((SpilledRecordbatch) probeBatch).getInitialOutcome();
+            } else {
+              probeBatch = left; // if no outer batch then reuse left - needed for updateIncoming()
+              leftUpstream = IterOutcome.NONE;
+              hashJoinProbe.changeToFinalProbeState();
+            }
 
-          spilledState.updateCycle(stats, currSp, spilledStateUpdater);
-          state = BatchState.FIRST;  // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this
+            spilledState.updateCycle(stats, currSp, spilledStateUpdater);
+            state = BatchState.FIRST;  // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this
 
-          prefetchedBuild.setValue(false);
-          prefetchedProbe.setValue(false);
+            prefetchedBuild.setValue(false);
+            prefetchedProbe.setValue(false);
 
-          return innerNext(); // start processing the next spilled partition "recursively"
+            return innerNext(); // start processing the next spilled partition "recursively"
+          }
         }
 
       } else {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 87e5286a485..7897c3b0096 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -124,7 +124,7 @@
       new OptionDefinition(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR),
       new OptionDefinition(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
       new OptionDefinition(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR),
-      new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
+      new OptionDefinition(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, false, true)),
       new OptionDefinition(ExecConstants.HASHJOIN_FALLBACK_ENABLED_VALIDATOR), // for enable/disable unbounded HashJoin
       new OptionDefinition(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER),
       new OptionDefinition(ExecConstants.HASHJOIN_BLOOM_FILTER_MAX_SIZE),


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services