Posted to commits@drill.apache.org by so...@apache.org on 2018/07/13 03:44:59 UTC

[drill] 07/13: DRILL-6592: Unnest record batch size is called too frequently

This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit cad9aad12ff18a9315b8cce971e27c1b32c48079
Author: Parth Chandra <pa...@apache.org>
AuthorDate: Fri Jul 6 16:23:51 2018 -0700

    DRILL-6592: Unnest record batch size is called too frequently
    
    closes #1376
---
 .../exec/physical/impl/unnest/UnnestRecordBatch.java | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 3ef547c..9c1e702 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -49,6 +49,9 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
 
   private Unnest unnest;
+  private boolean hasNewSchema = false; // set to true if a new schema was encountered and an empty batch was
+                                        // sent. On the next iteration, we need to make sure the record batch sizer
+                                        // is updated before we process the actual data.
   private boolean hasRemainder = false; // set to true if there is data left over for the current row AND if we want
                                         // to keep processing it. Kill may be called by a limit in a subquery that
                                        // requires us to stop processing the current row, but not stop processing
@@ -180,6 +183,12 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
       return nextState;
     }
 
+    if (hasNewSchema) {
+      memoryManager.update();
+      hasNewSchema = false;
+      return doWork();
+    }
+
     if (hasRemainder) {
       return doWork();
     }
@@ -194,7 +203,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
       state = BatchState.NOT_FIRST;
       try {
         stats.startSetup();
-        hasRemainder = true; // next call to next will handle the actual data.
+        hasNewSchema = true; // next call to next will handle the actual data.
         logger.debug("First batch received");
         schemaChanged(); // checks if schema has changed (redundant in this case because it has) AND saves the
                          // current field metadata for check in subsequent iterations
@@ -213,10 +222,9 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
       container.zeroVectors();
       // Check if schema has changed
       if (lateral.getRecordIndex() == 0) {
-        boolean isNewSchema = schemaChanged();
-        stats.batchReceived(0, incoming.getRecordCount(), isNewSchema);
-        if (isNewSchema) {
-          hasRemainder = true;     // next call to next will handle the actual data.
+        hasNewSchema = schemaChanged();
+        stats.batchReceived(0, incoming.getRecordCount(), hasNewSchema);
+        if (hasNewSchema) {
           try {
             setupNewSchema();
           } catch (SchemaChangeException ex) {
@@ -229,6 +237,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
         }
         // else
         unnest.resetGroupIndex();
+        memoryManager.update();
       }
       return doWork();
     }
@@ -265,7 +274,6 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
 
   protected IterOutcome doWork() {
     Preconditions.checkNotNull(lateral);
-    memoryManager.update();
     unnest.setOutputCount(memoryManager.getOutputRowCount());
     final int incomingRecordCount = incoming.getRecordCount();
     final int currentRecord = lateral.getRecordIndex();
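For readers skimming the diff above, the net effect is that memoryManager.update() moves out of doWork() (where it ran on every call) and is now invoked only when a schema change was flagged on the previous iteration or when a fresh incoming batch arrives. Below is a minimal, self-contained sketch of that control flow. The names here (UnnestSketch, MemoryManagerStub, IterOutcome as written) are simplified stand-ins for illustration, not the actual Drill types:

    // Illustrative sketch only; stand-in types, not the real UnnestRecordBatch.
    class UnnestSketch {
      enum IterOutcome { OK, NONE }

      // Stub for Drill's record batch sizer / memory manager.
      static class MemoryManagerStub {
        private int outputRowCount = 1;
        void update() {
          // In the real operator this walks the incoming batch and recomputes
          // how many output rows fit in the memory budget.
          outputRowCount = 4096;
        }
        int getOutputRowCount() { return outputRowCount; }
      }

      private final MemoryManagerStub memoryManager = new MemoryManagerStub();
      private boolean hasNewSchema = false;  // schema changed; sizer must be refreshed once
      private boolean hasRemainder = false;  // leftover data from the current input row

      // Per-iteration entry point, mirroring the flow in innerNext() after the patch.
      IterOutcome innerNext() {
        if (hasNewSchema) {
          memoryManager.update();            // refresh sizing once after the schema change
          hasNewSchema = false;
          return doWork();
        }
        if (hasRemainder) {
          return doWork();                   // reuse sizing computed for this incoming batch
        }
        memoryManager.update();              // new incoming batch: recompute sizing once
        return doWork();
      }

      IterOutcome doWork() {
        // Emits up to getOutputRowCount() rows; crucially, no update() call here, so
        // repeated doWork() calls for the same incoming batch do not re-walk its vectors.
        int limit = memoryManager.getOutputRowCount();
        return limit > 0 ? IterOutcome.OK : IterOutcome.NONE;
      }
    }

The sizing pass only needs to run when the set of incoming vectors can actually change, i.e. on a schema change or on a new incoming batch, which is what the hasNewSchema flag and the update() call after unnest.resetGroupIndex() capture.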