Posted to commits@drill.apache.org by am...@apache.org on 2018/10/25 23:09:04 UTC

[drill] 08/08: DRILL-6381: Address code review comments (part 3).

This is an automated email from the ASF dual-hosted git repository.

amansinha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 7571d52eab9b961687df7d4fb845d0a297b228bb
Author: Aman Sinha <as...@maprtech.com>
AuthorDate: Sat Oct 13 23:38:17 2018 -0700

    DRILL-6381: Address code review comments (part 3).
    
    DRILL-6381: Add missing joinControl logic for INTERSECT_DISTINCT.
    
    - Modified HashJoin's probe phase to process INTERSECT_DISTINCT.
    
    - NOTE: For build phase, the functionality will be the same as for SemiJoin when it is added later.
    
    DRILL-6381: Address code review comment for intersect_distinct.
    
    DRILL-6381: Rebase on latest master and fix compilation issues.
    
    DRILL-6381: Generate protobuf files for C++ native client.
    
    DRILL-6381: Use shaded Guava classes.  Add more comments and Javadoc.
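    
    A condensed sketch of the new probe-phase flow for INTERSECT_DISTINCT
    (variable and method names are taken from the HashJoinProbeTemplate hunk
    further below; the surrounding probe loop and setup code are omitted):
    
        // getStartIndex() now also reports whether this build-side key
        // was already matched once (see the HashPartition changes below).
        Pair<Integer, Boolean> matchStatus = currPartition.getStartIndex(probeIndex);
        if (joinControl.isIntersectDistinct() && matchStatus.getRight()) {
          recordsProcessed++;  // key already emitted once; skip the duplicate
          continue;            // move on to the next probe row
        }
        currentCompositeIdx = matchStatus.getLeft();
        outputRecords = outputRow(currPartition.getContainers(), currentCompositeIdx,
            probeBatch.getContainer(), recordsProcessed);
        // INTERSECT DISTINCT never walks the chain of duplicate build-side keys.
        currentCompositeIdx = joinControl.isIntersectDistinct() ? -1
            : currPartition.getNextIndex(currentCompositeIdx);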
---
 .../planner/index/MapRDBFunctionalIndexInfo.java   |  4 +-
 .../exec/planner/index/MapRDBIndexDiscover.java    | 12 ++---
 .../drill/exec/store/mapr/PluginConstants.java     |  3 ++
 .../exec/store/mapr/db/MapRDBFormatPlugin.java     |  6 ++-
 .../store/mapr/db/MapRDBPushFilterIntoScan.java    | 17 ++++---
 .../store/mapr/db/MapRDBPushLimitIntoScan.java     |  2 +-
 .../store/mapr/db/MapRDBPushProjectIntoScan.java   | 35 ++++++-------
 .../mapr/db/MapRDBRestrictedScanBatchCreator.java  |  4 +-
 .../db/json/JsonTableRangePartitionFunction.java   |  4 +-
 .../store/mapr/db/json/MaprDBJsonRecordReader.java |  2 +-
 .../mapr/db/json/RestrictedJsonTableGroupScan.java |  6 +--
 contrib/native/client/src/protobuf/BitData.pb.cc   | 55 ++++++++++++++++++---
 contrib/native/client/src/protobuf/BitData.pb.h    | 36 +++++++++++++-
 .../native/client/src/protobuf/UserBitShared.pb.cc | 57 +++++++++++-----------
 .../native/client/src/protobuf/UserBitShared.pb.h  |  7 +--
 .../exec/store/hbase/HBasePushFilterIntoScan.java  |  2 +-
 .../store/kafka/KafkaPushDownFilterIntoScan.java   |  4 +-
 .../store/mongo/MongoPushDownFilterForScan.java    |  2 +-
 .../apache/drill/exec/physical/impl/ScanBatch.java | 17 +++----
 .../exec/physical/impl/common/HashPartition.java   | 10 ++--
 .../physical/impl/join/HashJoinProbeTemplate.java  | 24 +++++++--
 .../exec/physical/impl/join/RowKeyJoinBatch.java   |  6 +++
 .../physical/impl/partitionsender/Partitioner.java |  3 ++
 .../RangePartitionRecordBatch.java                 |  7 +++
 .../exec/planner/common/DrillScanRelBase.java      |  2 +-
 .../index/InvalidIndexDefinitionException.java     |  4 ++
 .../generators/IndexIntersectPlanGenerator.java    | 18 +++----
 .../planner/logical/DrillMergeProjectRule.java     |  8 +++
 .../planner/physical/BroadcastExchangePrel.java    |  5 +-
 .../planner/physical/ConvertCountToDirectScan.java |  5 +-
 .../drill/exec/planner/physical/ScanPrel.java      |  9 ----
 .../drill/exec/planner/physical/ScanPrule.java     |  2 +-
 .../InfoSchemaPushFilterIntoRecordGenerator.java   |  2 +-
 .../exec/store/parquet/ParquetPushDownFilter.java  |  2 +-
 .../physical/impl/common/HashPartitionTest.java    |  2 +-
 35 files changed, 248 insertions(+), 136 deletions(-)

diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java
index 67938f3..564a037 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java
@@ -17,8 +17,8 @@
  */
 package org.apache.drill.exec.planner.index;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 import org.apache.drill.common.expression.CastExpression;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java
index c231e11..aed3e04 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java
@@ -100,7 +100,7 @@ public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDisco
       for (IndexDesc idx : indexes) {
         DrillIndexDescriptor hbaseIdx = buildIndexDescriptor(tableName, idx);
         if (hbaseIdx == null) {
-          //not able to build a valid index based on the index info from MFS
+          // not able to build a valid index based on the index info from MFS
           logger.error("Not able to build index for {}", idx.toString());
           continue;
         }
@@ -233,9 +233,9 @@ public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDisco
   }
 
   private LogicalExpression castFunctionSQLSyntax(String field, String type) throws InvalidIndexDefinitionException {
-    //get castTypeStr so we can construct SQL syntax string before MapRDB could provide such syntax
+    // get castTypeStr so we can construct SQL syntax string before MapRDB could provide such syntax
     String castTypeStr = getDrillTypeStr(type);
-    if(castTypeStr == null) {//no cast
+    if(castTypeStr == null) {  // no cast
       throw new InvalidIndexDefinitionException("cast function type not recognized: " + type + " for field " + field);
     }
     try {
@@ -255,7 +255,7 @@ public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDisco
   private LogicalExpression getIndexExpression(IndexFieldDesc desc) throws InvalidIndexDefinitionException {
     final String fieldName = desc.getFieldPath().asPathString();
     final String functionDef = desc.getFunctionName();
-    if ((functionDef != null)) {//this is a function
+    if ((functionDef != null)) {  // this is a function
       String[] tokens = functionDef.split("\\s+");
       if (tokens[0].equalsIgnoreCase("cast")) {
         if (tokens.length != 3) {
@@ -270,7 +270,7 @@ public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDisco
         throw new InvalidIndexDefinitionException("function definition is not supported for indexing: " + functionDef);
       }
     }
-    //else it is a schemaPath
+    // else it is a schemaPath
     return fieldName2SchemaPath(fieldName);
   }
 
@@ -285,7 +285,7 @@ public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDisco
 
   private List<RelFieldCollation> getFieldCollations(IndexDesc desc, Collection<IndexFieldDesc> descCollection) {
     List<RelFieldCollation> fieldCollations = new ArrayList<>();
-    int i=0;
+    int i = 0;
     for (IndexFieldDesc field : descCollection) {
       RelFieldCollation.Direction direction = (field.getSortOrder() == IndexFieldDesc.Order.Asc) ?
           RelFieldCollation.Direction.ASCENDING : (field.getSortOrder() == IndexFieldDesc.Order.Desc ?
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginConstants.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginConstants.java
index 4239c5d..7a175a2 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginConstants.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/PluginConstants.java
@@ -53,6 +53,9 @@ public class PluginConstants {
 
   public static final int JSON_TABLE_NUM_TABLETS_PER_INDEX_DEFAULT = 32;
 
+  public static final int JSON_TABLE_SCAN_SIZE_MB_MIN = 32;
+  public static final int JSON_TABLE_SCAN_SIZE_MB_MAX = 8192;
+
   public static final String JSON_TABLE_SCAN_SIZE_MB = "format-maprdb.json.scanSizeMB";
   public static final int JSON_TABLE_SCAN_SIZE_MB_DEFAULT = 128;
 
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
index 0d1bf04..fc8a057 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
@@ -66,13 +66,15 @@ public class MapRDBFormatPlugin extends TableFormatPlugin {
     connection = ConnectionFactory.createConnection(hbaseConf);
     jsonTableCache = new MapRDBTableCache(context.getConfig());
     int scanRangeSizeMBConfig = context.getConfig().getInt(PluginConstants.JSON_TABLE_SCAN_SIZE_MB);
-    if (scanRangeSizeMBConfig < 32 || scanRangeSizeMBConfig > 8192) {
+    if (scanRangeSizeMBConfig < PluginConstants.JSON_TABLE_SCAN_SIZE_MB_MIN ||
+        scanRangeSizeMBConfig > PluginConstants.JSON_TABLE_SCAN_SIZE_MB_MAX) {
       logger.warn("Invalid scan size {} for MapR-DB tables, using default", scanRangeSizeMBConfig);
       scanRangeSizeMBConfig = PluginConstants.JSON_TABLE_SCAN_SIZE_MB_DEFAULT;
     }
 
     int restrictedScanRangeSizeMBConfig = context.getConfig().getInt(PluginConstants.JSON_TABLE_RESTRICTED_SCAN_SIZE_MB);
-    if (restrictedScanRangeSizeMBConfig < 32 || restrictedScanRangeSizeMBConfig > 8192) {
+    if (restrictedScanRangeSizeMBConfig < PluginConstants.JSON_TABLE_SCAN_SIZE_MB_MIN ||
+        restrictedScanRangeSizeMBConfig > PluginConstants.JSON_TABLE_SCAN_SIZE_MB_MAX) {
       logger.warn("Invalid restricted scan size {} for MapR-DB tables, using default", restrictedScanRangeSizeMBConfig);
       restrictedScanRangeSizeMBConfig = PluginConstants.JSON_TABLE_RESTRICTED_SCAN_SIZE_MB_DEFAULT;
     }
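
Since the same bounds check now appears twice in this constructor, a small
helper along these lines would remove the duplication (a sketch only, not
part of this commit; validateScanSizeMB is a hypothetical name):

    // Hypothetical helper: clamp-or-default a configured scan size using
    // the new PluginConstants bounds added in this commit.
    private static int validateScanSizeMB(int configuredMB, int defaultMB) {
      if (configuredMB < PluginConstants.JSON_TABLE_SCAN_SIZE_MB_MIN ||
          configuredMB > PluginConstants.JSON_TABLE_SCAN_SIZE_MB_MAX) {
        logger.warn("Invalid scan size {} for MapR-DB tables, using default", configuredMB);
        return defaultMB;
      }
      return configuredMB;
    }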
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
index 511a111..a0f5536 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
@@ -51,8 +51,9 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul
 
     @Override
     public void onMatch(RelOptRuleCall call) {
-      final ScanPrel scan = (ScanPrel) call.rel(1);
-      final FilterPrel filter = (FilterPrel) call.rel(0);
+      final FilterPrel filter = call.rel(0);
+      final ScanPrel scan = call.rel(1);
+
       final RexNode condition = filter.getCondition();
 
       if (scan.getGroupScan() instanceof BinaryTableGroupScan) {
@@ -80,9 +81,9 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul
 
     @Override
     public void onMatch(RelOptRuleCall call) {
-      final ScanPrel scan = (ScanPrel) call.rel(2);
-      final ProjectPrel project = (ProjectPrel) call.rel(1);
-      final FilterPrel filter = (FilterPrel) call.rel(0);
+      final FilterPrel filter = call.rel(0);
+      final ProjectPrel project = call.rel(1);
+      final ScanPrel scan = call.rel(2);
 
       // convert the filter to one that references the child of the project
       final RexNode condition =  RelOptUtil.pushPastProject(filter.getCondition(), project);
@@ -134,13 +135,13 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul
     final JsonConditionBuilder jsonConditionBuilder = new JsonConditionBuilder(groupScan, conditionExp);
     final JsonScanSpec newScanSpec = jsonConditionBuilder.parseTree();
     if (newScanSpec == null) {
-      return; //no filter pushdown ==> No transformation.
+      return; // no filter pushdown ==> No transformation.
     }
 
     final JsonTableGroupScan newGroupsScan = (JsonTableGroupScan) groupScan.clone(newScanSpec);
     newGroupsScan.setFilterPushedDown(true);
 
-    final ScanPrel newScanPrel = new ScanPrel(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable());
+    final ScanPrel newScanPrel = new ScanPrel(scan.getCluster(), filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable());
 
     // Depending on whether there is a project in the middle, assign either the scan or a copy of the project to childRel.
     final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));
@@ -186,7 +187,7 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul
                                                                         groupScan.getTableStats());
     newGroupsScan.setFilterPushedDown(true);
 
-    final ScanPrel newScanPrel = new ScanPrel(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable());
+    final ScanPrel newScanPrel = new ScanPrel(scan.getCluster(), filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable());
 
     // Depending on whether there is a project in the middle, assign either the scan or a copy of the project to childRel.
     final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java
index a26bc80..28d59d0 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java
@@ -47,8 +47,8 @@ public abstract class MapRDBPushLimitIntoScan extends StoragePluginOptimizerRule
 
     @Override
     public void onMatch(RelOptRuleCall call) {
-      final ScanPrel scan = call.rel(1);
       final LimitPrel limit = call.rel(0);
+      final ScanPrel scan = call.rel(1);
       doPushLimitIntoGroupScan(call, limit, null, scan, scan.getGroupScan());
     }
 
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java
index 5215868..d8d0a2c 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.store.mapr.db;
 
-import com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptRuleOperand;
 import org.apache.calcite.plan.RelTrait;
@@ -30,18 +30,21 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.apache.drill.exec.planner.common.DrillRelOptUtil;
-import org.apache.drill.exec.planner.common.DrillRelOptUtil.ProjectPushInfo;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.ProjectPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan;
 import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan;
 import org.apache.drill.exec.util.Utilities;
 
 import java.util.List;
 
+/**
+ * Push a physical Project into Scan. Currently, this rule only does projection pushdown for MapRDB-JSON tables,
+ * since it was needed for the secondary index feature, which applies only to JSON tables.
+ * For binary tables, note that the DrillPushProjectIntoScanRule is still applicable during the logical
+ * planning phase.
+ */
 public abstract class MapRDBPushProjectIntoScan extends StoragePluginOptimizerRule {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushProjectIntoScan.class);
 
@@ -53,17 +56,10 @@ public abstract class MapRDBPushProjectIntoScan extends StoragePluginOptimizerRu
       RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class)), "MapRDBPushProjIntoScan:Proj_On_Scan") {
     @Override
     public void onMatch(RelOptRuleCall call) {
-      final ScanPrel scan = (ScanPrel) call.rel(1);
-      final ProjectPrel project = (ProjectPrel) call.rel(0);
-      if (!(scan.getGroupScan() instanceof MapRDBGroupScan)) {
-        return;
-      }
-      doPushProjectIntoGroupScan(call, project, scan, (MapRDBGroupScan) scan.getGroupScan());
-      if (scan.getGroupScan() instanceof BinaryTableGroupScan) {
-        BinaryTableGroupScan groupScan = (BinaryTableGroupScan) scan.getGroupScan();
+      final ProjectPrel project = call.rel(0);
+      final ScanPrel scan = call.rel(1);
 
-      } else {
-        assert (scan.getGroupScan() instanceof JsonTableGroupScan);
+      if (scan.getGroupScan() instanceof JsonTableGroupScan) {
         JsonTableGroupScan groupScan = (JsonTableGroupScan) scan.getGroupScan();
 
         doPushProjectIntoGroupScan(call, project, scan, groupScan);
@@ -72,9 +68,10 @@ public abstract class MapRDBPushProjectIntoScan extends StoragePluginOptimizerRu
 
     @Override
     public boolean matches(RelOptRuleCall call) {
-      final ScanPrel scan = (ScanPrel) call.rel(1);
-      if (scan.getGroupScan() instanceof BinaryTableGroupScan ||
-          scan.getGroupScan() instanceof JsonTableGroupScan) {
+      final ScanPrel scan = call.rel(1);
+
+      // See class-level comments above for why only JsonTableGroupScan is considered
+      if (scan.getGroupScan() instanceof JsonTableGroupScan) {
         return super.matches(call);
       }
       return false;
@@ -82,12 +79,12 @@ public abstract class MapRDBPushProjectIntoScan extends StoragePluginOptimizerRu
   };
 
   protected void doPushProjectIntoGroupScan(RelOptRuleCall call,
-      ProjectPrel project, ScanPrel scan, MapRDBGroupScan groupScan) {
+      ProjectPrel project, ScanPrel scan, JsonTableGroupScan groupScan) {
     try {
 
       DrillRelOptUtil.ProjectPushInfo columnInfo =
           DrillRelOptUtil.getFieldsInformation(scan.getRowType(), project.getProjects());
-      if (columnInfo == null || Utilities.isStarQuery(columnInfo.getFields()) //
+      if (columnInfo == null || Utilities.isStarQuery(columnInfo.getFields())
           || !groupScan.canPushdownProjects(columnInfo.getFields())) {
         return;
       }
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBRestrictedScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBRestrictedScanBatchCreator.java
index 89ce95d..cb3732a 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBRestrictedScanBatchCreator.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBRestrictedScanBatchCreator.java
@@ -17,8 +17,8 @@
  */
 package org.apache.drill.exec.store.mapr.db;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java
index c0b73ee..ca508ca 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java
@@ -30,8 +30,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import com.mapr.db.Table;
 import com.mapr.db.impl.ConditionImpl;
 import com.mapr.db.impl.IdCodec;
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
index 63a9381..0be44e8 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -236,7 +236,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
           idOnly = (scannedFields == null);
         }
 
-        if(projectWholeDocument) {
+        if (projectWholeDocument) {
           projector = new FieldProjector(projectedFieldsSet);
         }
 
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java
index 2f06d00..055c5a5 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java
@@ -24,8 +24,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.GroupScan;
@@ -57,7 +57,7 @@ public class RestrictedJsonTableGroupScan extends JsonTableGroupScan {
                             @JsonProperty("format") MapRDBFormatPlugin formatPlugin,
                             @JsonProperty("scanSpec") JsonScanSpec scanSpec, /* scan spec of the original table */
                             @JsonProperty("columns") List<SchemaPath> columns,
-                            @JsonProperty("")MapRDBStatistics statistics) {
+                            @JsonProperty("") MapRDBStatistics statistics) {
     super(userName, storagePlugin, formatPlugin, scanSpec, columns, statistics);
   }
 
diff --git a/contrib/native/client/src/protobuf/BitData.pb.cc b/contrib/native/client/src/protobuf/BitData.pb.cc
index b32509c..ddee323 100644
--- a/contrib/native/client/src/protobuf/BitData.pb.cc
+++ b/contrib/native/client/src/protobuf/BitData.pb.cc
@@ -99,13 +99,14 @@ void protobuf_AssignDesc_BitData_2eproto() {
       ::google::protobuf::MessageFactory::generated_factory(),
       sizeof(FragmentRecordBatch));
   RuntimeFilterBDef_descriptor_ = file->message_type(3);
-  static const int RuntimeFilterBDef_offsets_[6] = {
+  static const int RuntimeFilterBDef_offsets_[7] = {
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(RuntimeFilterBDef, query_id_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(RuntimeFilterBDef, major_fragment_id_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(RuntimeFilterBDef, minor_fragment_id_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(RuntimeFilterBDef, to_foreman_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(RuntimeFilterBDef, bloom_filter_size_in_bytes_),
     GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(RuntimeFilterBDef, probe_fields_),
+    GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(RuntimeFilterBDef, hj_op_id_),
   };
   RuntimeFilterBDef_reflection_ =
     new ::google::protobuf::internal::GeneratedMessageReflection(
@@ -177,16 +178,16 @@ void protobuf_AddDesc_BitData_2eproto() {
     " \003(\005\022!\n\031sending_major_fragment_id\030\004 \001(\005\022"
     "!\n\031sending_minor_fragment_id\030\005 \001(\005\022(\n\003de"
     "f\030\006 \001(\0132\033.exec.shared.RecordBatchDef\022\023\n\013"
-    "isLastBatch\030\007 \001(\010\"\277\001\n\021RuntimeFilterBDef\022"
+    "isLastBatch\030\007 \001(\010\"\321\001\n\021RuntimeFilterBDef\022"
     "&\n\010query_id\030\001 \001(\0132\024.exec.shared.QueryId\022"
     "\031\n\021major_fragment_id\030\002 \001(\005\022\031\n\021minor_frag"
     "ment_id\030\003 \001(\005\022\022\n\nto_foreman\030\004 \001(\010\022\"\n\032blo"
     "om_filter_size_in_bytes\030\005 \003(\005\022\024\n\014probe_f"
-    "ields\030\006 \003(\t*n\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007\n"
-    "\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_RECORD_BATCH\020"
-    "\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n\022REQ_RUNTIME_FILTE"
-    "R\020\005B(\n\033org.apache.drill.exec.protoB\007BitD"
-    "ataH\001", 885);
+    "ields\030\006 \003(\t\022\020\n\010hj_op_id\030\007 \001(\005*n\n\007RpcType"
+    "\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n"
+    "\020REQ_RECORD_BATCH\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n"
+    "\022REQ_RUNTIME_FILTER\020\005B(\n\033org.apache.dril"
+    "l.exec.protoB\007BitDataH\001", 903);
   ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile(
     "BitData.proto", &protobuf_RegisterTypes);
   BitClientHandshake::default_instance_ = new BitClientHandshake();
@@ -1208,6 +1209,7 @@ const int RuntimeFilterBDef::kMinorFragmentIdFieldNumber;
 const int RuntimeFilterBDef::kToForemanFieldNumber;
 const int RuntimeFilterBDef::kBloomFilterSizeInBytesFieldNumber;
 const int RuntimeFilterBDef::kProbeFieldsFieldNumber;
+const int RuntimeFilterBDef::kHjOpIdFieldNumber;
 #endif  // !_MSC_VER
 
 RuntimeFilterBDef::RuntimeFilterBDef()
@@ -1231,6 +1233,7 @@ void RuntimeFilterBDef::SharedCtor() {
   major_fragment_id_ = 0;
   minor_fragment_id_ = 0;
   to_foreman_ = false;
+  hj_op_id_ = 0;
   ::memset(_has_bits_, 0, sizeof(_has_bits_));
 }
 
@@ -1273,6 +1276,7 @@ void RuntimeFilterBDef::Clear() {
     major_fragment_id_ = 0;
     minor_fragment_id_ = 0;
     to_foreman_ = false;
+    hj_op_id_ = 0;
   }
   bloom_filter_size_in_bytes_.Clear();
   probe_fields_.Clear();
@@ -1384,6 +1388,22 @@ bool RuntimeFilterBDef::MergePartialFromCodedStream(
           goto handle_uninterpreted;
         }
         if (input->ExpectTag(50)) goto parse_probe_fields;
+        if (input->ExpectTag(56)) goto parse_hj_op_id;
+        break;
+      }
+
+      // optional int32 hj_op_id = 7;
+      case 7: {
+        if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
+            ::google::protobuf::internal::WireFormatLite::WIRETYPE_VARINT) {
+         parse_hj_op_id:
+          DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive<
+                   ::google::protobuf::int32, ::google::protobuf::internal::WireFormatLite::TYPE_INT32>(
+                 input, &hj_op_id_)));
+          set_has_hj_op_id();
+        } else {
+          goto handle_uninterpreted;
+        }
         if (input->ExpectAtEnd()) return true;
         break;
       }
@@ -1442,6 +1462,11 @@ void RuntimeFilterBDef::SerializeWithCachedSizes(
       6, this->probe_fields(i), output);
   }
 
+  // optional int32 hj_op_id = 7;
+  if (has_hj_op_id()) {
+    ::google::protobuf::internal::WireFormatLite::WriteInt32(7, this->hj_op_id(), output);
+  }
+
   if (!unknown_fields().empty()) {
     ::google::protobuf::internal::WireFormat::SerializeUnknownFields(
         unknown_fields(), output);
@@ -1487,6 +1512,11 @@ void RuntimeFilterBDef::SerializeWithCachedSizes(
       WriteStringToArray(6, this->probe_fields(i), target);
   }
 
+  // optional int32 hj_op_id = 7;
+  if (has_hj_op_id()) {
+    target = ::google::protobuf::internal::WireFormatLite::WriteInt32ToArray(7, this->hj_op_id(), target);
+  }
+
   if (!unknown_fields().empty()) {
     target = ::google::protobuf::internal::WireFormat::SerializeUnknownFieldsToArray(
         unknown_fields(), target);
@@ -1524,6 +1554,13 @@ int RuntimeFilterBDef::ByteSize() const {
       total_size += 1 + 1;
     }
 
+    // optional int32 hj_op_id = 7;
+    if (has_hj_op_id()) {
+      total_size += 1 +
+        ::google::protobuf::internal::WireFormatLite::Int32Size(
+          this->hj_op_id());
+    }
+
   }
   // repeated int32 bloom_filter_size_in_bytes = 5;
   {
@@ -1582,6 +1619,9 @@ void RuntimeFilterBDef::MergeFrom(const RuntimeFilterBDef& from) {
     if (from.has_to_foreman()) {
       set_to_foreman(from.to_foreman());
     }
+    if (from.has_hj_op_id()) {
+      set_hj_op_id(from.hj_op_id());
+    }
   }
   mutable_unknown_fields()->MergeFrom(from.unknown_fields());
 }
@@ -1611,6 +1651,7 @@ void RuntimeFilterBDef::Swap(RuntimeFilterBDef* other) {
     std::swap(to_foreman_, other->to_foreman_);
     bloom_filter_size_in_bytes_.Swap(&other->bloom_filter_size_in_bytes_);
     probe_fields_.Swap(&other->probe_fields_);
+    std::swap(hj_op_id_, other->hj_op_id_);
     std::swap(_has_bits_[0], other->_has_bits_[0]);
     _unknown_fields_.Swap(&other->_unknown_fields_);
     std::swap(_cached_size_, other->_cached_size_);
diff --git a/contrib/native/client/src/protobuf/BitData.pb.h b/contrib/native/client/src/protobuf/BitData.pb.h
index 8a0b60c..7ee0bc6 100644
--- a/contrib/native/client/src/protobuf/BitData.pb.h
+++ b/contrib/native/client/src/protobuf/BitData.pb.h
@@ -521,6 +521,13 @@ class RuntimeFilterBDef : public ::google::protobuf::Message {
   inline const ::google::protobuf::RepeatedPtrField< ::std::string>& probe_fields() const;
   inline ::google::protobuf::RepeatedPtrField< ::std::string>* mutable_probe_fields();
 
+  // optional int32 hj_op_id = 7;
+  inline bool has_hj_op_id() const;
+  inline void clear_hj_op_id();
+  static const int kHjOpIdFieldNumber = 7;
+  inline ::google::protobuf::int32 hj_op_id() const;
+  inline void set_hj_op_id(::google::protobuf::int32 value);
+
   // @@protoc_insertion_point(class_scope:exec.bit.data.RuntimeFilterBDef)
  private:
   inline void set_has_query_id();
@@ -531,6 +538,8 @@ class RuntimeFilterBDef : public ::google::protobuf::Message {
   inline void clear_has_minor_fragment_id();
   inline void set_has_to_foreman();
   inline void clear_has_to_foreman();
+  inline void set_has_hj_op_id();
+  inline void clear_has_hj_op_id();
 
   ::google::protobuf::UnknownFieldSet _unknown_fields_;
 
@@ -538,11 +547,12 @@ class RuntimeFilterBDef : public ::google::protobuf::Message {
   ::google::protobuf::int32 major_fragment_id_;
   ::google::protobuf::int32 minor_fragment_id_;
   ::google::protobuf::RepeatedField< ::google::protobuf::int32 > bloom_filter_size_in_bytes_;
-  ::google::protobuf::RepeatedPtrField< ::std::string> probe_fields_;
   bool to_foreman_;
+  ::google::protobuf::int32 hj_op_id_;
+  ::google::protobuf::RepeatedPtrField< ::std::string> probe_fields_;
 
   mutable int _cached_size_;
-  ::google::protobuf::uint32 _has_bits_[(6 + 31) / 32];
+  ::google::protobuf::uint32 _has_bits_[(7 + 31) / 32];
 
   friend void  protobuf_AddDesc_BitData_2eproto();
   friend void protobuf_AssignDesc_BitData_2eproto();
@@ -1043,6 +1053,28 @@ RuntimeFilterBDef::mutable_probe_fields() {
   return &probe_fields_;
 }
 
+// optional int32 hj_op_id = 7;
+inline bool RuntimeFilterBDef::has_hj_op_id() const {
+  return (_has_bits_[0] & 0x00000040u) != 0;
+}
+inline void RuntimeFilterBDef::set_has_hj_op_id() {
+  _has_bits_[0] |= 0x00000040u;
+}
+inline void RuntimeFilterBDef::clear_has_hj_op_id() {
+  _has_bits_[0] &= ~0x00000040u;
+}
+inline void RuntimeFilterBDef::clear_hj_op_id() {
+  hj_op_id_ = 0;
+  clear_has_hj_op_id();
+}
+inline ::google::protobuf::int32 RuntimeFilterBDef::hj_op_id() const {
+  return hj_op_id_;
+}
+inline void RuntimeFilterBDef::set_hj_op_id(::google::protobuf::int32 value) {
+  set_has_hj_op_id();
+  hj_op_id_ = value;
+}
+
 
 // @@protoc_insertion_point(namespace_scope)
 
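
On the Java side, the new field would be populated through the generated
builder; the sketch below assumes protoc's standard Java naming for the
same BitData.proto (the setHjOpId name is inferred from the field name and
is not shown in this diff):

    // Sketch: attach the HashJoin operator id so the foreman can route
    // the runtime filter to the right operator (optional int32 hj_op_id = 7).
    BitData.RuntimeFilterBDef filterDef = BitData.RuntimeFilterBDef.newBuilder()
        .setQueryId(queryId)            // exec.shared.QueryId of this query
        .setMajorFragmentId(majorId)
        .setMinorFragmentId(minorId)
        .setToForeman(true)
        .setHjOpId(hashJoinOpId)        // the new field added by this commit
        .build();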
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.cc b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
index d0e4aa5..8bb6e07 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.cc
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.cc
@@ -750,40 +750,40 @@ void protobuf_AddDesc_UserBitShared_2eproto() {
     "TATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020"
     "\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022"
     "\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005"
-    "\022\032\n\026CANCELLATION_REQUESTED\020\006*\367\010\n\020CoreOpe"
+    "\022\032\n\026CANCELLATION_REQUESTED\020\006*\222\t\n\020CoreOpe"
     "ratorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAS"
     "T_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE"
     "\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HAS"
     "H_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGI"
     "NG_RECEIVER\020\010\022\034\n\030ORDERED_PARTITION_SENDE"
     "R\020\t\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_RECEIVER\020\013"
-    "\022\020\n\014RANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELECT"
-    "ION_VECTOR_REMOVER\020\016\022\027\n\023STREAMING_AGGREG"
-    "ATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020\021"
-    "\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026"
-    "PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SCA"
-    "N\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_S"
-    "CAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRECT_SUB_"
-    "SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN"
-    "\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB"
-    "_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUCER"
-    "_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WINDO"
-    "W\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_SC"
-    "AN\020$\022\021\n\rPCAP_SUB_SCAN\020%\022\022\n\016KAFKA_SUB_SCA"
-    "N\020&\022\021\n\rKUDU_SUB_SCAN\020\'\022\013\n\007FLATTEN\020(\022\020\n\014L"
-    "ATERAL_JOIN\020)\022\n\n\006UNNEST\020*\022,\n(HIVE_DRILL_"
-    "NATIVE_PARQUET_ROW_GROUP_SCAN\020+\022\r\n\tJDBC_"
-    "SCAN\020,\022\022\n\016REGEX_SUB_SCAN\020-\022\023\n\017MAPRDB_SUB"
-    "_SCAN\020.\022\022\n\016MONGO_SUB_SCAN\020/\022\017\n\013KUDU_WRIT"
-    "ER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017\n\013JSON_WRI"
-    "TER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022\022\n\016IMAGE_S"
-    "UB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PART"
-    "ITION_LIMIT\0206\022\023\n\017PCAPNG_SUB_SCAN\0207\022\022\n\016RU"
-    "NTIME_FILTER\0208*g\n\nSaslStatus\022\020\n\014SASL_UNK"
-    "NOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRE"
-    "SS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B"
-    ".\n\033org.apache.drill.exec.protoB\rUserBitS"
-    "haredH\001", 5447);
+    "\022\032\n\026RANGE_PARTITION_SENDER\020\014\022\n\n\006SCREEN\020\r"
+    "\022\034\n\030SELECTION_VECTOR_REMOVER\020\016\022\027\n\023STREAM"
+    "ING_AGGREGATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTER"
+    "NAL_SORT\020\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_"
+    "SORT\020\024\022\032\n\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHI"
+    "VE_SUB_SCAN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\r"
+    "MOCK_SUB_SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017D"
+    "IRECT_SUB_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEX"
+    "T_SUB_SCAN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_"
+    "SCHEMA_SUB_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025"
+    "\n\021PRODUCER_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020"
+    "!\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rA"
+    "VRO_SUB_SCAN\020$\022\021\n\rPCAP_SUB_SCAN\020%\022\022\n\016KAF"
+    "KA_SUB_SCAN\020&\022\021\n\rKUDU_SUB_SCAN\020\'\022\013\n\007FLAT"
+    "TEN\020(\022\020\n\014LATERAL_JOIN\020)\022\n\n\006UNNEST\020*\022,\n(H"
+    "IVE_DRILL_NATIVE_PARQUET_ROW_GROUP_SCAN\020"
+    "+\022\r\n\tJDBC_SCAN\020,\022\022\n\016REGEX_SUB_SCAN\020-\022\023\n\017"
+    "MAPRDB_SUB_SCAN\020.\022\022\n\016MONGO_SUB_SCAN\020/\022\017\n"
+    "\013KUDU_WRITER\0200\022\026\n\022OPEN_TSDB_SUB_SCAN\0201\022\017"
+    "\n\013JSON_WRITER\0202\022\026\n\022HTPPD_LOG_SUB_SCAN\0203\022"
+    "\022\n\016IMAGE_SUB_SCAN\0204\022\025\n\021SEQUENCE_SUB_SCAN"
+    "\0205\022\023\n\017PARTITION_LIMIT\0206\022\023\n\017PCAPNG_SUB_SC"
+    "AN\0207\022\022\n\016RUNTIME_FILTER\0208\022\017\n\013ROWKEY_JOIN\020"
+    "9*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSAS"
+    "L_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_"
+    "SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.apache"
+    ".drill.exec.protoB\rUserBitSharedH\001", 5474);
   ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile(
     "UserBitShared.proto", &protobuf_RegisterTypes);
   UserCredentials::default_instance_ = new UserCredentials();
@@ -961,6 +961,7 @@ bool CoreOperatorType_IsValid(int value) {
     case 54:
     case 55:
     case 56:
+    case 57:
       return true;
     default:
       return false;
diff --git a/contrib/native/client/src/protobuf/UserBitShared.pb.h b/contrib/native/client/src/protobuf/UserBitShared.pb.h
index 8494857..ab3063d 100644
--- a/contrib/native/client/src/protobuf/UserBitShared.pb.h
+++ b/contrib/native/client/src/protobuf/UserBitShared.pb.h
@@ -216,7 +216,7 @@ enum CoreOperatorType {
   ORDERED_PARTITION_SENDER = 9,
   PROJECT = 10,
   UNORDERED_RECEIVER = 11,
-  RANGE_SENDER = 12,
+  RANGE_PARTITION_SENDER = 12,
   SCREEN = 13,
   SELECTION_VECTOR_REMOVER = 14,
   STREAMING_AGGREGATE = 15,
@@ -260,11 +260,12 @@ enum CoreOperatorType {
   SEQUENCE_SUB_SCAN = 53,
   PARTITION_LIMIT = 54,
   PCAPNG_SUB_SCAN = 55,
-  RUNTIME_FILTER = 56
+  RUNTIME_FILTER = 56,
+  ROWKEY_JOIN = 57
 };
 bool CoreOperatorType_IsValid(int value);
 const CoreOperatorType CoreOperatorType_MIN = SINGLE_SENDER;
-const CoreOperatorType CoreOperatorType_MAX = RUNTIME_FILTER;
+const CoreOperatorType CoreOperatorType_MAX = ROWKEY_JOIN;
 const int CoreOperatorType_ARRAYSIZE = CoreOperatorType_MAX + 1;
 
 const ::google::protobuf::EnumDescriptor* CoreOperatorType_descriptor();
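
The Java enum generated from the same .proto gains a matching constant; a
plausible use mirrors how other Drill physical operators report their type
(hypothetical snippet, not part of this diff):

    // Sketch: the rowkey join physical operator would advertise the new
    // enum value (ROWKEY_JOIN = 57) for query profiles and metrics.
    @Override
    public int getOperatorType() {
      return UserBitShared.CoreOperatorType.ROWKEY_JOIN_VALUE;
    }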
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
index 91ca787..3faa089 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
@@ -122,7 +122,7 @@ public abstract class HBasePushFilterIntoScan extends StoragePluginOptimizerRule
         newScanSpec, groupScan.getColumns());
     newGroupsScan.setFilterPushedDown(true);
 
-    final ScanPrel newScanPrel = new ScanPrel(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable());
+    final ScanPrel newScanPrel = new ScanPrel(scan.getCluster(), filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable());
 
     // Depending on whether there is a project in the middle, assign either the scan or a copy of the project to childRel.
     final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of(newScanPrel));
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java
index 019a67e..002d043 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java
@@ -66,8 +66,8 @@ public class KafkaPushDownFilterIntoScan extends StoragePluginOptimizerRule {
 
     logger.info("Partitions ScanSpec after pushdown: " + newScanSpec);
     GroupScan newGroupScan = groupScan.cloneWithNewSpec(newScanSpec);
-    final ScanPrel newScanPrel = 
-      new ScanPrel(scan, filter.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable());
+    final ScanPrel newScanPrel =
+      new ScanPrel(scan.getCluster(), filter.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable());
     call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(newScanPrel)));
   }
 
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
index 8ad84c1..be157b4 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoPushDownFilterForScan.java
@@ -77,7 +77,7 @@ public class MongoPushDownFilterForScan extends StoragePluginOptimizerRule {
     }
     newGroupsScan.setFilterPushedDown(true);
 
-    final ScanPrel newScanPrel = new ScanPrel(scan, filter.getTraitSet(),
+    final ScanPrel newScanPrel = new ScanPrel(scan.getCluster(), filter.getTraitSet(),
         newGroupsScan, scan.getRowType(), scan.getTable());
 
     if (mongoFilterBuilder.isAllExpressionsConverted()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 70b2852..a688f37 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -182,11 +182,11 @@ public class ScanBatch implements CloseableRecordBatch {
     if(isRepeatableScan) {
       readers = readerList.iterator();
       return IterOutcome.NONE;
-    } 
-    else {
+    } else {
       releaseAssets(); // All data has been read. Release resource.
       done = true;
-      return IterOutcome.NONE;}
+      return IterOutcome.NONE;
+    }
   }
 
   /**
@@ -204,11 +204,10 @@ public class ScanBatch implements CloseableRecordBatch {
         return false;
       }
       return true;
-    }
-    else {// Regular scan
+    } else { // Regular scan
       currentReader.close();
       currentReader = null;
-      return true;// In regular case, we always continue the iteration, if no more reader, we will break out at the head of loop
+      return true; // In the regular case we always continue the iteration; when there are no more readers, we break out at the head of the loop
     }
   }
 
@@ -283,7 +282,7 @@ public class ScanBatch implements CloseableRecordBatch {
         }
       }
       lastOutcome = IterOutcome.STOP;
-      throw UserException.systemError(e)
+      throw UserException.internalError(e)
           .addContext("Setup failed for", currentReaderClassName)
           .build(logger);
     } catch (UserException ex) {
@@ -291,7 +290,7 @@ public class ScanBatch implements CloseableRecordBatch {
       throw ex;
     } catch (Exception ex) {
       lastOutcome = IterOutcome.STOP;
-      throw UserException.systemError(ex).build(logger);
+      throw UserException.internalError(ex).build(logger);
     } finally {
       oContext.getStats().stopProcessing();
     }
@@ -334,7 +333,7 @@ public class ScanBatch implements CloseableRecordBatch {
       }
     } catch(SchemaChangeException e) {
       // No exception should be thrown here.
-      throw UserException.systemError(e)
+      throw UserException.internalError(e)
           .addContext("Failure while allocating implicit vectors")
           .build(logger);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index 1f7da38..86b870d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -378,7 +378,7 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
   public int probeForKey(int recordsProcessed, int hashCode) throws SchemaChangeException {
     return hashTable.probeForKey(recordsProcessed, hashCode);
   }
-  public int getStartIndex(int probeIndex) {
+  public Pair<Integer, Boolean> getStartIndex(int probeIndex) {
     /* The current probe record has a key that matches. Get the index
      * of the first row in the build side that matches the current key
      */
@@ -387,15 +387,15 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
      * side. Set the bit corresponding to this index so if we are doing a FULL or RIGHT
      * join we keep track of which records we need to project at the end
      */
-    hjHelper.setRecordMatched(compositeIndex);
-    return compositeIndex;
+    boolean matchExists = hjHelper.setRecordMatched(compositeIndex);
+    return Pair.of(compositeIndex, matchExists);
   }
   public int getNextIndex(int compositeIndex) {
     // in case of inner join rows with duplicate keys, get the next one
     return hjHelper.getNextIndex(compositeIndex);
   }
-  public void setRecordMatched(int compositeIndex) {
-    hjHelper.setRecordMatched(compositeIndex);
+  public boolean setRecordMatched(int compositeIndex) {
+    return hjHelper.setRecordMatched(compositeIndex);
   }
   public List<Integer> getNextUnmatchedIndex() {
     return hjHelper.getNextUnmatchedIndex();
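
HashJoinHelper itself is not part of this diff; the contract assumed for
the boolean it now returns is roughly the following sketch (matchedRows is
a hypothetical BitSet standing in for the helper's internal bitmaps):

    // Mark a build-side row as matched and report whether it had already
    // been matched, so INTERSECT DISTINCT can skip keys emitted before.
    public boolean setRecordMatched(int compositeIndex) {
      boolean alreadyMatched = matchedRows.get(compositeIndex);
      matchedRows.set(compositeIndex);
      return alreadyMatched;
    }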
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index c16812e..57b2d5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -47,8 +48,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
   // Join type, INNER, LEFT, RIGHT or OUTER
   private JoinRelType joinType;
 
-  //joinControl object derived from the int type joinControl passed from outgoingBatch(HashJoinBatch)
-  //so we can do different things in hashtable for INTERSECT_DISTINCT and INTERSECT_ALL
+  // joinControl determines how to handle INTERSECT_DISTINCT vs. INTERSECT_ALL
   private JoinControl joinControl;
 
   private HashJoinBatch outgoingJoinBatch = null;
@@ -325,16 +325,30 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
            * of the first row in the build side that matches the current key
            * (and record this match in the bitmap, in case of a FULL/RIGHT join)
            */
-          currentCompositeIdx = currPartition.getStartIndex(probeIndex);
+          Pair<Integer, Boolean> matchStatus = currPartition.getStartIndex(probeIndex);
+
+          boolean matchExists = matchStatus.getRight();
+
+          if (joinControl.isIntersectDistinct() && matchExists) {
+            // since it is intersect distinct and we already have one record matched, move to next probe row
+            recordsProcessed++;
+            continue;
+          }
+
+          currentCompositeIdx = matchStatus.getLeft();
 
           outputRecords =
             outputRow(currPartition.getContainers(), currentCompositeIdx,
               probeBatch.getContainer(), recordsProcessed);
 
           /* Projected single row from the build side with matching key but there
-           * may be more rows with the same key. Check if that's the case
+           * may be more rows with the same key. Check if that's the case as long as
+           * we are not doing intersect distinct since it only cares about
+           * distinct values.
            */
-          currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx);
+          currentCompositeIdx = joinControl.isIntersectDistinct() ? -1 :
+            currPartition.getNextIndex(currentCompositeIdx);
+
           if (currentCompositeIdx == -1) {
             /* We only had one row in the build side that matched the current key
              * from the probe side. Drain the next row in the probe side.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
index 3b5566b..941f321 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoinBatch.java
@@ -281,4 +281,10 @@ public class RowKeyJoinBatch extends AbstractRecordBatch<RowKeyJoinPOP> implemen
     super.close();
   }
 
+  @Override
+  public void dump() {
+    logger.error("RowKeyJoinBatch[container={}, left={}, right={}, hasRowKeyBatch={}, rkJoinState={}]",
+        container, left, right, hasRowKeyBatch, rkJoinState);
+  }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 2e2e760..a2fc069 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -29,6 +29,9 @@ import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.record.RecordBatch;
 
 public interface Partitioner {
+  // Keep the recordCount as (2^x) - 1 to better utilize the memory allocation in ValueVectors; however
+  // other criteria such as batch sizing in terms of actual MBytes rather than record count could also be applied
+  // by the operator.
   int DEFAULT_RECORD_BATCH_SIZE = (1 << 10) - 1;
 
   void setup(ExchangeFragmentContext context,
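
A quick worked example of the (2^x) - 1 rationale (a sketch of the
arithmetic, assuming buffers are rounded up to the next power of two and
that offset vectors keep one extra entry per record):

    int records = (1 << 10) - 1;          // 1023 records per batch
    int offsetBytes = (records + 1) * 4;  // 1024 offsets * 4 bytes = 4096, an exact power of two
    // With 1024 records the offset buffer would need 1025 entries (4100 bytes),
    // which the allocator would round up to 8192, wasting nearly half of it.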
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
index d8fc94d..6a54828 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/rangepartitioner/RangePartitionRecordBatch.java
@@ -183,4 +183,11 @@ public class RangePartitionRecordBatch extends AbstractSingleRecordBatch<RangePa
     return counter;
   }
 
+
+  @Override
+  public void dump() {
+    logger.error("RangePartitionRecordBatch[container={}, numPartitions={}, recordCount={}, partitionIdVector={}]",
+        container, numPartitions, recordCount, partitionIdVector);
+  }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
index 6f8ee0e..6409c8e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
@@ -34,7 +34,7 @@ import org.apache.drill.exec.util.Utilities;
  * Base class for logical/physical scan rel implemented in Drill.
  */
 public abstract class DrillScanRelBase extends TableScan implements DrillRelNode {
-  private GroupScan groupScan;
+  protected GroupScan groupScan;
   protected final DrillTable drillTable;
 
   public DrillScanRelBase(RelOptCluster cluster,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/InvalidIndexDefinitionException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/InvalidIndexDefinitionException.java
index c17d09f..518aa67 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/InvalidIndexDefinitionException.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/InvalidIndexDefinitionException.java
@@ -19,6 +19,10 @@ package org.apache.drill.exec.planner.index;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 
+/**
+ * An InvalidIndexDefinitionException may be thrown if Drill does not recognize the
+ * type or expression of the index during the index discovery phase.
+ */
 public class InvalidIndexDefinitionException extends DrillRuntimeException {
   public InvalidIndexDefinitionException(String message) {
     super(message);
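
A minimal usage sketch (hypothetical call site; the real throw sites are in
the MapRDBIndexDiscover hunks above):

    try {
      // e.g. an index defined with a cast to a type Drill cannot map
      LogicalExpression expr = castFunctionSQLSyntax("zip", "GEOMETRY");
    } catch (InvalidIndexDefinitionException e) {
      logger.warn("Skipping index: {}", e.getMessage());
    }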
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java
index b380c28..11d7358 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java
@@ -94,7 +94,7 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
         RelOptUtil.createEquiJoinCondition(left, leftJoinKeys,
             right, rightJoinKeys, builder);
 
-    if (isRowKeyJoin == true) {
+    if (isRowKeyJoin) {
       RelNode newRel;
       if (settings.isIndexUseHashJoinNonCovering()) {
         HashJoinPrel hjPrel = new HashJoinPrel(left.getCluster(), left.getTraitSet(), left,
@@ -109,7 +109,7 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
       // since there is a restricted Scan on left side, assume original project
       return buildOriginalProject(newRel);
     } else {
-      //there is no restricted scan on left, do a regular rowkey join
+      // there is no restricted scan on left, do a regular rowkey join
       HashJoinPrel hjPrel = new HashJoinPrel(left.getCluster(), left.getTraitSet(), left,
           right, joinCondition, JoinRelType.INNER, false /* no swap */, null /* no runtime filter */,
           isRowKeyJoin, htControl);
@@ -185,7 +185,7 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
         indexScanRowType, builder, functionInfo));
     // project the rowkey column from the index scan
     List<RexNode> indexProjectExprs = Lists.newArrayList();
-    int rowKeyIndex = getRowKeyIndex(indexScanPrel.getRowType(), origScan);//indexGroupScan.getRowKeyOrdinal();
+    int rowKeyIndex = getRowKeyIndex(indexScanPrel.getRowType(), origScan);
     assert rowKeyIndex >= 0;
 
     indexProjectExprs.add(RexInputRef.of(rowKeyIndex, indexScanPrel.getRowType()));
@@ -204,7 +204,7 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
         indexFilterPrel, indexProjectExprs, indexProjectRowType);
 
     RelTraitSet rightSideTraits = newTraitSet().plus(Prel.DRILL_PHYSICAL);
-    //if build(right) side does not exist, this index scan is the right most.
+    // if build(right) side does not exist, this index scan is the right most.
     if (right == null) {
       if (partition == DrillDistributionTrait.RANDOM_DISTRIBUTED &&
           settings.getSliceTarget() < indexProjectPrel.getRows()) {
@@ -220,7 +220,7 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
       return converted;
     }
 
-    //if build(right) side exist, the plan we got in 'converted' is left (probe). Intersect with right(build) side
+    // if build(right) side exists, the plan we got in 'converted' is the left (probe) side; intersect it with the right (build) side
     RelNode finalRel = buildRowKeyJoin(converted, right, false, JoinControl.INTERSECT_DISTINCT);
 
     if (generateDistribution &&
@@ -310,7 +310,7 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
   public RelNode convertChild(final RelNode filter, final RelNode input) throws InvalidRelException {
     Map<IndexDescriptor, RexNode> idxConditionMap = Maps.newLinkedHashMap();
     boolean isAnyIndexAsync = false;
-    for(IndexDescriptor idx : indexInfoMap.keySet()) {
+    for (IndexDescriptor idx : indexInfoMap.keySet()) {
       idxConditionMap.put(idx, indexInfoMap.get(idx).indexCondition);
       if (!isAnyIndexAsync && idx.isAsyncIndex()) {
         isAnyIndexAsync = true;
@@ -322,7 +322,7 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
     int curIdx = 0;
     RexNode remnant = indexContext.getFilterCondition();
     for (Map.Entry<IndexDescriptor, RexNode> pair : idxConditionMap.entrySet()) {
-      //For the last index, the generated join is distributed using createRangeDistRight instead!
+      // For the last index, the generated join is distributed using createRangeDistRight instead!
       generateDistribution = (idxConditionMap.entrySet().size()-1-curIdx) > 0;
       indexPlan = buildIntersectPlan(pair, indexPlan, generateDistribution);
       remnant = indexInfoMap.get(pair.getKey()).remainderCondition;
@@ -333,12 +333,12 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
     final RelNode rangeDistRight = createRangeDistRight(indexPlan, rightRowKeyField,
         (DbGroupScan)IndexPlanUtils.getGroupScan(origScan));
 
-    //now with index plan constructed, build plan of left(probe) side to use restricted db scan
+    // now with index plan constructed, build plan of left(probe) side to use restricted db scan
 
     Pair<RelNode, DbGroupScan> leftRelAndScan = buildRestrictedDBScan(remnant, isAnyIndexAsync);
 
     RelNode finalRel = buildRowKeyJoin(leftRelAndScan.left, rangeDistRight, true, JoinControl.DEFAULT);
-    if ( upperProject != null) {
+    if (upperProject != null) {
       ProjectPrel cap = new ProjectPrel(finalRel.getCluster(), finalRel.getTraitSet(),
           finalRel, IndexPlanUtils.getProjects(upperProject), upperProject.getRowType());
       finalRel = cap;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMergeProjectRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMergeProjectRule.java
index 3a3c409..10f9567 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMergeProjectRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMergeProjectRule.java
@@ -169,6 +169,14 @@ public class DrillMergeProjectRule extends RelOptRule {
     return list;
   }
 
+  /**
+   * Replaces a 'top' and 'bottom' project with a single merged project, assuming that the
+   * caller knows exactly the semantics/correctness of merging the two projects.
+   * Note that this does not apply the full-fledged DrillMergeProjectRule.
+   * @param topProject the upper project
+   * @param bottomProject the lower project
+   * @return the new merged project
+   */
   public static Project replace(Project topProject, Project bottomProject) {
     final List<RexNode> newProjects =
         RelOptUtil.pushPastProject(topProject.getProjects(), bottomProject);
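
To see conceptually what the merge does (an arithmetic analogy only, not
Calcite code; the actual expression rewrite is performed by
RelOptUtil.pushPastProject): merging composes the two projects' expressions so
the result reads directly from the bottom project's input.

    import java.util.function.IntUnaryOperator;

    public class MergeProjectAnalogy {
      public static void main(String[] args) {
        IntUnaryOperator bottom = in -> in + 1;  // bottom project: a = $0 + 1
        IntUnaryOperator top = a -> a * 2;       // top project:    x = a * 2
        // the merged project computes x = ($0 + 1) * 2 over the original input
        IntUnaryOperator merged = bottom.andThen(top);
        System.out.println(merged.applyAsInt(5));  // prints 12
      }
    }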
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
index 1ae375b..0155734 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/BroadcastExchangePrel.java
@@ -58,7 +58,10 @@ public class BroadcastExchangePrel extends ExchangePrel{
     final int  rowWidth = child.getRowType().getFieldCount() * DrillCostBase.AVG_FIELD_WIDTH;
     final double cpuCost = broadcastFactor * DrillCostBase.SVR_CPU_COST * inputRows;
 
-    //we assume localhost network cost is 1/10 of regular network cost
+    // We assume the localhost network cost is 1/10 of the regular network cost;
+    // with c = per-byte network cost and N = number of receiving endpoints:
+    //   ( c * num_bytes * (N - 1) ) + ( c * num_bytes * 0.1 ) = c * num_bytes * (N - 0.9)
+    // TODO: a similar adjustment should be made to HashExchangePrel
     final double networkCost = broadcastFactor * DrillCostBase.BYTE_NETWORK_COST * inputRows * rowWidth * (numEndPoints - 0.9);
 
     return new DrillCostBase(inputRows, cpuCost, 0, networkCost);
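
To make the (N - 0.9) factor concrete, a small standalone sketch; the cost
constant below is an illustrative stand-in for DrillCostBase.BYTE_NETWORK_COST,
not its real value:

    public class BroadcastCostSketch {
      static final double BYTE_NETWORK_COST = 1.0;  // stand-in value

      // (N - 1) remote sends plus one local send at 1/10 cost => (N - 0.9)
      static double networkCost(double broadcastFactor, double inputRows,
                                int rowWidth, int numEndPoints) {
        return broadcastFactor * BYTE_NETWORK_COST * inputRows * rowWidth
            * (numEndPoints - 0.9);
      }

      public static void main(String[] args) {
        // 1000 rows of width 80 bytes to 4 endpoints: 1000 * 80 * 3.1 = 248000.0
        System.out.println(networkCost(1.0, 1000, 80, 4));
      }
    }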
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java
index 5204495..d903165 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ConvertCountToDirectScan.java
@@ -128,8 +128,8 @@ public class ConvertCountToDirectScan extends Prule {
     final ScanStats scanStats = new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, 1, 1, scanRowType.getFieldCount());
     final GroupScan directScan = new MetadataDirectGroupScan(reader, oldGrpScan.getFiles(), scanStats);
 
-    final ScanPrel newScan = new ScanPrel(scan, scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), 
-        directScan, scanRowType);
+    final DirectScanPrel newScan = new DirectScanPrel(scan.getCluster(),
+        scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), directScan, scanRowType);
 
     final ProjectPrel newProject = new ProjectPrel(agg.getCluster(), agg.getTraitSet().plus(Prel.DRILL_PHYSICAL)
         .plus(DrillDistributionTrait.SINGLETON), newScan, prepareFieldExpressions(scanRowType), agg.getRowType());
@@ -158,7 +158,6 @@ public class ConvertCountToDirectScan extends Prule {
 
     for (int i = 0; i < agg.getAggCallList().size(); i++) {
       AggregateCall aggCall = agg.getAggCallList().get(i);
-    //for (AggregateCall aggCall : agg.getAggCallList()) {
       long cnt;
 
       // rule can be applied only for count function, return empty counts
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
index 4db5ba2..8a46f86 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
@@ -48,15 +48,6 @@ public class ScanPrel extends DrillScanRelBase implements Prel, HasDistributionA
 
   private final RelDataType rowType;
 
-  public ScanPrel(RelNode old, RelTraitSet traitSets, GroupScan scan, RelDataType rowType) {
-    this(old.getCluster(), traitSets, scan, rowType);
-  }
-
-  public ScanPrel(RelOptCluster cluster, RelTraitSet traits, GroupScan groupScan, RelDataType rowType) {
-    super(cluster, traits);
-    this.groupScan = getCopy(groupScan);
-  }
-
   public ScanPrel(RelOptCluster cluster, RelTraitSet traits,
                   GroupScan groupScan, RelDataType rowType, RelOptTable table) {
     super(cluster, traits, getCopy(groupScan), table);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
index 6ee05eb..b4fada6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
@@ -44,7 +44,7 @@ public class ScanPrule extends Prule{
 
     final RelTraitSet traits = scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(partition);
 
-    final ScanPrel newScan = new ScanPrel(scan, traits, groupScan, scan.getRowType(), scan.getTable());
+    final ScanPrel newScan = new ScanPrel(scan.getCluster(), traits, groupScan, scan.getRowType(), scan.getTable());
 
     call.transformTo(newScan);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaPushFilterIntoRecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaPushFilterIntoRecordGenerator.java
index 92332c3..44d2394 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaPushFilterIntoRecordGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaPushFilterIntoRecordGenerator.java
@@ -99,7 +99,7 @@ public abstract class InfoSchemaPushFilterIntoRecordGenerator extends StoragePlu
     final InfoSchemaGroupScan newGroupsScan = new InfoSchemaGroupScan(groupScan.getTable(), infoSchemaFilter);
     newGroupsScan.setFilterPushedDown(true);
 
-    RelNode input = new ScanPrel(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable());
+    RelNode input = new ScanPrel(scan.getCluster(), filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable());
     if (project != null) {
       input = project.copy(project.getTraitSet(), input, project.getProjects(), filter.getRowType());
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
index a8b981c..6efd44d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
@@ -167,7 +167,7 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
       return;
     }
 
-    RelNode newScan = new ScanPrel(scan, scan.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable());
+    RelNode newScan = new ScanPrel(scan.getCluster(), scan.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable());
 
     if (project != null) {
       newScan = project.copy(project.getTraitSet(), ImmutableList.of(newScan));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
index aafcb4d..e2f80d8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/common/HashPartitionTest.java
@@ -132,7 +132,7 @@ public class HashPartitionTest {
 
         {
           int compositeIndex = hashPartition.probeForKey(1, 12);
-          int startIndex = hashPartition.getStartIndex(compositeIndex);
+          int startIndex = hashPartition.getStartIndex(compositeIndex).getLeft();
           int nextIndex = hashPartition.getNextIndex(startIndex);
 
           Assert.assertEquals(2, startIndex);
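
The test change above reflects that getStartIndex(...) now returns a Pair
whose left element is the start index. Judging by the getLeft() accessor this
is presumably org.apache.commons.lang3.tuple.Pair, though that, and the type
and meaning of the right element, are assumptions here. A minimal illustration
of the calling pattern:

    import org.apache.commons.lang3.tuple.Pair;

    public class PairReturnSketch {
      // A method that used to return a bare int can carry extra state in a
      // Pair; existing callers read the original value via getLeft().
      static Pair<Integer, Boolean> startIndexWithFlag() {
        return Pair.of(2, Boolean.TRUE);  // values are illustrative only
      }

      public static void main(String[] args) {
        int startIndex = startIndexWithFlag().getLeft();
        System.out.println(startIndex);  // prints 2
      }
    }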