You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/10/27 03:15:12 UTC

[GitHub] Ben-Zvi closed pull request #1480: DRILL-6755: Avoid building Hash Table for inner/left join when probe side is empty

Ben-Zvi closed pull request #1480: DRILL-6755: Avoid building Hash Table for inner/left join when probe side is empty
URL: https://github.com/apache/drill/pull/1480
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not otherwise display it:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 368bb5dc91b..21dcdcf83e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -81,7 +81,6 @@
 import org.apache.drill.exec.work.filter.RuntimeFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterReporter;
 
-
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
@@ -101,7 +100,7 @@
  *   processed individually (that Build partition should be smaller than the original, hence likely fit whole into
  *   memory to allow probing; if not -- see below).
  *      Processing of each spilled pair is EXACTLY like processing the original Build/Probe incomings. (As a fact,
- *   the {@Link #innerNext() innerNext} method calls itself recursively !!). Thus the spilled build partition is
+ *   the {@link #innerNext()} method calls itself recursively !!). Thus the spilled build partition is
  *   read and divided into new partitions, which in turn may spill again (and again...).
  *   The code tracks these spilling "cycles". Normally any such "again" (i.e. cycle of 2 or greater) is a waste,
  *   indicating that the number of partitions chosen was too small.
@@ -116,6 +115,9 @@
 
   // Join type, INNER, LEFT, RIGHT or OUTER
   private final JoinRelType joinType;
+  private boolean joinIsLeftOrFull;
+  private boolean joinIsRightOrFull;
+  private boolean skipHashTableBuild; // when outer side is empty, and the join is inner or left (see DRILL-6755)
 
   // Join conditions
   private final List<JoinCondition> conditions;
@@ -131,8 +133,6 @@
   private final Set<String> buildJoinColumns;
 
   // Fields used for partitioning
-
-  private long maxIncomingBatchSize;
   /**
    * The number of {@link HashPartition}s. This is configured via a system option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
    */
@@ -264,6 +264,8 @@ protected void buildSchema() throws SchemaChangeException {
         buildSchema = right.getSchema();
         // position of the new "column" for keeping the hash values (after the real columns)
         rightHVColPosition = right.getContainer().getNumberOfColumns();
+        // In special cases, when the probe side is empty, and inner/left join - no need for Hash Table
+        skipHashTableBuild = leftUpstream == IterOutcome.NONE && ! joinIsRightOrFull;
         // We only need the hash tables if we have data on the build side.
         setupHashTable();
       }
@@ -447,12 +449,12 @@ public IterOutcome innerNext() {
 
       // Try to probe and project, or recursively handle a spilled partition
       if (!buildSideIsEmpty.booleanValue() ||  // If there are build-side rows
-        joinType != JoinRelType.INNER) {  // or if this is a left/full outer join
+          joinIsLeftOrFull) {  // or if this is a left/full outer join
 
         prefetchFirstProbeBatch();
 
         if (leftUpstream.isError() ||
-            ( leftUpstream == NONE && joinType != JoinRelType.FULL && joinType != JoinRelType.RIGHT )) {
+            ( leftUpstream == NONE && ! joinIsRightOrFull )) {
           // A termination condition was reached while prefetching the first probe side data holding batch.
           // We need to terminate.
           return leftUpstream;
@@ -568,19 +570,7 @@ public IterOutcome innerNext() {
 
       } else {
         // Our build side is empty, we won't have any matches, clear the probe side
-        if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
-          for (final VectorWrapper<?> wrapper : probeBatch) {
-            wrapper.getValueVector().clear();
-          }
-          probeBatch.kill(true);
-          leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
-          while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
-            for (final VectorWrapper<?> wrapper : probeBatch) {
-              wrapper.getValueVector().clear();
-            }
-            leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
-          }
-        }
+        killAndDrainLeftUpstream();
       }
 
       // No more output records, clean up and return
@@ -596,10 +586,31 @@ public IterOutcome innerNext() {
     }
   }
 
+  /**
+   *  In case an upstream data is no longer needed, send a kill and flush any remaining batch
+   *
+   * @param batch probe or build batch
+   * @param upstream which upstream
+   * @param isLeft is it the left or right
+   */
+  private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream, boolean isLeft) {
+      batch.kill(true);
+      while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == IterOutcome.OK) {
+        for (final VectorWrapper<?> wrapper : batch) {
+          wrapper.getValueVector().clear();
+        }
+        upstream = next( isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT, batch);
+      }
+  }
+  private void killAndDrainLeftUpstream() { killAndDrainUpstream(probeBatch, leftUpstream, true); }
+  private void killAndDrainRightUpstream() { killAndDrainUpstream(buildBatch, rightUpstream, false); }
+
   private void setupHashTable() throws SchemaChangeException {
     final List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size());
     conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
 
+    if ( skipHashTableBuild ) { return; }
+
     // Setup the hash table configuration object
     List<NamedExpression> leftExpr = new ArrayList<>(conditions.size());
 
@@ -819,6 +830,11 @@ public IterOutcome executeBuildPhase() throws SchemaChangeException {
       return null;
     }
 
+    if ( skipHashTableBuild ) { // No hash table needed - then consume all the right upstream
+      killAndDrainRightUpstream();
+      return null;
+    }
+
     HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
     boolean firstCycle = cycleNum == 0;
 
@@ -1013,7 +1029,7 @@ private void setupOutputContainerSchema() {
         final MajorType outputType;
         // If left or full outer join, then the output type must be nullable. However, map types are
         // not nullable so we must exclude them from the check below (see DRILL-2197).
-        if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) && inputType.getMode() == DataMode.REQUIRED
+        if (joinIsLeftOrFull && inputType.getMode() == DataMode.REQUIRED
             && inputType.getMinorType() != TypeProtos.MinorType.MAP) {
           outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
         } else {
@@ -1034,7 +1050,7 @@ private void setupOutputContainerSchema() {
 
         // If right or full outer join then the output type should be optional. However, map types are
         // not nullable so we must exclude them from the check below (see DRILL-2771, DRILL-2197).
-        if ((joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) && inputType.getMode() == DataMode.REQUIRED
+        if (joinIsRightOrFull && inputType.getMode() == DataMode.REQUIRED
             && inputType.getMinorType() != TypeProtos.MinorType.MAP) {
           outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
         } else {
@@ -1074,6 +1090,8 @@ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
     this.buildBatch = right;
     this.probeBatch = left;
     joinType = popConfig.getJoinType();
+    joinIsLeftOrFull  = joinType == JoinRelType.LEFT  || joinType == JoinRelType.FULL;
+    joinIsRightOrFull = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL;
     conditions = popConfig.getConditions();
     this.popConfig = popConfig;
     rightExpr = new ArrayList<>(conditions.size());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
index e7fa4e6b57b..486fb1e1a99 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -113,7 +113,7 @@ protected boolean prefetchFirstBatchFromBothSides() {
     return verifyOutcomeToSetBatchState(leftUpstream, rightUpstream);
   }
 
-  /*
+  /**
    * Checks for the operator specific early terminal condition.
    * @return true if the further processing can stop.
    *         false if the further processing is needed.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index 43abd8e705d..67789465a59 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -103,7 +103,7 @@ public synchronized void enqueue(final RawFragmentBatch batch) throws IOExceptio
   @Override
   public void close() {
     if (!isTerminated() && context.getExecutorState().shouldContinue()) {
-      final String msg = String.format("Cleanup before finished. %d out of %d strams have finished", completedStreams(), fragmentCount);
+      final String msg = String.format("Cleanup before finished. %d out of %d streams have finished", completedStreams(), fragmentCount);
       throw  new IllegalStateException(msg);
     }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
index 349a295114f..5beb7cbdd98 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
@@ -209,4 +209,47 @@ public void testHashJoinNoneOutcomeUninitRightSide() {
   public void testHashJoinNoneOutcomeUninitLeftSide() {
     testHashJoinOutcomes(UninitializedSide.Left, RecordBatch.IterOutcome.NONE, RecordBatch.IterOutcome.NONE);
   }
+
+  /**
+   * Testing for DRILL-6755: No Hash Table is built when the first probe batch is NONE
+   */
+  @Test
+  public void testHashJoinWhenProbeIsNONE() {
+
+    inputOutcomesLeft.add(RecordBatch.IterOutcome.NONE);
+
+    inputOutcomesRight.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomesRight.add(RecordBatch.IterOutcome.OK);
+    inputOutcomesRight.add(RecordBatch.IterOutcome.NONE);
+
+    // for the probe side input - use multiple batches (to check that they are all cleared/drained)
+    final List<VectorContainer> buildSideinputContainer = new ArrayList<>(5);
+    buildSideinputContainer.add(emptyInputRowSetRight.container());
+    buildSideinputContainer.add(nonEmptyInputRowSetRight.container());
+    RowSet.SingleRowSet secondInputRowSetRight = operatorFixture.rowSetBuilder(inputSchemaRight).addRow(456).build();
+    RowSet.SingleRowSet thirdInputRowSetRight = operatorFixture.rowSetBuilder(inputSchemaRight).addRow(789).build();
+    buildSideinputContainer.add(secondInputRowSetRight.container());
+    buildSideinputContainer.add(thirdInputRowSetRight.container());
+
+    final MockRecordBatch mockInputBatchRight = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, buildSideinputContainer, inputOutcomesRight, batchSchemaRight);
+    final MockRecordBatch mockInputBatchLeft = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, inputContainerLeft, inputOutcomesLeft, batchSchemaLeft);
+
+    List<JoinCondition> conditions = Lists.newArrayList();
+
+    conditions.add(new JoinCondition(SqlKind.EQUALS.toString(), FieldReference.getWithQuotedRef("leftcol"), FieldReference.getWithQuotedRef("rightcol")));
+
+    HashJoinPOP hjConf = new HashJoinPOP(null, null, conditions, JoinRelType.INNER);
+
+    HashJoinBatch hjBatch = new HashJoinBatch(hjConf, operatorFixture.getFragmentContext(), mockInputBatchLeft, mockInputBatchRight);
+
+    RecordBatch.IterOutcome gotOutcome = hjBatch.next();
+    assertTrue(gotOutcome == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    gotOutcome = hjBatch.next();
+    assertTrue(gotOutcome == RecordBatch.IterOutcome.NONE);
+
+    secondInputRowSetRight.clear();
+    thirdInputRowSetRight.clear();
+    buildSideinputContainer.clear();
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services