You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/11/21 18:00:08 UTC
[41/50] [abbrv] carbondata git commit: [CARBONDATA-3112] Optimise
decompressing while filling the vector during conversion of primitive types
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ecdf3a5b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index b4dd1b1..16763d3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -303,6 +303,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
// applying the filter in spark's side. So we should disable vectorPushRowFilters option
// in case of filters on global dictionary.
val hasDictionaryFilterCols = hasFilterOnDictionaryColumn(filterSet, table)
+
+ // When many dictionary columns are projected, Spark code generation has to generate a lot
+ // of code, which slows down the query, so we limit the direct fill in that case.
+ val hasMoreDictionaryCols = hasMoreDictionaryColumnsOnProjection(projectSet, table)
val vectorPushRowFilters = CarbonProperties.getInstance().isPushRowFiltersForVector
if (projects.map(_.toAttribute) == projects &&
projectSet.size == projects.size &&
@@ -342,7 +346,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
updateRequestedColumns.asInstanceOf[Seq[Attribute]])
// Check whether spark should handle row filters in case of vector flow.
if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan]
- && !hasDictionaryFilterCols) {
+ && !hasDictionaryFilterCols && !hasMoreDictionaryCols) {
// Here carbon only do page pruning and row level pruning will be done by spark.
scan.inputRDDs().head match {
case rdd: CarbonScanRDD[InternalRow] =>
@@ -386,7 +390,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
(projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq ++ newProjectList
var updateRequestedColumns =
- if (!vectorPushRowFilters && !implictsExisted && !hasDictionaryFilterCols) {
+ if (!vectorPushRowFilters && !implictsExisted && !hasDictionaryFilterCols
+ && !hasMoreDictionaryCols) {
updateRequestedColumnsFunc(
(projectSet ++ filterSet).map(relation.attributeMap).toSeq,
table,
@@ -398,7 +403,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
supportBatchedDataSource(relation.relation.sqlContext,
updateRequestedColumns.asInstanceOf[Seq[Attribute]]) &&
needDecoder.isEmpty
- if (!vectorPushRowFilters && !supportBatch && !implictsExisted && !hasDictionaryFilterCols) {
+ if (!vectorPushRowFilters && !supportBatch && !implictsExisted && !hasDictionaryFilterCols
+ && !hasMoreDictionaryCols) {
// revert for row scan
updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
}
@@ -414,7 +420,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
updateRequestedColumns.asInstanceOf[Seq[Attribute]])
// Check whether spark should handle row filters in case of vector flow.
if (!vectorPushRowFilters && scan.isInstanceOf[CarbonDataSourceScan]
- && !implictsExisted && !hasDictionaryFilterCols) {
+ && !implictsExisted && !hasDictionaryFilterCols && !hasMoreDictionaryCols) {
// Here carbon only do page pruning and row level pruning will be done by spark.
scan.inputRDDs().head match {
case rdd: CarbonScanRDD[InternalRow] =>
@@ -518,6 +524,18 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
filterColumns.exists(c => map.get(c.name).getOrElse(false))
}
+  /**
+   * Checks whether the projection contains more dictionary columns than
+   * CarbonCommonConstants.CARBON_ALLOW_DIRECT_FILL_DICT_COLS_LIMIT.
+   *
+   * The caller uses the result to avoid the direct-fill path when many dictionary
+   * columns are projected, because Spark code generation for them is expensive.
+   *
+   * @param projectColumns attributes requested by the projection
+   * @param relation       relation whose `metaData.dictionaryMap` is consulted
+   * @return true when the number of projected dictionary columns exceeds the limit
+   */
+  private def hasMoreDictionaryColumnsOnProjection(projectColumns: AttributeSet,
+      relation: CarbonDatasourceHadoopRelation): Boolean = {
+    val dictionaryMap = relation.carbonRelation.metaData.dictionaryMap
+    // Columns absent from the dictionary map are treated as non-dictionary columns.
+    val dictionaryColumnCount =
+      projectColumns.count(attr => dictionaryMap.get(attr.name).getOrElse(false))
+    dictionaryColumnCount > CarbonCommonConstants.CARBON_ALLOW_DIRECT_FILL_DICT_COLS_LIMIT
+  }
+
private def getPartitioning(carbonTable: CarbonTable,
output: Seq[Attribute]): Partitioning = {
val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getTableName)