Posted to commits@drill.apache.org by pa...@apache.org on 2018/07/02 22:52:11 UTC

[drill] 03/03: DRILL-6561: Lateral excludes the columns from the output container provided by projection push-into rules

This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 069c3049f1a500e5ae0b47caeebc5856ab182b73
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Fri Jun 29 10:27:55 2018 -0700

    DRILL-6561: Lateral excludes the columns from the output container provided by projection push-into rules
    
    This closes #1356
---
 .../exec/physical/impl/join/HashJoinBatch.java     |   7 +-
 .../exec/physical/impl/join/LateralJoinBatch.java  |  71 +++++++--
 .../exec/physical/impl/join/MergeJoinBatch.java    |   3 +-
 .../drill/exec/record/JoinBatchMemoryManager.java  |  19 ++-
 .../impl/join/TestLateralJoinCorrectness.java      | 160 ++++++++++++++++++++-
 .../unnest/TestUnnestWithLateralCorrectness.java   | 104 +++++++++-----
 6 files changed, 310 insertions(+), 54 deletions(-)

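In outline: projection push-into rules can tell LATERAL which incoming columns no downstream operator reads (in the tests below, the original unnest array column). This commit threads that list, as a set of root column names, through JoinBatchMemoryManager and LateralJoinBatch, so those columns are dropped from the output container and no longer counted when sizing output batches. A minimal plain-Java sketch of the filtering idea, using illustrative names rather than Drill's BatchSchema API:

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Sketch only: drop any field whose root name appears in the excluded
    // set computed by the projection push-into rule. Mirrors the intent of
    // batchSchemaWithNoExcludedCols(...) in LateralJoinBatch below.
    class SchemaFilterSketch {
      static List<String> withoutExcluded(List<String> fieldNames, Set<String> excluded) {
        final List<String> kept = new ArrayList<>();
        for (String name : fieldNames) {
          if (!excluded.contains(name)) {
            kept.add(name);  // keep only columns the query still needs
          }
        }
        return kept;
      }

      public static void main(String[] args) {
        Set<String> excluded = new HashSet<>();
        excluded.add("cost_left");
        // "cost_left" never reaches the output container.
        System.out.println(withoutExcluded(
            List.of("id_left", "cost_left", "name_left"), excluded));  // [id_left, name_left]
      }
    }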
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 047c597..345d182 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.join;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -67,9 +68,6 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 import org.apache.calcite.rel.core.JoinRelType;
 
-import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
-import static org.apache.drill.exec.record.JoinBatchMemoryManager.RIGHT_INDEX;
-
 /**
  *   This class implements the runtime execution for the Hash-Join operator
  *   supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
@@ -892,7 +890,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     logger.debug("BATCH_STATS, configured output batch size: {}, allocated memory {}, avail mem factor {}, output batch size: {}",
       configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize);
 
-    batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right);
+    batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right, new HashSet<>());
+    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
   }
 
   /**
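HashJoin (above) and MergeJoin (further down) exclude nothing, so they satisfy the new JoinBatchMemoryManager signature by passing an empty set. A hedged sketch of an alternative shape for the same API, a delegating default constructor, which the commit does not use but which would have left these callers untouched:

    import java.util.Collections;
    import java.util.Set;

    // Hypothetical alternative, not the commit's approach: default to no
    // exclusions instead of requiring every caller to pass an empty set.
    class MemoryManagerSketch {
      private final Set<String> columnsToExclude;

      MemoryManagerSketch() {
        this(Collections.emptySet());  // default: no columns excluded
      }

      MemoryManagerSketch(Set<String> columnsToExclude) {
        this.columnsToExclude = columnsToExclude;
      }
    }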
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 84dc5c3..fc3c8b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.join;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
@@ -27,6 +28,7 @@ import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.LateralJoinPOP;
 import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
@@ -34,10 +36,14 @@ import org.apache.drill.exec.record.JoinBatchMemoryManager;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.SchemaBuilder;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.ValueVector;
 
+import java.util.HashSet;
+import java.util.List;
+
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
@@ -82,6 +88,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
   // Flag to keep track of new left batch so that update on memory manager is called only once per left batch
   private boolean isNewLeftBatch = false;
 
+  private final HashSet<String> excludedFieldNames = new HashSet<>();
+
   /* ****************************************************************************************************************
    * Public Methods
    * ****************************************************************************************************************/
@@ -91,7 +99,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
     Preconditions.checkNotNull(left);
     Preconditions.checkNotNull(right);
     final int configOutputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
-    batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right);
+    // Prepare Schema Path Mapping
+    populateExcludedField(popConfig);
+    batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right, excludedFieldNames);
 
     // Initially it's set to default value of 64K and later for each new output row it will be set to the computed
     // row count
@@ -700,6 +710,21 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
     return isValid;
   }
 
+  private BatchSchema batchSchemaWithNoExcludedCols(BatchSchema originSchema) {
+    if (excludedFieldNames.size() == 0) {
+      return originSchema;
+    }
+
+    final SchemaBuilder newSchemaBuilder =
+      BatchSchema.newBuilder().setSelectionVectorMode(originSchema.getSelectionVectorMode());
+    for (MaterializedField field : originSchema) {
+      if (!excludedFieldNames.contains(field.getName())) {
+        newSchemaBuilder.addField(field);
+      }
+    }
+    return newSchemaBuilder.build();
+  }
+
   /**
    * Helps to create the outgoing container vectors based on known left and right batch schemas
    * @throws SchemaChangeException
@@ -711,8 +736,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
 
     // Clear up the container
     container.clear();
-    leftSchema = left.getSchema();
-    rightSchema = right.getSchema();
+    leftSchema = batchSchemaWithNoExcludedCols(left.getSchema());
+    rightSchema = batchSchemaWithNoExcludedCols(right.getSchema());
 
     if (!verifyInputSchema(leftSchema)) {
       throw new SchemaChangeException("Invalid Schema found for left incoming batch");
@@ -724,12 +749,20 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
 
     // Setup LeftSchema in outgoing container
     for (final VectorWrapper<?> vectorWrapper : left) {
-      container.addOrGet(vectorWrapper.getField());
+      final MaterializedField leftField = vectorWrapper.getField();
+      if (excludedFieldNames.contains(leftField.getName())) {
+        continue;
+      }
+      container.addOrGet(leftField);
     }
 
     // Setup RightSchema in the outgoing container
     for (final VectorWrapper<?> vectorWrapper : right) {
       MaterializedField rightField = vectorWrapper.getField();
+      if (excludedFieldNames.contains(rightField.getName())) {
+        continue;
+      }
+
       TypeProtos.MajorType rightFieldType = vectorWrapper.getField().getType();
 
       // make right input schema optional if we have LEFT join
@@ -846,15 +879,28 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
     // Get the vectors using field index rather than Materialized field since input batch field can be different from
     // output container field in case of Left Join. As we rebuild the right Schema field to be optional for output
     // container.
+    int inputIndex = 0;
     for (int i = startVectorIndex; i < endVectorIndex; ++i) {
-      // Get input vector
-      final Class<?> inputValueClass = batch.getSchema().getColumn(i).getValueClass();
-      final ValueVector inputVector = batch.getValueAccessorById(inputValueClass, i).getValueVector();
-
       // Get output vector
       final int outputVectorIndex = i + baseVectorIndex;
       final Class<?> outputValueClass = this.getSchema().getColumn(outputVectorIndex).getValueClass();
       final ValueVector outputVector = this.getValueAccessorById(outputValueClass, outputVectorIndex).getValueVector();
+      final String outputFieldName = outputVector.getField().getName();
+
+      ValueVector inputVector;
+      Class<?> inputValueClass;
+      String inputFieldName;
+      do {
+        // Get input vector
+        inputValueClass = batch.getSchema().getColumn(inputIndex).getValueClass();
+        inputVector = batch.getValueAccessorById(inputValueClass, inputIndex).getValueVector();
+        inputFieldName = inputVector.getField().getName();
+        ++inputIndex;
+      } while (excludedFieldNames.contains(inputFieldName));
+
+      Preconditions.checkArgument(outputFieldName.equals(inputFieldName),
+        new IllegalStateException(String.format("Non-excluded Input and output container fields are not in same order" +
+          ". Output Schema:%s and Input Schema:%s", this.getSchema(), batch.getSchema())));
 
       logger.trace("Copying data from incoming batch vector to outgoing batch vector. [IncomingBatch: " +
           "(RowIndex: {}, VectorType: {}), OutputBatch: (RowIndex: {}, VectorType: {}) and Other: (TimeEachValue: {}," +
@@ -938,4 +984,13 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
       maxOutputRowCount = newOutputRowCount;
     }
   }
+
+  private void populateExcludedField(PhysicalOperator lateralPop) {
+    final List<SchemaPath> excludedCols = ((LateralJoinPOP)lateralPop).getExcludedColumns();
+    if (excludedCols != null) {
+      for (SchemaPath currentPath : excludedCols) {
+        excludedFieldNames.add(currentPath.rootName());
+      }
+    }
+  }
 }
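The subtlest part of the change above is the copy loop: once columns are excluded, output vector index i no longer equals input vector index i, so a second cursor walks the input schema and skips excluded names until it reaches the column backing the current output slot, then asserts the names match. A standalone plain-Java sketch of that alignment loop, with illustrative names rather than Drill's ValueVector types:

    import java.util.List;
    import java.util.Set;

    // Sketch of the two-cursor alignment in the data-copy loop: the output
    // schema is a filtered view of the input schema, in the same order.
    class AlignmentSketch {
      static void copyAll(List<String> inputCols, List<String> outputCols, Set<String> excluded) {
        int inputIndex = 0;
        for (String outputCol : outputCols) {
          // Advance the input cursor past every excluded column.
          while (excluded.contains(inputCols.get(inputIndex))) {
            ++inputIndex;
          }
          String inputCol = inputCols.get(inputIndex++);
          // Non-excluded input and output columns must arrive in the same order.
          if (!outputCol.equals(inputCol)) {
            throw new IllegalStateException("Field order mismatch: " + inputCol + " vs " + outputCol);
          }
          System.out.println("copy " + inputCol + " -> " + outputCol);
        }
      }

      public static void main(String[] args) {
        copyAll(List.of("id", "cost", "name"), List.of("id", "name"), Set.of("cost"));
      }
    }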
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 62967a9..ea34ed9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -62,6 +62,7 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 
 import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
@@ -108,7 +109,7 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
   private class MergeJoinMemoryManager extends JoinBatchMemoryManager {
 
     MergeJoinMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) {
-      super(outputBatchSize, leftBatch, rightBatch);
+      super(outputBatchSize, leftBatch, rightBatch, new HashSet<>());
     }
 
     /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
index 2ebe887..4344e13 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
@@ -17,29 +17,44 @@
  */
 package org.apache.drill.exec.record;
 
+import java.util.Set;
+
 public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinBatchMemoryManager.class);
 
   private int rowWidth[];
   private RecordBatch recordBatch[];
+  private Set<String> columnsToExclude;
 
   private static final int numInputs = 2;
   public static final int LEFT_INDEX = 0;
   public static final int RIGHT_INDEX = 1;
 
-  public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) {
+  public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch,
+                                RecordBatch rightBatch, Set<String> excludedColumns) {
     super(numInputs, outputBatchSize);
     recordBatch = new RecordBatch[numInputs];
     recordBatch[LEFT_INDEX] = leftBatch;
     recordBatch[RIGHT_INDEX] = rightBatch;
     rowWidth = new int[numInputs];
+    this.columnsToExclude = excludedColumns;
   }
 
   private int updateInternal(int inputIndex, int outputPosition,  boolean useAggregate) {
     updateIncomingStats(inputIndex);
     rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocWidth();
 
-    final int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX];
+    // Reduce the width of excluded columns from actual rowWidth
+    for (String columnName : columnsToExclude) {
+      final RecordBatchSizer.ColumnSize currentColSizer = getColumnSize(inputIndex, columnName);
+      if (currentColSizer == null) {
+        continue;
+      }
+      rowWidth[inputIndex] -= currentColSizer.getAllocSizePerEntry();
+    }
+
+    // Get final net outgoing row width after reducing the excluded columns width
+    int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX];
 
     // If outgoing row width is 0 or there is no change in outgoing row width, just return.
     // This is possible for empty batches or
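The memory-manager change above keeps batch sizing honest: excluded columns still arrive in the incoming batch, but they never reach the outgoing one, so their per-entry allocation width is subtracted from the measured input row width before the outgoing row width and row count are computed. A small plain-Java sketch of that adjustment, with column widths supplied as a plain map rather than via RecordBatchSizer:

    import java.util.Map;
    import java.util.Set;

    // Sketch of the width adjustment in updateInternal: subtract the widths
    // of excluded columns that actually exist on this input side.
    class RowWidthSketch {
      static int netRowWidth(Map<String, Integer> allocWidthPerColumn, Set<String> excluded) {
        int width = allocWidthPerColumn.values().stream().mapToInt(Integer::intValue).sum();
        for (String column : excluded) {
          Integer w = allocWidthPerColumn.get(column);  // null when this side lacks the column
          if (w != null) {
            width -= w;
          }
        }
        return width;
      }

      public static void main(String[] args) {
        // id = 4 bytes, cost = 4 bytes, name = 50 bytes; excluding "cost" leaves 54.
        System.out.println(netRowWidth(Map.of("id", 4, "cost", 4, "name", 50), Set.of("cost")));
      }
    }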
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index 2723e30..ffac4b6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -21,6 +21,7 @@ import avro.shaded.com.google.common.collect.Lists;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.OperatorContext;
@@ -29,11 +30,13 @@ import org.apache.drill.exec.physical.config.LateralJoinPOP;
 import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.mock.MockStorePOP;
 import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.DirectRowSet;
 import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
 import org.apache.drill.test.rowSet.schema.SchemaBuilder;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -2870,4 +2873,159 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
       leftRowSet2.clear();
     }
   }
+
+  private void testExcludedColumns(List<SchemaPath> excludedCols, CloseableRecordBatch left,
+                                   CloseableRecordBatch right, RowSet expectedRowSet) throws Exception {
+    LateralJoinPOP lateralPop = new LateralJoinPOP(null, null, JoinRelType.INNER, excludedCols);
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(lateralPop, fixture.getFragmentContext(), left, right);
+
+    try {
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
+      new RowSetComparison(expectedRowSet).verify(actualRowSet);
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } finally {
+      ljBatch.close();
+      left.close();
+      right.close();
+      expectedRowSet.clear();
+    }
+  }
+
+  @Test
+  public void testFillingUpOutputBatch_WithExcludedColumns() throws Exception {
+    // Create data for left input
+    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
+      .addRow(2, 20, "item20")
+      .build();
+
+    // Create data for right input
+    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
+      .addRow(4, 41, "item41")
+      .addRow(5, 51, "item51")
+      .build();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .add("id_right", TypeProtos.MinorType.INT)
+      .add("cost_right", TypeProtos.MinorType.INT)
+      .add("name_right", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
+      .addRow(1, "item1", 1, 11, "item11")
+      .addRow(1, "item1", 2, 21, "item21")
+      .addRow(1, "item1", 3, 31, "item31")
+      .addRow(2, "item20", 4, 41, "item41")
+      .addRow(2, "item20", 5, 51, "item51")
+      .build();
+
+    // Get the left container with dummy data for Lateral Join
+    leftContainer.add(nonEmptyLeftRowSet.container());
+    leftContainer.add(leftRowSet2.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet2.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    List<SchemaPath> excludedCols = new ArrayList<>();
+    excludedCols.add(SchemaPath.getSimplePath("cost_left"));
+
+    try {
+      testExcludedColumns(excludedCols, leftMockBatch, rightMockBatch, expectedRowSet);
+    } finally {
+      // Close all the resources for this test case
+      leftRowSet2.clear();
+      nonEmptyRightRowSet2.clear();
+    }
+  }
+
+  @Test
+  public void testFillingUpOutputBatch_With2ExcludedColumns() throws Exception {
+    // Create data for left input
+    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
+      .addRow(2, 20, "item20")
+      .build();
+
+    // Create data for right input
+    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
+      .addRow(4, 41, "item41")
+      .addRow(5, 51, "item51")
+      .build();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      //.add("id_right", TypeProtos.MinorType.INT)
+      .add("cost_right", TypeProtos.MinorType.INT)
+      .add("name_right", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
+      /*.addRow("item1", 1, 11, "item11")
+      .addRow("item1", 2, 21, "item21")
+      .addRow("item1", 3, 31, "item31")
+      .addRow("item20", 4, 41, "item41")
+      .addRow("item20", 5, 51, "item51") */
+      .addRow("item1", 11, "item11")
+      .addRow("item1", 21, "item21")
+      .addRow("item1", 31, "item31")
+      .addRow("item20", 41, "item41")
+      .addRow("item20", 51, "item51")
+      .build();
+
+    // Get the left container with dummy data for Lateral Join
+    leftContainer.add(nonEmptyLeftRowSet.container());
+    leftContainer.add(leftRowSet2.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet2.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    List<SchemaPath> excludedCols = new ArrayList<>();
+    excludedCols.add(SchemaPath.getSimplePath("cost_left"));
+    excludedCols.add(SchemaPath.getSimplePath("id_left"));
+    excludedCols.add(SchemaPath.getSimplePath("id_right"));
+
+    try {
+      testExcludedColumns(excludedCols, leftMockBatch, rightMockBatch, expectedRowSet);
+    } finally {
+      // Close all the resources for this test case
+      leftRowSet2.clear();
+      nonEmptyRightRowSet2.clear();
+    }
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index 3a7f899..c2e64f4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -37,8 +37,8 @@ import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.planner.logical.DrillLogicalTestutils;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.mock.MockStorePOP;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarCharVector;
@@ -106,7 +106,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
@@ -140,7 +140,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
@@ -161,7 +161,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
@@ -192,7 +192,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
@@ -240,7 +240,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
         RecordBatch.IterOutcome.OK_NEW_SCHEMA};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
@@ -289,28 +289,15 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
         RecordBatch.IterOutcome.OK_NEW_SCHEMA};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
 
   }
 
-  @Test
-  public void testUnnestLimitBatchSize() {
-
-    final int limitedOutputBatchSize = 127;
-    final int inputBatchSize = limitedOutputBatchSize + 1;
-    // size of lateral output batch = 4N * (N + 5) bytes, where N = output batch row count
-    //  Lateral output batch size =  N * input row size + N * size of single unnest column
-    //                            =  N * (size of row id + size of array offset vector + (N + 1 )*size of single array entry))
-    //                              + N * 4
-    //                            = N * (4 + 2*4 + (N+1)*4 )  + N * 4
-    //                            = N * (16 + 4N) + N * 4
-    //                            = 4N * (N + 5)
-    // configure the output batch size to be one more record than that so that the batch sizer can round down
-    final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * (limitedOutputBatchSize + 6);
-
+  private void testUnnestBatchSizing(int inputBatchSize, int limitOutputBatchSize,
+                                     int limitOutputBatchSizeBytes, boolean excludeUnnestColumn) {
     // single record batch with single row. The unnest column has one
     // more record than the batch size we want in the output
     Object[][] data = new Object[1][1];
@@ -323,39 +310,76 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
         }
       }
     }
+
     Integer[][][] baseline = new Integer[2][2][];
-    baseline[0][0] = new Integer[limitedOutputBatchSize];
-    baseline[0][1] = new Integer[limitedOutputBatchSize];
+    baseline[0][0] = new Integer[limitOutputBatchSize];
+    baseline[0][1] = new Integer[limitOutputBatchSize];
     baseline[1][0] = new Integer[1];
     baseline[1][1] = new Integer[1];
-    for (int i = 0; i < limitedOutputBatchSize; i++) {
+    for (int i = 0; i < limitOutputBatchSize; i++) {
       baseline[0][0][i] = 1;
       baseline[0][1][i] = i;
     }
     baseline[1][0][0] = 1; // row Num
-    baseline[1][1][0] = limitedOutputBatchSize; // value
+    baseline[1][1][0] = limitOutputBatchSize; // value
 
     // Create input schema
     TupleMetadata incomingSchema = new SchemaBuilder()
-        .add("rowNumber", TypeProtos.MinorType.INT)
-        .addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
+      .add("rowNumber", TypeProtos.MinorType.INT)
+      .addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
 
     TupleMetadata[] incomingSchemas = {incomingSchema};
 
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK};
 
     final long outputBatchSize = fixture.getFragmentContext().getOptions().getOption(ExecConstants
-        .OUTPUT_BATCH_SIZE_VALIDATOR);
-    fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitedOutputBatchSizeBytes);
+      .OUTPUT_BATCH_SIZE_VALIDATOR);
+    fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitOutputBatchSizeBytes);
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, excludeUnnestColumn);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     } finally {
       fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, outputBatchSize);
     }
+  }
 
+  @Test
+  public void testUnnestLimitBatchSize_WithExcludedCols() {
+    LateralJoinPOP previoudPop = ljPopConfig;
+    List<SchemaPath> excludedCols = new ArrayList<>();
+    excludedCols.add(SchemaPath.getSimplePath("unnestColumn"));
+    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER, excludedCols);
+    final int limitedOutputBatchSize = 127;
+    final int inputBatchSize = limitedOutputBatchSize + 1;
+    // Since we want 127 row count and because of nearest power of 2 adjustment output row count will be reduced to
+    // 64. So we should configure batch size for (N+1) rows if we want to output N rows where N is not power of 2
+    // size of lateral output batch = (N+1)*8 bytes, where N = output batch row count
+    //  Lateral output batch size = (N+1) * (input row size without unnest field) + (N+1) * size of single unnest column
+    //                            = (N+1) * (size of row id) + (N+1) * (size of single array entry)
+    //                            = (N+1)*4 + (N+1) * 4
+    //                            = (N+1) * 8
+    // configure the output batch size to be one more record than that so that the batch sizer can round down
+    final int limitedOutputBatchSizeBytes = 8 * (limitedOutputBatchSize + 1);
+    testUnnestBatchSizing(inputBatchSize, limitedOutputBatchSize, limitedOutputBatchSizeBytes, true);
+    ljPopConfig = previoudPop;
+  }
+
+  @Test
+  public void testUnnestLimitBatchSize() {
+    final int limitedOutputBatchSize = 127;
+    final int inputBatchSize = limitedOutputBatchSize + 1;
+    // size of lateral output batch = 4N * (N + 5) bytes, where N = output batch row count
+    //  Lateral output batch size =  N * input row size + N * size of single unnest column
+    //                            =  N * (size of row id + size of array offset vector + (N + 1 )*size of single array entry))
+    //                              + N * 4
+    //                            = N * (4 + 2*4 + (N+1)*4 )  + N * 4
+    //                            = N * (16 + 4N) + N * 4
+    //                            = 4N * (N + 5)
+    // configure the output batch size to be one more record than that so that the batch sizer can round down
+    final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * (limitedOutputBatchSize + 6);
+    testUnnestBatchSizing(inputBatchSize, limitedOutputBatchSize, limitedOutputBatchSizeBytes, false);
   }
 
   @Test
@@ -405,7 +429,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitedOutputBatchSizeBytes);
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline); // Limit of 100 values for unnest.
+      testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline, false); // Limit of 100 values for unnest.
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     } finally {
@@ -463,7 +487,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitedOutputBatchSizeBytes);
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline); // Limit of 100 values for unnest.
+      testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline, false); // Limit of 100 values for unnest.
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     } finally {
@@ -496,7 +520,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (UserException|UnsupportedOperationException e) {
       return; // succeeded
     } catch (Exception e) {
@@ -511,8 +535,9 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
       TupleMetadata[] incomingSchemas,
       RecordBatch.IterOutcome[] iterOutcomes,
       T[][] data,
-      T[][][] baseline ) throws Exception{
-    testUnnest(incomingSchemas, iterOutcomes, -1, -1, data, baseline);
+      T[][][] baseline,
+      boolean excludeUnnestColumn) throws Exception{
+    testUnnest(incomingSchemas, iterOutcomes, -1, -1, data, baseline, excludeUnnestColumn);
   }
 
   // test unnest for various input conditions optionally invoking kill. if the kill or killBatch
@@ -522,7 +547,8 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
       int unnestLimit, // kill unnest after every 'unnestLimit' number of values in every record
       int execKill, // number of batches after which to kill the execution (!)
       T[][] data,
-      T[][][] baseline) throws Exception {
+      T[][][] baseline,
+      boolean excludeUnnestColumn) throws Exception {
 
     // Get the incoming container with dummy data for LJ
     final List<VectorContainer> incomingContainer = new ArrayList<>(data.length);
@@ -606,7 +632,9 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
       //int valueIndex = 0;
       for ( List<ValueVector> batch: results) {
         int vectorCount= batch.size();
-        if (vectorCount!= baseline[batchIndex].length+1) { // baseline does not include the original unnest column
+        int expectedVectorCount = (excludeUnnestColumn) ? 0 : 1;
+        expectedVectorCount += baseline[batchIndex].length;
+        if (vectorCount!= expectedVectorCount) { // baseline does not include the original unnest column
           fail("Test failed in validating unnest output. Batch column count mismatch.");
         }
         for (ValueVector vv : batch) {
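As a worked check of the two batch-size derivations in the test comments above, with N = 127: when unnestColumn is excluded, each output row carries only the 4-byte rowNumber plus the 4-byte unnest value, giving 8 * (N + 1) = 1024 bytes, while the unexcluded case uses the original 4N * (N + 5) derivation configured with one extra record of headroom as 4N * (N + 6) = 67564 bytes:

    // Worked check of the batch-size formulas in the two
    // testUnnestLimitBatchSize comments above, for N = 127 output rows.
    class BatchSizeMath {
      public static void main(String[] args) {
        final int n = 127;
        // Unnest column excluded: (N + 1) rows * (4-byte rowNumber + 4-byte value).
        System.out.println(8 * (n + 1));      // 1024, matches 8 * (limitedOutputBatchSize + 1)
        // Unnest column kept: 4N * (N + 5), configured with headroom as 4N * (N + 6).
        System.out.println(4 * n * (n + 6));  // 67564, matches the test's constant
      }
    }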