You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2018/07/21 00:23:29 UTC

[drill] branch master updated: DRILL-6606: Fixed bug in HashJoin that caused it not to return OK_NEW_SCHEMA in some cases.

This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 91b5e4d  DRILL-6606: Fixed bug in HashJoin that caused it not to return OK_NEW_SCHEMA in some cases.
91b5e4d is described below

commit 91b5e4d0d84dae2c6af19f82df11bb4c493a6ce8
Author: Timothy Farkas <ti...@gmail.com>
AuthorDate: Fri Jul 20 17:23:26 2018 -0700

    DRILL-6606: Fixed bug in HashJoin that caused it not to return OK_NEW_SCHEMA in some cases.
    
    closes #1384
---
 .../exec/physical/impl/join/HashJoinBatch.java     | 175 +++++++++++++++------
 .../physical/impl/join/TestHashJoinAdvanced.java   |  37 +++++
 2 files changed, 163 insertions(+), 49 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 345d182..d4d4f92 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -149,6 +149,11 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
   private RecordBatch buildBatch;
   private RecordBatch probeBatch;
 
+  /**
+   * Flag indicating whether the first data holding batches have already been prefetched.
+   */
+  private boolean prefetched;
+
   // For handling spilling
   private SpillSet spillSet;
   HashJoinPOP popConfig;
@@ -174,7 +179,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
   /**
    * Queue of spilled partitions to process.
    */
