You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by so...@apache.org on 2018/06/27 06:19:28 UTC

[drill] branch master updated: DRILL-6503: Performance improvements in lateral

This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 779edf8  DRILL-6503: Performance improvements in lateral
779edf8 is described below

commit 779edf880a1e92608b68108f18e79eff6eb4afa5
Author: Sorabh Hamirwasia <so...@apache.org>
AuthorDate: Tue Jun 26 23:19:25 2018 -0700

    DRILL-6503: Performance improvements in lateral
    
    closes #1328
---
 .../drill/exec/physical/config/LateralJoinPOP.java |   4 +
 .../exec/physical/impl/join/LateralJoinBatch.java  | 162 ++++++++++++++-------
 .../impl/join/TestLateralJoinCorrectness.java      |  16 +-
 .../unnest/TestUnnestWithLateralCorrectness.java   |   6 +-
 4 files changed, 121 insertions(+), 67 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
index fab89a2..a12fed1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
@@ -43,6 +43,10 @@ public class LateralJoinPOP extends AbstractJoinPop {
       @JsonProperty("right") PhysicalOperator right,
       @JsonProperty("joinType") JoinRelType joinType) {
     super(left, right, joinType, null, null);
+    Preconditions.checkArgument(joinType != JoinRelType.FULL,
+      "Full outer join is currently not supported with Lateral Join");
+    Preconditions.checkArgument(joinType != JoinRelType.RIGHT,
+      "Right join is currently not supported with Lateral Join");
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index a09913f..578cbc8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -52,11 +52,6 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
 public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> implements LateralContract {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinBatch.class);
 
-  // Input indexes to correctly update the stats
-  private static final int LEFT_INPUT = 0;
-
-  private static final int RIGHT_INPUT = 1;
-
   // Maximum number records in the outgoing batch
   private int maxOutputRowCount;
 
@@ -81,8 +76,12 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
   // Keep track if any matching right record was found for current left index record
   private boolean matchedRecordFound;
 
+  // Used only for testing
   private boolean useMemoryManager = true;
 
+  // Flag to keep track of new left batch so that update on memory manager is called only once per left batch
+  private boolean isNewLeftBatch = false;
+
   /* ****************************************************************************************************************
    * Public Methods
    * ****************************************************************************************************************/
@@ -147,9 +146,16 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
     // Setup the references of left, right and outgoing container in generated operator
     state = BatchState.NOT_FIRST;
 
-    // Update the memory manager
-    updateMemoryManager(LEFT_INPUT);
-    updateMemoryManager(RIGHT_INPUT);
+    // Update the memory manager only if its a brand new incoming i.e. leftJoinIndex and rightJoinIndex is 0
+    // Otherwise there will be a case where while filling last output batch, some records from previous left or
+    // right batch are still left to be sent in output for which we will count this batch twice. The actual checks
+    // are done in updateMemoryManager
+    updateMemoryManager(LEFT_INDEX);
+
+    // We have to call update on memory manager for empty batches (rightJoinIndex = -1) as well since other wise while
+    // allocating memory for vectors below it can fail. Since in that case colSize will not have any info on right side
+    // vectors and throws NPE. The actual checks are done in updateMemoryManager
+    updateMemoryManager(RIGHT_INDEX);
 
     // allocate space for the outgoing batch
     allocateVectors();
@@ -161,21 +167,25 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
   public void close() {
     updateBatchMemoryManagerStats();
 
-    logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-      batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
-      batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
-      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
-      batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
-
-    logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-      batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
-      batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
-      batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
-      batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
-
-    logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {},  avg row bytes : {}, record count : {}",
-      batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
-      batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {},  avg row bytes : {}, " +
+        "record count : {}", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+        batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+        batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+      logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {},  avg row bytes : {}, " +
+        "record count : {}", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+        batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+        batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+        batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+      logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {},  avg row bytes : {}, " +
+        "record count : {}", batchMemoryManager.getNumOutgoingBatches(),
+        batchMemoryManager.getAvgOutputBatchSize(),
+        batchMemoryManager.getAvgOutputRowWidth(),
+        batchMemoryManager.getTotalOutputRecords());
+    }
 
     super.close();
   }
