You are viewing a plain-text version of this content; the canonical link to it was not preserved in this copy.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/07/09 12:17:29 UTC
[drill] 03/04: DRILL-6549: batch sizing for nested loop join
This is an automated email from the ASF dual-hosted git repository.
arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 25e65c0fe2aed741b143d69c0ffbe23a3433bedc
Author: Padma Penumarthy <pp...@yahoo.com>
AuthorDate: Thu Jun 28 03:50:36 2018 -0700
DRILL-6549: batch sizing for nested loop join
closes #1363
---
.../exec/physical/impl/join/NestedLoopJoin.java | 3 +
.../physical/impl/join/NestedLoopJoinBatch.java | 62 +++-
.../physical/impl/join/NestedLoopJoinTemplate.java | 24 +-
.../exec/physical/unit/TestOutputBatchSize.java | 342 ++++++++++++++++++++-
4 files changed, 407 insertions(+), 24 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
index f7d96ad..725c46d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
@@ -37,6 +37,9 @@ public interface NestedLoopJoin {
ExpandableHyperContainer rightContainer,
LinkedList<Integer> rightCounts,
NestedLoopJoinBatch outgoing);
+ // Sets the output record count at which outputRecords() stops filling the current outgoing batch.
+ void setTargetOutputCount(int targetOutputCount);
+
// Produce output records taking into account join type
public int outputRecords(JoinRelType joinType);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index ae14fb3..e2532e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.physical.impl.join;
import java.io.IOException;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
@@ -29,6 +30,7 @@ import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.compile.sig.GeneratorMapping;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
@@ -50,8 +52,8 @@ import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.vector.AllocationHelper;
-
+import org.apache.drill.exec.record.JoinBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchSizer;
import com.google.common.base.Preconditions;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JExpression;
@@ -65,9 +67,6 @@ import org.apache.drill.exec.vector.complex.AbstractContainerVector;
public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoinPOP> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedLoopJoinBatch.class);
- // Maximum number records in the outgoing batch
- protected static final int MAX_BATCH_SIZE = 4096;
-
// Input indexes to correctly update the stats
protected static final int LEFT_INPUT = 0;
protected static final int RIGHT_INPUT = 1;
@@ -130,6 +129,11 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
super(popConfig, context, left, right);
Preconditions.checkNotNull(left);
Preconditions.checkNotNull(right);
+
+ // Read the configured output batch size option (ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR) and size batches from it.
+ int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+ batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right, new HashSet<>());
+ logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
}
/**
@@ -162,6 +166,9 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
}
// fall through
case OK:
+ // For the right side, use aggregate stats, i.e. the average row width across all right batches seen so far.
+ batchMemoryManager.update(RIGHT_INDEX, 0, true);
+ logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
addBatchToHyperContainer(right);
break;
case OUT_OF_MEMORY:
@@ -179,7 +186,9 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
}
// allocate space for the outgoing batch
- allocateVectors();
+ batchMemoryManager.allocateVectors(container);
+
+ nljWorker.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
// invoke the runtime generated method to emit records in the output batch
outputRecords = nljWorker.outputRecords(popConfig.getJoinType());
@@ -193,6 +202,10 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
container.setRecordCount(outputRecords);
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ if (logger.isDebugEnabled()) {
+ logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
+ }
+
logger.debug("Number of records emitted: " + outputRecords);
return (outputRecords > 0) ? IterOutcome.OK : IterOutcome.NONE;
@@ -332,15 +345,6 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
}
/**
- * Simple method to allocate space for all the vectors in the container.
- */
- private void allocateVectors() {
- for (final VectorWrapper<?> vw : container) {
- AllocationHelper.allocateNew(vw.getValueVector(), MAX_BATCH_SIZE);
- }
- }
-
- /**
* Builds the output container's schema. Goes over the left and the right
* batch and adds the corresponding vectors to the output container.
* @throws SchemaChangeException if batch schema was changed during execution
@@ -352,6 +356,9 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
return;
}
+ batchMemoryManager.update(RIGHT_INDEX, 0, true);
+ logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+
if (leftUpstream != IterOutcome.NONE) {
leftSchema = left.getSchema();
for (final VectorWrapper<?> vw : left) {
@@ -380,7 +387,6 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
addBatchToHyperContainer(right);
}
- allocateVectors();
nljWorker = setupWorker();
// if left batch is empty, fetch next
@@ -388,7 +394,9 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
leftUpstream = next(LEFT_INPUT, left);
}
- container.setRecordCount(0);
+ batchMemoryManager.update(LEFT_INDEX, 0);
+ logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
} catch (ClassTransformationException | IOException e) {
@@ -412,6 +420,26 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
@Override
public void close() {
+ updateBatchMemoryManagerStats();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
+ batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+ batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+ batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+ batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+ logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
+ batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+ batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+ batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+ batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+ logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
+ batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
+ batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
+ }
+
rightContainer.clear();
rightCounts.clear();
super.close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
index cdd02f4..adf681b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
@@ -29,12 +29,16 @@ import javax.inject.Named;
import java.util.LinkedList;
import java.util.List;
+import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
+
/*
* Template class that combined with the runtime generated source implements the NestedLoopJoin interface. This
* class contains the main nested loop join logic.
*/
public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedLoopJoinTemplate.class);
+
// Current left input batch being processed
private RecordBatch left = null;
@@ -50,6 +54,8 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
// Iteration status tracker
private IterationStatusTracker tracker = new IterationStatusTracker();
+ private int targetOutputRecords; // output record count at which the current outgoing batch is treated as full
+
/**
* Method initializes necessary state and invokes the doSetup() to set the
* input and output value vector references.
@@ -69,10 +75,14 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
this.leftRecordCount = left.getRecordCount();
this.rightCounts = rightCounts;
this.outgoing = outgoing;
-
doSetup(context, rightContainer, left, outgoing);
}
+ @Override
+ public void setTargetOutputCount(int targetOutputRecords) {
+ this.targetOutputRecords = targetOutputRecords;
+ }
+
/**
* Main entry point for producing the output records. Thin wrapper around populateOutgoingBatch(), this method
* controls which left batch we are processing and fetches the next left input batch once we exhaust the current one.
@@ -84,11 +94,11 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
int outputIndex = 0;
while (leftRecordCount != 0) {
outputIndex = populateOutgoingBatch(joinType, outputIndex);
- if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
+ if (outputIndex >= targetOutputRecords) {
break;
}
// reset state and get next left batch
- resetAndGetNextLeft();
+ resetAndGetNextLeft(outputIndex);
}
return outputIndex;
}
@@ -128,7 +138,7 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
outputIndex++;
rightRecordMatched = true;
- if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
+ if (outputIndex >= targetOutputRecords) {
nextRightRecordToProcess++;
// no more space left in the batch, stop processing
@@ -143,7 +153,7 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
// project records from the left side only, records from right will be null
emitLeft(nextLeftRecordToProcess, outputIndex);
outputIndex++;
- if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
+ if (outputIndex >= targetOutputRecords) {
nextLeftRecordToProcess++;
// no more space left in the batch, stop processing
@@ -165,7 +175,7 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
* Resets some internal state which indicates the next records to process in the left and right batches,
* also fetches the next left input batch.
*/
- private void resetAndGetNextLeft() {
+ private void resetAndGetNextLeft(int outputIndex) {
for (VectorWrapper<?> vw : left) {
vw.getValueVector().clear();
}
@@ -181,6 +191,8 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
leftRecordCount = 0;
break;
case OK:
+ setTargetOutputCount(outgoing.getBatchMemoryManager().update(left, LEFT_INDEX, outputIndex));
+ logger.debug("BATCH_STATS, incoming left: {}", outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX));
leftRecordCount = left.getRecordCount();
break;
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
index 471f1b8..84a4fbc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
@@ -17,18 +17,23 @@
*/
package org.apache.drill.exec.physical.unit;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.directory.api.util.Strings;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
-
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.FlattenPOP;
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.physical.config.HashJoinPOP;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.config.NestedLoopJoinPOP;
import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.physical.config.UnionAll;
import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -2288,6 +2293,341 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
}
@Test
+ public void testNestedLoopJoinMultipleOutputBatches() throws Exception {
+ LogicalExpression functionCallExpr = new FunctionCall("equal",
+ ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN),
+ (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)),
+ ExpressionPosition.UNKNOWN);
+
+ NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr);
+ mockOpContext(nestedLoopJoin, initReservation, maxAllocation);
+
+ numRows = 4000 * 2;
+ // create left input rows like this.
+ // "a1" : 5, "b1" : wideString, "c1" : <id>
+ List<String> leftJsonBatches = Lists.newArrayList();
+ StringBuilder leftBatchString = new StringBuilder();
+ leftBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},");
+ }
+ leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}");
+ leftBatchString.append("]");
+
+ leftJsonBatches.add(leftBatchString.toString());
+
+ // create right input rows like this.
+ // "a2" : 6, "b2" : wideString, "c2" : <id>
+ List<String> rightJsonBatches = Lists.newArrayList();
+ StringBuilder rightBatchString = new StringBuilder();
+ rightBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+ }
+ rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+ rightBatchString.append("]");
+ rightJsonBatches.add(rightBatchString.toString());
+
+ // output rows will be like this.
+ // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1
+ // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2
+ // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3
+ List<String> expectedJsonBatches = Lists.newArrayList();
+ StringBuilder expectedBatchString = new StringBuilder();
+ expectedBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i);
+ expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+ }
+ expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows);
+ expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+ expectedBatchString.append("]");
+ expectedJsonBatches.add(expectedBatchString.toString());
+
+ long totalSize = getExpectedSize(expectedJsonBatches);
+
+ // set the output batch size to 1/2 of total size expected.
+ // We will get approximately 4 batches.
+ fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
+
+ OperatorTestBuilder opTestBuilder = opTestBuilder()
+ .physicalOperator(nestedLoopJoin)
+ .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+ .expectedNumBatches(4) // verify number of batches
+ .expectedBatchSize(totalSize / 2) // verify batch size
+ .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches));
+
+ for (long i = 0; i < numRows+1; i++) {
+ opTestBuilder.baselineValues(5L, wideString, i, 6L, wideString, i);
+ }
+
+ opTestBuilder.go();
+
+ }
+
+ @Test
+ public void testNestedLoopJoinSingleOutputBatch() throws Exception {
+ LogicalExpression functionCallExpr = new FunctionCall("equal",
+ ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN),
+ (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)),
+ ExpressionPosition.UNKNOWN);
+
+ NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr);
+
+ // create multiple batches from both sides.
+ numRows = 4096 * 2;
+
+ // create left input rows like this.
+ // "a1" : 5, "b1" : wideString, "c1" : <id>
+ List<String> leftJsonBatches = Lists.newArrayList();
+ StringBuilder leftBatchString = new StringBuilder();
+ leftBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},");
+ }
+ leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}");
+ leftBatchString.append("]");
+
+ leftJsonBatches.add(leftBatchString.toString());
+
+ // create right input rows like this.
+ // "a2" : 6, "b2" : wideString, "c2" : <id>
+ List<String> rightJsonBatches = Lists.newArrayList();
+ StringBuilder rightBatchString = new StringBuilder();
+ rightBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+ }
+ rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+ rightBatchString.append("]");
+ rightJsonBatches.add(rightBatchString.toString());
+
+ // output rows will be like this.
+ // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1
+ // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2
+ // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3
+ List<String> expectedJsonBatches = Lists.newArrayList();
+ StringBuilder expectedBatchString = new StringBuilder();
+ expectedBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i);
+ expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+ }
+ expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows);
+ expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+ expectedBatchString.append("]");
+ expectedJsonBatches.add(expectedBatchString.toString());
+
+ long totalSize = getExpectedSize(expectedJsonBatches);
+
+ // set the output batch size to twice the total expected size.
+ // We should get 1 batch.
+ fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2);
+
+ OperatorTestBuilder opTestBuilder = opTestBuilder()
+ .physicalOperator(nestedLoopJoin)
+ .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+ .expectedNumBatches(1) // verify number of batches
+ .expectedBatchSize(totalSize) // verify batch size
+ .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches));
+
+ for (long i = 0; i < numRows + 1; i++) {
+ opTestBuilder.baselineValues(5L, wideString, i, 6L, wideString, i);
+ }
+
+ opTestBuilder.go();
+ }
+
+ @Test
+ public void testNestedLoopJoinUpperLimit() throws Exception {
+ // test the upper limit of 65535 records per batch.
+ LogicalExpression functionCallExpr = new FunctionCall("<",
+ ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN),
+ (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)),
+ ExpressionPosition.UNKNOWN);
+
+ NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr);
+
+ numRows = 500;
+
+ // create left input rows like this.
+ // "a1" : 5, "c1" : <id>
+ List<String> leftJsonBatches = Lists.newArrayList();
+ StringBuilder leftBatchString = new StringBuilder();
+ leftBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + i + "},");
+ }
+ leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + numRows + "}");
+ leftBatchString.append("]");
+
+ leftJsonBatches.add(leftBatchString.toString());
+
+ // create right input rows like this.
+ // "a2" : 6, "c2" : <id>
+ List<String> rightJsonBatches = Lists.newArrayList();
+ StringBuilder rightBatchString = new StringBuilder();
+ rightBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + i + "},");
+ }
+ rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + numRows + "}");
+ rightBatchString.append("]");
+ rightJsonBatches.add(rightBatchString.toString());
+
+ // output rows will be like this.
+ // "a1" : 5, "c1" : 1, "a2":6, "c2": 1
+ // "a1" : 5, "c1" : 2, "a2":6, "c2": 2
+ // "a1" : 5, "c1" : 3, "a2":6, "c2": 3
+
+ // we expect n(n+1)/2 number of records i.e. (500 * 501)/2 = 125250
+ // expect two batches, batch limited by 65535 records
+ OperatorTestBuilder opTestBuilder = opTestBuilder()
+ .physicalOperator(nestedLoopJoin)
+ .baselineColumns("a1", "c1", "a2", "c2")
+ .expectedNumBatches(2) // verify number of batches
+ .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches));
+
+ for (long i = 0; i < numRows+1; i++) {
+ for (long j = i+1; j < numRows+1; j++) {
+ opTestBuilder.baselineValues(5L, i, 6L, j);
+ }
+ }
+
+ opTestBuilder.go();
+ }
+
+ @Test
+ public void testNestedLoopJoinLowerLimit() throws Exception {
+ // test the lower limit of at least one row per output batch
+ LogicalExpression functionCallExpr = new FunctionCall("equal",
+ ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN),
+ (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)),
+ ExpressionPosition.UNKNOWN);
+
+ NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr);
+
+ numRows = 10;
+
+ // create left input rows like this.
+ // "a1" : 5, "b1" : wideString, "c1" : <id>
+ List<String> leftJsonBatches = Lists.newArrayList();
+ StringBuilder leftBatchString = new StringBuilder();
+ leftBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},");
+ }
+ leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}");
+ leftBatchString.append("]");
+
+ leftJsonBatches.add(leftBatchString.toString());
+
+ // create right input rows like this.
+ // "a2" : 6, "b2" : wideString, "c2" : <id>
+ List<String> rightJsonBatches = Lists.newArrayList();
+ StringBuilder rightBatchString = new StringBuilder();
+ rightBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+ }
+ rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+ rightBatchString.append("]");
+ rightJsonBatches.add(rightBatchString.toString());
+
+ // output rows will be like this.
+ // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1
+ // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2
+ // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3
+
+ // set very low value of output batch size so we can do only one row per batch.
+ fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128);
+
+ OperatorTestBuilder opTestBuilder = opTestBuilder()
+ .physicalOperator(nestedLoopJoin)
+ .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+ .expectedNumBatches(10) // verify number of batches
+ .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches));
+
+ for (long i = 0; i < numRows + 1; i++) {
+ opTestBuilder.baselineValues(5L, wideString, i, 6L, wideString, i);
+ }
+
+ opTestBuilder.go();
+ }
+
+ @Test
+ public void testLeftNestedLoopJoin() throws Exception {
+ LogicalExpression functionCallExpr = new FunctionCall("equal",
+ ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN),
+ (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)),
+ ExpressionPosition.UNKNOWN);
+
+ NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.LEFT, functionCallExpr);
+
+ numRows = 4000 * 2;
+ // create left input rows like this.
+ // "a1" : 5, "b1" : wideString, "c1" : <id>
+ List<String> leftJsonBatches = Lists.newArrayList();
+ StringBuilder leftBatchString = new StringBuilder();
+ leftBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},");
+ }
+ leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}");
+ leftBatchString.append("]");
+
+ leftJsonBatches.add(leftBatchString.toString());
+
+ // create right input rows like this.
+ // "a2" : 6, "b2" : wideString, "c2" : <id>
+ List<String> rightJsonBatches = Lists.newArrayList();
+ StringBuilder rightBatchString = new StringBuilder();
+ rightBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+ }
+ rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+ rightBatchString.append("]");
+ rightJsonBatches.add(rightBatchString.toString());
+
+ // output rows will be like this.
+ // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1
+ // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2
+ // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3
+ List<String> expectedJsonBatches = Lists.newArrayList();
+ StringBuilder expectedBatchString = new StringBuilder();
+ expectedBatchString.append("[");
+ for (int i = 0; i < numRows; i++) {
+ expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i);
+ expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+ }
+ expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows);
+ expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+ expectedBatchString.append("]");
+ expectedJsonBatches.add(expectedBatchString.toString());
+
+ long totalSize = getExpectedSize(expectedJsonBatches);
+
+ // set the output batch size to 1/2 of total size expected.
+ // We will get approximately 4 batches.
+ fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
+
+ OperatorTestBuilder opTestBuilder = opTestBuilder()
+ .physicalOperator(nestedLoopJoin)
+ .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+ .expectedNumBatches(4) // verify number of batches
+ .expectedBatchSize(totalSize / 2) // verify batch size
+ .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches));
+
+ for (long i = 0; i < numRows+1; i++) {
+ opTestBuilder.baselineValues(5L, wideString, i, 6L, wideString, i);
+ }
+
+ opTestBuilder.go();
+
+ }
+
+ @Test
public void testSizerRepeatedList() throws Exception {
List<String> inputJsonBatches = Lists.newArrayList();
StringBuilder batchString = new StringBuilder();