Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/07/21 00:23:28 UTC

[GitHub] sohami closed pull request #1384: DRILL-6606: Fixed bug in HashJoin that caused it not to return OK_NEW_SCHEMA in some cases.

URL: https://github.com/apache/drill/pull/1384
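
The regression: when HashJoin's inputs are empty (for example, LIMIT 0
subqueries), it could fail to return OK_NEW_SCHEMA as its first outcome,
leaving downstream operators without a schema. The new unit test in this
patch exercises exactly that case with the following query (reproduced
from the diff below):

  SELECT l.l_quantity, l.l_shipdate, o.o_custkey
  FROM (SELECT * FROM cp.`tpch/lineitem.parquet` LIMIT 0) l
      JOIN (SELECT * FROM cp.`tpch/orders.parquet` LIMIT 0) o
      ON l.l_orderkey = o.o_orderkey
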
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 345d18200da..d4d4f927e3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -149,6 +149,11 @@
   private RecordBatch buildBatch;
   private RecordBatch probeBatch;
 
+  /**
+   * Flag indicating whether the first data-holding batch still needs to be fetched.
+   */
+  private boolean prefetched;
+
   // For handling spilling
   private SpillSet spillSet;
   HashJoinPOP popConfig;
@@ -174,7 +179,7 @@
   /**
    * Queue of spilled partitions to process.
    */
-  private ArrayList<HJSpilledPartition> spilledPartitionsList;
+  private ArrayList<HJSpilledPartition> spilledPartitionsList = new ArrayList<>();
   private HJSpilledPartition spilledInners[]; // for the outer to find the partition
 
   public enum Metric implements MetricDef {
@@ -213,86 +218,145 @@ public int getRecordCount() {
 
   @Override
   protected void buildSchema() throws SchemaChangeException {
-    if (! prefetchFirstBatchFromBothSides()) {
-      return;
+    // We must first get the schemas from upstream operators before we can build
+    // our schema.
+    boolean validSchema = sniffNewSchemas();
+
+    if (validSchema) {
+      // We are able to construct a valid schema from the upstream data.
+      // Setting the state here makes sure AbstractRecordBatch returns OK_NEW_SCHEMA
+      state = BatchState.BUILD_SCHEMA;
+    } else {
+      verifyOutcomeToSetBatchState(leftUpstream, rightUpstream);
     }
 
+    // If we have a valid schema, this will build a valid container. If we were unable to obtain a valid schema,
+    // we still need to build a dummy schema. This code handles both cases for us.
+    setupOutputContainerSchema();
+    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
     // Initialize the hash join helper context
-    if (rightUpstream != IterOutcome.NONE) {
+    if (rightUpstream == IterOutcome.OK_NEW_SCHEMA) {
+      // We only need the hash tables if the build side returned a schema.
       setupHashTable();
     }
-    setupOutputContainerSchema();
+
     try {
       hashJoinProbe = setupHashJoinProbe();
     } catch (IOException | ClassTransformationException e) {
       throw new SchemaChangeException(e);
     }
-
-    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
   }
 
   @Override
   protected boolean prefetchFirstBatchFromBothSides() {
-    leftUpstream = sniffNonEmptyBatch(0, left);
-    rightUpstream = sniffNonEmptyBatch(1, right);
+    if (leftUpstream != IterOutcome.NONE) {
+      // Only fetch from the probe side if it has not already reported NONE.
+      leftUpstream = sniffNonEmptyBatch(leftUpstream, LEFT_INDEX, left);
+    }
+
+    if (rightUpstream != IterOutcome.NONE) {
+      // Only fetch from the build side if it has not already reported NONE.
+      rightUpstream = sniffNonEmptyBatch(rightUpstream, RIGHT_INDEX, right);
+    }
+
+    buildSideIsEmpty = rightUpstream == IterOutcome.NONE;
 
-    // For build side, use aggregate i.e. average row width across batches
-    batchMemoryManager.update(LEFT_INDEX, 0);
-    batchMemoryManager.update(RIGHT_INDEX, 0, true);
+    if (verifyOutcomeToSetBatchState(leftUpstream, rightUpstream)) {
+      // For build side, use aggregate i.e. average row width across batches
+      batchMemoryManager.update(LEFT_INDEX, 0);
+      batchMemoryManager.update(RIGHT_INDEX, 0, true);
 
-    logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
-    logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+      logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+      logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
 
-    if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
-      state = BatchState.STOP;
+      // Got our first batch(es)
+      state = BatchState.FIRST;
+      return true;
+    } else {
       return false;
     }
+  }
 
-    if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
-      state = BatchState.OUT_OF_MEMORY;
-      return false;
+  /**
+   * Sniffs all data necessary to construct a schema.
+   * @return True if all the data necessary to construct a schema has been retrieved. False otherwise.
+   */
+  private boolean sniffNewSchemas() {
+    do {
+      // Ask for data until we get a valid result.
+      leftUpstream = next(LEFT_INDEX, left);
+    } while (leftUpstream == IterOutcome.NOT_YET);
+
+    boolean isValidLeft = false;
+
+    switch (leftUpstream) {
+      case OK_NEW_SCHEMA:
+        probeSchema = probeBatch.getSchema();
+      case NONE:
+        isValidLeft = true;
+        break;
+      case OK:
+      case EMIT:
+        throw new IllegalStateException("Unsupported outcome while building schema " + leftUpstream);
+      default:
+        // Termination condition
     }
 
-    if (checkForEarlyFinish(leftUpstream, rightUpstream)) {
-      state = BatchState.DONE;
-      return false;
+    do {
+      // Ask for data until we get a valid result.
+      rightUpstream = next(RIGHT_INDEX, right);
+    } while (rightUpstream == IterOutcome.NOT_YET);
+
+    boolean isValidRight = false;
+
+    switch (rightUpstream) {
+      case OK_NEW_SCHEMA:
+        // We need to have the schema of the build side even when the build side is empty
+        rightSchema = buildBatch.getSchema();
+        // position of the new "column" for keeping the hash values (after the real columns)
+        rightHVColPosition = buildBatch.getContainer().getNumberOfColumns();
+      case NONE:
+        isValidRight = true;
+        break;
+      case OK:
+      case EMIT:
+        throw new IllegalStateException("Unsupported outcome while building schema " + rightUpstream);
+      default:
+        // Termination condition
     }
 
-    state = BatchState.FIRST;  // Got our first batches on both sides
-    return true;
+    // Left and right sides must return a valid response, and neither side may be NONE.
+    return (isValidLeft && isValidRight) &&
+      (leftUpstream != IterOutcome.NONE && rightUpstream != IterOutcome.NONE);
   }
 
   /**
   * Currently, in order to accurately predict memory usage for spilling, the first non-empty build side and probe side batches are needed. This method
    * fetches the first non-empty batch from the left or right side.
+   * @param curr The current outcome.
    * @param inputIndex Index specifying whether to work with the left or right input.
    * @param recordBatch The left or right record batch.
    * @return The {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left or right record batch.
    */
-  private IterOutcome sniffNonEmptyBatch(int inputIndex, RecordBatch recordBatch) {
+  private IterOutcome sniffNonEmptyBatch(IterOutcome curr, int inputIndex, RecordBatch recordBatch) {
     while (true) {
-      IterOutcome outcome = next(inputIndex, recordBatch);
+      if (recordBatch.getRecordCount() != 0) {
+        return curr;
+      }
 
-      switch (outcome) {
-        case OK_NEW_SCHEMA:
-          if ( inputIndex == 0 ) {
-            // Indicate that a schema was seen (in case probe side is empty)
-            probeSchema = probeBatch.getSchema();
-          } else {
-            // We need to have the schema of the build side even when the build side is empty
-            rightSchema = buildBatch.getSchema();
-            // position of the new "column" for keeping the hash values (after the real columns)
-            rightHVColPosition = buildBatch.getContainer().getNumberOfColumns();
-            // new schema can also contain records
-          }
+      curr = next(inputIndex, recordBatch);
+
+      switch (curr) {
         case OK:
-          if (recordBatch.getRecordCount() == 0) {
-            continue;
-          }
-          // We got a non empty batch
+          // We got a data batch
+          break;
+        case NOT_YET:
+          // We need to try again
+          break;
         default:
          // Other cases are termination conditions
-          return outcome;
+          return curr;
       }
     }
   }
@@ -317,7 +381,25 @@ public HashJoinMemoryCalculator getCalculatorImpl() {
 
   @Override
   public IterOutcome innerNext() {
-    // In case incoming was killed before, just cleanup and return
+    if (!prefetched) {
+      // If we haven't yet fetched our first data-holding batch, do it now.
+      prefetched = true;
+      prefetchFirstBatchFromBothSides();
+
+      // Handle emitting the correct outcome for termination conditions
+      // Use the state set by prefetchFirstBatchFromBothSides to emit the correct termination outcome.
+      switch (state) {
+        case DONE:
+          return IterOutcome.NONE;
+        case STOP:
+          return IterOutcome.STOP;
+        case OUT_OF_MEMORY:
+          return IterOutcome.OUT_OF_MEMORY;
+        default:
+          // No termination condition so continue processing.
+      }
+    }
+
     if ( wasKilled ) {
       this.cleanup();
       super.close();
@@ -504,13 +586,8 @@ private void delayedSetup() {
     partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
     bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
 
-    // Create the FIFO list of spilled partitions (pairs - inner/outer)
-    spilledPartitionsList = new ArrayList<>();
-
     // Create array for the partitions
     partitions = new HashPartition[numPartitions];
-
-    buildSideIsEmpty = false;
   }
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
index 6893e8e2a8e..e0936329c25 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinAdvanced.java
@@ -19,9 +19,16 @@
 
 import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -29,6 +36,7 @@
 import java.io.File;
 import java.io.FileWriter;
 import java.nio.file.Paths;
+import java.util.List;
 import java.util.regex.Pattern;
 
 @Category(OperatorTest.class)
@@ -206,4 +214,33 @@ public void testJoinOrdering() throws Exception {
     final Pattern sortHashJoinPattern = Pattern.compile(".*Sort.*HashJoin", Pattern.DOTALL);
     testPlanMatchingPatterns(query, new Pattern[]{sortHashJoinPattern}, null);
   }
+
+  @Test // DRILL-6606
+  public void testJoinLimit0Schema() throws Exception {
+    String query = "SELECT l.l_quantity, l.l_shipdate, o.o_custkey\n" +
+      "FROM (SELECT * FROM cp.`tpch/lineitem.parquet` LIMIT 0) l\n" +
+      "    JOIN (SELECT * FROM cp.`tpch/orders.parquet` LIMIT 0) o \n" +
+      "    ON l.l_orderkey = o.o_orderkey\n";
+    final List<QueryDataBatch> dataBatches = client.runQuery(UserBitShared.QueryType.SQL, query);
+
+    Assert.assertEquals(1, dataBatches.size());
+
+    final QueryDataBatch queryDataBatch = dataBatches.get(0);
+    final RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
+
+    try {
+      batchLoader.load(queryDataBatch.getHeader().getDef(), queryDataBatch.getData());
+
+      final BatchSchema actualSchema = batchLoader.getSchema();
+      final BatchSchema expectedSchema = new SchemaBuilder()
+        .add("l_quantity", TypeProtos.MinorType.FLOAT8, TypeProtos.DataMode.REQUIRED)
+        .add("l_shipdate", TypeProtos.MinorType.DATE, TypeProtos.DataMode.REQUIRED)
+        .add("o_custkey", TypeProtos.MinorType.INT, TypeProtos.DataMode.REQUIRED)
+        .build();
+
+      Assert.assertTrue(expectedSchema.isEquivalent(actualSchema));
+    } finally {
+      batchLoader.clear();
+    }
+  }
 }
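
For readers skimming the patch, the heart of the fix is a two-phase
startup: buildSchema() now only sniffs the upstream schemas (via
sniffNewSchemas(), without consuming data batches), while the first
data-holding batches are fetched lazily on the first innerNext() call,
guarded by the new prefetched flag. Below is a minimal sketch of that
pattern; Outcome, UpstreamSource, and TwoPhaseOperator are hypothetical
stand-ins for illustration, not Drill classes, and the NONE/NONE handling
is simplified relative to verifyOutcomeToSetBatchState:

// Hypothetical stand-ins for illustration only; not Drill APIs.
enum Outcome { OK_NEW_SCHEMA, OK, NONE, NOT_YET, STOP }

interface UpstreamSource { Outcome next(); }

class TwoPhaseOperator {
  private boolean prefetched;                 // mirrors HashJoinBatch.prefetched
  private final UpstreamSource left;
  private final UpstreamSource right;

  TwoPhaseOperator(UpstreamSource left, UpstreamSource right) {
    this.left = left;
    this.right = right;
  }

  // Phase 1: pull from each side only until it reports a schema (or NONE).
  // Emitting OK_NEW_SCHEMA here, before any data batch is consumed, is what
  // DRILL-6606 fixes HashJoin to do for empty (e.g. LIMIT 0) inputs.
  Outcome buildSchema() {
    Outcome l = sniffSchema(left);
    Outcome r = sniffSchema(right);
    return (l == Outcome.NONE && r == Outcome.NONE)
        ? Outcome.NONE : Outcome.OK_NEW_SCHEMA;
  }

  // Phase 2: on the first real next() call, fetch the first data-holding
  // batches exactly once, then fall through to normal processing.
  Outcome next() {
    if (!prefetched) {
      prefetched = true;
      // prefetch first non-empty batches, size memory for spilling, etc.
    }
    return Outcome.OK;                        // normal processing elided
  }

  private Outcome sniffSchema(UpstreamSource src) {
    Outcome out;
    do {
      out = src.next();                       // keep asking while NOT_YET
    } while (out == Outcome.NOT_YET);
    return out;
  }
}

In the actual patch, innerNext() additionally maps the BatchState set by
prefetchFirstBatchFromBothSides() to NONE, STOP, or OUT_OF_MEMORY before
continuing, so termination conditions discovered during the deferred
prefetch still surface with the correct outcome.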


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services