You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/07/19 15:13:15 UTC

[GitHub] ilooner commented on a change in pull request #1384: DRILL-6606: Fixed bug in HashJoin that caused it not to return OK_NEW_SCHEMA in some cases.

ilooner commented on a change in pull request #1384: DRILL-6606: Fixed bug in HashJoin that caused it not to return OK_NEW_SCHEMA in some cases.
URL: https://github.com/apache/drill/pull/1384#discussion_r203766032
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ##########
 @@ -213,86 +219,185 @@ public int getRecordCount() {
 
   @Override
   protected void buildSchema() throws SchemaChangeException {
-    if (! prefetchFirstBatchFromBothSides()) {
-      return;
+    // We must first get the schemas from upstream operators before we can build
+    // our schema.
+    boolean validSchema = sniffNewSchemas();
+
+    if (validSchema) {
+      // We are able to construct a valid schema from the upstream data.
+      // Setting the state here makes sure AbstractRecordBatch returns OK_NEW_SCHEMA
+      state = BatchState.BUILD_SCHEMA;
+    } else {
+      // We were not able to build a valid schema, so we need to set our termination state.
+      final Optional<BatchState> batchStateOpt = getBatchStateTermination();
+      state = batchStateOpt.get(); // There should be a state here.
     }
 
+    // If we have a valid schema, this will build a valid container. If we were unable to obtain a valid schema,
 +    // we still need to build a dummy schema. This code handles both cases for us.
+    setupOutputContainerSchema();
+    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
     // Initialize the hash join helper context
-    if (rightUpstream != IterOutcome.NONE) {
+    if (rightUpstream == IterOutcome.OK_NEW_SCHEMA) {
+      // We only need the hash tables if we have data on the build side.
       setupHashTable();
     }
-    setupOutputContainerSchema();
+
     try {
       hashJoinProbe = setupHashJoinProbe();
     } catch (IOException | ClassTransformationException e) {
       throw new SchemaChangeException(e);
     }
-
-    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
   }
 
   @Override
   protected boolean prefetchFirstBatchFromBothSides() {
-    leftUpstream = sniffNonEmptyBatch(0, left);
-    rightUpstream = sniffNonEmptyBatch(1, right);
+    if (leftUpstream != IterOutcome.NONE) {
+      // We can only get data if there is data available
+      leftUpstream = sniffNonEmptyBatch(leftUpstream, LEFT_INDEX, left);
+    }
+
+    if (rightUpstream != IterOutcome.NONE) {
+      // We can only get data if there is data available
+      rightUpstream = sniffNonEmptyBatch(rightUpstream, RIGHT_INDEX, right);
+    }
+
+    buildSideIsEmpty = rightUpstream == IterOutcome.NONE;
+
+    final Optional<BatchState> batchStateOpt = getBatchStateTermination();
+
+    if (batchStateOpt.isPresent()) {
+      // We reached a termination state
+      state = batchStateOpt.get();
+
+      switch (state) {
+        case STOP:
+        case OUT_OF_MEMORY:
+          // Terminate processing now
+          return false;
+        case DONE:
+          // No more data but take operation to completion
+          return true;
+        default:
+          throw new IllegalStateException();
+      }
+    } else {
+      // For build side, use aggregate i.e. average row width across batches
+      batchMemoryManager.update(LEFT_INDEX, 0);
+      batchMemoryManager.update(RIGHT_INDEX, 0, true);
 
-    // For build side, use aggregate i.e. average row width across batches
-    batchMemoryManager.update(LEFT_INDEX, 0);
-    batchMemoryManager.update(RIGHT_INDEX, 0, true);
+      logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+      logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
 
-    logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
-    logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
 +      // Got our first batch(es)
+      state = BatchState.FIRST;
+      return true;
+    }
+  }
 
+  /**
+   * Checks if a termination state has been reached, and returns the appropriate termination state if it has been reached.
+   * @return The termination state if it has been reached. Otherwise empty.
+   */
+  private Optional<BatchState> getBatchStateTermination() {
     if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
-      state = BatchState.STOP;
-      return false;
+      return Optional.of(BatchState.STOP);
     }
 
     if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
-      state = BatchState.OUT_OF_MEMORY;
-      return false;
+      return Optional.of(BatchState.OUT_OF_MEMORY);
     }
 
     if (checkForEarlyFinish(leftUpstream, rightUpstream)) {
-      state = BatchState.DONE;
-      return false;
+      return Optional.of(BatchState.DONE);
+    }
+
+    return Optional.empty();
+  }
+
+  /**
+   * Sniffs all data necessary to construct a schema.
+   * @return True if all the data necessary to construct a schema has been retrieved. False otherwise.
+   */
+  private boolean sniffNewSchemas() {
+    do {
+      // Ask for data until we get a valid result.
+      leftUpstream = next(LEFT_INDEX, probeBatch);
+    } while (leftUpstream == IterOutcome.NOT_YET);
 
 Review comment:
   True, but according to the javadoc, the contract is that any operator COULD return NOT_YET, so I'd rather obey the contract.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services