You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/07/31 22:47:41 UTC

[GitHub] ilooner closed pull request #1401: DRILL-6616: Batch Processing for Lateral/Unnest

ilooner closed pull request #1401: DRILL-6616: Batch Processing for Lateral/Unnest
URL: https://github.com/apache/drill/pull/1401
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), GitHub does not keep the
original diff visible once it is merged, so the full diff is supplied below:

diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
index ac6f31c4e76..8221367e843 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcPrel.java
@@ -130,4 +130,5 @@ public SelectionVectorMode getEncoding() {
   public boolean needsFinalColumnReordering() {
     return false;
   }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index e7518b8ba02..defdd708388 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -51,6 +51,7 @@
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.config.UnnestPOP;
 import org.apache.drill.exec.physical.config.WindowPOP;
+import org.apache.drill.exec.planner.common.DrillUnnestRelBase;
 import org.apache.drill.exec.rpc.UserClientConnection;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.StoragePlugin;
@@ -235,7 +236,7 @@ public PhysicalOperator visitFilter(final Filter filter, final Object obj) throw
 
     @Override
     public PhysicalOperator visitUnnest(final Unnest unnest, final Object obj) {
-      return new UnnestPOP(null, unnest.getColumn());
+      return new UnnestPOP(null, unnest.getColumn(), DrillUnnestRelBase.IMPLICIT_COLUMN);
     }
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
index 55ede962826..5d6150962d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
@@ -38,6 +38,9 @@
   @JsonProperty("excludedColumns")
   private List<SchemaPath> excludedColumns;
 
+  @JsonProperty("implicitRIDColumn")
+  private String implicitRIDColumn;
+
   @JsonProperty("unnestForLateralJoin")
   private UnnestPOP unnestForLateralJoin;
 
@@ -46,6 +49,7 @@ public LateralJoinPOP(
       @JsonProperty("left") PhysicalOperator left,
       @JsonProperty("right") PhysicalOperator right,
       @JsonProperty("joinType") JoinRelType joinType,
+      @JsonProperty("implicitRIDColumn") String implicitRIDColumn,
       @JsonProperty("excludedColumns") List<SchemaPath> excludedColumns) {
     super(left, right, joinType, null, null);
     Preconditions.checkArgument(joinType != JoinRelType.FULL,
@@ -53,13 +57,14 @@ public LateralJoinPOP(
     Preconditions.checkArgument(joinType != JoinRelType.RIGHT,
       "Right join is currently not supported with Lateral Join");
     this.excludedColumns = excludedColumns;
+    this.implicitRIDColumn = implicitRIDColumn;
   }
 
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.size() == 2,
       "Lateral join should have two physical operators");
-    LateralJoinPOP newPOP =  new LateralJoinPOP(children.get(0), children.get(1), joinType, this.excludedColumns);
+    LateralJoinPOP newPOP =  new LateralJoinPOP(children.get(0), children.get(1), joinType, this.implicitRIDColumn, this.excludedColumns);
     newPOP.unnestForLateralJoin = this.unnestForLateralJoin;
     return newPOP;
   }
@@ -78,6 +83,9 @@ public void setUnnestForLateralJoin(UnnestPOP unnest) {
     this.unnestForLateralJoin = unnest;
   }
 
+  @JsonProperty("implicitRIDColumn")
+  public String getImplicitRIDColumn() { return this.implicitRIDColumn; }
+
   @Override
   public int getOperatorType() {
     return CoreOperatorType.LATERAL_JOIN_VALUE;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnnestPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnnestPOP.java
index f95481821c9..022ea3720ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnnestPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnnestPOP.java
@@ -39,6 +39,9 @@
 public class UnnestPOP extends AbstractBase implements Leaf {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestPOP.class);
 
+  @JsonProperty("implicitColumn")
+  private String implicitColumn;
+
   private SchemaPath column;
 
   @JsonIgnore
@@ -47,14 +50,16 @@
   @JsonCreator
   public UnnestPOP(
       @JsonProperty("child") PhysicalOperator child, // Operator with incoming record batch
-      @JsonProperty("column") SchemaPath column) {
+      @JsonProperty("column") SchemaPath column,
+      @JsonProperty("implicitColumn") String implicitColumn) {
     this.column = column;
+    this.implicitColumn = implicitColumn;
   }
 
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
     assert children.isEmpty();
-    UnnestPOP newUnnest = new UnnestPOP(null, column);
+    UnnestPOP newUnnest = new UnnestPOP(null, column, this.implicitColumn);
     newUnnest.addUnnestBatch(this.unnestBatch);
     return newUnnest;
   }
@@ -82,6 +87,9 @@ public UnnestRecordBatch getUnnestBatch() {
     return this.unnestBatch;
   }
 
+  @JsonProperty("implicitColumn")
+  public String getImplicitColumn() { return this.implicitColumn; }
+
   @Override
   public int getOperatorType() {
     return UNNEST_VALUE;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index fc3c8b1888f..18843b5b4b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -39,10 +39,13 @@
 import org.apache.drill.exec.record.SchemaBuilder;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
@@ -90,6 +93,17 @@
 
   private final HashSet<String> excludedFieldNames = new HashSet<>();
 
+  private final String implicitColumn;
+
+  private boolean hasRemainderForLeftJoin = false;
+
+  private ValueVector implicitVector;
+
+  // Map to cache reference of input and corresponding output vectors for left and right batches
+  private final Map<ValueVector, ValueVector> leftInputOutputVector = new HashMap<>();
+
+  private final Map<ValueVector, ValueVector> rightInputOutputVector = new HashMap<>();
+
   /* ****************************************************************************************************************
    * Public Methods
    * ****************************************************************************************************************/
@@ -99,7 +113,8 @@ public LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context,
     Preconditions.checkNotNull(left);
     Preconditions.checkNotNull(right);
     final int configOutputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
-    // Prepare Schema Path Mapping
+    implicitColumn = popConfig.getImplicitRIDColumn();
+
     populateExcludedField(popConfig);
     batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right, excludedFieldNames);
 
@@ -108,6 +123,27 @@ public LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context,
     maxOutputRowCount = batchMemoryManager.getOutputRowCount();
   }
 
+  /**
+   * Handles cases where previous output batch got full after processing all the batches from right side for a left
+   * side batch. But there are still few unprocessed rows in left batch which cannot be ignored because JoinType is
+   * LeftJoin.
+   * @return - true if all the rows in left batch is produced in output container
+   *           false if there is still some rows pending in left incoming container
+   */
+  private boolean handleRemainingLeftRows() {
+    Preconditions.checkState(popConfig.getJoinType() == JoinRelType.LEFT,
+      "Unexpected leftover rows from previous left batch when join type is not left join");
+
+    while(leftJoinIndex < left.getRecordCount() && !isOutgoingBatchFull()) {
+      emitLeft(leftJoinIndex, outputIndex, 1);
+      ++outputIndex;
+      ++leftJoinIndex;
+    }
+
+    // Check if there is still pending left rows
+    return leftJoinIndex >= left.getRecordCount();
+  }
+
   /**
    * Method that get's left and right incoming batch and produce the output batch. If the left incoming batch is
    * empty then next on right branch is not called and empty batch with correct outcome is returned. If non empty
@@ -118,11 +154,37 @@ public LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context,
   @Override
   public IterOutcome innerNext() {
 
+    if (hasRemainderForLeftJoin) { // if set that means there is spill over from previous left batch and no
+      // corresponding right rows and it is left join scenario
+      allocateVectors();
+
+      boolean hasMoreRows = !handleRemainingLeftRows();
+      if (leftUpstream == EMIT || hasMoreRows) {
+        logger.debug("Sending current output batch with EMIT outcome since left is received with EMIT and is fully " +
+          "consumed now in output batch");
+        hasRemainderForLeftJoin = hasMoreRows;
+        finalizeOutputContainer();
+        return (leftUpstream == EMIT) ? EMIT : OK;
+      } else {
+        // release memory for previous left batch
+        leftJoinIndex = -1;
+        VectorAccessibleUtilities.clear(left);
+      }
+    }
+
     // We don't do anything special on FIRST state. Process left batch first and then right batch if need be
     IterOutcome childOutcome = processLeftBatch();
+    logger.debug("Received left batch with outcome {}", childOutcome);
+
+    if (processLeftBatchInFuture && hasRemainderForLeftJoin) {
+      finalizeOutputContainer();
+      hasRemainderForLeftJoin = false;
+      return OK;
+    }
 
     // reset this state after calling processLeftBatch above.
     processLeftBatchInFuture = false;
+    hasRemainderForLeftJoin = false;
 
     // If the left batch doesn't have any record in the incoming batch (with OK_NEW_SCHEMA/EMIT) or the state returned
     // from left side is terminal state then just return the IterOutcome and don't call next() on right branch
@@ -133,6 +195,7 @@ public IterOutcome innerNext() {
 
     // Left side has some records in the batch so let's process right batch
     childOutcome = processRightBatch();
+    logger.debug("Received right batch with outcome {}", childOutcome);
 
     // reset the left & right outcomes to OK here and send the empty batch downstream. Non-Empty right batch with
     // OK_NEW_SCHEMA will be handled in subsequent next call
@@ -228,7 +291,7 @@ public RecordBatch getIncoming() {
   @Override
   public int getRecordIndex() {
     Preconditions.checkState (leftJoinIndex < left.getRecordCount(),
-      String.format("Left join index: %d is out of bounds: %d", leftJoinIndex, left.getRecordCount()));
+      "Left join index: %s is out of bounds: %s", leftJoinIndex, left.getRecordCount());
     return leftJoinIndex;
   }
 
@@ -321,8 +384,7 @@ protected void killIncoming(boolean sendUpstream) {
   private boolean handleSchemaChange() {
     try {
       stats.startSetup();
-      logger.debug(String.format("Setting up new schema based on incoming batch. Old output schema: %s",
-        container.getSchema()));
+      logger.debug("Setting up new schema based on incoming batch. Old output schema: {}", container.getSchema());
       setupNewSchema();
       return true;
     } catch (SchemaChangeException ex) {
@@ -492,7 +554,7 @@ private IterOutcome processRightBatch() {
   /**
    * Get's the current left and right incoming batch and does the cross join to fill the output batch. If all the
    * records in the either or both the batches are consumed then it get's next batch from that branch depending upon
-   * if output batch still has some space left. If output batch is full then the output if finalized to be sent
+   * if output batch still has some space left. If output batch is full then the output is finalized to be sent
    * downstream. Subsequent call's knows how to consume previously half consumed (if any) batches and producing the
    * output using that.
    *
@@ -504,53 +566,53 @@ private IterOutcome produceOutputBatch() {
 
     // Try to fully pack the outgoing container
     while (!isOutgoingBatchFull()) {
-      final int previousOutputCount = outputIndex;
-      // invoke the runtime generated method to emit records in the output batch for each leftJoinIndex
+      // perform the cross join between records in left and right batch and populate the output container
       crossJoinAndOutputRecords();
 
-      // We have produced some records in outgoing container, hence there must be a match found for left record
-      if (outputIndex > previousOutputCount) {
-        // Need this extra flag since there can be left join case where for current leftJoinIndex it receives a right
-        // batch with data, then an empty batch and again another empty batch with EMIT outcome. If we just use
-        // outputIndex then we will loose the information that few rows for leftJoinIndex is already produced using
-        // first right batch
-        matchedRecordFound = true;
-      }
-
-      // One right batch might span across multiple output batch. So rightIndex will be moving sum of all the
-      // output records for this record batch until it's fully consumed.
-      //
-      // Also it can be so that one output batch can contain records from 2 different right batch hence the
-      // rightJoinIndex should move by number of records in output batch for current right batch only.
-      rightJoinIndex += outputIndex - previousOutputCount;
+      // rightJoinIndex should move by number of records in output batch for current right batch only. For cases when
+      // right batch is fully consumed rightJoinIndex will be equal to record count. For cases when only part of it is
+      // consumed in current output batch rightJoinIndex will point to next row to be consumed
       final boolean isRightProcessed = rightJoinIndex == -1 || rightJoinIndex >= right.getRecordCount();
 
-      // Check if above join to produce output was based on empty right batch or
-      // it resulted in right side batch to be fully consumed. In this scenario only if rightUpstream
-      // is EMIT then increase the leftJoinIndex.
-      // Otherwise it means for the given right batch there is still some record left to be processed.
+      // Check if above join to produce output resulted in fully consuming right side batch
       if (isRightProcessed) {
-        if (rightUpstream == EMIT) {
-          if (!matchedRecordFound && JoinRelType.LEFT == popConfig.getJoinType()) {
-            // copy left side in case of LEFT join
-            emitLeft(leftJoinIndex, outputIndex, 1);
-            ++outputIndex;
-          }
-          ++leftJoinIndex;
-          // Reset matchedRecord for next left index record
-          matchedRecordFound = false;
-        }
-
         // Release vectors of right batch. This will happen for both rightUpstream = EMIT/OK
         VectorAccessibleUtilities.clear(right);
         rightJoinIndex = -1;
       }
 
-      // Check if previous left record was last one, then set leftJoinIndex to -1
-      isLeftProcessed = leftJoinIndex >= left.getRecordCount();
+      // Check if all rows in right batch is processed and there was a match for last rowId and this is last
+      // right batch for this left batch, then increment the leftJoinIndex. If this is not the last right batch we
+      // cannot increase the leftJoinIndex even though a match is found because next right batch can contain more
+      // records for the same implicit rowId
+      if (isRightProcessed && rightUpstream == EMIT && matchedRecordFound) {
+        ++leftJoinIndex;
+        matchedRecordFound = false;
+      }
+
+      // left is only declared as processed if this is last right batch for current left batch and we have processed
+      // all the rows in it.
+      isLeftProcessed = (rightUpstream == EMIT) && leftJoinIndex >= left.getRecordCount();
+
+      // Even though if left batch is not fully processed but we have received EMIT outcome from right side.
+      // In this case if left batch has some unprocessed rows and it's left join emit left side for these rows.
+      // If it's inner join then just set treat left batch as processed.
+      if (!isLeftProcessed && rightUpstream == EMIT && isRightProcessed) {
+        if (popConfig.getJoinType() == JoinRelType.LEFT) {
+          // If outgoing batch got full that means we still have some leftJoinIndex to output but right side is done
+          // producing the batches. So mark hasRemainderForLeftJoin=true and we will take care of it in future next call.
+          isLeftProcessed = handleRemainingLeftRows();
+          hasRemainderForLeftJoin = !isLeftProcessed;
+        } else {
+          // not left join hence ignore rows pending in left batch since right side is done producing the output
+          isLeftProcessed = true;
+        }
+      }
+
       if (isLeftProcessed) {
         leftJoinIndex = -1;
         VectorAccessibleUtilities.clear(left);
+        matchedRecordFound = false;
       }
 
       // Check if output batch still has some space
@@ -562,14 +624,17 @@ private IterOutcome produceOutputBatch() {
           if (leftUpstream == EMIT || leftUpstream == OK_NEW_SCHEMA) {
             break;
           } else {
-            logger.debug("Output batch still has some space left, getting new batches from left and right");
+            logger.debug("Output batch still has some space left, getting new batches from left and right. OutIndex: {}"
+              , outputIndex);
             // Get both left batch and the right batch and make sure indexes are properly set
             leftUpstream = processLeftBatch();
 
+            logger.debug("Received left batch with outcome {}", leftUpstream);
+
             // output batch is not empty and we have new left batch with OK_NEW_SCHEMA or terminal outcome
             if (processLeftBatchInFuture) {
-              logger.debug("Received left batch with outcome {} such that we have to return the current outgoing " +
-                "batch and process the new batch in subsequent next call", leftUpstream);
+              logger.debug("Received left batch such that we have to return the current outgoing batch and process " +
+                "the new batch in subsequent next call");
               // We should return the current output batch with OK outcome and don't reset the leftUpstream
               finalizeOutputContainer();
               return OK;
@@ -602,6 +667,9 @@ private IterOutcome produceOutputBatch() {
         // empty since all right batches were empty for all left rows. Now we got another non-empty left batch with
         // OK_NEW_SCHEMA.
         rightUpstream = processRightBatch();
+
+        logger.debug("Received right batch with outcome {}", rightUpstream);
+
         if (rightUpstream == OK_NEW_SCHEMA) {
           leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream;
           rightUpstream = OK;
@@ -617,10 +685,11 @@ private IterOutcome produceOutputBatch() {
         // Update the batch memory manager to use new right incoming batch
         updateMemoryManager(RIGHT_INDEX);
 
-        // If OK_NEW_SCHEMA is seen only on non empty left batch but not on right batch, then we should setup schema in
-        // output container based on new left schema and old right schema. If schema change failed then return STOP
+        // If previous left batch is fully processed and it didn't produced any output rows and later we got a new
+        // non-empty left batch with OK_NEW_SCHEMA with schema change only on left side vectors, then setup schema
+        // in output container based on new left schema and old right schema. If schema change failed then return STOP
         // downstream
-        if (leftUpstream == OK_NEW_SCHEMA && isLeftProcessed) {
+        if (leftUpstream == OK_NEW_SCHEMA && outputIndex == 0) {
           if (!handleSchemaChange()) {
             return STOP;
           }
@@ -651,7 +720,6 @@ private IterOutcome produceOutputBatch() {
     }
     return OK;
   }
-
   /**
    * Finalizes the current output container with the records produced so far before sending it downstream
    */
@@ -710,7 +778,7 @@ private boolean verifyInputSchema(BatchSchema schema) {
     return isValid;
   }
 
