You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2018/07/13 03:44:59 UTC
[drill] 07/13: DRILL-6592: Unnest record batch size is called too
frequently
This is an automated email from the ASF dual-hosted git repository.
sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit cad9aad12ff18a9315b8cce971e27c1b32c48079
Author: Parth Chandra <pa...@apache.org>
AuthorDate: Fri Jul 6 16:23:51 2018 -0700
DRILL-6592: Unnest record batch size is called too frequently
closes #1376
---
.../exec/physical/impl/unnest/UnnestRecordBatch.java | 20 ++++++++++++++------
1 file changed, 14 insertions(+), 6 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index 3ef547c..9c1e702 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -49,6 +49,9 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
private Unnest unnest;
+ private boolean hasNewSchema = false; // set to true if a new schema was encountered and an empty batch was
+ // sent. The next iteration, we need to make sure the record batch sizer
+ // is updated before we process the actual data.
private boolean hasRemainder = false; // set to true if there is data left over for the current row AND if we want
// to keep processing it. Kill may be called by a limit in a subquery that
// requires us to stop processing the current row, but not stop processing
@@ -180,6 +183,12 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
return nextState;
}
+ if (hasNewSchema) {
+ memoryManager.update();
+ hasNewSchema = false;
+ return doWork();
+ }
+
if (hasRemainder) {
return doWork();
}
@@ -194,7 +203,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
state = BatchState.NOT_FIRST;
try {
stats.startSetup();
- hasRemainder = true; // next call to next will handle the actual data.
+ hasNewSchema = true; // next call to next will handle the actual data.
logger.debug("First batch received");
schemaChanged(); // checks if schema has changed (redundant in this case because it has) AND saves the
// current field metadata for check in subsequent iterations
@@ -213,10 +222,9 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
container.zeroVectors();
// Check if schema has changed
if (lateral.getRecordIndex() == 0) {
- boolean isNewSchema = schemaChanged();
- stats.batchReceived(0, incoming.getRecordCount(), isNewSchema);
- if (isNewSchema) {
- hasRemainder = true; // next call to next will handle the actual data.
+ hasNewSchema = schemaChanged();
+ stats.batchReceived(0, incoming.getRecordCount(), hasNewSchema);
+ if (hasNewSchema) {
try {
setupNewSchema();
} catch (SchemaChangeException ex) {
@@ -229,6 +237,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
}
// else
unnest.resetGroupIndex();
+ memoryManager.update();
}
return doWork();
}
@@ -265,7 +274,6 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
protected IterOutcome doWork() {
Preconditions.checkNotNull(lateral);
- memoryManager.update();
unnest.setOutputCount(memoryManager.getOutputRowCount());
final int incomingRecordCount = incoming.getRecordCount();
final int currentRecord = lateral.getRecordIndex();