You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2018/07/02 06:23:36 UTC

[drill] branch master updated (f481a7c -> 482a635)

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git.


    from f481a7c  DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
     new e6a0cdd1 DRILL-4020: The not-equal operator returns incorrect results when used on the HBase row key
     new 7c22e35  DRILL-4580: Support for exporting storage plugin configurations
     new 8ec2dc6  DRILL-6545: Projection Push down into Lateral Join operator.
     new bf40c5c  DRILL-6548: IllegalStateException: Unexpected EMIT outcome received in buildSchema phase
     new dcc2580  DRILL-6554: Minor code improvements in parquet statistics handling
     new 482a635  DRILL-6537: Limit the batch size for buffering operators based on how much memory they get

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../drill/exec/store/hbase/HBaseFilterBuilder.java |   1 +
 .../drill/hbase/TestHBaseFilterPushDown.java       |  18 ++++
 .../java/org/apache/drill/exec/ExecConstants.java  |   4 +
 .../drill/exec/expr/stat/ParquetIsPredicate.java   |  16 +--
 .../exec/expr/stat/ParquetPredicatesHelper.java    |  16 +--
 .../drill/exec/physical/config/LateralJoinPOP.java |  15 ++-
 .../drill/exec/physical/impl/TopN/TopNBatch.java   |   1 +
 .../exec/physical/impl/join/HashJoinBatch.java     |  10 +-
 .../apache/drill/exec/planner/PlannerPhase.java    |   5 +-
 .../planner/common/DrillLateralJoinRelBase.java    |  64 +++++++++++-
 .../drill/exec/planner/common/DrillRelOptUtil.java |  70 +++++++++++++
 .../exec/planner/logical/DrillCorrelateRule.java   |   2 +-
 .../exec/planner/logical/DrillLateralJoinRel.java  |   8 +-
 .../DrillProjectLateralJoinTransposeRule.java      |  50 +++++++++
 .../DrillProjectPushIntoLateralJoinRule.java       | 113 +++++++++++++++++++++
 .../exec/planner/physical/LateralJoinPrel.java     |  40 ++++++--
 .../exec/planner/physical/LateralJoinPrule.java    |   2 +-
 .../physical/visitor/JoinPrelRenameVisitor.java    |   2 +-
 .../exec/server/options/SystemOptionManager.java   |   3 +-
 .../drill/exec/server/rest/StorageResources.java   |  10 ++
 .../exec/store/parquet/ParquetReaderUtility.java   |  15 +--
 .../exec/store/parquet/metadata/Metadata.java      |   2 +-
 .../java-exec/src/main/resources/drill-module.conf |   1 +
 .../src/main/resources/rest/storage/list.ftl       |   1 +
 .../src/main/resources/rest/storage/update.ftl     |   1 +
 .../physical/impl/TopN/TestTopNEmitOutcome.java    |  16 +++
 .../impl/join/TestLateralJoinCorrectness.java      |  25 ++---
 .../impl/lateraljoin/TestLateralPlans.java         |  47 ++++++++-
 .../unnest/TestUnnestWithLateralCorrectness.java   |   7 +-
 pom.xml                                            |   2 +-
 30 files changed, 497 insertions(+), 70 deletions(-)
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectLateralJoinTransposeRule.java
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectPushIntoLateralJoinRule.java


[drill] 05/06: DRILL-6554: Minor code improvements in parquet statistics handling

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit dcc25800902dea81e87e2c7e3ddfb7fd9b281b42
Author: Vlad Rozov <vr...@apache.org>
AuthorDate: Wed Jun 27 10:45:49 2018 -0700

    DRILL-6554: Minor code improvements in parquet statistics handling
    
    closes #1349
---
 .../apache/drill/exec/expr/stat/ParquetIsPredicate.java  | 16 ++++++++--------
 .../drill/exec/expr/stat/ParquetPredicatesHelper.java    | 16 ++++++++--------
 .../drill/exec/store/parquet/ParquetReaderUtility.java   | 15 ++++-----------
 .../drill/exec/store/parquet/metadata/Metadata.java      |  2 +-
 4 files changed, 21 insertions(+), 28 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java
