You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/07/02 22:52:25 UTC

[GitHub] parthchandra closed pull request #1356: DRILL-6561: Lateral excluding the columns from output container provided by projection push into rules

parthchandra closed pull request #1356: DRILL-6561: Lateral excluding the columns from output container provided by projection push into rules
URL: https://github.com/apache/drill/pull/1356
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
index a12fed1267e..55ede962826 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
@@ -23,6 +23,7 @@
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.AbstractJoinPop;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
@@ -34,6 +35,9 @@
 public class LateralJoinPOP extends AbstractJoinPop {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinPOP.class);
 
+  @JsonProperty("excludedColumns")
+  private List<SchemaPath> excludedColumns;
+
   @JsonProperty("unnestForLateralJoin")
   private UnnestPOP unnestForLateralJoin;
 
@@ -41,19 +45,21 @@
   public LateralJoinPOP(
       @JsonProperty("left") PhysicalOperator left,
       @JsonProperty("right") PhysicalOperator right,
-      @JsonProperty("joinType") JoinRelType joinType) {
+      @JsonProperty("joinType") JoinRelType joinType,
+      @JsonProperty("excludedColumns") List<SchemaPath> excludedColumns) {
     super(left, right, joinType, null, null);
     Preconditions.checkArgument(joinType != JoinRelType.FULL,
       "Full outer join is currently not supported with Lateral Join");
     Preconditions.checkArgument(joinType != JoinRelType.RIGHT,
       "Right join is currently not supported with Lateral Join");
+    this.excludedColumns = excludedColumns;
   }
 
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.size() == 2,
       "Lateral join should have two physical operators");
-    LateralJoinPOP newPOP =  new LateralJoinPOP(children.get(0), children.get(1), joinType);
+    LateralJoinPOP newPOP =  new LateralJoinPOP(children.get(0), children.get(1), joinType, this.excludedColumns);
     newPOP.unnestForLateralJoin = this.unnestForLateralJoin;
     return newPOP;
   }
@@ -63,6 +69,11 @@ public UnnestPOP getUnnestForLateralJoin() {
     return this.unnestForLateralJoin;
   }
 
+  @JsonProperty("excludedColumns")
+  public List<SchemaPath> getExcludedColumns() {
+    return this.excludedColumns;
+  }
+
   public void setUnnestForLateralJoin(UnnestPOP unnest) {
     this.unnestForLateralJoin = unnest;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 428a47ebf33..63ac6ef90b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -67,9 +68,6 @@
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 import org.apache.calcite.rel.core.JoinRelType;
 
-import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
-import static org.apache.drill.exec.record.JoinBatchMemoryManager.RIGHT_INDEX;
-
 /**
  *   This class implements the runtime execution for the Hash-Join operator
  *   supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
@@ -887,7 +885,7 @@ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
 
     // get the output batch size from config.
     int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
-    batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right);
+    batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right, new HashSet<>());
     logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 578cbc8742d..3dc7b6a7dc3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -20,6 +20,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.ExecConstants;
@@ -27,6 +28,7 @@
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.LateralJoinPOP;
 import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
@@ -34,10 +36,14 @@
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.SchemaBuilder;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.ValueVector;
 
+import java.util.HashSet;
+import java.util.List;
+
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
@@ -82,6 +88,8 @@
   // Flag to keep track of new left batch so that update on memory manager is called only once per left batch
   private boolean isNewLeftBatch = false;
 
+  private final HashSet<String> excludedFieldNames = new HashSet<>();
+
   /* ****************************************************************************************************************
    * Public Methods
    * ****************************************************************************************************************/
@@ -91,7 +99,9 @@ public LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context,
     Preconditions.checkNotNull(left);
     Preconditions.checkNotNull(right);
     final int configOutputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
-    batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right);
+    // Prepare Schema Path Mapping
+    populateExcludedField(popConfig);
+    batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right, excludedFieldNames);
 
     // Initially it's set to default value of 64K and later for each new output row it will be set to the computed
     // row count
@@ -674,6 +684,21 @@ private boolean verifyInputSchema(BatchSchema schema) {
     return isValid;
   }
 
