You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/07/03 20:58:01 UTC
[GitHub] parthchandra closed pull request #1362: DRILL-6576: Unnest reports
incoming record counts incorrectly
parthchandra closed pull request #1362: DRILL-6576: Unnest reports incoming record counts incorrectly
URL: https://github.com/apache/drill/pull/1362
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
index 06713a5164..ffc64f9237 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
@@ -103,7 +103,7 @@ public final int unnestRecords(final int recordCount) {
innerValueIndex += count;
return count;
- }
+ }
@Override
public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index e985c4defe..bc01a70477 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -199,6 +199,7 @@ public IterOutcome innerNext() {
schemaChanged(); // checks if schema has changed (redundant in this case because it has) AND saves the
// current field metadata for check in subsequent iterations
setupNewSchema();
+ stats.batchReceived(0, incoming.getRecordCount(), true);
} catch (SchemaChangeException ex) {
kill(false);
logger.error("Failure during query", ex);
@@ -207,32 +208,30 @@ public IterOutcome innerNext() {
} finally {
stats.stopSetup();
}
- // since we never called next on an upstream operator, incoming stats are
- // not updated. update input stats explicitly.
- stats.batchReceived(0, incoming.getRecordCount(), true);
return IterOutcome.OK_NEW_SCHEMA;
} else {
assert state != BatchState.FIRST : "First batch should be OK_NEW_SCHEMA";
container.zeroVectors();
-
// Check if schema has changed
- if (lateral.getRecordIndex() == 0 && schemaChanged()) {
- hasRemainder = true; // next call to next will handle the actual data.
- try {
- setupNewSchema();
- } catch (SchemaChangeException ex) {
- kill(false);
- logger.error("Failure during query", ex);
- context.getExecutorState().fail(ex);
- return IterOutcome.STOP;
- }
- stats.batchReceived(0, incoming.getRecordCount(), true);
- return OK_NEW_SCHEMA;
- }
if (lateral.getRecordIndex() == 0) {
- unnest.resetGroupIndex();
+ boolean isNewSchema = schemaChanged();
+ if (isNewSchema) {
+ hasRemainder = true; // next call to next will handle the actual data.
+ stats.batchReceived(0, incoming.getRecordCount(), isNewSchema);
+ try {
+ setupNewSchema();
+ } catch (SchemaChangeException ex) {
+ kill(false);
+ logger.error("Failure during query", ex);
+ context.getExecutorState().fail(ex);
+ return IterOutcome.STOP;
+ }
+ return OK_NEW_SCHEMA;
+ } else {
+ unnest.resetGroupIndex();
+ stats.batchReceived(0, incoming.getRecordCount(), isNewSchema);
+ }
}
- stats.batchReceived(0, incoming.getRecordCount(), false);
return doWork();
}
@@ -243,7 +242,8 @@ public VectorContainer getOutgoingContainer() {
return this.container;
}
- @SuppressWarnings("resource") private void setUnnestVector() {
+ @SuppressWarnings("resource")
+ private void setUnnestVector() {
final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
final RepeatedValueVector vector;
@@ -347,7 +347,8 @@ protected IterOutcome doWork() {
return tp;
}
- @Override protected boolean setupNewSchema() throws SchemaChangeException {
+ @Override
+ protected boolean setupNewSchema() throws SchemaChangeException {
Preconditions.checkNotNull(lateral);
container.clear();
recordCount = 0;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services