You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/07/09 12:17:29 UTC

[drill] 03/04: DRILL-6549: batch sizing for nested loop join

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 25e65c0fe2aed741b143d69c0ffbe23a3433bedc
Author: Padma Penumarthy <pp...@yahoo.com>
AuthorDate: Thu Jun 28 03:50:36 2018 -0700

    DRILL-6549: batch sizing for nested loop join
    
    closes #1363
---
 .../exec/physical/impl/join/NestedLoopJoin.java    |   3 +
 .../physical/impl/join/NestedLoopJoinBatch.java    |  62 +++-
 .../physical/impl/join/NestedLoopJoinTemplate.java |  24 +-
 .../exec/physical/unit/TestOutputBatchSize.java    | 342 ++++++++++++++++++++-
 4 files changed, 407 insertions(+), 24 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
index f7d96ad..725c46d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoin.java
@@ -37,6 +37,9 @@ public interface NestedLoopJoin {
                                   ExpandableHyperContainer rightContainer,
                                   LinkedList<Integer> rightCounts,
                                   NestedLoopJoinBatch outgoing);
+
+  void setTargetOutputCount(int targetOutputCount);
+
   // Produce output records taking into account join type
   public int outputRecords(JoinRelType joinType);
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index ae14fb3..e2532e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.join;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Map;
 
@@ -29,6 +30,7 @@ import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -50,8 +52,8 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.vector.AllocationHelper;
-
+import org.apache.drill.exec.record.JoinBatchMemoryManager;
+import org.apache.drill.exec.record.RecordBatchSizer;
 import com.google.common.base.Preconditions;
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JExpression;
@@ -65,9 +67,6 @@ import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoinPOP> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedLoopJoinBatch.class);
 
-  // Maximum number records in the outgoing batch
-  protected static final int MAX_BATCH_SIZE = 4096;
-
   // Input indexes to correctly update the stats
   protected static final int LEFT_INPUT = 0;
   protected static final int RIGHT_INPUT = 1;
@@ -130,6 +129,11 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
     super(popConfig, context, left, right);
     Preconditions.checkNotNull(left);
     Preconditions.checkNotNull(right);
+
+    // get the output batch size from config.
+    int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right, new HashSet<>());
+    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
   }
 
   /**
@@ -162,6 +166,9 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
             }
             // fall through
           case OK:
+            // For right side, use aggregate i.e. average row width across batches
+            batchMemoryManager.update(RIGHT_INDEX, 0, true);
+            logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
             addBatchToHyperContainer(right);
             break;
           case OUT_OF_MEMORY:
@@ -179,7 +186,9 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
     }
 
     // allocate space for the outgoing batch
-    allocateVectors();
+    batchMemoryManager.allocateVectors(container);
+
+    nljWorker.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
 
     // invoke the runtime generated method to emit records in the output batch
     outputRecords = nljWorker.outputRecords(popConfig.getJoinType());
@@ -193,6 +202,10 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
     container.setRecordCount(outputRecords);
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
+    }
+
     logger.debug("Number of records emitted: " + outputRecords);
 
     return (outputRecords > 0) ? IterOutcome.OK : IterOutcome.NONE;
@@ -332,15 +345,6 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
   }
 
   /**
-   * Simple method to allocate space for all the vectors in the container.
-   */
-  private void allocateVectors() {
-    for (final VectorWrapper<?> vw : container) {
-      AllocationHelper.allocateNew(vw.getValueVector(), MAX_BATCH_SIZE);
-    }
-  }
-
-  /**
    * Builds the output container's schema. Goes over the left and the right
    * batch and adds the corresponding vectors to the output container.
    * @throws SchemaChangeException if batch schema was changed during execution
@@ -352,6 +356,9 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
         return;
       }
 
+      batchMemoryManager.update(RIGHT_INDEX, 0, true);
+      logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+
       if (leftUpstream != IterOutcome.NONE) {
         leftSchema = left.getSchema();
         for (final VectorWrapper<?> vw : left) {
@@ -380,7 +387,6 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
         addBatchToHyperContainer(right);
       }
 
-      allocateVectors();
       nljWorker = setupWorker();
 
       // if left batch is empty, fetch next
@@ -388,7 +394,9 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
         leftUpstream = next(LEFT_INPUT, left);
       }
 
-      container.setRecordCount(0);
+      batchMemoryManager.update(LEFT_INDEX, 0);
+      logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+
       container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
 
     } catch (ClassTransformationException | IOException e) {
@@ -412,6 +420,26 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
 
   @Override
   public void close() {
+    updateBatchMemoryManagerStats();
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+              batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+              batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+              batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+              batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+      logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+              batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+              batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+              batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+              batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+      logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
+              batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
+              batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
+    }
+
     rightContainer.clear();
     rightCounts.clear();
     super.close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
index cdd02f4..adf681b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
@@ -29,12 +29,16 @@ import javax.inject.Named;
 import java.util.LinkedList;
 import java.util.List;
 
+import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
+
 /*
  * Template class that combined with the runtime generated source implements the NestedLoopJoin interface. This
  * class contains the main nested loop join logic.
  */
 public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
 
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedLoopJoinBatch.class);
+
   // Current left input batch being processed
   private RecordBatch left = null;
 
