You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/07/03 20:58:01 UTC

[GitHub] parthchandra closed pull request #1362: DRILL-6576: Unnest reports incoming record counts incorrectly

parthchandra closed pull request #1362: DRILL-6576: Unnest reports incoming record counts incorrectly
URL: https://github.com/apache/drill/pull/1362
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
index 06713a5164..ffc64f9237 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
@@ -103,7 +103,7 @@ public final int unnestRecords(final int recordCount) {
     innerValueIndex += count;
     return count;
 
-    }
+  }
 
   @Override
   public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index e985c4defe..bc01a70477 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -199,6 +199,7 @@ public IterOutcome innerNext() {
         schemaChanged(); // checks if schema has changed (redundant in this case becaause it has) AND saves the
                          // current field metadata for check in subsequent iterations
         setupNewSchema();
+        stats.batchReceived(0, incoming.getRecordCount(), true);
       } catch (SchemaChangeException ex) {
         kill(false);
         logger.error("Failure during query", ex);
@@ -207,32 +208,30 @@ public IterOutcome innerNext() {
       } finally {
         stats.stopSetup();
       }
-      // since we never called next on an upstream operator, incoming stats are
-      // not updated. update input stats explicitly.
-      stats.batchReceived(0, incoming.getRecordCount(), true);
       return IterOutcome.OK_NEW_SCHEMA;
     } else {
       assert state != BatchState.FIRST : "First batch should be OK_NEW_SCHEMA";
       container.zeroVectors();
-
       // Check if schema has changed
-      if (lateral.getRecordIndex() == 0 && schemaChanged()) {
-        hasRemainder = true;     // next call to next will handle the actual data.
-        try {
-          setupNewSchema();
-        } catch (SchemaChangeException ex) {
-          kill(false);
-          logger.error("Failure during query", ex);
-          context.getExecutorState().fail(ex);
-          return IterOutcome.STOP;
-        }
-        stats.batchReceived(0, incoming.getRecordCount(), true);
-        return OK_NEW_SCHEMA;
-      }
       if (lateral.getRecordIndex() == 0) {
-        unnest.resetGroupIndex();
+        boolean isNewSchema = schemaChanged();
+        if (isNewSchema) {
+          hasRemainder = true;     // next call to next will handle the actual data.
+          stats.batchReceived(0, incoming.getRecordCount(), isNewSchema);
+          try {
+            setupNewSchema();
+          } catch (SchemaChangeException ex) {
+            kill(false);
+            logger.error("Failure during query", ex);
+            context.getExecutorState().fail(ex);
+            return IterOutcome.STOP;
+          }
+          return OK_NEW_SCHEMA;
+        } else {
+          unnest.resetGroupIndex();
+          stats.batchReceived(0, incoming.getRecordCount(), isNewSchema);
+        }
       }
-      stats.batchReceived(0, incoming.getRecordCount(), false);
       return doWork();
     }
 
@@ -243,7 +242,8 @@ public VectorContainer getOutgoingContainer() {
     return this.container;
   }
 
-  @SuppressWarnings("resource") private void setUnnestVector() {
+  @SuppressWarnings("resource")
+  private void setUnnestVector() {
     final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
     final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
     final RepeatedValueVector vector;
@@ -347,7 +347,8 @@ protected IterOutcome doWork() {
     return tp;
   }
 
-  @Override protected boolean setupNewSchema() throws SchemaChangeException {
+  @Override
+  protected boolean setupNewSchema() throws SchemaChangeException {
     Preconditions.checkNotNull(lateral);
     container.clear();
     recordCount = 0;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services