Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/11/21 18:00:08 UTC

[41/50] [abbrv] carbondata git commit: [CARBONDATA-3112] Optimise decompressing while filling the vector during conversion of primitive types

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ecdf3a5b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index b4dd1b1..16763d3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -303,6 +303,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     // applying the filter on Spark's side. So we should disable the vectorPushRowFilters option
     // in case of filters on global dictionary columns.
     val hasDictionaryFilterCols = hasFilterOnDictionaryColumn(filterSet, table)
+
+    // When the projection has many dictionary columns, Spark's code generation produces a lot
+    // of code and slows the query down, so we limit direct fill in that case.
+    val hasMoreDictionaryCols = hasMoreDictionaryColumnsOnProjection(projectSet, table)
     val vectorPushRowFilters = CarbonProperties.getInstance().isPushRowFiltersForVector
     if (projects.map(_.toAttribute) == projects &&
         projectSet.size == projects.size &&
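
As a minimal sketch (hypothetical function and flag names, not part of this patch), the guard introduced above keeps direct vector fill enabled only when all three conditions hold:

    // Minimal sketch, assuming hypothetical names; not the committed code.
    // Direct fill stays on only when row filters are handled by Spark, no
    // filter touches a global dictionary column, and the projection does not
    // carry more dictionary columns than the codegen-friendly limit.
    def allowDirectFill(
        vectorPushRowFilters: Boolean,
        hasDictionaryFilterCols: Boolean,
        hasMoreDictionaryCols: Boolean): Boolean = {
      !vectorPushRowFilters && !hasDictionaryFilterCols && !hasMoreDictionaryCols
    }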
@@ -342,7 +346,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         updateRequestedColumns.asInstanceOf[Seq[Attribute]])
       // Check whether spark should handle row filters in case of vector flow.
       if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan]
-          && !hasDictionaryFilterCols) {
+          && !hasDictionaryFilterCols && !hasMoreDictionaryCols) {
         // Here Carbon does only page pruning; row-level pruning will be done by Spark.
         scan.inputRDDs().head match {
           case rdd: CarbonScanRDD[InternalRow] =>
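
The dispatch above is a type test on the head input RDD: only a Carbon scan gets the page-pruning-only configuration, anything else is left alone. A self-contained toy illustration of that pattern (toy types, not Carbon's API; the diff truncates the actual call on the RDD):

    // Toy illustration only; the real code configures a CarbonScanRDD here.
    sealed trait ScanRdd
    final case class CarbonScan(pagePruningOnly: Boolean) extends ScanRdd
    case object OtherScan extends ScanRdd

    def configure(scan: ScanRdd): ScanRdd = scan match {
      case c: CarbonScan => c.copy(pagePruningOnly = true) // Spark handles row filters
      case other => other // leave non-Carbon scans untouched
    }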
@@ -386,7 +390,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq ++ newProjectList
 
       var updateRequestedColumns =
-        if (!vectorPushRowFilters && !implictsExisted && !hasDictionaryFilterCols) {
+        if (!vectorPushRowFilters && !implictsExisted && !hasDictionaryFilterCols
+            && !hasMoreDictionaryCols) {
           updateRequestedColumnsFunc(
             (projectSet ++ filterSet).map(relation.attributeMap).toSeq,
             table,
@@ -398,7 +403,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         supportBatchedDataSource(relation.relation.sqlContext,
           updateRequestedColumns.asInstanceOf[Seq[Attribute]]) &&
         needDecoder.isEmpty
-      if (!vectorPushRowFilters && !supportBatch && !implictsExisted && !hasDictionaryFilterCols) {
+      if (!vectorPushRowFilters && !supportBatch && !implictsExisted && !hasDictionaryFilterCols
+          && !hasMoreDictionaryCols) {
         // revert for row scan
         updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
       }
@@ -414,7 +420,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         updateRequestedColumns.asInstanceOf[Seq[Attribute]])
       // Check whether spark should handle row filters in case of vector flow.
       if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan]
-          && !implictsExisted && !hasDictionaryFilterCols) {
+          && !implictsExisted && !hasDictionaryFilterCols && !hasMoreDictionaryCols) {
         // Here Carbon does only page pruning; row-level pruning will be done by Spark.
         scan.inputRDDs().head match {
           case rdd: CarbonScanRDD[InternalRow] =>
@@ -518,6 +524,18 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
     filterColumns.exists(c => map.get(c.name).getOrElse(false))
   }
 
+  private def hasMoreDictionaryColumnsOnProjection(projectColumns: AttributeSet,
+      relation: CarbonDatasourceHadoopRelation): Boolean = {
+    val map = relation.carbonRelation.metaData.dictionaryMap
+    var count = 0
+    projectColumns.foreach { c =>
+      if (map.get(c.name).getOrElse(false)) {
+        count += 1
+      }
+    }
+    count > CarbonCommonConstants.CARBON_ALLOW_DIRECT_FILL_DICT_COLS_LIMIT
+  }
+
   private def getPartitioning(carbonTable: CarbonTable,
       output: Seq[Attribute]): Partitioning = {
     val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getTableName)
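
The new helper simply counts projected columns whose names map to dictionary-encoded columns and compares the count against CarbonCommonConstants.CARBON_ALLOW_DIRECT_FILL_DICT_COLS_LIMIT. A standalone sketch of the same logic, where the map contents and the limit value are assumptions for illustration:

    // Standalone sketch of the helper's logic; the dictionary map and the
    // limit value here are assumptions, not CarbonData's actual values.
    object DictProjectionCheck {
      def hasMoreDictionaryColumns(
          projectColumns: Seq[String],
          dictionaryMap: Map[String, Boolean],
          limit: Int): Boolean =
        projectColumns.count(c => dictionaryMap.getOrElse(c, false)) > limit

      def main(args: Array[String]): Unit = {
        val dictionaryMap = Map("c1" -> true, "c2" -> true, "c3" -> false)
        // With a limit of 1, projecting two dictionary columns trips the guard.
        println(hasMoreDictionaryColumns(Seq("c1", "c2", "c3"), dictionaryMap, 1)) // true
      }
    }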