@@ -50,6 +54,8 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
   // Iteration status tracker
   private IterationStatusTracker tracker = new IterationStatusTracker();
 
+  private int targetOutputRecords;
+
   /**
    * Method initializes necessary state and invokes the doSetup() to set the
    * input and output value vector references.
@@ -69,10 +75,14 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
     this.leftRecordCount = left.getRecordCount();
     this.rightCounts = rightCounts;
     this.outgoing = outgoing;
-
     doSetup(context, rightContainer, left, outgoing);
   }
 
+  @Override
+  public void setTargetOutputCount(int targetOutputCount) {
+    targetOutputRecords = targetOutputCount; // row cap: filling the outgoing batch stops once outputIndex reaches it
+  }
+
   /**
    * Main entry point for producing the output records. Thin wrapper around populateOutgoingBatch(), this method
    * controls which left batch we are processing and fetches the next left input batch once we exhaust the current one.
@@ -84,11 +94,11 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
     int outputIndex = 0;
     while (leftRecordCount != 0) {
       outputIndex = populateOutgoingBatch(joinType, outputIndex);
-      if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
+      if (outputIndex >= targetOutputRecords) {
         break;
       }
       // reset state and get next left batch
-      resetAndGetNextLeft();
+      resetAndGetNextLeft(outputIndex);
     }
     return outputIndex;
   }
@@ -128,7 +138,7 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
             outputIndex++;
             rightRecordMatched = true;
 
-            if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
+            if (outputIndex >= targetOutputRecords) {
               nextRightRecordToProcess++;
 
               // no more space left in the batch, stop processing
@@ -143,7 +153,7 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
         // project records from the left side only, records from right will be null
         emitLeft(nextLeftRecordToProcess, outputIndex);
         outputIndex++;
-        if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
+        if (outputIndex >= targetOutputRecords) {
           nextLeftRecordToProcess++;
 
           // no more space left in the batch, stop processing
@@ -165,7 +175,7 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
    * Resets some internal state which indicates the next records to process in the left and right batches,
    * also fetches the next left input batch.
    */
-  private void resetAndGetNextLeft() {
+  private void resetAndGetNextLeft(int outputIndex) {
     for (VectorWrapper<?> vw : left) {
       vw.getValueVector().clear();
     }
@@ -181,6 +191,8 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
         leftRecordCount = 0;
         break;
       case OK:
+        setTargetOutputCount(outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex));
+        logger.debug("BATCH_STATS, incoming left: {}", outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX));
         leftRecordCount = left.getRecordCount();
         break;
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
index 471f1b8..84a4fbc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestOutputBatchSize.java
@@ -17,18 +17,23 @@
  */
 package org.apache.drill.exec.physical.unit;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.directory.api.util.Strings;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
-
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.exec.physical.base.AbstractBase;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.FlattenPOP;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.physical.config.HashJoinPOP;
 import org.apache.drill.exec.physical.config.MergeJoinPOP;
+import org.apache.drill.exec.physical.config.NestedLoopJoinPOP;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.physical.config.UnionAll;
 import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -2288,6 +2293,341 @@ public class TestOutputBatchSize extends PhysicalOpUnitTestBase {
   }
 
   @Test
