You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2018/07/03 20:57:43 UTC
[drill] branch master updated: DRILL-6576: Unnest reports incoming
record counts incorrectly
This is an automated email from the ASF dual-hosted git repository.
parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 62aadda DRILL-6576: Unnest reports incoming record counts incorrectly
62aadda is described below
commit 62aadda83c68cb3ec06af38336df969958620aa0
Author: Parth Chandra <pa...@apache.org>
AuthorDate: Thu Jun 7 13:30:01 2018 -0700
DRILL-6576: Unnest reports incoming record counts incorrectly
This closes #1362
---
.../exec/physical/impl/unnest/UnnestImpl.java | 2 +-
.../physical/impl/unnest/UnnestRecordBatch.java | 41 +++++++++++-----------
2 files changed, 21 insertions(+), 22 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
index 06713a5..ffc64f9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
@@ -103,7 +103,7 @@ public class UnnestImpl implements Unnest {
innerValueIndex += count;
return count;
- }
+ }
@Override
public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index e985c4d..3ef547c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -199,6 +199,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
schemaChanged(); // checks if schema has changed (redundant in this case becaause it has) AND saves the
// current field metadata for check in subsequent iterations
setupNewSchema();
+ stats.batchReceived(0, incoming.getRecordCount(), true);
} catch (SchemaChangeException ex) {
kill(false);
logger.error("Failure during query", ex);
@@ -207,32 +208,28 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
} finally {
stats.stopSetup();
}
- // since we never called next on an upstream operator, incoming stats are
- // not updated. update input stats explicitly.
- stats.batchReceived(0, incoming.getRecordCount(), true);
return IterOutcome.OK_NEW_SCHEMA;
} else {
- assert state != BatchState.FIRST : "First batch should be OK_NEW_SCHEMA";
container.zeroVectors();
-
// Check if schema has changed
- if (lateral.getRecordIndex() == 0 && schemaChanged()) {
- hasRemainder = true; // next call to next will handle the actual data.
- try {
- setupNewSchema();
- } catch (SchemaChangeException ex) {
- kill(false);
- logger.error("Failure during query", ex);
- context.getExecutorState().fail(ex);
- return IterOutcome.STOP;
- }
- stats.batchReceived(0, incoming.getRecordCount(), true);
- return OK_NEW_SCHEMA;
- }
if (lateral.getRecordIndex() == 0) {
+ boolean isNewSchema = schemaChanged();
+ stats.batchReceived(0, incoming.getRecordCount(), isNewSchema);
+ if (isNewSchema) {
+ hasRemainder = true; // next call to next will handle the actual data.
+ try {
+ setupNewSchema();
+ } catch (SchemaChangeException ex) {
+ kill(false);
+ logger.error("Failure during query", ex);
+ context.getExecutorState().fail(ex);
+ return IterOutcome.STOP;
+ }
+ return OK_NEW_SCHEMA;
+ }
+ // else
unnest.resetGroupIndex();
}
- stats.batchReceived(0, incoming.getRecordCount(), false);
return doWork();
}
@@ -243,7 +240,8 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
return this.container;
}
- @SuppressWarnings("resource") private void setUnnestVector() {
+ @SuppressWarnings("resource")
+ private void setUnnestVector() {
final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
final RepeatedValueVector vector;
@@ -347,7 +345,8 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
return tp;
}
- @Override protected boolean setupNewSchema() throws SchemaChangeException {
+ @Override
+ protected boolean setupNewSchema() throws SchemaChangeException {
Preconditions.checkNotNull(lateral);
container.clear();
recordCount = 0;