You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vi...@apache.org on 2018/10/01 12:35:40 UTC

[drill] 02/02: DRILL-6755: Avoid building Hash Table for inner/left join when probe side is empty

This is an automated email from the ASF dual-hosted git repository.

vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 3b1ae159b94ef7c1d67ddde474c75d5558d3e50a
Author: Ben-Zvi <bb...@mapr.com>
AuthorDate: Tue Sep 25 19:07:10 2018 -0700

    DRILL-6755: Avoid building Hash Table for inner/left join when probe side is empty
    
    - Preparations and cleanup for DRILL-6755
    
    closes #1480
---
 .../exec/physical/impl/join/HashJoinBatch.java     | 60 ++++++++++++++--------
 .../exec/record/AbstractBinaryRecordBatch.java     |  2 +-
 .../drill/exec/work/batch/BaseRawBatchBuffer.java  |  2 +-
 .../physical/impl/join/TestHashJoinOutcome.java    | 43 ++++++++++++++++
 4 files changed, 84 insertions(+), 23 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 89ab8d4..658f03a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -81,7 +81,6 @@ import org.apache.drill.exec.work.filter.BloomFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterDef;
 import org.apache.drill.exec.work.filter.RuntimeFilterReporter;
 
-
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
@@ -101,7 +100,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA
  *   processed individually (that Build partition should be smaller than the original, hence likely fit whole into
  *   memory to allow probing; if not -- see below).
  *      Processing of each spilled pair is EXACTLY like processing the original Build/Probe incomings. (As a fact,
- *   the {@Link #innerNext() innerNext} method calls itself recursively !!). Thus the spilled build partition is
+ *   the {@link #innerNext()} method calls itself recursively !!). Thus the spilled build partition is
  *   read and divided into new partitions, which in turn may spill again (and again...).
  *   The code tracks these spilling "cycles". Normally any such "again" (i.e. cycle of 2 or greater) is a waste,
  *   indicating that the number of partitions chosen was too small.
@@ -116,6 +115,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
   // Join type, INNER, LEFT, RIGHT or OUTER
   private final JoinRelType joinType;
+  private boolean joinIsLeftOrFull;
+  private boolean joinIsRightOrFull;
+  private boolean skipHashTableBuild; // when outer side is empty, and the join is inner or left (see DRILL-6755)
 
   // Join conditions
   private final List<JoinCondition> conditions;
@@ -131,8 +133,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
   private final Set<String> buildJoinColumns;
 
   // Fields used for partitioning
-
-  private long maxIncomingBatchSize;
   /**
    * The number of {@link HashPartition}s. This is configured via a system option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
    */
@@ -264,6 +264,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
         buildSchema = right.getSchema();
         // position of the new "column" for keeping the hash values (after the real columns)
         rightHVColPosition = right.getContainer().getNumberOfColumns();
+        // In special cases, when the probe side is empty, and inner/left join - no need for Hash Table
+        skipHashTableBuild = leftUpstream == IterOutcome.NONE && ! joinIsRightOrFull;
         // We only need the hash tables if we have data on the build side.
         setupHashTable();
       }
@@ -447,12 +449,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
       // Try to probe and project, or recursively handle a spilled partition
       if (!buildSideIsEmpty.booleanValue() ||  // If there are build-side rows
-        joinType != JoinRelType.INNER) {  // or if this is a left/full outer join
+          joinIsLeftOrFull) {  // or if this is a left/full outer join
 
         prefetchFirstProbeBatch();
 
         if (leftUpstream.isError() ||
-            ( leftUpstream == NONE && joinType != JoinRelType.FULL && joinType != JoinRelType.RIGHT )) {
+            ( leftUpstream == NONE && ! joinIsRightOrFull )) {
           // A termination condition was reached while prefetching the first probe side data holding batch.
           // We need to terminate.
           return leftUpstream;
@@ -568,19 +570,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
       } else {
         // Our build side is empty, we won't have any matches, clear the probe side
-        if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
-          for (final VectorWrapper<?> wrapper : probeBatch) {
-            wrapper.getValueVector().clear();
-          }
-          probeBatch.kill(true);
-          leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
-          while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
-            for (final VectorWrapper<?> wrapper : probeBatch) {
-              wrapper.getValueVector().clear();
-            }
-            leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
-          }
-        }
+        killAndDrainLeftUpstream();
       }
 
       // No more output records, clean up and return
@@ -596,10 +586,31 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     }
   }
 
+  /**
+   *  When an upstream's data is no longer needed, send it a kill and drain (clear) any remaining batches.
+   *
+   * @param batch probe or build batch
+   * @param upstream the outcome most recently received from that upstream
+   * @param isLeft true for the left (probe) input, false for the right (build) input
+   * @return the first outcome that is neither OK nor OK_NEW_SCHEMA (possibly {@code upstream} itself)
+   */
+  private IterOutcome killAndDrainUpstream(RecordBatch batch, IterOutcome upstream, boolean isLeft) {
+      batch.kill(true);
+      while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == IterOutcome.OK) {
+        for (final VectorWrapper<?> wrapper : batch) { wrapper.getValueVector().clear(); }
+        upstream = next( isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT, batch);
+      }
+      return upstream;
+  }
+  private void killAndDrainLeftUpstream() { leftUpstream = killAndDrainUpstream(probeBatch, leftUpstream, true); }
+  private void killAndDrainRightUpstream() { rightUpstream = killAndDrainUpstream(buildBatch, rightUpstream, false); }
+
   private void setupHashTable() throws SchemaChangeException {
     final List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size());
     conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
 
+    if ( skipHashTableBuild ) { return; }
+
     // Setup the hash table configuration object
     List<NamedExpression> leftExpr = new ArrayList<>(conditions.size());
 