+  public void testNestedLoopJoinMultipleOutputBatches() throws Exception {
+    LogicalExpression functionCallExpr = new FunctionCall("equal",
+            ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN),
+                    (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)),
+            ExpressionPosition.UNKNOWN);
+
+    NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr);
+    mockOpContext(nestedLoopJoin, initReservation, maxAllocation);
+
+    numRows = 4000 * 2;
+    // create left input rows (ids 0..numRows inclusive) like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows (ids 0..numRows inclusive) like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows (exactly one per id, since c1 = c2) will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 0, "a2":6, "b2" : wideString, "c2": 0
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i);
+      expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows);
+    expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to 1/2 of the total expected size.
+    // We will get approximately 4 batches.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+            .physicalOperator(nestedLoopJoin)
+            .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+            .expectedNumBatches(4)  // verify number of batches
+            .expectedBatchSize(totalSize / 2) // verify batch size
+            .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches));
+
+    for (long i = 0; i < numRows+1; i++) {
+      opTestBuilder.baselineValues(5L, wideString, i, 6L, wideString, i);
+    }
+
+    opTestBuilder.go();
+
+  }
+
+  @Test
+  public void testNestedLoopJoinSingleOutputBatch() throws Exception {
+    LogicalExpression functionCallExpr = new FunctionCall("equal",
+            ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN),
+                    (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)),
+            ExpressionPosition.UNKNOWN);
+
+    NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr);
+
+    // create multiple batches from both sides.
+    numRows = 4096 * 2;
+
+    // create left input rows (ids 0..numRows inclusive) like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows (ids 0..numRows inclusive) like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows (exactly one per id, since c1 = c2) will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 0, "a2":6, "b2" : wideString, "c2": 0
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i);
+      expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows);
+    expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to twice the total expected size.
+    // We should get 1 batch.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize*2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+            .physicalOperator(nestedLoopJoin)
+            .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+            .expectedNumBatches(1)  // verify number of batches
+            .expectedBatchSize(totalSize) // verify batch size
+            .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches));
+
+    for (long i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5L, wideString, i, 6L, wideString, i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testNestedLoopJoinUpperLimit() throws Exception {
+    // test the upper limit of 65535 records per batch.
+    LogicalExpression functionCallExpr = new FunctionCall("<",
+            ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN),
+                    (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)),
+            ExpressionPosition.UNKNOWN);
+
+    NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr);
+
+    numRows = 500;
+
+    // create left input rows (ids 0..numRows inclusive) like this.
+    // "a1" : 5,  "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows (ids 0..numRows inclusive) like this.
+    // "a2" : 6, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows (one for every pair with c1 < c2) will be like this.
+    // "a1" : 5,  "c1" : 0, "a2":6,  "c2": 1
+    // "a1" : 5,  "c1" : 0, "a2":6,  "c2": 2
+    // "a1" : 5,  "c1" : 1, "a2":6,  "c2": 2
+
+    // we expect n(n+1)/2 number of records i.e. (500 * 501)/2 = 125250
+    // expect two batches, batch limited by 65535 records
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+            .physicalOperator(nestedLoopJoin)
+            .baselineColumns("a1", "c1", "a2", "c2")
+            .expectedNumBatches(2)  // verify number of batches
+            .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches));
+
+    for (long i = 0; i < numRows+1; i++) {
+      for (long j = i+1; j < numRows+1; j++) {
+        opTestBuilder.baselineValues(5L, i, 6L, j);
+      }
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testNestedLoopJoinLowerLimit() throws Exception {
+    // test the lower limit of at least one batch
+    LogicalExpression functionCallExpr = new FunctionCall("equal",
+            ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN),
+                    (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)),
+            ExpressionPosition.UNKNOWN);
+
+    NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.INNER, functionCallExpr);
+
+    numRows = 10;
+
+    // create left input rows (ids 0..numRows inclusive) like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows (ids 0..numRows inclusive) like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // output rows (exactly one per id, since c1 = c2) will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 0, "a2":6, "b2" : wideString, "c2": 0
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2
+
+    // set very low value of output batch size so we can do only one row per batch.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", 128);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+            .physicalOperator(nestedLoopJoin)
+            .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+            .expectedNumBatches(10)  // verify number of batches
+            .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches));
+
+    for (long i = 0; i < numRows + 1; i++) {
+      opTestBuilder.baselineValues(5L, wideString, i, 6L, wideString, i);
+    }
+
+    opTestBuilder.go();
+  }
+
+  @Test
+  public void testLeftNestedLoopJoin() throws Exception {
+    LogicalExpression functionCallExpr = new FunctionCall("equal",
+            ImmutableList.of((LogicalExpression) new FieldReference("c1", ExpressionPosition.UNKNOWN),
+                    (LogicalExpression) new FieldReference("c2", ExpressionPosition.UNKNOWN)),
+            ExpressionPosition.UNKNOWN);
+
+    NestedLoopJoinPOP nestedLoopJoin = new NestedLoopJoinPOP(null, null, JoinRelType.LEFT, functionCallExpr);
+
+    numRows = 4000 * 2;
+    // create left input rows (ids 0..numRows inclusive) like this.
+    // "a1" : 5, "b1" : wideString, "c1" : <id>
+    List<String> leftJsonBatches = Lists.newArrayList();
+    StringBuilder leftBatchString = new StringBuilder();
+    leftBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i + "},");
+    }
+    leftBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows + "}");
+    leftBatchString.append("]");
+
+    leftJsonBatches.add(leftBatchString.toString());
+
+    // create right input rows (ids 0..numRows inclusive) like this.
+    // "a2" : 6, "b2" : wideString, "c2" : <id>
+    List<String> rightJsonBatches = Lists.newArrayList();
+    StringBuilder rightBatchString = new StringBuilder();
+    rightBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    rightBatchString.append("{\"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    rightBatchString.append("]");
+    rightJsonBatches.add(rightBatchString.toString());
+
+    // every left id has a matching right id, so no null-padded rows; output rows will be like this.
+    // "a1" : 5, "b1" : wideString, "c1" : 0, "a2":6, "b2" : wideString, "c2": 0
+    // "a1" : 5, "b1" : wideString, "c1" : 1, "a2":6, "b2" : wideString, "c2": 1
+    // "a1" : 5, "b1" : wideString, "c1" : 2, "a2":6, "b2" : wideString, "c2": 2
+    List<String> expectedJsonBatches = Lists.newArrayList();
+    StringBuilder expectedBatchString = new StringBuilder();
+    expectedBatchString.append("[");
+    for (int i = 0; i < numRows; i++) {
+      expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + i);
+      expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + i + "},");
+    }
+    expectedBatchString.append("{\"a1\": 5, " + "\"b1\" : " + "\"" + wideString + "\"," + "\"c1\" : " + numRows);
+    expectedBatchString.append(", \"a2\": 6, " + "\"b2\" : " + "\"" + wideString + "\"," + "\"c2\" : " + numRows + "}");
+    expectedBatchString.append("]");
+    expectedJsonBatches.add(expectedBatchString.toString());
+
+    long totalSize = getExpectedSize(expectedJsonBatches);
+
+    // set the output batch size to 1/2 of the total expected size.
+    // We will get approximately 4 batches.
+    fragContext.getOptions().setLocalOption("drill.exec.memory.operator.output_batch_size", totalSize/2);
+
+    OperatorTestBuilder opTestBuilder = opTestBuilder()
+            .physicalOperator(nestedLoopJoin)
+            .baselineColumns("a1", "b1", "c1", "a2", "b2", "c2")
+            .expectedNumBatches(4)  // verify number of batches
+            .expectedBatchSize(totalSize / 2) // verify batch size
+            .inputDataStreamsJson(Lists.newArrayList(leftJsonBatches, rightJsonBatches));
+
+    for (long i = 0; i < numRows+1; i++) {
+      opTestBuilder.baselineValues(5L, wideString, i, 6L, wideString, i);
+    }
+
+    opTestBuilder.go();
+
+  }
+
+  @Test
   public void testSizerRepeatedList() throws Exception {
     List<String> inputJsonBatches = Lists.newArrayList();
     StringBuilder batchString = new StringBuilder();