-  private BatchSchema batchSchemaWithNoExcludedCols(BatchSchema originSchema) {
+  private BatchSchema batchSchemaWithNoExcludedCols(BatchSchema originSchema, boolean isRightBatch) {
     if (excludedFieldNames.size() == 0) {
       return originSchema;
     }
@@ -718,7 +786,10 @@ private BatchSchema batchSchemaWithNoExcludedCols(BatchSchema originSchema) {
     final SchemaBuilder newSchemaBuilder =
       BatchSchema.newBuilder().setSelectionVectorMode(originSchema.getSelectionVectorMode());
     for (MaterializedField field : originSchema) {
-      if (!excludedFieldNames.contains(field.getName())) {
+      // Don't ignore implicit column from left side in multilevel case where plan is generated such that lower lateral
+      // is on the right side of upper lateral.
+      if (!excludedFieldNames.contains(field.getName()) ||
+        (field.getName().equals(implicitColumn) && !isRightBatch)) {
         newSchemaBuilder.addField(field);
       }
     }
@@ -731,13 +802,16 @@ private BatchSchema batchSchemaWithNoExcludedCols(BatchSchema originSchema) {
    */
   private void setupNewSchema() throws SchemaChangeException {
 
-    logger.debug(String.format("Setting up new schema based on incoming batch. New left schema: %s" +
-        " and New right schema: %s", left.getSchema(), right.getSchema()));
+    logger.debug("Setting up new schema based on incoming batch. New left schema: {} and New right schema: {}",
+      left.getSchema(), right.getSchema());
 
     // Clear up the container
     container.clear();
-    leftSchema = batchSchemaWithNoExcludedCols(left.getSchema());
-    rightSchema = batchSchemaWithNoExcludedCols(right.getSchema());
+    leftInputOutputVector.clear();
+    rightInputOutputVector.clear();
+
+    leftSchema = batchSchemaWithNoExcludedCols(left.getSchema(), false);
+    rightSchema = batchSchemaWithNoExcludedCols(right.getSchema(), true);
 
     if (!verifyInputSchema(leftSchema)) {
       throw new SchemaChangeException("Invalid Schema found for left incoming batch");
@@ -747,10 +821,11 @@ private void setupNewSchema() throws SchemaChangeException {
       throw new SchemaChangeException("Invalid Schema found for right incoming batch");
     }
 
-    // Setup LeftSchema in outgoing container
+    // Setup LeftSchema in outgoing container and also include implicit column if present in left side for multilevel
+    // case if plan is generated such that lower lateral is right child of upper lateral
     for (final VectorWrapper<?> vectorWrapper : left) {
       final MaterializedField leftField = vectorWrapper.getField();
-      if (excludedFieldNames.contains(leftField.getName())) {
+      if (excludedFieldNames.contains(leftField.getName()) && !(leftField.getName().equals(implicitColumn))) {
         continue;
       }
       container.addOrGet(leftField);
@@ -760,6 +835,9 @@ private void setupNewSchema() throws SchemaChangeException {
     for (final VectorWrapper<?> vectorWrapper : right) {
       MaterializedField rightField = vectorWrapper.getField();
       if (excludedFieldNames.contains(rightField.getName())) {
+        if (rightField.getName().equals(implicitColumn)) {
+          implicitVector = vectorWrapper.getValueVector();
+        }
         continue;
       }
 
@@ -778,10 +856,21 @@ private void setupNewSchema() throws SchemaChangeException {
       container.addOrGet(rightField);
     }
 
+    Preconditions.checkState(implicitVector != null,
+      "Implicit column vector %s not found in right incoming batch", implicitColumn);
+
     // Let's build schema for the container
     outputIndex = 0;
     container.setRecordCount(outputIndex);
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+    // Setup left vectors
+    setupInputOutputVectors(left, 0, leftSchema.getFieldCount(), 0, false);
+
+    // Setup right vectors
+    setupInputOutputVectors(right, 0, rightSchema.getFieldCount(),
+      leftSchema.getFieldCount(), true);
+
     logger.debug("Output Schema created {} based on input left schema {} and right schema {}", container.getSchema(),
       leftSchema, rightSchema);
   }
@@ -790,6 +879,11 @@ private void setupNewSchema() throws SchemaChangeException {
    * Simple method to allocate space for all the vectors in the container.
    */
   private void allocateVectors() {
+    if (outputIndex > 0) {
+      logger.trace("Allocation is already done for output container vectors since it already holds some record");
+      return;
+    }
+
     for (VectorWrapper w : container) {
       RecordBatchSizer.ColumnSize colSize = batchMemoryManager.getColumnSize(w.getField().getName());
       colSize.allocateVector(w.getValueVector(), maxOutputRowCount);
@@ -816,10 +910,36 @@ private boolean setBatchState(IterOutcome outcome) {
     return true;
   }
 
+  /**
+   * Creates a map of rowId to number of rows with that rowId in the right incoming batch of Lateral Join. It is
+   * expected from UnnestRecordBatch to add an implicit column of IntVectorType with each output row. All the array
+   * records belonging to same row in left incoming will have same rowId in the Unnest output batch.
+   * @return - map of rowId to rowCount in right batch
+   */
+  private Map<Integer, Integer> getRowIdToRowCountMapping() {
+    final Map<Integer, Integer> indexToFreq = new HashMap<>();
+    final IntVector rowIdVector = (IntVector) implicitVector;
+    int prevRowId = rowIdVector.getAccessor().get(rightJoinIndex);
+    int countRows = 1;
+    for (int i=rightJoinIndex + 1; i < right.getRecordCount(); ++i) {
+      int currentRowId = rowIdVector.getAccessor().get(i);
+      if (prevRowId == currentRowId) {
+        ++countRows;
+      } else {
+        indexToFreq.put(prevRowId, countRows);
+        prevRowId = currentRowId;
+        countRows = 1;
+      }
+    }
+    indexToFreq.put(prevRowId, countRows);
+    return indexToFreq;
+  }
+
   /**
    * Main entry point for producing the output records. This method populates the output batch after cross join of
-   * the record in a given left batch at left index and all the corresponding right batches produced for
-   * this left index. The right container is copied starting from rightIndex until number of records in the container.
+   * the record in a given left batch at left index and all the corresponding rows in right batches produced by Unnest
+   * for current left batch. For each call to this function number of records copied in output batch is limited to
+   * maximum rows output batch can hold or the number of rows in right incoming batch
    */
   private void crossJoinAndOutputRecords() {
     final int rightRecordCount = right.getRecordCount();
@@ -834,11 +954,7 @@ private void crossJoinAndOutputRecords() {
 
     int currentOutIndex = outputIndex;
     // Number of rows that can be copied in output batch
-    final int maxAvailableRowSlot = maxOutputRowCount - currentOutIndex;
-    // Number of rows that can be copied inside output batch is minimum of available slot in
-    // output batch and available data to copy from right side. It can be half consumed right batch
-    // which has few more rows to be copied to output but output batch has more to fill.
-    final int rowsToCopy = Math.min(maxAvailableRowSlot, (rightRecordCount - rightJoinIndex));
+    int maxAvailableRowSlot = maxOutputRowCount - currentOutIndex;
 
     if (logger.isDebugEnabled()) {
       logger.debug("Producing output for leftIndex: {}, rightIndex: {}, rightRecordCount: {}, outputIndex: {} and " +
@@ -846,40 +962,85 @@ private void crossJoinAndOutputRecords() {
       logger.debug("Output Batch stats before copying new data: {}", new RecordBatchSizer(this));
     }
 
-    // First copy all the left vectors data. Doing it in this way since it's the same data being copied over may be
-    // we will have performance gain from JVM
-    emitLeft(leftJoinIndex, currentOutIndex, rowsToCopy);
-
-    // Copy all the right side vectors data
-    emitRight(rightJoinIndex, currentOutIndex, rowsToCopy);
-
-    // Update outputIndex
-    outputIndex += rowsToCopy;
+    // Assuming that first vector in right batch is for implicitColumn.
+    // get a mapping of number of rows for each rowId present in current right side batch
+    //final Map<Integer, Integer> indexToFreq = getRowIdToRowCountMapping();
+    final IntVector rowIdVector = (IntVector) implicitVector;
+    final int leftRecordCount = left.getRecordCount();
+
+    // we need to have both conditions because in left join case we can exceed the maxAvailableRowSlot before reaching
+    // rightBatch end or vice-versa
+    while(maxAvailableRowSlot > 0 && rightJoinIndex < rightRecordCount) {
+      // Get rowId from current right row
+      int currentRowId = rowIdVector.getAccessor().get(rightJoinIndex);
+      int leftRowId = leftJoinIndex + 1;
+      int numRowsCopied = 0;
+
+      Preconditions.checkState(currentRowId <= leftRecordCount || leftJoinIndex <= leftRecordCount,
+        "Either RowId in right batch is greater than total records in left batch or all rows in left batch " +
+          "is processed but there are still rows in right batch. Details[RightRowId: %s, LeftRecordCount: %s, " +
+          "LeftJoinIndex: %s, RightJoinIndex: %s]", currentRowId, leftRecordCount, leftJoinIndex, rightJoinIndex);
+
+      logger.trace("leftRowId and currentRowId are: {}, {}", leftRowId, currentRowId);
+
+      // If leftRowId matches the rowId in right row then emit left and right row. Increment outputIndex, rightJoinIndex
+      // and numRowsCopied. Also set matchedRecordFound to true to indicate when to increase leftJoinIndex.
+      if (leftRowId == currentRowId) {
+        // there is a match
+        matchedRecordFound = true;
+        numRowsCopied = 1;
+        //numRowsCopied = Math.min(indexToFreq.get(currentRowId), maxAvailableRowSlot);
+        emitRight(rightJoinIndex, outputIndex, numRowsCopied);
+        emitLeft(leftJoinIndex, outputIndex, numRowsCopied);
+        outputIndex += numRowsCopied;
+        rightJoinIndex += numRowsCopied;
+      } else if (leftRowId < currentRowId) {
+        // If a matching record for leftRowId was found in right batch in previous iteration, increase the leftJoinIndex
+        // and reset the matchedRecordFound flag
+        if (matchedRecordFound) {
+          matchedRecordFound = false;
+          ++leftJoinIndex;
+          continue;
+        } else { // If no matching row was found in right batch then in case of left join produce left row in output
+          // and increase the indexes properly to reflect that
+          if (JoinRelType.LEFT == popConfig.getJoinType()) {
+            numRowsCopied = 1;
+            emitLeft(leftJoinIndex, outputIndex, numRowsCopied);
+            ++outputIndex;
+          }
+          ++leftJoinIndex;
+        }
+      } else {
+        Preconditions.checkState(leftRowId <= currentRowId, "Unexpected case where rowId " +
+          "%s in right batch of lateral is smaller than rowId %s in left batch being processed",
+          currentRowId, leftRowId);
+      }
+      // Update the max available rows slot in output batch
+      maxAvailableRowSlot -= numRowsCopied;
+    }
   }
 
   /**
-   * Given a record batch, copies data from all it's vectors at fromRowIndex to all the vectors in output batch at
-   * toRowIndex. It iterates over all the vectors from startVectorIndex to endVectorIndex inside the record batch to
-   * copy the data and copies it inside vectors from startVectorIndex + baseVectorIndex to endVectorIndex +
-   * baseVectorIndex.
-   * @param fromRowIndex - row index of all the vectors in batch to copy data from
-   * @param toRowIndex - row index of all the vectors in outgoing batch to copy data to
-   * @param batch - source record batch holding vectors with data
-   * @param startVectorIndex - start index of vector inside source record batch
-   * @param endVectorIndex - end index of vector inside source record batch
-   * @param baseVectorIndex - base index to be added to startVectorIndex to get corresponding vector in outgoing batch
-   * @param numRowsToCopy - Number of rows to copy into output batch
-   * @param moveFromIndex - boolean to indicate if the fromIndex should also be increased or not. Since in case of
-   *                      copying data from left vector fromIndex is constant whereas in case of copying data from right
-   *                      vector fromIndex will move along with output index.
+   * Gets references of vectors from the input and output vector containers and creates the mapping between them in
+   * respective maps. Example: for right incoming batch the references of input vector to corresponding output
+   * vector will be stored in {@link LateralJoinBatch#rightInputOutputVector}. This is done here such that during
+   * copy we don't have to figure out this mapping every time for each input and output vector and then do actual copy.
+   * There was overhead seen with functions {@link MaterializedField#getValueClass()} and
+   * {@link RecordBatch#getValueAccessorById(Class, int...)} since it will be called for each row copy.
+   * @param batch - Incoming RecordBatch
+   * @param startVectorIndex - StartIndex of output vector container
+   * @param endVectorIndex - endIndex of output vector container
+   * @param baseVectorIndex - delta to add in startIndex for getting vectors in output container
+   * @param isRightBatch - is batch passed left or right child
    */
-  private void copyDataToOutputVectors(int fromRowIndex, int toRowIndex, RecordBatch batch,
-                                       int startVectorIndex, int endVectorIndex, int baseVectorIndex,
-                                       int numRowsToCopy, boolean moveFromIndex) {
+  private void setupInputOutputVectors(RecordBatch batch, int startVectorIndex, int endVectorIndex,
+                                       int baseVectorIndex, boolean isRightBatch) {
     // Get the vectors using field index rather than Materialized field since input batch field can be different from
     // output container field in case of Left Join. As we rebuild the right Schema field to be optional for output
     // container.
     int inputIndex = 0;
+    final Map<ValueVector, ValueVector> mappingToUse = (isRightBatch) ? rightInputOutputVector : leftInputOutputVector;
+
     for (int i = startVectorIndex; i < endVectorIndex; ++i) {
       // Get output vector
       final int outputVectorIndex = i + baseVectorIndex;
@@ -895,21 +1056,47 @@ private void copyDataToOutputVectors(int fromRowIndex, int toRowIndex, RecordBat
         inputValueClass = batch.getSchema().getColumn(inputIndex).getValueClass();
         inputVector = batch.getValueAccessorById(inputValueClass, inputIndex).getValueVector();
         inputFieldName = inputVector.getField().getName();
+
+        // If implicit column is in left batch then preserve it
+        if (inputFieldName.equals(implicitColumn) && !isRightBatch) {
+          ++inputIndex;
+          break;
+        }
+
         ++inputIndex;
       } while (excludedFieldNames.contains(inputFieldName));
 
-      Preconditions.checkArgument(outputFieldName.equals(inputFieldName),
-        new IllegalStateException(String.format("Non-excluded Input and output container fields are not in same order" +
-          ". Output Schema:%s and Input Schema:%s", this.getSchema(), batch.getSchema())));
+      Preconditions.checkState(outputFieldName.equals(inputFieldName),
+        "Non-excluded Input and output container fields are not in same order. " +
+          "[Output Schema: %s and Input Schema:%s]", this.getSchema(), batch.getSchema());
 
+      mappingToUse.put(inputVector, outputVector);
+    }
+  }
+
+  /**
+   * Given a vector reference mapping between source and destination vector, copies data from all the source vectors
+   * at fromRowIndex to all the destination vectors in output batch at toRowIndex.
+   *
+   * @param fromRowIndex - row index of all the vectors in batch to copy data from
+   * @param toRowIndex - row index of all the vectors in outgoing batch to copy data to
+   * @param mapping - map from each source (input) vector to its destination (output) vector
+   * @param numRowsToCopy - Number of rows to copy into output batch
+   * @param isRightBatch - boolean to indicate if the fromIndex should also be increased or not. Since in case of
+   *                      copying data from left vector fromIndex is constant whereas in case of copying data from right
+   *                      vector fromIndex will move along with output index.
+   */
+  private void copyDataToOutputVectors(int fromRowIndex, int toRowIndex, Map<ValueVector, ValueVector> mapping,
+                                       int numRowsToCopy, boolean isRightBatch) {
+    for (Map.Entry<ValueVector, ValueVector> entry : mapping.entrySet()) {
       logger.trace("Copying data from incoming batch vector to outgoing batch vector. [IncomingBatch: " +
-          "(RowIndex: {}, VectorType: {}), OutputBatch: (RowIndex: {}, VectorType: {}) and Other: (TimeEachValue: {}," +
-          " NumBaseIndex: {}) ]",
-        fromRowIndex, inputValueClass, toRowIndex, outputValueClass, numRowsToCopy, baseVectorIndex);
+          "(RowIndex: {}, ColumnName: {}), OutputBatch: (RowIndex: {}, ColumnName: {}) and Other: (TimeEachValue: {})]",
+        fromRowIndex, entry.getKey().getField().getName(), toRowIndex, entry.getValue().getField().getName(),
+        numRowsToCopy);
 
       // Copy data from input vector to output vector for numRowsToCopy times.
       for (int j = 0; j < numRowsToCopy; ++j) {
-        outputVector.copyEntry(toRowIndex + j, inputVector, (moveFromIndex) ? fromRowIndex + j : fromRowIndex);
+        entry.getValue().copyEntry(toRowIndex + j, entry.getKey(), (isRightBatch) ? fromRowIndex + j : fromRowIndex);
       }
     }
   }
@@ -919,10 +1106,12 @@ private void copyDataToOutputVectors(int fromRowIndex, int toRowIndex, RecordBat
    * outgoing record batch
    * @param leftIndex - index to copy data from left incoming batch vectors
    * @param outIndex - index to copy data to in outgoing batch vectors
+   * @param numRowsToCopy - number of rows to copy from source vector to destination vectors
    */
   private void emitLeft(int leftIndex, int outIndex, int numRowsToCopy) {
-    copyDataToOutputVectors(leftIndex, outIndex, left, 0,
-      leftSchema.getFieldCount(), 0, numRowsToCopy, false);
+    logger.trace("Copying the left batch data. Details: [leftIndex: {}, outputIndex: {}, numsCopy: {}]",
+      leftIndex, outIndex, numRowsToCopy);
+    copyDataToOutputVectors(leftIndex, outIndex, leftInputOutputVector, numRowsToCopy, false);
   }
 
   /**
@@ -930,10 +1119,12 @@ private void emitLeft(int leftIndex, int outIndex, int numRowsToCopy) {
    * outgoing record batch
    * @param rightIndex - index to copy data from right incoming batch vectors
    * @param outIndex - index to copy data to in outgoing batch vectors
+   * @param numRowsToCopy - number of rows to copy from source vector to destination vectors
    */
   private void emitRight(int rightIndex, int outIndex, int numRowsToCopy) {
-    copyDataToOutputVectors(rightIndex, outIndex, right, 0,
-      rightSchema.getFieldCount(), leftSchema.getFieldCount(), numRowsToCopy, true);
+    logger.trace("Copying the right batch data. Details: [rightIndex: {}, outputIndex: {}, numsCopy: {}]",
+      rightIndex, outIndex, numRowsToCopy);
+    copyDataToOutputVectors(rightIndex, outIndex, rightInputOutputVector, numRowsToCopy, true);
   }
 
   /**
@@ -960,7 +1151,6 @@ private boolean isOutgoingBatchFull() {
   }
 
   private void updateMemoryManager(int inputIndex) {
-
     if (inputIndex == LEFT_INDEX && isNewLeftBatch) {
       // reset state and continue to update
       isNewLeftBatch = false;
@@ -986,6 +1176,7 @@ private void updateMemoryManager(int inputIndex) {
   }
 
   private void populateExcludedField(PhysicalOperator lateralPop) {
+    excludedFieldNames.add(implicitColumn);
     final List<SchemaPath> excludedCols = ((LateralJoinPOP)lateralPop).getExcludedColumns();
     if (excludedCols != null) {
       for (SchemaPath currentPath : excludedCols) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 5888c347fc0..d28fd47a110 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -78,6 +78,10 @@ public IterOutcome innerNext() {
         }
         // If EMIT that means leaf operator is UNNEST, in this case refresh the limit states and return EMIT.
         if (upStream == EMIT) {
+          // Clear the memory for the incoming batch
+          for (VectorWrapper<?> wrapper : incoming) {
+            wrapper.getValueVector().clear();
+          }
           refreshLimitState();
           return upStream;
         }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java
index 1a042b4aad6..3fe19cb706f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/Unnest.java
@@ -19,9 +19,9 @@
 
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.LateralContract;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 
 import java.util.List;
@@ -35,8 +35,8 @@
   //TemplateClassDefinition<Unnest> TEMPLATE_DEFINITION = new TemplateClassDefinition<Unnest>(Unnest.class, UnnestImpl
   // .class);
 
-  void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers,
-      LateralContract lateral) throws SchemaChangeException;
+  void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers)
+      throws SchemaChangeException;
 
   /**
    * Performs the actual unnest operation.
@@ -59,6 +59,11 @@ void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing,
 
   RepeatedValueVector getUnnestField();
 
+  /**
+   * Set the vector for the rowId implicit column
+   */
+  void setRowIdVector (IntVector v);
+
   /**
    * Reset the index at which the incoming vector is being processed. Called every
    * time a new batch comes in.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
index 1d3b8f236cd..02d2f183393 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
@@ -21,10 +21,10 @@
 import com.google.common.collect.ImmutableList;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.LateralContract;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.SchemaChangeCallBack;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
@@ -36,9 +36,10 @@
 import static org.apache.drill.exec.record.BatchSchema.SelectionVectorMode.NONE;
 
 /**
- * Contains the actual unnest operation. Unnest is a simple transfer operation in this impelementation.
- * For use as a table function, we will need to change the logic of the unnest method to operate on
- * more than one row at a time and remove any dependence on Lateral
+ * Contains the actual unnest operation. Unnest is a simple transfer operation in this implementation.
+ * Additionally, unnest produces an implicit rowId column that allows unnest to output batches with many
+ * rows of incoming data being unnested in a single call to innerNext(). Downstream blocking operators need
+ * to be aware of this rowId column and include the rowId as the sort or group by key.
  * This class follows the pattern of other operators that generate code at runtime. Normally this class
  * would be abstract and have placeholders for doSetup and doEval. Unnest however, doesn't require code
  * generation so we can simply implement the code in a simple class that looks similar to the code gen
@@ -48,12 +49,14 @@
   private static final Logger logger = LoggerFactory.getLogger(UnnestImpl.class);
 
   private ImmutableList<TransferPair> transfers;
-  private LateralContract lateral; // corresponding lateral Join (or other operator implementing the Lateral Contract)
   private SelectionVectorMode svMode;
   private RepeatedValueVector fieldToUnnest;
   private RepeatedValueVector.RepeatedAccessor accessor;
   private RecordBatch outgoing;
 
+  private IntVector rowIdVector ; // Allocated and owned by the UnnestRecordBatch
+  private IntVector.Mutator rowIdVectorMutator;
+
   /**
    * The output batch limit starts at OUTPUT_ROW_COUNT, but may be decreased
    * if records are found to be large.
@@ -61,11 +64,32 @@
   private int outputLimit = ValueVector.MAX_ROW_COUNT;
 
 
-  // The index in the unnest column that is being processed.We start at zero and continue until
-  // InnerValueCount is reached or  if the batch limit is reached
-  // this allows for groups to be written between batches if we run out of space, for cases where we have finished
+
+  /**
+   * We maintain three indexes
+   *
+   *
+   *
+                valueIndex  0         1       2   3
+                            |- - - - -|- - - -|- -|- - - -|
+                            | | | | | | | | | | | | | | | |
+                            |- - - - -|- - - -|- -|- - - -|
+           innerValueIndex  0 1 2 3 4 0 1 2 3 0 1 0 1 2 3 |
+     runningInnerValueIndex 0 1 2 3 4 5 6 7 8 9 ...
+   *
+   *
+   *
+   */
+  private int valueIndex; // index in the incoming record being processed
+  // The index of the array element in the unnest column at row pointed by valueIndex which is currently being
+  // processed. It starts at zero and continues until InnerValueCount is reached or the batch limit is reached. It
+  // allows for groups to be written across batches if we run out of space. For cases where we have finished
   // a batch on the boundary it will be set to 0
   private int innerValueIndex = 0;
+  // The index in the "values" vector of the current value being processed.
+  private int runningInnerValueIndex;
+
+
 
   @Override
   public void setUnnestField(RepeatedValueVector unnestField) {
@@ -83,47 +107,69 @@ public void setOutputCount(int outputCount) {
     outputLimit = outputCount;
   }
 
+  @Override
+  public void setRowIdVector(IntVector v) {
+    this.rowIdVector = v;
+    this.rowIdVectorMutator = rowIdVector.getMutator();
+  }
+
   @Override
   public final int unnestRecords(final int recordCount) {
     Preconditions.checkArgument(svMode == NONE, "Unnest does not support selection vector inputs.");
-    if (innerValueIndex == -1) {
-      innerValueIndex = 0;
-    }
-
-    // Current record being processed in the incoming record batch. We could keep
-    // track of it ourselves, but it is better to check with the Lateral Join and get the
-    // current record being processed thru the Lateral Join Contract.
-    final int currentRecord = lateral.getRecordIndex();
-    final int innerValueCount = accessor.getInnerValueCountAt(currentRecord);
-    final int count = Math.min(Math.min(innerValueCount, outputLimit), recordCount);
 
-    logger.debug("Unnest: currentRecord: {}, innerValueCount: {}, record count: {}, output limit: {}", innerValueCount,
-        recordCount, outputLimit);
+    final int initialInnerValueIndex = runningInnerValueIndex;
+
+    outer:
+    {
+      int outputIndex = 0; // index in the output vector that we are writing to
+      final int valueCount = accessor.getValueCount();
+
+      for (; valueIndex < valueCount; valueIndex++) {
+        final int innerValueCount = accessor.getInnerValueCountAt(valueIndex);
+        logger.debug("Unnest: currentRecord: {}, innerValueCount: {}, record count: {}, output limit: {}",
+            innerValueCount, recordCount, outputLimit);
+
+        for (; innerValueIndex < innerValueCount; innerValueIndex++) {
+          // If we've hit the batch size limit, stop and flush what we've got so far.
+          if (outputIndex == outputLimit) {
+            // Flush this batch.
+            break outer;
+          }
+          try {
+            // rowId starts at 1, so the value for rowId is valueIndex+1
+            rowIdVectorMutator.setSafe(outputIndex, valueIndex + 1);
+
+          } finally {
+            outputIndex++;
+            runningInnerValueIndex++;
+          }
+        }
+        innerValueIndex = 0;
+      }  // for every incoming record
+    }  // end of the labeled block "outer"
+    final int delta = runningInnerValueIndex - initialInnerValueIndex;
     final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
     for (TransferPair t : transfers) {
-      t.splitAndTransfer(innerValueIndex, count);
+      t.splitAndTransfer(initialInnerValueIndex, delta);
 
       // Get the corresponding ValueVector in output container and transfer the data
       final ValueVector vectorWithData = t.getTo();
       final ValueVector outputVector = outgoing.getContainer().addOrGet(vectorWithData.getField(), callBack);
-      Preconditions.checkState(!callBack.getSchemaChangedAndReset(), "Outgoing container doesn't have " +
-        "expected ValueVector of type %s, present in TransferPair of unnest field", vectorWithData.getClass());
+      Preconditions.checkState(!callBack.getSchemaChangedAndReset(), "Outgoing container doesn't have "
+          + "expected ValueVector of type %s, present in TransferPair of unnest field", vectorWithData.getClass());
       vectorWithData.makeTransferPair(outputVector).transfer();
     }
-    innerValueIndex += count;
-    return count;
-
+    return delta;
   }
 
   @Override
   public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing,
-      List<TransferPair> transfers, LateralContract lateral) throws SchemaChangeException {
+      List<TransferPair> transfers) throws SchemaChangeException {
 
     this.svMode = incoming.getSchema().getSelectionVectorMode();
     this.outgoing = outgoing;
     if (svMode == NONE) {
       this.transfers = ImmutableList.copyOf(transfers);
-      this.lateral = lateral;
     } else {
       throw new UnsupportedOperationException("Unnest does not support selection vector inputs.");
     }
@@ -131,7 +177,9 @@ public final void setup(FragmentContext context, RecordBatch incoming, RecordBat
 
   @Override
   public void resetGroupIndex() {
+    this.valueIndex = 0;
     this.innerValueIndex = 0;
+    this.runningInnerValueIndex = 0;
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index d985423ef45..2e2f405a3fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -21,6 +21,8 @@
 import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -36,6 +38,7 @@
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedMapVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
@@ -49,6 +52,9 @@
 public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPOP> {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnnestRecordBatch.class);
 
+  private final String rowIdColumnName; // name of the field holding the rowId implicit column
+  private IntVector rowIdVector; // vector to keep the implicit rowId column in
+
   private Unnest unnest = new UnnestImpl();
   private boolean hasNewSchema = false; // set to true if a new schema was encountered and an empty batch was
                                         // sent. The next iteration, we need to make sure the record batch sizer
@@ -103,8 +109,11 @@ public void update() {
       final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
 
       // Get column size of unnest column.
+
       RecordBatchSizer.ColumnSize columnSize = getRecordBatchSizer().getColumn(field.getName());
 
+      final int rowIdColumnSize = TypeHelper.getSize(rowIdVector.getField().getType());
+
       // Average rowWidth of single element in the unnest list.
       // subtract the offset vector size from column data size.
       final int avgRowWidthSingleUnnestEntry = RecordBatchSizer
@@ -112,7 +121,7 @@ public void update() {
               .getElementCount());
 
       // Average rowWidth of outgoing batch.
-      final int avgOutgoingRowWidth = avgRowWidthSingleUnnestEntry;
+      final int avgOutgoingRowWidth = avgRowWidthSingleUnnestEntry + rowIdColumnSize;
 
       // Number of rows in outgoing batch
       final int outputBatchSize = getOutputBatchSize();
@@ -141,6 +150,7 @@ public UnnestRecordBatch(UnnestPOP pop, FragmentContext context) throws OutOfMem
     // get the output batch size from config.
     int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
     memoryManager = new UnnestMemoryManager(configuredBatchSize);
+    rowIdColumnName = pop.getImplicitColumn();
   }
 
   @Override
@@ -181,6 +191,7 @@ public IterOutcome innerNext() {
     }
 
     if (nextState == IterOutcome.NONE || nextState == IterOutcome.EMIT) {
+      recordCount = 0;
       return nextState;
     }
 
@@ -297,9 +308,13 @@ protected IterOutcome doWork() {
     // inside of the the unnest for the current batch
     setUnnestVector();
 
-    //Expected output count is the num of values in the unnest colum array for the current record
-    final int childCount =
-        incomingRecordCount == 0 ? 0 : unnest.getUnnestField().getAccessor().getInnerValueCountAt(currentRecord) - remainderIndex;
+    int remainingRecordCount = unnest.getUnnestField().getAccessor().getInnerValueCount() - remainderIndex;
+
+    // Allocate vector for rowId
+    rowIdVector.allocateNew(Math.min(remainingRecordCount, memoryManager.getOutputRowCount()));
+
+    //Expected output count is the num of values in the unnest column array
+    final int childCount = incomingRecordCount == 0 ? 0 : remainingRecordCount;
 
     // Unnest the data
     final int outputRecords = childCount == 0 ? 0 : unnest.unnestRecords(childCount);
@@ -317,6 +332,7 @@ protected IterOutcome doWork() {
       logger.debug("IterOutcome: EMIT.");
     }
     this.recordCount = outputRecords;
+    this.rowIdVector.getMutator().setValueCount(outputRecords);
 
     memoryManager.updateOutgoingStats(outputRecords);
     // If the current incoming record has spilled into two batches, we return
@@ -376,7 +392,7 @@ private TransferPair resetUnnestTransferPair() throws SchemaChangeException {
     transfers.add(transferPair);
     logger.debug("Added transfer for unnest expression.");
     unnest.close();
-    unnest.setup(context, incoming, this, transfers, lateral);
+    unnest.setup(context, incoming, this, transfers);
     setUnnestVector();
     return transferPair;
   }
@@ -386,7 +402,12 @@ protected boolean setupNewSchema() throws SchemaChangeException {
     Preconditions.checkNotNull(lateral);
     container.clear();
     recordCount = 0;
+    final MaterializedField rowIdField = MaterializedField.create(rowIdColumnName, Types.required(TypeProtos
+        .MinorType.INT));
+    this.rowIdVector= (IntVector)TypeHelper.getNewVector(rowIdField, oContext.getAllocator());
+    container.add(rowIdVector);
     unnest = new UnnestImpl();
+    unnest.setRowIdVector(rowIdVector);
     final TransferPair tp = resetUnnestTransferPair();
     container.add(TypeHelper.getNewVector(tp.getTo().getField(), oContext.getAllocator()));
     container.buildSchema(SelectionVectorMode.NONE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java
index 5a2b40e162b..31866a8543f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java
@@ -42,6 +42,8 @@
 
 public abstract class DrillLateralJoinRelBase extends Correlate implements DrillRelNode {
 
+  final public static String IMPLICIT_COLUMN = DrillRelOptUtil.IMPLICIT_COLUMN;
+
   final private static double CORRELATE_MEM_COPY_COST = DrillCostBase.MEMORY_TO_CPU_RATIO * DrillCostBase.BASE_CPU_COST;
   final public boolean excludeCorrelateColumn;
   public DrillLateralJoinRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, boolean excludeCorrelateCol,
@@ -71,7 +73,7 @@ protected RelDataType deriveRowType() {
       case LEFT:
       case INNER:
         return constructRowType(SqlValidatorUtil.deriveJoinRowType(left.getRowType(),
-          right.getRowType(), joinType.toJoinType(),
+          removeImplicitField(right.getRowType()), joinType.toJoinType(),
           getCluster().getTypeFactory(), null,
           ImmutableList.of()));
       case ANTI:
@@ -118,6 +120,21 @@ public RelDataType constructRowType(RelDataType inputRowType) {
     return inputRowType;
   }
 
+  public RelDataType removeImplicitField(RelDataType inputRowType) {
+    List<RelDataType> fields = new ArrayList<>();
+    List<String> fieldNames = new ArrayList<>();
+
+    for (RelDataTypeField field : inputRowType.getFieldList()) {
+      if (field.getName().equals(IMPLICIT_COLUMN)) {
+        continue;
+      }
+      fieldNames.add(field.getName());
+      fields.add(field.getType());
+    }
+
+    return getCluster().getTypeFactory().createStructType(fields, fieldNames);
+  }
+
   @Override
   public double estimateRowCount(RelMetadataQuery mq) {
     return mq.getRowCount(left);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
index 9dd5032b1e0..4ef539ad752 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
@@ -20,6 +20,7 @@
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -55,6 +56,8 @@
  */
 public abstract class DrillRelOptUtil {
 
+  final public static String IMPLICIT_COLUMN = "$drill_implicit_field$";
+
   // Similar to RelOptUtil.areRowTypesEqual() with the additional check for allowSubstring
   public static boolean areRowTypesCompatible(
       RelDataType rowType1,
@@ -317,6 +320,47 @@ public Void visitCall(RexCall call) {
     }
   }
 
+  /**
+   * For a given row type, return a map from each old field index to that index shifted right by one.
+   * @param rowType : row type to be right shifted.
+   * @return map: hash map between old and new indices
+   */
+  public static Map<Integer, Integer> rightShiftColsInRowType(RelDataType rowType) {
+    Map<Integer, Integer> map = new HashMap<>();
+    int fieldCount = rowType.getFieldCount();
+    for (int i = 0; i< fieldCount; i++) {
+      map.put(i, i+1);
+    }
+    return map;
+  }
+
+  /**
+   * Given a list of RexNodes, transforms each expression to use the new index mapped to its old index.
+   * @param builder : RexBuilder from the planner.
+   * @param exprs: RexNodes to be transformed.
+   * @param corrMap: Mapping between old index to new index.
+   * @return list of the transformed RexNodes, in the same order as exprs
+   */
+  public static List<RexNode> transformExprs(RexBuilder builder, List<RexNode> exprs, Map<Integer, Integer> corrMap) {
+    List<RexNode> outputExprs = new ArrayList<>();
+    DrillRelOptUtil.RexFieldsTransformer transformer = new DrillRelOptUtil.RexFieldsTransformer(builder, corrMap);
+    for (RexNode expr : exprs) {
+      outputExprs.add(transformer.go(expr));
+    }
+    return outputExprs;
+  }
+
+  /**
+   * Given a single RexNode, transforms the expression to use the new index mapped to its old index.
+   * @param builder : RexBuilder from the planner.
+   * @param expr: RexNode to be transformed.
+   * @param corrMap: Mapping between old index to new index.
+   * @return the transformed RexNode
+   */
+  public static RexNode transformExpr(RexBuilder builder, RexNode expr, Map<Integer, Integer> corrMap) {
+    DrillRelOptUtil.RexFieldsTransformer transformer = new DrillRelOptUtil.RexFieldsTransformer(builder, corrMap);
+    return transformer.go(expr);
+  }
 
   /**
    * RexFieldsTransformer is a utility class used to convert column refs in a RexNode
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnnestRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnnestRelBase.java
index 04bb2d68a1e..ed6942b1795 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnnestRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillUnnestRelBase.java
@@ -29,6 +29,7 @@
 public class DrillUnnestRelBase extends AbstractRelNode implements DrillRelNode {
 
   final protected RexNode ref;
+  final public static String IMPLICIT_COLUMN = DrillRelOptUtil.IMPLICIT_COLUMN;
 
   public DrillUnnestRelBase(RelOptCluster cluster, RelTraitSet traitSet, RexNode ref) {
     super(cluster, traitSet);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectPushIntoLateralJoinRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectPushIntoLateralJoinRule.java
index 6a57c89fbba..622eb05728e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectPushIntoLateralJoinRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectPushIntoLateralJoinRule.java
@@ -22,14 +22,12 @@
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.tools.RelBuilderFactory;
 import org.apache.drill.exec.planner.StarColumnHelper;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -80,7 +78,7 @@ public void onMatch(RelOptRuleCall call) {
 
     if (!DrillRelOptUtil.isTrivialProject(origProj, true)) {
       Map<Integer, Integer> mapWithoutCorr = buildMapWithoutCorrColumn(corr, correlationIndex);
-      List<RexNode> outputExprs = transformExprs(origProj.getCluster().getRexBuilder(), origProj.getChildExps(), mapWithoutCorr);
+      List<RexNode> outputExprs = DrillRelOptUtil.transformExprs(origProj.getCluster().getRexBuilder(), origProj.getChildExps(), mapWithoutCorr);
 
       relNode = new DrillProjectRel(origProj.getCluster(),
                                     left.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
@@ -89,15 +87,6 @@ public void onMatch(RelOptRuleCall call) {
     call.transformTo(relNode);
   }
 
-  private List<RexNode> transformExprs(RexBuilder builder, List<RexNode> exprs, Map<Integer, Integer> corrMap) {
-    List<RexNode> outputExprs = new ArrayList<>();
-    DrillRelOptUtil.RexFieldsTransformer transformer = new DrillRelOptUtil.RexFieldsTransformer(builder, corrMap);
-    for (RexNode expr : exprs) {
-      outputExprs.add(transformer.go(expr));
-    }
-    return outputExprs;
-  }
-
   private Map<Integer, Integer> buildMapWithoutCorrColumn(RelNode corr, int correlationIndex) {
     int index = 0;
     Map<Integer, Integer> result = new HashMap();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
index b10f9432235..a4f51f315eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
@@ -188,4 +188,26 @@ public boolean needsFinalColumnReordering() {
     return true;
   }
 
+  @Override
+  public Prel addImplicitRowIDCol(List<RelNode> children) {
+    List<Integer> groupingCols = Lists.newArrayList();
+    groupingCols.add(0);
+    for (int groupingCol : groupSet.asList()) {
+      groupingCols.add(groupingCol + 1);
+    }
+
+    ImmutableBitSet groupingSet = ImmutableBitSet.of(groupingCols);
+    List<ImmutableBitSet> groupingSets = Lists.newArrayList();
+    groupingSets.add(groupingSet);
+    List<AggregateCall> aggregateCalls = Lists.newArrayList();
+    for (AggregateCall aggCall : aggCalls) {
+      List<Integer> arglist = Lists.newArrayList();
+      for (int arg : aggCall.getArgList()) {
+        arglist.add(arg + 1);
+      }
+      aggregateCalls.add(AggregateCall.create(aggCall.getAggregation(), aggCall.isDistinct(),
+              aggCall.isApproximate(), arglist, aggCall.filterArg, aggCall.type, aggCall.name));
+    }
+    return (Prel) copy(traitSet, children.get(0),indicator,groupingSet,groupingSets, aggregateCalls);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
index 08c633ed6cb..1c9112c5673 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
@@ -19,10 +19,13 @@
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 
+import org.apache.calcite.rex.RexBuilder;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.Filter;
 import org.apache.drill.exec.planner.common.DrillFilterRelBase;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.logical.DrillParseContext;
 import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -81,4 +84,11 @@ public boolean needsFinalColumnReordering() {
     return true;
   }
 
+  @Override
+  public Prel addImplicitRowIDCol(List<RelNode> children) {
+    RexBuilder builder = this.getCluster().getRexBuilder();
+    // right shift the previous field indices.
+    return (Prel) this.copy(this.traitSet, children.get(0), DrillRelOptUtil.transformExpr(builder,
+            condition, DrillRelOptUtil.rightShiftColsInRowType(this.getInput().getRowType())));
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
index b647279a3e8..acaa5d2a36e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
@@ -83,5 +83,4 @@ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws
   public SelectionVectorMode getEncoding() {
     return SelectionVectorMode.NONE;
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
index 40ac60e5b25..1f4a86949da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
@@ -138,4 +138,5 @@ protected void buildJoinConditions(List<JoinCondition> conditions,
           FieldReference.getWithQuotedRef(rightFields.get(pair.right))));
     }
   }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java
index b10eff000df..44163bac1e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java
@@ -72,7 +72,7 @@ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws
     if (getColumn() != null) {
       excludedColumns.add(getColumn());
     }
-    LateralJoinPOP ljoin = new LateralJoinPOP(leftPop, rightPop, jtype.toJoinType(), excludedColumns);
+    LateralJoinPOP ljoin = new LateralJoinPOP(leftPop, rightPop, jtype.toJoinType(), DrillLateralJoinRelBase.IMPLICIT_COLUMN, excludedColumns);
     return creator.addMetadata(this, ljoin);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
index baf80d11a6c..5d3c6c68653 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
@@ -89,27 +89,8 @@ public boolean needsFinalColumnReordering() {
     return true;
   }
 
-//  @Override
-//  public LogicalOperator implement(DrillImplementor implementor) {
-//    LogicalOperator inputOp = implementor.visitChild(this, 0, getInput());
-//
-//    // First offset to include into results (inclusive). Null implies it is starting from offset 0
-//    int first = offset != null ? Math.max(0, RexLiteral.intValue(offset)) : 0;
-//
-//    // Last offset to stop including into results (exclusive), translating fetch row counts into an offset.
-//    // Null value implies including entire remaining result set from first offset
-//    Integer last = fetch != null ? Math.max(0, RexLiteral.intValue(fetch)) + first : null;
-//    Limit limit = new Limit(first, last);
-//    limit.setInput(inputOp);
-//    return limit;
-//  }
-
-//  public static LimitPrel convert(Limit limit, ConversionContext context) throws InvalidRelException{
-//    RelNode input = context.toRel(limit.getInput());
-//    RexNode first = context.getRexBuilder().makeExactLiteral(BigDecimal.valueOf(limit.getFirst()), context.getTypeFactory().createSqlType(SqlTypeName.INTEGER));
-//    RexNode last = context.getRexBuilder().makeExactLiteral(BigDecimal.valueOf(limit.getLast()), context.getTypeFactory().createSqlType(SqlTypeName.INTEGER));
-//    return new LimitPrel(context.getCluster(), context.getLogicalTraits(), input, first, last);
-//  }
-
-
+  @Override
+  public Prel addImplicitRowIDCol(List<RelNode> children) {
+    return (Prel) this.copy(this.traitSet, children);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java
index 77794d05ee5..b72aff70b9c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java
@@ -18,9 +18,10 @@
 package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
-
+import java.util.List;
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.planner.common.DrillRelNode;
 import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
@@ -54,4 +55,10 @@ public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits,
    */
   SelectionVectorMode getEncoding();
   boolean needsFinalColumnReordering();
+
+  default Prel addImplicitRowIDCol(List<RelNode> children) {
+    throw new UnsupportedOperationException("Adding Implicit RowID column is not supported for " +
+            this.getClass().getSimpleName() + " operator ");
+  }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
index 7f634c3d70e..0a9e8bf1aeb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
@@ -18,11 +18,18 @@
 package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import com.google.common.collect.Lists;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.planner.common.DrillProjectRelBase;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.logical.DrillParseContext;
 import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -128,4 +135,29 @@ public boolean needsFinalColumnReordering() {
     return false;
   }
 
+  @Override
+  public Prel addImplicitRowIDCol(List<RelNode> children) {
+    RelDataTypeFactory typeFactory = this.getCluster().getTypeFactory();
+    RexBuilder builder = this.getCluster().getRexBuilder();
+    List<RexNode> projects = Lists.newArrayList();
+    projects.add(builder.makeInputRef(typeFactory.createSqlType(SqlTypeName.INTEGER), 0));
+    // right shift the previous field indices.
+    projects.addAll(DrillRelOptUtil.transformExprs(builder, this.getProjects(),
+                        DrillRelOptUtil.rightShiftColsInRowType(this.getInput().getRowType())));
+
+    List<String> fieldNames = new ArrayList<>();
+    List<RelDataType> fieldTypes = new ArrayList<>();
+
+    fieldNames.add("$drill_implicit_field$");
+    fieldTypes.add(typeFactory.createSqlType(SqlTypeName.INTEGER));
+
+    for (RelDataTypeField field : this.rowType.getFieldList()) {
+      fieldNames.add(field.getName());
+      fieldTypes.add(field.getType());
+    }
+
+    RelDataType newRowType = typeFactory.createStructType(fieldTypes, fieldNames);
+
+    return (Prel) this.copy(this.getTraitSet(), children.get(0), projects, newRowType);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
index 5cf01dad5bc..a2655b36e9b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
@@ -168,4 +168,5 @@ public boolean needsFinalColumnReordering() {
   public DistributionAffinity getDistributionAffinity() {
     return groupScan.getDistributionAffinity();
   }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
index 047f7430daf..ee2b3091960 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScreenPrel.java
@@ -84,4 +84,5 @@ public boolean needsFinalColumnReordering() {
   public DistributionAffinity getDistributionAffinity() {
     return DistributionAffinity.HARD;
   }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
index 591261907ec..a4cd9211d0c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
@@ -54,5 +54,8 @@ public SelectionVectorMode getEncoding() {
     return SelectionVectorMode.NONE;
   }
 
-
+  @Override
+  public Prel addImplicitRowIDCol(List<RelNode> children) {
+    return (Prel) this.copy(this.traitSet, children);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SinglePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SinglePrel.java
index a3190c84c50..959acb4ca58 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SinglePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SinglePrel.java
@@ -51,4 +51,5 @@ public SinglePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) {
   public boolean needsFinalColumnReordering() {
     return true;
   }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
index 4365f2c775d..686e04a4031 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
@@ -19,7 +19,11 @@
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 
+import com.google.common.collect.Lists;
+import org.apache.calcite.rel.RelCollationImpl;
+import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.ExternalSort;
@@ -119,4 +123,22 @@ public boolean needsFinalColumnReordering() {
     return true;
   }
 
+  @Override
+  public Prel addImplicitRowIDCol(List<RelNode> children) {
+    List<RelFieldCollation> relFieldCollations = Lists.newArrayList();
+    relFieldCollations.add(new RelFieldCollation(0,
+                            RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST));
+    for (RelFieldCollation fieldCollation : this.collation.getFieldCollations()) {
+      relFieldCollations.add(new RelFieldCollation(fieldCollation.getFieldIndex() + 1,
+              fieldCollation.direction, fieldCollation.nullDirection));
+    }
+
+    RelCollation collationTrait = RelCollationImpl.of(relFieldCollations);
+    RelTraitSet traits = RelTraitSet.createEmpty()
+                                    .replace(this.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE))
+                                    .replace(collationTrait)
+                                    .replace(DRILL_PHYSICAL);
+
+    return this.copy(traits, children.get(0), collationTrait, this.offset, this.fetch);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
index 61d744d25a8..9bdcad06b9d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
@@ -20,6 +20,9 @@
 import java.io.IOException;
 import java.util.List;
 
+import com.google.common.collect.Lists;
+import org.apache.calcite.rel.RelCollationImpl;
+import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.TopN;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
@@ -87,7 +90,6 @@ public RelWriter explainTerms(RelWriter pw) {
         .item("limit", limit);
   }
 
-
   @Override
   public SelectionVectorMode[] getSupportedEncodings() {
     return SelectionVectorMode.NONE_AND_TWO;
@@ -97,4 +99,22 @@ public RelWriter explainTerms(RelWriter pw) {
   public SelectionVectorMode getEncoding() {
     return SelectionVectorMode.FOUR_BYTE;
   }
+
+  @Override
+  public Prel addImplicitRowIDCol(List<RelNode> children) {
+    List<RelFieldCollation> relFieldCollations = Lists.newArrayList();
+    relFieldCollations.add(new RelFieldCollation(0,
+                          RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST));
+    for (RelFieldCollation fieldCollation : this.collation.getFieldCollations()) {
+      relFieldCollations.add(new RelFieldCollation(fieldCollation.getFieldIndex() + 1,
+              fieldCollation.direction, fieldCollation.nullDirection));
+    }
+
+    RelCollation collationTrait = RelCollationImpl.of(relFieldCollations);
+    RelTraitSet traits = RelTraitSet.createEmpty()
+                                    .replace(this.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE))
+                                    .replace(collationTrait)
+                                    .replace(DRILL_PHYSICAL);
+    return (Prel) this.copy(traits, children);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
index 692b3d2ae14..23311383242 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
@@ -19,9 +19,13 @@
 
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexFieldAccess;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.UnnestPOP;
@@ -30,8 +34,10 @@
 import org.apache.drill.exec.record.BatchSchema;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 
 public class UnnestPrel extends DrillUnnestRelBase implements Prel {
 
@@ -40,7 +46,7 @@
   public UnnestPrel(RelOptCluster cluster, RelTraitSet traits,
                     RelDataType rowType, RexNode ref) {
     super(cluster, traits, ref);
-    this.unnestPOP = new UnnestPOP(null, SchemaPath.getSimplePath(((RexFieldAccess)ref).getField().getName()));
+    this.unnestPOP = new UnnestPOP(null, SchemaPath.getSimplePath(((RexFieldAccess)ref).getField().getName()), DrillUnnestRelBase.IMPLICIT_COLUMN);
     this.rowType = rowType;
   }
 
@@ -78,4 +84,22 @@ public boolean needsFinalColumnReordering() {
   public Class<?> getParentClass() {
     return LateralJoinPrel.class;
   }
+
+  @Override
+  public Prel addImplicitRowIDCol(List<RelNode> children) {
+    RelDataTypeFactory typeFactory = this.getCluster().getTypeFactory();
+    List<String> fieldNames = new ArrayList<>();
+    List<RelDataType> fieldTypes = new ArrayList<>();
+
+    fieldNames.add(IMPLICIT_COLUMN);
+    fieldTypes.add(typeFactory.createSqlType(SqlTypeName.INTEGER));
+
+    for (RelDataTypeField field : this.rowType.getFieldList()) {
+      fieldNames.add(field.getName());
+      fieldTypes.add(field.getType());
+    }
+
+    RelDataType newRowType = typeFactory.createStructType(fieldTypes, fieldNames);
+    return new UnnestPrel(this.getCluster(), this.getTraitSet(), newRowType, ref);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/LateralUnnestRowIDVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/LateralUnnestRowIDVisitor.java
new file mode 100644
index 00000000000..469220252eb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/LateralUnnestRowIDVisitor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical.visitor;
+
+import com.google.common.collect.Lists;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.planner.physical.LateralJoinPrel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.UnnestPrel;
+
+import java.util.List;
+
+/**
+ * LateralUnnestRowIDVisitor traverses the physical plan and modifies all the operators in the
+ * pipeline of Lateral and Unnest operators to accommodate IMPLICIT_COLUMN. The source for the
+ * IMPLICIT_COLUMN is the Unnest operator and the sink for the column is the corresponding Lateral
+ * join operator.
+ */
+public class LateralUnnestRowIDVisitor extends BasePrelVisitor<Prel, Boolean, RuntimeException> {
+
+  private static LateralUnnestRowIDVisitor INSTANCE = new LateralUnnestRowIDVisitor();
+
+  public static Prel insertRowID(Prel prel){
+    return prel.accept(INSTANCE, false);
+  }
+
+  @Override
+  public Prel visitPrel(Prel prel, Boolean isRightOfLateral) throws RuntimeException {
+    List<RelNode> children = getChildren(prel, isRightOfLateral);
+    if (isRightOfLateral) {
+      return prel.addImplicitRowIDCol(children);
+    } else {
+      return (Prel) prel.copy(prel.getTraitSet(), children);
+    }
+  }
+
+  private List<RelNode> getChildren(Prel prel, Boolean isRightOfLateral) {
+    List<RelNode> children = Lists.newArrayList();
+    for(Prel child : prel){
+      child = child.accept(this, isRightOfLateral);
+      children.add(child);
+    }
+    return children;
+  }
+
+  @Override
+  public Prel visitLateral(LateralJoinPrel prel, Boolean value) throws RuntimeException {
+    List<RelNode> children = Lists.newArrayList();
+    children.add(((Prel)prel.getInput(0)).accept(this, false));
+    children.add(((Prel) prel.getInput(1)).accept(this, true));
+
+    return (Prel) prel.copy(prel.getTraitSet(), children);
+  }
+
+  @Override
+  public Prel visitUnnest(UnnestPrel prel, Boolean value) throws RuntimeException {
+    return prel.addImplicitRowIDCol(null);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
index b28d827009d..5b31683316a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
@@ -40,5 +40,4 @@
   RETURN visitPrel(Prel prel, EXTRA value) throws EXCEP;
   RETURN visitUnnest(UnnestPrel prel, EXTRA value) throws EXCEP;
   RETURN visitLateral(LateralJoinPrel prel, EXTRA value) throws EXCEP;
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index cc2ec609267..e0a2357979b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -93,6 +93,7 @@
 import org.apache.drill.exec.planner.physical.visitor.ExcessiveExchangeIdentifier;
 import org.apache.drill.exec.planner.physical.visitor.FinalColumnReorderer;
 import org.apache.drill.exec.planner.physical.visitor.InsertLocalExchangeVisitor;
+import org.apache.drill.exec.planner.physical.visitor.LateralUnnestRowIDVisitor;
 import org.apache.drill.exec.planner.physical.visitor.MemoryEstimationVisitor;
 import org.apache.drill.exec.planner.physical.visitor.RelUniqifier;
 import org.apache.drill.exec.planner.physical.visitor.RewriteProjectToFlatten;
@@ -562,6 +563,8 @@ protected Prel convertToPrel(RelNode drel, RelDataType validatedRowType) throws
      */
     phyRelNode = ExcessiveExchangeIdentifier.removeExcessiveEchanges(phyRelNode, targetSliceSize);
 
+    /* Insert the IMPLICIT_COLUMN in the lateral unnest pipeline */
+    phyRelNode = LateralUnnestRowIDVisitor.insertRowID(phyRelNode);
 
     /* 5.)
      * Add ProducerConsumer after each scan if the option is set
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index ffac4b6ef2e..6626176c423 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -28,6 +28,7 @@
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.LateralJoinPOP;
 import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.planner.common.DrillLateralJoinRelBase;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
@@ -97,6 +98,9 @@ public static void setUpBeforeClass() throws Exception {
     PhysicalOperator mockPopConfig = new MockStorePOP(null);
     operatorContext = fixture.newOperatorContext(mockPopConfig);
 
+    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER,
+      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
+
     leftSchema = new SchemaBuilder()
       .add("id_left", TypeProtos.MinorType.INT)
       .add("cost_left", TypeProtos.MinorType.INT)
@@ -105,13 +109,12 @@ public static void setUpBeforeClass() throws Exception {
     emptyLeftRowSet = fixture.rowSetBuilder(leftSchema).build();
 
     rightSchema = new SchemaBuilder()
+      .add(ljPopConfig.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
       .add("id_right", TypeProtos.MinorType.INT)
       .add("cost_right", TypeProtos.MinorType.INT)
       .add("name_right", TypeProtos.MinorType.VARCHAR)
       .buildSchema();
     emptyRightRowSet = fixture.rowSetBuilder(rightSchema).build();
-
-    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
   }
 
   @AfterClass
@@ -129,9 +132,9 @@ public void beforeTest() throws Exception {
       .build();
 
     nonEmptyRightRowSet = fixture.rowSetBuilder(rightSchema)
-      .addRow(1, 11, "item11")
-      .addRow(2, 21, "item21")
-      .addRow(3, 31, "item31")
+      .addRow(1, 1, 11, "item11")
+      .addRow(1, 2, 21, "item21")
+      .addRow(1, 3, 31, "item31")
       .build();
   }
 
@@ -193,10 +196,6 @@ public void testBuildSchemaEmptyLRBatch() throws Exception {
       while (!isTerminal(ljBatch.next())) {
         // do nothing
       }
-
-      // TODO: We can add check for output correctness as well
-      //assertTrue (((MockRecordBatch) leftMockBatch).isCompleted());
-      //assertTrue(((MockRecordBatch) rightMockBatch).isCompleted());
     } catch (AssertionError | Exception error) {
       fail();
     } finally {
@@ -450,8 +449,8 @@ public void test1RecordLeftBatchTo2RightRecordBatch() throws Exception {
 
     // Get the right container with dummy data
     final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
-      .addRow(4, 41, "item41")
-      .addRow(5, 51, "item51")
+      .addRow(1, 4, 41, "item41")
+      .addRow(1, 5, 51, "item51")
       .build();
 
     rightContainer.add(emptyRightRowSet.container());
@@ -547,8 +546,8 @@ public void testFillingUpOutputBatch() throws Exception {
 
     // Create data for right input
     final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
-      .addRow(4, 41, "item41")
-      .addRow(5, 51, "item51")
+      .addRow(1, 4, 41, "item41")
+      .addRow(1, 5, 51, "item51")
       .build();
 
     // Get the left container with dummy data for Lateral Join
@@ -620,8 +619,8 @@ public void testHandlingSchemaChangeForNonUnnestField() throws Exception {
 
     // Create data for right input
     final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
-      .addRow(4, 41, "item41")
-      .addRow(5, 51, "item51")
+      .addRow(1, 4, 41, "item41")
+      .addRow(1, 5, 51, "item51")
       .build();
 
     // Get the left container with dummy data for Lateral Join
@@ -693,6 +692,7 @@ public void testHandlingSchemaChangeForUnnestField() throws Exception {
 
     // Create right input schema
     TupleMetadata rightSchema2 = new SchemaBuilder()
+      .add(ljPopConfig.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
       .add("id_right", TypeProtos.MinorType.INT)
       .add("cost_right", TypeProtos.MinorType.VARCHAR)
       .add("name_right", TypeProtos.MinorType.VARCHAR)
@@ -709,8 +709,8 @@ public void testHandlingSchemaChangeForUnnestField() throws Exception {
       .build();
 
     final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2)
-      .addRow(4, "41", "item41")
-      .addRow(5, "51", "item51")
+      .addRow(1, 4, "41", "item41")
+      .addRow(1, 5, "51", "item51")
       .build();
 
     // Get the left container with dummy data for Lateral Join
@@ -759,7 +759,8 @@ public void testHandlingSchemaChangeForUnnestField() throws Exception {
           leftRowSet2.rowCount() * nonEmptyRightRowSet2.rowCount()));
       assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
     } catch (AssertionError | Exception error) {
-      fail();
+      // Rethrow rather than fail() so the original error and its stack trace reach the test report.
+      throw error;
     } finally {
       // Close all the resources for this test case
       ljBatch.close();
@@ -788,6 +789,7 @@ public void testHandlingUnexpectedSchemaChangeForUnnestField() throws Exception
 
     // Create right input schema
     TupleMetadata rightSchema2 = new SchemaBuilder()
+      .add(ljPopConfig.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
       .add("id_right", TypeProtos.MinorType.INT)
       .add("cost_right", TypeProtos.MinorType.VARCHAR)
       .add("name_right", TypeProtos.MinorType.VARCHAR)
@@ -804,8 +806,8 @@ public void testHandlingUnexpectedSchemaChangeForUnnestField() throws Exception
       .build();
 
     final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2)
-      .addRow(4, "41", "item41")
-      .addRow(5, "51", "item51")
+      .addRow(1, 4, "41", "item41")
+      .addRow(1, 5, "51", "item51")
       .build();
 
     // Get the left container with dummy data for Lateral Join
@@ -873,8 +875,8 @@ public void testOK_NEW_SCHEMA_WithNoActualSchemaChange_ForNonUnnestField() throw
 
     // Create data for right input
     final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
-      .addRow(4, 41, "item41")
-      .addRow(5, 51, "item51")
+      .addRow(1, 4, 41, "item41")
+      .addRow(1, 5, 51, "item51")
       .build();
 
     // Get the left container with dummy data for Lateral Join
@@ -944,8 +946,8 @@ public void testOK_NEW_SCHEMA_WithNoActualSchemaChange_ForUnnestField() throws E
 
     // Create data for right input
     final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
-      .addRow(4, 41, "item41")
-      .addRow(5, 51, "item51")
+      .addRow(1, 4, 41, "item41")
+      .addRow(1, 5, 51, "item51")
       .build();
 
     // Get the left container with dummy data for Lateral Join
@@ -1019,8 +1021,8 @@ public void testHandlingEMITFromLeft() throws Exception {
 
     // Create data for right input
     final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
-      .addRow(4, 41, "item41")
-      .addRow(5, 51, "item51")
+      .addRow(1, 4, 41, "item41")
+      .addRow(1, 5, 51, "item51")
       .build();
 
     // Get the left container with dummy data for Lateral Join
@@ -1213,8 +1215,8 @@ public void testHandlingNonEmptyEMITAfterOK() throws Exception {
 
     // Create data for right input
     final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
-      .addRow(4, 41, "item41")
-      .addRow(5, 51, "item51")
+      .addRow(1, 4, 41, "item41")
+      .addRow(1, 5, 51, "item51")
       .build();
 
     // Get the left container with dummy data for Lateral Join
@@ -1289,8 +1291,8 @@ public void testHandlingNonEmpty_EMITAfterOK_WithMultipleOutput() throws Excepti
 
     // Create data for right input
     final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
-      .addRow(4, 41, "item41")
-      .addRow(5, 51, "item51")
+      .addRow(1, 4, 41, "item41")
+      .addRow(1, 5, 51, "item51")
       .build();
 
     // Get the left container with dummy data for Lateral Join
@@ -1399,12 +1401,6 @@ public void testHandlingOOMFromLeft() throws Exception {
       // Compare the total records generated in 2 output batches with expected count.
       assertTrue(totalRecordCount ==
         (nonEmptyLeftRowSet.rowCount() * nonEmptyRightRowSet.rowCount()));
-
-      // TODO: We are not draining left or right batch anymore on receiving terminal outcome from either branch
-      // TODO: since not sure if that's the right behavior
-      //assertTrue(((MockRecordBatch) leftMockBatch).isCompleted());
-      //assertTrue(((MockRecordBatch) rightMockBatch).isCompleted());
-
     } catch (AssertionError | Exception error) {
       fail();
     } finally {
@@ -1447,11 +1443,6 @@ public void testHandlingOOMFromRight() throws Exception {
 
       // 2nd output batch
       assertTrue(RecordBatch.IterOutcome.OUT_OF_MEMORY == ljBatch.next());
-
-      // TODO: We are not draining left or right batch anymore on receiving terminal outcome from either branch
-      // TODO: since not sure if that's the right behavior
-      //assertTrue(((MockRecordBatch) leftMockBatch).isCompleted());
-      //assertTrue(((MockRecordBatch) rightMockBatch).isCompleted());
     } catch (AssertionError | Exception error) {
       fail();
     } finally {
@@ -1492,7 +1483,7 @@ public void testBasicLeftLateralJoin() throws Exception {
     final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT, Lists.newArrayList());
+    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
 
     final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(),
       leftMockBatch, rightMockBatch);
@@ -1528,9 +1519,9 @@ public void testLeftLateralJoin_WithMatchingAndEmptyBatch() throws Exception {
       .build();
 
     final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
-      .addRow(6, 60, "item61")
-      .addRow(7, 70, "item71")
-      .addRow(8, 80, "item81")
+      .addRow(2, 6, 60, "item61")
+      .addRow(2, 7, 70, "item71")
+      .addRow(2, 8, 80, "item81")
       .build();
 
     leftContainer.add(leftRowSet2.container());
@@ -1550,7 +1541,7 @@ public void testLeftLateralJoin_WithMatchingAndEmptyBatch() throws Exception {
     rightContainer.add(emptyRightRowSet.container());
 
     rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
-    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK);
     rightOutcomes.add(RecordBatch.IterOutcome.OK);
     rightOutcomes.add(RecordBatch.IterOutcome.OK);
     rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
@@ -1558,7 +1549,7 @@ public void testLeftLateralJoin_WithMatchingAndEmptyBatch() throws Exception {
     final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT, Lists.newArrayList());
+    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
 
     final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(),
       leftMockBatch, rightMockBatch);
@@ -1576,6 +1567,8 @@ public void testLeftLateralJoin_WithMatchingAndEmptyBatch() throws Exception {
       ljBatch.close();
       leftMockBatch.close();
       rightMockBatch.close();
+      leftRowSet2.clear();
+      nonEmptyRightRowSet2.clear();
     }
   }
 
@@ -1598,9 +1591,9 @@ public void testLeftLateralJoin_WithAndWithoutMatching() throws Exception {
       .build();
 
     final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
-      .addRow(6, 60, "item61")
-      .addRow(7, 70, "item71")
-      .addRow(8, 80, "item81")
+      .addRow(3, 6, 60, "item61")
+      .addRow(3, 7, 70, "item71")
+      .addRow(3, 8, 80, "item81")
       .build();
 
     leftContainer.add(leftRowSet2.container());
@@ -1619,14 +1612,14 @@ public void testLeftLateralJoin_WithAndWithoutMatching() throws Exception {
     rightContainer.add(nonEmptyRightRowSet2.container());
 
     rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
-    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
-    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK);
     rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
 
     final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT, Lists.newArrayList());
+    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
 
     final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(),
       leftMockBatch, rightMockBatch);
@@ -1638,12 +1631,15 @@ public void testLeftLateralJoin_WithAndWithoutMatching() throws Exception {
       assertTrue(ljBatch.getRecordCount() == expectedOutputRecordCount);
       assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
     } catch (AssertionError | Exception error) {
-      fail();
+      // Rethrow rather than fail() so the original error and its stack trace reach the test report.
+      throw error;
     } finally {
       // Close all the resources for this test case
       ljBatch.close();
       leftMockBatch.close();
       rightMockBatch.close();
+      leftRowSet2.clear();
+      nonEmptyRightRowSet2.clear();
     }
   }
 
@@ -1669,9 +1665,9 @@ public void testLeftLateralJoin_WithAndWithoutMatching_MultipleBatch() throws Ex
       .build();
 
     final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
-      .addRow(6, 60, "item61")
-      .addRow(7, 70, "item71")
-      .addRow(8, 80, "item81")
+      .addRow(3, 6, 60, "item61")
+      .addRow(3, 7, 70, "item71")
+      .addRow(3, 8, 80, "item81")
       .build();
 
     leftContainer.add(leftRowSet2.container());
@@ -1690,14 +1686,14 @@ public void testLeftLateralJoin_WithAndWithoutMatching_MultipleBatch() throws Ex
     rightContainer.add(nonEmptyRightRowSet2.container());
 
     rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
-    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
-    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK);
     rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
 
     final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT, Lists.newArrayList());
+    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
 
     final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(),
       leftMockBatch, rightMockBatch);
@@ -1721,12 +1717,15 @@ public void testLeftLateralJoin_WithAndWithoutMatching_MultipleBatch() throws Ex
       assertTrue(actualOutputRecordCount == expectedOutputRecordCount);
       assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
     } catch (AssertionError | Exception error) {
-      fail();
+      // Rethrow rather than fail() so the original error and its stack trace reach the test report.
+      throw error;
     } finally {
       // Close all the resources for this test case
       ljBatch.close();
       leftMockBatch.close();
       rightMockBatch.close();
+      //leftRowSet2.clear();
+      //nonEmptyRightRowSet2.clear();
     }
   }
 
@@ -1758,7 +1757,7 @@ public void testMultipleUnnestAtSameLevel() throws Exception {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
 
     final LateralJoinBatch ljBatch_1 = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
@@ -1769,6 +1768,7 @@ public void testMultipleUnnestAtSameLevel() throws Exception {
 
     // Create right input schema
     TupleMetadata rightSchema2 = new SchemaBuilder()
+      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
       .add("id_right_1", TypeProtos.MinorType.INT)
       .add("cost_right_1", TypeProtos.MinorType.INT)
       .add("name_right_1", TypeProtos.MinorType.VARCHAR)
@@ -1777,9 +1777,9 @@ public void testMultipleUnnestAtSameLevel() throws Exception {
     final RowSet.SingleRowSet emptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2).build();
 
     final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2)
-      .addRow(6, 60, "item61")
-      .addRow(7, 70, "item71")
-      .addRow(8, 80, "item81")
+      .addRow(1, 6, 60, "item61")
+      .addRow(1, 7, 70, "item71")
+      .addRow(1, 8, 80, "item81")
       .build();
 
     final List<VectorContainer> rightContainer2 = new ArrayList<>(5);
@@ -1791,8 +1791,8 @@ public void testMultipleUnnestAtSameLevel() throws Exception {
 
     final List<RecordBatch.IterOutcome> rightOutcomes2 = new ArrayList<>(5);
     rightOutcomes2.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
-    rightOutcomes2.add(RecordBatch.IterOutcome.EMIT);
-    rightOutcomes2.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes2.add(RecordBatch.IterOutcome.OK);
+    rightOutcomes2.add(RecordBatch.IterOutcome.OK);
     rightOutcomes2.add(RecordBatch.IterOutcome.EMIT);
 
     final CloseableRecordBatch rightMockBatch_2 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
@@ -1834,13 +1834,30 @@ public void testMultipleUnnestAtSameLevel() throws Exception {
   public void testMultiLevelLateral() throws Exception {
 
     // ** Prepare first pair of left batch and right batch for Lateral_1 **
-    final RowSet.SingleRowSet nonEmptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchema)
-      .addRow(2, 20, "item2")
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER,
+      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
+
+    // Create a left batch with implicit column for lower lateral left unnest
+    TupleMetadata leftSchemaWithImplicit = new SchemaBuilder()
+      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.INT)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet emptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
+      .build();
+    final RowSet.SingleRowSet nonEmptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
+      .addRow(1, 1, 10, "item1")
       .build();
 
-    leftContainer.add(emptyLeftRowSet.container());
-    leftContainer.add(nonEmptyLeftRowSet.container());
+    final RowSet.SingleRowSet nonEmptyLeftRowSet_2 = fixture.rowSetBuilder(leftSchemaWithImplicit)
+      .addRow(1, 2, 20, "item2")
+      .build();
+
+    leftContainer.add(emptyLeftRowSet_1.container());
     leftContainer.add(nonEmptyLeftRowSet_1.container());
+    leftContainer.add(nonEmptyLeftRowSet_2.container());
 
     // Get the left IterOutcomes for Lateral Join
     leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
@@ -1852,9 +1869,9 @@ public void testMultiLevelLateral() throws Exception {
 
     // Get the right container with dummy data
     final RowSet.SingleRowSet nonEmptyRightRowSet_1 = fixture.rowSetBuilder(rightSchema)
-      .addRow(5, 51, "item51")
-      .addRow(6, 61, "item61")
-      .addRow(7, 71, "item71")
+      .addRow(1, 5, 51, "item51")
+      .addRow(1, 6, 61, "item61")
+      .addRow(1, 7, 71, "item71")
       .build();
     rightContainer.add(emptyRightRowSet.container());
     rightContainer.add(nonEmptyRightRowSet.container());
@@ -1867,8 +1884,6 @@ public void testMultiLevelLateral() throws Exception {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
-
     final LateralJoinBatch lowerLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
 
@@ -1935,13 +1950,28 @@ public void testMultiLevelLateral() throws Exception {
   public void testMultiLevelLateral_MultipleOutput() throws Exception {
 
     // ** Prepare first pair of left batch and right batch for lower level LATERAL Lateral_1 **
-    final RowSet.SingleRowSet nonEmptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchema)
-      .addRow(2, 20, "item2")
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
+
+    TupleMetadata leftSchemaWithImplicit = new SchemaBuilder()
+      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.INT)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet emptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
+      .build();
+    final RowSet.SingleRowSet nonEmptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
+      .addRow(1, 1, 10, "item1")
       .build();
 
-    leftContainer.add(emptyLeftRowSet.container());
-    leftContainer.add(nonEmptyLeftRowSet.container());
+    final RowSet.SingleRowSet nonEmptyLeftRowSet_2 = fixture.rowSetBuilder(leftSchemaWithImplicit)
+      .addRow(1, 2, 20, "item2")
+      .build();
+
+    leftContainer.add(emptyLeftRowSet_1.container());
     leftContainer.add(nonEmptyLeftRowSet_1.container());
+    leftContainer.add(nonEmptyLeftRowSet_2.container());
 
     // Get the left IterOutcomes for Lateral Join
     leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
@@ -1953,9 +1983,9 @@ public void testMultiLevelLateral_MultipleOutput() throws Exception {
 
     // Get the right container with dummy data
     final RowSet.SingleRowSet nonEmptyRightRowSet_1 = fixture.rowSetBuilder(rightSchema)
-      .addRow(5, 51, "item51")
-      .addRow(6, 61, "item61")
-      .addRow(7, 71, "item71")
+      .addRow(1, 5, 51, "item51")
+      .addRow(1, 6, 61, "item61")
+      .addRow(1, 7, 71, "item71")
       .build();
     rightContainer.add(emptyRightRowSet.container());
     rightContainer.add(nonEmptyRightRowSet.container());
@@ -1968,8 +1998,6 @@ public void testMultiLevelLateral_MultipleOutput() throws Exception {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
-
     final LateralJoinBatch lowerLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
 
@@ -2051,11 +2079,27 @@ public void testMultiLevelLateral_MultipleOutput() throws Exception {
   public void testMultiLevelLateral_SchemaChange_LeftUnnest() throws Exception {
 
     // ** Prepare first pair of left batch and right batch for lower level LATERAL Lateral_1 **
-    leftContainer.add(emptyLeftRowSet.container());
-    leftContainer.add(nonEmptyLeftRowSet.container());
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
+
+    TupleMetadata leftSchemaWithImplicit = new SchemaBuilder()
+      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.INT)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet emptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
+      .build();
+    final RowSet.SingleRowSet nonEmptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
+      .addRow(1, 1, 10, "item1")
+      .build();
+
+    leftContainer.add(emptyLeftRowSet_1.container());
+    leftContainer.add(nonEmptyLeftRowSet_1.container());
 
     // Create left input schema2 for schema change batch
     TupleMetadata leftSchema2 = new SchemaBuilder()
+      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
       .add("new_id_left", TypeProtos.MinorType.INT)
       .add("new_cost_left", TypeProtos.MinorType.INT)
       .add("new_name_left", TypeProtos.MinorType.VARCHAR)
@@ -2063,7 +2107,7 @@ public void testMultiLevelLateral_SchemaChange_LeftUnnest() throws Exception {
 
     final RowSet.SingleRowSet emptyLeftRowSet_Schema2 = fixture.rowSetBuilder(leftSchema2).build();
     final RowSet.SingleRowSet nonEmptyLeftRowSet_Schema2 = fixture.rowSetBuilder(leftSchema2)
-      .addRow(1111, 10001, "NewRecord")
+      .addRow(1, 1111, 10001, "NewRecord")
       .build();
 
     leftContainer.add(emptyLeftRowSet_Schema2.container());
@@ -2080,9 +2124,9 @@ public void testMultiLevelLateral_SchemaChange_LeftUnnest() throws Exception {
 
     // Get the right container with dummy data
     final RowSet.SingleRowSet nonEmptyRightRowSet_1 = fixture.rowSetBuilder(rightSchema)
-      .addRow(5, 51, "item51")
-      .addRow(6, 61, "item61")
-      .addRow(7, 71, "item71")
+      .addRow(1, 5, 51, "item51")
+      .addRow(1, 6, 61, "item61")
+      .addRow(1, 7, 71, "item71")
       .build();
     rightContainer.add(emptyRightRowSet.container());
     rightContainer.add(nonEmptyRightRowSet.container());
@@ -2095,8 +2139,6 @@ public void testMultiLevelLateral_SchemaChange_LeftUnnest() throws Exception {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
-
     final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
 
@@ -2185,13 +2227,26 @@ public void testMultiLevelLateral_SchemaChange_LeftUnnest() throws Exception {
   @Test
   public void testMultiLevelLateral_SchemaChange_RightUnnest() throws Exception {
     // ** Prepare first pair of left batch and right batch for lower level LATERAL Lateral_1 **
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
+
+    TupleMetadata leftSchemaWithImplicit = new SchemaBuilder()
+      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.INT)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
 
-    final RowSet.SingleRowSet nonEmptyLeftRowSet2 = fixture.rowSetBuilder(leftSchema)
-      .addRow(1111, 10001, "NewRecord")
+    final RowSet.SingleRowSet emptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
+      .build();
+    final RowSet.SingleRowSet nonEmptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
+      .addRow(1, 1, 10, "item1")
+      .build();
+    final RowSet.SingleRowSet nonEmptyLeftRowSet2 = fixture.rowSetBuilder(leftSchemaWithImplicit)
+      .addRow(1, 1111, 10001, "NewRecord")
       .build();
 
-    leftContainer.add(emptyLeftRowSet.container());
-    leftContainer.add(nonEmptyLeftRowSet.container());
+    leftContainer.add(emptyLeftRowSet_1.container());
+    leftContainer.add(nonEmptyLeftRowSet_1.container());
     leftContainer.add(nonEmptyLeftRowSet2.container());
 
     // Get the left IterOutcomes for Lateral Join
@@ -2204,6 +2259,7 @@ public void testMultiLevelLateral_SchemaChange_RightUnnest() throws Exception {
 
     // Get the right container with dummy data
     TupleMetadata rightSchema2 = new SchemaBuilder()
+      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
       .add("id_right_new", TypeProtos.MinorType.INT)
       .add("cost_right_new", TypeProtos.MinorType.VARCHAR)
       .add("name_right_new", TypeProtos.MinorType.VARCHAR)
@@ -2211,9 +2267,9 @@ public void testMultiLevelLateral_SchemaChange_RightUnnest() throws Exception {
 
     final RowSet.SingleRowSet emptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2).build();
     final RowSet.SingleRowSet nonEmptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2)
-      .addRow(5, "51", "item51")
-      .addRow(6, "61", "item61")
-      .addRow(7, "71", "item71")
+      .addRow(1, 5, "51", "item51")
+      .addRow(1, 6, "61", "item61")
+      .addRow(1, 7, "71", "item71")
       .build();
 
     rightContainer.add(emptyRightRowSet.container());
@@ -2229,8 +2285,6 @@ public void testMultiLevelLateral_SchemaChange_RightUnnest() throws Exception {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
-
     final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
 
@@ -2319,9 +2373,24 @@ public void testMultiLevelLateral_SchemaChange_RightUnnest() throws Exception {
   @Test
   public void testMultiLevelLateral_SchemaChange_LeftRightUnnest() throws Exception {
     // ** Prepare first pair of left batch and right batch for lower level LATERAL Lateral_1 **
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
+
+    TupleMetadata leftSchemaWithImplicit = new SchemaBuilder()
+      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.INT)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet emptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
+      .build();
+    final RowSet.SingleRowSet nonEmptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
+      .addRow(1, 1, 10, "item1")
+      .build();
 
     // Create left input schema for first batch
     TupleMetadata leftSchema2 = new SchemaBuilder()
+      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
       .add("id_left_new", TypeProtos.MinorType.INT)
       .add("cost_left_new", TypeProtos.MinorType.INT)
       .add("name_left_new", TypeProtos.MinorType.VARCHAR)
@@ -2329,11 +2398,11 @@ public void testMultiLevelLateral_SchemaChange_LeftRightUnnest() throws Exceptio
 
     final RowSet.SingleRowSet emptyLeftRowSet_leftSchema2 = fixture.rowSetBuilder(leftSchema2).build();
     final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema2 = fixture.rowSetBuilder(leftSchema2)
-      .addRow(6, 60, "item6")
+      .addRow(1, 6, 60, "item6")
       .build();
 
-    leftContainer.add(emptyLeftRowSet.container());
-    leftContainer.add(nonEmptyLeftRowSet.container());
+    leftContainer.add(emptyLeftRowSet_1.container());
+    leftContainer.add(nonEmptyLeftRowSet_1.container());
     leftContainer.add(emptyLeftRowSet_leftSchema2.container());
     leftContainer.add(nonEmptyLeftRowSet_leftSchema2.container());
 
@@ -2348,6 +2417,7 @@ public void testMultiLevelLateral_SchemaChange_LeftRightUnnest() throws Exceptio
 
     // Get the right container with dummy data
     TupleMetadata rightSchema2 = new SchemaBuilder()
+      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
       .add("id_right_new", TypeProtos.MinorType.INT)
       .add("cost_right_new", TypeProtos.MinorType.VARCHAR)
       .add("name_right_new", TypeProtos.MinorType.VARCHAR)
@@ -2355,9 +2425,9 @@ public void testMultiLevelLateral_SchemaChange_LeftRightUnnest() throws Exceptio
 
     final RowSet.SingleRowSet emptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2).build();
     final RowSet.SingleRowSet nonEmptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2)
-      .addRow(5, "51", "item51")
-      .addRow(6, "61", "item61")
-      .addRow(7, "71", "item71")
+      .addRow(1, 5, "51", "item51")
+      .addRow(1, 6, "61", "item61")
+      .addRow(1, 7, "71", "item71")
       .build();
 
     rightContainer.add(emptyRightRowSet.container());
@@ -2373,8 +2443,6 @@ public void testMultiLevelLateral_SchemaChange_LeftRightUnnest() throws Exceptio
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
-
     final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
 
@@ -2502,7 +2570,7 @@ public void testUnsupportedSelectionVector() throws Exception {
 
   /**
    * Test to verify if OK_NEW_SCHEMA is received from left side of LATERAL post build schema phase and EMIT is
-   * received from right side of LATERAL for each row on lest side, then Lateral sends OK_NEW_SCHEMA downstream with
+   * received from right side of LATERAL for each row on left side, then Lateral sends OK_NEW_SCHEMA downstream with
    * the output batch. LATERAL shouldn't send any batch with EMIT outcome to the downstream operator as it is the
    * consumer of all the EMIT outcomes. It will work fine in case of Multilevel LATERAL too since there the lower
    * LATERAL only sends EMIT after it receives it from left UNNEST.
@@ -2513,13 +2581,13 @@ public void test_OK_NEW_SCHEMAFromLeft_EmitFromRight_PostBuildSchema() throws Ex
     // Get the left container with dummy data for Lateral Join
     TupleMetadata leftSchema3 = new SchemaBuilder()
       .add("id_left_left", TypeProtos.MinorType.INT)
-      .add("cost_left_left", TypeProtos.MinorType.INT)
+      .add("cost_left_left", TypeProtos.MinorType.VARCHAR)
       .add("name_left_left", TypeProtos.MinorType.VARCHAR)
       .buildSchema();
 
     final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema3 = fixture.rowSetBuilder(leftSchema3)
-      .addRow(6, 60, "item6")
-      .addRow(7, 70, "item7")
+      .addRow(6, "60", "item6")
+      .addRow(7, "70", "item7")
       .build();
 
     leftContainer.add(emptyLeftRowSet.container());
@@ -2535,7 +2603,7 @@ public void test_OK_NEW_SCHEMAFromLeft_EmitFromRight_PostBuildSchema() throws Ex
 
     // Get the right container with dummy data
     final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
-      .addRow(10, 100, "list10")
+      .addRow(2, 10, 100, "list10")
       .build();
 
     rightContainer.add(emptyRightRowSet.container());
@@ -2543,7 +2611,7 @@ public void test_OK_NEW_SCHEMAFromLeft_EmitFromRight_PostBuildSchema() throws Ex
     rightContainer.add(nonEmptyRightRowSet2.container());
 
     rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
-    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK);
     rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
 
     final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
@@ -2586,6 +2654,7 @@ public void testPostBuildSchema_OK_NEW_SCHEMA_NonEmptyRightBatch() throws Except
 
     // Create right input schema
     TupleMetadata rightSchema2 = new SchemaBuilder()
+      .add(ljPopConfig.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
       .add("id_right", TypeProtos.MinorType.INT)
       .add("cost_right", TypeProtos.MinorType.VARCHAR)
       .add("name_right", TypeProtos.MinorType.VARCHAR)
@@ -2602,8 +2671,8 @@ public void testPostBuildSchema_OK_NEW_SCHEMA_NonEmptyRightBatch() throws Except
       .build();
 
     final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema2)
-      .addRow(4, "41", "item41")
-      .addRow(5, "51", "item51")
+      .addRow(1, 4, "41", "item41")
+      .addRow(1, 5, "51", "item51")
       .build();
 
     // Get the left container with dummy data for Lateral Join
@@ -2673,9 +2742,24 @@ public void testPostBuildSchema_OK_NEW_SCHEMA_NonEmptyRightBatch() throws Except
   @Test
   public void testMultiLevelLateral_SchemaChange_LeftRightUnnest_NonEmptyBatch() throws Exception {
     // ** Prepare first pair of left batch and right batch for lower level LATERAL Lateral_1 **
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
+
+    TupleMetadata leftSchemaWithImplicit = new SchemaBuilder()
+      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.INT)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet emptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
+      .build();
+    final RowSet.SingleRowSet nonEmptyLeftRowSet_1 = fixture.rowSetBuilder(leftSchemaWithImplicit)
+      .addRow(1, 1, 10, "item1")
+      .build();
 
     // Create left input schema for first batch
     TupleMetadata leftSchema2 = new SchemaBuilder()
+      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
       .add("id_left_new", TypeProtos.MinorType.INT)
       .add("cost_left_new", TypeProtos.MinorType.INT)
       .add("name_left_new", TypeProtos.MinorType.VARCHAR)
@@ -2683,11 +2767,11 @@ public void testMultiLevelLateral_SchemaChange_LeftRightUnnest_NonEmptyBatch() t
 
     final RowSet.SingleRowSet emptyLeftRowSet_leftSchema2 = fixture.rowSetBuilder(leftSchema2).build();
     final RowSet.SingleRowSet nonEmptyLeftRowSet_leftSchema2 = fixture.rowSetBuilder(leftSchema2)
-      .addRow(6, 60, "item6")
+      .addRow(1, 6, 60, "item6")
       .build();
 
-    leftContainer.add(emptyLeftRowSet.container());
-    leftContainer.add(nonEmptyLeftRowSet.container());
+    leftContainer.add(emptyLeftRowSet_1.container());
+    leftContainer.add(nonEmptyLeftRowSet_1.container());
     leftContainer.add(emptyLeftRowSet_leftSchema2.container());
     leftContainer.add(nonEmptyLeftRowSet_leftSchema2.container());
 
@@ -2702,6 +2786,7 @@ public void testMultiLevelLateral_SchemaChange_LeftRightUnnest_NonEmptyBatch() t
 
     // Get the right container with dummy data
     TupleMetadata rightSchema2 = new SchemaBuilder()
+      .add(popConfig_1.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
       .add("id_right_new", TypeProtos.MinorType.INT)
       .add("cost_right_new", TypeProtos.MinorType.VARCHAR)
       .add("name_right_new", TypeProtos.MinorType.VARCHAR)
@@ -2709,9 +2794,9 @@ public void testMultiLevelLateral_SchemaChange_LeftRightUnnest_NonEmptyBatch() t
 
     final RowSet.SingleRowSet emptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2).build();
     final RowSet.SingleRowSet nonEmptyRightRowSet_rightSchema2 = fixture.rowSetBuilder(rightSchema2)
-      .addRow(5, "51", "item51")
-      .addRow(6, "61", "item61")
-      .addRow(7, "71", "item71")
+      .addRow(1, 5, "51", "item51")
+      .addRow(1, 6, "61", "item61")
+      .addRow(1, 7, "71", "item71")
       .build();
 
     rightContainer.add(emptyRightRowSet.container());
@@ -2727,8 +2812,6 @@ public void testMultiLevelLateral_SchemaChange_LeftRightUnnest_NonEmptyBatch() t
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
-
     final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
 
@@ -2845,7 +2928,6 @@ public void testLateral_SchemaChange_Left_EmptyRightBatchForFirst() throws Excep
     rightContainer.add(emptyRightRowSet.container());
     rightContainer.add(emptyRightRowSet.container());
     rightContainer.add(nonEmptyRightRowSet.container()); // non-empty OK_NEW_SCHEMA batch
-    rightContainer.add(emptyRightRowSet.container());
 
     rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
     rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
@@ -2876,7 +2958,7 @@ public void testLateral_SchemaChange_Left_EmptyRightBatchForFirst() throws Excep
 
   private void testExcludedColumns(List<SchemaPath> excludedCols, CloseableRecordBatch left,
                                    CloseableRecordBatch right, RowSet expectedRowSet) throws Exception {
-    LateralJoinPOP lateralPop = new LateralJoinPOP(null, null, JoinRelType.INNER, excludedCols);
+    LateralJoinPOP lateralPop = new LateralJoinPOP(null, null, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, excludedCols);
     final LateralJoinBatch ljBatch = new LateralJoinBatch(lateralPop, fixture.getFragmentContext(), left, right);
 
     try {
@@ -2902,8 +2984,8 @@ public void testFillingUpOutputBatch_WithExcludedColumns() throws Exception {
 
     // Create data for right input
     final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
-      .addRow(4, 41, "item41")
-      .addRow(5, 51, "item51")
+      .addRow(1, 4, 41, "item41")
+      .addRow(1, 5, 51, "item51")
       .build();
 
     TupleMetadata expectedSchema = new SchemaBuilder()
@@ -2967,23 +3049,17 @@ public void testFillingUpOutputBatch_With2ExcludedColumns() throws Exception {
 
     // Create data for right input
     final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
-      .addRow(4, 41, "item41")
-      .addRow(5, 51, "item51")
+      .addRow(1, 4, 41, "item41")
+      .addRow(1, 5, 51, "item51")
       .build();
 
     TupleMetadata expectedSchema = new SchemaBuilder()
       .add("name_left", TypeProtos.MinorType.VARCHAR)
-      //.add("id_right", TypeProtos.MinorType.INT)
       .add("cost_right", TypeProtos.MinorType.INT)
       .add("name_right", TypeProtos.MinorType.VARCHAR)
       .buildSchema();
 
     final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
-      /*.addRow("item1", 1, 11, "item11")
-      .addRow("item1", 2, 21, "item21")
-      .addRow("item1", 3, 31, "item31")
-      .addRow("item20", 4, 41, "item41")
-      .addRow("item20", 5, 51, "item51") */
       .addRow("item1", 11, "item11")
       .addRow("item1", 21, "item21")
       .addRow("item1", 31, "item31")
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectnessBatchProcessing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectnessBatchProcessing.java
new file mode 100644
index 00000000000..574d8dc023c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectnessBatchProcessing.java
@@ -0,0 +1,1007 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import avro.shaded.com.google.common.collect.Lists;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.LateralJoinPOP;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.planner.common.DrillLateralJoinRelBase;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.mock.MockStorePOP;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestLateralJoinCorrectnessBatchProcessing extends SubOperatorTest {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestLateralJoinCorrectnessBatchProcessing.class);
+
+  // Operator Context for mock batch
+  private static OperatorContext operatorContext;
+
+  // Left Batch Schema
+  private static TupleMetadata leftSchema;
+
+  // Right Batch Schema
+  private static TupleMetadata rightSchema;
+
+  // Expected output schema: left columns followed by required right columns (no implicit row-id column)
+  private static TupleMetadata expectedSchema;
+
+  // Expected output schema for LEFT join: left columns followed by OPTIONAL (nullable) right columns
+  private static TupleMetadata expectedSchemaLeftJoin;
+
+  // Empty left RowSet
+  private static RowSet.SingleRowSet emptyLeftRowSet;
+
+  // Non-Empty left RowSet
+  private static RowSet.SingleRowSet nonEmptyLeftRowSet;
+
+  // List of left incoming containers
+  private static final List<VectorContainer> leftContainer = new ArrayList<>(5);
+
+  // List of left IterOutcomes
+  private static final List<RecordBatch.IterOutcome> leftOutcomes = new ArrayList<>(5);
+
+  // Empty right RowSet
+  private static RowSet.SingleRowSet emptyRightRowSet;
+
+  // Non-Empty right RowSet
+  private static RowSet.SingleRowSet nonEmptyRightRowSet;
+
+  // List of right incoming containers
+  private static final List<VectorContainer> rightContainer = new ArrayList<>(5);
+
+  // List of right IterOutcomes
+  private static final List<RecordBatch.IterOutcome> rightOutcomes = new ArrayList<>(5);
+
+  // Lateral Join POP Config
+  private static LateralJoinPOP ljPopConfig;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    PhysicalOperator mockPopConfig = new MockStorePOP(null);
+    operatorContext = fixture.newOperatorContext(mockPopConfig);
+    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER,
+      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
+
+    leftSchema = new SchemaBuilder()
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.INT)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+    emptyLeftRowSet = fixture.rowSetBuilder(leftSchema).build();
+
+    rightSchema = new SchemaBuilder()
+      .add(ljPopConfig.getImplicitRIDColumn(), TypeProtos.MinorType.INT)
+      .add("id_right", TypeProtos.MinorType.INT)
+      .add("cost_right", TypeProtos.MinorType.INT)
+      .add("name_right", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+    emptyRightRowSet = fixture.rowSetBuilder(rightSchema).build();
+
+    expectedSchema = new SchemaBuilder()
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.INT)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .add("id_right", TypeProtos.MinorType.INT)
+      .add("cost_right", TypeProtos.MinorType.INT)
+      .add("name_right", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    expectedSchemaLeftJoin = new SchemaBuilder()
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.INT)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .add("id_right", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("cost_right", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("name_right", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    operatorContext.close();
+    emptyLeftRowSet.clear();
+    emptyRightRowSet.clear();
+  }
+
+
+  @Before
+  public void beforeTest() throws Exception {
+    nonEmptyLeftRowSet = fixture.rowSetBuilder(leftSchema)
+      .addRow(1, 10, "item1")
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .addRow(4, 40, "item4")
+      .build();
+
+    nonEmptyRightRowSet = fixture.rowSetBuilder(rightSchema)
+      .addRow(1, 11, 110, "item11")
+      .addRow(2, 22, 220, "item22")
+      .addRow(3, 33, 330, "item33")
+      .addRow(4, 44, 440, "item44")
+      .build();
+  }
+
+  @After
+  public void afterTest() throws Exception {
+    nonEmptyLeftRowSet.clear();
+    leftContainer.clear();
+    leftOutcomes.clear();
+    nonEmptyRightRowSet.clear();
+    rightContainer.clear();
+    rightOutcomes.clear();
+  }
+
+  @Test
+  public void testLeftAndRightAllMatchingRows_SingleBatch() throws Exception {
+    leftContainer.add(nonEmptyLeftRowSet.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
+      .addRow(1, 10, "item1", 11, 110, "item11")
+      .addRow(2, 20, "item2", 22, 220, "item22")
+      .addRow(3, 30, "item3", 33, 330, "item33")
+      .addRow(4, 40, "item4", 44, 440, "item44")
+      .build();
+
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+      leftMockBatch, rightMockBatch);
+
+    try {
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      assertTrue(ljBatch.getRecordCount() == nonEmptyLeftRowSet.rowCount());
+
+      // verify results
+      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
+      new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } finally {
+      // Close all the resources for this test case
+      ljBatch.close();
+      leftMockBatch.close();
+      rightMockBatch.close();
+      expectedRowSet.clear();
+    }
+  }
+
+  @Test
+  public void testLeftAndRightAllMatchingRows_MultipleBatch() throws Exception {
+    leftContainer.add(nonEmptyLeftRowSet.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    final RowSet.SingleRowSet nonEmptyRightRowSet3 = fixture.rowSetBuilder(rightSchema)
+      .addRow(4, 44, 440, "item44")
+      .build();
+
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet3.container());
+
+    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
+      .addRow(1, 10, "item1", 11, 110, "item11")
+      .addRow(2, 20, "item2", 22, 220, "item22")
+      .addRow(3, 30, "item3", 33, 330, "item33")
+      .addRow(4, 40, "item4", 44, 440, "item44")
+      .addRow(4, 40, "item4", 44, 440, "item44")
+      .build();
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+      leftMockBatch, rightMockBatch);
+
+    try {
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      assertTrue(ljBatch.getRecordCount() ==
+        (nonEmptyLeftRowSet.rowCount() + nonEmptyRightRowSet3.rowCount()));
+
+      // verify results
+      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
+      new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } finally {
+      // Close all the resources for this test case
+      ljBatch.close();
+      leftMockBatch.close();
+      rightMockBatch.close();
+      nonEmptyRightRowSet3.clear();
+      expectedRowSet.clear();
+    }
+  }
+
+  @Test
+  public void testLeftAndRightAllMatchingRows_SecondBatch_Empty() throws Exception {
+    leftContainer.add(nonEmptyLeftRowSet.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
+      .addRow(1, 10, "item1", 11, 110, "item11")
+      .addRow(2, 20, "item2", 22, 220, "item22")
+      .addRow(3, 30, "item3", 33, 330, "item33")
+      .addRow(4, 40, "item4", 44, 440, "item44")
+      .build();
+
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet.container());
+    rightContainer.add(emptyRightRowSet.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+      leftMockBatch, rightMockBatch);
+
+    try {
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      assertTrue(ljBatch.getRecordCount() == nonEmptyLeftRowSet.rowCount());
+
+      // verify results
+      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
+      new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } finally {
+      // Close all the resources for this test case
+      ljBatch.close();
+      leftMockBatch.close();
+      rightMockBatch.close();
+      expectedRowSet.clear();
+    }
+  }
+
+  @Test
+  public void testLeftAndRightWithMissingRows_SingleBatch() throws Exception {
+    leftContainer.add(nonEmptyLeftRowSet.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
+      .addRow(1, 11, 110, "item11")
+      .addRow(4, 44, 440, "item44")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
+      .addRow(1, 10, "item1", 11, 110, "item11")
+      .addRow(4, 40, "item4", 44, 440, "item44")
+      .build();
+
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet2.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+      leftMockBatch, rightMockBatch);
+
+    try {
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      assertTrue(ljBatch.getRecordCount() == nonEmptyRightRowSet2.rowCount());
+
+      // verify results
+      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
+      new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } finally {
+      // Close all the resources for this test case
+      ljBatch.close();
+      leftMockBatch.close();
+      rightMockBatch.close();
+      nonEmptyRightRowSet2.clear();
+      expectedRowSet.clear();
+    }
+  }
+
+  @Test
+  public void testLeftAndRightWithMissingRows_LeftJoin_SingleBatch() throws Exception {
+    leftContainer.add(nonEmptyLeftRowSet.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
+      .addRow(1, 11, 110, "item11")
+      .addRow(4, 44, 440, "item44")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchemaLeftJoin)
+      .addRow(1, 10, "item1", 11, 110, "item11")
+      .addRow(2, 20, "item2", null, null, null)
+      .addRow(3, 30, "item3", null, null, null)
+      .addRow(4, 40, "item4", 44, 440, "item44")
+      .build();
+
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet2.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    LateralJoinPOP ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT,
+      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+      leftMockBatch, rightMockBatch);
+
+    try {
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      assertTrue(ljBatch.getRecordCount() == nonEmptyLeftRowSet.rowCount());
+
+      // verify results
+      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
+      new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } finally {
+      // Close all the resources for this test case
+      ljBatch.close();
+      leftMockBatch.close();
+      rightMockBatch.close();
+      nonEmptyRightRowSet2.clear();
+      expectedRowSet.clear();
+    }
+  }
+
+  @Test
+  public void testLeftAndRightWithInitialMissingRows_MultipleBatch() throws Exception {
+    leftContainer.add(nonEmptyLeftRowSet.container());
+
+    // The left side delivers its single batch with OK_NEW_SCHEMA
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Right side data; the first column is the implicit row-id column and no right row carries the value 1
+    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
+      .addRow(2, 22, 220, "item22")
+      .addRow(3, 33, 330, "item33")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyRightRowSet3 = fixture.rowSetBuilder(rightSchema)
+      .addRow(4, 44, 440, "item44_1")
+      .addRow(4, 44, 440, "item44_2")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
+      .addRow(2, 20, "item2", 22, 220, "item22")
+      .addRow(3, 30, "item3", 33, 330, "item33")
+      .addRow(4, 40, "item4", 44, 440, "item44_1")
+      .addRow(4, 40, "item4", 44, 440, "item44_2")
+      .build();
+
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet2.container());
+    rightContainer.add(nonEmptyRightRowSet3.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+      leftMockBatch, rightMockBatch);
+
+    try {
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      assertTrue(ljBatch.getRecordCount() == (nonEmptyRightRowSet2.rowCount() + nonEmptyRightRowSet3.rowCount()));
+
+      // verify results
+      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
+      new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } finally {
+      // Close all the resources for this test case, including every right-side row set built above
+      ljBatch.close();
+      leftMockBatch.close();
+      rightMockBatch.close();
+      nonEmptyRightRowSet2.clear();
+      nonEmptyRightRowSet3.clear();
+      expectedRowSet.clear();
+    }
+  }
+
+  @Test
+  public void testLeftAndRightWithInitialMissingRows_LeftJoin_MultipleBatch() throws Exception {
+    leftContainer.add(nonEmptyLeftRowSet.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
+      .addRow(2, 22, 220, "item22")
+      .addRow(3, 33, 330, "item33")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyRightRowSet3 = fixture.rowSetBuilder(rightSchema)
+      .addRow(4, 44, 440, "item44_1")
+      .addRow(4, 44, 440, "item44_2")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchemaLeftJoin)
+      .addRow(1, 10, "item1", null, null, null)
+      .addRow(2, 20, "item2", 22, 220, "item22")
+      .addRow(3, 30, "item3", 33, 330, "item33")
+      .addRow(4, 40, "item4", 44, 440, "item44_1")
+      .addRow(4, 40, "item4", 44, 440, "item44_2")
+      .build();
+
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet2.container());
+    rightContainer.add(nonEmptyRightRowSet3.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    LateralJoinPOP ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT,
+      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+      leftMockBatch, rightMockBatch);
+
+    try {
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      assertTrue(ljBatch.getRecordCount() ==
+        (1 + nonEmptyRightRowSet2.rowCount() + nonEmptyRightRowSet3.rowCount()));
+
+      // verify results
+      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
+      new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } finally {
+      // Close all the resources for this test case
+      ljBatch.close();
+      leftMockBatch.close();
+      rightMockBatch.close();
+      nonEmptyRightRowSet2.clear();
+      expectedRowSet.clear();
+    }
+  }
+
+  @Test
+  public void testLeftAndRightWithLastMissingRows_MultipleBatch() throws Exception {
+    leftContainer.add(nonEmptyLeftRowSet.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
+      .addRow(1, 11, 110, "item11")
+      .addRow(2, 22, 220, "item22")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyRightRowSet3 = fixture.rowSetBuilder(rightSchema)
+      .addRow(3, 33, 330, "item33_1")
+      .addRow(3, 33, 330, "item33_2")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
+      .addRow(1, 10, "item1", 11, 110, "item11")
+      .addRow(2, 20, "item2", 22, 220, "item22")
+      .addRow(3, 30, "item3", 33, 330, "item33_1")
+      .addRow(3, 30, "item3", 33, 330, "item33_2")
+      .build();
+
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet2.container());
+    rightContainer.add(nonEmptyRightRowSet3.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+      leftMockBatch, rightMockBatch);
+
+    try {
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      assertTrue(ljBatch.getRecordCount() ==
+        (nonEmptyRightRowSet2.rowCount() + nonEmptyRightRowSet3.rowCount()));
+
+      // verify results
+      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
+      new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } finally {
+      // Close all the resources for this test case
+      ljBatch.close();
+      leftMockBatch.close();
+      rightMockBatch.close();
+      nonEmptyRightRowSet2.clear();
+      expectedRowSet.clear();
+    }
+  }
+
+  @Test
+  public void testLeftAndRightWithLastMissingRows_LeftJoin_MultipleBatch() throws Exception {
+    leftContainer.add(nonEmptyLeftRowSet.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
+      .addRow(1, 11, 110, "item11")
+      .addRow(2, 22, 220, "item22")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyRightRowSet3 = fixture.rowSetBuilder(rightSchema)
+      .addRow(3, 33, 330, "item33_1")
+      .addRow(3, 33, 330, "item33_2")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchemaLeftJoin)
+      .addRow(1, 10, "item1", 11, 110, "item11")
+      .addRow(2, 20, "item2", 22, 220, "item22")
+      .addRow(3, 30, "item3", 33, 330, "item33_1")
+      .addRow(3, 30, "item3", 33, 330, "item33_2")
+      .addRow(4, 40, "item4", null, null, null)
+      .build();
+
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet2.container());
+    rightContainer.add(nonEmptyRightRowSet3.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    LateralJoinPOP ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT,
+      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+      leftMockBatch, rightMockBatch);
+
+    try {
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      assertTrue(ljBatch.getRecordCount() ==
+        (1 + nonEmptyRightRowSet2.rowCount() + nonEmptyRightRowSet3.rowCount()));
+
+      // verify results
+      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
+      new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } finally {
+      // Close all the resources for this test case
+      ljBatch.close();
+      leftMockBatch.close();
+      rightMockBatch.close();
+      nonEmptyRightRowSet2.clear();
+      expectedRowSet.clear();
+    }
+  }
+
+  @Test
+  public void testLeftAndRight_OutputFull_InRightBatchMiddle() throws Exception {
+    leftContainer.add(nonEmptyLeftRowSet.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    final RowSet.SingleRowSet nonEmptyRightRowSet3 = fixture.rowSetBuilder(rightSchema)
+      .addRow(4, 44, 440, "item44_2_1")
+      .addRow(4, 44, 440, "item44_2_2")
+      .addRow(4, 44, 440, "item44_2_3")
+      .addRow(4, 44, 440, "item44_2_4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
+      .addRow(1, 10, "item1", 11, 110, "item11")
+      .addRow(2, 20, "item2", 22, 220, "item22")
+      .addRow(3, 30, "item3", 33, 330, "item33")
+      .addRow(4, 40, "item4", 44, 440, "item44")
+      .addRow(4, 40, "item4", 44, 440, "item44_2_1")
+      .addRow(4, 40, "item4", 44, 440, "item44_2_2")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = fixture.rowSetBuilder(expectedSchema)
+      .addRow(4, 40, "item4", 44, 440, "item44_2_3")
+      .addRow(4, 40, "item4", 44, 440, "item44_2_4")
+      .build();
+
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet3.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.OK);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+      leftMockBatch, rightMockBatch);
+    ljBatch.setMaxOutputRowCount(6);
+    ljBatch.setUseMemoryManager(false);
+
+    try {
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      assertTrue(ljBatch.getRecordCount() == 6);
+
+      // verify results
+      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
+      new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+      // Release container memory for this output batch since other operators will do the same
+      VectorAccessibleUtilities.clear(ljBatch);
+
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      assertTrue(ljBatch.getRecordCount() ==
+        (nonEmptyRightRowSet.rowCount() + nonEmptyRightRowSet3.rowCount() - 6));
+
+
+      // verify results
+      RowSet actualRowSet2 = DirectRowSet.fromContainer(ljBatch.getContainer());
+      new RowSetComparison(expectedRowSet1).verify(actualRowSet2);
+
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } finally {
+      // Close all the resources for this test case
+      ljBatch.close();
+      leftMockBatch.close();
+      rightMockBatch.close();
+      expectedRowSet.clear();
+      expectedRowSet1.clear();
+    }
+  }
+
+  @Test
+  public void testLeftAndRight_OutputFull_WithPendingLeftRow() throws Exception {
+    leftContainer.add(nonEmptyLeftRowSet.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
+      .addRow(1, 11, 110, "item11")
+      .addRow(2, 22, 220, "item22")
+      .addRow(3, 33, 330, "item33")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
+      .addRow(1, 10, "item1", 11, 110, "item11")
+      .addRow(2, 20, "item2", 22, 220, "item22")
+      .addRow(3, 30, "item3", 33, 330, "item33")
+      .build();
+
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet2.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+      leftMockBatch, rightMockBatch);
+
+    ljBatch.setMaxOutputRowCount(3);
+    ljBatch.setUseMemoryManager(false);
+
+    try {
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      assertTrue(ljBatch.getRecordCount() == nonEmptyRightRowSet2.rowCount());
+
+      // verify results
+      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
+      new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } finally {
+      // Close all the resources for this test case
+      ljBatch.close();
+      leftMockBatch.close();
+      rightMockBatch.close();
+      nonEmptyRightRowSet2.clear();
+      expectedRowSet.clear();
+    }
+  }
+
+  @Test
+  public void testLeftAndRight_OutputFull_WithPendingLeftRow_LeftJoin() throws Exception {
+    leftContainer.add(nonEmptyLeftRowSet.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
+      .addRow(1, 11, 110, "item11")
+      .addRow(2, 22, 220, "item22")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchemaLeftJoin)
+      .addRow(1, 10, "item1", 11, 110, "item11")
+      .addRow(2, 20, "item2", 22, 220, "item22")
+      .addRow(3, 30, "item3", null, null, null)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = fixture.rowSetBuilder(expectedSchemaLeftJoin)
+      .addRow(4, 40, "item4", null, null, null)
+      .build();
+
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet2.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    LateralJoinPOP ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT,
+      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+      leftMockBatch, rightMockBatch);
+
+    ljBatch.setMaxOutputRowCount(3);
+    ljBatch.setUseMemoryManager(false);
+
+    try {
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      assertTrue(ljBatch.getRecordCount() == 3);
+
+      // verify results
+      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
+      new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+      // Release the output container memory for this batch, as other operators would do
+      VectorAccessibleUtilities.clear(ljBatch);
+
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      assertTrue(ljBatch.getRecordCount() ==
+        (nonEmptyLeftRowSet.rowCount() - 3));
+
+      // verify results
+      RowSet actualRowSet2 = DirectRowSet.fromContainer(ljBatch.getContainer());
+      new RowSetComparison(expectedRowSet1).verify(actualRowSet2);
+
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } finally {
+      // Close all the resources for this test case
+      ljBatch.close();
+      leftMockBatch.close();
+      rightMockBatch.close();
+      nonEmptyRightRowSet2.clear();
+      expectedRowSet.clear();
+      expectedRowSet1.clear();
+    }
+  }
+
+  @Test
+  public void testMultipleLeftAndRight_OutputFull_WithPendingLeftRow_LeftJoin() throws Exception {
+    // Get the left container with dummy data
+    final RowSet.SingleRowSet nonEmptyLeftRowSet2 = fixture.rowSetBuilder(leftSchema)
+      .addRow(5, 50, "item5")
+      .addRow(6, 60, "item6")
+      .build();
+
+    leftContainer.add(nonEmptyLeftRowSet.container());
+    leftContainer.add(nonEmptyLeftRowSet2.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
+      .addRow(1, 11, 110, "item11")
+      .addRow(2, 22, 220, "item22")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchemaLeftJoin)
+      .addRow(1, 10, "item1", 11, 110, "item11")
+      .addRow(2, 20, "item2", 22, 220, "item22")
+      .addRow(3, 30, "item3", null, null, null)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = fixture.rowSetBuilder(expectedSchemaLeftJoin)
+      .addRow(4, 40, "item4", null, null, null)
+      .addRow(5, 50, "item5", null, null, null)
+      .addRow(6, 60, "item6", null, null, null)
+      .build();
+
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet2.container());
+    rightContainer.add(emptyRightRowSet.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    LateralJoinPOP ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT,
+      DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(ljPopConfig, fixture.getFragmentContext(),
+      leftMockBatch, rightMockBatch);
+
+    ljBatch.setMaxOutputRowCount(3);
+    ljBatch.setUseMemoryManager(false);
+
+    try {
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      assertTrue(ljBatch.getRecordCount() == 3);
+
+      // verify results
+      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
+      new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+      // Release the output container memory for this batch, as other operators would do
+      VectorAccessibleUtilities.clear(ljBatch);
+
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      assertTrue(ljBatch.getRecordCount() ==
+        (nonEmptyLeftRowSet.rowCount() + nonEmptyLeftRowSet2.rowCount() - 3));
+
+      // verify results
+      RowSet actualRowSet2 = DirectRowSet.fromContainer(ljBatch.getContainer());
+      new RowSetComparison(expectedRowSet1).verify(actualRowSet2);
+
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } finally {
+      // Close all the resources for this test case
+      ljBatch.close();
+      leftMockBatch.close();
+      rightMockBatch.close();
+      nonEmptyLeftRowSet2.clear();
+      nonEmptyRightRowSet2.clear();
+      expectedRowSet.clear();
+      expectedRowSet1.clear();
+    }
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index f8b58b9a940..e5775bbc4ea 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -25,6 +25,7 @@
 import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.TestBuilder;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -81,6 +82,7 @@ public void testLateral_WithFilterAndLimitInSubQuery() throws Exception {
   }
 
   @Test
+  @Ignore ("DRILL-6635")
   public void testLateral_WithTopNInSubQuery() throws Exception {
     String Sql = "SELECT customer.c_name, orders.o_id, orders.o_amount " +
       "FROM cp.`lateraljoin/nested-customer.parquet` customer, LATERAL " +
@@ -103,6 +105,7 @@ public void testLateral_WithTopNInSubQuery() throws Exception {
    * subquery. The same query as in above test is executed and same result is expected.
    */
   @Test
+  @Ignore ("DRILL-6635")
   public void testLateral_WithSortAndLimitInSubQuery() throws Exception {
 
     runAndLog("alter session set `planner.enable_topn`=false");
@@ -171,6 +174,7 @@ public void testMultiUnnestAtSameLevel() throws Exception {
   }
 
   @Test
+  @Ignore ("DRILL-6638")
   public void testUnnestWithItem() throws Exception {
     String sql = "select u.item from\n" +
         "cp.`lateraljoin/nested-customer.parquet` c," +
@@ -204,6 +208,7 @@ public void testUnnestWithFunctionCall() throws Exception {
   }
 
   @Test
+  @Ignore ("DRILL-6638")
   public void testUnnestWithMap() throws Exception {
     String sql = "select u.item from\n" +
         "cp.`lateraljoin/nested-customer.parquet` c," +
@@ -222,6 +227,7 @@ public void testUnnestWithMap() throws Exception {
   }
 
   @Test
+  @Ignore ("DRILL-6638")
   public void testMultiUnnestWithMap() throws Exception {
     String sql = "select u.item from\n" +
         "cp.`lateraljoin/nested-customer.parquet` c," +
@@ -285,6 +291,7 @@ public void testMultipleBatchesLateral_WithLimitInSubQuery() throws Exception {
   }
 
   @Test
+  @Ignore ("DRILL-6635")
   public void testMultipleBatchesLateral_WithTopNInSubQuery() throws Exception {
     String sql = "SELECT customer.c_name, orders.o_orderkey, orders.o_totalprice " +
       "FROM dfs.`lateraljoin/multipleFiles` customer, LATERAL " +
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
index 222b03678f5..9cb5b6dc12e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
@@ -19,7 +19,6 @@
 
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.core.IsNot.not;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -34,7 +33,6 @@
 import org.apache.drill.test.ClusterFixtureBuilder;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.Ignore;
 
 import java.nio.file.Paths;
 import java.util.regex.Matcher;
@@ -51,12 +49,6 @@ public static void enableUnnestLateral() throws Exception {
     test("alter session set `planner.enable_unnest_lateral`=true");
   }
 
-  @Test
-  public void testLateralPlan1() throws Exception {
-    int numOutputRecords = testPhysical(getFile("lateraljoin/lateralplan1.json"));
-    assertEquals(numOutputRecords, 12);
-  }
-
   @Test
   public void testLateralSql() throws Exception {
     String Sql = "select t.c_name, t2.ord.o_shop as o_shop from cp.`lateraljoin/nested-customer.json` t," +
@@ -257,13 +249,12 @@ public void testUnnestWithFilter() throws Exception {
   }
 
   @Test
-  @Ignore ()
   public void testUnnestWithAggInSubquery() throws Exception {
-    String Sql = "select t.c_name, t3.items from cp.`lateraljoin/nested-customer.parquet` t," +
+    String Sql = "select t.c_name, sum(t4.items) from cp.`lateraljoin/nested-customer.parquet` t," +
         " lateral (select t2.ord.items as items from unnest(t.orders) t2(ord)) d1," +
-        " lateral (select avg(t3.items.i_number) from unnest(d1.items) t3(items)) where t.c_id > 1";
+        " lateral (select sum(t3.items.i_number) from unnest(d1.items) t3(items)) t4(items) where t.c_id > 1 group by t.c_name";
 
-    String baselineQuery = "select t.c_name, avg(t3.items.i_number) from cp.`lateraljoin/nested-customer.parquet` t " +
+    String baselineQuery = "select t.c_name, sum(t3.items.i_number) from cp.`lateraljoin/nested-customer.parquet` t " +
         " inner join (select c_name, f, flatten(t1.f.items) from (select c_name, flatten(orders) as f from cp.`lateraljoin/nested-customer.parquet`) as t1 ) " +
         "t3(name, orders, items) on t.c_name = t3.name where t.c_id > 1 group by t.c_name";
 
@@ -273,9 +264,11 @@ public void testUnnestWithAggInSubquery() throws Exception {
     try (ClusterFixture cluster = builder.build();
          ClientFixture client = cluster.clientFixture()) {
       client
-          .queryBuilder()
-          .sql(Sql)
-          .run();
+          .testBuilder()
+          .ordered()
+          .sqlBaselineQuery(baselineQuery)
+          .sqlQuery(Sql)
+          .go();
     }
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
index 36d004ce32f..3f52351f980 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/MockLateralJoinBatch.java
@@ -111,8 +111,8 @@ public IterOutcome next() {
         IterOutcome outcome;
         // consume all the outout from unnest until EMIT or end of
         // incoming data
-        int unnestCount = 0; // number of values unnested for current record
-        while (recordIndex < incoming.getRecordCount()) {
+        int unnestCount = 0; // number of values unnested by current iteration
+        while (true) {
           outcome = unnest.next();
           if (outcome == IterOutcome.OK_NEW_SCHEMA) {
             // setup schema does nothing (this is just a place holder)
@@ -122,13 +122,11 @@ public IterOutcome next() {
           }
           // We put each batch output from unnest into a hypervector
           // the calling test can match this against the baseline
-          //unnestCount +=
-          //    unnest.getOutgoingContainer().hasRecordCount() ? unnest.getOutgoingContainer().getRecordCount() : 0;
           unnestCount += addBatchToHyperContainer(unnest);
           if (outcome == IterOutcome.EMIT) {
             // reset unnest count
             unnestCount = 0;
-            moveToNextRecord();
+            break;
           }
           // Pretend that an operator somewhere between lateral and unnest
           // wants to terminate processing of the record.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
index c04bff77535..4791829d37d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestCorrectness.java
@@ -29,6 +29,7 @@
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.UnnestPOP;
 import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.planner.common.DrillUnnestRelBase;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.VectorContainer;
@@ -83,13 +84,14 @@ public void testUnnestFixedWidthColumn() {
     // Create input schema
     TupleMetadata incomingSchema =
         new SchemaBuilder()
-            .add("rowNumber", TypeProtos.MinorType.INT)
+            .add("otherColumn", TypeProtos.MinorType.INT)
             .addArray("unnestColumn", TypeProtos.MinorType.INT)
             .buildSchema();
     TupleMetadata[] incomingSchemas = { incomingSchema, incomingSchema };
 
     // First batch in baseline is an empty batch corresponding to OK_NEW_SCHEMA
-    Integer[][] baseline = {{}, {1, 2}, {3, 4, 5}, {6, 7, 8, 9}, {10, 11, 12, 13, 14}};
+    Integer[][] baseline = {{}, {}, {1, 1, 2, 2, 2}, {1, 2, 3, 4, 5}, {1, 1, 1, 1, 2, 2, 2, 2, 2}, {6, 7, 8, 9, 10, 11,
+        12, 13, 14}};
 
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
 
@@ -118,8 +120,11 @@ public void testUnnestVarWidthColumn() {
     TupleMetadata[] incomingSchemas = {incomingSchema, incomingSchema};
 
     // First batch in baseline is an empty batch corresponding to OK_NEW_SCHEMA
-    String[][] baseline = {{}, {"", "zero"}, {"one", "two", "three"}, {"four", "five", "six", "seven"},
-        {"eight", "nine", "ten", "eleven", "twelve"}};
+    Object[][] baseline = {
+        {}, {},
+        {1, 1, 2, 2, 2}, {"", "zero", "one", "two", "three"},
+        { 1, 1, 1, 1, 2, 2, 2, 2, 2}, {"four", "five", "six", "seven", "eight", "nine", "ten", "eleven", "twelve"}
+    };
 
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
 
@@ -173,7 +178,7 @@ public void testUnnestEmptyList() {
 
     // First batch in baseline is an empty batch corresponding to OK_NEW_SCHEMA
     // All subsequent batches are also empty
-    String[][] baseline = {{}, {}, {}, {}, {}};
+    String[][] baseline = {{}, {}, {}, {}, {}, {}};
 
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
 
@@ -210,7 +215,13 @@ public void testUnnestMultipleNewSchemaIncoming() {
     TupleMetadata[] incomingSchemas = {incomingSchema, incomingSchema, incomingSchema};
 
     // First batch in baseline is an empty batch corresponding to OK_NEW_SCHEMA
-    String[][] baseline = {{}, {"0", "1"}, {"2", "3", "4"}, {"5", "6" }, {"9"} };
+    Object[][] baseline = {
+        {}, {},
+        {1, 1, 2, 2, 2}, {"0", "1", "2", "3", "4"},
+        {1, 1},
+        {"5", "6" },
+        {1}, {"9"}
+    };
 
     RecordBatch.IterOutcome[] iterOutcomes = {
         RecordBatch.IterOutcome.OK_NEW_SCHEMA,
@@ -252,7 +263,13 @@ public void testUnnestSchemaChange() {
 
     // First batch in baseline is an empty batch corresponding to OK_NEW_SCHEMA
     // Another empty batch introduced by the schema change in the last batch
-    Object[][] baseline = {{}, {"0", "1"}, {"2", "3", "4"}, {"5", "6" }, {}, {9} };
+    Object[][] baseline = {
+        {}, {},
+        {1, 1, 2, 2, 2}, {"0", "1", "2", "3", "4"},
+        {1, 1}, {"5", "6" },
+        {}, {},
+        {1}, {9}
+    };
 
     RecordBatch.IterOutcome[] iterOutcomes = {
         RecordBatch.IterOutcome.OK_NEW_SCHEMA,
@@ -272,7 +289,7 @@ public void testUnnestLimitBatchSize() {
 
     final int limitedOutputBatchSize = 1023; // one less than the power of two. See RecordBatchMemoryManager
                                              // .adjustOutputRowCount
-    final int limitedOutputBatchSizeBytes = 1024*4*1; // (num rows+1) * size of int
+    final int limitedOutputBatchSizeBytes = 1024*4*2; // (num rows+1) * size of int * num of columns (rowId, unnest_col)
     final int inputBatchSize = 1023+1;
     // single record batch with single row. The unnest column has one
     // more record than the batch size we want in the output
@@ -286,18 +303,23 @@ public void testUnnestLimitBatchSize() {
         }
       }
     }
-    Integer[][] baseline = new Integer[3][];
+    Integer[][] baseline = new Integer[6][];
     baseline[0] = new Integer[] {};
-    baseline[1] = new Integer[limitedOutputBatchSize];
-    baseline[2] = new Integer[1];
+    baseline[1] = new Integer[] {};
+    baseline[2] = new Integer[limitedOutputBatchSize];
+    baseline[3] = new Integer[limitedOutputBatchSize];
+    baseline[4] = new Integer[1];
+    baseline[5] = new Integer[1];
     for (int i = 0; i < limitedOutputBatchSize; i++) {
-      baseline[1][i] = i;
+      baseline[2][i] = 1;
+      baseline[3][i] = i;
     }
-    baseline[2][0] = limitedOutputBatchSize;
+    baseline[4][0] = 1;
+    baseline[5][0] = limitedOutputBatchSize;
 
     // Create input schema
     TupleMetadata incomingSchema = new SchemaBuilder()
-        .add("rowNumber", TypeProtos.MinorType.INT)
+        .add("otherColumn", TypeProtos.MinorType.INT)
         .addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
 
     TupleMetadata[] incomingSchemas = {incomingSchema};
@@ -328,7 +350,7 @@ public void testUnnestKillFromLimitSubquery1() {
 
     final int limitedOutputBatchSize = 1023; // one less than the power of two. See RecordBatchMemoryManager
                                              // .adjustOutputRowCount
-    final int limitedOutputBatchSizeBytes = 1024*4*1; // (num rows+1) * size of int
+    final int limitedOutputBatchSizeBytes = 1024*4*2; // (num rows+1) * size of int * num of columns (rowId, unnest_col)
     final int inputBatchSize = 1023+1;
     // single record batch with single row. The unnest column has one
     // more record than the batch size we want in the output
@@ -342,18 +364,23 @@ public void testUnnestKillFromLimitSubquery1() {
         }
       }
     }
-    Integer[][] baseline = new Integer[3][];
+    Integer[][] baseline = new Integer[6][];
     baseline[0] = new Integer[] {};
-    baseline[1] = new Integer[limitedOutputBatchSize];
-    baseline[2] = new Integer[1];
+    baseline[1] = new Integer[] {};
+    baseline[2] = new Integer[limitedOutputBatchSize];
+    baseline[3] = new Integer[limitedOutputBatchSize];
+    baseline[4] = new Integer[1];
+    baseline[5] = new Integer[1];
     for (int i = 0; i < limitedOutputBatchSize; i++) {
-      baseline[1][i] = i;
+      baseline[2][i] = 1;
+      baseline[3][i] = i;
     }
-    baseline[2] = new Integer[] {}; // because of kill the next batch is an empty batch
+    baseline[4] = new Integer[] {}; // because of kill the next batch is an empty batch
+    baseline[5] = new Integer[] {}; // because of kill the next batch is an empty batch
 
     // Create input schema
     TupleMetadata incomingSchema = new SchemaBuilder()
-        .add("rowNumber", TypeProtos.MinorType.INT)
+        .add("otherColumn", TypeProtos.MinorType.INT)
         .addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
 
     TupleMetadata[] incomingSchemas = {incomingSchema};
@@ -384,7 +411,7 @@ public void testUnnestKillFromLimitSubquery2() {
     // similar to previous test but the size of the array fits exactly into the record batch;
     final int limitedOutputBatchSize = 1023; // one less than the power of two. See RecordBatchMemoryManager
                                              // .adjustOutputRowCount
-    final int limitedOutputBatchSizeBytes = 1024*4*1; // (num rows+1) * size of int
+    final int limitedOutputBatchSizeBytes = 1024*4*2; // (num rows+1) * size of int * num of columns (rowId, unnest_col)
     final int inputBatchSize = 1023;
     // single record batch with single row. The unnest column has one
     // more record than the batch size we want in the output
@@ -398,11 +425,14 @@ public void testUnnestKillFromLimitSubquery2() {
         }
       }
     }
-    Integer[][] baseline = new Integer[2][];
+    Integer[][] baseline = new Integer[4][];
     baseline[0] = new Integer[] {};
-    baseline[1] = new Integer[limitedOutputBatchSize];
+    baseline[1] = new Integer[] {};
+    baseline[2] = new Integer[limitedOutputBatchSize];
+    baseline[3] = new Integer[limitedOutputBatchSize];
     for (int i = 0; i < limitedOutputBatchSize; i++) {
-      baseline[1][i] = i;
+      baseline[2][i] = 1;
+      baseline[3][i] = i;
     }
 
     // Create input schema
@@ -501,7 +531,7 @@ public void testUnnestNonArrayColumn() {
     }
 
     // Get the unnest POPConfig
-    final UnnestPOP unnestPopConfig = new UnnestPOP(null, new SchemaPath(new PathSegment.NameSegment("unnestColumn")));
+    final UnnestPOP unnestPopConfig = new UnnestPOP(null, new SchemaPath(new PathSegment.NameSegment("unnestColumn")), DrillUnnestRelBase.IMPLICIT_COLUMN);
 
     // Get the IterOutcomes for LJ
     final List<RecordBatch.IterOutcome> outcomes = new ArrayList<>(iterOutcomes.length);
@@ -661,16 +691,16 @@ private TupleMetadata getRepeatedMapSchema() {
 
     Object[][] d = {
         new Object[] {},    // Empty record batch returned by OK_NEW_SCHEMA
-        new Object[] {},    // First incoming batch is empty
+        new Object[] {},    // Empty record batch returned by OK_NEW_SCHEMA
+        new Object[] {2, 2, 3, 3, 3}, // rowId 1 has no corresponding rows in the unnest array
         new Object[] {
             "{\"colA\":11,\"colB\":[\"1.1.1\",\"1.1.2\"]}",
-            "{\"colA\":12,\"colB\":[\"1.2.1\",\"1.2.2\"]}"
-        },
-        new Object[] {
+            "{\"colA\":12,\"colB\":[\"1.2.1\",\"1.2.2\"]}",
             "{\"colA\":21,\"colB\":[\"2.1.1\",\"2.1.2\"]}",
             "{\"colA\":22,\"colB\":[]}",
             "{\"colA\":23,\"colB\":[\"2.3.1\",\"2.3.2\"]}"
         },
+        new Object[] {1, 1},
         new Object[] {
             "{\"colA\":31,\"colB\":[\"3.1.1\",\"3.1.2\"]}",
             "{\"colA\":32,\"colB\":[\"3.2.1\",\"3.2.2\"]}"
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index c2e64f4d47b..7a1058cbe1b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -35,6 +35,8 @@
 import org.apache.drill.exec.physical.impl.join.LateralJoinBatch;
 import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.planner.common.DrillLateralJoinRelBase;
+import org.apache.drill.exec.planner.common.DrillUnnestRelBase;
 import org.apache.drill.exec.planner.logical.DrillLogicalTestutils;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
@@ -70,7 +72,7 @@
 
   @BeforeClass public static void setUpBeforeClass() throws Exception {
     mockPopConfig = new MockStorePOP(null);
-    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
+    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
     operatorContext = fixture.newOperatorContext(mockPopConfig);
   }
 
@@ -350,7 +352,7 @@ public void testUnnestLimitBatchSize_WithExcludedCols() {
     LateralJoinPOP previoudPop = ljPopConfig;
     List<SchemaPath> excludedCols = new ArrayList<>();
     excludedCols.add(SchemaPath.getSimplePath("unnestColumn"));
-    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER, excludedCols);
+    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, excludedCols);
     final int limitedOutputBatchSize = 127;
     final int inputBatchSize = limitedOutputBatchSize + 1;
     // Since we want 127 row count and because of nearest power of 2 adjustment output row count will be reduced to
@@ -569,7 +571,7 @@ public void testUnnestNonArrayColumn() {
     }
 
     // Get the unnest POPConfig
-    final UnnestPOP unnestPopConfig = new UnnestPOP(null, SchemaPath.getCompoundPath("unnestColumn"));
+    final UnnestPOP unnestPopConfig = new UnnestPOP(null, SchemaPath.getCompoundPath("unnestColumn"), DrillUnnestRelBase.IMPLICIT_COLUMN);
 
     // Get the IterOutcomes for LJ
     final List<RecordBatch.IterOutcome> outcomes = new ArrayList<>(iterOutcomes.length);
@@ -588,7 +590,8 @@ public void testUnnestNonArrayColumn() {
 
     // project is required to rename the columns so as to disambiguate the same column name from
     // unnest operator and the regular scan.
-    final Project projectPopConfig = new Project(DrillLogicalTestutils.parseExprs("unnestColumn", "unnestColumn1"), null);
+    final Project projectPopConfig = new Project(DrillLogicalTestutils.parseExprs("unnestColumn", "unnestColumn1",
+      unnestPopConfig.getImplicitColumn(), unnestPopConfig.getImplicitColumn()), null);
 
     final ProjectRecordBatch projectBatch =
         new ProjectRecordBatch( projectPopConfig, unnestBatch, fixture.getFragmentContext());
@@ -874,8 +877,8 @@ private boolean isTerminal(RecordBatch.IterOutcome outcome) {
     }
 
     // Get the unnest POPConfig
-    final UnnestPOP unnestPopConfig1 = new UnnestPOP(null, SchemaPath.getSimplePath("unnestColumn"));
-    final UnnestPOP unnestPopConfig2 = new UnnestPOP(null, SchemaPath.getSimplePath("colB"));
+    final UnnestPOP unnestPopConfig1 = new UnnestPOP(null, SchemaPath.getSimplePath("unnestColumn"), DrillUnnestRelBase.IMPLICIT_COLUMN);
+    final UnnestPOP unnestPopConfig2 = new UnnestPOP(null, SchemaPath.getSimplePath("colB"), DrillUnnestRelBase.IMPLICIT_COLUMN);
 
     // Get the IterOutcomes for LJ
     final List<RecordBatch.IterOutcome> outcomes = new ArrayList<>(iterOutcomes.length);
@@ -896,16 +899,18 @@ private boolean isTerminal(RecordBatch.IterOutcome outcome) {
 
     // Create intermediate Project
     final Project projectPopConfig1 =
-        new Project(DrillLogicalTestutils.parseExprs("unnestColumn.colB", "colB"), unnestPopConfig1);
+        new Project(DrillLogicalTestutils.parseExprs("unnestColumn.colB", "colB",
+          unnestPopConfig1.getImplicitColumn(), unnestPopConfig1.getImplicitColumn()), unnestPopConfig1);
     final ProjectRecordBatch projectBatch1 =
         new ProjectRecordBatch(projectPopConfig1, unnestBatch1, fixture.getFragmentContext());
     final Project projectPopConfig2 =
-        new Project(DrillLogicalTestutils.parseExprs("colB", "unnestColumn2"), unnestPopConfig2);
+        new Project(DrillLogicalTestutils.parseExprs("colB", "unnestColumn2",
+          unnestPopConfig2.getImplicitColumn(), unnestPopConfig2.getImplicitColumn()), unnestPopConfig2);
     final ProjectRecordBatch projectBatch2 =
         new ProjectRecordBatch(projectPopConfig2, unnestBatch2, fixture.getFragmentContext());
 
-    final LateralJoinPOP ljPopConfig2 = new LateralJoinPOP(projectPopConfig1, projectPopConfig2, JoinRelType.INNER, Lists.newArrayList());
-    final LateralJoinPOP ljPopConfig1 = new LateralJoinPOP(mockPopConfig, ljPopConfig2, JoinRelType.INNER, Lists.newArrayList());
+    final LateralJoinPOP ljPopConfig2 = new LateralJoinPOP(projectPopConfig1, projectPopConfig2, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
+    final LateralJoinPOP ljPopConfig1 = new LateralJoinPOP(mockPopConfig, ljPopConfig2, JoinRelType.INNER, DrillLateralJoinRelBase.IMPLICIT_COLUMN, Lists.newArrayList());
 
     final LateralJoinBatch lateralJoinBatch2 =
         new LateralJoinBatch(ljPopConfig2, fixture.getFragmentContext(), projectBatch1, projectBatch2);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services