Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/02 04:27:38 UTC
[1/2] incubator-carbondata git commit: fix bug in late decode optimizer and strategy
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 72900c553 -> f47bbc2c2
fix bug in late decode optimizer and strategy
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/07761876
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/07761876
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/07761876
Branch: refs/heads/master
Commit: 07761876e45bb76d9932fd2009108c722b718280
Parents: 72900c5
Author: QiangCai <qi...@qq.com>
Authored: Fri Dec 2 07:50:08 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Fri Dec 2 12:27:02 2016 +0800
----------------------------------------------------------------------
conf/dataload.properties.template | 4 +-
examples/spark2/src/main/resources/data.csv | 20 +-
.../carbondata/examples/CarbonExample.scala | 36 +-
.../sql/CarbonDatasourceHadoopRelation.scala | 6 +-
.../spark/sql/CarbonDictionaryDecoder.scala | 151 +++++++-
.../execution/CarbonLateDecodeStrategy.scala | 345 ++++++++++++++++++-
.../sql/optimizer/CarbonLateDecodeRule.scala | 101 +-----
7 files changed, 533 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/conf/dataload.properties.template
----------------------------------------------------------------------
diff --git a/conf/dataload.properties.template b/conf/dataload.properties.template
index d5e9d6a..cfafb4c 100644
--- a/conf/dataload.properties.template
+++ b/conf/dataload.properties.template
@@ -18,14 +18,14 @@
#carbon store path
# you should change to the code path of your local machine
-carbon.storelocation=/Users/jackylk/code/incubator-carbondata/examples/spark2/target/store
+carbon.storelocation=/home/david/Documents/incubator-carbondata/examples/spark2/target/store
#true: use kettle to load data
#false: use new flow to load data
use_kettle=true
# you should change to the code path of your local machine
-carbon.kettle.home=/Users/jackylk/code/incubator-carbondata/processing/carbonplugins
+carbon.kettle.home=/home/david/Documents/incubator-carbondata/processing/carbonplugins
#csv delimiter character
delimiter=,
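The template above drives the example data load: carbon.storelocation points at the store output directory, use_kettle switches between the kettle flow and the new load flow, and delimiter sets the CSV field separator. As a rough illustration only (not CarbonData's actual loader, and assuming the template has been copied to a hypothetical conf/dataload.properties), such a file could be read like this:

import java.io.FileInputStream
import java.util.Properties

object DataLoadConfig {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Hypothetical copy of conf/dataload.properties.template
    val in = new FileInputStream("conf/dataload.properties")
    try props.load(in) finally in.close()

    val storeLocation = props.getProperty("carbon.storelocation")
    // The fallback defaults here are assumptions, not CarbonData's
    val useKettle = props.getProperty("use_kettle", "true").toBoolean
    val delimiter = props.getProperty("delimiter", ",")
    println(s"store=$storeLocation kettle=$useKettle delimiter='$delimiter'")
  }
}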
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/examples/spark2/src/main/resources/data.csv
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/resources/data.csv b/examples/spark2/src/main/resources/data.csv
index 5d3169e..83ea3b3 100644
--- a/examples/spark2/src/main/resources/data.csv
+++ b/examples/spark2/src/main/resources/data.csv
@@ -1,11 +1,11 @@
shortField,intField,bigintField,doubleField,stringField,timestampField
-1, 10, 100, 48.4, spark, 2015/4/23
-5, 17, 140, 43.4, spark, 2015/7/27
-1, 11, 100, 44.4, flink, 2015/5/23
-1, 10, 150, 43.4, spark, 2015/7/24
-1, 10, 100, 47.4, spark, 2015/7/23
-3, 14, 160, 43.4, hive, 2015/7/26
-2, 10, 100, 43.4, impala, 2015/7/23
-1, 10, 100, 43.4, spark, 2015/5/23
-4, 16, 130, 42.4, impala, 2015/7/23
-1, 10, 100, 43.4, spark, 2015/7/23
+1,10,100,48.4,spark,2015/4/23
+5,17,140,43.4,spark,2015/7/27
+1,11,100,44.4,flink,2015/5/23
+1,10,150,43.4,spark,2015/7/24
+1,10,100,47.4,spark,2015/7/23
+3,14,160,43.4,hive,2015/7/26
+2,10,100,43.4,impala,2015/7/23
+1,10,100,43.4,spark,2015/5/23
+4,16,130,42.4,impala,2015/7/23
+1,10,100,43.4,spark,2015/7/23
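The data.csv change removes the space after every comma. That matters because untrimmed numeric fields fail strict parsing: in Scala, " 10".toInt throws NumberFormatException while "10".toInt succeeds. A minimal standalone sketch (not CarbonData's CSV reader) of the failure mode and the usual trim-based fix:

object CsvWhitespace {
  def main(args: Array[String]): Unit = {
    val padded = "1, 10, 100, 48.4, spark, 2015/4/23"
    val tight  = "1,10,100,48.4,spark,2015/4/23"

    def secondFieldAsInt(row: String): Either[String, Int] =
      try Right(row.split(',')(1).toInt)
      catch { case e: NumberFormatException => Left(e.getMessage) }

    println(secondFieldAsInt(padded)) // Left(...): the leading space breaks toInt
    println(secondFieldAsInt(tight))  // Right(10)

    // Trimming each field tolerates both layouts
    println(padded.split(',').map(_.trim).apply(1).toInt) // 10
  }
}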
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index 75fdd1c..d3a7e86 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -26,7 +26,7 @@ object CarbonExample {
def main(args: Array[String]): Unit = {
// to run the example, plz change this path to your local machine path
- val rootPath = "/Users/jackylk/code/incubator-carbondata"
+ val rootPath = "/home/david/Documents/incubator-carbondata"
val spark = SparkSession
.builder()
.master("local")
@@ -38,10 +38,10 @@ object CarbonExample {
spark.sparkContext.setLogLevel("WARN")
// Drop table
- spark.sql("DROP TABLE IF EXISTS carbon_table")
- spark.sql("DROP TABLE IF EXISTS csv_table")
-
- // Create table
+// spark.sql("DROP TABLE IF EXISTS carbon_table")
+// spark.sql("DROP TABLE IF EXISTS csv_table")
+//
+// // Create table
spark.sql(
s"""
| CREATE TABLE carbon_table(
@@ -96,14 +96,26 @@ object CarbonExample {
FROM carbon_table
""").show
-// spark.sql("""
-// SELECT sum(intField), stringField
-// FROM carbon_table
-// GROUP BY stringField
-// """).show
+ spark.sql("""
+ SELECT *
+ FROM carbon_table where length(stringField) = 5
+ """).show
+
+ spark.sql("""
+ SELECT sum(intField), stringField
+ FROM carbon_table
+ GROUP BY stringField
+ """).show
+
+ spark.sql(
+ """
+ |select t1.*, t2.*
+ |from carbon_table t1, carbon_table t2
+ |where t1.stringField = t2.stringField
+ """.stripMargin).show
// Drop table
- spark.sql("DROP TABLE IF EXISTS carbon_table")
- spark.sql("DROP TABLE IF EXISTS csv_table")
+// spark.sql("DROP TABLE IF EXISTS carbon_table")
+// spark.sql("DROP TABLE IF EXISTS csv_table")
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 24182ec..3b951ba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -41,7 +41,7 @@ case class CarbonDatasourceHadoopRelation(
paths: Array[String],
parameters: Map[String, String],
tableSchema: Option[StructType])
- extends BaseRelation with PrunedFilteredScan {
+ extends BaseRelation {
lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
lazy val carbonTable = SchemaReader.readCarbonTableFromStore(absIdentifier)
@@ -59,7 +59,7 @@ case class CarbonDatasourceHadoopRelation(
override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)
- override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+ def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
val job = new Job(new JobConf())
val conf = new Configuration(job.getConfiguration)
val filterExpression: Option[Expression] = filters.flatMap { filter =>
@@ -74,5 +74,5 @@ case class CarbonDatasourceHadoopRelation(
new CarbonScanRDD[Row](sqlContext.sparkContext, projection, filterExpression.orNull,
absIdentifier, carbonTable)
}
-
+ override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)
}
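Two things change in this relation: it no longer extends PrunedFilteredScan (the new strategy calls buildScan directly), and it overrides unhandledFilters to return an empty array, telling Spark that the Carbon scan fully applies every pushed filter so no residual Filter node needs to re-evaluate them. A minimal sketch of that contract with a hypothetical relation (not the Carbon one):

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical relation illustrating only the unhandledFilters contract.
class AllFiltersHandledRelation(val sqlContext: SQLContext) extends BaseRelation {
  override def schema: StructType = StructType(Seq(StructField("intField", IntegerType)))

  // An empty result claims every pushed filter is applied row-by-row by the
  // scan itself, so Spark plans no extra Filter node above the scan.
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    Array.empty[Filter]
}

The trade-off is that the source must actually enforce each pushed filter exactly; a filter silently dropped by the scan would surface as wrong results, not as a planner error.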
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index d05aefd..c7ca61d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql
-import org.apache.spark.TaskContext
+import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors.attachTree
@@ -33,8 +33,7 @@ import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, ColumnIdentif
import org.apache.carbondata.core.carbon.metadata.datatype.DataType
import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.carbon.querystatistics._
-import org.apache.carbondata.core.util.{CarbonTimeStatisticsFactory, DataTypeUtil}
+import org.apache.carbondata.core.util.DataTypeUtil
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
/**
@@ -220,3 +219,149 @@ case class CarbonDictionaryDecoder(
}
}
+
+
+
+
+class CarbonDecoderRDD(relations: Seq[CarbonDecoderRelation],
+ profile: CarbonProfile,
+ aliasMap: CarbonAliasDecoderRelation,
+ prev: RDD[Row],
+ output: Seq[Attribute])
+ extends RDD[Row](prev) {
+
+ def canBeDecoded(attr: Attribute): Boolean = {
+ profile match {
+ case ip: IncludeProfile if ip.attributes.nonEmpty =>
+ ip.attributes
+ .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
+ case ep: ExcludeProfile =>
+ !ep.attributes
+ .exists(a => a.name.equalsIgnoreCase(attr.name) && a.exprId == attr.exprId)
+ case _ => true
+ }
+ }
+
+ def convertCarbonToSparkDataType(carbonDimension: CarbonDimension,
+ relation: CarbonRelation): types.DataType = {
+ carbonDimension.getDataType match {
+ case DataType.STRING => StringType
+ case DataType.SHORT => ShortType
+ case DataType.INT => IntegerType
+ case DataType.LONG => LongType
+ case DataType.DOUBLE => DoubleType
+ case DataType.BOOLEAN => BooleanType
+ case DataType.DECIMAL =>
+ val scale: Int = carbonDimension.getColumnSchema.getScale
+ val precision: Int = carbonDimension.getColumnSchema.getPrecision
+ if (scale == 0 && precision == 0) {
+ DecimalType(18, 2)
+ } else {
+ DecimalType(precision, scale)
+ }
+ case DataType.TIMESTAMP => TimestampType
+ case DataType.STRUCT =>
+ CarbonMetastoreTypes
+ .toDataType(s"struct<${ relation.getStructChildren(carbonDimension.getColName) }>")
+ case DataType.ARRAY =>
+ CarbonMetastoreTypes
+ .toDataType(s"array<${ relation.getArrayChildren(carbonDimension.getColName) }>")
+ }
+ }
+
+ val getDictionaryColumnIds = {
+ val dictIds: Array[(String, ColumnIdentifier, DataType)] = output.map { a =>
+ val attr = aliasMap.getOrElse(a, a)
+ val relation = relations.find(p => p.contains(attr))
+ if(relation.isDefined && canBeDecoded(attr)) {
+ val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
+ val carbonDimension =
+ carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+ if (carbonDimension != null &&
+ carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+ !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
+ carbonDimension.getDataType)
+ } else {
+ (null, null, null)
+ }
+ } else {
+ (null, null, null)
+ }
+
+ }.toArray
+ dictIds
+ }
+
+ override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
+ val storepath = CarbonEnv.get.carbonMetastore.storePath
+ val absoluteTableIdentifiers = relations.map { relation =>
+ val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
+ (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
+ }.toMap
+
+ val cacheProvider: CacheProvider = CacheProvider.getInstance
+ val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
+ cacheProvider.createCache(CacheType.FORWARD_DICTIONARY, storepath)
+ val dicts: Seq[Dictionary] = getDictionary(absoluteTableIdentifiers,
+ forwardDictionaryCache)
+ val dictIndex = dicts.zipWithIndex.filter(x => x._1 != null).map(x => x._2)
+ // add a task completion listener to clear dictionary that is a decisive factor for
+ // LRU eviction policy
+ val dictionaryTaskCleaner = TaskContext.get
+ dictionaryTaskCleaner.addTaskCompletionListener(context =>
+ dicts.foreach { dictionary =>
+ if (null != dictionary) {
+ dictionary.clear
+ }
+ }
+ )
+ val iter = firstParent[Row].iterator(split, context)
+ new Iterator[Row] {
+ var flag = true
+ var total = 0L
+
+ override final def hasNext: Boolean = iter.hasNext
+
+ override final def next(): Row = {
+ val startTime = System.currentTimeMillis()
+ val data = iter.next().asInstanceOf[GenericRow].toSeq.toArray
+ dictIndex.foreach { index =>
+ if ( data(index) != null) {
+ data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
+ .getDictionaryValueForKey(data(index).asInstanceOf[Int]),
+ getDictionaryColumnIds(index)._3)
+ }
+ }
+ new GenericRow(data)
+ }
+ }
+ }
+
+ private def isRequiredToDecode = {
+ getDictionaryColumnIds.find(p => p._1 != null) match {
+ case Some(value) => true
+ case _ => false
+ }
+ }
+
+ private def getDictionary(atiMap: Map[String, AbsoluteTableIdentifier],
+ cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
+ val dicts: Seq[Dictionary] = getDictionaryColumnIds.map { f =>
+ if (f._2 != null) {
+ try {
+ cache.get(new DictionaryColumnUniqueIdentifier(
+ atiMap(f._1).getCarbonTableIdentifier,
+ f._2, f._3))
+ } catch {
+ case _: Throwable => null
+ }
+ } else {
+ null
+ }
+ }
+ dicts
+ }
+
+ override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
+}
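CarbonDecoderRDD implements the late decode itself: the scan emits dictionary surrogate keys (Ints) so filters, aggregation and shuffles run on compact integers, and compute() maps the keys back to values only when rows leave the Carbon subplan. A self-contained sketch of that final step, with a plain Map standing in for the forward dictionary cache (the real code calls Dictionary.getDictionaryValueForKey and DataTypeUtil):

object LateDecodeSketch {
  def main(args: Array[String]): Unit = {
    // Plain Map standing in for Carbon's forward dictionary
    val dict: Map[Int, String] = Map(1 -> "spark", 2 -> "flink", 3 -> "hive")

    // Rows as the scan produces them: dictionary columns carry Int keys
    val rows: Seq[Array[Any]] = Seq(Array(10, 1), Array(11, 2), Array(14, 3))
    val dictIndex = Seq(1) // positions of dictionary-encoded columns

    // Decode as late as possible: only now replace keys with values
    val decoded = rows.map { data =>
      dictIndex.foreach { i =>
        if (data(i) != null) data(i) = dict(data(i).asInstanceOf[Int])
      }
      data.mkString("[", ", ", "]")
    }
    decoded.foreach(println) // [10, spark] / [11, flink] / [14, hive]
  }
}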
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 4ae8d61..c73fde6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -17,16 +17,47 @@
package org.apache.spark.sql.execution
-import org.apache.spark.sql.{CarbonDictionaryCatalystDecoder, CarbonDictionaryDecoder}
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.CatalystTypeConverters._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.optimizer.CarbonDecoderRelation
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.types.{IntegerType, StringType}
+import org.apache.spark.unsafe.types.UTF8String
+
+import org.apache.carbondata.spark.CarbonAliasDecoderRelation
+
+
+
/**
* Carbon strategy for late decode (convert dictionary key to value as late as possible), which
* can improve the aggregation performance and reduce memory usage
*/
private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
+ val PUSHED_FILTERS = "PushedFilters"
+
def apply(plan: LogicalPlan): Seq[SparkPlan] = {
plan match {
+ case PhysicalOperation(projects, filters, l: LogicalRelation)
+ if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ val relation = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+ pruneFilterProject(
+ l,
+ projects,
+ filters,
+ (a, f, needDecoder) => toCatalystRDD(l, a, relation.buildScan(
+ a.map(_.name).toArray, f), needDecoder)) ::
+ Nil
case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
CarbonDictionaryDecoder(relations,
profile,
@@ -37,4 +68,316 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
}
}
+
+ def getDecoderRDD(logicalRelation: LogicalRelation,
+ projectExprsNeedToDecode: ArrayBuffer[AttributeReference],
+ rdd: RDD[Row],
+ output: Seq[Attribute]): RDD[Row] = {
+ val table = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+ val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
+ val attrs = projectExprsNeedToDecode.map { attr =>
+ val newAttr = AttributeReference(attr.name,
+ attr.dataType,
+ attr.nullable,
+ attr.metadata)(attr.exprId, Option(table.carbonRelation.tableName))
+ relation.addAttribute(newAttr)
+ newAttr
+ }
+ new CarbonDecoderRDD(Seq(relation), IncludeProfile(attrs),
+ CarbonAliasDecoderRelation(), rdd, output)
+ }
+
+ private[this] def toCatalystRDD(
+ relation: LogicalRelation,
+ output: Seq[Attribute],
+ rdd: RDD[Row],
+ needoDecode: ArrayBuffer[AttributeReference]):
+ RDD[InternalRow] = {
+ val newRdd = if (needoDecode.size > 0) {
+ getDecoderRDD(relation, needoDecode, rdd, output)
+ } else {
+ rdd
+ }
+ if (relation.relation.needConversion) {
+ execution.RDDConversions.rowToRowRdd(newRdd, output.map(_.dataType))
+ } else {
+ newRdd.asInstanceOf[RDD[InternalRow]]
+ }
+ }
+
+ protected def pruneFilterProject(
+ relation: LogicalRelation,
+ projects: Seq[NamedExpression],
+ filterPredicates: Seq[Expression],
+ scanBuilder: (Seq[Attribute], Array[Filter],
+ ArrayBuffer[AttributeReference]) =>
+ RDD[InternalRow]) = {
+ pruneFilterProjectRaw(
+ relation,
+ projects,
+ filterPredicates,
+ (requestedColumns, _, pushedFilters, a) => {
+ scanBuilder(requestedColumns, pushedFilters.toArray, a)
+ })
+ }
+
+ protected def pruneFilterProjectRaw(
+ relation: LogicalRelation,
+ projects: Seq[NamedExpression],
+ filterPredicates: Seq[Expression],
+ scanBuilder: (Seq[Attribute], Seq[Expression],
+ Seq[Filter], ArrayBuffer[AttributeReference]) =>
+ RDD[InternalRow]) = {
+
+ val projectSet = AttributeSet(projects.flatMap(_.references))
+ val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
+
+ val candidatePredicates = filterPredicates.map {
+ _ transform {
+ case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
+ }
+ }
+
+ val (unhandledPredicates, pushedFilters) =
+ selectFilters(relation.relation, candidatePredicates)
+
+ // A set of column attributes that are only referenced by pushed down filters. We can eliminate
+ // them from requested columns.
+ val handledSet = {
+ val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
+ val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
+ AttributeSet(handledPredicates.flatMap(_.references)) --
+ (projectSet ++ unhandledSet).map(relation.attributeMap)
+ }
+
+ // Combines all Catalyst filter `Expression`s that are either not convertible to data source
+ // `Filter`s or cannot be handled by `relation`.
+ val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
+ val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+ val map = table.carbonRelation.metaData.dictionaryMap
+
+ val metadata: Map[String, String] = {
+ val pairs = ArrayBuffer.empty[(String, String)]
+
+ if (pushedFilters.nonEmpty) {
+ pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
+ }
+ pairs.toMap
+ }
+
+
+ val needDecoder = ArrayBuffer[AttributeReference]()
+ filterCondition match {
+ case Some(exp: Expression) =>
+ exp.references.collect {
+ case attr: AttributeReference =>
+ val dict = map.get(attr.name)
+ if (dict.isDefined && dict.get) {
+ needDecoder += attr
+ }
+ }
+ case None =>
+ }
+
+ projects.map {
+ case attr: AttributeReference =>
+ case Alias(attr: AttributeReference, _) =>
+ case others =>
+ others.references.map { f =>
+ val dictionary = map.get(f.name)
+ if (dictionary.isDefined && dictionary.get) {
+ needDecoder += f.asInstanceOf[AttributeReference]
+ }
+ }
+ }
+
+ if (projects.map(_.toAttribute) == projects &&
+ projectSet.size == projects.size &&
+ filterSet.subsetOf(projectSet)) {
+ // When it is possible to just use column pruning to get the right projection and
+ // when the columns of this projection are enough to evaluate all filter conditions,
+ // just do a scan followed by a filter, with no extra project.
+ val requestedColumns = projects
+ // Safe due to if above.
+ .asInstanceOf[Seq[Attribute]]
+ // Match original case of attributes.
+ .map(relation.attributeMap)
+ // Don't request columns that are only referenced by pushed filters.
+ .filterNot(handledSet.contains)
+ val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
+
+ val updateProject = projects.map { expr =>
+ var attr = expr.toAttribute.asInstanceOf[AttributeReference]
+ if (!needDecoder.exists(_.name.equalsIgnoreCase(attr.name))) {
+ val dict = map.get(attr.name)
+ if (dict.isDefined && dict.get) {
+ attr = AttributeReference(attr.name, IntegerType, attr.nullable, attr.metadata)(attr
+ .exprId, attr.qualifier)
+ }
+ }
+ attr
+ }
+ val scan = execution.DataSourceScanExec.create(
+ updateProject,
+ scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
+ relation.relation, metadata, relation.metastoreTableIdentifier)
+ filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
+ } else {
+ // Don't request columns that are only referenced by pushed filters.
+ val requestedColumns =
+ (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
+ val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
+ val scan = execution.DataSourceScanExec.create(
+ updateRequestedColumns,
+ scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
+ relation.relation, metadata, relation.metastoreTableIdentifier)
+ execution.ProjectExec(
+ projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
+ }
+ }
+
+ def updateRequestedColumnsFunc(requestedColumns: Seq[AttributeReference],
+ relation: CarbonDatasourceHadoopRelation,
+ needDecoder: ArrayBuffer[AttributeReference]): Seq[AttributeReference] = {
+ val map = relation.carbonRelation.metaData.dictionaryMap
+ requestedColumns.map { attr =>
+ if (needDecoder.exists(_.name.equalsIgnoreCase(attr.name))) {
+ attr
+ } else {
+ val dict = map.get(attr.name)
+ if (dict.isDefined && dict.get) {
+ AttributeReference(attr.name,
+ IntegerType,
+ attr.nullable,
+ attr.metadata)(attr.exprId, attr.qualifier)
+ } else {
+ attr
+ }
+ }
+ }
+ }
+
+
+ protected[sql] def selectFilters(
+ relation: BaseRelation,
+ predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = {
+
+ // For conciseness, all Catalyst filter expressions of type `expressions.Expression` below are
+ // called `predicate`s, while all data source filters of type `sources.Filter` are simply called
+ // `filter`s.
+
+ val translated: Seq[(Expression, Filter)] =
+ for {
+ predicate <- predicates
+ filter <- translateFilter(predicate)
+ } yield predicate -> filter
+
+ // A map from original Catalyst expressions to corresponding translated data source filters.
+ val translatedMap: Map[Expression, Filter] = translated.toMap
+
+ // Catalyst predicate expressions that cannot be translated to data source filters.
+ val unrecognizedPredicates = predicates.filterNot(translatedMap.contains)
+
+ // Data source filters that cannot be handled by `relation`. The semantic of a unhandled filter
+ // at here is that a data source may not be able to apply this filter to every row
+ // of the underlying dataset.
+ val unhandledFilters = relation.unhandledFilters(translatedMap.values.toArray).toSet
+
+ val (unhandled, handled) = translated.partition {
+ case (predicate, filter) =>
+ unhandledFilters.contains(filter)
+ }
+
+ // Catalyst predicate expressions that can be translated to data source filters, but cannot be
+ // handled by `relation`.
+ val (unhandledPredicates, _) = unhandled.unzip
+
+ // Translated data source filters that can be handled by `relation`
+ val (_, handledFilters) = handled.unzip
+
+ // translated contains all filters that have been converted to the public Filter interface.
+ // We should always push them to the data source no matter whether the data source can apply
+ // a filter to every row or not.
+ val (_, translatedFilters) = translated.unzip
+
+ (unrecognizedPredicates ++ unhandledPredicates, translatedFilters)
+ }
+
+ /**
+ * Tries to translate a Catalyst [[Expression]] into data source [[Filter]].
+ * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
+ */
+ protected[sql] def translateFilter(predicate: Expression): Option[Filter] = {
+ predicate match {
+ case expressions.EqualTo(a: Attribute, Literal(v, t)) =>
+ Some(sources.EqualTo(a.name, convertToScala(v, t)))
+ case expressions.EqualTo(Literal(v, t), a: Attribute) =>
+ Some(sources.EqualTo(a.name, convertToScala(v, t)))
+
+ case expressions.EqualNullSafe(a: Attribute, Literal(v, t)) =>
+ Some(sources.EqualNullSafe(a.name, convertToScala(v, t)))
+ case expressions.EqualNullSafe(Literal(v, t), a: Attribute) =>
+ Some(sources.EqualNullSafe(a.name, convertToScala(v, t)))
+
+ case expressions.GreaterThan(a: Attribute, Literal(v, t)) =>
+ Some(sources.GreaterThan(a.name, convertToScala(v, t)))
+ case expressions.GreaterThan(Literal(v, t), a: Attribute) =>
+ Some(sources.LessThan(a.name, convertToScala(v, t)))
+
+ case expressions.LessThan(a: Attribute, Literal(v, t)) =>
+ Some(sources.LessThan(a.name, convertToScala(v, t)))
+ case expressions.LessThan(Literal(v, t), a: Attribute) =>
+ Some(sources.GreaterThan(a.name, convertToScala(v, t)))
+
+ case expressions.GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
+ Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t)))
+ case expressions.GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
+ Some(sources.LessThanOrEqual(a.name, convertToScala(v, t)))
+
+ case expressions.LessThanOrEqual(a: Attribute, Literal(v, t)) =>
+ Some(sources.LessThanOrEqual(a.name, convertToScala(v, t)))
+ case expressions.LessThanOrEqual(Literal(v, t), a: Attribute) =>
+ Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t)))
+
+ case expressions.InSet(a: Attribute, set) =>
+ val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
+ Some(sources.In(a.name, set.toArray.map(toScala)))
+
+ // Because we only convert In to InSet in Optimizer when there are more than certain
+ // items. So it is possible we still get an In expression here that needs to be pushed
+ // down.
+ case expressions.In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
+ val hSet = list.map(e => e.eval(EmptyRow))
+ val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
+ Some(sources.In(a.name, hSet.toArray.map(toScala)))
+
+ case expressions.IsNull(a: Attribute) =>
+ Some(sources.IsNull(a.name))
+ case expressions.IsNotNull(a: Attribute) =>
+ Some(sources.IsNotNull(a.name))
+
+ case expressions.And(left, right) =>
+ (translateFilter(left) ++ translateFilter(right)).reduceOption(sources.And)
+
+ case expressions.Or(left, right) =>
+ for {
+ leftFilter <- translateFilter(left)
+ rightFilter <- translateFilter(right)
+ } yield sources.Or(leftFilter, rightFilter)
+
+ case expressions.Not(child) =>
+ translateFilter(child).map(sources.Not)
+
+ case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
+ Some(sources.StringStartsWith(a.name, v.toString))
+
+ case expressions.EndsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
+ Some(sources.StringEndsWith(a.name, v.toString))
+
+ case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) =>
+ Some(sources.StringContains(a.name, v.toString))
+ case _ => None
+ }
+ }
}
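translateFilter above mirrors Spark's own DataSourceStrategy: each Catalyst comparison is rewritten into a sources.Filter, and when the literal sits on the left the operator is flipped (Literal > attr becomes LessThan(attr, literal)) so the pushed filter is always column-relative. A toy standalone sketch of that flip, using a made-up mini AST rather than Catalyst's classes:

// Made-up mini AST; Catalyst's real classes live in
// org.apache.spark.sql.catalyst.expressions
sealed trait Expr
case class Col(name: String) extends Expr
case class Lit(v: Any) extends Expr
case class Gt(left: Expr, right: Expr) extends Expr

sealed trait SourceFilter
case class SrcGreaterThan(col: String, v: Any) extends SourceFilter
case class SrcLessThan(col: String, v: Any) extends SourceFilter

object TranslateSketch {
  // col > lit pushes as GreaterThan; lit > col flips to LessThan(col, lit)
  def translate(e: Expr): Option[SourceFilter] = e match {
    case Gt(Col(n), Lit(v)) => Some(SrcGreaterThan(n, v))
    case Gt(Lit(v), Col(n)) => Some(SrcLessThan(n, v))
    case _                  => None // untranslatable predicates stay in the Spark plan
  }

  def main(args: Array[String]): Unit = {
    println(translate(Gt(Col("intField"), Lit(10)))) // Some(SrcGreaterThan(intField,10))
    println(translate(Gt(Lit(10), Col("intField")))) // Some(SrcLessThan(intField,10))
  }
}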
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07761876/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index c4b5d70..6b6960d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -55,110 +55,13 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
}
}
- def updateCarbonRelationDataType(plan: LogicalPlan): LogicalPlan = {
- val relations = plan collect {
- case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
- }
- if(relations.nonEmpty && !isOptimized(plan)) {
- val map = mutable.HashMap[ExprId, AttributeReference]()
- val updateRelationPlan = plan transformDown {
- case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- val relation = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
- val newRelation = updateRelation(relation)
- val newl = LogicalRelation(newRelation, l.expectedOutputAttributes, l
- .metastoreTableIdentifier)
- for(i <- 0 until l.output.size) {
- map.put(l.output(i).exprId, newl.output(i))
- }
- newl
- }
-
- updateRelationPlan transformDown {
- case sort: Sort =>
- val sortExprs = sort.order.map { s =>
- s.transform {
- case attr: AttributeReference =>
- map.getOrElse(attr.exprId, attr)
- }.asInstanceOf[SortOrder]
- }
- Sort(sortExprs, sort.global, sort.child)
- case agg: Aggregate =>
- val aggExps = agg.aggregateExpressions.map { aggExp =>
- aggExp transform {
- case attr: AttributeReference =>
- map.getOrElse(attr.exprId, attr)
- case other => other
- }
- }.asInstanceOf[Seq[NamedExpression]]
-
- val grpExps = agg.groupingExpressions.map { gexp =>
- gexp.transform {
- case attr: AttributeReference =>
- map.getOrElse(attr.exprId, attr)
- }
- }
- Aggregate(grpExps, aggExps, agg.child)
- case expand: Expand =>
- expand.transformExpressions {
- case attr: AttributeReference =>
- map.getOrElse(attr.exprId, attr)
- }
- case filter: Filter =>
- val filterExps = filter.condition transform {
- case attr: AttributeReference =>
- map.getOrElse(attr.exprId, attr)
- }
- Filter(filterExps, filter.child)
- case p: Project if relations.nonEmpty =>
- val prExps = p.projectList.map { prExp =>
- prExp.transform {
- case attr: AttributeReference =>
- map.getOrElse(attr.exprId, attr)
- }
- }.asInstanceOf[Seq[NamedExpression]]
- Project(prExps, p.child)
- case wd: Window if relations.nonEmpty =>
- val prExps = wd.output.map { prExp =>
- prExp.transform {
- case attr: AttributeReference =>
- map.getOrElse(attr.exprId, attr)
- }
- }.asInstanceOf[Seq[Attribute]]
- val wdExps = wd.windowExpressions.map { gexp =>
- gexp.transform {
- case attr: AttributeReference =>
- map.getOrElse(attr.exprId, attr)
- }
- }.asInstanceOf[Seq[NamedExpression]]
- val partitionSpec = wd.partitionSpec.map{ exp =>
- exp.transform {
- case attr: AttributeReference =>
- map.getOrElse(attr.exprId, attr)
- }
- }
- val orderSpec = wd.orderSpec.map { exp =>
- exp.transform {
- case attr: AttributeReference =>
- map.getOrElse(attr.exprId, attr)
- }
- }.asInstanceOf[Seq[SortOrder]]
- Window(wdExps, partitionSpec, orderSpec, wd.child)
- case others => others
- }
- } else {
- plan
- }
- }
-
def apply(plan: LogicalPlan): LogicalPlan = {
- val updatePlan = updateCarbonRelationDataType(plan)
- relations = collectCarbonRelation(updatePlan)
+ relations = collectCarbonRelation(plan)
if (relations.nonEmpty && !isOptimized(plan)) {
LOGGER.info("Starting to optimize plan")
val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder("")
val queryStatistic = new QueryStatistic()
- val result = transformCarbonPlan(updatePlan, relations)
+ val result = transformCarbonPlan(plan, relations)
queryStatistic.addStatistics("Time taken for Carbon Optimizer to optimize: ",
System.currentTimeMillis)
recorder.recordStatistics(queryStatistic)
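With updateCarbonRelationDataType removed, the rule reduces to a guarded rewrite: collect the Carbon relations, then transform only plans that touch Carbon and have not already been optimized, so the rule is a no-op on its own output. A skeleton of that shape; the three protected methods are stand-ins for CarbonLateDecodeRule's real helpers, not their actual signatures:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

abstract class LateDecodeRuleSketch extends Rule[LogicalPlan] {
  // Stand-ins for the helpers in CarbonLateDecodeRule
  protected def collectCarbonRelation(plan: LogicalPlan): Seq[LogicalPlan]
  protected def isOptimized(plan: LogicalPlan): Boolean
  protected def transformCarbonPlan(plan: LogicalPlan,
      relations: Seq[LogicalPlan]): LogicalPlan

  override def apply(plan: LogicalPlan): LogicalPlan = {
    val relations = collectCarbonRelation(plan)
    if (relations.nonEmpty && !isOptimized(plan)) {
      transformCarbonPlan(plan, relations) // rewrite once, guarded for idempotence
    } else {
      plan // not a Carbon query, or already rewritten: leave untouched
    }
  }
}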
[2/2] incubator-carbondata git commit: [CARBONDATA-481][SPARK2] fix late decoder and support whole stage code gen. This closes #379
Posted by ja...@apache.org.
[CARBONDATA-481][SPARK2] fix late decoder and support whole stage code gen. This closes #379
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f47bbc2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f47bbc2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f47bbc2c
Branch: refs/heads/master
Commit: f47bbc2c23c330c6c30f768efb0f18ea610d5e30
Parents: 72900c5 0776187
Author: jackylk <ja...@huawei.com>
Authored: Fri Dec 2 12:27:26 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Fri Dec 2 12:27:26 2016 +0800
----------------------------------------------------------------------
conf/dataload.properties.template | 4 +-
examples/spark2/src/main/resources/data.csv | 20 +-
.../carbondata/examples/CarbonExample.scala | 36 +-
.../sql/CarbonDatasourceHadoopRelation.scala | 6 +-
.../spark/sql/CarbonDictionaryDecoder.scala | 151 +++++++-
.../execution/CarbonLateDecodeStrategy.scala | 345 ++++++++++++++++++-
.../sql/optimizer/CarbonLateDecodeRule.scala | 101 +-----
7 files changed, 533 insertions(+), 130 deletions(-)
----------------------------------------------------------------------