You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2018/07/03 20:57:43 UTC

[drill] branch master updated: DRILL-6576: Unnest reports incoming record counts incorrectly

This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 62aadda  DRILL-6576: Unnest reports incoming record counts incorrectly
62aadda is described below

commit 62aadda83c68cb3ec06af38336df969958620aa0
Author: Parth Chandra <pa...@apache.org>
AuthorDate: Thu Jun 7 13:30:01 2018 -0700

    DRILL-6576: Unnest reports incoming record counts incorrectly
    
    This closes #1362
---
 .../exec/physical/impl/unnest/UnnestImpl.java      |  2 +-
 .../physical/impl/unnest/UnnestRecordBatch.java    | 41 +++++++++++-----------
 2 files changed, 21 insertions(+), 22 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
index 06713a5..ffc64f9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
@@ -103,7 +103,7 @@ public class UnnestImpl implements Unnest {
     innerValueIndex += count;
     return count;
 
-    }
+  }
 
   @Override
   public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index e985c4d..3ef547c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -199,6 +199,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
         schemaChanged(); // checks if schema has changed (redundant in this case because it has) AND saves the
                          // current field metadata for check in subsequent iterations
         setupNewSchema();
+        stats.batchReceived(0, incoming.getRecordCount(), true);
       } catch (SchemaChangeException ex) {
         kill(false);
         logger.error("Failure during query", ex);
@@ -207,32 +208,28 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
       } finally {
         stats.stopSetup();
       }
-      // since we never called next on an upstream operator, incoming stats are
-      // not updated. update input stats explicitly.
-      stats.batchReceived(0, incoming.getRecordCount(), true);
       return IterOutcome.OK_NEW_SCHEMA;
     } else {
-      assert state != BatchState.FIRST : "First batch should be OK_NEW_SCHEMA";
       container.zeroVectors();
-
       // Check if schema has changed
-      if (lateral.getRecordIndex() == 0 && schemaChanged()) {
-        hasRemainder = true;     // next call to next will handle the actual data.
-        try {
-          setupNewSchema();
-        } catch (SchemaChangeException ex) {
-          kill(false);
-          logger.error("Failure during query", ex);
-          context.getExecutorState().fail(ex);
-          return IterOutcome.STOP;
-        }
-        stats.batchReceived(0, incoming.getRecordCount(), true);
-        return OK_NEW_SCHEMA;
-      }
       if (lateral.getRecordIndex() == 0) {
+        boolean isNewSchema = schemaChanged();
+        stats.batchReceived(0, incoming.getRecordCount(), isNewSchema);
+        if (isNewSchema) {
+          hasRemainder = true;     // next call to next will handle the actual data.
+          try {
+            setupNewSchema();
+          } catch (SchemaChangeException ex) {
+            kill(false);
+            logger.error("Failure during query", ex);
+            context.getExecutorState().fail(ex);
+            return IterOutcome.STOP;
+          }
+          return OK_NEW_SCHEMA;
+        }
+        // else
         unnest.resetGroupIndex();
       }
-      stats.batchReceived(0, incoming.getRecordCount(), false);
       return doWork();
     }
 
@@ -243,7 +240,8 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
     return this.container;
   }
 
-  @SuppressWarnings("resource") private void setUnnestVector() {
+  @SuppressWarnings("resource")
+  private void setUnnestVector() {
     final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
     final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
     final RepeatedValueVector vector;
@@ -347,7 +345,8 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
     return tp;
   }
 
-  @Override protected boolean setupNewSchema() throws SchemaChangeException {
+  @Override
+  protected boolean setupNewSchema() throws SchemaChangeException {
     Preconditions.checkNotNull(lateral);
     container.clear();
     recordCount = 0;