index 547dc06..42e6e0b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java
@@ -113,9 +113,9 @@ public class ParquetIsPredicate<C extends Comparable<C>> extends LogicalExpressi
    * IS TRUE predicate.
    */
   private static LogicalExpression createIsTruePredicate(LogicalExpression expr) {
-    return new ParquetIsPredicate<Boolean>(expr,
+    return new ParquetIsPredicate<Boolean>(expr, (exprStat, evaluator) ->
         //if max value is not true or if there are all nulls  -> canDrop
-        (exprStat, evaluator) -> !((BooleanStatistics)exprStat).getMax() || isAllNulls(exprStat, evaluator.getRowCount())
+        isAllNulls(exprStat, evaluator.getRowCount()) || exprStat.hasNonNullValue() && !((BooleanStatistics) exprStat).getMax()
     );
   }
 
@@ -123,9 +123,9 @@ public class ParquetIsPredicate<C extends Comparable<C>> extends LogicalExpressi
    * IS FALSE predicate.
    */
   private static LogicalExpression createIsFalsePredicate(LogicalExpression expr) {
-    return new ParquetIsPredicate<Boolean>(expr,
+    return new ParquetIsPredicate<Boolean>(expr, (exprStat, evaluator) ->
         //if min value is not false or if there are all nulls  -> canDrop
-        (exprStat, evaluator) -> ((BooleanStatistics)exprStat).getMin() || isAllNulls(exprStat, evaluator.getRowCount())
+        isAllNulls(exprStat, evaluator.getRowCount()) || exprStat.hasNonNullValue() && ((BooleanStatistics) exprStat).getMin()
     );
   }
 
@@ -133,9 +133,9 @@ public class ParquetIsPredicate<C extends Comparable<C>> extends LogicalExpressi
    * IS NOT TRUE predicate.
    */
   private static LogicalExpression createIsNotTruePredicate(LogicalExpression expr) {
-    return new ParquetIsPredicate<Boolean>(expr,
+    return new ParquetIsPredicate<Boolean>(expr, (exprStat, evaluator) ->
         //if min value is not false or if there are no nulls  -> canDrop
-        (exprStat, evaluator) -> ((BooleanStatistics)exprStat).getMin() && hasNoNulls(exprStat)
+        hasNoNulls(exprStat) && exprStat.hasNonNullValue() && ((BooleanStatistics) exprStat).getMin()
     );
   }
 
@@ -143,9 +143,9 @@ public class ParquetIsPredicate<C extends Comparable<C>> extends LogicalExpressi
    * IS NOT FALSE predicate.
    */
   private static LogicalExpression createIsNotFalsePredicate(LogicalExpression expr) {
-    return new ParquetIsPredicate<Boolean>(expr,
+    return new ParquetIsPredicate<Boolean>(expr, (exprStat, evaluator) ->
         //if max value is not true or if there are no nulls  -> canDrop
-        (exprStat, evaluator) -> !((BooleanStatistics)exprStat).getMax() && hasNoNulls(exprStat)
+        hasNoNulls(exprStat) && exprStat.hasNonNullValue() && !((BooleanStatistics) exprStat).getMax()
     );
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
index f804a7b..de4df1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.expr.stat;
 
+import org.apache.parquet.Preconditions;
 import org.apache.parquet.column.statistics.Statistics;
 
 /**
@@ -28,7 +29,7 @@ class ParquetPredicatesHelper {
 
   /**
    * @param stat statistics object
-   * @return true if the input stat object has valid statistics; false otherwise
+   * @return <tt>true</tt> if the input stat object is null or empty; <tt>false</tt> otherwise
    */
   static boolean isNullOrEmpty(Statistics stat) {
     return stat == null || stat.isEmpty();
@@ -39,22 +40,21 @@ class ParquetPredicatesHelper {
    *
    * @param stat parquet column statistics
    * @param rowCount number of rows in the parquet file
-   * @return True if all rows are null in the parquet file
-   *          False if at least one row is not null.
+   * @return <tt>true</tt> if all rows are null in the parquet file and <tt>false</tt> otherwise
    */
   static boolean isAllNulls(Statistics stat, long rowCount) {
-    return stat.isNumNullsSet() && stat.getNumNulls() == rowCount;
+    Preconditions.checkArgument(rowCount >= 0, String.format("negative rowCount %d is not valid", rowCount));
+    return stat.getNumNulls() == rowCount;
   }
 
   /**
-   * Checks that column chunk's statistics has at least one null
+   * Checks that column chunk's statistics does not have nulls
    *
    * @param stat parquet column statistics
-   * @return True if the parquet file has nulls
-   *          False if the parquet file hasn't nulls.
+   * @return <tt>true</tt> if the parquet file does not have nulls and <tt>false</tt> otherwise
    */
   static boolean hasNoNulls(Statistics stat) {
-    return !stat.isNumNullsSet() || stat.getNumNulls() == 0;
+    return stat.getNumNulls() <= 0;
   }
 
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 40203f5..a7f78fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -32,7 +32,7 @@ import org.apache.drill.exec.work.ExecErrorConstants;
 import org.apache.parquet.SemanticVersion;
 import org.apache.parquet.VersionParser;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.statistics.IntStatistics;
 import org.apache.parquet.format.ConvertedType;
 import org.apache.parquet.format.FileMetaData;
 import org.apache.parquet.format.SchemaElement;
@@ -417,16 +417,9 @@ public class ParquetReaderUtility {
             // column does not appear in this file, skip it
             continue;
           }
-          Statistics statistics = footer.getBlocks().get(rowGroupIndex).getColumns().get(colIndex).getStatistics();
-          Integer max = (Integer) statistics.genericGetMax();
-          if (statistics.hasNonNullValue()) {
-            if (max > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
-              return DateCorruptionStatus.META_SHOWS_CORRUPTION;
-            }
-          } else {
-            // no statistics, go check the first page
-            return DateCorruptionStatus.META_UNCLEAR_TEST_VALUES;
-          }
+          IntStatistics statistics = (IntStatistics) footer.getBlocks().get(rowGroupIndex).getColumns().get(colIndex).getStatistics();
+          return (statistics.hasNonNullValue() && statistics.compareMaxToValue(ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) > 0) ?
+              DateCorruptionStatus.META_SHOWS_CORRUPTION : DateCorruptionStatus.META_UNCLEAR_TEST_VALUES;
         }
       }
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index a61cc18..2259169 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -473,7 +473,7 @@ public class Metadata {
           // Write stats when they are not null
           Object minValue = null;
           Object maxValue = null;
-          if (stats.genericGetMax() != null && stats.genericGetMin() != null ) {
+          if (stats.hasNonNullValue()) {
             minValue = stats.genericGetMin();
             maxValue = stats.genericGetMax();
             if (containsCorruptDates == ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION


[drill] 04/06: DRILL-6548: IllegalStateException: Unexpected EMIT outcome received in buildSchema phase

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit bf40c5ca84c7a3c783984369564dc151349114b9
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Thu Jun 28 22:20:32 2018 -0700

    DRILL-6548: IllegalStateException: Unexpected EMIT outcome received in buildSchema phase
    
    closes #1352
---
 .../apache/drill/exec/physical/impl/TopN/TopNBatch.java  |  1 +
 .../exec/physical/impl/TopN/TestTopNEmitOutcome.java     | 16 ++++++++++++++++
 2 files changed, 17 insertions(+)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index a8c6804..4fc0d15 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -174,6 +174,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
         return;
       case NONE:
         state = BatchState.DONE;
+        return;
       case EMIT:
         throw new IllegalStateException("Unexpected EMIT outcome received in buildSchema phase");
       default:
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java
index 9358ff7..04d06aa 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TestTopNEmitOutcome.java
@@ -638,4 +638,20 @@ public class TestTopNEmitOutcome extends BaseTestOpBatchEmitOutcome {
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
     assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE);
   }
+
+  @Test
+  public void testRegularTopNWithEmptyDataSetAndNoneOutcome() {
+    inputContainer.add(emptyInputRowSet.container());
+    inputOutcomes.add(RecordBatch.IterOutcome.NONE);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final TopN topNConfig = new TopN(null,
+      Lists.newArrayList(ordering("id_left", RelFieldCollation.Direction.DESCENDING,
+        RelFieldCollation.NullDirection.FIRST)), false, 4);
+    final TopNBatch topNBatch = new TopNBatch(topNConfig, operatorFixture.getFragmentContext(), mockInputBatch);
+
+    assertTrue(topNBatch.next() == RecordBatch.IterOutcome.NONE);
+  }
 }


[drill] 01/06: DRILL-4020: The not-equal operator returns incorrect results when used on the HBase row key

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit e6a0cdd193ed1724d7cd1faedb5257c787f468b0
Author: Akihiko Kusanagi <na...@nagi-p.com>
AuthorDate: Tue Jun 5 23:12:13 2018 +0900

    DRILL-4020: The not-equal operator returns incorrect results when used on the HBase row key
    
    - Added a condition that checks if the filter to the scan specification doesn't have NOT_EQUAL operator
    - Added testFilterPushDownRowKeyNotEqual() to TestHBaseFilterPushDown
    
    closes #309
---
 .../drill/exec/store/hbase/HBaseFilterBuilder.java     |  1 +
 .../apache/drill/hbase/TestHBaseFilterPushDown.java    | 18 ++++++++++++++++++
 2 files changed, 19 insertions(+)

diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
index 8d2e8ff..6e1efe5 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseFilterBuilder.java
@@ -61,6 +61,7 @@ public class HBaseFilterBuilder extends AbstractExprVisitor<HBaseScanSpec, Void,
        * remove it since its effect is also achieved through startRow and stopRow.
        */
       if (parsedSpec.filter instanceof RowFilter &&
+          ((RowFilter)parsedSpec.filter).getOperator() != CompareOp.NOT_EQUAL &&
           ((RowFilter)parsedSpec.filter).getComparator() instanceof BinaryComparator) {
         parsedSpec.filter = null;
       }
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
index e6eff11..aee18eb 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
@@ -45,6 +45,24 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
   }
 
   @Test
+  public void testFilterPushDownRowKeyNotEqual() throws Exception {
+    setColumnWidths(new int[] {8, 38, 38});
+    final String sql = "SELECT\n"
+        + "  *\n"
+        + "FROM\n"
+        + "  hbase.`[TABLE_NAME]` tableName\n"
+        + "WHERE\n"
+        + "  row_key <> 'b4'";
+
+    runHBaseSQLVerifyCount(sql, 7);
+
+    final String[] expectedPlan = {".*startRow=, stopRow=, filter=RowFilter \\(NOT_EQUAL, b4\\).*"};
+    final String[] excludedPlan ={};
+    final String sqlHBase = canonizeHBaseSQL(sql);
+    PlanTestBase.testPlanMatchingPatterns(sqlHBase, expectedPlan, excludedPlan);
+  }
+
+  @Test
   public void testFilterPushDownRowKeyEqualWithItem() throws Exception {
     setColumnWidths(new int[] {20, 30});
     final String sql = "SELECT\n"


[drill] 03/06: DRILL-6545: Projection Push down into Lateral Join operator.

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 8ec2dc64175648103a5ec51f8ad98387496692a9
Author: HanumathRao <ha...@gmail.com>
AuthorDate: Thu Jun 21 18:42:24 2018 -0700

    DRILL-6545: Projection Push down into Lateral Join operator.
    
    closes #1347
---
 .../drill/exec/physical/config/LateralJoinPOP.java |  15 ++-
 .../apache/drill/exec/planner/PlannerPhase.java    |   5 +-
 .../planner/common/DrillLateralJoinRelBase.java    |  64 +++++++++++-
 .../drill/exec/planner/common/DrillRelOptUtil.java |  70 +++++++++++++
 .../exec/planner/logical/DrillCorrelateRule.java   |   2 +-
 .../exec/planner/logical/DrillLateralJoinRel.java  |   8 +-
 .../DrillProjectLateralJoinTransposeRule.java      |  50 +++++++++
 .../DrillProjectPushIntoLateralJoinRule.java       | 113 +++++++++++++++++++++
 .../exec/planner/physical/LateralJoinPrel.java     |  40 ++++++--
 .../exec/planner/physical/LateralJoinPrule.java    |   2 +-
 .../physical/visitor/JoinPrelRenameVisitor.java    |   2 +-
 .../impl/join/TestLateralJoinCorrectness.java      |  25 ++---
 .../impl/lateraljoin/TestLateralPlans.java         |  47 ++++++++-
 .../unnest/TestUnnestWithLateralCorrectness.java   |   7 +-
 pom.xml                                            |   2 +-
 15 files changed, 414 insertions(+), 38 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
index a12fed1..55ede96 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.AbstractJoinPop;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
@@ -34,6 +35,9 @@ import java.util.List;
 public class LateralJoinPOP extends AbstractJoinPop {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinPOP.class);
 
+  @JsonProperty("excludedColumns")
+  private List<SchemaPath> excludedColumns;
+
   @JsonProperty("unnestForLateralJoin")
   private UnnestPOP unnestForLateralJoin;
 
@@ -41,19 +45,21 @@ public class LateralJoinPOP extends AbstractJoinPop {
   public LateralJoinPOP(
       @JsonProperty("left") PhysicalOperator left,
       @JsonProperty("right") PhysicalOperator right,
-      @JsonProperty("joinType") JoinRelType joinType) {
+      @JsonProperty("joinType") JoinRelType joinType,
+      @JsonProperty("excludedColumns") List<SchemaPath> excludedColumns) {
     super(left, right, joinType, null, null);
     Preconditions.checkArgument(joinType != JoinRelType.FULL,
       "Full outer join is currently not supported with Lateral Join");
     Preconditions.checkArgument(joinType != JoinRelType.RIGHT,
       "Right join is currently not supported with Lateral Join");
+    this.excludedColumns = excludedColumns;
   }
 
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.size() == 2,
       "Lateral join should have two physical operators");
-    LateralJoinPOP newPOP =  new LateralJoinPOP(children.get(0), children.get(1), joinType);
+    LateralJoinPOP newPOP =  new LateralJoinPOP(children.get(0), children.get(1), joinType, this.excludedColumns);
     newPOP.unnestForLateralJoin = this.unnestForLateralJoin;
     return newPOP;
   }
@@ -63,6 +69,11 @@ public class LateralJoinPOP extends AbstractJoinPop {
     return this.unnestForLateralJoin;
   }
 
+  @JsonProperty("excludedColumns")
+  public List<SchemaPath> getExcludedColumns() {
+    return this.excludedColumns;
+  }
+
   public void setUnnestForLateralJoin(UnnestPOP unnest) {
     this.unnestForLateralJoin = unnest;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index c8bb2a4..519d503 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -37,6 +37,8 @@ import org.apache.drill.exec.planner.logical.DrillJoinRel;
 import org.apache.drill.exec.planner.logical.DrillJoinRule;
 import org.apache.drill.exec.planner.logical.DrillLimitRule;
 import org.apache.drill.exec.planner.logical.DrillMergeProjectRule;
+import org.apache.drill.exec.planner.logical.DrillProjectLateralJoinTransposeRule;
+import org.apache.drill.exec.planner.logical.DrillProjectPushIntoLateralJoinRule;
 import org.apache.drill.exec.planner.logical.DrillProjectRule;
 import org.apache.drill.exec.planner.logical.DrillPushFilterPastProjectRule;
 import org.apache.drill.exec.planner.logical.DrillPushLimitToScanRule;
@@ -287,7 +289,8 @@ public enum PlannerPhase {
       // Due to infinite loop in planning (DRILL-3257/CALCITE-1271), temporarily use this rule in Hep planner
       // RuleInstance.FILTER_SET_OP_TRANSPOSE_RULE,
       DrillFilterAggregateTransposeRule.INSTANCE,
-
+      DrillProjectLateralJoinTransposeRule.INSTANCE,
+      DrillProjectPushIntoLateralJoinRule.INSTANCE,
       RuleInstance.FILTER_MERGE_RULE,
       RuleInstance.FILTER_CORRELATE_RULE,
       RuleInstance.AGGREGATE_REMOVE_RULE,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java
index a7bbbca..28e5246 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.planner.common;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
@@ -25,17 +27,27 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Correlate;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.SemiJoinType;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.physical.PrelUtil;
 
+import java.util.ArrayList;
+import java.util.List;
+
 
 public abstract class DrillLateralJoinRelBase extends Correlate implements DrillRelNode {
-  public DrillLateralJoinRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
-                                 CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
+
+  final private static double CORRELATE_MEM_COPY_COST = DrillCostBase.MEMORY_TO_CPU_RATIO * DrillCostBase.BASE_CPU_COST;
+  final public boolean excludeCorrelateColumn;
+  public DrillLateralJoinRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, boolean excludeCorrelateCol,
+                               CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
     super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType);
+    this.excludeCorrelateColumn = excludeCorrelateCol;
   }
 
   @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
@@ -49,7 +61,53 @@ public abstract class DrillLateralJoinRelBase extends Correlate implements Drill
     double rowSize = (this.getLeft().getRowType().getFieldList().size()) * fieldWidth;
 
     double cpuCost = rowCount * rowSize * DrillCostBase.BASE_CPU_COST;
-    double memCost = 0;
+    double memCost = !excludeCorrelateColumn ? CORRELATE_MEM_COPY_COST : 0.0;
     return costFactory.makeCost(rowCount, cpuCost, 0, 0, memCost);
   }
+
+  @Override
+  protected RelDataType deriveRowType() {
+    switch (joinType) {
+      case LEFT:
+      case INNER:
+        return constructRowType(SqlValidatorUtil.deriveJoinRowType(left.getRowType(),
+          right.getRowType(), joinType.toJoinType(),
+          getCluster().getTypeFactory(), null,
+          ImmutableList.<RelDataTypeField>of()));
+      case ANTI:
+      case SEMI:
+        return constructRowType(left.getRowType());
+      default:
+        throw new IllegalStateException("Unknown join type " + joinType);
+    }
+  }
+
+  public int getInputSize(int offset, RelNode input) {
+    if (this.excludeCorrelateColumn &&
+      offset == 0) {
+      return input.getRowType().getFieldList().size() - 1;
+    }
+    return input.getRowType().getFieldList().size();
+  }
+
+  public RelDataType constructRowType(RelDataType inputRowType) {
+    Preconditions.checkArgument(this.requiredColumns.cardinality() == 1);
+
+    List<RelDataType> fields = new ArrayList<>();
+    List<String> fieldNames = new ArrayList<>();
+    if (excludeCorrelateColumn) {
+      int corrVariable = this.requiredColumns.nextSetBit(0);
+
+      for (RelDataTypeField field : inputRowType.getFieldList()) {
+        if (field.getIndex() == corrVariable) {
+          continue;
+        }
+        fieldNames.add(field.getName());
+        fields.add(field.getType());
+      }
+
+      return getCluster().getTypeFactory().createStructType(fields, fieldNames);
+    }
+    return inputRowType;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
index 36d7db2..9dd5032 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
@@ -18,9 +18,12 @@
 package org.apache.drill.exec.planner.common;
 
 import java.util.AbstractList;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
@@ -29,6 +32,7 @@ import org.apache.calcite.rel.rules.ProjectRemoveRule;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
@@ -282,4 +286,70 @@ public abstract class DrillRelOptUtil {
     }
     return false;
   }
+
+  /**
+   * InputRefVisitor is a utility class used to collect all the RexInputRef nodes in a
+   * RexNode.
+   *
+   */
+  public static class InputRefVisitor extends RexVisitorImpl<Void> {
+    private final List<RexInputRef> inputRefList;
+
+    public InputRefVisitor() {
+      super(true);
+      inputRefList = new ArrayList<>();
+    }
+
+    public Void visitInputRef(RexInputRef ref) {
+      inputRefList.add(ref);
+      return null;
+    }
+
+    public Void visitCall(RexCall call) {
+      for (RexNode operand : call.operands) {
+        operand.accept(this);
+      }
+      return null;
+    }
+
+    public List<RexInputRef> getInputRefs() {
+      return inputRefList;
+    }
+  }
+
+
+  /**
+   * RexFieldsTransformer is a utility class used to convert column refs in a RexNode
+   * based on inputRefMap (input to output ref map).
+   *
+   * This transformer can be used to find and replace the existing inputRef in a RexNode with a new inputRef.
+   */
+  public static class RexFieldsTransformer {
+    private final RexBuilder rexBuilder;
+    private final Map<Integer, Integer> inputRefMap;
+
+    public RexFieldsTransformer(
+      RexBuilder rexBuilder,
+      Map<Integer, Integer> inputRefMap) {
+      this.rexBuilder = rexBuilder;
+      this.inputRefMap = inputRefMap;
+    }
+
+    public RexNode go(RexNode rex) {
+      if (rex instanceof RexCall) {
+        ImmutableList.Builder<RexNode> builder = ImmutableList.builder();
+        final RexCall call = (RexCall) rex;
+        for (RexNode operand : call.operands) {
+          builder.add(go(operand));
+        }
+        return call.clone(call.getType(), builder.build());
+      } else if (rex instanceof RexInputRef) {
+        RexInputRef var = (RexInputRef) rex;
+        int index = var.getIndex();
+        return rexBuilder.makeInputRef(var.getType(), inputRefMap.get(index));
+      } else {
+        return rex;
+      }
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java
index 52e603f..9f91818 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java
@@ -46,7 +46,7 @@ public class DrillCorrelateRule extends RelOptRule {
 
     final RelTraitSet traits = correlate.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
     DrillLateralJoinRel lateralJoinRel = new DrillLateralJoinRel(correlate.getCluster(),
-        traits, convertedLeft, convertedRight, correlate.getCorrelationId(),
+        traits, convertedLeft, convertedRight, false, correlate.getCorrelationId(),
         correlate.getRequiredColumns(), correlate.getJoinType());
     call.transformTo(lateralJoinRel);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
index 035dae9..aa6ccb0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
@@ -33,16 +33,16 @@ import java.util.List;
 
 public class DrillLateralJoinRel extends DrillLateralJoinRelBase implements DrillRel {
 
-  protected DrillLateralJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
+  protected DrillLateralJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, boolean excludeCorrelateCol,
                                 CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
-    super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType);
+    super(cluster, traits, left, right, excludeCorrelateCol, correlationId, requiredColumns, semiJoinType);
   }
 
   @Override
   public Correlate copy(RelTraitSet traitSet,
         RelNode left, RelNode right, CorrelationId correlationId,
         ImmutableBitSet requiredColumns, SemiJoinType joinType) {
-    return new DrillLateralJoinRel(this.getCluster(), this.getTraitSet(), left, right, correlationId, requiredColumns,
+    return new DrillLateralJoinRel(this.getCluster(), this.getTraitSet(), left, right, this.excludeCorrelateColumn, correlationId, requiredColumns,
         this.getJoinType());
   }
 
@@ -50,7 +50,7 @@ public class DrillLateralJoinRel extends DrillLateralJoinRelBase implements Dril
   public LogicalOperator implement(DrillImplementor implementor) {
     final List<String> fields = getRowType().getFieldNames();
     assert DrillJoinRel.isUnique(fields);
-    final int leftCount = left.getRowType().getFieldCount();
+    final int leftCount = getInputSize(0,left);
 
     final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0, 0, left, this);
     final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor, 1, leftCount, right, this);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectLateralJoinTransposeRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectLateralJoinTransposeRule.java
new file mode 100644
index 0000000..5cb984a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectLateralJoinTransposeRule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.rules.ProjectCorrelateTransposeRule;
+import org.apache.calcite.rel.rules.PushProjector;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+
+/**
+ * {@link ProjectCorrelateTransposeRule} variant that does not match a {@link DrillLateralJoinRel}
+ * whose excludeCorrelateColumn flag is already set.
+ */
+public class DrillProjectLateralJoinTransposeRule extends ProjectCorrelateTransposeRule {
+
+  public static final DrillProjectLateralJoinTransposeRule INSTANCE = new DrillProjectLateralJoinTransposeRule(PushProjector.ExprCondition.TRUE, RelFactories.LOGICAL_BUILDER);
+
+  public DrillProjectLateralJoinTransposeRule(PushProjector.ExprCondition preserveExprCondition, RelBuilderFactory relFactory) {
+    super(preserveExprCondition, relFactory);
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    final Correlate lateralJoin = call.rel(1);
+    // Skip lateral joins that already exclude the correlate column: the project push into lateral join rule
+    // changed their output row type, which will fail assertions in ProjectCorrelateTransposeRule.
+    final boolean correlateColumnExcluded = lateralJoin instanceof DrillLateralJoinRel
+        && ((DrillLateralJoinRel) lateralJoin).excludeCorrelateColumn;
+    return !correlateColumnExcluded;
+  }
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectPushIntoLateralJoinRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectPushIntoLateralJoinRule.java
new file mode 100644
index 0000000..6a57c89
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectPushIntoLateralJoinRule.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.drill.exec.planner.StarColumnHelper;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Rule matching a {@link DrillProjectRel} on top of a {@link DrillLateralJoinRel}. Unless star columns are involved,
+ * the column is already excluded, or the project reads it, the join is re-created with its correlate column excluded.
+ */
+public class DrillProjectPushIntoLateralJoinRule extends RelOptRule {
+
+  public static final DrillProjectPushIntoLateralJoinRule INSTANCE =
+    new DrillProjectPushIntoLateralJoinRule(RelFactories.LOGICAL_BUILDER);
+
+  public DrillProjectPushIntoLateralJoinRule(RelBuilderFactory relFactory) {
+    super(operand(DrillProjectRel.class,
+        operand(DrillLateralJoinRel.class, any())),
+      relFactory, null);
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    DrillProjectRel origProj = call.rel(0);
+    final DrillLateralJoinRel corr = call.rel(1);
+    if (StarColumnHelper.containsStarColumn(origProj.getRowType()) ||
+        StarColumnHelper.containsStarColumn(corr.getRowType()) ||
+         corr.excludeCorrelateColumn) {
+      return;
+    }
+    DrillRelOptUtil.InputRefVisitor collectRefs = new DrillRelOptUtil.InputRefVisitor();
+    for (RexNode exp: origProj.getChildExps()) {
+      exp.accept(collectRefs);
+    }
+
+    // Leave the plan untouched when the project reads the correlate (first required) column.
+    int correlationIndex = corr.getRequiredColumns().nextSetBit(0);
+    for (RexInputRef inputRef : collectRefs.getInputRefs()) {
+      if (inputRef.getIndex() == correlationIndex) {
+        return;
+      }
+    }
+
+    final RelNode left = corr.getLeft();
+    final RelNode right = corr.getRight();
+    final RelNode convertedLeft = convert(left, left.getTraitSet().plus(DrillRel.DRILL_LOGICAL).simplify());
+    final RelNode convertedRight = convert(right, right.getTraitSet().plus(DrillRel.DRILL_LOGICAL).simplify());
+    final RelTraitSet traits = corr.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
+    RelNode relNode = new DrillLateralJoinRel(corr.getCluster(),
+                            traits, convertedLeft, convertedRight, true, corr.getCorrelationId(),
+                            corr.getRequiredColumns(), corr.getJoinType());
+
+    if (!DrillRelOptUtil.isTrivialProject(origProj, true)) {
+      Map<Integer, Integer> mapWithoutCorr = buildMapWithoutCorrColumn(corr, correlationIndex);
+      List<RexNode> outputExprs = transformExprs(origProj.getCluster().getRexBuilder(), origProj.getChildExps(), mapWithoutCorr);
+      relNode = new DrillProjectRel(origProj.getCluster(),
+                                    left.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+                                    relNode, outputExprs, origProj.getRowType());
+    }
+    call.transformTo(relNode);
+  }
+
+  private List<RexNode> transformExprs(RexBuilder builder, List<RexNode> exprs, Map<Integer, Integer> corrMap) {
+    List<RexNode> outputExprs = new ArrayList<>();
+    DrillRelOptUtil.RexFieldsTransformer transformer = new DrillRelOptUtil.RexFieldsTransformer(builder, corrMap);
+    for (RexNode expr : exprs) {
+      outputExprs.add(transformer.go(expr));
+    }
+    return outputExprs;
+  }
+
+  private Map<Integer, Integer> buildMapWithoutCorrColumn(RelNode corr, int correlationIndex) {
+    int index = 0;
+    Map<Integer, Integer> result = new HashMap<>();
+    for (int i = 0; i < corr.getRowType().getFieldList().size(); i++) {
+      if (i != correlationIndex) {
+        result.put(i, index++);
+      }
+    }
+    return result;
+  }
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java
index 565871b..b55076b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.Correlate;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.type.RelDataType;
@@ -30,6 +31,8 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.sql.SemiJoinType;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.commons.collections.ListUtils;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.LateralJoinPOP;
 import org.apache.drill.exec.planner.common.DrillLateralJoinRelBase;
@@ -38,21 +41,23 @@ import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
 import org.apache.drill.exec.record.BatchSchema;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
 public class LateralJoinPrel extends DrillLateralJoinRelBase implements Prel {
 
 
-  protected LateralJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
+  protected LateralJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, boolean excludeCorrelateCol,
                             CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
-    super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType);
+    super(cluster, traits, left, right, excludeCorrelateCol, correlationId, requiredColumns, semiJoinType);
   }
+
   @Override
   public Correlate copy(RelTraitSet traitSet,
                         RelNode left, RelNode right, CorrelationId correlationId,
                         ImmutableBitSet requiredColumns, SemiJoinType joinType) {
-    return new LateralJoinPrel(this.getCluster(), this.getTraitSet(), left, right, correlationId, requiredColumns,
+    return new LateralJoinPrel(this.getCluster(), this.getTraitSet(), left, right, this.excludeCorrelateColumn, correlationId, requiredColumns,
         this.getJoinType());
   }
 
@@ -63,11 +68,22 @@ public class LateralJoinPrel extends DrillLateralJoinRelBase implements Prel {
     PhysicalOperator rightPop = ((Prel)right).getPhysicalOperator(creator);
 
     SemiJoinType jtype = this.getJoinType();
-
-    LateralJoinPOP ljoin = new LateralJoinPOP(leftPop, rightPop, jtype.toJoinType());
+    List<SchemaPath> excludedColumns = new ArrayList<>();
+    if (getColumn() != null) {
+      excludedColumns.add(getColumn());
+    }
+    LateralJoinPOP ljoin = new LateralJoinPOP(leftPop, rightPop, jtype.toJoinType(), excludedColumns);
     return creator.addMetadata(this, ljoin);
   }
 
+  private SchemaPath getColumn() {
+    if (!this.excludeCorrelateColumn) {
+      return null;
+    }
+    final int firstRequired = this.getRequiredColumns().asList().get(0);
+    return SchemaPath.getSimplePath(this.getInput(0).getRowType().getFieldNames().get(firstRequired));
+  }
+
   /**
    * Check to make sure that the fields of the inputs are the same as the output field names.
    * If not, insert a project renaming them.
@@ -76,8 +92,8 @@ public class LateralJoinPrel extends DrillLateralJoinRelBase implements Prel {
     Preconditions.checkArgument(DrillJoinRelBase.uniqueFieldNames(input.getRowType()));
     final List<String> fields = getRowType().getFieldNames();
     final List<String> inputFields = input.getRowType().getFieldNames();
-    final List<String> outputFields = fields.subList(offset, offset + inputFields.size());
-    if (!outputFields.equals(inputFields)) {
+    final List<String> outputFields = fields.subList(offset, offset + getInputSize(offset, input));
+    if (ListUtils.subtract(outputFields, inputFields).size() != 0) {
       // Ensure that input field names are the same as output field names.
       // If there are duplicate field names on left and right, fields will get
       // lost.
@@ -105,6 +121,16 @@ public class LateralJoinPrel extends DrillLateralJoinRelBase implements Prel {
   }
 
   @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    if (this.excludeCorrelateColumn) {
+      return super.explainTerms(pw).item("column excluded from output: ", this.getColumn());
+    } else {
+      return super.explainTerms(pw);
+    }
+  }
+
+
+  @Override
   public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> visitor, X value) throws E {
     return visitor.visitLateral(this, value);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrule.java
index e531dca..10e247b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrule.java
@@ -48,7 +48,7 @@ public class LateralJoinPrule extends Prule {
 
     final LateralJoinPrel lateralJoinPrel = new LateralJoinPrel(lateralJoinRel.getCluster(),
                                   corrTraits,
-                                  convertedLeft, convertedRight, lateralJoinRel.getCorrelationId(),
+                                  convertedLeft, convertedRight, lateralJoinRel.excludeCorrelateColumn, lateralJoinRel.getCorrelationId(),
                                   lateralJoinRel.getRequiredColumns(),lateralJoinRel.getJoinType());
     call.transformTo(lateralJoinPrel);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java
index d450c56..850f0bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java
@@ -76,7 +76,7 @@ public class JoinPrelRenameVisitor extends BasePrelVisitor<Prel, Void, RuntimeEx
 
     List<RelNode> children = getChildren(prel);
 
-    final int leftCount = children.get(0).getRowType().getFieldCount();
+    final int leftCount = prel.getInputSize(0,children.get(0));
 
     List<RelNode> reNamedChildren = Lists.newArrayList();
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
index 79a7bd4..caa8137 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestLateralJoinCorrectness.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.join;
 
+import avro.shaded.com.google.common.collect.Lists;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -107,7 +108,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
       .buildSchema();
     emptyRightRowSet = fixture.rowSetBuilder(rightSchema).build();
 
-    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER);
+    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
   }
 
   @AfterClass
@@ -1488,7 +1489,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
     final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT);
+    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT, Lists.newArrayList());
 
     final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(),
       leftMockBatch, rightMockBatch);
@@ -1554,7 +1555,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
     final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT);
+    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT, Lists.newArrayList());
 
     final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(),
       leftMockBatch, rightMockBatch);
@@ -1622,7 +1623,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
     final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT);
+    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT, Lists.newArrayList());
 
     final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(),
       leftMockBatch, rightMockBatch);
@@ -1693,7 +1694,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
     final CloseableRecordBatch rightMockBatch = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT);
+    final LateralJoinPOP popConfig = new LateralJoinPOP(null, null, JoinRelType.LEFT, Lists.newArrayList());
 
     final LateralJoinBatch ljBatch = new LateralJoinBatch(popConfig, fixture.getFragmentContext(),
       leftMockBatch, rightMockBatch);
@@ -1754,7 +1755,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
 
     final LateralJoinBatch ljBatch_1 = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
@@ -1863,7 +1864,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
 
     final LateralJoinBatch lowerLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
@@ -1964,7 +1965,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
 
     final LateralJoinBatch lowerLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
@@ -2091,7 +2092,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
 
     final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
@@ -2225,7 +2226,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
 
     final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
@@ -2369,7 +2370,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
 
     final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
@@ -2723,7 +2724,7 @@ public class TestLateralJoinCorrectness extends SubOperatorTest {
     final CloseableRecordBatch rightMockBatch_1 = new MockRecordBatch(fixture.getFragmentContext(), operatorContext,
       rightContainer, rightOutcomes, rightContainer.get(0).getSchema());
 
-    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER);
+    final LateralJoinPOP popConfig_1 = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
 
     final LateralJoinBatch lowerLevelLateral = new LateralJoinBatch(popConfig_1, fixture.getFragmentContext(),
       leftMockBatch_1, rightMockBatch_1);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
index 53df9eb..8ff164f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestLateralPlans.java
@@ -58,6 +58,10 @@ public class TestLateralPlans extends BaseTestQuery {
   public void testLateralSql() throws Exception {
     String Sql = "select t.c_name, t2.ord.o_shop as o_shop from cp.`lateraljoin/nested-customer.json` t," +
         " unnest(t.orders) t2(ord) limit 1";
+
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{"column excluded from output: =\\[\\`orders\\`\\]"},
+      new String[]{});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
@@ -68,9 +72,16 @@ public class TestLateralPlans extends BaseTestQuery {
 
   @Test
   public void testExplainLateralSql() throws Exception {
-    String Sql = "explain plan without implementation for select t.c_name, t2.ord.o_shop as o_shop from cp.`lateraljoin/nested-customer.json` t," +
+    String explainSql = "explain plan without implementation for select t.c_name, t2.ord.o_shop as o_shop from cp.`lateraljoin/nested-customer.json` t," +
         " unnest(t.orders) t2(ord) limit 1";
-    test(Sql);
+
+    String Sql = "select t.c_name, t2.ord.o_shop as o_shop from cp.`lateraljoin/nested-customer.json` t," +
+      " unnest(t.orders) t2(ord) limit 1";
+
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{"column excluded from output: =\\[\\`orders\\`\\]"},
+      new String[]{});
+
+    test(explainSql);
   }
 
   @Test
@@ -82,6 +93,9 @@ public class TestLateralPlans extends BaseTestQuery {
     PlanTestBase.testPlanMatchingPatterns(query, new String[]{"LateralJoin(.*[\n\r])+.*Filter(.*[\n\r])+.*Scan(.*[\n\r])+.*Filter"},
         new String[]{});
 
+    PlanTestBase.testPlanMatchingPatterns(query, new String[]{"column excluded from output: =\\[\\`orders\\`\\]"},
+      new String[]{});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(query)
@@ -94,6 +108,10 @@ public class TestLateralPlans extends BaseTestQuery {
   public void testLateralSqlPlainCol() throws Exception {
     String Sql = "select t.c_name, t2.phone as c_phone from cp.`lateraljoin/nested-customer.json` t,"
         + " unnest(t.c_phone) t2(phone) limit 1";
+
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{"column excluded from output: =\\[\\`c_phone\\`\\]"},
+      new String[]{});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
@@ -106,6 +124,9 @@ public class TestLateralPlans extends BaseTestQuery {
   public void testLateralSqlStar() throws Exception {
     String Sql = "select * from cp.`lateraljoin/nested-customer.json` t, unnest(t.orders) Orders(ord) limit 0";
 
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{},
+      new String[]{"column excluded from output: =\\[\\`orders\\`\\]"});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
@@ -118,6 +139,9 @@ public class TestLateralPlans extends BaseTestQuery {
   public void testLateralSqlStar2() throws Exception {
     String Sql = "select c.* from cp.`lateraljoin/nested-customer.json` c, unnest(c.orders) Orders(ord) limit 0";
 
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{},
+      new String[]{"column excluded from output: =\\[\\`orders\\`\\]"});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
@@ -130,6 +154,9 @@ public class TestLateralPlans extends BaseTestQuery {
   public void testLateralSqlStar3() throws Exception {
     String Sql = "select Orders.*, c.* from cp.`lateraljoin/nested-customer.json` c, unnest(c.orders) Orders(ord) limit 0";
 
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{},
+      new String[]{"column excluded from output: =\\[\\`orders\\`\\]"});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
@@ -142,6 +169,8 @@ public class TestLateralPlans extends BaseTestQuery {
   public void testLateralSqlStar4() throws Exception {
     String Sql = "select Orders.* from cp.`lateraljoin/nested-customer.json` c, unnest(c.orders) Orders(ord) limit 0";
 
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{"column excluded from output: =\\[\\`orders\\`\\]"}, new String[]{});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
@@ -158,11 +187,14 @@ public class TestLateralPlans extends BaseTestQuery {
         " (select c_name, flatten(orders) from cp" +
         ".`lateraljoin/nested-customer.parquet` ) as t2(name, orders) on t.c_name = t2.name";
 
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{"column excluded from output: =\\[\\`orders\\`\\]"}, new String[]{});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
         .sqlBaselineQuery(baselineQuery)
         .go();
+
   }
 
   @Test
@@ -174,6 +206,8 @@ public class TestLateralPlans extends BaseTestQuery {
         " (select c_name, flatten(orders) from cp.`lateraljoin/nested-customer.parquet` ) as t2 (name, orders) on t.c_name = t2.name " +
         " inner join (select c_name, flatten(orders) from cp.`lateraljoin/nested-customer.parquet` ) as t3(name, orders) on t.c_name = t3.name";
 
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{"column excluded from output: =\\[\\`orders\\`\\]"}, new String[]{});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
@@ -190,6 +224,9 @@ public class TestLateralPlans extends BaseTestQuery {
     String baselineQuery = "select t.c_name, t3.orders.items as items0, t3.items as items1 from cp.`lateraljoin/nested-customer.parquet` t " +
         " inner join (select c_name, f, flatten(t1.f.items) from (select c_name, flatten(orders) as f from cp.`lateraljoin/nested-customer.parquet`) as t1 ) " +
         "t3(name, orders, items) on t.c_name = t3.name ";
+
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{"column excluded from output: =\\[\\`orders\\`\\]"}, new String[]{"column excluded from output: =\\[\\`items\\`\\]"});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
@@ -206,6 +243,9 @@ public class TestLateralPlans extends BaseTestQuery {
     String baselineQuery = "select t.c_name, t3.orders.items as items0, t3.items as items1 from cp.`lateraljoin/nested-customer.parquet` t " +
         " inner join (select c_name, f, flatten(t1.f.items) from (select c_name, flatten(orders) as f from cp.`lateraljoin/nested-customer.parquet`) as t1 ) " +
         "t3(name, orders, items) on t.c_name = t3.name where t.c_id > 1";
+
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{"column excluded from output: =\\[\\`orders\\`\\]"}, new String[]{"column excluded from output: =\\[\\`items\\`\\]"});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
@@ -246,11 +286,14 @@ public class TestLateralPlans extends BaseTestQuery {
         " inner join (select c_name, f, flatten(t1.f.items) from (select c_name, flatten(orders) as f from cp.`lateraljoin/nested-customer.parquet`) as t1 ) " +
         "t3(name, orders, items) on t.c_name = t3.name where t.c_id > 1 group by t.c_id";
 
+    PlanTestBase.testPlanMatchingPatterns(Sql, new String[]{"column excluded from output: =\\[\\`orders\\`\\]", "column excluded from output: =\\[\\`items\\`\\]"}, new String[]{});
+
     testBuilder()
         .unOrdered()
         .sqlQuery(Sql)
         .sqlBaselineQuery(baselineQuery)
         .go();
+
   }
 
   @Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index 03fd1c1..3a7f899 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.physical.impl.unnest;
 
+import com.google.common.collect.Lists;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.common.exceptions.DrillException;
@@ -69,7 +70,7 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
 
   @BeforeClass public static void setUpBeforeClass() throws Exception {
     mockPopConfig = new MockStorePOP(null);
-    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER);
+    ljPopConfig = new LateralJoinPOP(null, null, JoinRelType.INNER, Lists.newArrayList());
     operatorContext = fixture.newOperatorContext(mockPopConfig);
   }
 
@@ -875,8 +876,8 @@ public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
     final ProjectRecordBatch projectBatch2 =
         new ProjectRecordBatch(projectPopConfig2, unnestBatch2, fixture.getFragmentContext());
 
-    final LateralJoinPOP ljPopConfig2 = new LateralJoinPOP(projectPopConfig1, projectPopConfig2, JoinRelType.INNER);
-    final LateralJoinPOP ljPopConfig1 = new LateralJoinPOP(mockPopConfig, ljPopConfig2, JoinRelType.INNER);
+    final LateralJoinPOP ljPopConfig2 = new LateralJoinPOP(projectPopConfig1, projectPopConfig2, JoinRelType.INNER, Lists.newArrayList());
+    final LateralJoinPOP ljPopConfig1 = new LateralJoinPOP(mockPopConfig, ljPopConfig2, JoinRelType.INNER, Lists.newArrayList());
 
     final LateralJoinBatch lateralJoinBatch2 =
         new LateralJoinBatch(ljPopConfig2, fixture.getFragmentContext(), projectBatch1, projectBatch2);
diff --git a/pom.xml b/pom.xml
index 30b9129..d1d65eb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,7 +45,7 @@
     <dep.guava.version>18.0</dep.guava.version>
     <forkCount>2</forkCount>
     <parquet.version>1.10.0</parquet.version>
-    <calcite.version>1.16.0-drill-r4</calcite.version>
+    <calcite.version>1.16.0-drill-r6</calcite.version>
     <avatica.version>1.11.0</avatica.version>
     <janino.version>2.7.6</janino.version>
     <sqlline.version>1.1.9-drill-r7</sqlline.version>


[drill] 06/06: DRILL-6537: Limit the batch size for buffering operators based on how much memory they get

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 482a63549e1bfe2b238ea9bdaf7d42312e1f51f6
Author: Padma Penumarthy <pa...@padmas-mbp.attlocal.net>
AuthorDate: Sun Jul 1 09:43:40 2018 -0700

    DRILL-6537: Limit the batch size for buffering operators based on how much memory they get
    
    closes #1342
---
 .../src/main/java/org/apache/drill/exec/ExecConstants.java     |  4 ++++
 .../apache/drill/exec/physical/impl/join/HashJoinBatch.java    | 10 +++++++---
 .../apache/drill/exec/server/options/SystemOptionManager.java  |  3 ++-
 exec/java-exec/src/main/resources/drill-module.conf            |  1 +
 4 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index bc16272..49f149b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -85,6 +85,10 @@ public final class ExecConstants {
   // need to produce very large batches that take up lot of memory.
   public static final LongValidator OUTPUT_BATCH_SIZE_VALIDATOR = new RangeLongValidator(OUTPUT_BATCH_SIZE, 128, 512 * 1024 * 1024);
 
+  // Based on available memory, adjust output batch size for buffered operators by this factor.
+  public static final String OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR = "drill.exec.memory.operator.output_batch_size_avail_mem_factor";
+  public static final DoubleValidator OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR = new RangeDoubleValidator(OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR, 0.01, 1.0);
+
   // External Sort Boot configuration
 
   public static final String EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.batch.size";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 428a47e..047c597 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -886,9 +886,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
     partitions = new HashPartition[0];
 
     // get the output batch size from config.
-    int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
-    batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right);
-    logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+    final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+    final double avail_mem_factor = (double) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR);
+    int outputBatchSize = Math.min(configuredBatchSize, Integer.highestOneBit((int)(allocator.getLimit() * avail_mem_factor)));
+    logger.debug("BATCH_STATS, configured output batch size: {}, allocated memory {}, avail mem factor {}, output batch size: {}",
+      configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize);
+
+    batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right);
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index e6368f5..a9c4742 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -233,6 +233,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
       new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR,new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+      new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
       new OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
     };
 
@@ -294,7 +295,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
    * Initializes this option manager.
    *
    * @return this option manager
-   * @throws IOException
+   * @throws Exception
    */
   public SystemOptionManager init() throws Exception {
     options = provider.getOrCreateStore(config);
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 23d59d3..2e8c2e7 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -438,6 +438,7 @@ drill.exec.options: {
     drill.exec.storage.implicit.suffix.column.label: "suffix",
     drill.exec.testing.controls: "{}",
     drill.exec.memory.operator.output_batch_size : 16777216, # 16 MB
+    drill.exec.memory.operator.output_batch_size_avail_mem_factor : 0.1,
     exec.bulk_load_table_list.bulk_size: 1000,
     exec.compile.scalar_replacement: false,
     exec.enable_bulk_load_table_list: false,


[drill] 02/06: DRILL-4580: Support for exporting storage plugin configurations

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 7c22e35ef2a9ecc41cc15c5deefac9b306ea87a1
Author: agirish <ab...@gmail.com>
AuthorDate: Sun Apr 9 17:42:57 2017 -0700

    DRILL-4580: Support for exporting storage plugin configurations
    
    closes #1350
---
 .../org/apache/drill/exec/server/rest/StorageResources.java    | 10 ++++++++++
 exec/java-exec/src/main/resources/rest/storage/list.ftl        |  1 +
 exec/java-exec/src/main/resources/rest/storage/update.ftl      |  1 +
 3 files changed, 12 insertions(+)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
index ca10860..b6f839b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
@@ -35,6 +35,7 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import javax.ws.rs.core.SecurityContext;
 import javax.xml.bind.annotation.XmlRootElement;
 
@@ -134,6 +135,15 @@ public class StorageResources {
     }
   }
 
+  @GET
+  @Path("/storage/{name}/export")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response exportPlugin(@PathParam("name") String name) {
+    Response.ResponseBuilder response = Response.ok(getStoragePluginJSON(name));
+    response.header("Content-Disposition", String.format("attachment;filename=\"%s.json\"", name));
+    return response.build();
+  }
+
   @DELETE
   @Path("/storage/{name}.json")
   @Produces(MediaType.APPLICATION_JSON)
diff --git a/exec/java-exec/src/main/resources/rest/storage/list.ftl b/exec/java-exec/src/main/resources/rest/storage/list.ftl
index ca20063..7dfcf25 100644
--- a/exec/java-exec/src/main/resources/rest/storage/list.ftl
+++ b/exec/java-exec/src/main/resources/rest/storage/list.ftl
@@ -38,6 +38,7 @@
               <td style="border:none;">
                 <a class="btn btn-primary" href="/storage/${plugin.getName()}">Update</a>
                 <a class="btn btn-default" onclick="doEnable('${plugin.getName()}', false)">Disable</a>
+                <a class="btn btn-default" href="/storage/${plugin.getName()}/export">Export</a>
               </td>
             </tr>
           </#if>
diff --git a/exec/java-exec/src/main/resources/rest/storage/update.ftl b/exec/java-exec/src/main/resources/rest/storage/update.ftl
index a15cc98..a30b65e 100644
--- a/exec/java-exec/src/main/resources/rest/storage/update.ftl
+++ b/exec/java-exec/src/main/resources/rest/storage/update.ftl
@@ -48,6 +48,7 @@
       <#else>
         <a id="enabled" class="btn btn-primary">Enable</a>
       </#if>
+      <a class="btn btn-default" href="/storage/${model.getName()}/export">Export</a>
       <a id="del" class="btn btn-danger" onclick="deleteFunction()">Delete</a>
     </#if>
   </form>