Posted to commits@drill.apache.org by am...@apache.org on 2018/10/25 23:09:02 UTC

[drill] 06/08: DRILL-6381: Address code review comments.

This is an automated email from the ASF dual-hosted git repository.

amansinha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 5fa9c808daef8ea70a39ac2248daa99055955b72
Author: Aman Sinha <as...@maprtech.com>
AuthorDate: Mon Oct 1 12:06:39 2018 -0700

    DRILL-6381: Address code review comments.
---
 .../planner/index/MapRDBFunctionalIndexInfo.java   |  5 ++++
 .../exec/planner/index/MapRDBIndexDiscover.java    | 14 ++++++-----
 .../store/mapr/db/RestrictedMapRDBSubScanSpec.java | 28 ----------------------
 .../db/json/JsonTableRangePartitionFunction.java   |  2 +-
 .../drill/exec/physical/config/HashJoinPOP.java    | 12 ----------
 .../planner/index/AbstractIndexDescriptor.java     |  6 +++++
 .../drill/exec/planner/index/IndexDescriptor.java  | 11 +++++++--
 .../drill/exec/planner/index/IndexGroup.java       |  6 +----
 .../generators/IndexIntersectPlanGenerator.java    | 22 +++++++++++++----
 .../generators/NonCoveringIndexPlanGenerator.java  | 21 ++++++++--------
 .../physical/DrillDistributionTraitDef.java        |  8 ++++++-
 .../exec/planner/physical/RowKeyJoinPrel.java      |  7 ++++--
 12 files changed, 70 insertions(+), 72 deletions(-)

diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java
index 01561a3..ec38636 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBFunctionalIndexInfo.java
@@ -138,6 +138,11 @@ public class MapRDBFunctionalIndexInfo implements FunctionalIndexInfo {
   }
 
   /**
+   * When the index keys are function expressions rather than plain columns, e.g. CAST(a as int) and
+   * CAST(b as varchar(10)), we maintain a mapping from the logical expression of each such function to the
+   * schema paths of the base columns involved in it. In this example the map has 2 entries:
+   *   CAST(a as int)  --> 'a'
+   *   CAST(b as varchar(10)) --> 'b'
    * @return the map of indexed expressions --> the schema paths involved in each indexed expression
    */
   public Map<LogicalExpression, Set<SchemaPath>> getPathsInFunctionExpr() {
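
For illustration only (not part of this commit), a self-contained sketch of the mapping described above; plain Strings stand in for Drill's LogicalExpression and SchemaPath so the example compiles on its own:

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Set;

    public class FunctionalIndexMappingSketch {
      public static void main(String[] args) {
        // Keyed by the indexed function expression; values are the base columns it references.
        Map<String, Set<String>> pathsInFunctionExpr = new LinkedHashMap<>();
        pathsInFunctionExpr.put("CAST(a as int)", Collections.singleton("a"));
        pathsInFunctionExpr.put("CAST(b as varchar(10))", Collections.singleton("b"));

        // A consumer can look up which base columns an indexed expression touches.
        System.out.println(pathsInFunctionExpr.get("CAST(a as int)"));  // prints [a]
      }
    }
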
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java
index e1b8a61..c231e11 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/planner/index/MapRDBIndexDiscover.java
@@ -37,6 +37,7 @@ import org.apache.drill.common.expression.parser.ExprLexer;
 import org.apache.drill.common.expression.parser.ExprParser;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.common.util.DrillFileUtils;
 import org.apache.drill.exec.physical.base.AbstractDbGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.common.DrillScanRelBase;
@@ -67,6 +68,7 @@ import java.util.Set;
 public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDiscover {
 
   static final String DEFAULT_STRING_CAST_LEN_STR = "256";
+  static final String FIELD_DELIMITER = ":";
 
   public MapRDBIndexDiscover(GroupScan inScan, DrillScanRelBase scanRel) {
     super((AbstractDbGroupScan) inScan, scanRel);
@@ -78,14 +80,14 @@ public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDisco
 
   @Override
   public IndexCollection getTableIndex(String tableName) {
-    //return getTableIndexFromCommandLine(tableName);
     return getTableIndexFromMFS(tableName);
   }
 
   /**
-   *
+   * For a given table name, get the list of indexes defined on the table, subject to the visibility of
+   * those indexes based on the user's permissions.
    * @param tableName
-   * @return
+   * @return an IndexCollection representing the list of indexes for that table
    */
   private IndexCollection getTableIndexFromMFS(String tableName) {
     try {
@@ -120,8 +122,8 @@ public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDisco
 
   FileSelection deriveFSSelection(DrillFileSystem fs, IndexDescriptor idxDesc) throws IOException {
     String tableName = idxDesc.getTableName();
-    String[] tablePath = tableName.split("/");
-    String tableParent = tableName.substring(0, tableName.lastIndexOf("/"));
+    String[] tablePath = tableName.split(DrillFileUtils.SEPARATOR);
+    String tableParent = tableName.substring(0, tableName.lastIndexOf(DrillFileUtils.SEPARATOR));
 
     return FileSelection.create(fs, tableParent, tablePath[tablePath.length - 1], false);
   }
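
As a standalone illustration (not Drill code) of the parent/leaf split performed in deriveFSSelection above, assuming the separator constant resolves to "/" and using a hypothetical table path:

    public class TablePathSplitSketch {
      public static void main(String[] args) {
        String separator = "/";                         // stand-in for DrillFileUtils.SEPARATOR
        String tableName = "/user/hive/tables/orders";  // hypothetical table path

        String[] tablePath = tableName.split(separator);
        String tableParent = tableName.substring(0, tableName.lastIndexOf(separator));

        System.out.println(tableParent);                      // /user/hive/tables
        System.out.println(tablePath[tablePath.length - 1]);  // orders
      }
    }
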
@@ -318,7 +320,7 @@ public class MapRDBIndexDiscover extends IndexDiscoverBase implements IndexDisco
   private DrillIndexDescriptor buildIndexDescriptor(String tableName, IndexDesc desc)
       throws InvalidIndexDefinitionException {
     if (desc.isExternal()) {
-      //XX: not support external index
+      // External indexes are not currently supported
       return null;
     }
 
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScanSpec.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScanSpec.java
index bd8a32a..596699f 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScanSpec.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScanSpec.java
@@ -188,32 +188,4 @@ public class RestrictedMapRDBSubScanSpec extends MapRDBSubScanSpec {
     currentIndex += numKeys;
   }
 
-  /**
-   * Returns the next row key in the iteration.
-   * @return the next row key in the iteration or null if no more row keys
-   */
-  @JsonIgnore
-  public String nextRowKey() {
-    if (hasRowKey()) {
-      // get the entry at the current index within this batch
-      Object o = rowKeyVector.getAccessor().getObject(currentIndex++);
-      if (o == null) {
-        throw new DrillRuntimeException("Encountered a null row key during restricted subscan !");
-      }
-
-      // this is specific to the way the hash join maintains its entries. once we have reached the max
-      // occupied index within a batch, move to the next one and reset the current index to 0
-      // TODO: we should try to abstract this out
-      if (currentIndex > maxOccupiedIndex) {
-        Pair<ValueVector, Integer> currentBatch = rjbatch.nextRowKeyBatch();
-        if (currentBatch != null) {
-          init(currentBatch);
-        }
-      }
-
-      return o.toString();
-    }
-    return null;
-  }
-
 }
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java
index acaa6ca..436347f 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableRangePartitionFunction.java
@@ -186,7 +186,7 @@ public class JsonTableRangePartitionFunction extends AbstractRangePartitionFunct
     // get the table handle from the table cache
     Table table = plugin.getJsonTableCache().getTable(tableName, userName);
 
-    // Set the condition to null such that all scan ranges are retrieved for the primary table.
+    // Get all scan ranges for the primary table.
     // The reason is that the row keys could typically belong to any one of the tablets of the table, so
     // there is no use trying to get only a limited set of scan ranges.
     // NOTE: here we use the restrictedScanRangeSizeMB because the range partitioning should be parallelized
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
index 21d6092..4df9c38 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
@@ -50,16 +50,6 @@ public class HashJoinPOP extends AbstractJoinPop {
   @JsonProperty("subScanForRowKeyJoin")
   private SubScan subScanForRowKeyJoin;
 
-  /*
-  public HashJoinPOP(
-      @JsonProperty("left") PhysicalOperator left,
-      @JsonProperty("right") PhysicalOperator right,
-      @JsonProperty("conditions") List<JoinCondition> conditions,
-      @JsonProperty("joinType") JoinRelType joinType
-  ) {
-    this(left, right, conditions, joinType, false, JoinControl.DEFAULT);
-  }
-*/
   @JsonCreator
   public HashJoinPOP(@JsonProperty("left") PhysicalOperator left, @JsonProperty("right") PhysicalOperator right,
                      @JsonProperty("conditions") List<JoinCondition> conditions,
@@ -80,8 +70,6 @@ public class HashJoinPOP extends AbstractJoinPop {
                      List<JoinCondition> conditions,
                      JoinRelType joinType) {
     this(left, right, conditions, joinType, null, false, JoinControl.DEFAULT);
-    // super(left, right, joinType, null, conditions);
-    // Preconditions.checkArgument(joinType != null, "Join type is missing for HashJoin Pop");
   }
 
   @VisibleForTesting
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java
index f908ead..dd042da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java
@@ -71,4 +71,10 @@ public abstract class AbstractIndexDescriptor extends DrillIndexDefinition imple
       int numProjectedFields, GroupScan primaryGroupScan) {
     throw new UnsupportedOperationException("getCost() not supported for this index.");
   }
+
+  @Override
+  public boolean isAsyncIndex() {
+    return true;
+  }
+
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java
index d43ba81..3b63230 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java
@@ -72,12 +72,19 @@ public interface IndexDescriptor extends IndexDefinition {
    * @param primaryGroupScan Primary table's GroupScan instance
    * @return a RelOptCost instance representing the total cost
    */
-  public RelOptCost getCost(IndexProperties indexProps, RelOptPlanner planner,
+  RelOptCost getCost(IndexProperties indexProps, RelOptPlanner planner,
       int numProjectedFields, GroupScan primaryGroupScan);
 
   /**
    * Get the costing factors associated with the storage/format plugin
    */
-  public PluginCost getPluginCostModel();
+  PluginCost getPluginCostModel();
+
+  /**
+   * Whether this index is maintained synchronously (i.e. primary table updates are propagated to the index
+   * synchronously) or asynchronously with some delay.  The latter is more common for distributed NoSQL databases.
+   * @return true if the index is maintained asynchronously, false otherwise
+   */
+  boolean isAsyncIndex();
 
 }
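
To make the new contract concrete, a minimal self-contained sketch (IndexSketch is a stripped-down stand-in for IndexDescriptor, which carries many more methods) of the distinction the flag captures and the planning decision it drives:

    public class IndexMaintenanceSketch {
      // Stand-in for the relevant part of IndexDescriptor.
      interface IndexSketch {
        boolean isAsyncIndex();
      }

      public static void main(String[] args) {
        // Updated in the same transaction as the primary table: index results never go stale.
        IndexSketch syncIndex = () -> false;

        // Propagated with some delay (common for distributed NoSQL stores): rows fetched via the
        // index may no longer satisfy the original condition, so the planner re-applies the filter
        // on the primary-table scan (see the plan generator changes later in this commit).
        IndexSketch asyncIndex = () -> true;

        System.out.println("re-apply filter for sync index?  " + syncIndex.isAsyncIndex());   // false
        System.out.println("re-apply filter for async index? " + asyncIndex.isAsyncIndex());  // true
      }
    }
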
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java
index ea34ea5..3d3aeeb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java
@@ -33,11 +33,7 @@ public class IndexGroup {
   }
 
   public boolean isIntersectIndex() {
-    if (indexProps.size() > 1) {
-      return true;
-    } else {
-      return false;
-    }
+    return indexProps.size() > 1;
   }
 
   public int numIndexes() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java
index f2091f6..b380c28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/IndexIntersectPlanGenerator.java
@@ -237,7 +237,8 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
     return finalRel;
   }
 
-  private Pair<RelNode, DbGroupScan> buildRestrictedDBScan(RexNode remnant) {
+  private Pair<RelNode, DbGroupScan> buildRestrictedDBScan(RexNode remnant,
+      boolean isAnyIndexAsync) {
 
     DbGroupScan origDbGroupScan = (DbGroupScan)IndexPlanUtils.getGroupScan(origScan);
     List<SchemaPath> cols = new ArrayList<SchemaPath>(origDbGroupScan.getColumns());
@@ -266,9 +267,16 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
     final RelDataTypeFactory.FieldInfoBuilder leftFieldTypeBuilder =
         dbScan.getCluster().getTypeFactory().builder();
 
-    FilterPrel leftIndexFilterPrel = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(),
-          dbScan, indexContext.getOrigCondition());
-    lastRelNode = leftIndexFilterPrel;
+    FilterPrel leftIndexFilterPrel = null;
+
+    // See NonCoveringIndexPlanGenerator for why we re-apply the index filter condition in the case of async indexes.
+    // For intersect planning, any one of the intersected indexes may be async; to keep it simple we re-apply the
+    // full original condition.
+    if (isAnyIndexAsync) {
+      leftIndexFilterPrel = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(),
+            dbScan, indexContext.getOrigCondition());
+      lastRelNode = leftIndexFilterPrel;
+    }
 
     // new Project's rowtype is original Project's rowtype [plus rowkey if rowkey is not in original rowtype]
     ProjectPrel leftIndexProjectPrel = null;
@@ -301,8 +309,12 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
   @Override
   public RelNode convertChild(final RelNode filter, final RelNode input) throws InvalidRelException {
     Map<IndexDescriptor, RexNode> idxConditionMap = Maps.newLinkedHashMap();
+    boolean isAnyIndexAsync = false;
     for(IndexDescriptor idx : indexInfoMap.keySet()) {
       idxConditionMap.put(idx, indexInfoMap.get(idx).indexCondition);
+      if (!isAnyIndexAsync && idx.isAsyncIndex()) {
+        isAnyIndexAsync = true;
+      }
     }
 
     RelNode indexPlan = null;
@@ -323,7 +335,7 @@ public class IndexIntersectPlanGenerator extends AbstractIndexPlanGenerator {
 
     //now with index plan constructed, build plan of left(probe) side to use restricted db scan
 
-    Pair<RelNode, DbGroupScan> leftRelAndScan = buildRestrictedDBScan(remnant);
+    Pair<RelNode, DbGroupScan> leftRelAndScan = buildRestrictedDBScan(remnant, isAnyIndexAsync);
 
     RelNode finalRel = buildRowKeyJoin(leftRelAndScan.left, rangeDistRight, true, JoinControl.DEFAULT);
     if ( upperProject != null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
index e1337bc..c1bcf68 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/generators/NonCoveringIndexPlanGenerator.java
@@ -205,16 +205,17 @@ public class NonCoveringIndexPlanGenerator extends AbstractIndexPlanGenerator {
     final RelDataTypeFactory.FieldInfoBuilder leftFieldTypeBuilder =
         dbScan.getCluster().getTypeFactory().builder();
 
-    //we are applying the same index condition to primary table's restricted scan, the reason
-    // for this is, the scans on index table and primary table are not a transaction, meaning that _after_ index scan,
-    // primary table might already have data get updated, thus some rows picked by index were modified and no more satisfy the
-    // index condition. By applying the same index condition again here, we will avoid the possibility to have some
-    //not-wanted records get into downstream operators in such scenarios.
-    //the remainder condition will be applied on top of RowKeyJoin.
-    FilterPrel leftIndexFilterPrel = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(),
-          dbScan, indexContext.getOrigCondition());
-
-    lastLeft = leftIndexFilterPrel;
+    // We apply the same index condition to the primary table's restricted scan because the index may be an async
+    // index, i.e. it is not updated synchronously with the primary table as part of a single transaction. It is
+    // therefore possible that, after or during the index scan, primary table rows have been updated and no longer
+    // satisfy the index condition. Re-applying the index condition here ensures non-qualifying records are filtered out.
+    // The remainder condition will be applied on top of the RowKeyJoin.
+    FilterPrel leftIndexFilterPrel = null;
+    if (indexDesc.isAsyncIndex()) {
+      leftIndexFilterPrel = new FilterPrel(dbScan.getCluster(), dbScan.getTraitSet(),
+            dbScan, indexContext.getOrigCondition());
+      lastLeft = leftIndexFilterPrel;
+    }
 
     RelDataType origRowType = origProject == null ? origScan.getRowType() : origProject.getRowType();
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
index 0626483..6d52184 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillDistributionTraitDef.java
@@ -86,7 +86,13 @@ public class DrillDistributionTraitDef extends RelTraitDef<DrillDistributionTrai
         return new HashToRandomExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel,
                                              toDist.getFields());
       case RANGE_DISTRIBUTED:
-        // return new OrderedPartitionExchangePrel(rel.getCluster(), planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel);
+        // NOTE: earlier, for range distribution we created an OrderedPartitionExchange; however, that operator is not actually
+        // used in any query plans because Drill's Sort does not do range-based sorting (it does a HashToRandomExchange followed
+        // by a Sort).  Here we generate a RangePartitionExchange instead of an OrderedPartitionExchange. The run-time implementation
+        // of RPE is a much simpler operator: it just does 'bucketing' based on ranges.  It also allows a parameter to specify the
+        // partitioning function, whereas the OPE does much more complex inferencing to determine which partition goes where. In the
+        // future, if we want to leverage the OPE, we could create a new type of distribution trait or make the DistributionType a
+        // class instead of a simple enum so that we can distinguish whether an OPE or an RPE is needed.
         return new RangePartitionExchangePrel(rel.getCluster(),
             planner.emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist), rel,
             toDist.getFields(), toDist.getPartitionFunction());
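
To make 'bucketing based on ranges' concrete, a standalone sketch (illustrative only, not the actual RangePartitionExchange or partition-function implementation), assuming the range upper bounds are known up front:

    import java.util.Arrays;

    public class RangeBucketingSketch {
      // Upper bounds of each range; keys beyond the last bound are clamped into the last bucket.
      private final int[] upperBounds;

      public RangeBucketingSketch(int[] upperBounds) {
        this.upperBounds = upperBounds;
      }

      // Returns the partition (bucket) index for a key: simple bucketing, no global ordering needed.
      public int bucketOf(int key) {
        int pos = Arrays.binarySearch(upperBounds, key);
        int bucket = (pos >= 0) ? pos : -pos - 1;
        return Math.min(bucket, upperBounds.length - 1);
      }

      public static void main(String[] args) {
        RangeBucketingSketch partitionFn = new RangeBucketingSketch(new int[] {100, 200, 300});
        System.out.println(partitionFn.bucketOf(42));   // 0
        System.out.println(partitionFn.bucketOf(150));  // 1
        System.out.println(partitionFn.bucketOf(999));  // 2 (clamped to the last bucket)
      }
    }
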
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java
index 616eb56..7e8f77e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrel.java
@@ -82,8 +82,11 @@ public class RowKeyJoinPrel extends JoinPrel implements Prel {
     }
     double rowCount = mq.getRowCount(this.getRight());
     DrillCostFactory costFactory = (DrillCostFactory) planner.getCostFactory();
-    return costFactory.makeCost(rowCount, 0, 0, 0,
-        0 /* mem cost is 0 because this operator does not make any extra copy of either the left or right batches */);
+
+    // The RowKeyJoin operator by itself incurs negligible CPU and I/O cost since it is not doing a real join;
+    // the actual cost is attributed to the skip-scan (random I/O). The RowKeyJoin will hold 1 batch in memory,
+    // but it does not make any extra copy of either the left or right batches, so the memory cost is 0.
+    return costFactory.makeCost(rowCount, 0, 0, 0, 0);
   }
 
   @Override