Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/07/09 12:17:52 UTC

[GitHub] asfgit closed pull request #1363: DRILL-6549: batch sizing for nested loop join

asfgit closed pull request #1363: DRILL-6549: batch sizing for nested loop join
URL: https://github.com/apache/drill/pull/1363
 
 
   

This is a pull request merged from a forked repository. Because GitHub hides the
original diff once a pull request from a fork is merged, the diff is reproduced
below for the sake of provenance:

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
index f7d96ad8c2e..725c46d1ba8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
@@ -37,6 +37,9 @@ public void setupNestedLoopJoin(FragmentContext context, RecordBatch left,
                                   ExpandableHyperContainer rightContainer,
                                   LinkedList<Integer> rightCounts,
                                   NestedLoopJoinBatch outgoing);
+
+  void setTargetOutputCount(int targetOutputCount);
+
   // Produce output records taking into account join type
   public int outputRecords(JoinRelType joinType);
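The interface change above replaces the previously hard-coded 4096-row ceiling
(MAX_BATCH_SIZE, removed from NestedLoopJoinBatch.java below) with a target that
the operator supplies before each output batch is filled. A minimal,
self-contained sketch of that contract follows; the names here are hypothetical
and illustrative only, not Drill's generated join worker:

    // Illustrative only: a worker that caps how many rows it emits per call,
    // mirroring the setTargetOutputCount(...) contract added above.
    interface BatchLimitedWorker {
      void setTargetOutputCount(int targetOutputCount);
      int outputRecords();
    }

    class FixedRowsWorker implements BatchLimitedWorker {
      private int target;
      private int pendingRows = 10_000;   // rows still waiting to be emitted

      @Override
      public void setTargetOutputCount(int targetOutputCount) {
        this.target = targetOutputCount;
      }

      @Override
      public int outputRecords() {
        int emitted = Math.min(pendingRows, target);  // never exceed the per-batch target
        pendingRows -= emitted;
        return emitted;
      }

      public static void main(String[] args) {
        BatchLimitedWorker worker = new FixedRowsWorker();
        worker.setTargetOutputCount(4096);
        System.out.println(worker.outputRecords());   // 4096
        System.out.println(worker.outputRecords());   // 4096
      }
    }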
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index ae14fb3ec90..e2532e8ed2b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.join;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Map;
 
@@ -29,6 +30,7 @@
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -50,8 +52,8 @@
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.vector.AllocationHelper;
-
+import org.apache.drill.exec.record.JoinBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchSizer;
 import com.google.common.base.Preconditions;
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JExpression;
@@ -65,9 +67,6 @@
 public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoinPOP> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedLoopJoinBatch.class);
 
-  // Maximum number records in the outgoing batch
-  protected static final int MAX_BATCH_SIZE = 4096;
-
   // Input indexes to correctly update the stats
   protected static final int LEFT_INPUT = 0;
   protected static final int RIGHT_INPUT = 1;
@@ -130,6 +129,11 @@ protected NestedLoopJoinBatch(NestedLoopJoinPOP popConfig, FragmentContext conte
     super(popConfig, context, left, right);
     Preconditions.checkNotNull(left);
     Preconditions.checkNotNull(right);
+
+    // get the output batch size from config.
+    int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right, new HashSet<>());
+    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
   }
 
   /**
@@ -162,6 +166,9 @@ public IterOutcome innerNext() {
             }
             // fall through
           case OK:
+            // For right side, use aggregate i.e. average row width across batches
+            batchMemoryManager.update(RIGHT_INDEX, 0, true);
+            logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
             addBatchToHyperContainer(right);
             break;
           case OUT_OF_MEMORY:
@@ -179,7 +186,9 @@ public IterOutcome innerNext() {
     }
 
     // allocate space for the outgoing batch
-    allocateVectors();
+    batchMemoryManager.allocateVectors(container);
+
+    nljWorker.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
 
     // invoke the runtime generated method to emit records in the output batch
     outputRecords = nljWorker.outputRecords(popConfig.getJoinType());
@@ -193,6 +202,10 @@ public IterOutcome innerNext() {
     container.setRecordCount(outputRecords);
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
+    }
+
     logger.debug("Number of records emitted: " + outputRecords);
 
     return (outputRecords > 0) ? IterOutcome.OK : IterOutcome.NONE;
@@ -331,15 +344,6 @@ private NestedLoopJoin setupWorker() throws IOException, ClassTransformationExce
     return context.getImplementationClass(nLJCodeGenerator);
   }
 
-  /**
-   * Simple method to allocate space for all the vectors in the container.
-   */
-  private void allocateVectors() {
-    for (final VectorWrapper<?> vw : container) {
-      AllocationHelper.allocateNew(vw.getValueVector(), MAX_BATCH_SIZE);
-    }
-  }
-
   /**
    * Builds the output container's schema. Goes over the left and the right
    * batch and adds the corresponding vectors to the output container.
@@ -352,6 +356,9 @@ protected void buildSchema() throws SchemaChangeException {
         return;
       }
 
+      batchMemoryManager.update(RIGHT_INDEX, 0, true);
+      logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+
       if (leftUpstream != IterOutcome.NONE) {
         leftSchema = left.getSchema();
         for (final VectorWrapper<?> vw : left) {
@@ -380,7 +387,6 @@ protected void buildSchema() throws SchemaChangeException {
         addBatchToHyperContainer(right);
       }
 
-      allocateVectors();
       nljWorker = setupWorker();
 
       // if left batch is empty, fetch next
@@ -388,7 +394,9 @@ protected void buildSchema() throws SchemaChangeException {
         leftUpstream = next(LEFT_INPUT, left);
       }
 
-      container.setRecordCount(0);
+      batchMemoryManager.update(LEFT_INDEX, 0);
+      logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+
       container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
     } catch (ClassTransformationException | IOException e) {
@@ -412,6 +420,26 @@ private void addBatchToHyperContainer(RecordBatch inputBatch) {
 
   @Override
   public void close() {
+    updateBatchMemoryManagerStats();
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+              batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+              batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+              batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+              batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+      logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+              batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+              batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+              batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+              batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+      logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+              batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
+              batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
+    }
+
     rightContainer.clear();
     rightCounts.clear();
     super.close();
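The NestedLoopJoinBatch changes above wire in a JoinBatchMemoryManager: the
operator reads the configured output batch size (ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR),
feeds incoming left/right batch statistics into the manager via update(...),
allocates the outgoing vectors with batchMemoryManager.allocateVectors(container),
and hands the computed row count to the worker through setTargetOutputCount().
The arithmetic behind such a row-count budget is roughly as follows; this is a
minimal sketch under simplifying assumptions (average row widths only), not
Drill's actual JoinBatchMemoryManager, which does per-column accounting and
rounding as well:

    // Sketch of the sizing idea only.
    final class OutputBatchSizerSketch {
      static final int MAX_ROWS_PER_BATCH = 65535;   // per-batch record ceiling cited in the tests below

      // Estimate how many joined rows fit in the configured output byte budget,
      // given average row widths observed on the left and right inputs.
      static int targetOutputRowCount(long outputBatchSizeBytes,
                                      int avgLeftRowBytes,
                                      int avgRightRowBytes) {
        int joinedRowBytes = Math.max(1, avgLeftRowBytes + avgRightRowBytes);
        long rows = outputBatchSizeBytes / joinedRowBytes;
        return (int) Math.max(1, Math.min(rows, MAX_ROWS_PER_BATCH));   // at least 1 row, at most 64K-1
      }

      public static void main(String[] args) {
        System.out.println(targetOutputRowCount(16 * 1024 * 1024, 60, 40));  // large budget -> capped at 65535
        System.out.println(targetOutputRowCount(128, 600, 600));             // tiny budget -> still 1 row per batch
      }
    }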
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
index cdd02f42cbe..adf681b58e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
@@ -29,12 +29,16 @@
 import java.util.LinkedList;
 import java.util.List;
 
+import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
+
 /*
  * Template class that combined with the runtime generated source implements the NestedLoopJoin interface. This
  * class contains the main nested loop join logic.
  */
 public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
 
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedLoopJoinBatch.class);
+
   // Current left input batch being processed
   private RecordBatch left = null;
 
@@ -50,6 +54,8 @@
   // Iteration status tracker
   private IterationStatusTracker tracker = new IterationStatusTracker();
 
+  private int targetOutputRecords;
+
   /**
    * Method initializes necessary state and invokes the doSetup() to set the
    * input and output value vector references.
@@ -69,10 +75,14 @@ public void setupNestedLoopJoin(FragmentContext context,
     this.leftRecordCount = left.getRecordCount();
     this.rightCounts = rightCounts;
     this.outgoing = outgoing;
-
     doSetup(context, rightContainer, left, outgoing);
   }
 
+  @Override
+  public void setTargetOutputCount(int targetOutputRecords) {
+    this.targetOutputRecords = targetOutputRecords;
+  }
+
   /**
    * Main entry point for producing the output records. Thin wrapper around populateOutgoingBatch(), this method
    * controls which left batch we are processing and fetches the next left input batch once we exhaust the current one.
@@ -84,11 +94,11 @@ public int outputRecords(JoinRelType joinType) {
     int outputIndex = 0;
     while (leftRecordCount != 0) {
       outputIndex = populateOutgoingBatch(joinType, outputIndex);
-      if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
+      if (outputIndex >= targetOutputRecords) {
         break;
       }
       // reset state and get next left batch
-      resetAndGetNextLeft();
+      resetAndGetNextLeft(outputIndex);
     }
     return outputIndex;
   }
@@ -128,7 +138,7 @@ private int populateOutgoingBatch(JoinRelType joinType, int outputIndex) {
             outputIndex++;
             rightRecordMatched = true;
 
-            if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
+            if (outputIndex >= targetOutputRecords) {
               nextRightRecordToProcess++;
 
               // no more space left in the batch, stop processing
@@ -143,7 +153,7 @@ private int populateOutgoingBatch(JoinRelType joinType, int outputIndex) {
         // project records from the left side only, records from right will be null
         emitLeft(nextLeftRecordToProcess, outputIndex);
         outputIndex++;
-        if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
+        if (outputIndex >= targetOutputRecords) {
           nextLeftRecordToProcess++;
 
           // no more space left in the batch, stop processing
@@ -165,7 +175,7 @@ private int populateOutgoingBatch(JoinRelType joinType, int outputIndex) {
    * Resets some internal state which indicates the next records to process in the left and right batches,
    * also fetches the next left input batch.
    */
-  private void resetAndGetNextLeft() {
+  private void resetAndGetNextLeft(int outputIndex) {
     for (VectorWrapper<?> vw : left) {
       vw.getValueVector().clear();
     }
@@ -181,6 +191,8 @@ private void resetAndGetNextLeft() {
         leftRecordCount = 0;
         break;
       case OK:
+        setTargetOutputCount(outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex));
+        logger.debug("BATCH_STATS, incoming left: {}", outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX));
         leftRecordCount = left.getRecordCount();
         break;
     }
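In the template, the target threads through the join loop: populateOutgoingBatch()
stops emitting once outputIndex reaches targetOutputRecords, and when
resetAndGetNextLeft() fetches a new left batch the target is refreshed through the
memory manager, passing the rows already written so the remaining budget is
respected. A stripped-down sketch of that control flow (arrays stand in for record
batches, and the emit calls are elided):

    // Control-flow sketch only; the real code emits via generated emitLeft()/emitRight().
    final class NestedLoopSketch {
      private int targetOutputRecords;

      void setTargetOutputCount(int target) {
        this.targetOutputRecords = target;
      }

      // Returns the number of rows written to the outgoing batch.
      int outputRecords(int[] leftKeys, int[] rightKeys) {
        int outputIndex = 0;
        for (int l = 0; l < leftKeys.length; l++) {
          for (int r = 0; r < rightKeys.length; r++) {
            if (leftKeys[l] == rightKeys[r]) {        // stand-in for the generated join condition
              outputIndex++;                          // an emit(l, r, outputIndex) would go here
              if (outputIndex >= targetOutputRecords) {
                return outputIndex;                    // output batch is full, stop early
              }
            }
          }
        }
        return outputIndex;
      }
    }

The detail preserved from the diff is that the cap is re-checked after every
emitted row rather than once per left batch, so an output batch never overshoots
the memory manager's budget.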
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
index 471f1b85db5..84a4fbc0430 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
@@ -17,18 +17,23 @@
  */
 package org.apache.drill.exec.physical.unit;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.directory.api.util.Strings;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.config.NestedLoopJoinPOP;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.config.UnionAll;
 import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -2287,6 +2292,341 @@ public void testHashAggMax() throws ExecutionSetupException {
     opTestBuilder.go();
   }
 
+  @Test
+  public void testNestedLoopJoinMultipleOutputBatches() throws Exception {
+    LogicalExpression functionCallExpr = new FunctionCall("equal",
+            ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN),
+                    (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)),
+            ExpressionPosition.UNKNOWN);
+
+    NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr);
+    mockOpContext(nestedLoopJoin, initReservation, maxAllocation);
+
+    numRows = 4000 * 2;
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i);
+      expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows);
+    expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately 4 batches.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+            .physicalOperator(nestedLoopJoin)
+            .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+            .expectedNumBatches(4)  // verify number of batches
+            .expectedBatchSize(totalSize / 2) // verify batch size
+            .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches));
+
+    for (long i = 0; i < numRows+1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+
+  }
+
+  @Test
+  public void testNestedLoopJoinSingleOutputBatch() throws Exception {
+    LogicalExpression functionCallExpr = new FunctionCall("equal",
+            ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN),
+                    (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)),
+            ExpressionPosition.UNKNOWN);
+
+    NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr);
+
+    // create multiple batches from both sides.
+    numRows = 4096 * 2;
+
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i);
+      expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows);
+    expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to twice of total size expected.
+    // We should get 1 batch.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+            .physicalOperator(nestedLoopJoin)
+            .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+            .expectedNumBatches(1)  // verify number of batches
+            .expectedBatchSize(totalSize) // verify batch size
+            .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches));
+
+    for (long i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testNestedLoopJoinUpperLimit() throws Exception {
+    // test the upper limit of 65535 records per batch.
+    LogicalExpression functionCallExpr = new FunctionCall("<",
+            ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN),
+                    (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)),
+            ExpressionPosition.UNKNOWN);
+
+    NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr);
+
+    numRows = 500;
+
+    // create left input rows like this.
+    // "a1" : 5,  "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5,  "c1" : 1, "a2":6,  "c2": 1
+    // "a1" : 5,  "c1" : 2, "a2":6,  "c2": 2
+    // "a1" : 5,  "c1" : 3, "a2":6,  "c2": 3
+
+    // we expect n(n+1)/2 number of records i.e. (500 * 501)/2 = 125250
+    // expect two batches, batch limited by 65535 records
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+            .physicalOperator(nestedLoopJoin)
+            .baselineColumns("a1", "c1", "a2", "c2")
+            .expectedNumBatches(2)  // verify number of batches
+            .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches));
+
+    for (long i = 0; i < numRows+1; i++) {
+      for (long j = i+1; j < numRows+1; j++) {
+        opTestBuilder.baselineValues(5l, i, 6l, j);
+      }
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testNestedLoopJoinLowerLimit() throws Exception {
+    // test the lower limit of at least one batch
+    LogicalExpression functionCallExpr = new FunctionCall("equal",
+            ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN),
+                    (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)),
+            ExpressionPosition.UNKNOWN);
+
+    NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr);
+
+    numRows = 10;
+
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3
+
+    // set very low value of output batch size so we can do only one row per batch.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+            .physicalOperator(nestedLoopJoin)
+            .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+            .expectedNumBatches(10)  // verify number of batches
+            .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches));
+
+    for (long i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testLeftNestedLoopJoin() throws Exception {
+    LogicalExpression functionCallExpr = new FunctionCall("equal",
+            ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN),
+                    (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)),
+            ExpressionPosition.UNKNOWN);
+
+    NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.LEFT, functionCallExpr);
+
+    numRows = 4000 * 2;
+    // create left input rows like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2
+    // "a1" : 5, "b1" : wideString, "c1" : 3, "a2":6, "b2" : wideString, "c2": 3
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i);
+      expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows);
+    expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to 1/2 of total size expected.
+    // We will get approximately 4 batches.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+            .physicalOperator(nestedLoopJoin)
+            .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+            .expectedNumBatches(4)  // verify number of batches
+            .expectedBatchSize(totalSize / 2) // verify batch size
+            .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches));
+
+    for (long i = 0; i < numRows+1; i++) {
+      opTestBuilder.baselineValues(5l, wideString, i, 6l, wideString, i);
+    }
+
+    opTestBuilder.go();
+
+  }
+
   @Test
   public void testSizerRepeatedList() throws Exception {
     List<String> inputJsonBatches = Lists.newArrayList();


 