@@ -238,6 +248,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
     boolean validBatch = setBatchState(leftUpstream);
 
     if (validBatch) {
+      isNewLeftBatch = true;
       rightUpstream = next(1, right);
       validBatch = setBatchState(rightUpstream);
     }
@@ -266,10 +277,6 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
     }
     Preconditions.checkState(right.getRecordCount() == 0, "Unexpected non-empty first right batch received");
 
-    // Update the record memory manager
-    updateMemoryManager(LEFT_INPUT);
-    updateMemoryManager(RIGHT_INPUT);
-
     // Setup output container schema based on known left and right schema
     setupNewSchema();
 
@@ -337,7 +344,12 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
 
     // If left batch is empty
     while (needLeftBatch) {
-      leftUpstream = !processLeftBatchInFuture ? next(LEFT_INPUT, left) : leftUpstream;
+
+      if (!processLeftBatchInFuture) {
+        leftUpstream = next(LEFT_INDEX, left);
+        isNewLeftBatch = true;
+      }
+
       final boolean emptyLeftBatch = left.getRecordCount() <=0;
       logger.trace("Received a left batch and isEmpty: {}", emptyLeftBatch);
 
@@ -418,7 +430,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
     // will be a valid index. When all records are consumed it will be set to -1.
     boolean needNewRightBatch = (leftJoinIndex >= 0) && (rightJoinIndex == -1);
     while (needNewRightBatch) {
-      rightUpstream = next(RIGHT_INPUT, right);
+      rightUpstream = next(RIGHT_INDEX, right);
       switch (rightUpstream) {
         case OK_NEW_SCHEMA:
           // We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. So there won't be a
@@ -503,7 +515,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
         if (rightUpstream == EMIT) {
           if (!matchedRecordFound && JoinRelType.LEFT == popConfig.getJoinType()) {
             // copy left side in case of LEFT join
-            emitLeft(leftJoinIndex, outputIndex++);
+            emitLeft(leftJoinIndex, outputIndex, 1);
+            ++outputIndex;
           }
           ++leftJoinIndex;
           // Reset matchedRecord for next left index record
@@ -557,7 +570,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
             }
 
             // Update the batch memory manager to use new left incoming batch
-            updateMemoryManager(LEFT_INPUT);
+            updateMemoryManager(LEFT_INDEX);
           }
         }
 
@@ -577,7 +590,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
         }
 
         // Update the batch memory manager to use new right incoming batch
-        updateMemoryManager(RIGHT_INPUT);
+        updateMemoryManager(RIGHT_INDEX);
       }
     } // output batch is full to its max capacity
 
@@ -615,9 +628,11 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
 
     batchMemoryManager.updateOutgoingStats(outputIndex);
 
-    logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
-    logger.debug("Number of records emitted: {} and Allocator Stats: [AllocatedMem: {}, PeakMem: {}]", outputIndex,
-      container.getAllocator().getAllocatedMemory(), container.getAllocator().getPeakMemoryAllocation());
+    if (logger.isDebugEnabled()) {
+      logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
+      logger.debug("Number of records emitted: {} and Allocator Stats: [AllocatedMem: {}, PeakMem: {}]",
+        outputIndex, container.getAllocator().getAllocatedMemory(), container.getAllocator().getPeakMemoryAllocation());
+    }
 
     // Update the output index for next output batch to zero
     outputIndex = 0;
