Posted to commits@drill.apache.org by pa...@apache.org on 2018/08/02 22:23:20 UTC

[drill] branch master updated: DRILL-6657: Unnest reports one batch less than the actual number of batches

This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 419f51e  DRILL-6657: Unnest reports one batch less than the actual number of batches
419f51e is described below

commit 419f51e57b389e39a0c3c090ae0e8d34e1fb944c
Author: Parth Chandra <pa...@apache.org>
AuthorDate: Tue Jul 31 15:41:52 2018 -0700

    DRILL-6657: Unnest reports one batch less than the actual number of batches
---
 .../exec/physical/impl/unnest/UnnestRecordBatch.java | 20 ++++++++------------
 1 file changed, 8 insertions(+), 12 deletions(-)
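
In short, the patch removes the hasNewSchema flag that deferred memory-manager accounting to a later next() call and instead calls memoryManager.update() as soon as the new schema has been set up, tracking leftover data through hasRemainder. The sketch below condenses that ordering into a self-contained toy class: the stub types (MemoryManager, Incoming) and the class name are stand-ins invented for illustration, not Drill's real APIs, and only the sequence of calls mirrors the diff that follows.

    // Sketch only: the stub interfaces stand in for Drill's batch memory
    // manager and incoming record batch; all names here are hypothetical.
    class UnnestSchemaFlowSketch {
      interface MemoryManager { void update(); }   // stand-in for the batch sizer / memory manager
      interface Incoming { int getRecordCount(); } // stand-in for the incoming record batch

      private final MemoryManager memoryManager;
      private final Incoming incoming;
      private boolean hasRemainder;                // hasNewSchema is gone after this fix

      UnnestSchemaFlowSketch(MemoryManager memoryManager, Incoming incoming) {
        this.memoryManager = memoryManager;
        this.incoming = incoming;
      }

      // Called when a batch arrives with a new schema (the first batch, or a
      // later schema change). Before the fix, memoryManager.update() was
      // deferred to the following next() call via hasNewSchema, so one incoming
      // batch went unrecorded.
      void onNewSchema() {
        setupNewSchema();                             // build output vectors for the new schema
        memoryManager.update();                       // account for this batch immediately
        hasRemainder = incoming.getRecordCount() > 0; // its rows are processed on the next call
      }

      private void setupNewSchema() { /* omitted in this sketch */ }
    }

Updating the memory manager at the point the schema is established means each incoming batch is counted when it arrives, which appears to be the undercount the DRILL-6657 title describes.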

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index e89144d..6204d37 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -55,9 +55,6 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
   private IntVector rowIdVector; // vector to keep the implicit rowId column in
 
   private Unnest unnest = new UnnestImpl();
-  private boolean hasNewSchema = false; // set to true if a new schema was encountered and an empty batch was
-                                        // sent. The next iteration, we need to make sure the record batch sizer
-                                        // is updated before we process the actual data.
   private boolean hasRemainder = false; // set to true if there is data left over for the current row AND if we want
                                         // to keep processing it. Kill may be called by a limit in a subquery that
                                        // requires us to stop processing the current row, but not stop processing
@@ -164,7 +161,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
     Preconditions.checkState(context.getExecutorState().isFailed() ||
       lateral.getLeftOutcome() == IterOutcome.STOP, "Kill received by unnest with unexpected state. " +
       "Neither the LateralOutcome is STOP nor executor state is failed");
-    logger.debug("Kill received. Stopping all processing");
+      logger.debug("Kill received. Stopping all processing");
     state = BatchState.DONE;
     recordCount = 0;
     hasRemainder = false; // whatever the case, we need to stop processing the current row.
@@ -180,12 +177,6 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
       return IterOutcome.NONE;
     }
 
-    if (hasNewSchema) {
-      memoryManager.update();
-      hasNewSchema = false;
-      return doWork();
-    }
-
     if (hasRemainder) {
       return doWork();
     }
@@ -200,12 +191,13 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
       state = BatchState.NOT_FIRST;
       try {
         stats.startSetup();
-        hasNewSchema = true; // next call to next will handle the actual data.
         logger.debug("First batch received");
        schemaChanged(); // checks if schema has changed (redundant in this case because it has) AND saves the
                          // current field metadata for check in subsequent iterations
         setupNewSchema();
         stats.batchReceived(0, incoming.getRecordCount(), true);
+        memoryManager.update();
+        hasRemainder = incoming.getRecordCount() > 0;
       } catch (SchemaChangeException ex) {
         kill(false);
         logger.error("Failure during query", ex);
@@ -216,14 +208,18 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
       }
       return IterOutcome.OK_NEW_SCHEMA;
     } else {
+      Preconditions.checkState(incoming.getRecordCount() > 0,
+        "Incoming batch post buildSchema phase should never be empty for Unnest");
       container.zeroVectors();
       // Check if schema has changed
       if (lateral.getRecordIndex() == 0) {
-        hasNewSchema = schemaChanged();
+        boolean hasNewSchema = schemaChanged();
         stats.batchReceived(0, incoming.getRecordCount(), hasNewSchema);
         if (hasNewSchema) {
           try {
             setupNewSchema();
+            hasRemainder = true;
+            memoryManager.update();
           } catch (SchemaChangeException ex) {
             kill(false);
             logger.error("Failure during query", ex);