You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by bo...@apache.org on 2018/03/08 02:55:17 UTC
[8/8] drill git commit: DRILL-6177: Merge Join - Allocate memory for outgoing value vectors based on sizes of incoming batches.
DRILL-6177: Merge Join - Allocate memory for outgoing value vectors based on sizes of incoming batches.
closes #1125
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/766315ea
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/766315ea
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/766315ea
Branch: refs/heads/master
Commit: 766315ea17377199897d685ab801edd38394fe01
Parents: 31e0f29
Author: Padma Penumarthy <pp...@yahoo.com>
Authored: Tue Mar 6 16:09:43 2018 -0800
Committer: Ben-Zvi <bb...@mapr.com>
Committed: Wed Mar 7 15:42:11 2018 -0800
----------------------------------------------------------------------
.../exec/physical/impl/join/MergeJoinBatch.java | 23 ++++++++++++++++----
1 file changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/766315ea/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index f612ae2..2155f0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -57,7 +57,6 @@ import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.AbstractRecordBatchMemoryManager;
-import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
@@ -114,6 +113,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
private int leftRowWidth;
private int rightRowWidth;
+ private RecordBatchSizer leftSizer;
+ private RecordBatchSizer rightSizer;
+
/**
* mergejoin operates on one record at a time from the left and right batches
* using RecordIterator abstraction. We have a callback mechanism to get notified
@@ -126,11 +128,11 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
public void update(int inputIndex) {
switch(inputIndex) {
case 0:
- final RecordBatchSizer leftSizer = new RecordBatchSizer(left);
+ leftSizer = new RecordBatchSizer(left);
leftRowWidth = leftSizer.netRowWidth();
break;
case 1:
- final RecordBatchSizer rightSizer = new RecordBatchSizer(right);
+ rightSizer = new RecordBatchSizer(right);
rightRowWidth = rightSizer.netRowWidth();
default:
break;
@@ -158,6 +160,14 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
status.setTargetOutputRowCount(status.getOutPosition() + numOutputRowsRemaining);
setOutgoingRowWidth(newOutgoingRowWidth);
}
+
+ @Override
+ public RecordBatchSizer.ColumnSize getColumnSize(String name) {
+ if (leftSizer != null && leftSizer.getColumn(name) != null) {
+ return leftSizer.getColumn(name);
+ }
+ return rightSizer == null ? null : rightSizer.getColumn(name);
+ }
}
private final MergeJoinMemoryManager mergeJoinMemoryManager = new MergeJoinMemoryManager();
@@ -492,8 +502,13 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
} else {
container.zeroVectors();
}
+
+ // Allocate memory for the vectors.
+ // This will iteratively allocate memory for all nested columns underneath.
+ int outputRowCount = mergeJoinMemoryManager.getOutputRowCount();
for (VectorWrapper w : container) {
- AllocationHelper.allocateNew(w.getValueVector(), Character.MAX_VALUE);
+ RecordBatchSizer.ColumnSize colSize = mergeJoinMemoryManager.getColumnSize(w.getField().getName());
+ colSize.allocateVector(w.getValueVector(), outputRowCount);
}
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);