@@ -745,8 +760,6 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
    * this left index. The right container is copied starting from rightIndex until number of records in the container.
    */
   private void crossJoinAndOutputRecords() {
-    logger.trace("Producing output for leftIndex: {}, rightIndex: {}, rightRecordCount: {} and outputIndex: {}",
-      leftJoinIndex, rightJoinIndex, right.getRecordCount(), outputIndex);
     final int rightRecordCount = right.getRecordCount();
 
     // If there is no record in right batch just return current index in output batch
@@ -756,16 +769,30 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
 
     // Check if right batch is empty since we have to handle left join case
     Preconditions.checkState(rightJoinIndex != -1, "Right batch record count is >0 but index is -1");
-    // For every record in right side just emit left and right records in output container
-    for (int i = rightJoinIndex; i < rightRecordCount; ++i) {
-      emitLeft(leftJoinIndex, outputIndex);
-      emitRight(i, outputIndex);
-      ++outputIndex;
-
-      if (isOutgoingBatchFull()) {
-        break;
-      }
+
+    int currentOutIndex = outputIndex;
+    // Number of rows that can be copied in output batch
+    final int maxAvailableRowSlot = maxOutputRowCount - currentOutIndex;
+    // Number of rows that can be copied inside output batch is minimum of available slot in
+    // output batch and available data to copy from right side. It can be half consumed right batch
+    // which has few more rows to be copied to output but output batch has more to fill.
+    final int rowsToCopy = Math.min(maxAvailableRowSlot, (rightRecordCount - rightJoinIndex));
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("Producing output for leftIndex: {}, rightIndex: {}, rightRecordCount: {}, outputIndex: {} and " +
+        "availableSlotInOutput: {}", leftJoinIndex, rightJoinIndex, rightRecordCount, outputIndex, maxAvailableRowSlot);
+      logger.debug("Output Batch stats before copying new data: {}", new RecordBatchSizer(this));
     }
+
+    // First copy all the left vectors data. Doing it in this way since it's the same data being copied over may be
+    // we will have performance gain from JVM
+    emitLeft(leftJoinIndex, currentOutIndex, rowsToCopy);
+
+    // Copy all the right side vectors data
+    emitRight(rightJoinIndex, currentOutIndex, rowsToCopy);
+
+    // Update outputIndex
+    outputIndex += rowsToCopy;
   }
 
   /**
@@ -779,9 +806,14 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
    * @param startVectorIndex - start index of vector inside source record batch
    * @param endVectorIndex - end index of vector inside source record batch
    * @param baseVectorIndex - base index to be added to startVectorIndex to get corresponding vector in outgoing batch
+   * @param numRowsToCopy - Number of rows to copy into output batch
+   * @param moveFromIndex - boolean to indicate if the fromIndex should also be increased or not. Since in case of
+   *                      copying data from left vector fromIndex is constant whereas in case of copying data from right
+   *                      vector fromIndex will move along with output index.
    */
   private void copyDataToOutputVectors(int fromRowIndex, int toRowIndex, RecordBatch batch,
-                                      int startVectorIndex, int endVectorIndex, int baseVectorIndex) {
+                                       int startVectorIndex, int endVectorIndex, int baseVectorIndex,
+                                       int numRowsToCopy, boolean moveFromIndex) {
     // Get the vectors using field index rather than Materialized field since input batch field can be different from
     // output container field in case of Left Join. As we rebuild the right Schema field to be optional for output
     // container.
@@ -796,10 +828,14 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
       final ValueVector outputVector = this.getValueAccessorById(outputValueClass, outputVectorIndex).getValueVector();
 
       logger.trace("Copying data from incoming batch vector to outgoing batch vector. [IncomingBatch: " +
-          "(RowIndex: {}, VectorType: {}), OutputBatch: (RowIndex: {}, VectorType: {}) and BaseIndex: {}]",
-        fromRowIndex, inputValueClass, toRowIndex, outputValueClass, baseVectorIndex);
-      // Copy data from input vector to output vector
-      outputVector.copyEntry(toRowIndex, inputVector, fromRowIndex);
+          "(RowIndex: {}, VectorType: {}), OutputBatch: (RowIndex: {}, VectorType: {}) and Other: (TimeEachValue: {}," +
+          " NumBaseIndex: {}) ]",
+        fromRowIndex, inputValueClass, toRowIndex, outputValueClass, numRowsToCopy, baseVectorIndex);
+
+      // Copy data from input vector to output vector for numRowsToCopy times.
+      for (int j = 0; j < numRowsToCopy; ++j) {
+        outputVector.copyEntry(toRowIndex + j, inputVector, (moveFromIndex) ? fromRowIndex + j : fromRowIndex);
+      }
     }
   }
 
@@ -809,8 +845,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
    * @param leftIndex - index to copy data from left incoming batch vectors
    * @param outIndex - index to copy data to in outgoing batch vectors
    */