+  private BatchSchema batchSchemaWithNoExcludedCols(BatchSchema originSchema) {
+    if (excludedFieldNames.size() == 0) {
+      return originSchema;
+    }
+
+    final SchemaBuilder newSchemaBuilder =
+      BatchSchema.newBuilder().setSelectionVectorMode(originSchema.getSelectionVectorMode());
+    for (MaterializedField field : originSchema) {
+      if (!excludedFieldNames.contains(field.getName())) {
+        newSchemaBuilder.addField(field);
+      }
+    }
+    return newSchemaBuilder.build();
+  }
+
   /**
    * Helps to create the outgoing container vectors based on known left and right batch schemas
    * @throws SchemaChangeException
@@ -685,8 +710,8 @@ private void setupNewSchema() throws SchemaChangeException {
 
     // Clear up the container
     container.clear();
-    leftSchema = left.getSchema();
-    rightSchema = right.getSchema();
+    leftSchema = batchSchemaWithNoExcludedCols(left.getSchema());
+    rightSchema = batchSchemaWithNoExcludedCols(right.getSchema());
 
     if (!verifyInputSchema(leftSchema)) {
       throw new SchemaChangeException("Invalid Schema found for left incoming batch");
@@ -698,12 +723,20 @@ private void setupNewSchema() throws SchemaChangeException {
 
     // Setup LeftSchema in outgoing container
     for (final VectorWrapper<?> vectorWrapper : left) {
-      container.addOrGet(vectorWrapper.getField());
+      final MaterializedField leftField = vectorWrapper.getField();
+      if (excludedFieldNames.contains(leftField.getName())) {
+        continue;
+      }
+      container.addOrGet(leftField);
     }
 
     // Setup RightSchema in the outgoing container
     for (final VectorWrapper<?> vectorWrapper : right) {
       MaterializedField rightField = vectorWrapper.getField();
+      if (excludedFieldNames.contains(rightField.getName())) {
+        continue;
+      }
+
       TypeProtos.MajorType rightFieldType = vectorWrapper.getField().getType();
 
       // make right input schema optional if we have LEFT join
@@ -817,15 +850,28 @@ private void copyDataToOutputVectors(int fromRowIndex, int toRowIndex, RecordBat
     // Get the vectors using field index rather than Materialized field since input batch field can be different from
     // output container field in case of Left Join. As we rebuild the right Schema field to be optional for output
     // container.
+    int inputIndex = 0;
     for (int i = startVectorIndex; i < endVectorIndex; ++i) {
-      // Get input vector
-      final Class<?> inputValueClass = batch.getSchema().getColumn(i).getValueClass();
-      final ValueVector inputVector = batch.getValueAccessorById(inputValueClass, i).getValueVector();
-
       // Get output vector
       final int outputVectorIndex = i + baseVectorIndex;
       final Class<?> outputValueClass = this.getSchema().getColumn(outputVectorIndex).getValueClass();
       final ValueVector outputVector = this.getValueAccessorById(outputValueClass, outputVectorIndex).getValueVector();
+      final String outputFieldName = outputVector.getField().getName();
+
+      ValueVector inputVector;
+      Class<?> inputValueClass;
+      String inputFieldName;
+      do {
+        // Get input vector
+        inputValueClass = batch.getSchema().getColumn(inputIndex).getValueClass();
+        inputVector = batch.getValueAccessorById(inputValueClass, inputIndex).getValueVector();
+        inputFieldName = inputVector.getField().getName();
+        ++inputIndex;
+      } while (excludedFieldNames.contains(inputFieldName));
+
+      Preconditions.checkArgument(outputFieldName.equals(inputFieldName),
+        new IllegalStateException(String.format("Non-excluded Input and output container fields are not in same order" +
+          ". Output Schema:%s and Input Schema:%s", this.getSchema(), batch.getSchema())));
 
       logger.trace("Copying data from incoming batch vector to outgoing batch vector. [IncomingBatch: " +
           "(RowIndex: {}, VectorType: {}), OutputBatch: (RowIndex: {}, VectorType: {}) and Other: (TimeEachValue: {}," +
@@ -909,4 +955,13 @@ private void updateMemoryManager(int inputIndex) {
       maxOutputRowCount = newOutputRowCount;
     }
   }
+
+  private void populateExcludedField(PhysicalOperator lateralPop) {
+    final List<SchemaPath> excludedCols = ((LateralJoinPOP)lateralPop).getExcludedColumns();
+    if (excludedCols != null) {
+      for (SchemaPath currentPath : excludedCols) {
+        excludedFieldNames.add(currentPath.rootName());
+      }
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 62967a9fa1a..ea34ed930e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -62,6 +62,7 @@
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 
 import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
@@ -108,7 +109,7 @@
   private class MergeJoinMemoryManager extends JoinBatchMemoryManager {
 
     MergeJoinMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) {
-      super(outputBatchSize, leftBatch, rightBatch);
+      super(outputBatchSize, leftBatch, rightBatch, new HashSet<>());
     }
 
     /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index c8bb2a4f56c..519d5036e72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -37,6 +37,8 @@
 import org.apache.drill.exec.planner.logical.DrillJoinRule;
 import org.apache.drill.exec.planner.logical.DrillLimitRule;
 import org.apache.drill.exec.planner.logical.DrillMergeProjectRule;
+import org.apache.drill.exec.planner.logical.DrillProjectLateralJoinTransposeRule;
+import org.apache.drill.exec.planner.logical.DrillProjectPushIntoLateralJoinRule;
 import org.apache.drill.exec.planner.logical.DrillProjectRule;
 import org.apache.drill.exec.planner.logical.DrillPushFilterPastProjectRule;
 import org.apache.drill.exec.planner.logical.DrillPushLimitToScanRule;
@@ -287,7 +289,8 @@ static RuleSet getDrillUserConfigurableLogicalRules(OptimizerRulesContext optimi
       // Due to infinite loop in planning (DRILL-3257/CALCITE-1271), temporarily use this rule in Hep planner
       // RuleInstance.FILTER_SET_OP_TRANSPOSE_RULE,
       DrillFilterAggregateTransposeRule.INSTANCE,
-
+      DrillProjectLateralJoinTransposeRule.INSTANCE,
+      DrillProjectPushIntoLateralJoinRule.INSTANCE,
       RuleInstance.FILTER_MERGE_RULE,
       RuleInstance.FILTER_CORRELATE_RULE,
       RuleInstance.AGGREGATE_REMOVE_RULE,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java
index a7bbbca9278..28e5246b0e2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.planner.common;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
@@ -25,17 +27,27 @@
 import org.apache.calcite.rel.core.Correlate;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.SemiJoinType;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.physical.PrelUtil;
 
+import java.util.ArrayList;
+import java.util.List;
+
 
 public abstract class DrillLateralJoinRelBase extends Correlate implements DrillRelNode {
-  public DrillLateralJoinRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
-                                 CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
+
+  final private static double CORRELATE_MEM_COPY_COST = DrillCostBase.MEMORY_TO_CPU_RATIO * DrillCostBase.BASE_CPU_COST;
+  final public boolean excludeCorrelateColumn;
+  public DrillLateralJoinRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, boolean excludeCorrelateCol,
+                               CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
     super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType);
+    this.excludeCorrelateColumn = excludeCorrelateCol;
   }
 
   @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
@@ -49,7 +61,53 @@ public DrillLateralJoinRelBase(RelOptCluster cluster, RelTraitSet traits, RelNod
     double rowSize = (this.getLeft().getRowType().getFieldList().size()) * fieldWidth;
 
     double cpuCost = rowCount * rowSize * DrillCostBase.BASE_CPU_COST;
-    double memCost = 0;
+    double memCost = !excludeCorrelateColumn ? CORRELATE_MEM_COPY_COST : 0.0;
     return costFactory.makeCost(rowCount, cpuCost, 0, 0, memCost);
   }
+
+  @Override
+  protected RelDataType deriveRowType() {
+    switch (joinType) {
+      case LEFT:
+      case INNER:
+        return constructRowType(SqlValidatorUtil.deriveJoinRowType(left.getRowType(),
+          right.getRowType(), joinType.toJoinType(),
+          getCluster().getTypeFactory(), null,
+          ImmutableList.<RelDataTypeField>of()));
+      case ANTI:
+      case SEMI:
+        return constructRowType(left.getRowType());
+      default:
+        throw new IllegalStateException("Unknown join type " + joinType);
+    }
+  }
+
+  public int getInputSize(int offset, RelNode input) {
+    if (this.excludeCorrelateColumn &&
+      offset == 0) {
+      return input.getRowType().getFieldList().size() - 1;
+    }
+    return input.getRowType().getFieldList().size();
+  }
+
+  public RelDataType constructRowType(RelDataType inputRowType) {
+    Preconditions.checkArgument(this.requiredColumns.cardinality() == 1);
+
+    List<RelDataType> fields = new ArrayList<>();
+    List<String> fieldNames = new ArrayList<>();
+    if (excludeCorrelateColumn) {
+      int corrVariable = this.requiredColumns.nextSetBit(0);
+
+      for (RelDataTypeField field : inputRowType.getFieldList()) {
+        if (field.getIndex() == corrVariable) {
+          continue;
+        }
+        fieldNames.add(field.getName());
+        fields.add(field.getType());
+      }
+
+      return getCluster().getTypeFactory().createStructType(fields, fieldNames);
+    }
+    return inputRowType;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
index 36d7db296c1..9dd5032b1e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
@@ -18,9 +18,12 @@
 package org.apache.drill.exec.planner.common;
 
 import java.util.AbstractList;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
@@ -29,6 +32,7 @@
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
@@ -282,4 +286,70 @@ public Void visitCall(RexCall call) {
     }
     return false;
   }
+
+  /**
+   * InputRefVisitor is a utility class used to collect all the RexInputRef nodes in a
+   * RexNode.
+   *
+   */
+  public static class InputRefVisitor extends RexVisitorImpl<Void> {
+    private final List<RexInputRef> inputRefList;
+
+    public InputRefVisitor() {
+      super(true);
+      inputRefList = new ArrayList<>();
+    }
+
+    public Void visitInputRef(RexInputRef ref) {
+      inputRefList.add(ref);
+      return null;
+    }
+
+    public Void visitCall(RexCall call) {
+      for (RexNode operand : call.operands) {
+        operand.accept(this);
+      }
+      return null;
+    }
+
+    public List<RexInputRef> getInputRefs() {
+      return inputRefList;
+    }
+  }
+
+
+  /**
+   * RexFieldsTransformer is a utility class used to convert column refs in a RexNode
+   * based on inputRefMap (input to output ref map).
+   *
+   * This transformer can be used to find and replace the existing inputRef in a RexNode with a new inputRef.
+   */
+  public static class RexFieldsTransformer {
+    private final RexBuilder rexBuilder;
+    private final Map<Integer, Integer> inputRefMap;
+
+    public RexFieldsTransformer(
+      RexBuilder rexBuilder,
+      Map<Integer, Integer> inputRefMap) {
+      this.rexBuilder = rexBuilder;
+      this.inputRefMap = inputRefMap;
+    }
+
+    public RexNode go(RexNode rex) {
+      if (rex instanceof RexCall) {
+        ImmutableList.Builder<RexNode> builder = ImmutableList.builder();
+        final RexCall call = (RexCall) rex;
+        for (RexNode operand : call.operands) {
+          builder.add(go(operand));
+        }
+        return call.clone(call.getType(), builder.build());
+      } else if (rex instanceof RexInputRef) {
+        RexInputRef var = (RexInputRef) rex;
+        int index = var.getIndex();
+        return rexBuilder.makeInputRef(var.getType(), inputRefMap.get(index));
+      } else {
+        return rex;
+      }
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java
index 52e603f3f88..9f91818be7e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java
@@ -46,7 +46,7 @@ public void onMatch(RelOptRuleCall call) {
 
     final RelTraitSet traits = correlate.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
     DrillLateralJoinRel lateralJoinRel = new DrillLateralJoinRel(correlate.getCluster(),
-        traits, convertedLeft, convertedRight, correlate.getCorrelationId(),
+        traits, convertedLeft, convertedRight, false, correlate.getCorrelationId(),
         correlate.getRequiredColumns(), correlate.getJoinType());
     call.transformTo(lateralJoinRel);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
index 035dae9bb9c..aa6ccb051ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
@@ -33,16 +33,16 @@
 
 public class DrillLateralJoinRel extends DrillLateralJoinRelBase implements DrillRel {
 
-  protected DrillLateralJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
+  protected DrillLateralJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, boolean includeCorrelateVar,
                                 CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
-    super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType);
+    super(cluster, traits, left, right, includeCorrelateVar, correlationId, requiredColumns, semiJoinType);
   }
 
   @Override
   public Correlate copy(RelTraitSet traitSet,
         RelNode left, RelNode right, CorrelationId correlationId,
         ImmutableBitSet requiredColumns, SemiJoinType joinType) {
-    return new DrillLateralJoinRel(this.getCluster(), this.getTraitSet(), left, right, correlationId, requiredColumns,
+    return new DrillLateralJoinRel(this.getCluster(), this.getTraitSet(), left, right, this.excludeCorrelateColumn, correlationId, requiredColumns,
         this.getJoinType());
   }
 
@@ -50,7 +50,7 @@ public Correlate copy(RelTraitSet traitSet,
   public LogicalOperator implement(DrillImplementor implementor) {
     final List<String> fields = getRowType().getFieldNames();
     assert DrillJoinRel.isUnique(fields);
-    final int leftCount = left.getRowType().getFieldCount();
+    final int leftCount = getInputSize(0,left);
 
     final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0, 0, left, this);
     final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor, 1, leftCount, right, this);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectLateralJoinTransposeRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectLateralJoinTransposeRule.java
new file mode 100644
index 00000000000..5cb984a4c38
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectLateralJoinTransposeRule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.rules.ProjectCorrelateTransposeRule;
+import org.apache.calcite.rel.rules.PushProjector;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+public class DrillProjectLateralJoinTransposeRule extends ProjectCorrelateTransposeRule {
+
+  public static final DrillProjectLateralJoinTransposeRule INSTANCE = new DrillProjectLateralJoinTransposeRule(PushProjector.ExprCondition.TRUE, RelFactories.LOGICAL_BUILDER);
+
+  public DrillProjectLateralJoinTransposeRule(PushProjector.ExprCondition preserveExprCondition, RelBuilderFactory relFactory) {
+    super(preserveExprCondition, relFactory);
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    Correlate correlate = call.rel(1);
+
+
+    // No need to call ProjectCorrelateTransposeRule if the current lateralJoin contains excludeCorrelationColumn set to true.
+    // This is needed as the project push into Lateral join rule changes the output row type which will fail assertions in ProjectCorrelateTransposeRule.
+    if (correlate instanceof DrillLateralJoinRel &&
+        ((DrillLateralJoinRel)correlate).excludeCorrelateColumn) {
+      return false;
+    }
+
+    return true;
+  }
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectPushIntoLateralJoinRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectPushIntoLateralJoinRule.java
new file mode 100644
index 00000000000..6a57c89fbba
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectPushIntoLateralJoinRule.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.drill.exec.planner.StarColumnHelper;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class DrillProjectPushIntoLateralJoinRule extends RelOptRule {
+
+  public static final DrillProjectPushIntoLateralJoinRule INSTANCE =
+    new DrillProjectPushIntoLateralJoinRule(RelFactories.LOGICAL_BUILDER);
+
+
+  public DrillProjectPushIntoLateralJoinRule(RelBuilderFactory relFactory) {
+    super(operand(DrillProjectRel.class,
+        operand(DrillLateralJoinRel.class, any())),
+      relFactory, null);
+  }
+
+  public void onMatch(RelOptRuleCall call) {
+    DrillProjectRel origProj = call.rel(0);
+    final DrillLateralJoinRel corr = call.rel(1);
+
+    if (StarColumnHelper.containsStarColumn(origProj.getRowType()) ||
+        StarColumnHelper.containsStarColumn(corr.getRowType()) ||
+         corr.excludeCorrelateColumn) {
+      return;
+    }
+    DrillRelOptUtil.InputRefVisitor collectRefs = new DrillRelOptUtil.InputRefVisitor();
+    for (RexNode exp: origProj.getChildExps()) {
+      exp.accept(collectRefs);
+    }
+
+    int correlationIndex = corr.getRequiredColumns().nextSetBit(0);
+    for (RexInputRef inputRef : collectRefs.getInputRefs()) {
+      if (inputRef.getIndex() == correlationIndex) {
+        return;
+      }
+    }
+
+    final RelNode left = corr.getLeft();
+    final RelNode right = corr.getRight();
+    final RelNode convertedLeft = convert(left, left.getTraitSet().plus(DrillRel.DRILL_LOGICAL).simplify());
+    final RelNode convertedRight = convert(right, right.getTraitSet().plus(DrillRel.DRILL_LOGICAL).simplify());
+
+    final RelTraitSet traits = corr.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
+    RelNode relNode = new DrillLateralJoinRel(corr.getCluster(),
+                            traits, convertedLeft, convertedRight, true, corr.getCorrelationId(),
+                            corr.getRequiredColumns(), corr.getJoinType());
+
+    if (!DrillRelOptUtil.isTrivialProject(origProj, true)) {
+      Map<Integer, Integer> mapWithoutCorr = buildMapWithoutCorrColumn(corr, correlationIndex);
+      List<RexNode> outputExprs = transformExprs(origProj.getCluster().getRexBuilder(), origProj.getChildExps(), mapWithoutCorr);
+
+      relNode = new DrillProjectRel(origProj.getCluster(),
+                                    left.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+                                    relNode, outputExprs, origProj.getRowType());
+    }
+    call.transformTo(relNode);
+  }
+
+  private List<RexNode> transformExprs(RexBuilder builder, List<RexNode> exprs, Map<Integer, Integer> corrMap) {
+    List<RexNode> outputExprs = new ArrayList<>();
+    DrillRelOptUtil.RexFieldsTransformer transformer = new DrillRelOptUtil.RexFieldsTransformer(builder, corrMap);
+    for (RexNode expr : exprs) {
+      outputExprs.add(transformer.go(expr));
+    }
+    return outputExprs;
+  }
+
+  private Map<Integer, Integer> buildMapWithoutCorrColumn(RelNode corr, int correlationIndex) {
+    int index = 0;
+    Map<Integer, Integer> result = new HashMap();
+    for (int i=0;i<corr.getRowType().getFieldList().size();i++) {
+      if (i == correlationIndex) {
+        continue;
+      } else {
+        result.put(i, index++);
+      }
+    }
+    return result;
+  }
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java
index 565871bb21b..b55076bd02e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java
@@ -22,6 +22,7 @@
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.Correlate;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.type.RelDataType;
@@ -30,6 +31,8 @@
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.sql.SemiJoinType;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.commons.collections.ListUtils;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.LateralJoinPOP;
 import org.apache.drill.exec.planner.common.DrillLateralJoinRelBase;
@@ -38,21 +41,23 @@
 import org.apache.drill.exec.record.BatchSchema;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
 public class LateralJoinPrel extends DrillLateralJoinRelBase implements Prel {
 
 
-  protected LateralJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
+  protected LateralJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, boolean excludeCorrelateCol,
                             CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
-    super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType);
+    super(cluster, traits, left, right, excludeCorrelateCol, correlationId, requiredColumns, semiJoinType);
   }
+
   @Override
   public Correlate copy(RelTraitSet traitSet,
                         RelNode left, RelNode right, CorrelationId correlationId,
                         ImmutableBitSet requiredColumns, SemiJoinType joinType) {
-    return new LateralJoinPrel(this.getCluster(), this.getTraitSet(), left, right, correlationId, requiredColumns,
+    return new LateralJoinPrel(this.getCluster(), this.getTraitSet(), left, right, this.excludeCorrelateColumn, correlationId, requiredColumns,
         this.getJoinType());
   }
 
@@ -63,11 +68,22 @@ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws
     PhysicalOperator rightPop = ((Prel)right).getPhysicalOperator(creator);
 
     SemiJoinType jtype = this.getJoinType();
-
-    LateralJoinPOP ljoin = new LateralJoinPOP(leftPop, rightPop, jtype.toJoinType());
+    List<SchemaPath> excludedColumns = new ArrayList<>();
+    if (getColumn() != null) {
+      excludedColumns.add(getColumn());
+    }
+    LateralJoinPOP ljoin = new LateralJoinPOP(leftPop, rightPop, jtype.toJoinType(), excludedColumns);
     return creator.addMetadata(this, ljoin);
   }
 
+  private SchemaPath getColumn() {
+    if (this.excludeCorrelateColumn) {
+      int index = this.getRequiredColumns().asList().get(0);
+      return  SchemaPath.getSimplePath(this.getInput(0).getRowType().getFieldNames().get(index));
+    }
+    return null;
+  }
+
   /**
    * Check to make sure that the fields of the inputs are the same as the output field names.
    * If not, insert a project renaming them.
@@ -76,8 +92,8 @@ public RelNode getLateralInput(int offset, RelNode input) {
     Preconditions.checkArgument(DrillJoinRelBase.uniqueFieldNames(input.getRowType()));
     final List<String> fields = getRowType().getFieldNames();
     final List<String> inputFields = input.getRowType().getFieldNames();
-    final List<String> outputFields = fields.subList(offset, offset + inputFields.size());
-    if (!outputFields.equals(inputFields)) {
+    final List<String> outputFields = fields.subList(offset, offset + getInputSize(offset, input));
+    if (ListUtils.subtract(outputFields, inputFields).size() != 0) {
       // Ensure that input field names are the same as output field names.
       // If there are duplicate field names on left and right, fields will get
       // lost.
@@ -104,6 +120,16 @@ private RelNode rename(RelNode input, List<RelDataTypeField> inputFields, List<S
     return proj;
   }
 
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    if (this.excludeCorrelateColumn) {
+      return super.explainTerms(pw).item("column excluded from output: ", this.getColumn());
+    } else {
+      return super.explainTerms(pw);
+    }
+  }
+
+
   @Override
   public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> visitor, X value) throws E {
     return visitor.visitLateral(this, value);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrule.java
index e531dca4a9a..10e247b010c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrule.java
@@ -48,7 +48,7 @@ public void onMatch(RelOptRuleCall call) {
 
     final LateralJoinPrel lateralJoinPrel = new LateralJoinPrel(lateralJoinRel.getCluster(),
                                   corrTraits,
-                                  convertedLeft, convertedRight, lateralJoinRel.getCorrelationId(),
+                                  convertedLeft, convertedRight, lateralJoinRel.excludeCorrelateColumn, lateralJoinRel.getCorrelationId(),
                                   lateralJoinRel.getRequiredColumns(),lateralJoinRel.getJoinType());
     call.transformTo(lateralJoinPrel);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java
index d450c5616fe..850f0bdf088 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java
@@ -76,7 +76,7 @@ public Prel visitLateral(LateralJoinPrel prel, Void value) throws RuntimeExcepti
 
     List<RelNode> children = getChildren(prel);
 
-    final int leftCount = children.get(0).getRowType().getFieldCount();
+    final int leftCount = prel.getInputSize(0,children.get(0));
 
     List<RelNode> reNamedChildren = Lists.newArrayList();
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
index 2ebe8878309..4344e1374a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
@@ -17,29 +17,44 @@
  */
 package org.apache.drill.exec.record;
 
+import java.util.Set;
+
 public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinBatchMemoryManager.class);
 
   private int rowWidth[];
   private RecordBatch recordBatch[];
+  private Set<String> columnsToExclude;
 
   private static final int numInputs = 2;
   public static final int LEFT_INDEX = 0;
   public static final int RIGHT_INDEX = 1;
 
-  public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) {
+  public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch,
+                                RecordBatch rightBatch, Set<String> excludedColumns) {
     super(numInputs, outputBatchSize);
     recordBatch = new RecordBatch[numInputs];
     recordBatch[LEFT_INDEX] = leftBatch;
     recordBatch[RIGHT_INDEX] = rightBatch;
     rowWidth = new int[numInputs];
+    this.columnsToExclude = excludedColumns;
   }
 
   private int updateInternal(int inputIndex, int outputPosition,  boolean useAggregate) {
     updateIncomingStats(inputIndex);
     rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocWidth();
 
-    final int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX];
+    // Reduce the width of excluded columns from actual rowWidth
+    for (String columnName : columnsToExclude) {
+      final RecordBatchSizer.ColumnSize currentColSizer = getColumnSize(inputIndex, columnName);
+      if (currentColSizer == null) {
+        continue;
+      }
+      rowWidth[inputIndex] -= currentColSizer.getAllocSizePerEntry();
+    }
+
+    // Get final net outgoing row width after reducing the excluded columns width
+    int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX];
 
     // If outgoing row width is 0 or there is no change in outgoing row width, just return.
     // This is possible for empty batches or
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index 79a7bd438b0..a5c3c193863 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -17,9 +17,11 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
+import avro.shaded.com.google.common.collect.Lists;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.OperatorContext;
@@ -28,11 +30,13 @@
 import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.mock.MockStorePOP;
 import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.DirectRowSet;
 import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
 import org.apache.drill.test.rowSet.schema.SchemaBuilder;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -107,7 +111,7 @@ public static void setUpBeforeClass() throws Exception {
       .buildSchema();
     emptyRightRowSet = fixture.rowSetBuilder(rightSchema).build();
 
-    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER);
+    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
   }
 
   @AfterClass
