You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/07/19 02:49:16 UTC

[GitHub] Ben-Zvi commented on a change in pull request #1384: DRILL-6606: Fixed bug in HashJoin that caused it not to return OK_NEW_SCHEMA in some cases.

Ben-Zvi commented on a change in pull request #1384: DRILL-6606: Fixed bug in HashJoin that caused it not to return OK_NEW_SCHEMA in some cases.
URL: https://github.com/apache/drill/pull/1384#discussion_r203586629
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ##########
 @@ -213,86 +219,185 @@ public int getRecordCount() {
 
   @Override
   protected void buildSchema() throws SchemaChangeException {
-    if (! prefetchFirstBatchFromBothSides()) {
-      return;
+    // We must first get the schemas from upstream operators before we can build
+    // our schema.
+    boolean validSchema = sniffNewSchemas();
+
+    if (validSchema) {
+      // We are able to construct a valid schema from the upstream data.
+      // Setting the state here makes sure AbstractRecordBatch returns OK_NEW_SCHEMA
+      state = BatchState.BUILD_SCHEMA;
+    } else {
+      // We were not able to build a valid schema, so we need to set our termination state.
+      final Optional<BatchState> batchStateOpt = getBatchStateTermination();
+      state = batchStateOpt.get(); // There should be a state here.
     }
 
+    // If we have a valid schema, this will build a valid container. If we were unable to obtain a valid schema,
+    // we still need to build a dummy schema. These code handles both cases for us.
+    setupOutputContainerSchema();
+    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
     // Initialize the hash join helper context
-    if (rightUpstream != IterOutcome.NONE) {
+    if (rightUpstream == IterOutcome.OK_NEW_SCHEMA) {
+      // We only need the hash tables if we have data on the build side.
       setupHashTable();
     }
-    setupOutputContainerSchema();
+
     try {
       hashJoinProbe = setupHashJoinProbe();
     } catch (IOException | ClassTransformationException e) {
       throw new SchemaChangeException(e);
     }
-
-    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
   }
 
   @Override
   protected boolean prefetchFirstBatchFromBothSides() {
-    leftUpstream = sniffNonEmptyBatch(0, left);
-    rightUpstream = sniffNonEmptyBatch(1, right);
+    if (leftUpstream != IterOutcome.NONE) {
+      // We can only get data if there is data available
+      leftUpstream = sniffNonEmptyBatch(leftUpstream, LEFT_INDEX, left);
+    }
+
+    if (rightUpstream != IterOutcome.NONE) {
+      // We can only get data if there is data available
+      rightUpstream = sniffNonEmptyBatch(rightUpstream, RIGHT_INDEX, right);
+    }
+
+    buildSideIsEmpty = rightUpstream == IterOutcome.NONE;
+
+    final Optional<BatchState> batchStateOpt = getBatchStateTermination();
+
+    if (batchStateOpt.isPresent()) {
+      // We reached a termination state
+      state = batchStateOpt.get();
+
+      switch (state) {
+        case STOP:
+        case OUT_OF_MEMORY:
+          // Terminate processing now
+          return false;
+        case DONE:
+          // No more data but take operation to completion
+          return true;
+        default:
+          throw new IllegalStateException();
+      }
+    } else {
+      // For build side, use aggregate i.e. average row width across batches
+      batchMemoryManager.update(LEFT_INDEX, 0);
+      batchMemoryManager.update(RIGHT_INDEX, 0, true);
 
-    // For build side, use aggregate i.e. average row width across batches
-    batchMemoryManager.update(LEFT_INDEX, 0);
-    batchMemoryManager.update(RIGHT_INDEX, 0, true);
+      logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+      logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
 
-    logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
-    logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+      // Got our first batche(s)
+      state = BatchState.FIRST;
+      return true;
+    }
+  }
 
+  /**
+   * Checks if a termination state has been reached, and returns the appropriate termination state if it has been reached.
+   * @return The termination state if it has been reached. Otherwise empty.
+   */
+  private Optional<BatchState> getBatchStateTermination() {
     if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
-      state = BatchState.STOP;
-      return false;
+      return Optional.of(BatchState.STOP);
     }
 
     if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
-      state = BatchState.OUT_OF_MEMORY;
-      return false;
+      return Optional.of(BatchState.OUT_OF_MEMORY);
     }
 
     if (checkForEarlyFinish(leftUpstream, rightUpstream)) {
-      state = BatchState.DONE;
-      return false;
+      return Optional.of(BatchState.DONE);
+    }
+
+    return Optional.empty();
+  }
+
+  /**
+   * Sniffs all data necessary to construct a schema.
+   * @return True if all the data necessary to construct a schema has been retrieved. False otherwise.
+   */
+  private boolean sniffNewSchemas() {
+    do {
+      // Ask for data until we get a valid result.
+      leftUpstream = next(LEFT_INDEX, probeBatch);
+    } while (leftUpstream == IterOutcome.NOT_YET);
+
+    switch (leftUpstream) {
+      case OK_NEW_SCHEMA:
+        probeSchema = probeBatch.getSchema();
+        break;
+      case OK:
+      case EMIT:
+        throw new IllegalStateException("Unsupported outcome while building schema " + leftUpstream);
+      default:
+        // Termination condition
+    }
+
+    do {
+      // Ask for data until we get a valid result.
+      rightUpstream = next(RIGHT_INDEX, buildBatch);
+    } while (rightUpstream == IterOutcome.NOT_YET);
+
+    switch (rightUpstream) {
+      case OK_NEW_SCHEMA:
+        // We need to have the schema of the build side even when the build side is empty
+        rightSchema = buildBatch.getSchema();
+        break;
+      case OK:
+      case EMIT:
+        throw new IllegalStateException("Unsupported outcome while building schema " + leftUpstream);
+      default:
+        // Termination condition
+    }
+
+    if (rightUpstream == IterOutcome.OK_NEW_SCHEMA) {
 
 Review comment:
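   Instead of using a separate "if (rightUpstream == IterOutcome.OK_NEW_SCHEMA)" here, move this logic into the "case OK_NEW_SCHEMA:" branch of the "switch (rightUpstream)" above.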
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services