You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/08/02 22:23:20 UTC

[GitHub] parthchandra closed pull request #1413: DRILL-6657: Unnest reports one batch less than the actual number of b…

parthchandra closed pull request #1413: DRILL-6657: Unnest reports one batch less than the actual number of b…
URL: https://github.com/apache/drill/pull/1413
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index e89144db59d..6204d37cfb0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -55,9 +55,6 @@
   private IntVector rowIdVector; // vector to keep the implicit rowId column in
 
   private Unnest unnest = new UnnestImpl();
-  private boolean hasNewSchema = false; // set to true if a new schema was encountered and an empty batch was
-                                        // sent. The next iteration, we need to make sure the record batch sizer
-                                        // is updated before we process the actual data.
   private boolean hasRemainder = false; // set to true if there is data left over for the current row AND if we want
                                         // to keep processing it. Kill may be called by a limit in a subquery that
                                         // requires us to stop processing thecurrent row, but not stop processing
@@ -164,7 +161,7 @@ protected void killIncoming(boolean sendUpstream) {
     Preconditions.checkState(context.getExecutorState().isFailed() ||
       lateral.getLeftOutcome() == IterOutcome.STOP, "Kill received by unnest with unexpected state. " +
       "Neither the LateralOutcome is STOP nor executor state is failed");
-    logger.debug("Kill received. Stopping all processing");
+      logger.debug("Kill received. Stopping all processing");
     state = BatchState.DONE;
     recordCount = 0;
     hasRemainder = false; // whatever the case, we need to stop processing the current row.
@@ -180,12 +177,6 @@ public IterOutcome innerNext() {
       return IterOutcome.NONE;
     }
 
-    if (hasNewSchema) {
-      memoryManager.update();
-      hasNewSchema = false;
-      return doWork();
-    }
-
     if (hasRemainder) {
       return doWork();
     }
@@ -200,12 +191,13 @@ public IterOutcome innerNext() {
       state = BatchState.NOT_FIRST;
       try {
         stats.startSetup();
-        hasNewSchema = true; // next call to next will handle the actual data.
         logger.debug("First batch received");
         schemaChanged(); // checks if schema has changed (redundant in this case becaause it has) AND saves the
                          // current field metadata for check in subsequent iterations
         setupNewSchema();
         stats.batchReceived(0, incoming.getRecordCount(), true);
+        memoryManager.update();
+        hasRemainder = incoming.getRecordCount() > 0;
       } catch (SchemaChangeException ex) {
         kill(false);
         logger.error("Failure during query", ex);
@@ -216,14 +208,18 @@ public IterOutcome innerNext() {
       }
       return IterOutcome.OK_NEW_SCHEMA;
     } else {
+      Preconditions.checkState(incoming.getRecordCount() > 0,
+        "Incoming batch post buildSchema phase should never be empty for Unnest");
       container.zeroVectors();
       // Check if schema has changed
       if (lateral.getRecordIndex() == 0) {
-        hasNewSchema = schemaChanged();
+        boolean hasNewSchema = schemaChanged();
         stats.batchReceived(0, incoming.getRecordCount(), hasNewSchema);
         if (hasNewSchema) {
           try {
             setupNewSchema();
+            hasRemainder = true;
+            memoryManager.update();
           } catch (SchemaChangeException ex) {
             kill(false);
             logger.error("Failure during query", ex);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services