@@ -819,6 +830,11 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
       return null;
     }
 
+    if ( skipHashTableBuild ) { // No hash table needed - then consume all the right upstream
+      killAndDrainRightUpstream();
+      return null;
+    }
+
     HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
     boolean firstCycle = cycleNum == 0;
 
@@ -1013,7 +1029,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
         final MajorType outputType;
         // If left or full outer join, then the output type must be nullable. However, map types are
         // not nullable so we must exclude them from the check below (see DRILL-2197).
-        if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) && inputType.getMode() == DataMode.REQUIRED
+        if (joinIsLeftOrFull && inputType.getMode() == DataMode.REQUIRED
             && inputType.getMinorType() != TypeProtos.MinorType.MAP) {
           outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
         } else {
@@ -1034,7 +1050,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
         // If right or full outer join then the output type should be optional. However, map types are
         // not nullable so we must exclude them from the check below (see DRILL-2771, DRILL-2197).
-        if ((joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) && inputType.getMode() == DataMode.REQUIRED
+        if (joinIsRightOrFull && inputType.getMode() == DataMode.REQUIRED
             && inputType.getMinorType() != TypeProtos.MinorType.MAP) {
           outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
         } else {
@@ -1074,6 +1090,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     this.buildBatch = right;
     this.probeBatch = left;
     joinType = popConfig.getJoinType();
+    joinIsLeftOrFull  = joinType == JoinRelType.LEFT  || joinType == JoinRelType.FULL;
+    joinIsRightOrFull = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL;
     conditions = popConfig.getConditions();
     this.popConfig = popConfig;
     rightExpr = new ArrayList<>(conditions.size());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
index e7fa4e6..486fb1e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -113,7 +113,7 @@ public abstract class AbstractBinaryRecordBatch<T extends PhysicalOperator> exte
     return verifyOutcomeToSetBatchState(leftUpstream, rightUpstream);
   }
 
-  /*
+  /**
    * Checks for the operator specific early terminal condition.
    * @return true if the further processing can stop.
    *         false if the further processing is needed.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index 6d77d63..5487d95 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -105,7 +105,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
   @Override
   public void close() {
     if (!isTerminated() && context.getExecutorState().shouldContinue()) {
-      final String msg = String.format("Cleanup before finished. %d out of %d strams have finished", completedStreams(), fragmentCount);
+      final String msg = String.format("Cleanup before finished. %d out of %d streams have finished", completedStreams(), fragmentCount);
       throw  new IllegalStateException(msg);
     }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
index 349a295..5beb7cb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
@@ -209,4 +209,47 @@ public class TestHashJoinOutcome extends PhysicalOpUnitTestBase {
   public void testHashJoinNoneOutcomeUninitLeftSide() {
     testHashJoinOutcomes(UninitializedSide.Left, RecordBatch.IterOutcome.NONE, RecordBatch.IterOutcome.NONE);
   }
+
+  /**
+   * Testing for DRILL-6755: INNER join whose first probe-side (left) outcome is NONE, so the Hash Table build is skipped; expects OK_NEW_SCHEMA, then NONE
+   */
+  @Test
+  public void testHashJoinWhenProbeIsNONE() {
+
+    inputOutcomesLeft.add(RecordBatch.IterOutcome.NONE); // probe side is empty from the very first call
+
+    inputOutcomesRight.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomesRight.add(RecordBatch.IterOutcome.OK);
+    inputOutcomesRight.add(RecordBatch.IterOutcome.NONE);
+
+    // for the build side (right) input - use multiple containers; NOTE(review): only two data outcomes (OK_NEW_SCHEMA, OK) are queued above - confirm the later containers are really delivered and drained
+    final List<VectorContainer> buildSideinputContainer = new ArrayList<>(5);
+    buildSideinputContainer.add(emptyInputRowSetRight.container());
+    buildSideinputContainer.add(nonEmptyInputRowSetRight.container());
+    RowSet.SingleRowSet secondInputRowSetRight = operatorFixture.rowSetBuilder(inputSchemaRight).addRow(456).build();
+    RowSet.SingleRowSet thirdInputRowSetRight = operatorFixture.rowSetBuilder(inputSchemaRight).addRow(789).build();
+    buildSideinputContainer.add(secondInputRowSetRight.container());
+    buildSideinputContainer.add(thirdInputRowSetRight.container());
+
+    final MockRecordBatch mockInputBatchRight = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, buildSideinputContainer, inputOutcomesRight, batchSchemaRight);
+    final MockRecordBatch mockInputBatchLeft = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, inputContainerLeft, inputOutcomesLeft, batchSchemaLeft);
+
+    List<JoinCondition> conditions = Lists.newArrayList();
+
+    conditions.add(new JoinCondition(SqlKind.EQUALS.toString(), FieldReference.getWithQuotedRef("leftcol"), FieldReference.getWithQuotedRef("rightcol")));
+
+    HashJoinPOP hjConf = new HashJoinPOP(null, null, conditions, JoinRelType.INNER); // skipping the build applies only to inner/left joins
+
+    HashJoinBatch hjBatch = new HashJoinBatch(hjConf, operatorFixture.getFragmentContext(), mockInputBatchLeft, mockInputBatchRight);
+
+    RecordBatch.IterOutcome gotOutcome = hjBatch.next(); // first call: the output schema
+    assertTrue(gotOutcome == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    gotOutcome = hjBatch.next(); // second call: no probe rows, so the join is done
+    assertTrue(gotOutcome == RecordBatch.IterOutcome.NONE);
+
+    secondInputRowSetRight.clear();
+    thirdInputRowSetRight.clear();
+    buildSideinputContainer.clear(); // empties the list only; the shared empty/nonEmpty row sets are presumably released by the fixture - verify
+  }
 }