Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/10/25 11:59:01 UTC

carbondata git commit: [CARBONDATA-3011] Add carbon property to configure vector based row pruning push down

Repository: carbondata
Updated Branches:
  refs/heads/master 9578786b2 -> de6e98b08


[CARBONDATA-3011] Add carbon property to configure vector based row pruning push down

Added the below configuration to carbon to enable or disable row filter push-down for the vector flow.

carbon.push.rowfilters.for.vector
When enabled, complete row filtering is handled by carbon in the vector flow.
When disabled, carbon does only page-level pruning and row-level filtering is left to spark in the vector flow.
There is no change in the flow for non-vector queries.

Default value is true.
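
For example, a minimal sketch of disabling it programmatically (the property can equally be set in the carbon.properties file; addProperty is the existing CarbonProperties setter):

  import org.apache.carbondata.core.constants.CarbonCommonConstants
  import org.apache.carbondata.core.util.CarbonProperties

  // Disable row filter push-down: carbon prunes pages, spark filters rows.
  CarbonProperties.getInstance()
    .addProperty(CarbonCommonConstants.CARBON_PUSH_ROW_FILTERS_FOR_VECTOR, "false")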

This closes #2818


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/de6e98b0
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/de6e98b0
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/de6e98b0

Branch: refs/heads/master
Commit: de6e98b085723811b0894e659c3c4ce9770f7ca2
Parents: 9578786
Author: ravipesala <ra...@gmail.com>
Authored: Tue Oct 16 10:32:18 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Thu Oct 25 17:28:29 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   | 12 +++
 .../carbondata/core/scan/model/QueryModel.java  | 13 ++++
 .../carbondata/core/util/CarbonProperties.java  |  8 ++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 17 +++-
 .../strategy/CarbonLateDecodeStrategy.scala     | 82 +++++++++++++++++---
 5 files changed, 120 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/de6e98b0/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index fa5227b..72da3bd 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1725,6 +1725,18 @@ public final class CarbonCommonConstants {
    */
   public static final String CARBON_WRITTEN_BY_APPNAME = "carbon.writtenby.app.name";
 
+  /**
+   * When enabled, complete row filtering is handled by carbon in the vector flow.
+   * When disabled, carbon does only page-level pruning and row-level filtering
+   * is left to spark in the vector flow.
+   * There is no change in the flow for non-vector queries.
+   */
+  @CarbonProperty
+  public static final String CARBON_PUSH_ROW_FILTERS_FOR_VECTOR =
+      "carbon.push.rowfilters.for.vector";
+
+  public static final String CARBON_PUSH_ROW_FILTERS_FOR_VECTOR_DEFAULT = "true";
+
   //////////////////////////////////////////////////////////////////////////////////////////
   // Unused constants and parameters start here
   //////////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de6e98b0/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index d90c35e..0951da0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -124,6 +124,11 @@ public class QueryModel {
 
   private boolean preFetchData = true;
 
+  /**
+   * If true, the vector is filled directly from the decoded column page without staging or conversion
+   */
+  private boolean isDirectVectorFill;
+
   private QueryModel(CarbonTable carbonTable) {
     tableBlockInfos = new ArrayList<TableBlockInfo>();
     invalidSegmentIds = new ArrayList<>();
@@ -406,6 +411,14 @@ public class QueryModel {
     this.preFetchData = preFetchData;
   }
 
+  public boolean isDirectVectorFill() {
+    return isDirectVectorFill;
+  }
+
+  public void setDirectVectorFill(boolean directVectorFill) {
+    isDirectVectorFill = directVectorFill;
+  }
+
   @Override
   public String toString() {
     return String.format("scan on table %s.%s, %d projection columns with filter (%s)",
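
A minimal usage sketch of the new flag, mirroring how CarbonScanRDD wires it in the diff further below (the QueryModel instance is assumed to come from the usual query build path):

  import org.apache.carbondata.core.scan.model.QueryModel

  // true => decoded column pages are written straight into the column
  // vector, skipping the intermediate row staging and conversion
  def configureDirectFill(model: QueryModel, directScan: Boolean): Unit = {
    model.setDirectVectorFill(directScan)
  }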

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de6e98b0/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index e6d48e5..49d89e7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1500,6 +1500,12 @@ public final class CarbonProperties {
     return spillPercentage;
   }
 
+  public boolean isPushRowFiltersForVector() {
+    String pushFilters = getProperty(CarbonCommonConstants.CARBON_PUSH_ROW_FILTERS_FOR_VECTOR,
+            CarbonCommonConstants.CARBON_PUSH_ROW_FILTERS_FOR_VECTOR_DEFAULT);
+    return Boolean.parseBoolean(pushFilters);
+  }
+
   private void validateSortMemorySpillPercentage() {
     String spillPercentageStr = carbonProperties.getProperty(
         CARBON_LOAD_SORT_MEMORY_SPILL_PERCENTAGE,
@@ -1558,4 +1564,6 @@ public final class CarbonProperties {
           CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT_DEFAULT);
     }
   }
+
+
 }
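
A one-line usage sketch of the new accessor; this is how CarbonLateDecodeStrategy reads it in the diff below:

  import org.apache.carbondata.core.util.CarbonProperties

  // false => carbon does page-level pruning only; spark filters rows (vector flow)
  val vectorPushRowFilters: Boolean =
    CarbonProperties.getInstance().isPushRowFiltersForVector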

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de6e98b0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 1a7eae2..33031fc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -90,6 +90,8 @@ class CarbonScanRDD[T: ClassTag](
   }
   private var vectorReader = false
 
+  private var directScan = false
+
   private val bucketedTable = tableInfo.getFactTable.getBucketingInfo
 
   @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -228,9 +230,13 @@ class CarbonScanRDD[T: ClassTag](
       statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
       statisticRecorder.recordStatisticsForDriver(statistic, queryId)
       statistic = new QueryStatistic()
-      val carbonDistribution = CarbonProperties.getInstance().getProperty(
-        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
-        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
+      val carbonDistribution = if (directScan) {
+        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES
+      } else {
+        CarbonProperties.getInstance().getProperty(
+          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
+      }
       // If bucketing is enabled on table then partitions should be grouped based on buckets.
       if (bucketedTable != null) {
         var i = 0
@@ -437,6 +443,7 @@ class CarbonScanRDD[T: ClassTag](
         case _ =>
           // create record reader for CarbonData file format
           if (vectorReader) {
+            model.setDirectVectorFill(directScan)
             val carbonRecordReader = createVectorizedCarbonRecordReader(model,
               inputMetricsStats,
               "true")
@@ -748,4 +755,8 @@ class CarbonScanRDD[T: ClassTag](
     vectorReader = boolean
   }
 
+  // TODO: find a better way to set this.
+  def setDirectScanSupport(isDirectScan: Boolean): Unit = {
+    directScan = isDirectScan
+  }
 }
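
Putting the pieces together: when carbon.push.rowfilters.for.vector is false, the strategy in the next diff enables direct scan on the underlying RDD. A condensed, self-contained sketch of that call site (taking CodegenSupport as the parameter type is an assumption here; CarbonDataSourceScan exposes inputRDDs through it):

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.execution.CodegenSupport
  import org.apache.carbondata.spark.rdd.CarbonScanRDD

  // Condensed from the CarbonLateDecodeStrategy diff below.
  def enableDirectScan(scan: CodegenSupport): Unit = {
    scan.inputRDDs().head match {
      case rdd: CarbonScanRDD[InternalRow] =>
        // carbon prunes pages only; spark's FilterExec handles row filtering
        rdd.setDirectScanSupport(true)
      case _ => // not a carbon scan RDD: leave push-down behaviour unchanged
    }
  }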

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de6e98b0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index f0184cd..04b65e8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -57,6 +57,8 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
 private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
   val PUSHED_FILTERS = "PushedFilters"
 
+  val vectorPushRowFilters = CarbonProperties.getInstance().isPushRowFiltersForVector
+
   /*
   Spark 2.3.1 plan there can be case of multiple projections like below
   Project [substring(name, 1, 2)#124, name#123, tupleId#117, cast(rand(-6778822102499951904)#125
@@ -213,11 +215,12 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       rdd: RDD[InternalRow],
       needDecode: ArrayBuffer[AttributeReference]):
   RDD[InternalRow] = {
+    val scanRdd = rdd.asInstanceOf[CarbonScanRDD[InternalRow]]
     if (needDecode.nonEmpty) {
-      rdd.asInstanceOf[CarbonScanRDD[InternalRow]].setVectorReaderSupport(false)
+      scanRdd.setVectorReaderSupport(false)
       getDecoderRDD(relation, needDecode, rdd, output)
     } else {
-      rdd.asInstanceOf[CarbonScanRDD[InternalRow]]
+      scanRdd
         .setVectorReaderSupport(supportBatchedDataSource(relation.relation.sqlContext, output))
       rdd
     }
@@ -300,7 +303,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
           }
         }
     }
-
+    // In case of global dictionary columns, a filter requires decoding all data before
+    // it can be applied on spark's side. So the vectorPushRowFilters option should be
+    // disabled when there are filters on global dictionary columns.
+    val hasDictionaryFilterCols = hasFilterOnDictionaryColumn(filterSet, table)
     if (projects.map(_.toAttribute) == projects &&
         projectSet.size == projects.size &&
         filterSet.subsetOf(projectSet)) {
@@ -337,19 +343,37 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         metadata,
         needDecoder,
         updateRequestedColumns.asInstanceOf[Seq[Attribute]])
-      filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
+      // Check whether spark should handle row filters in the vector flow.
+      if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan]
+          && !hasDictionaryFilterCols) {
+        // Here carbon does only page pruning; row-level filtering will be done by spark.
+        scan.inputRDDs().head match {
+          case rdd: CarbonScanRDD[InternalRow] =>
+            rdd.setDirectScanSupport(true)
+          case _ =>
+        }
+        filterPredicates.reduceLeftOption(expressions.And).map(execution.FilterExec(_, scan))
+          .getOrElse(scan)
+      } else {
+        filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
+      }
     } else {
 
       var newProjectList: Seq[Attribute] = Seq.empty
+      // In case implicit columns exist, vectorPushRowFilters should be disabled as the query
+      // goes through the IUD flow to get the positionId or tupleID.
+      var implicitsExisted = false
       val updatedProjects = projects.map {
           case a@Alias(s: ScalaUDF, name)
             if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
                 name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
             val reference = AttributeReference(name, StringType, true)().withExprId(a.exprId)
             newProjectList :+= reference
+            implicitsExisted = true
             reference
           case a@Alias(s: ScalaUDF, name)
             if name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID) =>
+            implicitsExisted = true
             val reference =
               AttributeReference(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
                 StringType, true)().withExprId(a.exprId)
@@ -363,7 +387,24 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       // Don't request columns that are only referenced by pushed filters.
       val requestedColumns =
         (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq ++ newProjectList
-      val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
+
+      var updateRequestedColumns =
+        if (!vectorPushRowFilters && !implicitsExisted && !hasDictionaryFilterCols) {
+          updateRequestedColumnsFunc(
+            (projectSet ++ filterSet).map(relation.attributeMap).toSeq,
+            table,
+            needDecoder)
+        } else {
+          updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
+        }
+      val supportBatch =
+        supportBatchedDataSource(relation.relation.sqlContext,
+          updateRequestedColumns.asInstanceOf[Seq[Attribute]]) &&
+        needDecoder.isEmpty
+      if (!vectorPushRowFilters && !supportBatch && !implicitsExisted && !hasDictionaryFilterCols) {
+        // Revert to the full requested column list for the row scan path.
+        updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
+      }
       val scan = getDataSourceScan(relation,
         updateRequestedColumns.asInstanceOf[Seq[Attribute]],
         partitions,
@@ -374,10 +415,27 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         metadata,
         needDecoder,
         updateRequestedColumns.asInstanceOf[Seq[Attribute]])
-      execution.ProjectExec(
-        updateRequestedColumnsFunc(updatedProjects, table,
-          needDecoder).asInstanceOf[Seq[NamedExpression]],
-        filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
+      // Check whether spark should handle row filters in the vector flow.
+      if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan]
+          && !implicitsExisted && !hasDictionaryFilterCols) {
+        // Here carbon does only page pruning; row-level filtering will be done by spark.
+        scan.inputRDDs().head match {
+          case rdd: CarbonScanRDD[InternalRow] =>
+            rdd.setDirectScanSupport(true)
+          case _ =>
+        }
+        execution.ProjectExec(
+          updateRequestedColumnsFunc(updatedProjects, table,
+            needDecoder).asInstanceOf[Seq[NamedExpression]],
+          filterPredicates.reduceLeftOption(expressions.And).map(
+            execution.FilterExec(_, scan)).getOrElse(scan))
+      } else {
+        execution.ProjectExec(
+          updateRequestedColumnsFunc(updatedProjects, table,
+            needDecoder).asInstanceOf[Seq[NamedExpression]],
+          filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
+      }
+
     }
   }
 
@@ -457,6 +515,12 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     }
   }
 
+  private def hasFilterOnDictionaryColumn(filterColumns: AttributeSet,
+      relation: CarbonDatasourceHadoopRelation): Boolean = {
+    val map = relation.carbonRelation.metaData.dictionaryMap
+    filterColumns.exists(c => map.get(c.name).getOrElse(false))
+  }
+
   private def getPartitioning(carbonTable: CarbonTable,
       output: Seq[Attribute]): Partitioning = {
     val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getTableName)