You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by am...@apache.org on 2018/10/25 23:09:00 UTC

[drill] 04/08: DRILL-6381: (Part 4) Enhance MapR-DB plugin to support querying secondary indexes

This is an automated email from the ASF dual-hosted git repository.

amansinha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 2a9e51f8a68872a77e38ee91be107868f60fd334
Author: rebase <bu...@mapr.com>
AuthorDate: Fri Mar 16 14:24:20 2018 -0700

    DRILL-6381: (Part 4) Enhance MapR-DB plugin to support querying secondary indexes
    
      1. Implementation of the index descriptor for MapR-DB.
      2. MapR-DB specific costing for covering and non-covering indexes.
      3. Discovery componenent to discover the indexes available for a MapR-DB table including CAST functional indexes.
      4. Utility functions to build a canonical index descriptor.
      5. Statistics: fetch and initialize statistcs from MapR-DB for a query condition. Maintain a query-scoped cache for the statistics. Utility functions to compute selectivity.
      6. Range Partitioning: partitioning function that takes into account the tablet map to find out where a particular rowkey belongs.
      7. Restricted Scan: support doing restricted (i.e skip) scan through lookups on the rowkey. Added a group-scan and record reader for this.
      8. MD-3726: Simple Order by queries (without limit) when an index is used are showing regression.
      9. MD-3995: Do not pushdown limit 0 past project with CONVERT_FROMJSON
      10. MD-4259 : Account for limit during hashcode computation
    
    Co-authored-by: Aman Sinha <as...@maprtech.com>
    Co-authored-by: chunhui-shi <cs...@maprtech.com>
    Co-authored-by: Gautam Parai <gp...@maprtech.com>
    Co-authored-by: Padma Penumarthy <pp...@yahoo.com>
    Co-authored-by: Hanumath Rao Maduri <hm...@maprtech.com>
    
    Conflicts:
    	contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
    	contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java
    	contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
    	exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/rules/DbScanSortRemovalRule.java
    	exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
    	exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
    
    Fix additional compilation issues.
---
 .../exec/planner/index/MapRDBIndexDescriptor.java  |  222 +++
 .../exec/planner/index/MapRDBIndexDiscover.java    |  374 +++++
 .../drill/exec/planner/index/MapRDBStatistics.java |  689 +++++++-
 .../exec/store/mapr/db/MapRDBFormatMatcher.java    |   42 +
 .../exec/store/mapr/db/MapRDBFormatPlugin.java     |    2 +-
 .../drill/exec/store/mapr/db/MapRDBGroupScan.java  |   12 +-
 .../store/mapr/db/MapRDBPushLimitIntoScan.java     |   50 +-
 .../store/mapr/db/MapRDBPushProjectIntoScan.java   |    3 +-
 .../store/mapr/db/json/JsonTableGroupScan.java     |  198 ++-
 .../db/json/JsonTableRangePartitionFunction.java   |  237 +++
 .../mapr/db/json/RestrictedJsonTableGroupScan.java |  184 +++
 .../maprdb/tests/index/IndexHintPlanTest.java      |  171 ++
 .../drill/maprdb/tests/index/IndexPlanTest.java    | 1715 ++++++++++++++++++++
 .../drill/maprdb/tests/index/LargeTableGen.java    |  176 ++
 .../maprdb/tests/index/LargeTableGenBase.java      |  186 +++
 .../drill/maprdb/tests/index/StatisticsTest.java   |  115 ++
 .../drill/maprdb/tests/index/TableIndexCmd.java    |  127 ++
 .../drill/exec/planner/common/OrderedRel.java      |   53 +
 .../drill/exec/planner/index/IndexCallContext.java |    4 +-
 .../planner/index/IndexLogicalPlanCallContext.java |    4 +-
 .../index/IndexPhysicalPlanCallContext.java        |   12 +-
 .../drill/exec/planner/index/IndexPlanUtils.java   |   17 +
 .../generators/AbstractIndexPlanGenerator.java     |   49 +-
 .../generators/CoveringPlanNoFilterGenerator.java  |   16 +-
 .../generators/NonCoveringIndexPlanGenerator.java  |    2 +-
 .../planner/index/rules/DbScanSortRemovalRule.java |   53 +-
 .../drill/exec/planner/logical/DrillSortRel.java   |   17 +-
 .../drill/exec/planner/physical/SortPrel.java      |   28 +-
 .../drill/exec/planner/physical/SortPrule.java     |   16 +-
 .../drill/exec/planner/physical/TopNPrel.java      |   31 +-
 .../visitor/ExcessiveExchangeIdentifier.java       |   28 +
 31 files changed, 4746 insertions(+), 87 deletions(-)

diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDescriptor.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDescriptor.java
new file mode 100644
index 0000000..a57f5b5
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDescriptor.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.expr.CloneVisitor;
+import org.apache.drill.exec.physical.base.DbGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.planner.cost.PluginCost;
+import org.apache.drill.exec.planner.index.IndexProperties;
+import org.apache.drill.exec.store.mapr.PluginConstants;
+import org.apache.drill.exec.util.EncodedSchemaPathSet;
+import org.apache.drill.common.expression.LogicalExpression;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+public class MapRDBIndexDescriptor extends DrillIndexDescriptor {
+
+  protected final Object desc;
+  protected final Set<LogicalExpression> allFields;
+  protected final Set<LogicalExpression> indexedFields;
+  protected MapRDBFunctionalIndexInfo functionalInfo;
+  protected PluginCost pluginCost;
+
+  public MapRDBIndexDescriptor(List<LogicalExpression> indexCols,
+                               CollationContext indexCollationContext,
+                               List<LogicalExpression> nonIndexCols,
+                               List<LogicalExpression> rowKeyColumns,
+                               String indexName,
+                               String tableName,
+                               IndexType type,
+                               Object desc,
+                               DbGroupScan scan,
+                               NullDirection nullsDirection) {
+    super(indexCols, indexCollationContext, nonIndexCols, rowKeyColumns, indexName, tableName, type, nullsDirection);
+    this.desc = desc;
+    this.indexedFields = ImmutableSet.copyOf(indexColumns);
+    this.allFields = new ImmutableSet.Builder<LogicalExpression>()
+        .add(PluginConstants.DOCUMENT_SCHEMA_PATH)
+        .addAll(indexColumns)
+        .addAll(nonIndexColumns)
+        .build();
+    this.pluginCost = scan.getPluginCostModel();
+  }
+
+  public Object getOriginalDesc(){
+    return desc;
+  }
+
+  @Override
+  public boolean isCoveringIndex(List<LogicalExpression> expressions) {
+    List<LogicalExpression> decodedCols = new DecodePathinExpr().parseExpressions(expressions);
+    return columnsInIndexFields(decodedCols, allFields);
+  }
+
+  @Override
+  public boolean allColumnsIndexed(Collection<LogicalExpression> expressions) {
+    List<LogicalExpression> decodedCols = new DecodePathinExpr().parseExpressions(expressions);
+    return columnsInIndexFields(decodedCols, indexedFields);
+  }
+
+  @Override
+  public boolean someColumnsIndexed(Collection<LogicalExpression> columns) {
+    return columnsIndexed(columns, false);
+  }
+
+  private boolean columnsIndexed(Collection<LogicalExpression> expressions, boolean allColsIndexed) {
+    List<LogicalExpression> decodedCols = new DecodePathinExpr().parseExpressions(expressions);
+    if (allColsIndexed) {
+      return columnsInIndexFields(decodedCols, indexedFields);
+    } else {
+      return someColumnsInIndexFields(decodedCols, indexedFields);
+    }
+  }
+
+  public FunctionalIndexInfo getFunctionalInfo() {
+    if (this.functionalInfo == null) {
+      this.functionalInfo = new MapRDBFunctionalIndexInfo(this);
+    }
+    return this.functionalInfo;
+  }
+
+  /**
+   * Search through a LogicalExpression, finding all referenced schema paths
+   * and replace them with decoded paths.
+   * If one encoded path could be decoded to multiple paths, add these decoded paths to
+   * the end of returned list of expressions from parseExpressions.
+   */
+  private class DecodePathinExpr extends CloneVisitor {
+    Set<SchemaPath> schemaPathSet = Sets.newHashSet();
+
+    public List<LogicalExpression> parseExpressions(Collection<LogicalExpression> expressions) {
+      List<LogicalExpression> allCols = Lists.newArrayList();
+      Collection<SchemaPath> decoded;
+
+      for(LogicalExpression expr : expressions) {
+        LogicalExpression nonDecoded = expr.accept(this, null);
+        if(nonDecoded != null) {
+          allCols.add(nonDecoded);
+        }
+      }
+
+      if (schemaPathSet.size() > 0) {
+        decoded = EncodedSchemaPathSet.decode(schemaPathSet);
+        allCols.addAll(decoded);
+      }
+      return allCols;
+    }
+
+    @Override
+    public LogicalExpression visitSchemaPath(SchemaPath path, Void value) {
+      if (EncodedSchemaPathSet.isEncodedSchemaPath(path)) {
+        // if decoded size is not one, incoming path is encoded path thus there is no cast or other function applied on it,
+        // since users won't pass in encoded fields, so it is safe to return null,
+        schemaPathSet.add(path);
+        return null;
+      } else {
+        return path;
+      }
+    }
+
+  }
+
+  @Override
+  public RelOptCost getCost(IndexProperties indexProps, RelOptPlanner planner,
+      int numProjectedFields, GroupScan primaryTableGroupScan) {
+    Preconditions.checkArgument(primaryTableGroupScan instanceof DbGroupScan);
+    DbGroupScan dbGroupScan = (DbGroupScan) primaryTableGroupScan;
+    DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
+    double totalRows = indexProps.getTotalRows();
+    double leadRowCount = indexProps.getLeadingSelectivity() * totalRows;
+    double avgRowSize = indexProps.getAvgRowSize();
+    if (indexProps.isCovering()) { // covering index
+      // int numIndexCols = allFields.size();
+      // for disk i/o, all index columns are going to be read into memory
+      double numBlocks = Math.ceil((leadRowCount * avgRowSize)/pluginCost.getBlockSize(primaryTableGroupScan));
+      double diskCost = numBlocks * pluginCost.getSequentialBlockReadCost(primaryTableGroupScan);
+      // cpu cost is cost of filter evaluation for the remainder condition
+      double cpuCost = 0.0;
+      if (indexProps.getTotalRemainderFilter() != null) {
+        cpuCost = leadRowCount * DrillCostBase.COMPARE_CPU_COST;
+      }
+      double networkCost = 0.0; // TODO: add network cost once full table scan also considers network cost
+      return costFactory.makeCost(leadRowCount, cpuCost, diskCost, networkCost);
+
+    } else { // non-covering index
+      // int numIndexCols = allFields.size();
+      double numBlocksIndex = Math.ceil((leadRowCount * avgRowSize)/pluginCost.getBlockSize(primaryTableGroupScan));
+      double diskCostIndex = numBlocksIndex * pluginCost.getSequentialBlockReadCost(primaryTableGroupScan);
+      // for the primary table join-back each row may belong to a different block, so in general num_blocks = num_rows;
+      // however, num_blocks cannot exceed the total number of blocks of the table
+      double totalBlocksPrimary = Math.ceil((dbGroupScan.getColumns().size() *
+          pluginCost.getAverageColumnSize(primaryTableGroupScan) * totalRows)/
+          pluginCost.getBlockSize(primaryTableGroupScan));
+      double diskBlocksPrimary = Math.min(totalBlocksPrimary, leadRowCount);
+      double diskCostPrimary = diskBlocksPrimary * pluginCost.getRandomBlockReadCost(primaryTableGroupScan);
+      double diskCostTotal = diskCostIndex + diskCostPrimary;
+
+      // cpu cost of remainder condition evaluation over the selected rows
+      double cpuCost = 0.0;
+      if (indexProps.getTotalRemainderFilter() != null) {
+        cpuCost = leadRowCount * DrillCostBase.COMPARE_CPU_COST;
+      }
+      double networkCost = 0.0; // TODO: add network cost once full table scan also considers network cost
+      return costFactory.makeCost(leadRowCount, cpuCost, diskCostTotal, networkCost);
+    }
+  }
+
+  // Future use once full table scan also includes network cost
+  private double getNetworkCost(double leadRowCount, int numProjectedFields, boolean isCovering,
+      GroupScan primaryTableGroupScan) {
+    if (isCovering) {
+      // db server will send only the projected columns to the db client for the selected
+      // number of rows, so network cost is based on the number of actual projected columns
+      double networkCost = leadRowCount * numProjectedFields * pluginCost.getAverageColumnSize(primaryTableGroupScan);
+      return networkCost;
+    } else {
+      // only the rowkey column is projected from the index and sent over the network
+      double networkCostIndex = leadRowCount * 1 * pluginCost.getAverageColumnSize(primaryTableGroupScan);
+
+      // after join-back to primary table, all projected columns are sent over the network
+      double networkCostPrimary = leadRowCount * numProjectedFields * pluginCost.getAverageColumnSize(primaryTableGroupScan);
+
+      return networkCostIndex + networkCostPrimary;
+    }
+
+  }
+
+  @Override
+  public PluginCost getPluginCostModel() {
+    return pluginCost;
+  }
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java
new file mode 100644
index 0000000..e1b8a61
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.index;
+
+import com.google.common.collect.Maps;
+import com.mapr.db.Admin;
+import com.mapr.db.MapRDB;
+import com.mapr.db.exceptions.DBException;
+import com.mapr.db.index.IndexDesc;
+import com.mapr.db.index.IndexDesc.MissingAndNullOrdering;
+import com.mapr.db.index.IndexFieldDesc;
+import org.antlr.runtime.ANTLRStringStream;
+import org.antlr.runtime.CommonTokenStream;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.parser.ExprLexer;
+import org.apache.drill.common.expression.parser.ExprParser;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.base.AbstractDbGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.mapr.db.MapRDBFormatMatcher;
+import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
+import org.apache.drill.exec.store.mapr.db.MapRDBGroupScan;
+import org.apache.drill.exec.store.mapr.db.json.FieldPathHelper;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.ojai.FieldPath;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDiscover {
+
+  static final String DEFAULT_STRING_CAST_LEN_STR = "256";
+
+  public MapRDBIndexDiscover(GroupScan inScan, DrillScanRelBase scanRel) {
+    super((AbstractDbGroupScan) inScan, scanRel);
+  }
+
+  public MapRDBIndexDiscover(GroupScan inScan, ScanPrel scanRel) {
+    super((AbstractDbGroupScan) inScan, scanRel);
+  }
+
+  @Override
+  public IndexCollection getTableIndex(String tableName) {
+    //return getTableIndexFromCommandLine(tableName);
+    return getTableIndexFromMFS(tableName);
+  }
+
+  /**
+   *
+   * @param tableName
+   * @return
+   */
+  private IndexCollection getTableIndexFromMFS(String tableName) {
+    try {
+      Set<DrillIndexDescriptor> idxSet = new HashSet<>();
+      Collection<IndexDesc> indexes = admin().getTableIndexes(new Path(tableName));
+      if (indexes.size() == 0 ) {
+        logger.error("No index returned from Admin.getTableIndexes for table {}", tableName);
+        return null;
+      }
+      for (IndexDesc idx : indexes) {
+        DrillIndexDescriptor hbaseIdx = buildIndexDescriptor(tableName, idx);
+        if (hbaseIdx == null) {
+          //not able to build a valid index based on the index info from MFS
+          logger.error("Not able to build index for {}", idx.toString());
+          continue;
+        }
+        idxSet.add(hbaseIdx);
+      }
+      if (idxSet.size() == 0) {
+        logger.error("No index found for table {}.", tableName);
+        return null;
+      }
+      return new DrillIndexCollection(getOriginalScanRel(), idxSet);
+    } catch (DBException ex) {
+      logger.error("Could not get table index from File system.", ex);
+    }
+    catch(InvalidIndexDefinitionException ex) {
+      logger.error("Invalid index definition detected.", ex);
+    }
+    return null;
+  }
+
+  FileSelection deriveFSSelection(DrillFileSystem fs, IndexDescriptor idxDesc) throws IOException {
+    String tableName = idxDesc.getTableName();
+    String[] tablePath = tableName.split("/");
+    String tableParent = tableName.substring(0, tableName.lastIndexOf("/"));
+
+    return FileSelection.create(fs, tableParent, tablePath[tablePath.length - 1], false);
+  }
+
+  @Override
+  public DrillTable getNativeDrillTable(IndexDescriptor idxDescriptor) {
+
+    try {
+      final AbstractDbGroupScan origScan = getOriginalScan();
+      if (!(origScan instanceof MapRDBGroupScan)) {
+        return null;
+      }
+      MapRDBFormatPlugin maprFormatPlugin = ((MapRDBGroupScan) origScan).getFormatPlugin();
+      FileSystemPlugin fsPlugin = (FileSystemPlugin) (((MapRDBGroupScan) origScan).getStoragePlugin());
+
+      DrillFileSystem fs = ImpersonationUtil.createFileSystem(origScan.getUserName(), fsPlugin.getFsConf());
+      MapRDBFormatMatcher matcher = (MapRDBFormatMatcher) (maprFormatPlugin.getMatcher());
+      FileSelection fsSelection = deriveFSSelection(fs, idxDescriptor);
+      return matcher.isReadableIndex(fs, fsSelection, fsPlugin, fsPlugin.getName(),
+          origScan.getUserName(), idxDescriptor);
+
+    } catch (Exception e) {
+      logger.error("Failed to get native DrillTable.", e);
+    }
+    return null;
+  }
+
+  private SchemaPath fieldName2SchemaPath(String fieldName) {
+    if (fieldName.contains(":")) {
+      fieldName = fieldName.split(":")[1];
+    }
+    if (fieldName.contains(".")) {
+      return FieldPathHelper.fieldPath2SchemaPath(FieldPath.parseFrom(fieldName));
+    }
+    return SchemaPath.getSimplePath(fieldName);
+  }
+
+  String getDrillTypeStr(String maprdbTypeStr) {
+    String typeStr = maprdbTypeStr.toUpperCase();
+    String[] typeTokens = typeStr.split("[)(]");
+    String typeData = DEFAULT_STRING_CAST_LEN_STR;
+    if(typeTokens.length > 1) {
+      typeStr = typeTokens[0];
+      typeData = typeTokens[1];
+    }
+    switch(typeStr){
+      case "STRING":
+        // set default width since it is not specified
+        return "VARCHAR("+typeData+")";
+      case "LONG":
+        return "BIGINT";
+      case "INT":
+      case "INTEGER":
+        return "INT";
+      case "FLOAT":
+        return "FLOAT4";
+      case "DOUBLE":
+        return "FLOAT8";
+      case "INTERVAL_YEAR_MONTH":
+        return "INTERVALYEAR";
+      case "INTERVAL_DAY_TIME":
+        return "INTERVALDAY";
+      case "BOOLEAN":
+        return "BIT";
+      case "BINARY":
+        return "VARBINARY";
+      case "ANY":
+      case "DECIMAL":
+        return null;
+      default: return typeStr;
+    }
+
+  }
+
+  TypeProtos.MajorType getDrillType(String typeStr) {
+    switch(typeStr){
+      case "VARCHAR":
+      case "CHAR":
+      case "STRING":
+        // set default width since it is not specified
+        return
+            Types.required(TypeProtos.MinorType.VARCHAR).toBuilder().setWidth(
+                getOriginalScanRel().getCluster().getTypeFactory().createSqlType(SqlTypeName.VARCHAR).getPrecision()).build();
+      case "LONG":
+      case "BIGINT":
+        return Types.required(TypeProtos.MinorType.BIGINT);
+      case "INT":
+      case "INTEGER":
+        return Types.required(TypeProtos.MinorType.INT);
+      case "FLOAT":
+        return Types.required(TypeProtos.MinorType.FLOAT4);
+      case "DOUBLE":
+        return Types.required(TypeProtos.MinorType.FLOAT8);
+      case "INTERVAL_YEAR_MONTH":
+        return Types.required(TypeProtos.MinorType.INTERVALYEAR);
+      case "INTERVAL_DAY_TIME":
+        return Types.required(TypeProtos.MinorType.INTERVALDAY);
+      case "BOOLEAN":
+        return Types.required(TypeProtos.MinorType.BIT);
+      case "BINARY":
+        return Types.required(TypeProtos.MinorType.VARBINARY).toBuilder().build();
+      case "ANY":
+      case "DECIMAL":
+        return null;
+      default: return Types.required(TypeProtos.MinorType.valueOf(typeStr));
+    }
+  }
+
+  private LogicalExpression castFunctionSQLSyntax(String field, String type) throws InvalidIndexDefinitionException {
+    //get castTypeStr so we can construct SQL syntax string before MapRDB could provide such syntax
+    String castTypeStr = getDrillTypeStr(type);
+    if(castTypeStr == null) {//no cast
+      throw new InvalidIndexDefinitionException("cast function type not recognized: " + type + "for field " + field);
+    }
+    try {
+      String castFunc = String.format("cast( %s as %s)", field, castTypeStr);
+      final ExprLexer lexer = new ExprLexer(new ANTLRStringStream(castFunc));
+      final CommonTokenStream tokens = new CommonTokenStream(lexer);
+      final ExprParser parser = new ExprParser(tokens);
+      final ExprParser.parse_return ret = parser.parse();
+      logger.trace("{}, {}", tokens, ret);
+      return ret.e;
+    }catch(Exception ex) {
+      logger.error("parse failed{}", ex);
+    }
+    return null;
+  }
+
+  private LogicalExpression getIndexExpression(IndexFieldDesc desc) throws InvalidIndexDefinitionException {
+    final String fieldName = desc.getFieldPath().asPathString();
+    final String functionDef = desc.getFunctionName();
+    if ((functionDef != null)) {//this is a function
+      String[] tokens = functionDef.split("\\s+");
+      if (tokens[0].equalsIgnoreCase("cast")) {
+        if (tokens.length != 3) {
+          throw new InvalidIndexDefinitionException("cast function definition not recognized: " + functionDef);
+        }
+        LogicalExpression idxExpr = castFunctionSQLSyntax(fieldName, tokens[2]);
+        if (idxExpr == null) {
+          throw new InvalidIndexDefinitionException("got null expression for function definition: " + functionDef);
+        }
+        return idxExpr;
+      } else {
+        throw new InvalidIndexDefinitionException("function definition is not supported for indexing: " + functionDef);
+      }
+    }
+    //else it is a schemaPath
+    return fieldName2SchemaPath(fieldName);
+  }
+
+  private List<LogicalExpression> field2SchemaPath(Collection<IndexFieldDesc> descCollection)
+      throws InvalidIndexDefinitionException {
+    List<LogicalExpression> listSchema = new ArrayList<>();
+    for (IndexFieldDesc field : descCollection) {
+        listSchema.add(getIndexExpression(field));
+    }
+    return listSchema;
+  }
+
+  private List<RelFieldCollation> getFieldCollations(IndexDesc desc, Collection<IndexFieldDesc> descCollection) {
+    List<RelFieldCollation> fieldCollations = new ArrayList<>();
+    int i=0;
+    for (IndexFieldDesc field : descCollection) {
+      RelFieldCollation.Direction direction = (field.getSortOrder() == IndexFieldDesc.Order.Asc) ?
+          RelFieldCollation.Direction.ASCENDING : (field.getSortOrder() == IndexFieldDesc.Order.Desc ?
+              RelFieldCollation.Direction.DESCENDING : null);
+      if (direction != null) {
+        // assume null direction of NULLS UNSPECIFIED for now until MapR-DB adds that to the APIs
+        RelFieldCollation.NullDirection nulldir =
+            desc.getMissingAndNullOrdering() == MissingAndNullOrdering.MissingAndNullFirst ? NullDirection.FIRST :
+            (desc.getMissingAndNullOrdering() == MissingAndNullOrdering.MissingAndNullLast ?
+                NullDirection.LAST : NullDirection.UNSPECIFIED);
+        RelFieldCollation c = new RelFieldCollation(i++, direction, nulldir);
+        fieldCollations.add(c);
+      } else {
+        // if the direction is not present for a field, no need to examine remaining fields
+        break;
+      }
+    }
+    return fieldCollations;
+  }
+
+  private CollationContext buildCollationContext(List<LogicalExpression> indexFields,
+      List<RelFieldCollation> indexFieldCollations) {
+    assert indexFieldCollations.size() <= indexFields.size();
+    Map<LogicalExpression, RelFieldCollation> collationMap = Maps.newHashMap();
+    for (int i = 0; i < indexFieldCollations.size(); i++) {
+      collationMap.put(indexFields.get(i), indexFieldCollations.get(i));
+    }
+    CollationContext collationContext = new CollationContext(collationMap, indexFieldCollations);
+    return collationContext;
+  }
+
+  private DrillIndexDescriptor buildIndexDescriptor(String tableName, IndexDesc desc)
+      throws InvalidIndexDefinitionException {
+    if (desc.isExternal()) {
+      //XX: not support external index
+      return null;
+    }
+
+    IndexDescriptor.IndexType idxType = IndexDescriptor.IndexType.NATIVE_SECONDARY_INDEX;
+    List<LogicalExpression> indexFields = field2SchemaPath(desc.getIndexedFields());
+    List<LogicalExpression> coveringFields = field2SchemaPath(desc.getIncludedFields());
+    coveringFields.add(SchemaPath.getSimplePath("_id"));
+    CollationContext collationContext = null;
+    if (!desc.isHashed()) { // hash index has no collation property
+      List<RelFieldCollation> indexFieldCollations = getFieldCollations(desc, desc.getIndexedFields());
+      collationContext = buildCollationContext(indexFields, indexFieldCollations);
+    }
+
+    DrillIndexDescriptor idx = new MapRDBIndexDescriptor (
+        indexFields,
+        collationContext,
+        coveringFields,
+        null,
+        desc.getIndexName(),
+        tableName,
+        idxType,
+        desc,
+        this.getOriginalScan(),
+        desc.getMissingAndNullOrdering() == MissingAndNullOrdering.MissingAndNullFirst ? NullDirection.FIRST :
+            (desc.getMissingAndNullOrdering() == MissingAndNullOrdering.MissingAndNullLast ?
+                NullDirection.LAST : NullDirection.UNSPECIFIED));
+
+    String storageName = this.getOriginalScan().getStoragePlugin().getName();
+    materializeIndex(storageName, idx);
+    return idx;
+  }
+
+  @SuppressWarnings("deprecation")
+  private Admin admin() {
+    assert getOriginalScan() instanceof MapRDBGroupScan;
+
+    final MapRDBGroupScan dbGroupScan = (MapRDBGroupScan) getOriginalScan();
+    final UserGroupInformation currentUser = ImpersonationUtil.createProxyUgi(dbGroupScan.getUserName());
+    final Configuration conf = dbGroupScan.getFormatPlugin().getFsConf();
+
+    final Admin admin;
+    try {
+      admin = currentUser.doAs(new PrivilegedExceptionAction<Admin>() {
+        public Admin run() throws Exception {
+          return MapRDB.getAdmin(conf);
+        }
+      });
+    } catch (Exception e) {
+      throw new DrillRuntimeException("Failed to get Admin instance for user: " + currentUser.getUserName(), e);
+    }
+    return admin;
+  }
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBStatistics.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBStatistics.java
index 3b8de34..e129b96 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBStatistics.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBStatistics.java
@@ -17,20 +17,49 @@
  */
 package org.apache.drill.exec.planner.index;
 
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import com.google.common.collect.Maps;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelMdUtil;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.common.expression.ExpressionStringBuilder;
+import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.exec.physical.base.DbGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.hbase.HBaseRegexParser;
+import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan;
+import org.apache.hadoop.hbase.HConstants;
 import org.ojai.store.QueryCondition;
 
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
 public class MapRDBStatistics implements Statistics {
@@ -260,16 +289,332 @@ public class MapRDBStatistics implements Statistics {
   }
 
   public boolean initialize(RexNode condition, DrillScanRelBase scanRel, IndexCallContext context) {
-    //XXX to implement for complete secondary index framework
+    GroupScan scan = IndexPlanUtils.getGroupScan(scanRel);
+
+    PlannerSettings settings = PrelUtil.getPlannerSettings(scanRel.getCluster().getPlanner());
+    rowKeyJoinBackIOFactor = settings.getIndexRowKeyJoinCostFactor();
+    if (scan instanceof DbGroupScan) {
+      String conditionAsStr = convertRexToString(condition, scanRel.getRowType());
+      if (statsCache.get(conditionAsStr) == null) {
+        IndexCollection indexes = ((DbGroupScan)scan).getSecondaryIndexCollection(scanRel);
+        populateStats(condition, indexes, scanRel, context);
+        logger.info("index_plan_info: initialize: scanRel #{} and groupScan {} got fulltable {}, statsCache: {}, fiStatsCache: {}",
+            scanRel.getId(), System.identityHashCode(scan), fullTableScanPayload, statsCache, fIStatsCache);
+        return true;
+      }
+    }
     return false;
   }
 
+  /**
+   * This function computes statistics when there is no query condition
+   * @param jTabGrpScan - The current group scan
+   * @param indexes - The collection of indexes to use for getting statistics
+   * @param scanRel - The current scanRel
+   * @param context - The index plan call context
+   */
+  private void populateStatsForNoFilter(JsonTableGroupScan jTabGrpScan, IndexCollection indexes, RelNode scanRel,
+                                   IndexCallContext context) {
+    // Get the stats payload for full table (has total rows in the table)
+    StatisticsPayload ftsPayload = jTabGrpScan.getFirstKeyEstimatedStats(null, null, scanRel);
+    addToCache(null, null, context, ftsPayload, jTabGrpScan, scanRel, scanRel.getRowType());
+    addToCache(null, jTabGrpScan.getAverageRowSizeStats(null), ftsPayload);
+    // Get the stats for all indexes
+    for (IndexDescriptor idx: indexes) {
+      StatisticsPayload idxPayload = jTabGrpScan.getFirstKeyEstimatedStats(null, idx, scanRel);
+      StatisticsPayload idxRowSizePayload = jTabGrpScan.getAverageRowSizeStats(idx);
+      RelDataType newRowType;
+      FunctionalIndexInfo functionInfo = idx.getFunctionalInfo();
+      if (functionInfo.hasFunctional()) {
+        newRowType = FunctionalIndexHelper.rewriteFunctionalRowType(scanRel, context, functionInfo);
+      } else {
+        newRowType = scanRel.getRowType();
+      }
+      addToCache(null, idx, context, idxPayload, jTabGrpScan, scanRel, newRowType);
+      addToCache(idx, idxRowSizePayload, ftsPayload);
+    }
+  }
+
+  /**
+   * This is the core statistics function for populating the statistics. The statistics populated correspond to the query
+   * condition. Based on different types of plans, we would need statistics for different combinations of predicates. Currently,
+   * we do not have a tree-walker for {@link QueryCondition}. Hence, instead of using the individual predicates stats, to construct
+   * the stats for the overall predicates, we rely on using the final predicates. Hence, this has a limitation(susceptible) to
+   * predicate modification post stats generation. Statistics computed/stored are rowcounts, leading rowcounts, average rowsize.
+   * Rowcounts and leading rowcounts (i.e. corresponding to predicates on the leading index columns) are stored in the statsCache.
+   * Average rowsizes are stored in the fiStatsCache (FI stands for Filter Independent).
+   *
+   * @param condition - The condition for which to obtain statistics
+   * @param indexes - The collection of indexes to use for getting statistics
+   * @param scanRel - The current scanRel
+   * @param context - The index plan call context
+   */
+  private void populateStats(RexNode condition, IndexCollection indexes, DrillScanRelBase scanRel,
+                               IndexCallContext context) {
+    JsonTableGroupScan jTabGrpScan;
+    Map<IndexDescriptor, IndexConditionInfo> firstKeyIdxConditionMap;
+    Map<IndexDescriptor, IndexConditionInfo> idxConditionMap;
+    /* Map containing the individual base conditions of an ANDed/ORed condition and their selectivities.
+     * This is used to compute the overall selectivity of a complex ANDed/ORed condition using its base
+     * conditions. Helps prevent over/under estimates and guessed selectivity for ORed predicates.
+     */
+    Map<String, Double> baseConditionMap;
+    GroupScan grpScan = IndexPlanUtils.getGroupScan(scanRel);
+
+    if ((scanRel instanceof DrillScanRel || scanRel instanceof ScanPrel) &&
+        grpScan instanceof JsonTableGroupScan) {
+      jTabGrpScan = (JsonTableGroupScan) grpScan;
+    } else {
+      logger.debug("Statistics: populateStats exit early - not an instance of JsonTableGroupScan!");
+      return;
+    }
+    if (condition == null) {
+      populateStatsForNoFilter(jTabGrpScan, indexes, scanRel, context);
+      statsAvailable = true;
+      return;
+    }
+
+    RexBuilder builder = scanRel.getCluster().getRexBuilder();
+    PlannerSettings settings = PrelUtil.getSettings(scanRel.getCluster());
+    // Get the stats payload for full table (has total rows in the table)
+    StatisticsPayload ftsPayload = jTabGrpScan.getFirstKeyEstimatedStats(null, null, scanRel);
+
+    // Get the average row size for table and all indexes
+    addToCache(null, jTabGrpScan.getAverageRowSizeStats(null), ftsPayload);
+    if (ftsPayload == null || ftsPayload.getRowCount() == 0) {
+      return;
+    }
+    for (IndexDescriptor idx : indexes) {
+      StatisticsPayload idxRowSizePayload = jTabGrpScan.getAverageRowSizeStats(idx);
+      addToCache(idx, idxRowSizePayload, ftsPayload);
+    }
+
+    /* Only use indexes with distinct first key */
+    IndexCollection distFKeyIndexes = distinctFKeyIndexes(indexes, scanRel);
+    IndexConditionInfo.Builder infoBuilder = IndexConditionInfo.newBuilder(condition,
+        distFKeyIndexes, builder, scanRel);
+    idxConditionMap = infoBuilder.getIndexConditionMap();
+    firstKeyIdxConditionMap = infoBuilder.getFirstKeyIndexConditionMap();
+    baseConditionMap = new HashMap<>();
+    for (IndexDescriptor idx : firstKeyIdxConditionMap.keySet()) {
+      if(IndexPlanUtils.conditionIndexed(context.getOrigMarker(), idx) == IndexPlanUtils.ConditionIndexed.NONE) {
+        continue;
+      }
+      RexNode idxCondition = firstKeyIdxConditionMap.get(idx).indexCondition;
+      /* Use the pre-processed condition only for getting actual statistic from MapR-DB APIs. Use the
+       * original condition everywhere else (cache store/lookups) since the RexNode condition and its
+       * corresponding QueryCondition will be used to get statistics. e.g. we convert LIKE into RANGE
+       * condition to get statistics. However, statistics are always asked for LIKE and NOT the RANGE
+       */
+      RexNode preProcIdxCondition = convertToStatsCondition(idxCondition, idx, context, scanRel,
+          Arrays.asList(SqlKind.CAST, SqlKind.LIKE));
+      RelDataType newRowType;
+      FunctionalIndexInfo functionInfo = idx.getFunctionalInfo();
+      if (functionInfo.hasFunctional()) {
+        newRowType = FunctionalIndexHelper.rewriteFunctionalRowType(scanRel, context, functionInfo);
+      } else {
+        newRowType = scanRel.getRowType();
+      }
+
+      QueryCondition queryCondition = jTabGrpScan.convertToQueryCondition(
+          convertToLogicalExpression(preProcIdxCondition, newRowType, settings, builder));
+      // Cap rows/size at total rows in case of issues with DB APIs
+      StatisticsPayload idxPayload = jTabGrpScan.getFirstKeyEstimatedStats(queryCondition, idx, scanRel);
+      double rowCount = Math.min(idxPayload.getRowCount(), ftsPayload.getRowCount());
+      double leadingRowCount = Math.min(idxPayload.getLeadingRowCount(), rowCount);
+      double avgRowSize = Math.min(idxPayload.getAvgRowSize(), ftsPayload.getAvgRowSize());
+      StatisticsPayload payload = new MapRDBStatisticsPayload(rowCount, leadingRowCount, avgRowSize);
+      addToCache(idxCondition, idx, context, payload, jTabGrpScan, scanRel, newRowType);
+      addBaseConditions(idxCondition, payload, false, baseConditionMap, scanRel.getRowType());
+    }
+    /* Add the row count for index conditions on all indexes. Stats are only computed for leading
+     * keys but index conditions can be pushed and would be required for access path costing
+     */
+    for (IndexDescriptor idx : idxConditionMap.keySet()) {
+      if(IndexPlanUtils.conditionIndexed(context.getOrigMarker(), idx) == IndexPlanUtils.ConditionIndexed.NONE) {
+        continue;
+      }
+      Map<LogicalExpression, RexNode> leadingPrefixMap = Maps.newHashMap();
+      double rowCount, leadingRowCount, avgRowSize;
+      RexNode idxCondition = idxConditionMap.get(idx).indexCondition;
+      // Ignore conditions which always evaluate to true
+      if (idxCondition.isAlwaysTrue()) {
+        continue;
+      }
+      RexNode idxIncColCondition = idxConditionMap.get(idx).remainderCondition;
+      RexNode idxRemColCondition = IndexPlanUtils.getLeadingPrefixMap(leadingPrefixMap, idx.getIndexColumns(), infoBuilder, idxCondition);
+      RexNode idxLeadColCondition = IndexPlanUtils.getLeadingColumnsFilter(
+          IndexPlanUtils.getLeadingFilters(leadingPrefixMap, idx.getIndexColumns()), builder);
+      RexNode idxTotRemColCondition = IndexPlanUtils.getTotalRemainderFilter(idxRemColCondition, idxIncColCondition, builder);
+      RexNode idxTotColCondition = IndexPlanUtils.getTotalFilter(idxLeadColCondition, idxTotRemColCondition, builder);
+      FunctionalIndexInfo functionInfo = idx.getFunctionalInfo();
+      RelDataType newRowType = scanRel.getRowType();
+      if (functionInfo.hasFunctional()) {
+        newRowType = FunctionalIndexHelper.rewriteFunctionalRowType(scanRel, context, functionInfo);
+      }
+      /* For non-covering plans we would need the index leading condition */
+      rowCount = ftsPayload.getRowCount() * computeSelectivity(idxLeadColCondition, idx,
+          ftsPayload.getRowCount(), scanRel, baseConditionMap).left;
+      leadingRowCount = rowCount;
+      avgRowSize = fIStatsCache.get(buildUniqueIndexIdentifier(idx)).getAvgRowSize();
+      addToCache(idxLeadColCondition, idx, context, new MapRDBStatisticsPayload(rowCount, leadingRowCount, avgRowSize),
+          jTabGrpScan, scanRel, newRowType);
+      /* For covering plans we would need the full condition */
+      rowCount = ftsPayload.getRowCount() * computeSelectivity(idxTotColCondition, idx,
+          ftsPayload.getRowCount(), scanRel, baseConditionMap).left;
+      addToCache(idxTotColCondition, idx, context, new MapRDBStatisticsPayload(rowCount, leadingRowCount, avgRowSize),
+          jTabGrpScan, scanRel, newRowType);
+      /* For intersect plans we would need the index condition */
+      rowCount = ftsPayload.getRowCount() * computeSelectivity(idxCondition, idx,
+          ftsPayload.getRowCount(), scanRel, baseConditionMap).left;
+      addToCache(idxCondition, idx, context, new MapRDBStatisticsPayload(rowCount, leadingRowCount, avgRowSize),
+          jTabGrpScan, scanRel, newRowType);
+      /* Add the rowCount for condition on only included columns - no leading columns here! */
+      if (idxIncColCondition != null) {
+        rowCount = ftsPayload.getRowCount() * computeSelectivity(idxIncColCondition, null,
+            ftsPayload.getRowCount(), scanRel, baseConditionMap).left;
+        addToCache(idxIncColCondition, idx, context, new MapRDBStatisticsPayload(rowCount, rowCount, avgRowSize),
+            jTabGrpScan, scanRel, newRowType);
+      }
+    }
+
+    // Add the rowCount for the complete condition - based on table
+    double rowCount = ftsPayload.getRowCount() * computeSelectivity(condition, null,
+        ftsPayload.getRowCount(), scanRel, baseConditionMap).left;
+    // Here, ftsLeadingKey rowcount is based on _id predicates
+    StatisticsPayload ftsLeadingKeyPayload = jTabGrpScan.getFirstKeyEstimatedStats(jTabGrpScan.convertToQueryCondition(
+        convertToLogicalExpression(condition, scanRel.getRowType(), settings, builder)), null, scanRel);
+    addToCache(condition, null, null, new MapRDBStatisticsPayload(rowCount, ftsLeadingKeyPayload.getRowCount(),
+        ftsPayload.getAvgRowSize()), jTabGrpScan, scanRel, scanRel.getRowType());
+    // Add the full table rows while we are at it - represented by <NULL> RexNode, <NULL> QueryCondition.
+    // No ftsLeadingKey so leadingKeyRowcount = totalRowCount
+    addToCache(null, null, null, new MapRDBStatisticsPayload(ftsPayload.getRowCount(), ftsPayload.getRowCount(),
+        ftsPayload.getAvgRowSize()), jTabGrpScan, scanRel, scanRel.getRowType());
+    // mark stats has been statsAvailable
+    statsAvailable = true;
+  }
+
+  private boolean addBaseConditions(RexNode condition, StatisticsPayload payload, boolean redundant,
+      Map<String, Double> baseConditionMap, RelDataType rowType) {
+    boolean res = redundant;
+    if (condition.getKind() == SqlKind.AND) {
+      for(RexNode pred : RelOptUtil.conjunctions(condition)) {
+        res = addBaseConditions(pred, payload, res, baseConditionMap, rowType);
+      }
+    } else if (condition.getKind() == SqlKind.OR) {
+      for(RexNode pred : RelOptUtil.disjunctions(condition)) {
+        res = addBaseConditions(pred, payload, res, baseConditionMap, rowType);
+      }
+    } else {
+      // base condition
+      String conditionAsStr = convertRexToString(condition, rowType);
+      if (!redundant) {
+        baseConditionMap.put(conditionAsStr, payload.getRowCount());
+        return true;
+      } else {
+        baseConditionMap.put(conditionAsStr, -1.0);
+        return false;
+      }
+    }
+    return res;
+  }
   /*
- * Convert the given RexNode to a String representation while also replacing the RexInputRef references
- * to actual column names. Since, we compare String representations of RexNodes, two equivalent RexNode
- * expressions may differ in the RexInputRef positions but otherwise the same.
- * e.g. $1 = 'CA' projection (State, Country) , $2 = 'CA' projection (Country, State)
- */
+   * Adds the statistic(row count) to the cache. Also adds the corresponding QueryCondition->RexNode
+   * condition mapping.
+   */
+  private void addToCache(RexNode condition, IndexDescriptor idx, IndexCallContext context,
+      StatisticsPayload payload, JsonTableGroupScan jTabGrpScan, RelNode scanRel, RelDataType rowType) {
+    if (condition != null
+        && !condition.isAlwaysTrue()) {
+      RexBuilder builder = scanRel.getCluster().getRexBuilder();
+      PlannerSettings settings = PrelUtil.getSettings(scanRel.getCluster());
+      String conditionAsStr = convertRexToString(condition, scanRel.getRowType());
+      if (statsCache.get(conditionAsStr) == null
+              && payload.getRowCount() != Statistics.ROWCOUNT_UNKNOWN) {
+        Map<String, StatisticsPayload> payloadMap = new HashMap<>();
+        payloadMap.put(buildUniqueIndexIdentifier(idx), payload);
+        statsCache.put(conditionAsStr, payloadMap);
+        logger.debug("Statistics: StatsCache:<{}, {}>",conditionAsStr, payload);
+        // Always pre-process CAST conditions - Otherwise queryCondition will not be generated correctly
+        RexNode preProcIdxCondition = convertToStatsCondition(condition, idx, context, scanRel,
+            Arrays.asList(SqlKind.CAST));
+        QueryCondition queryCondition =
+            jTabGrpScan.convertToQueryCondition(convertToLogicalExpression(preProcIdxCondition,
+                rowType, settings, builder));
+        if (queryCondition != null) {
+          String queryConditionAsStr = queryCondition.toString();
+          if (conditionRexNodeMap.get(queryConditionAsStr) == null) {
+            conditionRexNodeMap.put(queryConditionAsStr, conditionAsStr);
+            logger.debug("Statistics: QCRNCache:<{}, {}>",queryConditionAsStr, conditionAsStr);
+          }
+        } else {
+          logger.debug("Statistics: QCRNCache: Unable to generate QueryCondition for {}", conditionAsStr);
+          logger.debug("Statistics: QCRNCache: Unable to generate QueryCondition for {}", conditionAsStr);
+        }
+      } else {
+        Map<String, StatisticsPayload> payloadMap = statsCache.get(conditionAsStr);
+        if (payloadMap != null) {
+          if (payloadMap.get(buildUniqueIndexIdentifier(idx)) == null) {
+            payloadMap.put(buildUniqueIndexIdentifier(idx), payload);
+
+            // rowCount for the same condition should be the same on primary table or index,
+            // let us sync them to the smallest since currently both are over-estimated.
+            // DO NOT sync the leading rowCount since it is based on the leading condition and not the
+            // condition (key for this cache). Hence, for the same condition the leading condition and
+            // consequently the leading rowCount will vary with the index. Syncing them may lead to
+            // unintended side-effects e.g. given a covering index and full table scan and a condition
+            // on a non-id field which happens to be the leading key in the index, the leading rowcount
+            // for the full table scan should be the full table rowcount. Syncing them would incorrectly
+            // make the full table scan cheaper! If required, syncing should be only done based on
+            // leading condition and NOT the condition
+            double minimalRowCount = payload.getRowCount();
+            for (StatisticsPayload existing : payloadMap.values()) {
+              if (existing.getRowCount() < minimalRowCount) {
+                minimalRowCount = existing.getRowCount();
+              }
+            }
+            for (StatisticsPayload existing : payloadMap.values()) {
+              if (existing instanceof MapRDBStatisticsPayload) {
+                ((MapRDBStatisticsPayload)existing).rowCount = minimalRowCount;
+              }
+            }
+          } else {
+            logger.debug("Statistics: Filter row count already exists for filter: {}. Skip!", conditionAsStr);
+          }
+        } else {
+          logger.debug("Statistics: Filter row count is UNKNOWN for filter: {}", conditionAsStr);
+        }
+      }
+    } else if (condition == null && idx == null) {
+      fullTableScanPayload = new MapRDBStatisticsPayload(payload.getRowCount(),
+          payload.getLeadingRowCount(), payload.getAvgRowSize());
+      logger.debug("Statistics: StatsCache:<{}, {}>","NULL", fullTableScanPayload);
+    }
+  }
+
+  private void addToCache(IndexDescriptor idx, StatisticsPayload payload, StatisticsPayload ftsPayload) {
+    String tabIdxIdentifier = buildUniqueIndexIdentifier(idx);
+    if (fIStatsCache.get(tabIdxIdentifier) == null) {
+      if (ftsPayload.getAvgRowSize() >= payload.getAvgRowSize()) {
+        fIStatsCache.put(tabIdxIdentifier, payload);
+        logger.debug("Statistics: fIStatsCache:<{}, {}>",tabIdxIdentifier, payload);
+      } else {
+        StatisticsPayload cappedPayload =
+            new MapRDBStatisticsPayload(ROWCOUNT_UNKNOWN, ROWCOUNT_UNKNOWN, ftsPayload.getAvgRowSize());
+        fIStatsCache.put(tabIdxIdentifier,cappedPayload);
+        logger.debug("Statistics: fIStatsCache:<{}, {}> (Capped)",tabIdxIdentifier, cappedPayload);
+      }
+    } else {
+      logger.debug("Statistics: Average row size already exists for :<{}, {}>. Skip!",tabIdxIdentifier, payload);
+    }
+  }
+
+  /*
+   * Convert the given RexNode to a String representation while also replacing the RexInputRef references
+   * to actual column names. Since, we compare String representations of RexNodes, two equivalent RexNode
+   * expressions may differ in the RexInputRef positions but otherwise the same.
+   * e.g. $1 = 'CA' projection (State, Country) , $2 = 'CA' projection (Country, State)
+   */
   private String convertRexToString(RexNode condition, RelDataType rowType) {
     StringBuilder sb = new StringBuilder();
     if (condition == null) {
@@ -320,11 +665,11 @@ public class MapRDBStatistics implements Statistics {
   }
 
   /*
- * Generate the input reference to column mapping for reference replacement. Please
- * look at the usage in convertRexToString() to understand why this mapping is required.
- */
+   * Generate the input reference to column mapping for reference replacement. Please
+   * look at the usage in convertRexToString() to understand why this mapping is required.
+   */
   private void getInputRefMapping(RexNode condition, RelDataType rowType,
-                                  HashMap<String, String> mapping) {
+      HashMap<String, String> mapping) {
     if (condition instanceof RexCall) {
       for (RexNode op : ((RexCall) condition).getOperands()) {
         getInputRefMapping(op, rowType, mapping);
@@ -334,4 +679,328 @@ public class MapRDBStatistics implements Statistics {
           rowType.getFieldNames().get(condition.hashCode()));
     }
   }
+
+  /*
+   * Additional pre-processing may be required for LIKE/CAST predicates in order to compute statistics.
+   * e.g. A LIKE predicate should be converted to a RANGE predicate for statistics computation. MapR-DB
+   * does not yet support computing statistics for LIKE predicates.
+   */
+  private RexNode convertToStatsCondition(RexNode condition, IndexDescriptor index,
+      IndexCallContext context, RelNode scanRel, List<SqlKind>typesToProcess) {
+    RexBuilder builder = scanRel.getCluster().getRexBuilder();
+    if (condition.getKind() == SqlKind.AND) {
+      final List<RexNode> conditions = Lists.newArrayList();
+      for(RexNode pred : RelOptUtil.conjunctions(condition)) {
+        conditions.add(convertToStatsCondition(pred, index, context, scanRel, typesToProcess));
+      }
+      return RexUtil.composeConjunction(builder, conditions, false);
+    } else if (condition.getKind() == SqlKind.OR) {
+      final List<RexNode> conditions = Lists.newArrayList();
+      for(RexNode pred : RelOptUtil.disjunctions(condition)) {
+        conditions.add(convertToStatsCondition(pred, index, context, scanRel, typesToProcess));
+      }
+      return RexUtil.composeDisjunction(builder, conditions, false);
+    } else if (condition instanceof RexCall) {
+      // LIKE operator - convert to a RANGE predicate, if possible
+      if (typesToProcess.contains(SqlKind.LIKE)
+          && ((RexCall) condition).getOperator().getKind() == SqlKind.LIKE) {
+        return convertLikeToRange((RexCall)condition, builder);
+      } else if (typesToProcess.contains(SqlKind.CAST)
+          && hasCastExpression(condition)) {
+        return convertCastForFIdx(((RexCall) condition), index, context, scanRel);
+      }
+      else {
+        return condition;
+      }
+    }
+    return condition;
+  }
+
+  /*
+   * Determines whether the given expression contains a CAST expression. Assumes that the
+   * given expression is a valid expression.
+   * Returns TRUE, if it finds at least one instance of CAST operator.
+   */
+  private boolean hasCastExpression(RexNode condition) {
+    if (condition instanceof RexCall) {
+      if (((RexCall) condition).getOperator().getKind() == SqlKind.CAST) {
+        return true;
+      }
+      for (RexNode op : ((RexCall) condition).getOperands()) {
+        if (hasCastExpression(op)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+  /*
+   * CAST expressions are not understood by MAPR-DB as-is. Hence, we must convert them before passing them
+   * onto MAPR-DB for statistics. Given a functional index, the given expression is converted into an
+   * expression on the `expression` column of the functional index.
+   */
+  private RexNode convertCastForFIdx(RexCall condition, IndexDescriptor index,
+                                     IndexCallContext context, RelNode origScan) {
+    if (index == null) {
+      return condition;
+    }
+    FunctionalIndexInfo functionInfo = index.getFunctionalInfo();
+    if (!functionInfo.hasFunctional()) {
+      return condition;
+    }
+    // The functional index has a different row-type than the original scan. Use the index row-type when
+    // converting the condition
+    RelDataType newRowType = FunctionalIndexHelper.rewriteFunctionalRowType(origScan, context, functionInfo);
+    RexBuilder builder = origScan.getCluster().getRexBuilder();
+    return FunctionalIndexHelper.convertConditionForIndexScan(condition,
+        origScan, newRowType, builder, functionInfo);
+  }
+
+  /*
+   * Helper function to perform additional pre-processing for LIKE predicates
+   */
+  private RexNode convertLikeToRange(RexCall condition, RexBuilder builder) {
+    Preconditions.checkArgument(condition.getOperator().getKind() == SqlKind.LIKE,
+        "Unable to convertLikeToRange: argument is not a LIKE condition!");
+    HBaseRegexParser parser = null;
+    RexNode arg = null;
+    RexLiteral pattern = null, escape = null;
+    String patternStr = null, escapeStr = null;
+    if (condition.getOperands().size() == 2) {
+      // No escape character specified
+      for (RexNode op : condition.getOperands()) {
+        if (op.getKind() == SqlKind.LITERAL) {
+          pattern = (RexLiteral) op;
+        } else {
+          arg = op;
+        }
+      }
+      // Get the PATTERN strings from the corresponding RexLiteral
+      if (pattern.getTypeName() == SqlTypeName.DECIMAL ||
+          pattern.getTypeName() == SqlTypeName.INTEGER) {
+        patternStr = pattern.getValue().toString();
+      } else if (pattern.getTypeName() == SqlTypeName.CHAR) {
+        patternStr = pattern.getValue2().toString();
+      }
+      if (patternStr != null) {
+        parser = new HBaseRegexParser(patternStr);
+      }
+    } else if (condition.getOperands().size() == 3) {
+      // Escape character specified
+      for (RexNode op : condition.getOperands()) {
+        if (op.getKind() == SqlKind.LITERAL) {
+          // Assume first literal specifies PATTERN and the second literal specifies the ESCAPE char
+          if (pattern == null) {
+            pattern = (RexLiteral) op;
+          } else {
+            escape = (RexLiteral) op;
+          }
+        } else {
+          arg = op;
+        }
+      }
+      // Get the PATTERN and ESCAPE strings from the corresponding RexLiteral
+      if (pattern.getTypeName() == SqlTypeName.DECIMAL ||
+          pattern.getTypeName() == SqlTypeName.INTEGER) {
+        patternStr = pattern.getValue().toString();
+      } else if (pattern.getTypeName() == SqlTypeName.CHAR) {
+        patternStr = pattern.getValue2().toString();
+      }
+      if (escape.getTypeName() == SqlTypeName.DECIMAL ||
+          escape.getTypeName() == SqlTypeName.INTEGER) {
+        escapeStr = escape.getValue().toString();
+      } else if (escape.getTypeName() == SqlTypeName.CHAR) {
+        escapeStr = escape.getValue2().toString();
+      }
+      if (patternStr != null && escapeStr != null) {
+        parser = new HBaseRegexParser(patternStr, escapeStr.toCharArray()[0]);
+      }
+    }
+    if (parser != null) {
+      parser.parse();
+      String prefix = parser.getPrefixString();
+      /*
+       * If there is a literal prefix, convert it into an EQUALITY or RANGE predicate
+       */
+      if (prefix != null) {
+        if (prefix.equals(parser.getLikeString())) {
+          // No WILDCARD present. This turns the LIKE predicate to EQUALITY predicate
+          if (arg != null) {
+            return builder.makeCall(SqlStdOperatorTable.EQUALS, arg, pattern);
+          }
+        } else {
+          // WILDCARD present. This turns the LIKE predicate to RANGE predicate
+          byte[] startKey = HConstants.EMPTY_START_ROW;
+          byte[] stopKey = HConstants.EMPTY_END_ROW;
+          startKey = prefix.getBytes(Charsets.UTF_8);
+          stopKey = startKey.clone();
+          boolean isMaxVal = true;
+          for (int i = stopKey.length - 1; i >= 0 ; --i) {
+            int nextByteValue = (0xff & stopKey[i]) + 1;
+            if (nextByteValue < 0xff) {
+              stopKey[i] = (byte) nextByteValue;
+              isMaxVal = false;
+              break;
+            } else {
+              stopKey[i] = 0;
+            }
+          }
+          if (isMaxVal) {
+            stopKey = HConstants.EMPTY_END_ROW;
+          }
+          try {
+            // TODO: This maybe a potential bug since we assume UTF-8 encoding. However, we follow the
+            // current DB implementation. See HBaseFilterBuilder.createHBaseScanSpec "like" CASE statement
+            RexLiteral startKeyLiteral = builder.makeLiteral(new String(startKey,
+                Charsets.UTF_8.toString()));
+            RexLiteral stopKeyLiteral = builder.makeLiteral(new String(stopKey,
+                Charsets.UTF_8.toString()));
+            if (arg != null) {
+              RexNode startPred = builder.makeCall(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL,
+                  arg, startKeyLiteral);
+              RexNode stopPred = builder.makeCall(SqlStdOperatorTable.LESS_THAN, arg, stopKeyLiteral);
+              return builder.makeCall(SqlStdOperatorTable.AND, startPred, stopPred);
+            }
+          } catch (UnsupportedEncodingException ex) {
+            // Encoding not supported - Do nothing!
+            logger.debug("Statistics: convertLikeToRange: Unsupported Encoding Exception -> {}", ex.getMessage());
+          }
+        }
+      }
+    }
+    // Could not convert - return condition as-is.
+    return condition;
+  }
+
+  /*
+   * Compute the selectivity of the given rowCondition. Retrieve the selectivity
+   * for index conditions from the cache
+   */
+  private Pair<Double, Boolean> computeSelectivity(RexNode condition, IndexDescriptor idx, double totalRows,
+      RelNode scanRel, Map<String, Double> baseConditionMap) {
+    double selectivity;
+    boolean guess = false;
+    if (totalRows <= 0) {
+      return new Pair<>(1.0, true);
+    }
+    String conditionAsStr = convertRexToString(condition, scanRel.getRowType());
+    if (condition.getKind() == SqlKind.AND) {
+      selectivity = 1.0;
+      for (RexNode pred : RelOptUtil.conjunctions(condition)) {
+        Pair<Double, Boolean> selPayload = computeSelectivity(pred, idx, totalRows, scanRel, baseConditionMap);
+        if (selPayload.left > 0) {
+          // At least one AND branch is a guess
+          if (selPayload.right == true) {
+            guess = true;
+          }
+          selectivity *= selPayload.left;
+        }
+      }
+    } else if (condition.getKind() == SqlKind.OR) {
+      selectivity = 0.0;
+      for (RexNode pred : RelOptUtil.disjunctions(condition)) {
+        Pair<Double, Boolean> selPayload = computeSelectivity(pred, idx, totalRows, scanRel, baseConditionMap);
+        if (selPayload.left > 0.0) {
+          // At least one OR branch is a guess
+          if (selPayload.right == true) {
+            guess = true;
+          }
+          selectivity += selPayload.left;
+        }
+      }
+      // Cap selectivity of OR'ed predicates at 0.25 if at least one predicate is a guess (Calcite does the same)
+      if (guess && selectivity > 0.25) {
+        selectivity = 0.25;
+      }
+    } else {
+      guess = false;
+      if (baseConditionMap.get(conditionAsStr) != null) {
+        double rowCount = baseConditionMap.get(conditionAsStr);
+        if (rowCount != -1.0) {
+          selectivity = rowCount / totalRows;
+        } else {
+          // Ignore
+          selectivity = -1.0;
+          guess = true;
+        }
+      } else {
+        selectivity = RelMdUtil.guessSelectivity(condition);
+        guess = true;
+      }
+      return new Pair<>(selectivity, guess);
+    }
+    // Cap selectivity to be between 0.0 and 1.0
+    selectivity = Math.min(1.0, selectivity);
+    selectivity = Math.max(0.0, selectivity);
+    logger.debug("Statistics: computeSelectivity: Cache MISS: Computed {} -> {}", conditionAsStr, selectivity);
+    return new Pair<>(selectivity, guess);
+  }
+
+  /*
+   * Filters out indexes from the given collection based on the row key of indexes i.e. after filtering
+   * the given collection would contain only one index for each distinct row key in the collection
+   */
+  private IndexCollection distinctFKeyIndexes(IndexCollection indexes, RelNode scanRel) {
+    IndexCollection distinctIdxCollection = new DrillIndexCollection(scanRel, new HashSet<DrillIndexDescriptor>());
+    Iterator<IndexDescriptor> iterator = indexes.iterator();
+    Map<String, List<IndexDescriptor>> firstColIndexMap = new HashMap<>();
+    while (iterator.hasNext()) {
+      IndexDescriptor index = iterator.next();
+      // If index has columns - the first column is the leading column for the index
+      if (index.getIndexColumns() != null) {
+        List<IndexDescriptor> idxList;
+        String firstCol = convertLExToStr(index.getIndexColumns().get(0));
+        if (firstColIndexMap.get(firstCol) != null) {
+          idxList = firstColIndexMap.get(firstCol);
+        } else {
+          idxList = new ArrayList<>();
+        }
+        idxList.add(index);
+        firstColIndexMap.put(firstCol, idxList);
+      }
+    }
+    for (String firstCol : firstColIndexMap.keySet()) {
+      List<IndexDescriptor> indexesSameFirstCol = firstColIndexMap.get(firstCol);
+      double maxAvgRowSize = -1.0;
+      IndexDescriptor selectedIdx = null;
+      for (IndexDescriptor idx : indexesSameFirstCol) {
+        String tabIdxIdentifier = buildUniqueIndexIdentifier(idx);
+        double idxRowSize = fIStatsCache.get(tabIdxIdentifier).getAvgRowSize();
+        // Prefer index with largest average row-size, breaking ties lexicographically
+        if (idxRowSize > maxAvgRowSize
+            || (idxRowSize == maxAvgRowSize
+                && (selectedIdx == null || idx.getIndexName().compareTo(selectedIdx.getIndexName()) < 0))) {
+          maxAvgRowSize = idxRowSize;
+          selectedIdx = idx;
+        }
+      }
+      assert (selectedIdx != null);
+      distinctIdxCollection.addIndex(selectedIdx);
+    }
+    return distinctIdxCollection;
+  }
+
+  /*
+   * Returns the String representation for the given Logical Expression
+   */
+  private String convertLExToStr(LogicalExpression lex) {
+    StringBuilder sb = new StringBuilder();
+    ExpressionStringBuilder esb = new ExpressionStringBuilder();
+    lex.accept(esb, sb);
+    return sb.toString();
+  }
+
+  /*
+   * Converts the given RexNode condition into a Drill logical expression.
+   */
+  private LogicalExpression convertToLogicalExpression(RexNode condition,
+      RelDataType type, PlannerSettings settings, RexBuilder builder) {
+    LogicalExpression conditionExp;
+    try {
+      conditionExp = DrillOptiq.toDrill(new DrillParseContext(settings), type, builder, condition);
+    } catch (ClassCastException e) {
+      return null;
+    }
+    return conditionExp;
+  }
 }
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
index ee35a68..f982278 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
@@ -20,7 +20,10 @@ package org.apache.drill.exec.store.mapr.db;
 import java.io.IOException;
 
 import com.mapr.fs.MapRFileStatus;
+import com.mapr.db.index.IndexDesc;
 import com.mapr.fs.tables.TableProperties;
+import org.apache.drill.exec.planner.index.IndexDescriptor;
+import org.apache.drill.exec.planner.index.MapRDBIndexDescriptor;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.store.SchemaConfig;
@@ -33,6 +36,7 @@ import org.apache.drill.exec.store.mapr.TableFormatMatcher;
 import org.apache.drill.exec.store.mapr.TableFormatPlugin;
 
 import org.apache.drill.exec.store.mapr.db.binary.MapRDBBinaryTable;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
 public class MapRDBFormatMatcher extends TableFormatMatcher {
@@ -49,6 +53,44 @@ public class MapRDBFormatMatcher extends TableFormatMatcher {
         .getIsMarlinTable();
   }
 
+
+  /**
+   * Get an instance of DrillTable for a particular native secondary index
+   * @param fs
+   * @param selection
+   * @param fsPlugin
+   * @param storageEngineName
+   * @param userName
+   * @param secondaryIndexDesc
+   * @return
+   * @throws IOException
+   */
+  public DrillTable isReadableIndex(DrillFileSystem fs,
+                                    FileSelection selection, FileSystemPlugin fsPlugin,
+                                    String storageEngineName, String userName,
+                                    IndexDescriptor secondaryIndexDesc) throws IOException {
+    FileStatus status = selection.getFirstPath(fs);
+
+    if (!isFileReadable(fs, status)) {
+      return null;
+    }
+
+    MapRDBFormatPlugin fp = (MapRDBFormatPlugin) getFormatPlugin();
+    DrillTable dt = new DynamicDrillTable(fsPlugin,
+        storageEngineName,
+        userName,
+        new FormatSelection(fp.getConfig(),
+            selection));
+
+    // TODO:  Create groupScan using index descriptor
+    dt.setGroupScan(fp.getGroupScan(userName,
+        selection,
+        null /* columns */,
+        (IndexDesc) ((MapRDBIndexDescriptor) secondaryIndexDesc).getOriginalDesc()));
+
+    return dt;
+  }
+
   @Override
   public DrillTable isReadable(DrillFileSystem fs,
                                FileSelection selection, FileSystemPlugin fsPlugin,
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
index da4829f..0d1bf04 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
@@ -108,7 +108,7 @@ public class MapRDBFormatPlugin extends TableFormatPlugin {
   public Set<StoragePluginOptimizerRule> getOptimizerRules() {
     return ImmutableSet.of(MapRDBPushFilterIntoScan.FILTER_ON_SCAN, MapRDBPushFilterIntoScan.FILTER_ON_PROJECT,
         MapRDBPushProjectIntoScan.PROJECT_ON_SCAN, MapRDBPushLimitIntoScan.LIMIT_ON_PROJECT,
-        MapRDBPushLimitIntoScan.LIMIT_ON_SCAN);
+        MapRDBPushLimitIntoScan.LIMIT_ON_SCAN, MapRDBPushLimitIntoScan.LIMIT_ON_RKJOIN);
   }
 
 
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
index 1e6bcec..422a269 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
@@ -39,6 +39,9 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.drill.exec.planner.index.IndexCollection;
 
 import org.apache.drill.exec.planner.cost.PluginCost;
+import org.apache.drill.exec.planner.index.IndexDiscover;
+import org.apache.drill.exec.planner.index.IndexDiscoverFactory;
+import org.apache.drill.exec.planner.index.MapRDBIndexDiscover;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 
@@ -307,8 +310,13 @@ public abstract class MapRDBGroupScan extends AbstractDbGroupScan {
 
   @Override
   public IndexCollection getSecondaryIndexCollection(RelNode scanRel) {
-    //XXX to implement for complete secondary index framework
-    return null;
+    IndexDiscover discover = IndexDiscoverFactory.getIndexDiscover(
+        getStorageConfig(), this, scanRel, MapRDBIndexDiscover.class);
+
+    if (discover == null) {
+      logger.error("Null IndexDiscover was found for {}!", scanRel);
+    }
+    return discover.getTableIndex(getTableName());
   }
 
   @JsonIgnore
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java
index 1f4b8c9..a26bc80 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java
@@ -26,11 +26,13 @@ import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
 import org.apache.drill.exec.planner.physical.LimitPrel;
 import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.RowKeyJoinPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.hbase.HBaseScanSpec;
 import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan;
 import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan;
+import org.apache.drill.exec.store.mapr.db.json.RestrictedJsonTableGroupScan;
 
 public abstract class MapRDBPushLimitIntoScan extends StoragePluginOptimizerRule {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushLimitIntoScan.class);
@@ -59,8 +61,8 @@ public abstract class MapRDBPushLimitIntoScan extends StoragePluginOptimizerRule
       if (scan.getGroupScan().supportsLimitPushdown()
             && !limit.isPushDown() && limit.getFetch() != null) {
         if ((scan.getGroupScan() instanceof JsonTableGroupScan
-              && ((JsonTableGroupScan) scan.getGroupScan()).isIndexScan()) ) {
-            //|| (scan.getGroupScan() instanceof RestrictedJsonTableGroupScan)) {
+              && ((JsonTableGroupScan) scan.getGroupScan()).isIndexScan())
+            || (scan.getGroupScan() instanceof RestrictedJsonTableGroupScan)) {
           return true;
         }
       }
@@ -111,6 +113,26 @@ public abstract class MapRDBPushLimitIntoScan extends StoragePluginOptimizerRule
     }
   };
 
+  public static final StoragePluginOptimizerRule LIMIT_ON_RKJOIN =
+      new MapRDBPushLimitIntoScan(RelOptHelper.some(LimitPrel.class, RelOptHelper.any(RowKeyJoinPrel.class)),
+          "MapRDBPushLimitIntoScan:Limit_On_RKJoin") {
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+      final RowKeyJoinPrel join = call.rel(1);
+      final LimitPrel limit = call.rel(0);
+      doPushLimitIntoRowKeyJoin(call, limit, null, join);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+      final LimitPrel limit = call.rel(0);
+      // We do not fire this rule if fetch() is null (indicating we have to fetch all the
+      // remaining rows starting from offset.
+      return !limit.isPushDown() && limit.getFetch() != null;
+    }
+  };
+
   protected void doPushLimitIntoGroupScan(RelOptRuleCall call,
       LimitPrel limit, final ProjectPrel project, ScanPrel scan, GroupScan groupScan) {
     try {
@@ -153,5 +175,29 @@ public abstract class MapRDBPushLimitIntoScan extends StoragePluginOptimizerRule
     }
     return null;
   }
+
+  protected void doPushLimitIntoRowKeyJoin(RelOptRuleCall call,
+    LimitPrel limit, final ProjectPrel project, RowKeyJoinPrel join) {
+    final RelNode newChild;
+    try {
+      RelNode left = join.getLeft();
+      RelNode right = join.getRight();
+      final RelNode limitOnLeft = new LimitPrel(left.getCluster(), left.getTraitSet(), left,
+          limit.getOffset(), limit.getFetch());
+      RowKeyJoinPrel newJoin = new RowKeyJoinPrel(join.getCluster(), join.getTraitSet(), limitOnLeft, right,
+          join.getCondition(), join.getJoinType());
+      if (project != null) {
+        final ProjectPrel newProject = new ProjectPrel(project.getCluster(), project.getTraitSet(), newJoin,
+            project.getProjects(), project.getRowType());
+        newChild = newProject;
+      } else {
+        newChild = newJoin;
+      }
+      call.transformTo(newChild);
+      logger.debug("pushLimitIntoRowKeyJoin: Pushed limit on left side of Join " + join.toString());
+    } catch (Exception e) {
+      logger.warn("pushLimitIntoRowKeyJoin: Exception while trying limit pushdown!", e);
+    }
+  }
 }
 
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java
index 2eb84e7..5215868 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushProjectIntoScan.java
@@ -30,8 +30,9 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil.ProjectPushInfo;
 import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.planner.physical.ProjectPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
index a269256..b545262 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
@@ -44,10 +44,14 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.planner.index.IndexDescriptor;
+import org.apache.drill.exec.planner.index.MapRDBIndexDescriptor;
+import org.apache.drill.exec.planner.index.MapRDBStatisticsPayload;
 import org.apache.drill.exec.planner.index.Statistics;
 import org.apache.drill.exec.planner.index.MapRDBStatistics;
 import org.apache.drill.exec.planner.cost.PluginCost;
 import org.apache.drill.exec.planner.physical.PartitionFunction;
+import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
@@ -296,6 +300,9 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca
 
   @Override
   public ScanStats getScanStats() {
+    if (isIndexScan()) {
+      return indexScanStats();
+    }
     return fullTableScanStats();
   }
 
@@ -359,6 +366,57 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca
     }
   }
 
+  private ScanStats indexScanStats() {
+    if (!this.getIndexHint().equals("") &&
+        this.getIndexHint().equals(getIndexDesc().getIndexName())) {
+      logger.debug("JsonIndexGroupScan:{} forcing index {} by making tiny cost", this, this.getIndexHint());
+      return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, 1,1, 0);
+    }
+
+    int totalColNum = STAR_COLS;
+    PluginCost pluginCostModel = formatPlugin.getPluginCostModel();
+    final int avgColumnSize = pluginCostModel.getAverageColumnSize(this);
+    boolean filterPushed = (scanSpec.getSerializedFilter() != null);
+    if(scanSpec != null && scanSpec.getIndexDesc() != null) {
+      totalColNum = scanSpec.getIndexDesc().getIncludedFields().size()
+          + scanSpec.getIndexDesc().getIndexedFields().size() + 1;
+    }
+    int numColumns = (columns == null || columns.isEmpty()) ?  totalColNum: columns.size();
+    String idxIdentifier = stats.buildUniqueIndexIdentifier(scanSpec.getIndexDesc().getPrimaryTablePath(),
+        scanSpec.getIndexDesc().getIndexName());
+    double rowCount = stats.getRowCount(scanSpec.getCondition(), idxIdentifier);
+    // rowcount based on index leading columns predicate.
+    double leadingRowCount = stats.getLeadingRowCount(scanSpec.getCondition(), idxIdentifier);
+    double avgRowSize = stats.getAvgRowSize(idxIdentifier, false);
+    // If UNKNOWN, use defaults
+    if (rowCount == ROWCOUNT_UNKNOWN || rowCount == 0) {
+      rowCount = (filterPushed ? 0.0005f : 0.001f) * fullTableRowCount / scanSpec.getIndexDesc().getIndexedFields().size();
+    }
+    // If limit pushdown has occurred - factor it in the rowcount
+    if (this.maxRecordsToRead > 0) {
+      rowCount = Math.min(rowCount, this.maxRecordsToRead);
+    }
+    if (leadingRowCount == ROWCOUNT_UNKNOWN || leadingRowCount == 0) {
+      leadingRowCount = rowCount;
+    }
+    if (avgRowSize == AVG_ROWSIZE_UNKNOWN || avgRowSize == 0) {
+      avgRowSize = avgColumnSize * numColumns;
+    }
+    double rowsFromDisk = leadingRowCount;
+    if (!filterPushed) {
+      // both start and stop rows are empty, indicating this is a full scan so
+      // use the total rows for calculating disk i/o
+      rowsFromDisk = fullTableRowCount;
+    }
+    double totalBlocks = Math.ceil((avgRowSize * fullTableRowCount)/pluginCostModel.getBlockSize(this));
+    double numBlocks = Math.ceil(((avgRowSize * rowsFromDisk)/pluginCostModel.getBlockSize(this)));
+    numBlocks = Math.min(totalBlocks, numBlocks);
+    double diskCost = numBlocks * pluginCostModel.getSequentialBlockReadCost(this);
+    logger.debug("index_plan_info: JsonIndexGroupScan:{} - indexName:{}: rowCount:{}, avgRowSize:{}, blocks:{}, totalBlocks:{}, rowsFromDisk {}, diskCost:{}",
+        System.identityHashCode(this), scanSpec.getIndexDesc().getIndexName(), rowCount, avgRowSize, numBlocks, totalBlocks, rowsFromDisk, diskCost);
+    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, diskCost);
+  }
+
   @Override
   @JsonIgnore
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
@@ -412,6 +470,142 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca
     return true;
   }
 
+
+  @Override
+  public RestrictedJsonTableGroupScan getRestrictedScan(List<SchemaPath> columns) {
+    RestrictedJsonTableGroupScan newScan =
+        new RestrictedJsonTableGroupScan(this.getUserName(),
+            (FileSystemPlugin) this.getStoragePlugin(),
+            this.getFormatPlugin(),
+            this.getScanSpec(),
+            this.getColumns(),
+            this.getStatistics());
+    newScan.columns = columns;
+    return newScan;
+  }
+
+  /**
+   * Get the estimated average rowsize. DO NOT call this API directly.
+   * Call the stats API instead which modifies the counts based on preference options.
+   * @param index, to use for generating the estimate
+   * @return row count post filtering
+   */
+  public MapRDBStatisticsPayload getAverageRowSizeStats(IndexDescriptor index) {
+    IndexDesc indexDesc = null;
+    double avgRowSize = AVG_ROWSIZE_UNKNOWN;
+
+    if (index != null) {
+      indexDesc = (IndexDesc)((MapRDBIndexDescriptor)index).getOriginalDesc();
+    }
+    // If no index is specified, get it from the primary table
+    if(indexDesc == null && scanSpec.isSecondaryIndex()) {
+      throw new UnsupportedOperationException("getAverageRowSizeStats should be invoked on primary table");
+    }
+
+    // Get the index table or primary table and use the DB API to get the estimated number of rows. For size estimates,
+    // we assume that all the columns would be read from the disk.
+    final Table table = this.formatPlugin.getJsonTableCache().getTable(scanSpec.getTableName(), indexDesc, getUserName());
+
+    if (table != null) {
+      final MetaTable metaTable = table.getMetaTable();
+      if (metaTable != null) {
+        avgRowSize = metaTable.getAverageRowSize();
+      }
+    }
+    logger.debug("index_plan_info: getEstimatedRowCount obtained from DB Client for {}: indexName: {}, indexInfo: {}, " +
+            "avgRowSize: {}, estimatedSize {}", this, (indexDesc == null ? "null" : indexDesc.getIndexName()),
+        (indexDesc == null ? "null" : indexDesc.getIndexInfo()), avgRowSize);
+    return new MapRDBStatisticsPayload(ROWCOUNT_UNKNOWN, ROWCOUNT_UNKNOWN, avgRowSize);
+  }
+
+  /**
+   * Get the estimated statistics after applying the {@link RexNode} condition. DO NOT call this API directly.
+   * Call the stats API instead which modifies the counts based on preference options.
+   * @param condition, filter to apply
+   * @param index, to use for generating the estimate
+   * @return row count post filtering
+   */
+  public MapRDBStatisticsPayload getFirstKeyEstimatedStats(QueryCondition condition, IndexDescriptor index, RelNode scanRel) {
+    IndexDesc indexDesc = null;
+    if (index != null) {
+      indexDesc = (IndexDesc)((MapRDBIndexDescriptor)index).getOriginalDesc();
+    }
+    return getFirstKeyEstimatedStatsInternal(condition, indexDesc, scanRel);
+  }
+
+  /**
+   * Get the estimated statistics after applying the {@link QueryCondition} condition
+   * @param condition, filter to apply
+   * @param index, to use for generating the estimate
+   * @return {@link MapRDBStatisticsPayload} statistics
+   */
+  private MapRDBStatisticsPayload getFirstKeyEstimatedStatsInternal(QueryCondition condition, IndexDesc index, RelNode scanRel) {
+    // double totalRows = getRowCount(null, scanPrel);
+
+    // If no index is specified, get it from the primary table
+    if(index == null && scanSpec.isSecondaryIndex()) {
+      // If stats not cached get it from the table.
+      //table = MapRDB.getTable(scanSpec.getPrimaryTablePath());
+      throw new UnsupportedOperationException("getFirstKeyEstimatedStats should be invoked on primary table");
+    }
+
+    // Get the index table or primary table and use the DB API to get the estimated number of rows. For size estimates,
+    // we assume that all the columns would be read from the disk.
+    final Table table = this.formatPlugin.getJsonTableCache().getTable(scanSpec.getTableName(), index, getUserName());
+
+    if (table != null) {
+      // Factor reflecting confidence in the DB estimates. If a table has few tablets, the tablet-level stats
+      // might be off. The decay scalingFactor will reduce estimates when one tablet represents a significant percentage
+      // of the entire table.
+      double scalingFactor = 1.0;
+      boolean isFullScan = false;
+      final MetaTable metaTable = table.getMetaTable();
+      com.mapr.db.scan.ScanStats stats = (condition == null)
+          ? metaTable.getScanStats() : metaTable.getScanStats(condition);
+      if (index == null && condition != null) {
+        // Given table condition might not be on leading column. Check if the rowcount matches full table rows.
+        // In that case no leading key present or does not prune enough. Treat it like so.
+        com.mapr.db.scan.ScanStats noConditionPTabStats = metaTable.getScanStats();
+        if (stats.getEstimatedNumRows() == noConditionPTabStats.getEstimatedNumRows()) {
+          isFullScan = true;
+        }
+      }
+      // Use the scalingFactor only when a condition filters out rows from the table. If no condition is present, all rows
+      // should be selected. So the scalingFactor should not reduce the returned rows
+      if (condition != null && !isFullScan) {
+        double forcedScalingFactor = PrelUtil.getSettings(scanRel.getCluster()).getIndexStatsRowCountScalingFactor();
+        // Accuracy is defined as 1 - Error where Error = # Boundary Tablets (2) / # Total Matching Tablets.
+        // For 2 or less matching tablets, the error is assumed to be 50%. The Sqrt gives the decaying scalingFactor
+        if (stats.getTabletCount() > 2) {
+          double accuracy = 1.0 - (2.0/stats.getTabletCount());
+          scalingFactor = Math.min(1.0, 1.0 / Math.sqrt(1.0 / accuracy));
+        } else {
+          scalingFactor = 0.5;
+        }
+        if (forcedScalingFactor < 1.0
+            && metaTable.getScanStats().getTabletCount() < PluginConstants.JSON_TABLE_NUM_TABLETS_PER_INDEX_DEFAULT) {
+          // User forced confidence scalingFactor for small tables (assumed as less than 32 tablets (~512 MB))
+          scalingFactor = forcedScalingFactor;
+        }
+      }
+      logger.info("index_plan_info: getEstimatedRowCount obtained from DB Client for {}: indexName: {}, indexInfo: {}, " +
+              "condition: {} rowCount: {}, avgRowSize: {}, estimatedSize {}, tabletCount {}, totalTabletCount {}, " +
+              "scalingFactor {}",
+          this, (index == null ? "null" : index.getIndexName()), (index == null ? "null" : index.getIndexInfo()),
+          (condition == null ? "null" : condition.toString()), stats.getEstimatedNumRows(),
+          (stats.getEstimatedNumRows() == 0 ? 0 : stats.getEstimatedSize()/stats.getEstimatedNumRows()),
+          stats.getEstimatedSize(), stats.getTabletCount(), metaTable.getScanStats().getTabletCount(), scalingFactor);
+      return new MapRDBStatisticsPayload(scalingFactor * stats.getEstimatedNumRows(), scalingFactor * stats.getEstimatedNumRows(),
+          ((stats.getEstimatedNumRows() == 0 ? 0 : (double)stats.getEstimatedSize()/stats.getEstimatedNumRows())));
+    } else {
+      logger.info("index_plan_info: getEstimatedRowCount: {} indexName: {}, indexInfo: {}, " +
+              "condition: {} rowCount: UNKNOWN, avgRowSize: UNKNOWN", this, (index == null ? "null" : index.getIndexName()),
+          (index == null ? "null" : index.getIndexInfo()), (condition == null ? "null" : condition.toString()));
+      return new MapRDBStatisticsPayload(ROWCOUNT_UNKNOWN, ROWCOUNT_UNKNOWN, AVG_ROWSIZE_UNKNOWN);
+    }
+  }
+
+
   /**
    * Set the row count resulting from applying the {@link RexNode} condition. Forced row counts will take
    * precedence over stats row counts
@@ -518,9 +712,7 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca
   @Override
   @JsonIgnore
   public PartitionFunction getRangePartitionFunction(List<FieldReference> refList) {
-
-    return null;
-    //new JsonTableRangePartitionFunction(refList, scanSpec.getTableName(), this.getUserName(), this.getFormatPlugin());
+    return new JsonTableRangePartitionFunction(refList, scanSpec.getTableName(), this.getUserName(), this.getFormatPlugin());
   }
 
   /**
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java
new file mode 100644
index 0000000..acaa6ca
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.exec.planner.physical.AbstractRangePartitionFunction;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
+import org.apache.drill.exec.vector.ValueVector;
+import org.ojai.store.QueryCondition;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.mapr.db.Table;
+import com.mapr.db.impl.ConditionImpl;
+import com.mapr.db.impl.IdCodec;
+import com.mapr.db.impl.ConditionNode.RowkeyRange;
+import com.mapr.db.scan.ScanRange;
+import com.mapr.fs.jni.MapRConstants;
+import com.mapr.org.apache.hadoop.hbase.util.Bytes;
+
+@JsonTypeName("jsontable-range-partition-function")
+public class JsonTableRangePartitionFunction extends AbstractRangePartitionFunction {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonTableRangePartitionFunction.class);
+
+  @JsonProperty("refList")
+  protected List<FieldReference> refList;
+
+  @JsonProperty("tableName")
+  protected String tableName;
+
+  @JsonIgnore
+  protected String userName;
+
+  @JsonIgnore
+  protected ValueVector partitionKeyVector = null;
+
+  // List of start keys of the scan ranges for the table.
+  @JsonProperty
+  protected List<byte[]> startKeys = null;
+
+  // List of stop keys of the scan ranges for the table.
+  @JsonProperty
+  protected List<byte[]> stopKeys = null;
+
+  @JsonCreator
+  public JsonTableRangePartitionFunction(
+      @JsonProperty("refList") List<FieldReference> refList,
+      @JsonProperty("tableName") String tableName,
+      @JsonProperty("startKeys") List<byte[]> startKeys,
+      @JsonProperty("stopKeys") List<byte[]> stopKeys) {
+    this.refList = refList;
+    this.tableName = tableName;
+    this.startKeys = startKeys;
+    this.stopKeys = stopKeys;
+  }
+
+  public JsonTableRangePartitionFunction(List<FieldReference> refList,
+      String tableName, String userName, MapRDBFormatPlugin formatPlugin) {
+    this.refList = refList;
+    this.tableName = tableName;
+    this.userName = userName;
+    initialize(formatPlugin);
+  }
+
+  @JsonProperty("refList")
+  @Override
+  public List<FieldReference> getPartitionRefList() {
+    return refList;
+  }
+
+  @Override
+  public void setup(List<VectorWrapper<?>> partitionKeys) {
+    if (partitionKeys.size() != 1) {
+      throw new UnsupportedOperationException(
+          "Range partitioning function supports exactly one partition column; encountered " + partitionKeys.size());
+    }
+
+    VectorWrapper<?> v = partitionKeys.get(0);
+
+    partitionKeyVector = v.getValueVector();
+
+    Preconditions.checkArgument(partitionKeyVector != null, "Found null partitionKeVector.") ;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj instanceof JsonTableRangePartitionFunction) {
+      JsonTableRangePartitionFunction rpf = (JsonTableRangePartitionFunction) obj;
+      List<FieldReference> thisPartRefList = this.getPartitionRefList();
+      List<FieldReference> otherPartRefList = rpf.getPartitionRefList();
+      if (thisPartRefList.size() != otherPartRefList.size()) {
+        return false;
+      }
+      for (int refIdx=0; refIdx<thisPartRefList.size(); refIdx++) {
+        if (!thisPartRefList.get(refIdx).equals(otherPartRefList.get(refIdx))) {
+          return false;
+        }
+      }
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public int eval(int index, int numPartitions) {
+
+	  String key = partitionKeyVector.getAccessor().getObject(index).toString();
+	  byte[] encodedKey = IdCodec.encodeAsBytes(key);
+
+    int tabletId = -1;
+
+    // Do a 'range' binary search through the list of start-stop keys to find nearest range.  Assumption is
+    // that the list of start keys is ordered (this is ensured by MetaTable.getScanRanges())
+    // TODO: This search should ideally be delegated to MapR-DB once an appropriate API is available
+    // to optimize this
+    int low = 0, high = startKeys.size() - 1;
+    while (low <= high) {
+      int mid = low + (high-low)/2;
+
+      byte[] start = startKeys.get(mid);
+      byte[] stop  = stopKeys.get(mid);
+
+      // Check if key is present in the mid interval of [start, stop].
+      // Account for empty byte array start/stop
+      if ( (Bytes.compareTo(encodedKey, start) >= 0 ||
+             Bytes.equals(start, MapRConstants.EMPTY_BYTE_ARRAY)
+           ) &&
+           (Bytes.compareTo(encodedKey, stop) < 0 ||
+             Bytes.equals(stop, MapRConstants.EMPTY_BYTE_ARRAY)
+           )
+         ) {
+        tabletId = mid;
+        break;
+      }
+
+      if (Bytes.compareTo(encodedKey, start) >= 0) {
+        // key is greater, ignore left side
+        low = mid + 1;
+      } else {
+        // key is smaller, ignore right side
+        high = mid - 1;
+      }
+    }
+
+    if (tabletId < 0) {
+      tabletId = 0;
+      logger.warn("Key {} was not found in any of the start-stop ranges. Using default tabletId {}", key, tabletId);
+    }
+
+    int partitionId = tabletId % numPartitions;
+
+    logger.trace("Key = {}, tablet id = {}, partition id = {}", key, tabletId, partitionId);
+
+    return partitionId;
+  }
+
+
+  public void initialize(MapRDBFormatPlugin plugin) {
+
+    // get the table handle from the table cache
+    Table table = plugin.getJsonTableCache().getTable(tableName, userName);
+
+    // Set the condition to null such that all scan ranges are retrieved for the primary table.
+    // The reason is the row keys could typically belong to any one of the tablets of the table, so
+    // there is no use trying to get only limited set of scan ranges.
+    // NOTE: here we use the restrictedScanRangeSizeMB because the range partitioning should be parallelized
+    // based on the number of scan ranges on the RestrictedJsonTableGroupScan.
+    List<ScanRange> ranges = table.getMetaTable().getScanRanges(plugin.getRestrictedScanRangeSizeMB());
+
+    this.startKeys = Lists.newArrayList();
+    this.stopKeys = Lists.newArrayList();
+
+    logger.debug("Num scan ranges for table {} = {}", table.getName(), ranges.size());
+
+    int count = 0;
+    for (ScanRange r : ranges) {
+      QueryCondition condition = r.getCondition();
+      List<RowkeyRange> rowkeyRanges =  ((ConditionImpl)condition).getRowkeyRanges();
+      byte[] start = rowkeyRanges.get(0).getStartRow();
+      byte[] stop  = rowkeyRanges.get(rowkeyRanges.size() - 1).getStopRow();
+
+      Preconditions.checkNotNull(start, String.format("Encountered a null start key at position %d for scan range condition %s.", count, condition.toString()));
+      Preconditions.checkNotNull(stop, String.format("Encountered a null stop key at position %d for scan range condition %s.", count, condition.toString()));
+
+      if (count > 0) {
+        // after the first start key, rest should be non-empty
+        Preconditions.checkState( !(Bytes.equals(start, MapRConstants.EMPTY_BYTE_ARRAY)), String.format("Encountered an empty start key at position %d", count));
+      }
+
+      if (count < ranges.size() - 1) {
+        // except for the last stop key, rest should be non-empty
+        Preconditions.checkState( !(Bytes.equals(stop, MapRConstants.EMPTY_BYTE_ARRAY)), String.format("Encountered an empty stop key at position %d", count));
+      }
+
+      startKeys.add(start);
+      stopKeys.add(stop);
+      count++;
+    }
+
+    // check validity; only need to check one of the lists since they are populated together
+    Preconditions.checkArgument(startKeys.size() > 0, "Found empty list of start/stopKeys.");
+
+    Preconditions.checkState(startKeys.size() == ranges.size(),
+        String.format("Mismatch between the lengths: num start keys = %d, num scan ranges = %d", startKeys.size(), ranges.size()));
+
+    Preconditions.checkState(stopKeys.size() == ranges.size(),
+        String.format("Mismatch between the lengths: num stop keys = %d, num scan ranges = %d", stopKeys.size(), ranges.size()));
+
+  }
+
+}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java
new file mode 100644
index 0000000..48ad96d
--- /dev/null
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.mapr.db.json;
+
+import java.util.List;
+import java.util.NavigableMap;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.planner.index.MapRDBStatistics;
+import org.apache.drill.exec.planner.cost.PluginCost;
+import org.apache.drill.exec.planner.index.Statistics;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
+import org.apache.drill.exec.store.mapr.db.MapRDBSubScan;
+import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
+import org.apache.drill.exec.store.mapr.db.RestrictedMapRDBSubScan;
+import org.apache.drill.exec.store.mapr.db.RestrictedMapRDBSubScanSpec;
+import org.apache.drill.exec.store.mapr.db.TabletFragmentInfo;
+
+/**
+ * A RestrictedJsonTableGroupScan encapsulates (along with a subscan) the functionality
+ * for doing restricted (i.e skip) scan rather than sequential scan.  The skipping is based
+ * on a supplied set of row keys (primary keys) from a join operator.
+ */
+@JsonTypeName("restricted-json-scan")
+public class RestrictedJsonTableGroupScan extends JsonTableGroupScan {
+
+  @JsonCreator
+  public RestrictedJsonTableGroupScan(@JsonProperty("userName") String userName,
+                            @JsonProperty("storage") FileSystemPlugin storagePlugin,
+                            @JsonProperty("format") MapRDBFormatPlugin formatPlugin,
+                            @JsonProperty("scanSpec") JsonScanSpec scanSpec, /* scan spec of the original table */
+                            @JsonProperty("columns") List<SchemaPath> columns,
+                            @JsonProperty("")MapRDBStatistics statistics) {
+    super(userName, storagePlugin, formatPlugin, scanSpec, columns, statistics);
+  }
+
+  // TODO:  this method needs to be fully implemented
+  protected RestrictedMapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo tfi) {
+    JsonScanSpec spec = scanSpec;
+    RestrictedMapRDBSubScanSpec subScanSpec =
+        new RestrictedMapRDBSubScanSpec(
+        spec.getTableName(),
+        getRegionsToScan().get(tfi), spec.getSerializedFilter(), getUserName());
+    return subScanSpec;
+  }
+
+  protected NavigableMap<TabletFragmentInfo, String> getRegionsToScan() {
+    return getRegionsToScan(formatPlugin.getRestrictedScanRangeSizeMB());
+  }
+
+  @Override
+  public MapRDBSubScan getSpecificScan(int minorFragmentId) {
+    assert minorFragmentId < endpointFragmentMapping.size() : String.format(
+        "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
+        minorFragmentId);
+    RestrictedMapRDBSubScan subscan =
+        new RestrictedMapRDBSubScan(getUserName(), formatPlugin,
+        getEndPointFragmentMapping(minorFragmentId), columns, maxRecordsToRead, TABLE_JSON);
+
+    return subscan;
+  }
+
+  private List<RestrictedMapRDBSubScanSpec> getEndPointFragmentMapping(int minorFragmentId) {
+    List<RestrictedMapRDBSubScanSpec> restrictedSubScanSpecList = Lists.newArrayList();
+    List<MapRDBSubScanSpec> subScanSpecList = endpointFragmentMapping.get(minorFragmentId);
+    for(MapRDBSubScanSpec s : subScanSpecList) {
+      restrictedSubScanSpecList.add((RestrictedMapRDBSubScanSpec) s);
+    }
+    return restrictedSubScanSpecList;
+  }
+
+  /**
+   * Private constructor, used for cloning.
+   * @param that The RestrictedJsonTableGroupScan to clone
+   */
+  private RestrictedJsonTableGroupScan(RestrictedJsonTableGroupScan that) {
+    super(that);
+  }
+
+  @Override
+  public GroupScan clone(JsonScanSpec scanSpec) {
+    RestrictedJsonTableGroupScan newScan = new RestrictedJsonTableGroupScan(this);
+    newScan.scanSpec = scanSpec;
+    newScan.resetRegionsToScan(); // resetting will force recalculation
+    return newScan;
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+    RestrictedJsonTableGroupScan newScan = new RestrictedJsonTableGroupScan(this);
+    newScan.columns = columns;
+    return newScan;
+  }
+
+  @Override
+  @JsonIgnore
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new RestrictedJsonTableGroupScan(this);
+  }
+
+  @Override
+  public ScanStats getScanStats() {
+    //TODO: ideally here we should use the rowcount from index scan, and multiply a factor of restricted scan
+    double rowCount;
+    PluginCost pluginCostModel = formatPlugin.getPluginCostModel();
+    final int avgColumnSize = pluginCostModel.getAverageColumnSize(this);
+    int numColumns = (columns == null || columns.isEmpty()) ?  STAR_COLS: columns.size();
+    // Get the restricted group scan row count - same as the right side index rows
+    rowCount = computeRestrictedScanRowcount();
+    // Get the average row size of the primary table
+    double avgRowSize = stats.getAvgRowSize(null, true);
+    if (avgRowSize == Statistics.AVG_ROWSIZE_UNKNOWN || avgRowSize == 0) {
+      avgRowSize = avgColumnSize * numColumns;
+    }
+    // restricted scan does random lookups and each row may belong to a different block, with the number
+    // of blocks upper bounded by the total num blocks in the primary table
+    double totalBlocksPrimary = Math.ceil((avgRowSize * fullTableRowCount)/pluginCostModel.getBlockSize(this));
+    double numBlocks = Math.min(totalBlocksPrimary, rowCount);
+    double diskCost = numBlocks * pluginCostModel.getRandomBlockReadCost(this);
+    // For non-covering plans, the dominating cost would be of the join back. Reduce it using the factor
+    // for biasing towards non-covering plans.
+    diskCost *= stats.getRowKeyJoinBackIOFactor();
+    logger.debug("RestrictedJsonGroupScan:{} rowCount:{}, avgRowSize:{}, blocks:{}, totalBlocks:{}, diskCost:{}",
+        System.identityHashCode(this), rowCount, avgRowSize, numBlocks, totalBlocksPrimary, diskCost);
+    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, diskCost);
+  }
+
+  private double computeRestrictedScanRowcount() {
+    double rowCount = Statistics.ROWCOUNT_UNKNOWN;
+    // The rowcount should be the same as the build side which was FORCED by putting it in forcedRowCountMap
+    if (forcedRowCountMap.get(null) != null) {
+      rowCount = forcedRowCountMap.get(null);
+    }
+    // If limit pushdown has occurred - factor it in the rowcount
+    if (rowCount == Statistics.ROWCOUNT_UNKNOWN || rowCount == 0) {
+      rowCount = (0.001f * fullTableRowCount);
+    }
+    if (this.maxRecordsToRead > 0) {
+      rowCount = Math.min(rowCount, this.maxRecordsToRead);
+    }
+    return rowCount;
+  }
+
+  @Override
+  public boolean isRestrictedScan() {
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "RestrictedJsonTableGroupScan [ScanSpec=" + scanSpec + ", columns=" + columns
+        + ", rowcount=" + computeRestrictedScanRowcount()
+        + (maxRecordsToRead>0? ", limit=" + maxRecordsToRead : "")
+        + (getMaxParallelizationWidth()>0? ", maxwidth=" + getMaxParallelizationWidth() : "") + "]";
+  }
+}
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexHintPlanTest.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexHintPlanTest.java
new file mode 100644
index 0000000..9ac27b4
--- /dev/null
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexHintPlanTest.java
@@ -0,0 +1,171 @@
+package com.mapr.drill.maprdb.tests.index;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.mapr.drill.maprdb.tests.json.BaseJsonTest;
+import com.mapr.tests.annotations.ClusterTest;
+import org.apache.drill.PlanTestBase;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.MethodSorters;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@Category(ClusterTest.class)
+public class IndexHintPlanTest extends IndexPlanTest {
+
+    private static final String defaultHavingIndexPlan = "alter session reset `planner.enable_index_planning`";
+
+    @Test
+    // A simple testcase with index hint on a table which has only one index for a column t.id.ssn;
+    // This should pick i_ssn index for the query
+    public void testSimpleIndexHint() throws Exception {
+        String hintquery = "SELECT  t.id.ssn as ssn FROM table(hbase.`index_test_primary`(type => 'maprdb', index => 'i_ssn')) as t " +
+                " where t.id.ssn = '100007423'";
+
+        String query = "SELECT t.id.ssn as ssn FROM hbase.`index_test_primary` as t where t.id.ssn = '100007423'";
+        test(defaultHavingIndexPlan);
+        PlanTestBase.testPlanMatchingPatterns(hintquery,
+                new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"},
+                new String[]{"RowKeyJoin"}
+        );
+
+        //default plan picked by optimizer.
+        PlanTestBase.testPlanMatchingPatterns(query,
+                new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"},
+                new String[]{"RowKeyJoin"}
+        );
+        testBuilder()
+                .sqlQuery(hintquery)
+                .ordered()
+                .baselineColumns("ssn").baselineValues("100007423")
+                .go();
+
+    }
+
+
+    @Test
+    // A testcase where there are multiple index to pick from but only picks the index provided as hint.
+    // A valid index is provided as hint and it is useful during the index selection process, hence it will be selected.
+    public void testHintCaseWithMultipleIndexes_1() throws Exception {
+
+        String hintquery = "SELECT t.`address`.`state` AS `state` FROM table(hbase.`index_test_primary`(type => 'maprdb', index => 'i_state_city')) as t " +
+                " where t.address.state = 'pc'";
+
+        String query = "SELECT t.`address`.`state` AS `state` FROM hbase.`index_test_primary` as t where t.address.state = 'pc'";
+        test(defaultHavingIndexPlan);
+        PlanTestBase.testPlanMatchingPatterns(hintquery,
+                new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_city"},
+                new String[]{"RowKeyJoin"}
+        );
+
+        //default plan picked by optimizer
+        PlanTestBase.testPlanMatchingPatterns(query,
+                new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=(i_state_city|i_state_age_phone)"},
+                new String[]{"RowKeyJoin"}
+        );
+
+        return;
+    }
+
+    @Test
+    // A testcase where there are multiple index to pick from but only picks the index provided as hint.
+    // A valid index is provided as hint and it is useful during the index selection process, hence it will be selected.
+    // Difference between this testcase and the one before this is that index name is switched. This shows that index hint makes sure to select only one
+    // valid index specified as hint.
+    public void testHintCaseWithMultipleIndexes_2() throws Exception {
+
+        String hintquery = "SELECT t.`address`.`state` AS `state` FROM table(hbase.`index_test_primary`(type => 'maprdb', index => 'i_state_age_phone')) as t " +
+                " where t.address.state = 'pc'";
+
+        String query = "SELECT t.`address`.`state` AS `state` FROM hbase.`index_test_primary` as t where t.address.state = 'pc'";
+        test(defaultHavingIndexPlan);
+        PlanTestBase.testPlanMatchingPatterns(hintquery,
+                new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"},
+                new String[]{"RowKeyJoin"}
+        );
+
+        //default plan picked by query optimizer.
+        PlanTestBase.testPlanMatchingPatterns(query,
+                new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=(i_state_city|i_state_age_phone)"},
+                new String[]{"RowKeyJoin"}
+        );
+
+        return;
+    }
+
+    //Negative cases
+
+    @Test
+    // A testcase where there are multiple index to pick from but none of them equals to the index provided as hint (index hint is wrong).
+    //In this index is not at all present in the table hence it falls back to the case where the index itself is not given.
+    //Hence here one of the i_state_city or i_state_age_lic will be selected depending upon the cost.
+    public void testWithMultipleIndexesButNoIndexWithHint() throws Exception {
+
+        String hintquery = "SELECT t.`address`.`state` AS `state` FROM table(hbase.`index_test_primary`(type => 'maprdb', index => 'i_state_and_city')) as t " +
+                " where t.address.state = 'pc'";
+        test(defaultHavingIndexPlan);
+        PlanTestBase.testPlanMatchingPatterns(hintquery,
+                new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=(i_state_city|i_state_age_phone)"},
+                new String[]{"RowKeyJoin"}
+        );
+
+        return;
+    }
+
+    @Test
+    // A testcase where there are multiple index to pick from but none of them equals to the index provided as hint and the hint index is valid.
+    // Here the index name given is valid (i.e it is present in the table) but it is not useful.
+    // This case falls back to full table scan.
+    public void testWithMultipleIndexesButNoIndexWithValidHint() throws Exception {
+
+        String hintquery = "SELECT t.`address`.`state` AS `state` FROM table(hbase.`index_test_primary`(type => 'maprdb', index => 'i_ssn')) as t " +
+                " where t.address.state = 'pc'";
+
+        String query = "SELECT t.`address`.`state` AS `state` FROM hbase.`index_test_primary` as t where t.address.state = 'pc'";
+        test(defaultHavingIndexPlan);
+        PlanTestBase.testPlanMatchingPatterns(hintquery,
+                new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary"},
+                new String[]{"RowKeyJoin", "indexName="}
+        );
+
+        PlanTestBase.testPlanMatchingPatterns(query,
+                new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=(i_state_city|i_state_age_phone)"},
+                new String[]{"RowKeyJoin"}
+        );
+
+        return;
+    }
+
+
+    @Test
+    //Covering index should be generated for a simple query instead of a RowKeyJoin.
+    public void testSimpleNoRowKeyJoin() throws Exception {
+        String query = "SELECT `reverseid` from table(hbase.`index_test_primary`(type => 'maprdb', index => 'hash_i_reverseid'))  " +
+                "where `reverseid` = 1234";
+
+        test(defaultHavingIndexPlan);
+        PlanTestBase.testPlanMatchingPatterns(query,
+                new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=hash_i_reverseid"},
+                new String[]{"RowKeyJoin"}
+        );
+
+        return;
+   }
+}
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java
new file mode 100644
index 0000000..c0ea2a0
--- /dev/null
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java
@@ -0,0 +1,1715 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mapr.drill.maprdb.tests.index;
+
+import com.mapr.db.Admin;
+import com.mapr.drill.maprdb.tests.MaprDBTestsSuite;
+import com.mapr.drill.maprdb.tests.json.BaseJsonTest;
+import com.mapr.tests.annotations.ClusterTest;
+import org.apache.drill.PlanTestBase;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.apache.drill.common.config.DrillConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.MethodSorters;
+import java.util.Properties;
+
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@Category(ClusterTest.class)
+public class IndexPlanTest extends BaseJsonTest {
+
+  final static String PRIMARY_TABLE_NAME = "/tmp/index_test_primary";
+
+  final static int PRIMARY_TABLE_SIZE = 10000;
+  private static final String sliceTargetSmall = "alter session set `planner.slice_target` = 1";
+  private static final String sliceTargetDefault = "alter session reset `planner.slice_target`";
+  private static final String noIndexPlan = "alter session set `planner.enable_index_planning` = false";
+  private static final String defaultHavingIndexPlan = "alter session reset `planner.enable_index_planning`";
+  private static final String disableHashAgg = "alter session set `planner.enable_hashagg` = false";
+  private static final String enableHashAgg =  "alter session set `planner.enable_hashagg` = true";
+  private static final String defaultnonCoveringSelectivityThreshold = "alter session set `planner.index.noncovering_selectivity_threshold` = 0.025";
+  private static final String incrnonCoveringSelectivityThreshold = "alter session set `planner.index.noncovering_selectivity_threshold` = 0.25";
+  private static final String disableFTS = "alter session set `planner.disable_full_table_scan` = true";
+  private static final String enableFTS = "alter session reset `planner.disable_full_table_scan`";
+  private static final String preferIntersectPlans = "alter session set `planner.index.prefer_intersect_plans` = true";
+  private static final String defaultIntersectPlans = "alter session reset `planner.index.prefer_intersect_plans`";
+  private static final String lowRowKeyJoinBackIOFactor
+      = "alter session set `planner.index.rowkeyjoin_cost_factor` = 0.01";
+  private static final String defaultRowKeyJoinBackIOFactor
+      = "alter session reset `planner.index.rowkeyjoin_cost_factor`";
+
+  /**
+   *  A sample row of this 10K table:
+   ------------------+-----------------------------+--------+
+   | 1012  | {"city":"pfrrs","state":"pc"}  | {"email":"KfFzKUZwNk@gmail.com","phone":"6500005471"}  |
+   {"ssn":"100007423"}  | {"fname":"KfFzK","lname":"UZwNk"}  | {"age":53.0,"income":45.0}  | 1012   |
+   *
+   * This test suite generate random content to fill all the rows, since the random function always start from
+   * the same seed for different runs, when the row count is not changed, the data in table will always be the same,
+   * thus the query result could be predicted and verified.
+   */
+
+  @BeforeClass
+  public static void setupTableIndexes() throws Exception {
+
+    Properties overrideProps = new Properties();
+    overrideProps.setProperty("format-maprdb.json.useNumRegionsForDistribution", "true");
+    updateTestCluster(1, DrillConfig.create(overrideProps));
+
+    MaprDBTestsSuite.setupTests();
+    MaprDBTestsSuite.createPluginAndGetConf(getDrillbitContext());
+
+    test(incrnonCoveringSelectivityThreshold);
+
+    System.out.print("setupTableIndexes begins");
+    Admin admin = MaprDBTestsSuite.getAdmin();
+    if (admin != null) {
+      if (admin.tableExists(PRIMARY_TABLE_NAME)) {
+        admin.deleteTable(PRIMARY_TABLE_NAME);
+      }
+    }
+
+    LargeTableGen gen = new LargeTableGen(MaprDBTestsSuite.getAdmin());
+    /**
+     * indexDef is an array of string, LargeTableGen.generateTableWithIndex will take it as parameter to generate indexes
+     * for primary table.
+     * indexDef[3*i] defines i-th index's indexName, NOTE: IF the name begins with "hash", it is a hash index
+     * indexDef[3*i+1] indexed field,
+     * and indexDef[3*i+2] defines i-th index's non-indexed fields
+     */
+    final String[] indexDef = //null;
+        {"i_ssn", "id.ssn", "contact.phone",
+            "i_state_city", "address.state,address.city", "name.fname,name.lname",//mainly for composite key test
+            "i_age", "personal.age", "",
+            "i_income", "personal.income", "",
+            "i_lic", "driverlicense", "reverseid",
+            "i_state_city_dl", "address.state,address.city", "driverlicense",
+            "i_cast_int_ssn", "$CAST(id.ssn@INT)", "contact.phone",
+            "i_cast_vchar_lic", "$CAST(driverlicense@STRING)","contact.email",
+            "i_state_age_phone", "address.state,personal.age,contact.phone", "name.fname",
+            "i_cast_age_income_phone", "$CAST(personal.age@INT),$CAST(personal.income@INT),contact.phone", "name.fname",
+            "i_age_with_fname", "personal.age", "name.fname",
+            "hash_i_reverseid", "reverseid", "",
+            "hash_i_cast_timestamp_firstlogin", "$CAST(activity.irs.firstlogin@TIMESTAMP)", "id.ssn"
+        };
+    gen.generateTableWithIndex(PRIMARY_TABLE_NAME, PRIMARY_TABLE_SIZE, indexDef);
+  }
+
+  @AfterClass
+  public static void cleanupTableIndexes() throws Exception {
+    Admin admin = MaprDBTestsSuite.getAdmin();
+    if (admin != null) {
+      if (admin.tableExists(PRIMARY_TABLE_NAME)) {
+   //     admin.deleteTable(PRIMARY_TABLE_NAME);
+      }
+    }
+    test(defaultnonCoveringSelectivityThreshold);
+  }
+
+  @Test
+  public void CTASTestTable() throws Exception {
+    String ctasQuery = "CREATE TABLE hbase.tmp.`backup_index_test_primary` " +
+        "AS SELECT * FROM hbase.`index_test_primary` as t ";
+    test(ctasQuery);
+    test("DROP TABLE IF EXISTS hbase.tmp.`backup_index_test_primary`");
+  }
+
+  @Test
+  public void CoveringPlanWithNonIndexedField() throws Exception {
+
+    String query = "SELECT t.`contact`.`phone` AS `phone` FROM hbase.`index_test_primary` as t " +
+        " where t.id.ssn = '100007423'";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"},
+        new String[]{"RowKeyJoin"}
+    );
+
+    System.out.println("Covering Plan Verified!");
+
+    testBuilder()
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("phone").baselineValues("6500005471")
+        .go();
+    return;
+
+  }
+
+  @Test
+  public void CoveringPlanWithOnlyIndexedField() throws Exception {
+    String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " +
+        " where t.id.ssn = '100007423'";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"},
+        new String[]{"RowKeyJoin"}
+    );
+
+    System.out.println("Covering Plan Verified!");
+
+    testBuilder()
+        .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("ssn").baselineValues("100007423")
+        .go();
+
+    return;
+  }
+
+  @Test
+  public void NoIndexPlanForNonIndexField() throws Exception {
+
+    String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " +
+        " where t.contact.phone = '6500005471'";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary"},
+        new String[]{"RowKeyJoin", "indexName="}
+    );
+
+    System.out.println("No Index Plan Verified!");
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("ssn").baselineValues("100007423")
+        .baselineColumns("ssn").baselineValues("100007632")
+        .go();
+
+    return;
+  }
+
+  @Test
+  public void NonCoveringPlan() throws Exception {
+
+    String query = "SELECT t.`name`.`fname` AS `fname` FROM hbase.`index_test_primary` as t " +
+        " where t.id.ssn = '100007423'";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"RowKeyJoin", ".*RestrictedJsonTableGroupScan.*tableName=.*index_test_primary,",
+           ".*JsonTableGroupScan.*tableName=.*index_test_primary,.*indexName=i_ssn"},
+        new String[]{}
+    );
+
+    System.out.println("Non-Covering Plan Verified!");
+
+    testBuilder()
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("fname").baselineValues("KfFzK")
+        .go();
+
+    return;
+  }
+
+  @Test
+  public void RangeConditionIndexPlan() throws Exception {
+    String query = "SELECT t.`name`.`lname` AS `lname` FROM hbase.`index_test_primary` as t " +
+        " where t.personal.age > 52 AND t.name.fname='KfFzK'";
+    try {
+      test(defaultHavingIndexPlan + ";" + lowRowKeyJoinBackIOFactor + ";");
+      PlanTestBase.testPlanMatchingPatterns(query,
+              new String[]{"RowKeyJoin", ".*RestrictedJsonTableGroupScan.*tableName=.*index_test_primary,",
+                      ".*JsonTableGroupScan.*tableName=.*index_test_primary,.*indexName=(i_age|i_age_with_fname)"},
+              new String[]{}
+      );
+      testBuilder()
+              .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+              .optionSettingQueriesForTestQuery(lowRowKeyJoinBackIOFactor)
+              .optionSettingQueriesForBaseline(noIndexPlan)
+              .unOrdered()
+              .sqlQuery(query)
+              .sqlBaselineQuery(query)
+              .build()
+              .run();
+
+      testBuilder()
+              .optionSettingQueriesForTestQuery(sliceTargetSmall)
+              .optionSettingQueriesForBaseline(sliceTargetDefault)
+              .unOrdered()
+              .sqlQuery(query)
+              .sqlBaselineQuery(query)
+              .build()
+              .run();
+    } finally {
+      test(defaultRowKeyJoinBackIOFactor);
+    }
+  }
+
+  @Test
+  public void CoveringWithSimpleFieldsOnly() throws Exception {
+
+    String query = "SELECT t._id AS `tid` FROM hbase.`index_test_primary` as t " +
+        " where t.driverlicense = 100007423";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"JsonTableGroupScan.*tableName=.*index_test_primary,.*indexName=i_lic"},
+        new String[]{"RowKeyJoin"}
+    );
+
+    testBuilder()
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("tid").baselineValues("1012")
+        .go();
+
+    return;
+  }
+
+  @Test
+  public void NonCoveringWithSimpleFieldsOnly() throws Exception {
+
+    String query = "SELECT t.rowid AS `rowid` FROM hbase.`index_test_primary` as t " +
+        " where t.driverlicense = 100007423";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"RowKeyJoin(.*[\n\r])+.*" +
+            "RestrictedJsonTableGroupScan.*tableName=.*index_test_primary(.*[\n\r])+.*" +
+            "JsonTableGroupScan.*tableName=.*index_test_primary,.*indexName=i_lic"},
+        new String[]{}
+    );
+
+    testBuilder()
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("rowid").baselineValues("1012")
+        .go();
+
+    return;
+  }
+
+  @Test
+  public void NonCoveringWithExtraConditonOnPrimary() throws Exception {
+
+    String query = "SELECT t.`name`.`fname` AS `fname` FROM hbase.`index_test_primary` as t " +
+        " where t.personal.age = 53 AND t.name.lname='UZwNk'";
+    try {
+      test(defaultHavingIndexPlan + ";" + lowRowKeyJoinBackIOFactor + ";");
+      PlanTestBase.testPlanMatchingPatterns(query,
+              new String[]{"RowKeyJoin", ".*RestrictedJsonTableGroupScan",
+                      ".*JsonTableGroupScan.*indexName=i_age",},
+              new String[]{}
+      );
+
+      testBuilder()
+              .sqlQuery(query)
+              .ordered()
+              .baselineColumns("fname").baselineValues("KfFzK")
+              .go();
+    } finally {
+      test(defaultRowKeyJoinBackIOFactor);
+    }
+    return;
+  }
+
+  @Test
+  public void Intersect2indexesPlan() throws Exception {
+
+    String query = "SELECT t.`name`.`lname` AS `lname` FROM hbase.`index_test_primary` as t " +
+        " where t.personal.age = 53 AND t.personal.income=45";
+    try {
+      test(defaultHavingIndexPlan);
+      test(preferIntersectPlans + ";" + disableFTS);
+      PlanTestBase.testPlanMatchingPatterns(query,
+              new String[]{"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*HashJoin(.*[\n\r])+.*JsonTableGroupScan.*indexName=(i_age|i_income)(.*[\n\r])+.*JsonTableGroupScan.*indexName=(i_age|i_income)"},
+              new String[]{}
+      );
+
+      testBuilder()
+              .sqlQuery(query)
+              .unOrdered()
+              .baselineColumns("lname").baselineValues("UZwNk")
+              .baselineColumns("lname").baselineValues("foNwtze")
+              .baselineColumns("lname").baselineValues("qGZVfY")
+              .go();
+      testBuilder()
+              .optionSettingQueriesForTestQuery(sliceTargetSmall)
+              .optionSettingQueriesForBaseline(sliceTargetDefault)
+              .unOrdered()
+              .sqlQuery(query)
+              .sqlBaselineQuery(query)
+              .build()
+              .run();
+    } finally {
+      test(defaultIntersectPlans + ";" + enableFTS);
+    }
+    return;
+  }
+
+  @Test
+  public void CompositeIndexNonCoveringPlan() throws Exception {
+
+    String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " +
+        " where t.address.state = 'pc' AND t.address.city='pfrrs'";
+    try {
+      test(defaultHavingIndexPlan + ";" + lowRowKeyJoinBackIOFactor + ";");
+
+      //either i_state_city or i_state_age_phone will be picked depends on cost model, both is fine for testing composite index nonCovering plan
+      PlanTestBase.testPlanMatchingPatterns(query,
+              new String[]{"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_state_"},
+              new String[]{}
+      );
+
+      testBuilder()
+              .sqlQuery(query)
+              .unOrdered()
+              .baselineColumns("ssn").baselineValues("100007423")
+              .baselineColumns("ssn").baselineValues("100008861")
+              .go();
+
+      testBuilder()
+              .optionSettingQueriesForTestQuery(sliceTargetSmall)
+              .optionSettingQueriesForBaseline(sliceTargetDefault)
+              .unOrdered()
+              .sqlQuery(query)
+              .sqlBaselineQuery(query)
+              .build()
+              .run();
+    } finally {
+      test(defaultRowKeyJoinBackIOFactor);
+    }
+    return;
+  }
+
+  @Test//filter cover indexed, included and not in index at all filter
+  public void CompositeIndexNonCoveringFilterWithAllFieldsPlan() throws Exception {
+
+    String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " +
+        " where t.address.state = 'pc' AND t.address.city='pfrrs' AND t.driverlicense IN (100007423, 100007424)";
+    test(defaultHavingIndexPlan+";"+lowRowKeyJoinBackIOFactor+";");
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan.*condition=.*state.*city.*driverlicense.*or.*driverlicense.*(.*[\n\r])+.*JsonTableGroupScan.*indexName="},
+        new String[]{}
+    );
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("ssn").baselineValues("100007423")
+        .go();
+
+    testBuilder()
+        .optionSettingQueriesForTestQuery(sliceTargetSmall)
+        .optionSettingQueriesForBaseline(sliceTargetDefault)
+        .unOrdered()
+        .sqlQuery(query)
+        .sqlBaselineQuery(query)
+        .build()
+        .run();
+
+    return;
+  }
+  @Test
+  public void CompositeIndexCoveringPlan() throws Exception {
+
+    String query = "SELECT t.`address`.`city` AS `city` FROM hbase.`index_test_primary` as t " +
+        " where t.address.state = 'pc' AND t.address.city='pfrrs'";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {".*JsonTableGroupScan.*indexName=i_state_city"},
+        new String[]{"RowKeyJoin", "Filter"}
+    );
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("city").baselineValues("pfrrs")
+        .baselineColumns("city").baselineValues("pfrrs")
+        .go();
+
+    testBuilder()
+        .optionSettingQueriesForTestQuery(sliceTargetSmall)
+        .optionSettingQueriesForBaseline(sliceTargetDefault)
+        .unOrdered()
+        .sqlQuery(query)
+        .sqlBaselineQuery(query)
+        .build()
+        .run();
+    return;
+  }
+
+  @Test
+  public void TestNonCoveringRangePartition_1() throws Exception {
+
+    String query = "SELECT t.`name`.`lname` AS `lname` FROM hbase.`index_test_primary` as t " +
+        " where t.personal.age = 53";
+    String[] expectedPlan = new String[] {"RowKeyJoin(.*[\n\r])+.*" +
+        "RestrictedJsonTableGroupScan.*tableName=.*index_test_primary(.*[\n\r])+.*" +
+        "RangePartitionExchange(.*[\n\r])+.*" +
+    "JsonTableGroupScan.*tableName=.*index_test_primary,.*indexName=(i_age|i_age_with_fname)"};
+    test(defaultHavingIndexPlan+";"+sliceTargetSmall+";");
+    PlanTestBase.testPlanMatchingPatterns(query,
+        expectedPlan, new String[]{});
+
+    try {
+      testBuilder()
+          .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+          .optionSettingQueriesForBaseline(noIndexPlan)
+          .unOrdered()
+          .sqlQuery(query)
+          .sqlBaselineQuery(query)
+          .build()
+          .run();
+    } finally {
+      test(defaultHavingIndexPlan);
+      test(sliceTargetDefault);
+    }
+    return;
+  }
+
+  @Test
+  public void TestCastVarCharCoveringPlan() throws Exception {
+    //length 255 is to exact match the casted indexed field's length
+    String query = "SELECT t._id as tid, cast(t.driverlicense as varchar(255)) as driverlicense FROM hbase.`index_test_primary` as t " +
+        " where cast(t.driverlicense as varchar(255))='100007423'";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_cast_vchar_lic"},
+        new String[]{"RowKeyJoin"}
+    );
+
+    System.out.println("TestCastCoveringPlan Plan Verified!");
+
+    testBuilder()
+        .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("tid", "driverlicense").baselineValues("1012", "100007423")
+        .go();
+
+    return;
+  }
+
+  @Test
+  public void TestCastINTCoveringPlan() throws Exception {
+    String query = "SELECT t._id as tid, CAST(t.id.ssn as INT) as ssn, t.contact.phone AS `phone` FROM hbase.`index_test_primary` as t " +
+        " where CAST(t.id.ssn as INT) = 100007423";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_cast_int_ssn"},
+        new String[]{"RowKeyJoin"}
+    );
+
+    System.out.println("TestCastCoveringPlan Plan Verified!");
+
+    testBuilder()
+        .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("tid", "ssn", "phone").baselineValues("1012", 100007423, "6500005471")
+        .go();
+
+    return;
+  }
+
+  @Test
+  public void TestCastNonCoveringPlan() throws Exception {
+    String query = "SELECT t.id.ssn AS `ssn` FROM hbase.`index_test_primary` as t " +
+        " where CAST(t.id.ssn as INT) = 100007423";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_cast_int_ssn"},
+        new String[]{}
+    );
+
+    System.out.println("TestCastNonCoveringPlan Plan Verified!");
+
+    testBuilder()
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("ssn").baselineValues("100007423")
+        .go();
+    return;
+  }
+
+  @Test
+  public void TestCastVarchar_ConvertToRangePlan() throws Exception {
+    String query = "SELECT t.id.ssn AS `ssn` FROM hbase.`index_test_primary` as t " +
+        " where CAST(driverlicense as VARCHAR(10)) = '100007423'";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*MATCHES \"\\^.*100007423.*E.*\\$\".*indexName=i_cast_vchar_lic"},
+        new String[]{}
+    );
+
+    System.out.println("TestCastVarchar_ConvertToRangePlan Verified!");
+
+    testBuilder()
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("ssn").baselineValues("100007423")
+        .go();
+
+    return;
+  }
+
+  @Test // cast expression in filter is not indexed, but the same field casted to different type was indexed (CAST id.ssn as INT)
+  public void TestCastNoIndexPlan() throws Exception {
+    String query = "select t.id.ssn from hbase.`index_test_primary` t where cast(t.id.ssn as varchar(10)) = '100007423'";
+
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[]{},
+        new String[]{"indexName"}
+    );
+
+  }
+
+  @Test
+  public void TestLongerCastVarCharNoIndex() throws Exception {
+    //length 256 is to exact match the casted indexed field's length
+    String query = "SELECT t._id as tid, cast(t.driverlicense as varchar(500)) as driverlicense FROM hbase.`index_test_primary` as t " +
+        " where cast(t.driverlicense as varchar(500))='100007423'";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {},
+        new String[]{"RowKeyJoin", "indexName="}
+    );
+
+    System.out.println("TestLongerCastVarCharNoIndex Plan Verified!");
+
+    return;
+  }
+
+  @Test
+  public void TestCoveringPlanSortRemoved() throws Exception {
+    String query = "SELECT t.`contact`.`phone` as phone FROM hbase.`index_test_primary` as t " +
+        " where t.id.ssn <'100000003' order by t.id.ssn";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"},
+        new String[]{"Sort"}
+    );
+
+    testBuilder()
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("phone").baselineValues("6500008069")
+        .baselineColumns("phone").baselineValues("6500001411")
+        .baselineColumns("phone").baselineValues("6500001595")
+        .go();
+  }
+
+  @Test
+  public void TestCoveringPlanSortNotRemoved() throws Exception {
+    String query = "SELECT t.`contact`.`phone` as phone FROM hbase.`index_test_primary` as t " +
+        " where t.id.ssn <'100000003' order by t.contact.phone";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"Sort", ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"},
+        new String[]{"RowkeyJoin"}
+    );
+
+    testBuilder()
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("phone").baselineValues("6500001411")
+        .baselineColumns("phone").baselineValues("6500001595")
+        .baselineColumns("phone").baselineValues("6500008069")
+        .go();
+  }
+
+  @Test
+  public void TestCoveringPlanSortRemovedWithSimpleFields() throws Exception {
+    String query = "SELECT t.driverlicense as l FROM hbase.`index_test_primary` as t " +
+        " where t.driverlicense < 100000003 order by t.driverlicense";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_lic"},
+        new String[]{"Sort"}
+    );
+
+    testBuilder()
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("l").baselineValues(100000000l)
+        .baselineColumns("l").baselineValues(100000001l)
+        .baselineColumns("l").baselineValues(100000002l)
+        .go();
+  }
+
+  @Test
+  public void TestNonCoveringPlanSortRemoved() throws Exception {
+    String query = "SELECT t.contact.phone as phone FROM hbase.`index_test_primary` as t " +
+        " where t.driverlicense < 100000003 order by t.driverlicense";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_lic"},
+        new String[]{"Sort"}
+    );
+    String query2 = "SELECT t.name.fname as fname FROM hbase.`index_test_primary` as t " +
+        " where t.id.ssn < '100000003' order by t.id.ssn";
+    PlanTestBase.testPlanMatchingPatterns(query2,
+        new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName="},
+        new String[]{"Sort"}
+    );
+
+    //simple field, driverlicense
+    testBuilder()
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("phone").baselineValues("6500008069")
+        .baselineColumns("phone").baselineValues("6500001411")
+        .baselineColumns("phone").baselineValues("6500001595")
+        .go();
+
+    //query on field of item expression(having capProject), non-simple field t.id.ssn
+    testBuilder()
+        .sqlQuery(query2)
+        .ordered()
+        .baselineColumns("fname").baselineValues("VcFahj")
+        .baselineColumns("fname").baselineValues("WbKVK")
+        .baselineColumns("fname").baselineValues("vSAEsyFN")
+        .go();
+
+    test(sliceTargetSmall);
+    try {
+      PlanTestBase.testPlanMatchingPatterns(query2,
+          new String[]{"SingleMergeExchange(.*[\n\r])+.*"
+              + "RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_ssn"},
+          new String[]{"Sort"}
+      );
+    } finally {
+      test(sliceTargetDefault);
+    }
+  }
+
+  //test cases are from TestNonCoveringPlanSortRemoved. Sort was removed when force_sort_noncovering was default(false)
+  @Test
+  public void TestNonCoveringPlanWithNoRemoveSortOption() throws Exception {
+    try {
+      test("alter session set `planner.index.force_sort_noncovering`=true");
+      test(defaultHavingIndexPlan);
+
+      String query = "SELECT t.contact.phone as phone FROM hbase.`index_test_primary` as t " +
+          " where t.driverlicense < 100000003 order by t.driverlicense";
+      PlanTestBase.testPlanMatchingPatterns(query,
+          new String[]{"Sort", "RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_lic"},
+          new String[]{}
+      );
+
+      String query2 = "SELECT t.name.fname as fname FROM hbase.`index_test_primary` as t " +
+          " where t.id.ssn < '100000003' order by t.id.ssn";
+      PlanTestBase.testPlanMatchingPatterns(query2,
+          new String[]{"Sort", "RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName="},
+          new String[]{}
+      );
+
+      //simple field, driverlicense
+      testBuilder()
+          .sqlQuery(query)
+          .ordered()
+          .baselineColumns("phone").baselineValues("6500008069")
+          .baselineColumns("phone").baselineValues("6500001411")
+          .baselineColumns("phone").baselineValues("6500001595")
+          .go();
+
+      //query on field of item expression(having capProject), non-simple field t.id.ssn
+      testBuilder()
+          .sqlQuery(query2)
+          .ordered()
+          .baselineColumns("fname").baselineValues("VcFahj")
+          .baselineColumns("fname").baselineValues("WbKVK")
+          .baselineColumns("fname").baselineValues("vSAEsyFN")
+          .go();
+
+      test(sliceTargetSmall);
+      try {
+        PlanTestBase.testPlanMatchingPatterns(query2,
+            new String[]{"Sort", "SingleMergeExchange(.*[\n\r])+.*"
+                + "RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_ssn"},
+            new String[]{}
+        );
+      } finally {
+        test(sliceTargetDefault);
+      }
+    }
+    finally {
+      test("alter session reset `planner.index.force_sort_noncovering`");
+    }
+  }
+
+  @Test  // 2 table join, each table has local predicate on top-level column
+  public void TestCoveringPlanJoin_1() throws Exception {
+    String query = "SELECT count(*) as cnt FROM hbase.`index_test_primary` as t1 " +
+        " inner join hbase.`index_test_primary` as t2 on t1.driverlicense = t2.driverlicense " +
+        " where t1.driverlicense < 100000003 and t2.driverlicense < 100000003" ;
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=",
+                      ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName="},
+        new String[]{}
+    );
+
+    testBuilder()
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("cnt").baselineValues(3L)
+        .go();
+  }
+
+  @Test  // 2 table join, each table has local predicate on nested column
+  public void TestCoveringPlanJoin_2() throws Exception {
+    String query = "SELECT count(*) as cnt FROM hbase.`index_test_primary` as t1 " +
+        " inner join hbase.`index_test_primary` as t2 on t1.contact.phone = t2.contact.phone " +
+        " where t1.id.ssn < '100000003' and t2.id.ssn < '100000003' ";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=",
+                      ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName="},
+        new String[]{}
+    );
+
+    testBuilder()
+       .sqlQuery(query)
+       .ordered()
+       .baselineColumns("cnt").baselineValues(3L)
+       .go();
+  }
+
+  @Test  // leading prefix of index has Equality conditions and ORDER BY last column; Sort SHOULD be dropped
+  public void TestCoveringPlanSortPrefix_1() throws Exception {
+    String query = "SELECT t.contact.phone FROM hbase.`index_test_primary` as t " +
+        " where t.address.state = 'wo' and t.personal.age = 35 and t.contact.phone < '6500003000' order by t.contact.phone";
+    test(defaultHavingIndexPlan);
+
+    //we should glue to index i_state_age_phone to make sure we are testing the targeted prefix construction code path
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"},
+        new String[]{"Sort"}
+    );
+
+    // compare the results of index plan with the no-index plan
+    testBuilder()
+      .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+      .optionSettingQueriesForBaseline(noIndexPlan)
+      .unOrdered()
+      .sqlQuery(query)
+      .sqlBaselineQuery(query)
+      .build()
+      .run();
+  }
+
+  @Test  // leading prefix of index has Non-Equality conditions and ORDER BY last column; Sort SHOULD NOT be dropped
+  public void TestCoveringPlanSortPrefix_2() throws Exception {
+    String query = "SELECT t.contact.phone FROM hbase.`index_test_primary` as t " +
+        " where t.address.state = 'wo' and t.personal.age < 35 and t.contact.phone < '6500003000' order by t.contact.phone";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"Sort", ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"},
+        new String[]{}
+    );
+
+    // compare the results of index plan with the no-index plan
+    testBuilder()
+      .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+      .optionSettingQueriesForBaseline(noIndexPlan)
+      .unOrdered()
+      .sqlQuery(query)
+      .sqlBaselineQuery(query)
+      .build()
+      .run();
+  }
+
+  @Test  //ORDER BY last two columns not in the indexed order; Sort SHOULD NOT be dropped
+  public void TestCoveringPlanSortPrefix_3() throws Exception {
+    String query = "SELECT CAST(t.personal.age as VARCHAR) as age, t.contact.phone FROM hbase.`index_test_primary` as t " +
+        " where t.address.state = 'wo' and t.personal.age < 35 and t.contact.phone < '6500003000' order by t.contact.phone, t.personal.age";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"Sort", ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"},
+        new String[]{}
+    );
+
+    // compare the results of index plan with the no-index plan
+    testBuilder()
+        .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+        .optionSettingQueriesForBaseline(noIndexPlan)
+        .unOrdered()
+        .sqlQuery(query)
+        .sqlBaselineQuery(query)
+        .build()
+        .run();
+  }
+
+  @Test  // last two index fields in non-Equality conditions, ORDER BY last two fields; Sort SHOULD be dropped
+  public void TestCoveringPlanSortPrefix_4() throws Exception {
+    String query = "SELECT t._id as tid, t.contact.phone, CAST(t.personal.age as VARCHAR) as age FROM hbase.`index_test_primary` as t " +
+        " where t.address.state = 'wo' and t.personal.age < 35 and t.contact.phone < '6500003000' order by t.personal.age, t.contact.phone";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"},
+        new String[]{"Sort"}
+    );
+
+    // compare the results of index plan with the no-index plan
+    testBuilder()
+        .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+        .optionSettingQueriesForBaseline(noIndexPlan)
+        .unOrdered()
+        .sqlQuery(query)
+        .sqlBaselineQuery(query)
+        .build()
+        .run();
+  }
+
+  @Test  // index field in two or more equality conditions, it is not leading prefix, Sort SHOULD NOT be dropped
+  public void TestCoveringPlanSortPrefix_5() throws Exception {
+    String query = "SELECT t._id as tid, t.contact.phone, CAST(t.personal.age as VARCHAR) as age FROM hbase.`index_test_primary` as t " +
+        " where t.address.state = 'wo' and t.personal.age IN (31, 32, 33, 34) and t.contact.phone < '6500003000' order by t.contact.phone";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"Sort", ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"},
+        new String[]{}
+    );
+
+    // compare the results of index plan with the no-index plan
+    testBuilder()
+        .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+        .optionSettingQueriesForBaseline(noIndexPlan)
+        .unOrdered()
+        .sqlQuery(query)
+        .sqlBaselineQuery(query)
+        .build()
+        .run();
+  }
+
+  @Test  // last two index fields in non-Equality conditions, ORDER BY last two fields NULLS FIRST; Sort SHOULD NOT be dropped
+  public void TestCoveringPlanSortPrefix_6() throws Exception {
+    String query = "SELECT t._id as tid, t.contact.phone, CAST(t.personal.age as VARCHAR) as age FROM hbase.`index_test_primary` as t " +
+        " where t.address.state = 'wo' and t.personal.age < 35 and t.contact.phone < '6500003000' order by t.personal.age, t.contact.phone NULLS FIRST";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"Sort", ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"},
+        new String[]{}
+    );
+
+    // compare the results of index plan with the no-index plan
+    testBuilder()
+        .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+        .optionSettingQueriesForBaseline(noIndexPlan)
+        .unOrdered()
+        .sqlQuery(query)
+        .sqlBaselineQuery(query)
+        .build()
+        .run();
+  }
+
+  @Test  // last two index fields in non-Equality conditions, ORDER BY last two fields NULLS LAST; Sort SHOULD be dropped
+  public void TestCoveringPlanSortPrefix_7() throws Exception {
+    String query = "SELECT t._id as tid, t.contact.phone, CAST(t.personal.age as VARCHAR) as age FROM hbase.`index_test_primary` as t " +
+        " where t.address.state = 'wo' and t.personal.age < 35 and t.contact.phone < '6500003000' order by t.personal.age, t.contact.phone NULLS LAST";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_state_age_phone"},
+        new String[]{"Sort"}
+    );
+
+    // compare the results of index plan with the no-index plan
+    testBuilder()
+        .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+        .optionSettingQueriesForBaseline(noIndexPlan)
+        .unOrdered()
+        .sqlQuery(query)
+        .sqlBaselineQuery(query)
+        .build()
+        .run();
+  }
+
+  @Test
+  public void orderByCastCoveringPlan() throws Exception {
+    String query = "SELECT t.contact.phone as phone FROM hbase.`index_test_primary` as t " +
+        " where CAST(t.id.ssn as INT) < 100000003 order by CAST(t.id.ssn as INT)";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName="},
+        new String[]{"Sort"}
+    );
+
+    testBuilder()
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("phone").baselineValues("6500008069")
+        .baselineColumns("phone").baselineValues("6500001411")
+        .baselineColumns("phone").baselineValues("6500001595")
+        .go();
+  }
+
+  @Test // non-covering plan. sort by the only indexed field, sort SHOULD be removed
+  public void orderByNonCoveringPlan() throws Exception {
+    String query = "SELECT t.name.lname as lname FROM hbase.`index_test_primary` as t " +
+        " where t.id.ssn < '100000003' order by t.id.ssn";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName="},
+        new String[]{"Sort"}
+    );
+
+    testBuilder()
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("lname").baselineValues("iuMG")
+        .baselineColumns("lname").baselineValues("KpFq")
+        .baselineColumns("lname").baselineValues("bkkAvz")
+        .go();
+  }
+
+  @Test //non-covering plan. order by cast indexed field, sort SHOULD be removed
+  public void orderByCastNonCoveringPlan() throws Exception {
+    String query = "SELECT t.name.lname as lname FROM hbase.`index_test_primary` as t " +
+        " where CAST(t.id.ssn as INT) < 100000003 order by CAST(t.id.ssn as INT)";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName="},
+        new String[]{"Sort"}
+    );
+
+    testBuilder()
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("lname").baselineValues("iuMG")
+        .baselineColumns("lname").baselineValues("KpFq")
+        .baselineColumns("lname").baselineValues("bkkAvz")
+        .go();
+  }
+
+
+  @Ignore //in statsCache, condition state+city has rowcount 1250, but state only has 1000. so it is picking i_state_age_phone
+  @Test //non-covering, order by non leading field, and leading fields are not in equality condition, Sort SHOULD NOT be removed
+  public void NonCoveringPlan_SortPrefix_1() throws Exception {
+
+    String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " +
+        " where t.address.state > 'pc' AND t.address.city>'pfrrr' AND t.address.city<'pfrrt' order by t.adddress.city";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"Sort",
+            "RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_state_city"},
+        new String[]{}
+    );
+    return;
+  }
+
+  @Test //non-covering, order by non leading field, and leading fields are in equality condition, Sort SHOULD be removed
+  public void NonCoveringPlan_SortPrefix_2() throws Exception {
+
+    String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " +
+        " where t.address.state = 'pc' AND t.address.city>'pfrrr' AND t.address.city<'pfrrt' order by t.address.city";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {
+            "RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*JsonTableGroupScan.*indexName=i_state_city"},
+        new String[]{"Sort"}
+    );
+    return;
+  }
+
+  @Ignore ("Should be modified to get an index plan; not very useful since most covering plan filters get pushed")
+  @Test //Correct projection and results when filter on non-indexed column in covering plan.
+  public void nonIndexedColumnFilterCoveringPlan() throws Exception {
+    String query = "SELECT t.name.fname as fname FROM hbase.`index_test_primary` as t " +
+        " where t.personal.age > 68 and t.name.fname IN ('CnGobfR', 'THOHP')";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {".*Filter.*CnGobfR.*THOHP.*",
+            ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName="},
+        new String[] {".*Filter.*ITEM*CnGobfR.*THOHP.*"});
+
+    testBuilder()
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("fname").baselineValues("CnGobfR")
+        .baselineColumns("fname").baselineValues("THOHP")
+        .baselineColumns("fname").baselineValues("CnGobfR")
+        .go();
+  }
+
+  @Test
+  @Ignore ("Fix after MEP 5.0")
+  public void orderByLimitNonCoveringPlan() throws Exception {
+    String query = "SELECT t.name.lname as lname FROM hbase.`index_test_primary` as t " +
+        " where t.id.ssn < '100000003' order by t.id.ssn limit 2";
+    try {
+      test(defaultHavingIndexPlan);
+      test(sliceTargetSmall);
+      PlanTestBase.testPlanMatchingPatterns(query,
+          new String[]{"Limit(.*[\n\r])+.*SingleMergeExchange(.*[\n\r])+.*Limit(.*[\n\r])+.*indexName="},
+          new String[]{"Sort"}
+      );
+
+      testBuilder()
+          .sqlQuery(query)
+          .ordered()
+          .baselineColumns("lname").baselineValues("iuMG")
+          .baselineColumns("lname").baselineValues("KpFq")
+          .go();
+    } finally {
+      test(sliceTargetDefault);
+    }
+  }
+
+  @Test
+  public void orderByLimitCoveringPlan() throws Exception {
+    String query = "SELECT t.contact.phone as phone FROM hbase.`index_test_primary` as t " +
+        " where t.id.ssn < '100000003' order by t.id.ssn limit 2";
+    test(defaultHavingIndexPlan);
+
+    //when index table has only one tablet, the SingleMergeExchange in the middle of two Limits will be removed.
+    //The lower limit gets pushed into the scan
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"Limit(.*[\n\r])+.*indexName=.*limit=2"},
+        new String[]{"Sort"}
+    );
+
+    testBuilder()
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("phone").baselineValues("6500008069")
+        .baselineColumns("phone").baselineValues("6500001411")
+        .go();
+  }
+
+  @Test
+  public void pickAnyIndexWithFTSDisabledPlan() throws Exception {
+    String lowCoveringSel = "alter session set `planner.index.covering_selectivity_threshold` = 0.025";
+    String defaultCoveringSel = "alter session reset `planner.index.covering_selectivity_threshold`";
+    String query = "SELECT t.`contact`.`phone` AS `phone` FROM hbase.`index_test_primary` as t " +
+        " where t.id.ssn = '100007423'";
+    try {
+      test(defaultHavingIndexPlan + ";" + lowCoveringSel + ";");
+      PlanTestBase.testPlanMatchingPatterns(query,
+          new String[]{".*JsonTableGroupScan.*tableName=.*index_test_primary"},
+          new String[]{".*indexName=i_ssn"}
+      );
+      // Must not throw CANNOTPLANEXCEPTION
+      test(defaultHavingIndexPlan + ";" + lowCoveringSel + ";" + disableFTS + ";");
+      PlanTestBase.testPlanMatchingPatterns(query,
+          new String[]{".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"},
+          new String[]{"RowKeyJoin"}
+      );
+    } finally {
+      test(defaultCoveringSel+";"+enableFTS+";");
+    }
+  }
+
+  @Test
+  public void testCaseSensitive() throws Exception {
+    String query = "SELECT t.contact.phone as phone FROM hbase.`index_test_primary` as t " +
+        " where t.id.SSN = '100000003' ";
+    test(defaultHavingIndexPlan);
+
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {""},
+        new String[]{"indexName"}
+    );
+
+  }
+
+  @Test
+  public void testCaseSensitiveIncludedField() throws Exception {
+
+    String query = "SELECT t.`CONTACT`.`phone` AS `phone` FROM hbase.`index_test_primary` as t " +
+        " where t.id.ssn = '100007423'";
+    test(defaultHavingIndexPlan);
+
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[]{"RowKeyJoin",
+            ".*JsonTableGroupScan.*tableName=.*index_test_primary.*indexName=i_ssn"},
+        new String[]{}
+    );
+  }
+
+
+  @Test
+  public void testHashIndexNoRemovingSort() throws Exception {
+    String query = "SELECT t.`contact`.`phone` as phone FROM hbase.`index_test_primary` as t " +
+        " where t.reverseid <'10' order by t.reverseid";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"Sort", "indexName=hash_i_reverseid", "RowKeyJoin"},
+        new String[]{}
+    );
+  }
+
+  @Test
+  public void testCastTimestampPlan() throws Exception {
+    String query = "SELECT  t.id.ssn as ssn FROM hbase.`index_test_primary` as t " +
+        " where cast(t.activity.irs.firstlogin as timestamp)=to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"indexName=hash_i_cast_timestamp_firstlogin"},
+        new String[]{"RowKeyJoin"}
+    );
+    testBuilder()
+        .sqlQuery(query)
+        .ordered()
+        .baselineColumns("ssn").baselineValues("100007423")
+        .go();
+
+  }
+
+  @Test
+  public void testNotConditionNoIndexPlan() throws Exception {
+    String query = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " +
+        " where NOT t.id.ssn = '100007423'";
+
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {},
+        new String[]{"indexName="}
+    );
+
+
+    String notInQuery = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " +
+        " where t.id.ssn NOT IN ('100007423', '100007424')";
+    PlanTestBase.testPlanMatchingPatterns(notInQuery,
+        new String[] {},
+        new String[]{"indexName="}
+    );
+
+    String notLikeQuery = "SELECT t.`id`.`ssn` AS `ssn` FROM hbase.`index_test_primary` as t " +
+        " where t.id.ssn NOT LIKE '100007423'";
+    PlanTestBase.testPlanMatchingPatterns(notLikeQuery,
+        new String[] {},
+        new String[]{"indexName="}
+    );
+
+  }
+
+  @Test
+  public void testNoFilterOrderByCoveringPlan() throws Exception {
+    String query = "SELECT t.`id`.`ssn` AS `ssn`, t.contact.phone as phone FROM hbase.`index_test_primary` as t " +
+        "order by t.id.ssn limit 2";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"indexName=i_ssn"},
+        new String[]{"Sort", "TopN", "RowKeyJoin"}
+    );
+    testBuilder()
+        .ordered()
+        .sqlQuery(query)
+        .baselineColumns("ssn", "phone").baselineValues("100000000", "6500008069")
+        .baselineColumns("ssn", "phone").baselineValues("100000001", "6500001411")
+        .build()
+        .run();
+  }
+
+  @Test
+  public void testNoFilterAndLimitOrderByCoveringPlan() throws Exception {
+    String query = "SELECT t.`id`.`ssn` AS `ssn`, t.contact.phone as phone FROM hbase.`index_test_primary` as t " +
+            "order by t.id.ssn";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+            new String[] {"Sort"},
+            new String[]{"indexName=*", "RowKeyJoin", "TopN"}
+    );
+  }
+
+  @Test
+  public void testNoFilterOrderByCast() throws Exception {
+    String query = "SELECT CAST(t.id.ssn as INT) AS `ssn`, t.contact.phone as phone FROM hbase.`index_test_primary` as t " +
+        "order by CAST(t.id.ssn as INT) limit 2";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"indexName=i_cast_int_ssn"},
+        new String[]{"TopN", "Sort", "RowKeyJoin"}
+    );
+    testBuilder()
+        .ordered()
+        .sqlQuery(query)
+        .baselineColumns("ssn", "phone").baselineValues(100000000, "6500008069")
+        .baselineColumns("ssn", "phone").baselineValues(100000001, "6500001411")
+        .build()
+        .run();
+  }
+
+  @Test
+  public void testNoFilterAndLimitOrderByCast() throws Exception {
+    String query = "SELECT CAST(t.id.ssn as INT) AS `ssn`, t.contact.phone as phone FROM hbase.`index_test_primary` as t " +
+            "order by CAST(t.id.ssn as INT)";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+            new String[] { "Sort"},
+            new String[]{"indexName=*","TopN", "RowKeyJoin"}
+    );
+  }
+
+  @Test
+  public void testNoFilterOrderByHashIndex() throws Exception {
+    String query = "SELECT cast(t.activity.irs.firstlogin as timestamp) AS `firstlogin`, t.id.ssn as ssn FROM hbase.`index_test_primary` as t " +
+        "order by cast(t.activity.irs.firstlogin as timestamp), t.id.ssn limit 2";
+    test(defaultHavingIndexPlan);
+    //no collation for hash index so Sort or TopN must have been preserved
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"(Sort|TopN)"},
+        new String[]{"indexName="}
+    );
+    DateTime date = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
+        .parseDateTime("2010-01-21 00:12:24");
+
+    testBuilder()
+        .ordered()
+        .sqlQuery(query)
+        .baselineColumns("firstlogin", "ssn").baselineValues(date, "100005592")
+        .baselineColumns("firstlogin", "ssn").baselineValues(date, "100005844")
+        .build()
+        .run();
+  }
+
+  @Test
+  public void testNoFilterOrderBySimpleField() throws Exception {
+    String query = "SELECT t.reverseid as rid, t.driverlicense as lic FROM hbase.`index_test_primary` as t " +
+        "order by t.driverlicense limit 2";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"indexName=i_lic"},
+        new String[]{"Sort", "TopN"}
+    );
+    testBuilder()
+        .ordered()
+        .sqlQuery(query)
+        .baselineColumns("rid", "lic").baselineValues("4539", 100000000L)
+        .baselineColumns("rid", "lic").baselineValues("943", 100000001L)
+        .build()
+        .run();
+  }
+
+  @Test //negative case for no filter plan
+  public void testNoFilterOrderByNoIndexMatch() throws Exception {
+    String query = "SELECT t.`id`.`ssn` AS `ssn`, t.contact.phone as phone FROM hbase.`index_test_primary` as t " +
+        "order by t.name.fname limit 2";
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"(Sort|TopN)"},
+        new String[]{"indexName="}
+    );
+  }
+
+// Enable this testcase once MD-2848 is fixed.
+//  @Test
+//  public void IntersectPlanWithOneSideNoRows() throws Exception {
+//    try {
+//      String query = "SELECT t.`name`.`lname` AS `lname` FROM hbase.`index_test_primary` as t " +
+//              " where t.personal.age = 53 AND t.personal.income=111145";
+//      test(defaultHavingIndexPlan);
+//      test(preferIntersectPlans + ";" + disableFTS);
+//      PlanTestBase.testPlanMatchingPatterns(query,
+//              new String[]{"RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*HashJoin(.*[\n\r])+.*JsonTableGroupScan.*indexName=(i_age|i_income)(.*[\n\r])+.*JsonTableGroupScan.*indexName=(i_age|i_income)"},
+//              new String[]{}
+//      );
+//
+//      testNoResult(query);
+//
+//    } finally {
+//      test(defaultIntersectPlans + ";" + enableFTS);
+//    }
+//  }
+
+  //"i_cast_age_state_phone", "$CAST(personal.age@STRING),address.state,contact.phone", "name.fname",
+  @Test
+  public void testTrailingFieldIndexCovering() throws Exception {
+    String query = "SELECT t.`name`.`fname` AS `fname` FROM hbase.`index_test_primary` as t " +
+        " where cast(t.personal.age as INT)=53 AND t.contact.phone='6500005471' ";
+
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"indexName=i_cast_age_income_phone"},
+        new String[]{"RowKeyJoin"}
+    );
+
+    testBuilder()
+        .ordered()
+        .sqlQuery(query)
+        .baselineColumns("fname").baselineValues("KfFzK")
+        .build()
+        .run();
+  }
+
+  @Test
+  public void testIncludedFieldCovering() throws Exception {
+    String query = "SELECT t.`contact`.`phone` AS `phone` FROM hbase.`index_test_primary` as t " +
+        " where cast(t.personal.age as INT)=53 AND t.name.fname='KfFzK' ";
+
+    test(defaultHavingIndexPlan);
+    PlanTestBase.testPlanMatchingPatterns(query,
+        new String[] {"indexName=i_cast_age_income_phone"},
+        new String[]{"RowKeyJoin"}
+    );
+
+    testBuilder()
+        .ordered()
+        .sqlQuery(query)
+        .baselineColumns("phone").baselineValues("6500005471")
+        .build()
+        .run();
+  }
+
+  @Test
+  public void testWithFilterGroupBy() throws Exception {
+    String query = " select t1.driverlicense from hbase.`index_test_primary` t1" +
+            " where t1.driverlicense > 100000001 group by t1.driverlicense limit 2";
+    try {
+      test(defaultHavingIndexPlan);
+      test(disableHashAgg);
+      //no collation for hash index so Sort or TopN must have been preserved
+      PlanTestBase.testPlanMatchingPatterns(query,
+              new String[]{"indexName=i_lic", "StreamAgg"},
+              new String[]{"(Sort|TopN)"}
+      );
+
+      testBuilder()
+              .ordered()
+              .sqlQuery(query)
+              .optionSettingQueriesForTestQuery(disableHashAgg)
+              .baselineColumns("driverlicense").baselineValues(100000002L)
+              .baselineColumns("driverlicense").baselineValues(100000003L)
+              .build()
+              .run();
+    } finally {
+      test(enableHashAgg);
+    }
+  }
+
+  @Test
+  public void testNoFilterOrderByDesc() throws Exception {
+    String query = " select t1.driverlicense from hbase.`index_test_primary` t1" +
+            " order by t1.driverlicense desc limit 2";
+    test(defaultHavingIndexPlan);
+    //no collation for hash index so Sort or TopN must have been preserved
+    PlanTestBase.testPlanMatchingPatterns(query,
+            new String[] {"(Sort|TopN)"},
+            new String[]{"indexName="}
+    );
+
+    testBuilder()
+            .unOrdered()
+            .sqlQuery(query)
+            .baselineColumns("driverlicense").baselineValues(100009999L)
+            .baselineColumns("driverlicense").baselineValues(100009998L)
+            .build()
+            .run();
+  }
+
+  @Test
+  public void testNoFilterGroupBy() throws Exception {
+    String query = " select t1.driverlicense from hbase.`index_test_primary` t1" +
+            " group by t1.driverlicense limit 2";
+    try {
+      test(defaultHavingIndexPlan);
+      test(disableHashAgg);
+      //no collation for hash index so Sort or TopN must have been preserved
+      PlanTestBase.testPlanMatchingPatterns(query,
+              new String[]{"indexName=i_lic", "StreamAgg"},
+              new String[]{"(Sort|TopN)"}
+      );
+
+      testBuilder()
+              .ordered()
+              .sqlQuery(query)
+              .optionSettingQueriesForTestQuery(disableHashAgg)
+              .baselineColumns("driverlicense").baselineValues(100000000L)
+              .baselineColumns("driverlicense").baselineValues(100000001L)
+              .build()
+              .run();
+    } finally {
+      test(enableHashAgg);
+    }
+  }
+
+  @Test
+  public void testNoFilterGroupByCoveringPlan() throws Exception {
+    String query = "SELECT t.`id`.`ssn` AS `ssn`, max(t.contact.phone) as phone FROM hbase.`index_test_primary` as t " +
+            "group by t.id.ssn limit 2";
+    try {
+      test(defaultHavingIndexPlan);
+      test(disableHashAgg);
+      PlanTestBase.testPlanMatchingPatterns(query,
+              new String[]{"indexName=i_ssn", "StreamAgg"},
+              new String[]{"Sort", "TopN", "RowKeyJoin"}
+      );
+      testBuilder()
+              .ordered()
+              .sqlQuery(query)
+              .optionSettingQueriesForTestQuery(disableHashAgg)
+              .baselineColumns("ssn", "phone").baselineValues("100000000", "6500008069")
+              .baselineColumns("ssn", "phone").baselineValues("100000001", "6500001411")
+              .build()
+              .run();
+    } finally {
+      test(enableHashAgg);
+    }
+  }
+
+  @Test
+  public void testNoFilterGroupByCast() throws Exception {
+    String query = "SELECT CAST(t.id.ssn as INT) AS `ssn`, max(t.contact.phone) as phone FROM hbase.`index_test_primary` as t " +
+            "group by CAST(t.id.ssn as INT) limit 2";
+    try {
+      test(defaultHavingIndexPlan);
+      test(disableHashAgg);
+      PlanTestBase.testPlanMatchingPatterns(query,
+              new String[]{"indexName=i_cast_int_ssn", "StreamAgg"},
+              new String[]{"TopN", "Sort", "RowKeyJoin"}
+      );
+      testBuilder()
+              .ordered()
+              .sqlQuery(query)
+              .optionSettingQueriesForTestQuery(disableHashAgg)
+              .baselineColumns("ssn", "phone").baselineValues(100000000, "6500008069")
+              .baselineColumns("ssn", "phone").baselineValues(100000001, "6500001411")
+              .build()
+              .run();
+    } finally {
+      test(enableHashAgg);
+    }
+  }
+
+  @Test
+  public void testNoFilterGroupByHashIndex() throws Exception {
+    String query = "SELECT cast(t.activity.irs.firstlogin as timestamp) AS `firstlogin`, max(t.id.ssn) as ssn FROM hbase.`index_test_primary` as t " +
+            "group by cast(t.activity.irs.firstlogin as timestamp) limit 2";
+    try {
+      test(defaultHavingIndexPlan);
+      test(disableHashAgg);
+      //no collation for hash index so Sort or TopN must have been preserved
+      PlanTestBase.testPlanMatchingPatterns(query,
+              new String[]{"(Sort|TopN)", "StreamAgg"},
+              new String[]{"indexName="}
+      );
+      DateTime date1 = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
+              .parseDateTime("2010-01-21 00:12:24");
+
+      DateTime date2 = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
+              .parseDateTime("2010-01-21 00:24:48");
+      testBuilder()
+              .unOrdered()
+              .sqlQuery(query)
+              .optionSettingQueriesForTestQuery(disableHashAgg)
+              .baselineColumns("firstlogin", "ssn").baselineValues(date1, "100006852")
+              .baselineColumns("firstlogin", "ssn").baselineValues(date2, "100003660")
+              .build()
+              .run();
+    } finally {
+      test(enableHashAgg);
+    }
+  }
+
+  @Test
+  public void testNoFilterGroupBySimpleField() throws Exception {
+    String query = "SELECT max(t.reverseid) as rid, t.driverlicense as lic FROM hbase.`index_test_primary` as t " +
+            "group by t.driverlicense limit 2";
+    try {
+      test(defaultHavingIndexPlan);
+      test(disableHashAgg);
+      PlanTestBase.testPlanMatchingPatterns(query,
+              new String[]{"indexName=i_lic", "StreamAgg"},
+              new String[]{"Sort", "TopN"}
+      );
+      testBuilder()
+              .ordered()
+              .sqlQuery(query)
+              .optionSettingQueriesForTestQuery(disableHashAgg)
+              .baselineColumns("rid", "lic").baselineValues("4539", 100000000L)
+              .baselineColumns("rid", "lic").baselineValues("943", 100000001L)
+              .build()
+              .run();
+    } finally {
+      test(enableHashAgg);
+    }
+  }
+
+  @Test //negative case for no filter plan
+  public void testNoFilterGroupByNoIndexMatch() throws Exception {
+    String query = "SELECT max(t.`id`.`ssn`) AS `ssn`, max(t.contact.phone) as phone FROM hbase.`index_test_primary` as t " +
+            "group by t.name.fname limit 2";
+    try {
+      test(defaultHavingIndexPlan);
+      test(disableHashAgg);
+      PlanTestBase.testPlanMatchingPatterns(query,
+              new String[]{"(Sort|TopN)", "StreamAgg"},
+              new String[]{"indexName="}
+      );
+    } finally {
+      test(enableHashAgg);
+    }
+  }
+
+  @Test
+  public void testNoFilterGroupBySimpleFieldParallel() throws Exception {
+    String query = "SELECT max(t.reverseid) as rid, t.driverlicense as lic FROM hbase.`index_test_primary` as t " +
+        "group by t.driverlicense order by t.driverlicense limit 2";
+    try {
+      test(defaultHavingIndexPlan);
+      test(disableHashAgg);
+      test(sliceTargetSmall);
+      PlanTestBase.testPlanMatchingPatterns(query,
+              new String[]{"indexName=i_lic", "StreamAgg", "HashToMergeExchange"},
+              new String[]{"Sort", "TopN"}
+      );
+      testBuilder()
+              .unOrdered()
+              .sqlQuery(query)
+              .optionSettingQueriesForTestQuery(defaultHavingIndexPlan)
+              .optionSettingQueriesForTestQuery(disableHashAgg)
+              .optionSettingQueriesForTestQuery(sliceTargetSmall)
+              .baselineColumns("rid", "lic").baselineValues("4539", 100000000L)
+              .baselineColumns("rid", "lic").baselineValues("943", 100000001L)
+              .build()
+              .run();
+    } finally {
+      test(enableHashAgg);
+      test(sliceTargetDefault);
+    }
+  }
+
+  @Test
+  public void testLimitPushdownCoveringPlan() throws Exception {
+    String query = "SELECT t.`name`.`fname` AS `fname` FROM hbase.`index_test_primary` as t " +
+        " where t.personal.age = 53 limit 3";
+    try {
+      test(defaultHavingIndexPlan + ";" + disableFTS + ";");
+      PlanTestBase.testPlanWithAttributesMatchingPatterns(query,
+              new String[]{".*JsonTableGroupScan.*indexName=i_age_with_fname.*rowcount = 3.0"},
+              new String[]{}
+      );
+    } finally {
+      test(enableFTS);
+    }
+  }
+
+  @Test
+  public void testLimitPushdownOrderByCoveringPlan() throws Exception {
+    String query = "SELECT t.`name`.`fname` AS `fname` FROM hbase.`index_test_primary` as t " +
+        " where t.personal.age = 53 order by t.personal.age limit 3";
+    try {
+      test(defaultHavingIndexPlan + ";" + disableFTS + ";");
+      PlanTestBase.testPlanWithAttributesMatchingPatterns(query,
+              new String[]{".*JsonTableGroupScan.*indexName=i_age_with_fname.*rowcount = 3.0"},
+              new String[]{}
+      );
+    } finally {
+      test(enableFTS);
+    }
+  }
+
+  @Test
+  public void testLimitPushdownNonCoveringPlan() throws Exception {
+    String query = "SELECT t.`name`.`lname` AS `lname` FROM hbase.`index_test_primary` as t " +
+        " where t.personal.age = 53 limit 7";
+    try {
+      test(defaultHavingIndexPlan+";"+disableFTS+";");
+      PlanTestBase.testPlanWithAttributesMatchingPatterns(query,
+              new String[]{"RowKeyJoin", ".*RestrictedJsonTableGroupScan.*tableName=.*index_test_primary.*rowcount = 7.0"},
+              new String[]{}
+      );
+    } finally {
+      test(enableFTS);
+    }
+  }
+
+  @Test
+  public void testLimitPushdownOrderByNonCoveringPlan() throws Exception {
+    // Limit pushdown should NOT happen past rowkey join when ordering is required
+    String query = "SELECT t.`name`.`lname` AS `lname` FROM hbase.`index_test_primary` as t " +
+        " where t.personal.age = 53 order by t.personal.age limit 7";
+    try {
+      test(defaultHavingIndexPlan + ";" + disableFTS + ";" + sliceTargetSmall + ";");
+      PlanTestBase.testPlanWithAttributesMatchingPatterns(query,
+              new String[]{"RowKeyJoin", ".*RestrictedJsonTableGroupScan.*"},
+              new String[]{".*tableName=.*index_test_primary.*rowcount = 7.*"}
+      );
+    } finally {
+      test(enableFTS);
+    }
+  }
+
+  @Test
+  public void testLimit0Pushdown() throws Exception {
+    // Limit pushdown should NOT happen past project with CONVERT_FROMJSON
+    String query = "select convert_from(convert_to(t.`name`.`lname`, 'JSON'), 'JSON') " +
+        "from hbase.`index_test_primary` as t limit 0";
+    try {
+      test(defaultHavingIndexPlan + ";");
+      PlanTestBase.testPlanWithAttributesMatchingPatterns(query,
+          new String[]{"Limit(.*[\n\r])+.*Project.*CONVERT_FROMJSON(.*[\n\r])+.*Scan"},
+          new String[]{}
+      );
+    } finally {
+    }
+  }
+
+  @Test
+  public void testRemovalOfReduntantHashToMergeExchange() throws Exception {
+    String query = "SELECT t.driverlicense as lic FROM hbase.`index_test_primary` as t " +
+            "order by t.driverlicense limit 2";
+    try {
+      test(defaultHavingIndexPlan);
+      test(sliceTargetSmall);
+      PlanTestBase.testPlanMatchingPatterns(query,
+              new String[]{"indexName=i_lic"},
+              new String[]{"HashToMergeExchange", "Sort", "TopN"});
+      testBuilder()
+              .ordered()
+              .sqlQuery(query)
+              .baselineColumns("lic").baselineValues(100000000L)
+              .baselineColumns("lic").baselineValues(100000001L)
+              .build()
+              .run();
+    } finally {
+      test(sliceTargetDefault);
+    }
+  }
+
+  @Test
+  public void testMultiPhaseAgg() throws Exception {
+    String query = "select count(t.reverseid) from hbase.`index_test_primary` as t " +
+            "group by t.driverlicense order by t.driverlicense";
+    try {
+      test(defaultHavingIndexPlan);
+      test(sliceTargetSmall);
+      PlanTestBase.testPlanMatchingPatterns(query,
+              new String[]{"indexName=i_lic", "HashToMergeExchange", "StreamAgg", "StreamAgg"},
+              new String[]{"Sort", "TopN"});
+    } finally {
+      test(sliceTargetDefault);
+    }
+  }
+
+  @Test
+  public void testHangForSimpleDistinct() throws Exception {
+    String query = "select distinct t.driverlicense from hbase.`index_test_primary` as t order by t.driverlicense limit 1";
+
+    try {
+      test(sliceTargetSmall);
+      testBuilder()
+              .ordered()
+              .sqlQuery(query)
+              .baselineColumns("driverlicense").baselineValues(100000000L)
+              .build()
+              .run();
+    } finally {
+      test(sliceTargetDefault);
+    }
+  }
+}
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/LargeTableGen.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/LargeTableGen.java
new file mode 100644
index 0000000..bc857d1
--- /dev/null
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/LargeTableGen.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mapr.drill.maprdb.tests.index;
+
+import static com.mapr.drill.maprdb.tests.MaprDBTestsSuite.INDEX_FLUSH_TIMEOUT;
+
+import java.io.InputStream;
+import java.io.StringBufferInputStream;
+
+import org.apache.hadoop.fs.Path;
+import org.ojai.DocumentStream;
+import org.ojai.json.Json;
+
+import com.mapr.db.Admin;
+import com.mapr.db.Table;
+import com.mapr.db.TableDescriptor;
+import com.mapr.db.impl.MapRDBImpl;
+import com.mapr.db.impl.TableDescriptorImpl;
+import com.mapr.db.tests.utils.DBTests;
+import com.mapr.fs.utils.ssh.TestCluster;
+
+/**
+ * This class is to generate a MapR json table of this schema:
+ * {
+ *   "address" : {
+ *      "city":"wtj",
+ *      "state":"ho"
+ *   }
+ *   "contact" : {
+ *      "email":"VcFahjRfM@gmail.com",
+ *      "phone":"6500005583"
+ *   }
+ *   "id" : {
+ *      "ssn":"100005461"
+ *   }
+ *   "name" : {
+ *      "fname":"VcFahj",
+ *      "lname":"RfM"
+ *   }
+ * }
+ *
+ */
+public class LargeTableGen extends LargeTableGenBase {
+
+  static final int SPLIT_SIZE = 5000;
+  private Admin admin;
+
+  public LargeTableGen(Admin dbadmin) {
+    admin = dbadmin;
+  }
+
+  Table createOrGetTable(String tableName, int recordNum) {
+    if (admin.tableExists(tableName)) {
+      return MapRDBImpl.getTable(tableName);
+      //admin.deleteTable(tableName);
+    }
+    else {
+      TableDescriptor desc = new TableDescriptorImpl(new Path(tableName));
+
+      int splits = (recordNum / SPLIT_SIZE) - (((recordNum % SPLIT_SIZE) > 1)? 0 : 1);
+
+      String[] splitsStr = new String[splits];
+      StringBuilder strBuilder = new StringBuilder("Splits:");
+      for(int i=0; i<splits; ++i) {
+        splitsStr[i] = String.format("%d", (i+1)*SPLIT_SIZE);
+        strBuilder.append(splitsStr[i] + ", ");
+      }
+      System.out.print(strBuilder.toString());
+
+      return admin.createTable(desc, splitsStr);
+    }
+  }
+
+  private void createIndex(Table table, String[] indexDef) throws Exception {
+    if(indexDef == null) {
+      //don't create index here. indexes may have been created
+      return;
+    }
+    for(int i=0; i<indexDef.length / 3; ++i) {
+      String indexCmd = String.format("maprcli table index add"
+          + " -path " + table.getPath()
+          + " -index %s"
+          + " -indexedfields '%s'"
+          + ((indexDef[3 * i + 2].length()==0)?"":" -includedfields '%s'")
+          + ((indexDef[3 * i].startsWith("hash"))? " -hashed true" : ""),
+          indexDefInCommand(indexDef[3 * i]), //index name
+          indexDefInCommand(indexDef[3 * i + 1]), //indexedfields
+          indexDefInCommand(indexDef[3 * i + 2])); //includedfields
+      System.out.println(indexCmd);
+
+      TestCluster.runCommand(indexCmd);
+      DBTests.admin().getTableIndexes(table.getPath(), true);
+    }
+  }
+
+  private String indexDefInCommand(String def) {
+    String[] splitted = def.split(",");
+    StringBuffer ret = new StringBuffer();
+    for(String field: splitted) {
+      if(ret.length() == 0) {
+        ret.append(field);
+      }
+      else {
+        ret.append(",").append(field);
+      }
+    }
+    return ret.toString();
+  }
+  public void generateTableWithIndex(String tablePath, int recordNumber, String[] indexDef) throws Exception {
+    // create index
+
+    initRandVector(recordNumber);
+    initDictionary();
+    DBTests.setTableStatsSendInterval(1);
+
+    if (admin.tableExists(tablePath)) {
+      //admin.deleteTable(tablePath);
+    }
+
+    //create Json String
+    int batch, i;
+    int BATCH_SIZE=2000;
+    try (Table table = createOrGetTable(tablePath, recordNumber)) {
+      //create index
+      createIndex(table, indexDef);
+      for (batch = 0; batch < recordNumber; batch += BATCH_SIZE) {
+        int batchStop = Math.min(recordNumber, batch + BATCH_SIZE);
+        StringBuffer strBuf = new StringBuffer();
+        for (i = batch; i < batchStop; ++i) {
+
+          strBuf.append(String.format("{\"rowid\": \"%d\", \"reverseid\": \"%d\", \"id\": {\"ssn\": \"%s\"}, \"contact\": {\"phone\": \"%s\", \"email\": \"%s\"}," +
+                  "\"address\": {\"city\": \"%s\", \"state\": \"%s\"}, \"name\": { \"fname\": \"%s\", \"lname\": \"%s\" }," +
+                  "\"personal\": {\"age\" : %s, \"income\": %s, \"birthdate\": {\"$dateDay\": \"%s\"} }," +
+                  "\"activity\": {\"irs\" : { \"firstlogin\":  \"%s\" } }," +
+                  "\"driverlicense\":{\"$numberLong\": %s} } \n",
+              i + 1, recordNumber - i , getSSN(i), getPhone(i), getEmail(i),
+              getAddress(i)[2], getAddress(i)[1], getFirstName(i), getLastName(i),
+              getAge(i), getIncome(i), getBirthdate(i),
+              getFirstLogin(i),
+              getSSN(i)));
+        }
+        try (InputStream in = new StringBufferInputStream(strBuf.toString());
+             DocumentStream stream = Json.newDocumentStream(in)) {
+          //write by individual document
+          //for (Document document : stream) {
+          //  table.insert(document, "rowid");
+          //}
+          try {
+            table.insert(stream, "rowid"); //insert a batch  of document in stream
+          }catch(Exception e) {
+            System.out.println(stream.toString());
+            throw e;
+          }
+        }
+      }
+      table.flush();
+      DBTests.waitForIndexFlush(table.getPath(), INDEX_FLUSH_TIMEOUT);
+      Thread.sleep(200000);
+    }
+  }
+}
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/LargeTableGenBase.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/LargeTableGenBase.java
new file mode 100644
index 0000000..917f42a
--- /dev/null
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/LargeTableGenBase.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mapr.drill.maprdb.tests.index;
+
+import org.apache.commons.lang3.RandomStringUtils;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+public class LargeTableGenBase {
+
+  private boolean dict_ready = false;
+
+  protected List<String> firstnames;
+  protected List<String> lastnames;
+  protected List<String[]> cities;
+  protected int[] randomized;
+
+  protected synchronized void  initDictionary() {
+    initDictionaryWithRand();
+  }
+
+  protected void initDictionaryWithRand() {
+    {
+      firstnames = new ArrayList<>();
+      lastnames = new ArrayList<>();
+      cities = new ArrayList<>();
+      List<String> states = new ArrayList<>();
+
+      int fnNum = 2000; //2k
+      int lnNum = 200000;//200k
+      int cityNum = 10000;//10k
+      int stateNum = 50;
+      Random rand = new Random(2017);
+      int i;
+      try {
+        Set<String> strSet = new LinkedHashSet<>();
+        while(strSet.size() < stateNum) {
+          strSet.add(RandomStringUtils.random(2, 0, 0, true, false, null, rand));
+        }
+        states.addAll(strSet);
+
+        strSet = new LinkedHashSet<>();
+        while(strSet.size() < cityNum) {
+          int len = 3 + strSet.size() % 6;
+          strSet.add(RandomStringUtils.random(len, 0, 0, true, false, null, rand));
+        }
+
+        Iterator<String> it = strSet.iterator();
+        for(i=0; i<cityNum; ++i) {
+          cities.add(new String[]{"10000", states.get(i%stateNum),  it.next()});
+        }
+
+        strSet = new LinkedHashSet<>();
+        while(strSet.size() < fnNum) {
+          int len = 3 + strSet.size() % 6;
+          strSet.add(RandomStringUtils.random(len, 0, 0, true, false, null, rand));
+        }
+        firstnames.addAll(strSet);
+
+        strSet = new LinkedHashSet<>();
+        while(strSet.size() < lnNum) {
+          int len = 3 + strSet.size() % 6;
+          strSet.add(RandomStringUtils.random(len, 0, 0, true, false, null, rand));
+        }
+        lastnames.addAll(strSet);
+      }
+      catch(Exception e) {
+        System.out.println("init data got exception");
+        e.printStackTrace();
+      }
+      dict_ready = true;
+    }
+  }
+
+  protected  String getFirstName(int i) {
+    return firstnames.get((randomized[ i%randomized.length ] + i )% firstnames.size());
+  }
+
+  protected String getLastName(int i) {
+    return lastnames.get((randomized[ (2*i + randomized[i%randomized.length])% randomized.length]) % lastnames.size());
+  }
+
+  protected String[] getAddress(int i) {
+    return cities.get((randomized[(i+ randomized[i%randomized.length])%randomized.length]) % cities.size());
+  }
+
+  protected String getSSN(int i){
+    return String.format("%d", 1000*1000*100 + randomized[ i % randomized.length]);
+  }
+
+  protected String getPhone(int i) {
+    //80% phones are unique,
+    return String.format("%d", 6500*1000*1000L + randomized[ (randomized.length - i) %((int) (randomized.length * 0.8)) ]);
+  }
+
+  protected String getEmail(int i){
+    return getFirstName(i) + getLastName(i) + "@" + "gmail.com";
+  }
+
+  protected String getAge(int i) {
+    return String.format("%d",randomized[i%randomized.length] % 60 + 10);
+  }
+
+  protected String getIncome(int i) {//unit should be $10k
+    return String.format("%d",randomized[i%randomized.length] % 47 + 1);
+  }
+
+  //date yyyy-mm-dd
+  protected String getBirthdate(int i) {
+    int thisseed = randomized[i%randomized.length];
+    return String.format("%d-%02d-%02d",
+        2016 - (thisseed % 60 + 10), thisseed % 12 + 1, (thisseed * 31) % 28 + 1 );
+  }
+
+  //timestamp, yyyy-mm-dd HH:mm:ss
+  protected String getFirstLogin(int i) {
+    int thisseed = randomized[i%randomized.length];
+    int nextseed = randomized[(i+1)%randomized.length];
+    return String.format("%d-%02d-%02d %02d:%02d:%02d.0",
+        2016 - (thisseed % 7), (thisseed * 31) % 12 + 1, thisseed % 28 + 1, nextseed % 24, nextseed % 60, (nextseed * 47) % 60 );
+  }
+
+
+  protected String getField(String field, int i) {
+    if(field.equals("ssn")) {
+      return getSSN(i);
+    }
+    else if (field.equals("phone")) {
+      return getPhone(i);
+    }
+    else if(field.equals("email")) {
+      return getEmail(i);
+    }
+    else if(field.equals("city")) {
+      return getAddress(i)[1];
+    }
+    else if(field.equals("state")) {
+      return getAddress(i)[0];
+    }
+    else if(field.equals("fname")) {
+      return getFirstName(i);
+    }
+    else if(field.equals("lname")) {
+      return getLastName(i);
+    }
+    return "";
+  }
+
+
+  protected void initRandVector(int recordNumber) {
+    int i;
+    Random rand = new Random(2016);
+    randomized = new int[recordNumber];
+    for(i = 0; i<recordNumber; ++i) {
+      randomized[i] = i;
+    }
+    for (i=0; i<recordNumber; ++i) {
+      int idx1 =  rand.nextInt(recordNumber);
+      int idx2 = rand.nextInt(recordNumber);
+      int temp = randomized[idx1];
+      randomized[idx1] = randomized[idx2];
+      randomized[idx2] = temp;
+    }
+  }
+
+}
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/StatisticsTest.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/StatisticsTest.java
new file mode 100644
index 0000000..36e25ab
--- /dev/null
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/StatisticsTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mapr.drill.maprdb.tests.index;
+
+import com.google.common.collect.Lists;
+import com.mapr.db.Admin;
+import com.mapr.drill.maprdb.tests.MaprDBTestsSuite;
+import com.mapr.drill.maprdb.tests.json.BaseJsonTest;
+import com.mapr.tests.annotations.ClusterTest;
+import org.apache.drill.PlanTestBase;
+import org.apache.hadoop.hbase.TableName;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.List;
+
+@Category(ClusterTest.class)
+public class StatisticsTest extends IndexPlanTest {
+  /**
+   *  A sample row of this 10K table:
+   ------------------+-----------------------------+--------+
+   | 1012  | {"city":"pfrrs","state":"pc"}  | {"email":"KfFzKUZwNk@gmail.com","phone":"6500005471"}  |
+   {"ssn":"100007423"}  | {"fname":"KfFzK","lname":"UZwNk"}  | {"age":53.0,"income":45.0}  | 1012   |
+   *
+   * This test suite generate random content to fill all the rows, since the random function always start from
+   * the same seed for different runs, when the row count is not changed, the data in table will always be the same,
+   * thus the query result could be predicted and verified.
+   */
+
+  @Test
+  @Ignore("Currently untested; re-enable after stats/costing integration complete")
+  public void testFilters() throws Exception {
+    String query;
+    String explain = "explain plan including all attributes for ";
+
+    // Top-level ANDs - Leading columns (personal.age), (address.state)
+    query = "select * from hbase.`index_test_primary` t "
+        + " where (t.personal.age < 30 or t.personal.age > 100)"
+        + " and (t.address.state = 'mo' or t.address.state = 'ca')";
+    PlanTestBase.testPlanMatchingPatterns(explain+query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"},
+        new String[] {}
+    );
+
+    // Top-level ORs - Cannot split top-level ORs so use defaults
+    query = "select * from hbase.`index_test_primary` t "
+        + " where (t.personal.age > 30 and t.personal.age < 100)"
+        + " or (t.address.state = 'mo')";
+    PlanTestBase.testPlanMatchingPatterns(explain+query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"},
+        new String[] {}
+    );
+
+    // ANDed condition - Leading index column(personal.age) and non-leading column(address.city)
+    query = "select * from hbase.`index_test_primary` t "
+        + " where (t.personal.age < 30 or t.personal.age > 100)"
+        + " and `address.city` = 'sf'";
+    PlanTestBase.testPlanMatchingPatterns(explain+query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"},
+        new String[] {}
+    );
+
+    // ANDed condition - Leading index columns (address.state) and (address.city)
+    query = "select * from hbase.`index_test_primary` t "
+        + " where (`address.state` = 'mo' or `address.state` = 'ca') " // Leading index column
+        + " and `address.city` = 'sf'";                                // Non leading index column
+    PlanTestBase.testPlanMatchingPatterns(explain+query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"},
+        new String[] {}
+    );
+
+    // ANDed condition - Leading index columns (address.state) and non-index column (name.fname)
+    query = "select * from hbase.`index_test_primary` t "
+        + " where (`address.state` = 'mo' or `address.state` = 'ca') " // Leading index column
+        + " and `name.fname` = 'VcFahj'";                              // Non index column
+    PlanTestBase.testPlanMatchingPatterns(explain+query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"},
+        new String[] {}
+    );
+
+    // Simple condition - LIKE predicate
+    query = "select t._id as rowid from hbase.`index_test_primary` as t "
+        + "where t.driverlicense like '100007423%'";
+    PlanTestBase.testPlanMatchingPatterns(explain+query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"},
+        new String[] {}
+    );
+
+    // Simple condition - LIKE predicate with ESCAPE clause
+    query = "select t._id as rowid from hbase.`index_test_primary` as t "
+        + "where t.driverlicense like '100007423%' ESCAPE '/'";
+    PlanTestBase.testPlanMatchingPatterns(explain+query,
+        new String[] {".*JsonTableGroupScan.*tableName=.*index_test_primary.*rows=10000"},
+        new String[] {}
+    );
+  }
+}
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/TableIndexCmd.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/TableIndexCmd.java
new file mode 100644
index 0000000..a501f8f
--- /dev/null
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/TableIndexCmd.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mapr.drill.maprdb.tests.index;
+
+
+import com.mapr.db.Admin;
+import com.mapr.db.MapRDB;
+import org.apache.drill.exec.util.GuavaPatcher;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+* Copy classes to a MapR cluster node, then run a command like this:
+* java -classpath /tmp/drill-cmd-1.9.0-SNAPSHOT.jar:/opt/mapr/drill/drill-1.9.0/jars/*:/opt/mapr/drill/drill-1.9.0/jars/3rdparty/*:/opt/mapr/drill/drill-1.9.0/jars/ext/*
+*                 org.apache.drill.hbase.index.TableIndexGen -host 10.10.88.128 -port 5181 [-table pop3] [-size 1000000]
+*/
+
+class TestBigTable {
+
+  Admin admin;
+  boolean initialized = false;
+
+  LargeTableGen gen;
+
+  /*
+    "hbase.zookeeper.quorum": "10.10.88.128",
+    "hbase.zookeeper.property.clientPort": "5181"
+   */
+  void init(String host, String port) {
+    try {
+      admin = MapRDB.newAdmin();
+      initialized = true;
+      gen = new LargeTableGen(admin);
+    } catch (Exception e) {
+      System.out.println("Connection to HBase threw" + e.getMessage());
+    }
+  }
+}
+
+
+public class TableIndexCmd {
+
+  public static Map<String,String> parseParameter(String[] params) {
+    HashMap<String,String> retParams = new HashMap<String, String>();
+    for (int i=0; i<params.length; ++i) {
+      if (params[i].startsWith("-") && i<params.length - 1) {
+        String paramName = params[i].replaceFirst("-*", "");
+        retParams.put(paramName, params[i+1]);
+        ++i;
+      }
+    }
+    return retParams;
+  }
+
+  public static void pressEnterKeyToContinue()
+  {
+    System.out.println("Press any key to continue...");
+    try
+    {
+      System.in.read();
+    }
+    catch(Exception e)
+    {}
+  }
+
+
+  public static void main(String[] args) {
+    GuavaPatcher.patch();
+
+    String inHost = new String("localhost");
+    String inPort = new String("5181");
+    String inTable = new String("/tmp/population");
+    String dictPath = "hbase";
+    boolean waitKeyPress = true;
+    long inSize = 10000;
+    Map<String, String> params = parseParameter(args);
+    if(args.length >= 2) {
+      if(params.get("host") != null) {
+        inHost = params.get("host");
+      }
+      if(params.get("port") != null) {
+        inPort = params.get("port");
+      }
+      if(params.get("table") != null) {
+        inTable = params.get("table");
+      }
+      if(params.get("size") != null) {
+        inSize = Long.parseLong(params.get("size"));
+      }
+      if(params.get("dict") != null) {
+        dictPath = params.get("dict");
+      }
+      if(params.get("wait") != null) {
+        String answer = params.get("wait");
+        waitKeyPress = answer.startsWith("y") || answer.startsWith("t")? true : false;
+      }
+    }
+    if(waitKeyPress == true) {
+      pressEnterKeyToContinue();
+    }
+    try {
+      TestBigTable tbt = new TestBigTable();
+      tbt.init(inHost, inPort);
+      tbt.gen.generateTableWithIndex(inTable, (int)(inSize & 0xFFFFFFFFL), null);
+    }
+    catch(Exception e) {
+      System.out.println("generate big table got exception:" + e.getMessage());
+      e.printStackTrace();
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/OrderedRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/OrderedRel.java
new file mode 100644
index 0000000..5f4da74
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/OrderedRel.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.common;
+
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * Class implementing OrderedPrel interface guarantees to provide ordered
+ * output on certain columns. TopNPrel and SortPrel base classes which implement
+ * this interface.
+ */
+public interface OrderedRel extends DrillRelNode {
+
+  /**
+   * A method to return ordering columns of the result.
+   * @return Collation order of the output.
+   */
+  RelCollation getCollation();
+
+  /**
+   * Offset value represented in RexNode.
+   * @return offset.
+   */
+  RexNode getOffset();
+
+  /**
+   * Fetch value represented in RexNode.
+   * @return fetch
+   */
+  RexNode getFetch();
+
+  /**
+   * A method to return if this relational node can be dropped during optimization process.
+   * @return true if this node can be dropped, false otherwise.
+   */
+  boolean canBeDropped();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java
index 65788cb..45251c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.planner.index;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rex.RexNode;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
@@ -28,6 +27,7 @@ import org.apache.drill.exec.physical.base.DbGroupScan;
 import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
 import org.apache.drill.exec.planner.common.DrillScanRelBase;
 import org.apache.drill.exec.planner.common.DrillProjectRelBase;
+import org.apache.drill.exec.planner.common.OrderedRel;
 import java.util.List;
 import java.util.Set;
 
@@ -66,7 +66,7 @@ public interface IndexCallContext {
 
   RexNode getOrigCondition();
 
-  Sort getSort();
+  OrderedRel getSort();
 
   void createSortExprs();
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexLogicalPlanCallContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexLogicalPlanCallContext.java
index 27198bb..3a6ea83 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexLogicalPlanCallContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexLogicalPlanCallContext.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.planner.index;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelCollation;
-import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rex.RexNode;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
@@ -31,6 +30,7 @@ import org.apache.drill.exec.planner.logical.DrillProjectRel;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
 import org.apache.drill.exec.planner.logical.DrillSortRel;
 import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.planner.common.OrderedRel;
 import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
 import org.apache.calcite.rel.RelNode;
 
@@ -164,7 +164,7 @@ public class IndexLogicalPlanCallContext implements IndexCallContext {
     return origPushedCondition;
   }
 
-  public Sort getSort() {
+  public OrderedRel getSort() {
     return sort;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPhysicalPlanCallContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPhysicalPlanCallContext.java
index 9c7b651..91ff02c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPhysicalPlanCallContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPhysicalPlanCallContext.java
@@ -21,7 +21,6 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rex.RexNode;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
@@ -29,10 +28,10 @@ import org.apache.drill.exec.physical.base.AbstractDbGroupScan;
 import org.apache.drill.exec.physical.base.DbGroupScan;
 import org.apache.drill.exec.planner.common.DrillProjectRelBase;
 import org.apache.drill.exec.planner.common.DrillScanRelBase;
-import org.apache.drill.exec.planner.physical.SortPrel;
 import org.apache.drill.exec.planner.physical.ProjectPrel;
 import org.apache.drill.exec.planner.physical.FilterPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.planner.common.OrderedRel;
 import org.apache.drill.exec.planner.physical.ExchangePrel;
 import org.apache.drill.exec.planner.physical.HashToRandomExchangePrel;
 import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
@@ -42,8 +41,9 @@ import java.util.List;
 import java.util.Set;
 
 public class IndexPhysicalPlanCallContext implements IndexCallContext {
+
   final public RelOptRuleCall call;
-  final public SortPrel sort;
+  final public OrderedRel sort;
   final public ProjectPrel upperProject;
   final public FilterPrel filter;
   final public ProjectPrel lowerProject;
@@ -67,7 +67,7 @@ public class IndexPhysicalPlanCallContext implements IndexCallContext {
   }
 
   public IndexPhysicalPlanCallContext(RelOptRuleCall call,
-                                      SortPrel sort,
+                                      OrderedRel sort,
                                       ProjectPrel capProject,
                                       FilterPrel filter,
                                       ProjectPrel project,
@@ -83,7 +83,7 @@ public class IndexPhysicalPlanCallContext implements IndexCallContext {
   }
 
   public IndexPhysicalPlanCallContext(RelOptRuleCall call,
-                                      SortPrel sort,
+                                      OrderedRel sort,
                                       ProjectPrel project,
                                       ScanPrel scan, ExchangePrel exch) {
     this.call = call;
@@ -171,7 +171,7 @@ public class IndexPhysicalPlanCallContext implements IndexCallContext {
     return origPushedCondition;
   }
 
-  public Sort getSort() {
+  public OrderedRel getSort() {
     return sort;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPlanUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPlanUtils.java
index 666e282..cdad63a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPlanUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexPlanUtils.java
@@ -38,6 +38,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
@@ -56,6 +57,7 @@ import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.common.OrderedRel;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexInputRef;
@@ -346,6 +348,21 @@ public class IndexPlanUtils {
     return proj.getProjects();
   }
 
+  public static boolean generateLimit(OrderedRel sort) {
+    RexNode fetchNode = sort.getFetch();
+    int fetchValue = (fetchNode == null) ? -1 : RexLiteral.intValue(fetchNode);
+    return fetchValue >=0;
+  }
+
+  public static RexNode getOffset(OrderedRel sort) {
+    return sort.getOffset();
+  }
+
+  public static RexNode getFetch(OrderedRel sort) {
+    return sort.getFetch();
+  }
+
+
   /**
    * generate logical expressions for sort rexNodes in SortRel, the result is store to IndexPlanCallContext
    * @param indexContext
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/AbstractIndexPlanGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/AbstractIndexPlanGenerator.java
index 36ff61f..456542b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/AbstractIndexPlanGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/AbstractIndexPlanGenerator.java
@@ -30,7 +30,6 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
 import org.apache.calcite.rel.type.RelRecordType;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -43,16 +42,19 @@ import org.apache.drill.exec.planner.logical.DrillFilterRel;
 import org.apache.drill.exec.planner.logical.DrillProjectRel;
 import org.apache.drill.exec.planner.logical.DrillSortRel;
 import org.apache.drill.exec.planner.common.DrillProjectRelBase;
+import org.apache.drill.exec.planner.common.OrderedRel;
 import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.planner.physical.SubsetTransformer;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.planner.physical.Prule;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
 import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.drill.exec.planner.physical.SortPrel;
 import org.apache.drill.exec.planner.physical.HashToMergeExchangePrel;
 import org.apache.drill.exec.planner.physical.SingleMergeExchangePrel;
 import org.apache.drill.exec.planner.physical.PrelUtil;
-import org.apache.drill.exec.planner.physical.Prule;
-import org.apache.drill.exec.planner.physical.SubsetTransformer;
-import org.apache.drill.exec.planner.physical.DrillDistributionTrait;
+import org.apache.drill.exec.planner.physical.TopNPrel;
+import org.apache.drill.exec.planner.physical.SortPrel;
+import org.apache.drill.exec.planner.physical.LimitPrel;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -67,7 +69,6 @@ public abstract class AbstractIndexPlanGenerator extends SubsetTransformer<RelNo
   final protected DrillProjectRelBase origProject;
   final protected DrillScanRelBase origScan;
   final protected DrillProjectRelBase upperProject;
-  final protected RelNode origSort;
 
   final protected RexNode indexCondition;
   final protected RexNode remainderCondition;
@@ -84,7 +85,6 @@ public abstract class AbstractIndexPlanGenerator extends SubsetTransformer<RelNo
     this.origProject = indexContext.getLowerProject();
     this.origScan = indexContext.getScan();
     this.upperProject = indexContext.getUpperProject();
-    this.origSort = indexContext.getSort();
     this.indexCondition = indexCondition;
     this.remainderCondition = remainderCondition;
     this.indexContext = indexContext;
@@ -168,8 +168,8 @@ public abstract class AbstractIndexPlanGenerator extends SubsetTransformer<RelNo
     return set;
   }
 
-  protected static boolean toRemoveSort(Sort sort, RelCollation inputCollation) {
-    if ( (inputCollation != null) && inputCollation.satisfies(IndexPlanUtils.getCollation(sort))) {
+  protected static boolean toRemoveSort(RelCollation sortCollation, RelCollation inputCollation) {
+    if ( (inputCollation != null) && inputCollation.satisfies(sortCollation)) {
       return true;
     }
     return false;
@@ -194,18 +194,34 @@ public abstract class AbstractIndexPlanGenerator extends SubsetTransformer<RelNo
     }
   }
 
+  private static RelNode getSortOrTopN(IndexCallContext indexContext,
+                                       RelNode sortNode, RelNode newRel, RelNode child) {
+    if (sortNode instanceof TopNPrel) {
+      return new TopNPrel(sortNode.getCluster(),
+                    newRel.getTraitSet().replace(Prel.DRILL_PHYSICAL).plus(indexContext.getCollation()),
+                    child, ((TopNPrel)sortNode).getLimit(), indexContext.getCollation());
+    }
+    return new SortPrel(sortNode.getCluster(),
+            newRel.getTraitSet().replace(Prel.DRILL_PHYSICAL).plus(indexContext.getCollation()),
+            child, indexContext.getCollation());
+  }
+
   public static RelNode getSortNode(IndexCallContext indexContext, RelNode newRel, boolean donotGenerateSort,
                                     boolean isSingleton, boolean isExchangeRequired) {
-    Sort rel = indexContext.getSort();
+    OrderedRel rel = indexContext.getSort();
     DrillDistributionTrait hashDistribution =
         new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
             ImmutableList.copyOf(indexContext.getDistributionFields()));
 
-    if ( toRemoveSort(indexContext.getSort(), newRel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE))) {
+    if ( toRemoveSort(indexContext.getCollation(), newRel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE))) {
       //we are going to remove sort
       logger.debug("Not generating SortPrel since we have the required collation");
-
-      RelTraitSet traits = newRel.getTraitSet().plus(IndexPlanUtils.getCollation(rel)).plus(Prel.DRILL_PHYSICAL);
+      if (IndexPlanUtils.generateLimit(rel)) {
+        newRel = new LimitPrel(newRel.getCluster(),
+                newRel.getTraitSet().plus(indexContext.getCollation()).plus(Prel.DRILL_PHYSICAL),
+                newRel, IndexPlanUtils.getOffset(rel), IndexPlanUtils.getFetch(rel));
+      }
+      RelTraitSet traits = newRel.getTraitSet().plus(indexContext.getCollation()).plus(Prel.DRILL_PHYSICAL);
       newRel = Prule.convert(newRel, traits);
       newRel = getExchange(newRel.getCluster(), isSingleton, isExchangeRequired,
                                  traits, hashDistribution, indexContext, newRel);
@@ -215,10 +231,9 @@ public abstract class AbstractIndexPlanGenerator extends SubsetTransformer<RelNo
         logger.debug("Not generating SortPrel and index plan, since just picking index for full index scan is not beneficial.");
         return null;
       }
-      RelTraitSet traits = newRel.getTraitSet().plus(IndexPlanUtils.getCollation(rel)).plus(Prel.DRILL_PHYSICAL);
-      newRel = new SortPrel(rel.getCluster(),
-              newRel.getTraitSet().replace(Prel.DRILL_PHYSICAL).plus(IndexPlanUtils.getCollation(rel)),
-          Prule.convert(newRel, newRel.getTraitSet().replace(Prel.DRILL_PHYSICAL)), IndexPlanUtils.getCollation(rel));
+      RelTraitSet traits = newRel.getTraitSet().plus(indexContext.getCollation()).plus(Prel.DRILL_PHYSICAL);
+      newRel = getSortOrTopN(indexContext, rel, newRel,
+                  Prule.convert(newRel, newRel.getTraitSet().replace(Prel.DRILL_PHYSICAL)));
       newRel = getExchange(newRel.getCluster(), isSingleton, isExchangeRequired,
                                  traits, hashDistribution, indexContext, newRel);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/CoveringPlanNoFilterGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/CoveringPlanNoFilterGenerator.java
index 163aef9..e06ac8f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/CoveringPlanNoFilterGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/CoveringPlanNoFilterGenerator.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.planner.index.generators;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelCollation;
@@ -30,11 +32,11 @@ import org.apache.drill.exec.planner.index.FunctionalIndexInfo;
 import org.apache.drill.exec.planner.index.IndexPlanUtils;
 import org.apache.drill.exec.planner.logical.DrillParseContext;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.planner.physical.ProjectPrel;
-import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.planner.physical.Prule;
+import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.physical.base.DbGroupScan;
 import java.util.List;
 
@@ -62,7 +64,7 @@ public class CoveringPlanNoFilterGenerator extends AbstractIndexPlanGenerator {
   }
 
   public RelNode convertChild() throws InvalidRelException {
-
+    Preconditions.checkNotNull(indexContext.getSort());
     if (indexGroupScan == null) {
       logger.error("Null indexgroupScan in CoveringIndexPlanGenerator.convertChild");
       return null;
@@ -106,11 +108,9 @@ public class CoveringPlanNoFilterGenerator extends AbstractIndexPlanGenerator {
       }
     }
 
-    if (indexContext.getSort() != null) {
-      finalRel = getSortNode(indexContext, finalRel, true, isSingletonSortedStream, indexContext.getExchange() != null);
-      if (finalRel == null) {
-        return null;
-      }
+    finalRel = getSortNode(indexContext, finalRel, true, isSingletonSortedStream, indexContext.getExchange() != null);
+    if (finalRel == null) {
+      return null;
     }
 
     finalRel = Prule.convert(finalRel, finalRel.getTraitSet().plus(Prel.DRILL_PHYSICAL));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
index db220fa..e1337bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
@@ -319,7 +319,7 @@ public class NonCoveringIndexPlanGenerator extends AbstractIndexPlanGenerator {
     if (indexContext.getSort() != null) {
       // When ordering is required, serialize the index scan side. With parallel index scans, the rowkey join may receive
       // unsorted input because ordering is not guaranteed across different parallel inputs.
-      if (toRemoveSort(indexContext.getSort(), newRel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE))) {
+      if (toRemoveSort(indexContext.getCollation(), newRel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE))) {
         ((IndexGroupScan)indexScanPrel.getGroupScan()).setParallelizationWidth(1);
       }
       newRel = getSortNode(indexContext, newRel, false,true, true);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/rules/DbScanSortRemovalRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/rules/DbScanSortRemovalRule.java
index db09504..86ed430 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/rules/DbScanSortRemovalRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/rules/DbScanSortRemovalRule.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.planner.index.rules;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelOptRuleOperand;
@@ -39,7 +41,7 @@ import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.planner.physical.ExchangePrel;
 import org.apache.drill.exec.planner.physical.ProjectPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
-import org.apache.drill.exec.planner.physical.SortPrel;
+import org.apache.drill.exec.planner.common.OrderedRel;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.HashToRandomExchangePrel;
 import org.apache.calcite.rel.RelNode;
@@ -52,21 +54,21 @@ public class DbScanSortRemovalRule extends Prule {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DbScanSortRemovalRule.class);
 
   public static final RelOptRule INDEX_SORT_EXCHANGE_SCAN =
-      new DbScanSortRemovalRule(RelOptHelper.some(SortPrel.class,
+      new DbScanSortRemovalRule(RelOptHelper.some(OrderedRel.class,
           RelOptHelper.some(HashToRandomExchangePrel.class,
               RelOptHelper.any(ScanPrel.class))), "DbScanSortRemovalRule:sort_exchange_Scan", new MatchSES());
 
   public static final RelOptRule INDEX_SORT_SCAN =
-          new DbScanSortRemovalRule(RelOptHelper.some(SortPrel.class,
+          new DbScanSortRemovalRule(RelOptHelper.some(OrderedRel.class,
                           RelOptHelper.any(ScanPrel.class)), "DbScanSortRemovalRule:Sort_Scan", new MatchSS());
 
   public static final RelOptRule INDEX_SORT_PROJ_SCAN =
-          new DbScanSortRemovalRule(RelOptHelper.some(SortPrel.class,
+          new DbScanSortRemovalRule(RelOptHelper.some(OrderedRel.class,
                   RelOptHelper.some(ProjectPrel.class,
                     RelOptHelper.any(ScanPrel.class))), "DbScanSortRemovalRule:Sort_Proj_Scan", new MatchSPS());
 
   public static final RelOptRule INDEX_SORT_EXCHANGE_PROJ_SCAN =
-      new DbScanSortRemovalRule(RelOptHelper.some(SortPrel.class,
+      new DbScanSortRemovalRule(RelOptHelper.some(OrderedRel.class,
           RelOptHelper.some(HashToRandomExchangePrel.class,
               RelOptHelper.some(ProjectPrel.class,
                   RelOptHelper.any(ScanPrel.class)))), "DbScanSortRemovalRule:sort_exchange_proj_Scan", new MatchSEPS());
@@ -80,16 +82,21 @@ public class DbScanSortRemovalRule extends Prule {
     this.match = match;
   }
 
+  private static boolean isRemovableRel(OrderedRel node) {
+    return node.canBeDropped();
+  }
+
   private static class MatchSES extends AbstractMatchFunction<IndexPhysicalPlanCallContext> {
 
     public boolean match(RelOptRuleCall call) {
-      final ScanPrel scan = (ScanPrel)call.rel(2);
-      return checkScan(scan.getGroupScan());
+      final OrderedRel sort = call.rel(0);
+      final ScanPrel scan = call.rel(2);
+      return sort instanceof Prel && checkScan(scan.getGroupScan()) && isRemovableRel(sort);
     }
 
     public IndexPhysicalPlanCallContext onMatch(RelOptRuleCall call) {
       final ScanPrel scan = call.rel(2);
-      final SortPrel sort = call.rel(0);
+      final OrderedRel sort = call.rel(0);
       final ExchangePrel exch = call.rel(1);
       return new IndexPhysicalPlanCallContext(call, sort, null, scan, exch);
     }
@@ -98,13 +105,14 @@ public class DbScanSortRemovalRule extends Prule {
   private static class MatchSS extends AbstractMatchFunction<IndexPhysicalPlanCallContext> {
 
     public boolean match(RelOptRuleCall call) {
-      final ScanPrel scan = (ScanPrel)call.rel(1);
-      return checkScan(scan.getGroupScan());
+      final OrderedRel sort = call.rel(0);
+      final ScanPrel scan = call.rel(1);
+      return sort instanceof Prel && checkScan(scan.getGroupScan()) && isRemovableRel(sort);
     }
 
     public IndexPhysicalPlanCallContext onMatch(RelOptRuleCall call) {
       final ScanPrel scan = call.rel(1);
-      final SortPrel sort = call.rel(0);
+      final OrderedRel sort = call.rel(0);
       return new IndexPhysicalPlanCallContext(call, sort, null, scan, null);
     }
   }
@@ -112,14 +120,15 @@ public class DbScanSortRemovalRule extends Prule {
   private static class MatchSPS extends AbstractMatchFunction<IndexPhysicalPlanCallContext> {
 
     public boolean match(RelOptRuleCall call) {
-      final ScanPrel scan = (ScanPrel)call.rel(2);
-      return checkScan(scan.getGroupScan());
+      final OrderedRel sort = call.rel(0);
+      final ScanPrel scan = call.rel(2);
+      return sort instanceof Prel && checkScan(scan.getGroupScan()) && isRemovableRel(sort);
     }
 
     public IndexPhysicalPlanCallContext onMatch(RelOptRuleCall call) {
       final ScanPrel scan = call.rel(2);
       final ProjectPrel proj = call.rel(1);
-      final SortPrel sort = call.rel(0);
+      final OrderedRel sort = call.rel(0);
       return new IndexPhysicalPlanCallContext(call, sort, proj, scan, null);
     }
   }
@@ -127,13 +136,14 @@ public class DbScanSortRemovalRule extends Prule {
   private static class MatchSEPS extends AbstractMatchFunction<IndexPhysicalPlanCallContext> {
 
     public boolean match(RelOptRuleCall call) {
-      final ScanPrel scan = (ScanPrel)call.rel(3);
-      return checkScan(scan.getGroupScan());
+      final OrderedRel sort = call.rel(0);
+      final ScanPrel scan = call.rel(3);
+      return sort instanceof Prel && checkScan(scan.getGroupScan()) && isRemovableRel(sort);
     }
 
     public IndexPhysicalPlanCallContext onMatch(RelOptRuleCall call) {
       final ScanPrel scan = call.rel(3);
-      final SortPrel sort = call.rel(0);
+      final OrderedRel sort = call.rel(0);
       final ProjectPrel proj = call.rel(2);
       final ExchangePrel exch = call.rel(1);
       return new IndexPhysicalPlanCallContext(call,  sort, proj, scan, exch);
@@ -187,12 +197,15 @@ public class DbScanSortRemovalRule extends Prule {
                           false, settings);
           if (planGen.convertChild() != null) {
             indexContext.getCall().transformTo(planGen.convertChild());
+          } else {
+            logger.debug("Not able to generate index plan in ", this.getClass().toString());
           }
         } catch (Exception e) {
           logger.warn("Exception while trying to generate indexscan to remove sort", e);
         }
       }
     } else {
+      Preconditions.checkNotNull(indexContext.getSort());
       //This case tries to use the already generated index to see if a sort can be removed.
       if (indexContext.scan.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE).getFieldCollations().size() == 0) {
         return;
@@ -204,12 +217,12 @@ public class DbScanSortRemovalRule extends Prule {
           inputs.add(finalRel);
           finalRel = indexContext.lowerProject.copy(indexContext.lowerProject.getTraitSet(), inputs);
         }
-        if (indexContext.getSort() != null) {
-          finalRel = AbstractIndexPlanGenerator.getSortNode(indexContext, finalRel, true,false,
+
+        finalRel = AbstractIndexPlanGenerator.getSortNode(indexContext, finalRel, true,false,
                   indexContext.exch != null);
-        }
 
         if (finalRel == null) {
+          logger.debug("Not able to generate index plan in ", this.getClass().toString());
           return;
         }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
index cfa0e26..1e380cf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.common.logical.data.Order;
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.apache.drill.exec.planner.common.OrderedRel;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelFieldCollation;
@@ -41,7 +42,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 /**
  * Sort implemented in Drill.
  */
-public class DrillSortRel extends Sort implements DrillRel {
+public class DrillSortRel extends Sort implements DrillRel,OrderedRel {
 
   /** Creates a DrillSortRel. */
   public DrillSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation) {
@@ -98,4 +99,18 @@ public class DrillSortRel extends Sort implements DrillRel {
     return new DrillSortRel(context.getCluster(), context.getLogicalTraits(), input, RelCollations.of(collations));
   }
 
+  @Override
+  public RexNode getOffset() {
+    return offset;
+  }
+
+  @Override
+  public RexNode getFetch() {
+    return fetch;
+  }
+
+  @Override
+  public boolean canBeDropped() {
+    return true;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
index 77fb4c8..8064c42 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.ExternalSort;
+import org.apache.drill.exec.planner.common.OrderedRel;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
 import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
@@ -40,16 +41,25 @@ import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rex.RexNode;
 
-public class SortPrel extends org.apache.calcite.rel.core.Sort implements Prel {
+public class SortPrel extends org.apache.calcite.rel.core.Sort implements OrderedRel,Prel {
+  private final boolean isRemovable;
 
   /** Creates a DrillSortRel. */
   public SortPrel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation) {
     super(cluster, traits, input, collation);
+    isRemovable = true;
   }
 
   /** Creates a DrillSortRel with offset and fetch. */
   public SortPrel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation, RexNode offset, RexNode fetch) {
     super(cluster, traits, input, collation, offset, fetch);
+    isRemovable = true;
+  }
+
+  /** Creates a DrillSortRel. */
+  public SortPrel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation, boolean isRemovable) {
+    super(cluster, traits, input, collation);
+    this.isRemovable = isRemovable;
   }
 
   @Override
@@ -141,4 +151,20 @@ public class SortPrel extends org.apache.calcite.rel.core.Sort implements Prel {
 
     return this.copy(traits, children.get(0), collationTrait, this.offset, this.fetch);
   }
+
+  @Override
+  public RexNode getOffset() {
+    return offset;
+  }
+
+  @Override
+  public RexNode getFetch() {
+    return fetch;
+  }
+
+  @Override
+  public boolean canBeDropped() {
+    return isRemovable;
+  }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrule.java
index 3fc86b3..bec1b6a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrule.java
@@ -47,26 +47,25 @@ public class SortPrule extends Prule{
 
   @Override
   public void onMatch(RelOptRuleCall call) {
-    final DrillSortRel sort = (DrillSortRel) call.rel(0);
+    final DrillSortRel sort = call.rel(0);
     final RelNode input = sort.getInput();
 
     // Keep the collation in logical sort. Convert input into a RelNode with 1) this collation, 2) Physical, 3) hash distributed on
 
     DrillDistributionTrait hashDistribution =
-        new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(sort)));
+            new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(sort)));
 
-    final RelTraitSet traits = sort.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(hashDistribution);
-
-    final RelNode convertedInput = convert(input, traits);
+    final RelTraitSet traits = RelTraitSet.createEmpty().plus(Prel.DRILL_PHYSICAL).plus(hashDistribution);
+    SortPrel child = new SortPrel(sort.getCluster(), traits.plus(sort.getCollation()),
+            convert(sort.getInput(), traits), sort.getCollation(), false);
 
     if(isSingleMode(call)){
-      call.transformTo(convertedInput);
+      call.transformTo(child);
     }else{
-      RelNode exch = new SingleMergeExchangePrel(sort.getCluster(), sort.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), convertedInput, sort.getCollation());
+      RelNode exch = new SingleMergeExchangePrel(sort.getCluster(), sort.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON), child, sort.getCollation());
       call.transformTo(exch);  // transform logical "sort" into "SingleMergeExchange".
 
     }
-
   }
 
   private List<DistributionField> getDistributionField(DrillSortRel rel) {
@@ -76,7 +75,6 @@ public class SortPrule extends Prule{
       DistributionField field = new DistributionField(relField.getFieldIndex());
       distFields.add(field);
     }
-
     return distFields;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
index e9414f1..f8f4b9d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
@@ -26,8 +26,11 @@ import org.apache.calcite.rel.RelCollationImpl;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.TopN;
+import org.apache.drill.exec.planner.common.OrderedRel;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -40,7 +43,7 @@ import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 
-public class TopNPrel extends SinglePrel {
+public class TopNPrel extends SinglePrel implements OrderedRel,Prel {
 
   protected int limit;
   protected final RelCollation collation;
@@ -66,6 +69,28 @@ public class TopNPrel extends SinglePrel {
     return creator.addMetadata(this, topN);
   }
 
+  @Override
+  public RelCollation getCollation() {
+    return collation;
+  }
+
+  @Override
+  public RexNode getOffset() {
+    return getCluster().getRexBuilder().makeExactLiteral(BigDecimal.ZERO,
+                  getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER));
+  }
+
+  @Override
+  public RexNode getFetch() {
+    return getCluster().getRexBuilder().makeExactLiteral(BigDecimal.valueOf(limit),
+                 getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER));
+  }
+
+  @Override
+  public boolean canBeDropped() {
+    return true;
+  }
+
   /**
    * Cost of doing Top-N is proportional to M log N where M is the total number of
    * input rows and N is the limit for Top-N.  This makes Top-N preferable to Sort
@@ -93,6 +118,10 @@ public class TopNPrel extends SinglePrel {
         .item("limit", limit);
   }
 
+  public int getLimit() {
+    return limit;
+  }
+
   @Override
   public SelectionVectorMode[] getSupportedEncodings() {
     return SelectionVectorMode.NONE_AND_TWO;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
index f77a437..fa8e69d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/ExcessiveExchangeIdentifier.java
@@ -25,6 +25,11 @@ import org.apache.drill.exec.planner.physical.ExchangePrel;
 import org.apache.drill.exec.planner.physical.Prel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.planner.physical.ScreenPrel;
+import org.apache.drill.exec.planner.physical.LimitPrel;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.SingleMergeExchangePrel;
+import org.apache.drill.exec.planner.physical.HashToMergeExchangePrel;
 import org.apache.calcite.rel.RelNode;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -48,7 +53,18 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
     parent.add(prel);
     MajorFragmentStat newFrag = new MajorFragmentStat();
     newFrag.setRightSideOfLateral(parent.isRightSideOfLateral());
+
+    if (prel instanceof SingleMergeExchangePrel) {
+      newFrag.isSimpleRel = true;
+    }
+
     Prel newChild = ((Prel) prel.getInput()).accept(this, newFrag);
+
+    if (parent.isSimpleRel &&
+        prel instanceof HashToMergeExchangePrel) {
+      return newChild;
+    }
+
     if (canRemoveExchange(parent, newFrag)) {
       return newChild;
     } else {
@@ -129,6 +145,8 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
       s.add(p);
     }
 
+    s.setHashDistribution(prel);
+
     for(Prel p : prel) {
       children.add(p.accept(this, s));
     }
@@ -145,6 +163,9 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
     private int maxWidth = Integer.MAX_VALUE;
     private boolean isMultiSubScan = false;
     private boolean rightSideOfLateral = false;
+    //This flag if true signifies that all the Rels thus far
+    //are simple rels with no distribution requirement.
+    private boolean isSimpleRel = false;
 
     public void add(Prel prel) {
       maxRows = Math.max(prel.estimateRowCount(prel.getCluster().getMetadataQuery()), maxRows);
@@ -162,6 +183,13 @@ public class ExcessiveExchangeIdentifier extends BasePrelVisitor<Prel, Excessive
       add(prel);
     }
 
+    public void setHashDistribution(Prel prel) {
+      isSimpleRel = isSimpleRel &&
+                    (prel instanceof LimitPrel ||
+                     prel instanceof ProjectPrel ||
+                     prel instanceof FilterPrel);
+    }
+
     public boolean isSingular() {
       // do not remove exchanges when a scan has more than one subscans (e.g. SystemTableScan)
       if (isMultiSubScan) {