@@ -1488,7 +1492,7 @@ public void testBasicLeftLateralJoin() throws Exception {
     final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT);
+    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT, Lists.newArrayList());
 
     final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(),
       leftMockBatch, rightMockBatch);
@@ -1554,7 +1558,7 @@ public void testLeftLateralJoin_WithMatchingAndEmptyBatch() throws Exception {
     final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT);
+    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT, Lists.newArrayList());
 
     final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(),
       leftMockBatch, rightMockBatch);
@@ -1622,7 +1626,7 @@ public void testLeftLateralJoin_WithAndWithoutMatching() throws Exception {
     final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT);
+    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT, Lists.newArrayList());
 
     final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(),
       leftMockBatch, rightMockBatch);
@@ -1693,7 +1697,7 @@ public void testLeftLateralJoin_WithAndWithoutMatching_MultipleBatch() throws Ex
     final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT);
+    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT, Lists.newArrayList());
 
     final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(),
       leftMockBatch, rightMockBatch);
@@ -1754,7 +1758,7 @@ public void testMultipleUnnestAtSameLevel() throws Exception {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
 
     final LateralJoinBatch ljBatch_1 = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
@@ -1863,7 +1867,7 @@ public void testMultiLevelLateral() throws Exception {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
 
     final LateralJoinBatch lowerLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
@@ -1964,7 +1968,7 @@ public void testMultiLevelLateral_MultipleOutput() throws Exception {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
 
     final LateralJoinBatch lowerLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
@@ -2091,7 +2095,7 @@ public void testMultiLevelLateral_SchemaChange_LeftUnnest() throws Exception {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
 
     final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
@@ -2225,7 +2229,7 @@ public void testMultiLevelLateral_SchemaChange_RightUnnest() throws Exception {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
 
     final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
@@ -2369,7 +2373,7 @@ public void testMultiLevelLateral_SchemaChange_LeftRightUnnest() throws Exceptio
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
 
     final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
@@ -2723,7 +2727,7 @@ public void testMultiLevelLateral_SchemaChange_LeftRightUnnest_NonEmptyBatch() t
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
 
     final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
@@ -2802,4 +2806,159 @@ public void testMultiLevelLateral_SchemaChange_LeftRightUnnest_NonEmptyBatch() t
       leftOutcomes2.clear();
     }
   }
+
+  private void testExcludedColumns(List<SchemaPath> excludedCols, CloseableRecordBatch left,
+                                   CloseableRecordBatch right, RowSet expectedRowSet) throws Exception {
+    LateralJoinPOP lateralPop = new LateralJoinPOP(null, null, JoinRelType.INNER, excludedCols);
+    final LateralJoinBatch ljBatch = new LateralJoinBatch(lateralPop, fixture.getFragmentContext(), left, right);
+
+    try {
+      assertTrue(RecordBatch.IterOutcome.OK_NEW_SCHEMA == ljBatch.next());
+      assertTrue(RecordBatch.IterOutcome.OK == ljBatch.next());
+      RowSet actualRowSet = DirectRowSet.fromContainer(ljBatch.getContainer());
+      new RowSetComparison(expectedRowSet).verify(actualRowSet);
+      assertTrue(RecordBatch.IterOutcome.NONE == ljBatch.next());
+    } finally {
+      ljBatch.close();
+      left.close();
+      right.close();
+      expectedRowSet.clear();
+    }
+  }
+
+  @Test
+  public void testFillingUpOutputBatch_WithExcludedColumns() throws Exception {
+    // Create data for left input
+    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
+      .addRow(2, 20, "item20")
+      .build();
+
+    // Create data for right input
+    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
+      .addRow(4, 41, "item41")
+      .addRow(5, 51, "item51")
+      .build();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .add("id_right", TypeProtos.MinorType.INT)
+      .add("cost_right", TypeProtos.MinorType.INT)
+      .add("name_right", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
+      .addRow(1, "item1", 1, 11, "item11")
+      .addRow(1, "item1", 2, 21, "item21")
+      .addRow(1, "item1", 3, 31, "item31")
+      .addRow(2, "item20", 4, 41, "item41")
+      .addRow(2, "item20", 5, 51, "item51")
+      .build();
+
+    // Get the left container with dummy data for Lateral Join
+    leftContainer.add(nonEmptyLeftRowSet.container());
+    leftContainer.add(leftRowSet2.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet2.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    List<SchemaPath> excludedCols = new ArrayList<>();
+    excludedCols.add(SchemaPath.getSimplePath("cost_left"));
+
+    try {
+      testExcludedColumns(excludedCols, leftMockBatch, rightMockBatch, expectedRowSet);
+    } finally {
+      // Close all the resources for this test case
+      leftRowSet2.clear();
+      nonEmptyRightRowSet2.clear();
+    }
+  }
+
+  @Test
+  public void testFillingUpOutputBatch_With2ExcludedColumns() throws Exception {
+    // Create data for left input
+    final RowSet.SingleRowSet leftRowSet2 = fixture.rowSetBuilder(leftSchema)
+      .addRow(2, 20, "item20")
+      .build();
+
+    // Create data for right input
+    final RowSet.SingleRowSet nonEmptyRightRowSet2 = fixture.rowSetBuilder(rightSchema)
+      .addRow(4, 41, "item41")
+      .addRow(5, 51, "item51")
+      .build();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      //.add("id_right", TypeProtos.MinorType.INT)
+      .add("cost_right", TypeProtos.MinorType.INT)
+      .add("name_right", TypeProtos.MinorType.VARCHAR)
+      .buildSchema();
+
+    final RowSet.SingleRowSet expectedRowSet = fixture.rowSetBuilder(expectedSchema)
+      /*.addRow("item1", 1, 11, "item11")
+      .addRow("item1", 2, 21, "item21")
+      .addRow("item1", 3, 31, "item31")
+      .addRow("item20", 4, 41, "item41")
+      .addRow("item20", 5, 51, "item51") */
+      .addRow("item1", 11, "item11")
+      .addRow("item1", 21, "item21")
+      .addRow("item1", 31, "item31")
+      .addRow("item20", 41, "item41")
+      .addRow("item20", 51, "item51")
+      .build();
+
+    // Get the left container with dummy data for Lateral Join
+    leftContainer.add(nonEmptyLeftRowSet.container());
+    leftContainer.add(leftRowSet2.container());
+
+    // Get the left IterOutcomes for Lateral Join
+    leftOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    leftOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    // Create Left MockRecordBatch
+    final CloseableRecordBatch leftMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      leftContainer, leftOutcomes, leftContainer.get(0).getSchema());
+
+    // Get the right container with dummy data
+    rightContainer.add(emptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet.container());
+    rightContainer.add(nonEmptyRightRowSet2.container());
+
+    rightOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    rightOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
+      rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
+
+    List<SchemaPath> excludedCols = new ArrayList<>();
+    excludedCols.add(SchemaPath.getSimplePath("cost_left"));
+    excludedCols.add(SchemaPath.getSimplePath("id_left"));
+    excludedCols.add(SchemaPath.getSimplePath("id_right"));
+
+    try {
+      testExcludedColumns(excludedCols, leftMockBatch, rightMockBatch, expectedRowSet);
+    } finally {
+      // Close all the resources for this test case
+      leftRowSet2.clear();
+      nonEmptyRightRowSet2.clear();
+    }
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
index 53df9ebf59a..8ff164fe3a6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
@@ -58,6 +58,10 @@ public void testLateralPlan1() throws Exception {
   public void testLateralSql() throws Exception {
     String Sql = "select t.c_name, t2.ord.o_shop as o_shop from cp.`lateraljoin/nested-customer.json` t," +
         " unnest(t.orders) t2(ord) limit 1";
+
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{"column excluded from output: =\\[\\`orders\\`\\]"},
+      new String[]{});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
@@ -68,9 +72,16 @@ public void testLateralSql() throws Exception {
 
   @Test
   public void testExplainLateralSql() throws Exception {
-    String Sql = "explain plan without implementation for select t.c_name, t2.ord.o_shop as o_shop from cp.`lateraljoin/nested-customer.json` t," +
+    String explainSql = "explain plan without implementation for select t.c_name, t2.ord.o_shop as o_shop from cp.`lateraljoin/nested-customer.json` t," +
         " unnest(t.orders) t2(ord) limit 1";
-    test(Sql);
+
+    String Sql = "select t.c_name, t2.ord.o_shop as o_shop from cp.`lateraljoin/nested-customer.json` t," +
+      " unnest(t.orders) t2(ord) limit 1";
+
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{"column excluded from output: =\\[\\`orders\\`\\]"},
+      new String[]{});
+
+    test(explainSql);
   }
 
   @Test
@@ -82,6 +93,9 @@ public void testFilterPushCorrelate() throws Exception {
     PlanTestBase.testPlanMatchingPatterns(query, new String[]{"LateralJoin(.*[\n\r])+.*Filter(.*[\n\r])+.*Scan(.*[\n\r])+.*Filter"},
         new String[]{});
 
+    PlanTestBase.testPlanMatchingPatterns(query, new String[]{"column excluded from output: =\\[\\`orders\\`\\]"},
+      new String[]{});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(query)
@@ -94,6 +108,10 @@ public void testFilterPushCorrelate() throws Exception {
   public void testLateralSqlPlainCol() throws Exception {
     String Sql = "select t.c_name, t2.phone as c_phone from cp.`lateraljoin/nested-customer.json` t,"
         + " unnest(t.c_phone) t2(phone) limit 1";
+
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{"column excluded from output: =\\[\\`c_phone\\`\\]"},
+      new String[]{});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
@@ -106,6 +124,9 @@ public void testLateralSqlPlainCol() throws Exception {
   public void testLateralSqlStar() throws Exception {
     String Sql = "select * from cp.`lateraljoin/nested-customer.json` t, unnest(t.orders) Orders(ord) limit 0";
 
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{},
+      new String[]{"column excluded from output: =\\[\\`orders\\`\\]"});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
@@ -118,6 +139,9 @@ public void testLateralSqlStar() throws Exception {
   public void testLateralSqlStar2() throws Exception {
     String Sql = "select c.* from cp.`lateraljoin/nested-customer.json` c, unnest(c.orders) Orders(ord) limit 0";
 
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{},
+      new String[]{"column excluded from output: =\\[\\`orders\\`\\]"});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
@@ -130,6 +154,9 @@ public void testLateralSqlStar2() throws Exception {
   public void testLateralSqlStar3() throws Exception {
     String Sql = "select Orders.*, c.* from cp.`lateraljoin/nested-customer.json` c, unnest(c.orders) Orders(ord) limit 0";
 
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{},
+      new String[]{"column excluded from output: =\\[\\`orders\\`\\]"});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
@@ -142,6 +169,8 @@ public void testLateralSqlStar3() throws Exception {
   public void testLateralSqlStar4() throws Exception {
     String Sql = "select Orders.* from cp.`lateraljoin/nested-customer.json` c, unnest(c.orders) Orders(ord) limit 0";
 
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{"column excluded from output: =\\[\\`orders\\`\\]"}, new String[]{});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
@@ -158,11 +187,14 @@ public void testLateralSqlWithAS() throws Exception {
         " (select c_name, flatten(orders) from cp" +
         ".`lateraljoin/nested-customer.parquet` ) as t2(name, orders) on t.c_name = t2.name";
 
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{"column excluded from output: =\\[\\`orders\\`\\]"}, new String[]{});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
         .sqlBaselineQuery(baselineQuery)
         .go();
+
   }
 
   @Test
@@ -174,6 +206,8 @@ public void testMultiUnnestLateralAtSameLevel() throws Exception {
         " (select c_name, flatten(orders) from cp.`lateraljoin/nested-customer.parquet` ) as t2 (name, orders) on t.c_name = t2.name " +
         " inner join (select c_name, flatten(orders) from cp.`lateraljoin/nested-customer.parquet` ) as t3(name, orders) on t.c_name = t3.name";
 
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{"column excluded from output: =\\[\\`orders\\`\\]"}, new String[]{});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
@@ -190,6 +224,9 @@ public void testSubQuerySql() throws Exception {
     String baselineQuery = "select t.c_name, t3.orders.items as items0, t3.items as items1 from cp.`lateraljoin/nested-customer.parquet` t " +
         " inner join (select c_name, f, flatten(t1.f.items) from (select c_name, flatten(orders) as f from cp.`lateraljoin/nested-customer.parquet`) as t1 ) " +
         "t3(name, orders, items) on t.c_name = t3.name ";
+
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{"column excluded from output: =\\[\\`orders\\`\\]"}, new String[]{"column excluded from output: =\\[\\`items\\`\\]"});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
@@ -206,6 +243,9 @@ public void testUnnestWithFilter() throws Exception {
     String baselineQuery = "select t.c_name, t3.orders.items as items0, t3.items as items1 from cp.`lateraljoin/nested-customer.parquet` t " +
         " inner join (select c_name, f, flatten(t1.f.items) from (select c_name, flatten(orders) as f from cp.`lateraljoin/nested-customer.parquet`) as t1 ) " +
         "t3(name, orders, items) on t.c_name = t3.name where t.c_id > 1";
+
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{"column excluded from output: =\\[\\`orders\\`\\]"}, new String[]{"column excluded from output: =\\[\\`items\\`\\]"});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
@@ -246,11 +286,14 @@ public void testUnnestWithAggOnOuterTable() throws Exception {
         " inner join (select c_name, f, flatten(t1.f.items) from (select c_name, flatten(orders) as f from cp.`lateraljoin/nested-customer.parquet`) as t1 ) " +
         "t3(name, orders, items) on t.c_name = t3.name where t.c_id > 1 group by t.c_id";
 
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{"column excluded from output: =\\[\\`orders\\`\\]", "column excluded from output: =\\[\\`items\\`\\]"}, new String[]{});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
         .sqlBaselineQuery(baselineQuery)
         .go();
+
   }
 
   @Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index 03fd1c1383f..c2e64f4d47b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.unnest;
 
+import com.google.common.collect.Lists;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.common.exceptions.DrillException;
@@ -36,8 +37,8 @@
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
 import org.apache.drill.exec.planner.logical.DrillLogicalTestutils;
 import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.store.mock.MockStorePOP;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VarCharVector;
@@ -69,7 +70,7 @@
 
   @BeforeClass public static void setUpBeforeClass() throws Exception {
     mockPopConfig = new MockStorePOP(null);
-    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER);
+    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
     operatorContext = fixture.newOperatorContext(mockPopConfig);
   }
 
@@ -105,7 +106,7 @@ public void testUnnestFixedWidthColumn() {
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
@@ -139,7 +140,7 @@ public void testUnnestVarWidthColumn() {
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
@@ -160,7 +161,7 @@ public void testUnnestMapColumn() {
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
@@ -191,7 +192,7 @@ public void testUnnestEmptyList() {
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
@@ -239,7 +240,7 @@ public void testUnnestMultipleNewSchemaIncoming() {
         RecordBatch.IterOutcome.OK_NEW_SCHEMA};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
@@ -288,28 +289,15 @@ public void testUnnestSchemaChange() {
         RecordBatch.IterOutcome.OK_NEW_SCHEMA};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     }
 
   }
 
-  @Test
-  public void testUnnestLimitBatchSize() {
-
-    final int limitedOutputBatchSize = 127;
-    final int inputBatchSize = limitedOutputBatchSize + 1;
-    // size of lateral output batch = 4N * (N + 5) bytes, where N = output batch row count
-    //  Lateral output batch size =  N * input row size + N * size of single unnest column
-    //                            =  N * (size of row id + size of array offset vector + (N + 1 )*size of single array entry))
-    //                              + N * 4
-    //                            = N * (4 + 2*4 + (N+1)*4 )  + N * 4
-    //                            = N * (16 + 4N) + N * 4
-    //                            = 4N * (N + 5)
-    // configure the output batch size to be one more record than that so that the batch sizer can round down
-    final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * (limitedOutputBatchSize + 6);
-
+  private void testUnnestBatchSizing(int inputBatchSize, int limitOutputBatchSize,
+                                     int limitOutputBatchSizeBytes, boolean excludeUnnestColumn) {
     // single record batch with single row. The unnest column has one
     // more record than the batch size we want in the output
     Object[][] data = new Object[1][1];
@@ -322,39 +310,76 @@ public void testUnnestLimitBatchSize() {
         }
       }
     }
+
     Integer[][][] baseline = new Integer[2][2][];
-    baseline[0][0] = new Integer[limitedOutputBatchSize];
-    baseline[0][1] = new Integer[limitedOutputBatchSize];
+    baseline[0][0] = new Integer[limitOutputBatchSize];
+    baseline[0][1] = new Integer[limitOutputBatchSize];
     baseline[1][0] = new Integer[1];
     baseline[1][1] = new Integer[1];
-    for (int i = 0; i < limitedOutputBatchSize; i++) {
+    for (int i = 0; i < limitOutputBatchSize; i++) {
       baseline[0][0][i] = 1;
       baseline[0][1][i] = i;
     }
     baseline[1][0][0] = 1; // row Num
-    baseline[1][1][0] = limitedOutputBatchSize; // value
+    baseline[1][1][0] = limitOutputBatchSize; // value
 
     // Create input schema
     TupleMetadata incomingSchema = new SchemaBuilder()
-        .add("rowNumber", TypeProtos.MinorType.INT)
-        .addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
+      .add("rowNumber", TypeProtos.MinorType.INT)
+      .addArray("unnestColumn", TypeProtos.MinorType.INT).buildSchema();
 
     TupleMetadata[] incomingSchemas = {incomingSchema};
 
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK};
 
     final long outputBatchSize = fixture.getFragmentContext().getOptions().getOption(ExecConstants
-        .OUTPUT_BATCH_SIZE_VALIDATOR);
-    fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitedOutputBatchSizeBytes);
+      .OUTPUT_BATCH_SIZE_VALIDATOR);
+    fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitOutputBatchSizeBytes);
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, excludeUnnestColumn);
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     } finally {
       fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, outputBatchSize);
     }
+  }
 
+  @Test
+  public void testUnnestLimitBatchSize_WithExcludedCols() {
+    LateralJoinPOP previoudPop = ljPopConfig;
+    List<SchemaPath> excludedCols = new ArrayList<>();
+    excludedCols.add(SchemaPath.getSimplePath("unnestColumn"));
+    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER, excludedCols);
+    final int limitedOutputBatchSize = 127;
+    final int inputBatchSize = limitedOutputBatchSize + 1;
+    // Since we want 127 row count and because of nearest power of 2 adjustment output row count will be reduced to
+    // 64. So we should configure batch size for (N+1) rows if we want to output N rows where N is not power of 2
+    // size of lateral output batch = (N+1)*8 bytes, where N = output batch row count
+    //  Lateral output batch size = (N+1) * (input row size without unnest field) + (N+1) * size of single unnest column
+    //                            = (N+1) * (size of row id) + (N+1) * (size of single array entry)
+    //                            = (N+1)*4 + (N+1) * 4
+    //                            = (N+1) * 8
+    // configure the output batch size to be one more record than that so that the batch sizer can round down
+    final int limitedOutputBatchSizeBytes = 8 * (limitedOutputBatchSize + 1);
+    testUnnestBatchSizing(inputBatchSize, limitedOutputBatchSize, limitedOutputBatchSizeBytes, true);
+    ljPopConfig = previoudPop;
+  }
+
+  @Test
+  public void testUnnestLimitBatchSize() {
+    final int limitedOutputBatchSize = 127;
+    final int inputBatchSize = limitedOutputBatchSize + 1;
+    // size of lateral output batch = 4N * (N + 5) bytes, where N = output batch row count
+    //  Lateral output batch size =  N * input row size + N * size of single unnest column
+    //                            =  N * (size of row id + size of array offset vector + (N + 1 )*size of single array entry))
+    //                              + N * 4
+    //                            = N * (4 + 2*4 + (N+1)*4 )  + N * 4
+    //                            = N * (16 + 4N) + N * 4
+    //                            = 4N * (N + 5)
+    // configure the output batch size to be one more record than that so that the batch sizer can round down
+    final int limitedOutputBatchSizeBytes = 4 * limitedOutputBatchSize * (limitedOutputBatchSize + 6);
+    testUnnestBatchSizing(inputBatchSize, limitedOutputBatchSize, limitedOutputBatchSizeBytes, false);
   }
 
   @Test
@@ -404,7 +429,7 @@ public void testUnnestKillFromLimitSubquery1() {
     fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitedOutputBatchSizeBytes);
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline); // Limit of 100 values for unnest.
+      testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline, false); // Limit of 100 values for unnest.
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     } finally {
@@ -462,7 +487,7 @@ public void testUnnestKillFromLimitSubquery2() {
     fixture.getFragmentContext().getOptions().setLocalOption(ExecConstants.OUTPUT_BATCH_SIZE, limitedOutputBatchSizeBytes);
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline); // Limit of 100 values for unnest.
+      testUnnest(incomingSchemas, iterOutcomes, -1, 1, data, baseline, false); // Limit of 100 values for unnest.
     } catch (Exception e) {
       fail("Failed due to exception: " + e.getMessage());
     } finally {
@@ -495,7 +520,7 @@ public void testUnnestNonArrayColumn() {
     RecordBatch.IterOutcome[] iterOutcomes = {RecordBatch.IterOutcome.OK_NEW_SCHEMA, RecordBatch.IterOutcome.OK};
 
     try {
-      testUnnest(incomingSchemas, iterOutcomes, data, baseline);
+      testUnnest(incomingSchemas, iterOutcomes, data, baseline, false);
     } catch (UserException|UnsupportedOperationException e) {
       return; // succeeded
     } catch (Exception e) {
@@ -510,8 +535,9 @@ public void testUnnestNonArrayColumn() {
       TupleMetadata[] incomingSchemas,
       RecordBatch.IterOutcome[] iterOutcomes,
       T[][] data,
-      T[][][] baseline ) throws Exception{
-    testUnnest(incomingSchemas, iterOutcomes, -1, -1, data, baseline);
+      T[][][] baseline,
+      boolean excludeUnnestColumn) throws Exception{
+    testUnnest(incomingSchemas, iterOutcomes, -1, -1, data, baseline, excludeUnnestColumn);
   }
 
   // test unnest for various input conditions optionally invoking kill. if the kill or killBatch
@@ -521,7 +547,8 @@ public void testUnnestNonArrayColumn() {
       int unnestLimit, // kill unnest after every 'unnestLimit' number of values in every record
       int execKill, // number of batches after which to kill the execution (!)
       T[][] data,
-      T[][][] baseline) throws Exception {
+      T[][][] baseline,
+      boolean excludeUnnestColumn) throws Exception {
 
     // Get the incoming container with dummy data for LJ
     final List<VectorContainer> incomingContainer = new ArrayList<>(data.length);
@@ -605,7 +632,9 @@ public void testUnnestNonArrayColumn() {
       //int valueIndex = 0;
       for ( List<ValueVector> batch: results) {
         int vectorCount= batch.size();
-        if (vectorCount!= baseline[batchIndex].length+1) { // baseline does not include the original unnest column
+        int expectedVectorCount = (excludeUnnestColumn) ? 0 : 1;
+        expectedVectorCount += baseline[batchIndex].length;
+        if (vectorCount!= expectedVectorCount) { // baseline does not include the original unnest column
           fail("Test failed in validating unnest output. Batch column count mismatch.");
         }
         for (ValueVector vv : batch) {
@@ -875,8 +904,8 @@ private boolean isTerminal(RecordBatch.IterOutcome outcome) {
     final ProjectRecordBatch projectBatch2 =
         new ProjectRecordBatch(projectPopConfig2, unnestBatch2, fixture.getFragmentContext());
 
-    final LateralJoinPOP ljPopConfig2 = new LateralJoinPOP(projectPopConfig1, projectPopConfig2, JoinRelType.INNER);
-    final LateralJoinPOP ljPopConfig1 = new LateralJoinPOP(mockPopConfig, ljPopConfig2, JoinRelType.INNER);
+    final LateralJoinPOP ljPopConfig2 = new LateralJoinPOP(projectPopConfig1, projectPopConfig2, JoinRelType.INNER, Lists.newArrayList());
+    final LateralJoinPOP ljPopConfig1 = new LateralJoinPOP(mockPopConfig, ljPopConfig2, JoinRelType.INNER, Lists.newArrayList());
 
     final LateralJoinBatch lateralJoinBatch2 =
         new LateralJoinBatch(ljPopConfig2, fixture.getFragmentContext(), projectBatch1, projectBatch2);
diff --git a/pom.xml b/pom.xml
index 30b9129d8bd..d1d65eb9f72 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,7 +45,7 @@
     <dep.guava.version>18.0</dep.guava.version>
     <forkCount>2</forkCount>
     <parquet.version>1.10.0</parquet.version>
-    <calcite.version>1.16.0-drill-r4</calcite.version>
+    <calcite.version>1.16.0-drill-r6</calcite.version>
     <avatica.version>1.11.0</avatica.version>
     <janino.version>2.7.6</janino.version>
     <sqlline.version>1.1.9-drill-r7</sqlline.version>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services