-  private ArrayList<HJSpilledPartition> spilledPartitionsList;
+  private ArrayList<HJSpilledPartition> spilledPartitionsList = new ArrayList<>();
   private HJSpilledPartition spilledInners[]; // for the outer to find the partition
 
   public enum Metric implements MetricDef {
@@ -213,86 +218,145 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
   @Override
   protected void buildSchema() throws SchemaChangeException {
-    if (! prefetchFirstBatchFromBothSides()) {
-      return;
+    // We must first get the schemas from upstream operators before we can build
+    // our schema.
+    boolean validSchema = sniffNewSchemas();
+
+    if (validSchema) {
+      // We are able to construct a valid schema from the upstream data.
+      // Setting the state here makes sure AbstractRecordBatch returns OK_NEW_SCHEMA
+      state = BatchState.BUILD_SCHEMA;
+    } else {
+      verifyOutcomeToSetBatchState(leftUpstream, rightUpstream);
     }
 
+    // If we have a valid schema, this will build a valid container. If we were unable to obtain a valid schema,
+    // we still need to build a dummy schema. This code handles both cases for us.
+    setupOutputContainerSchema();
+    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
     // Initialize the hash join helper context
-    if (rightUpstream != IterOutcome.NONE) {
+    if (rightUpstream == IterOutcome.OK_NEW_SCHEMA) {
+      // We only need the hash tables if we have data on the build side.
       setupHashTable();
     }
-    setupOutputContainerSchema();
+
     try {
       hashJoinProbe = setupHashJoinProbe();
     } catch (IOException | ClassTransformationException e) {
       throw new SchemaChangeException(e);
     }
-
-    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
   }
 
   @Override
   protected boolean prefetchFirstBatchFromBothSides() {
-    leftUpstream = sniffNonEmptyBatch(0, left);
-    rightUpstream = sniffNonEmptyBatch(1, right);
+    if (leftUpstream != IterOutcome.NONE) {
+      // We can only get data if there is data available
+      leftUpstream = sniffNonEmptyBatch(leftUpstream, LEFT_INDEX, left);
+    }
+
+    if (rightUpstream != IterOutcome.NONE) {
+      // We can only get data if there is data available
+      rightUpstream = sniffNonEmptyBatch(rightUpstream, RIGHT_INDEX, right);
+    }
+
+    buildSideIsEmpty = rightUpstream == IterOutcome.NONE;
 
-    // For build side, use aggregate i.e. average row width across batches
-    batchMemoryManager.update(LEFT_INDEX, 0);
-    batchMemoryManager.update(RIGHT_INDEX, 0, true);
+    if (verifyOutcomeToSetBatchState(leftUpstream, rightUpstream)) {
+      // For build side, use aggregate i.e. average row width across batches
+      batchMemoryManager.update(LEFT_INDEX, 0);
+      batchMemoryManager.update(RIGHT_INDEX, 0, true);
 
-    logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
-    logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+      logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+      logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
 
-    if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
-      state = BatchState.STOP;
+      // Got our first batch(es)
+      state = BatchState.FIRST;
+      return true;
+    } else {
       return false;
     }
+  }
 
-    if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
-      state = BatchState.OUT_OF_MEMORY;
-      return false;
+  /**
+   * Sniffs all data necessary to construct a schema.
+   * @return True if all the data necessary to construct a schema has been retrieved. False otherwise.
+   */
+  private boolean sniffNewSchemas() {
+    do {
+      // Ask for data until we get a valid result.
+      leftUpstream = next(LEFT_INDEX, left);
+    } while (leftUpstream == IterOutcome.NOT_YET);
+
+    boolean isValidLeft = false;
+
+    switch (leftUpstream) {
+      case OK_NEW_SCHEMA:
+        probeSchema = probeBatch.getSchema();
+      case NONE:
+        isValidLeft = true;
+        break;
+      case OK:
+      case EMIT:
+        throw new IllegalStateException("Unsupported outcome while building schema " + leftUpstream);
+      default:
+        // Termination condition
     }
 
-    if (checkForEarlyFinish(leftUpstream, rightUpstream)) {
-      state = BatchState.DONE;
-      return false;
+    do {
+      // Ask for data until we get a valid result.
+      rightUpstream = next(RIGHT_INDEX, right);
+    } while (rightUpstream == IterOutcome.NOT_YET);
+
+    boolean isValidRight = false;
+
+    switch (rightUpstream) {
+      case OK_NEW_SCHEMA:
+        // We need to have the schema of the build side even when the build side is empty
+        rightSchema = buildBatch.getSchema();
+        // position of the new "column" for keeping the hash values (after the real columns)
+        rightHVColPosition = buildBatch.getContainer().getNumberOfColumns();
+      case NONE:
+        isValidRight = true;
+        break;
+      case OK:
+      case EMIT:
+        throw new IllegalStateException("Unsupported outcome while building schema " + leftUpstream);
+      default:
+        // Termination condition
     }
 
-    state = BatchState.FIRST;  // Got our first batches on both sides
-    return true;
+    // Left and right sides must return a valid response and both sides cannot be NONE.
+    return (isValidLeft && isValidRight) &&
+      (leftUpstream != IterOutcome.NONE && rightUpstream != IterOutcome.NONE);
   }
 
   /**
    * Currently in order to accurately predict memory usage for spilling, the first non-empty build side and probe side batches are needed. This method
    * fetches the first non-empty batch from the left or right side.
+   * @param curr The current outcome.
    * @param inputIndex Index specifying whether to work with the left or right input.
    * @param recordBatch The left or right record batch.
    * @return The {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left or right record batch.
    */
-  private IterOutcome sniffNonEmptyBatch(int inputIndex, RecordBatch recordBatch) {
+  private IterOutcome sniffNonEmptyBatch(IterOutcome curr, int inputIndex, RecordBatch recordBatch) {
     while (true) {
-      IterOutcome outcome = next(inputIndex, recordBatch);
+      if (recordBatch.getRecordCount() != 0) {
+        return curr;
+      }
 
-      switch (outcome) {
-        case OK_NEW_SCHEMA:
-          if ( inputIndex == 0 ) {
-            // Indicate that a schema was seen (in case probe side is empty)
-            probeSchema = probeBatch.getSchema();
-          } else {
-            // We need to have the schema of the build side even when the build side is empty
-            rightSchema = buildBatch.getSchema();
-            // position of the new "column" for keeping the hash values (after the real columns)
-            rightHVColPosition = buildBatch.getContainer().getNumberOfColumns();
-            // new schema can also contain records
-          }
+      curr = next(inputIndex, recordBatch);
+
+      switch (curr) {
         case OK:
-          if (recordBatch.getRecordCount() == 0) {
-            continue;
-          }
-          // We got a non empty batch
+          // We got a data batch
+          break;
+        case NOT_YET:
+          // We need to try again
+          break;
         default:
           // Other cases termination conditions
-          return outcome;
+          return curr;
       }
     }
   }
@@ -317,7 +381,25 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
 
   @Override
   public IterOutcome innerNext() {
-    // In case incoming was killed before, just cleanup and return
+    if (!prefetched) {
+      // If we didn't retrieve our first data holding batch, we need to do it now.
+      prefetched = true;
+      prefetchFirstBatchFromBothSides();
+
+      // Handle emitting the correct outcome for termination conditions
+      // Use the state set by prefetchFirstBatchFromBothSides to emit the correct termination outcome.
+      switch (state) {
+        case DONE:
+          return IterOutcome.NONE;
+        case STOP:
+          return IterOutcome.STOP;
+        case OUT_OF_MEMORY:
+          return IterOutcome.OUT_OF_MEMORY;
+        default:
+          // No termination condition so continue processing.
+      }
+    }
+
     if ( wasKilled ) {
       this.cleanup();
       super.close();
@@ -504,13 +586,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
     bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
 
-    // Create the FIFO list of spilled partitions (pairs - inner/outer)
-    spilledPartitionsList = new ArrayList<>();
-
     // Create array for the partitions
     partitions = new HashPartition[numPartitions];
-
-    buildSideIsEmpty = false;
   }
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
index 6893e8e..e093632 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
@@ -19,9 +19,16 @@ package org.apache.drill.exec.physical.impl.join;
 
 import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -29,6 +36,7 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
 import java.nio.file.Paths;
+import java.util.List;
 import java.util.regex.Pattern;
 
 @Category(OperatorTest.class)
@@ -206,4 +214,33 @@ public class TestHashJoinAdvanced extends JoinTestBase {
     final Pattern sortHashJoinPattern = Pattern.compile(".*Sort.*HashJoin", Pattern.DOTALL);
     testPlanMatchingPatterns(query, new Pattern[]{sortHashJoinPattern}, null);
   }
+
+  @Test // DRILL-6606
+  public void testJoinLimit0Schema() throws Exception {
+    String query = "SELECT l.l_quantity, l.l_shipdate, o.o_custkey\n" +
+      "FROM (SELECT * FROM cp.`tpch/lineitem.parquet` LIMIT 0) l\n" +
+      "    JOIN (SELECT * FROM cp.`tpch/orders.parquet` LIMIT 0) o \n" +
+      "    ON l.l_orderkey = o.o_orderkey\n";
+    final List<QueryDataBatch> dataBatches = client.runQuery(UserBitShared.QueryType.SQL, query);
+
+    Assert.assertEquals(1, dataBatches.size());
+
+    final QueryDataBatch queryDataBatch = dataBatches.get(0);
+    final RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
+
+    try {
+      batchLoader.load(queryDataBatch.getHeader().getDef(), queryDataBatch.getData());
+
+      final BatchSchema actualSchema = batchLoader.getSchema();
+      final BatchSchema expectedSchema = new SchemaBuilder()
+        .add("l_quantity", TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.REQUIRED)
+        .add("l_shipdate", TypeProtos.MinorType.DATE, TypeProtos.DataMode.REQUIRED)
+        .add("o_custkey", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
+        .build();
+
+      Assert.assertTrue(expectedSchema.isEquivalent(actualSchema));
+    } finally {
+      batchLoader.clear();
+    }
+  }
 }