You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2018/08/02 22:23:20 UTC
[drill] branch master updated: DRILL-6657: Unnest reports one batch
less than the actual number of batches
This is an automated email from the ASF dual-hosted git repository.
parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 419f51e DRILL-6657: Unnest reports one batch less than the actual number of batches
419f51e is described below
commit 419f51e57b389e39a0c3c090ae0e8d34e1fb944c
Author: Parth Chandra <pa...@apache.org>
AuthorDate: Tue Jul 31 15:41:52 2018 -0700
DRILL-6657: Unnest reports one batch less than the actual number of batches
---
.../exec/physical/impl/unnest/UnnestRecordBatch.java | 20 ++++++++------------
1 file changed, 8 insertions(+), 12 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index e89144d..6204d37 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -55,9 +55,6 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
private IntVector rowIdVector; // vector to keep the implicit rowId column in
private Unnest unnest = new UnnestImpl();
- private boolean hasNewSchema = false; // set to true if a new schema was encountered and an empty batch was
- // sent. The next iteration, we need to make sure the record batch sizer
- // is updated before we process the actual data.
private boolean hasRemainder = false; // set to true if there is data left over for the current row AND if we want
// to keep processing it. Kill may be called by a limit in a subquery that
// requires us to stop processing the current row, but not stop processing
@@ -164,7 +161,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
Preconditions.checkState(context.getExecutorState().isFailed() ||
lateral.getLeftOutcome() == IterOutcome.STOP, "Kill received by unnest with unexpected state. " +
"Neither the LateralOutcome is STOP nor executor state is failed");
- logger.debug("Kill received. Stopping all processing");
+ logger.debug("Kill received. Stopping all processing");
state = BatchState.DONE;
recordCount = 0;
hasRemainder = false; // whatever the case, we need to stop processing the current row.
@@ -180,12 +177,6 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
return IterOutcome.NONE;
}
- if (hasNewSchema) {
- memoryManager.update();
- hasNewSchema = false;
- return doWork();
- }
-
if (hasRemainder) {
return doWork();
}
@@ -200,12 +191,13 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
state = BatchState.NOT_FIRST;
try {
stats.startSetup();
- hasNewSchema = true; // next call to next will handle the actual data.
logger.debug("First batch received");
schemaChanged(); // checks if schema has changed (redundant in this case because it has) AND saves the
// current field metadata for check in subsequent iterations
setupNewSchema();
stats.batchReceived(0, incoming.getRecordCount(), true);
+ memoryManager.update();
+ hasRemainder = incoming.getRecordCount() > 0;
} catch (SchemaChangeException ex) {
kill(false);
logger.error("Failure during query", ex);
@@ -216,14 +208,18 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
}
return IterOutcome.OK_NEW_SCHEMA;
} else {
+ Preconditions.checkState(incoming.getRecordCount() > 0,
+ "Incoming batch post buildSchema phase should never be empty for Unnest");
container.zeroVectors();
// Check if schema has changed
if (lateral.getRecordIndex() == 0) {
- hasNewSchema = schemaChanged();
+ boolean hasNewSchema = schemaChanged();
stats.batchReceived(0, incoming.getRecordCount(), hasNewSchema);
if (hasNewSchema) {
try {
setupNewSchema();
+ hasRemainder = true;
+ memoryManager.update();
} catch (SchemaChangeException ex) {
kill(false);
logger.error("Failure during query", ex);