-  private void emitLeft(int leftIndex, int outIndex) {
-    copyDataToOutputVectors(leftIndex, outIndex, left, 0, leftSchema.getFieldCount(), 0);
+  private void emitLeft(int leftIndex, int outIndex, int numRowsToCopy) {
+    copyDataToOutputVectors(leftIndex, outIndex, left, 0,
+      leftSchema.getFieldCount(), 0, numRowsToCopy, false);
   }
 
   /**
@@ -819,8 +856,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
    * @param rightIndex - index to copy data from right incoming batch vectors
    * @param outIndex - index to copy data to in outgoing batch vectors
    */
-  private void emitRight(int rightIndex, int outIndex) {
-    copyDataToOutputVectors(rightIndex, outIndex, right, 0, rightSchema.getFieldCount(), leftSchema.getFieldCount());
+  private void emitRight(int rightIndex, int outIndex, int numRowsToCopy) {
+    copyDataToOutputVectors(rightIndex, outIndex, right, 0,
+      rightSchema.getFieldCount(), leftSchema.getFieldCount(), numRowsToCopy, true);
   }
 
   /**
@@ -847,12 +885,24 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
   }
 
   private void updateMemoryManager(int inputIndex) {
+
+    if (inputIndex == LEFT_INDEX && isNewLeftBatch) {
+      // reset state and continue to update
+      isNewLeftBatch = false;
+    } else if (inputIndex == RIGHT_INDEX && (rightJoinIndex == 0 || rightJoinIndex == -1)) {
+      // continue to update
+    } else {
+      return;
+    }
+
     // For cases where all the previous input were consumed and send with previous output batch. But now we are building
     // a new output batch with new incoming then it will not cause any problem since outputIndex will be 0
     final int newOutputRowCount = batchMemoryManager.update(inputIndex, outputIndex);
 
     if (logger.isDebugEnabled()) {
-      logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == 0 ? "left" : "right", batchMemoryManager.getRecordBatchSizer(inputIndex));
+      logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == LEFT_INDEX ? "left" : "right",
+        batchMemoryManager.getRecordBatchSizer(inputIndex));
+      logger.debug("Previous OutputRowCount: {}, New OutputRowCount: {}", maxOutputRowCount, newOutputRowCount);
     }
 
     if (useMemoryManager) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index e9e9aac..79a7bd4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -107,7 +107,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
       .buildSchema();
     emptyRightRowSet = fixture.rowSetBuilder(rightSchema).build();
 
-    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.FULL);
+    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER);
   }
 
   @AfterClass
@@ -1754,7 +1754,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
 
     final LateralJoinBatch ljBatch_1 = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
@@ -1863,7 +1863,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
 
     final LateralJoinBatch lowerLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
@@ -1964,7 +1964,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
 
     final LateralJoinBatch lowerLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
@@ -2091,7 +2091,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
 
     final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
@@ -2225,7 +2225,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
 
     final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
@@ -2369,7 +2369,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
 
     final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
@@ -2723,7 +2723,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.FULL);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
 
     final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index f281964..03fd1c1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -69,7 +69,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
 
   @BeforeClass public static void setUpBeforeClass() throws Exception {
     mockPopConfig = new MockStorePOP(null);
-    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.FULL);
+    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER);
     operatorContext = fixture.newOperatorContext(mockPopConfig);
   }
 
@@ -875,8 +875,8 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     final ProjectRecordBatch projectBatch2 =
         new ProjectRecordBatch(projectPopConfig2, unnestBatch2, fixture.getFragmentContext());
 
-    final LateralJoinPOP ljPopConfig2 = new LateralJoinPOP(projectPopConfig1, projectPopConfig2, JoinRelType.FULL);
-    final LateralJoinPOP ljPopConfig1 = new LateralJoinPOP(mockPopConfig, ljPopConfig2, JoinRelType.FULL);
+    final LateralJoinPOP ljPopConfig2 = new LateralJoinPOP(projectPopConfig1, projectPopConfig2, JoinRelType.INNER);
+    final LateralJoinPOP ljPopConfig1 = new LateralJoinPOP(mockPopConfig, ljPopConfig2, JoinRelType.INNER);
 
     final LateralJoinBatch lateralJoinBatch2 =
         new LateralJoinBatch(ljPopConfig2, fixture.getFragmentContext(), projectBatch1, projectBatch2);