Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/07 07:22:27 UTC
[1/2] incubator-carbondata git commit: fixlatedecoder
Repository: incubator-carbondata
Updated Branches:
refs/heads/master ac4575536 -> 47658b17d
fixlatedecoder
fix comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/a9553e6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/a9553e6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/a9553e6b
Branch: refs/heads/master
Commit: a9553e6b8d086b98fab6df7b93a4e78150796fc9
Parents: ac45755
Author: QiangCai <qi...@qq.com>
Authored: Tue Dec 6 17:40:21 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Wed Dec 7 15:21:51 2016 +0800
----------------------------------------------------------------------
examples/spark2/src/main/resources/data.csv | 20 +--
.../carbondata/examples/CarbonExample.scala | 10 +-
.../spark/sql/CarbonDictionaryDecoder.scala | 1 -
.../execution/CarbonLateDecodeStrategy.scala | 157 ++++++++++---------
.../sql/optimizer/CarbonLateDecodeRule.scala | 128 +++++++++++++--
.../carbondata/CarbonDataSourceSuite.scala | 28 +++-
6 files changed, 236 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a9553e6b/examples/spark2/src/main/resources/data.csv
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/resources/data.csv b/examples/spark2/src/main/resources/data.csv
index fcdf3c1..b44672f 100644
--- a/examples/spark2/src/main/resources/data.csv
+++ b/examples/spark2/src/main/resources/data.csv
@@ -1,10 +1,10 @@
-1,10,100,48.4,spark,2015/4/23
-5,17,140,43.4,spark,2015/7/27
-1,11,100,44.4,flink,2015/5/23
-1,10,150,43.4,spark,2015/7/24
-1,10,100,47.4,spark,2015/7/23
-3,14,160,43.4,hive,2015/7/26
-2,10,100,43.4,impala,2015/7/23
-1,10,100,43.4,spark,2015/5/23
-4,16,130,42.4,impala,2015/7/23
-1,10,100,43.4,spark,2015/7/23
\ No newline at end of file
+1,10,100,48.4,spark,2015/4/23,1.23
+5,17,140,43.4,spark,2015/7/27,3.45
+1,11,100,44.4,flink,2015/5/23,23.23
+1,10,150,43.4,spark,2015/7/24,254.12
+1,10,100,47.4,spark,2015/7/23,876.14
+3,14,160,43.4,hive,2015/7/26,3454.32
+2,10,100,43.4,impala,2015/7/23,456.98
+1,10,100,43.4,spark,2015/5/23,32.53
+4,16,130,42.4,impala,2015/7/23,67.23
+1,10,100,43.4,spark,2015/7/23,832.23
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a9553e6b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index 59cc4e9..17674ef 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -68,8 +68,8 @@ object CarbonExample {
| bigintField long,
| doubleField double,
| stringField string,
- | timestampField timestamp
- | )
+ | timestampField timestamp,
+ | decimalField decimal(18,2))
| USING org.apache.spark.sql.CarbonSource
""".stripMargin)
@@ -86,7 +86,8 @@ object CarbonExample {
| bigintField long,
| doubleField double,
| stringField string,
- | timestampField string)
+ | timestampField string,
+ | decimalField decimal(18,2))
| ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""".stripMargin)
@@ -105,13 +106,14 @@ object CarbonExample {
s"""
| INSERT INTO TABLE carbon_table
| SELECT shortField, intField, bigintField, doubleField, stringField,
- | from_unixtime(unix_timestamp(timestampField,'yyyy/M/dd')) timestampField
+ | from_unixtime(unix_timestamp(timestampField,'yyyy/M/dd')) timestampField, decimalField
| FROM csv_table
""".stripMargin)
spark.sql("""
SELECT *
FROM carbon_table
+ where stringfield = 'spark' and decimalField > 40
""").show
spark.sql("""
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a9553e6b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index c7ca61d..db864c7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -320,7 +320,6 @@ class CarbonDecoderRDD(relations: Seq[CarbonDecoderRelation],
new Iterator[Row] {
var flag = true
var total = 0L
-
override final def hasNext: Boolean = iter.hasNext
override final def next(): Row = {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a9553e6b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index c73fde6..57b2139 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -21,8 +21,6 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.CatalystTypeConverters._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
@@ -31,14 +29,10 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.optimizer.CarbonDecoderRelation
import org.apache.spark.sql.sources.{BaseRelation, Filter}
-import org.apache.spark.sql.types.{IntegerType, StringType}
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.sql.types.IntegerType
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
-
-
-
/**
* Carbon strategy for late decode (convert dictionary key to value as late as possible), which
* can improve the aggregation performance and reduce memory usage
@@ -92,10 +86,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
relation: LogicalRelation,
output: Seq[Attribute],
rdd: RDD[Row],
- needoDecode: ArrayBuffer[AttributeReference]):
+ needDecode: ArrayBuffer[AttributeReference]):
RDD[InternalRow] = {
- val newRdd = if (needoDecode.size > 0) {
- getDecoderRDD(relation, needoDecode, rdd, output)
+ val newRdd = if (needDecode.size > 0) {
+ getDecoderRDD(relation, needDecode, rdd, output)
} else {
rdd
}
@@ -310,74 +304,89 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
*/
protected[sql] def translateFilter(predicate: Expression): Option[Filter] = {
predicate match {
- case expressions.EqualTo(a: Attribute, Literal(v, t)) =>
- Some(sources.EqualTo(a.name, convertToScala(v, t)))
- case expressions.EqualTo(Literal(v, t), a: Attribute) =>
- Some(sources.EqualTo(a.name, convertToScala(v, t)))
-
- case expressions.EqualNullSafe(a: Attribute, Literal(v, t)) =>
- Some(sources.EqualNullSafe(a.name, convertToScala(v, t)))
- case expressions.EqualNullSafe(Literal(v, t), a: Attribute) =>
- Some(sources.EqualNullSafe(a.name, convertToScala(v, t)))
-
- case expressions.GreaterThan(a: Attribute, Literal(v, t)) =>
- Some(sources.GreaterThan(a.name, convertToScala(v, t)))
- case expressions.GreaterThan(Literal(v, t), a: Attribute) =>
- Some(sources.LessThan(a.name, convertToScala(v, t)))
-
- case expressions.LessThan(a: Attribute, Literal(v, t)) =>
- Some(sources.LessThan(a.name, convertToScala(v, t)))
- case expressions.LessThan(Literal(v, t), a: Attribute) =>
- Some(sources.GreaterThan(a.name, convertToScala(v, t)))
-
- case expressions.GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
- Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t)))
- case expressions.GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
- Some(sources.LessThanOrEqual(a.name, convertToScala(v, t)))
-
- case expressions.LessThanOrEqual(a: Attribute, Literal(v, t)) =>
- Some(sources.LessThanOrEqual(a.name, convertToScala(v, t)))
- case expressions.LessThanOrEqual(Literal(v, t), a: Attribute) =>
- Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t)))
-
- case expressions.InSet(a: Attribute, set) =>
- val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
- Some(sources.In(a.name, set.toArray.map(toScala)))
-
- // Because we only convert In to InSet in Optimizer when there are more than certain
- // items. So it is possible we still get an In expression here that needs to be pushed
- // down.
- case expressions.In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
- val hSet = list.map(e => e.eval(EmptyRow))
- val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
- Some(sources.In(a.name, hSet.toArray.map(toScala)))
+ case or@ Or(left, right) =>
- case expressions.IsNull(a: Attribute) =>
- Some(sources.IsNull(a.name))
- case expressions.IsNotNull(a: Attribute) =>
- Some(sources.IsNotNull(a.name))
+ val leftFilter = translateFilter(left)
+ val rightFilter = translateFilter(right)
+ if (leftFilter.isDefined && rightFilter.isDefined) {
+ Some( sources.Or(leftFilter.get, rightFilter.get))
+ } else {
+ None
+ }
- case expressions.And(left, right) =>
+ case And(left, right) =>
(translateFilter(left) ++ translateFilter(right)).reduceOption(sources.And)
- case expressions.Or(left, right) =>
- for {
- leftFilter <- translateFilter(left)
- rightFilter <- translateFilter(right)
- } yield sources.Or(leftFilter, rightFilter)
-
- case expressions.Not(child) =>
- translateFilter(child).map(sources.Not)
-
- case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
- Some(sources.StringStartsWith(a.name, v.toString))
-
- case expressions.EndsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
- Some(sources.StringEndsWith(a.name, v.toString))
-
- case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) =>
- Some(sources.StringContains(a.name, v.toString))
- case _ => None
+ case EqualTo(a: Attribute, Literal(v, t)) =>
+ Some(sources.EqualTo(a.name, v))
+ case EqualTo(l@Literal(v, t), a: Attribute) =>
+ Some(sources.EqualTo(a.name, v))
+ case EqualTo(Cast(a: Attribute, _), Literal(v, t)) =>
+ Some(sources.EqualTo(a.name, v))
+ case EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
+ Some(sources.EqualTo(a.name, v))
+
+ case Not(EqualTo(a: Attribute, Literal(v, t))) => new
+ Some(sources.Not(sources.EqualTo(a.name, v)))
+ case Not(EqualTo(Literal(v, t), a: Attribute)) => new
+ Some(sources.Not(sources.EqualTo(a.name, v)))
+ case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) => new
+ Some(sources.Not(sources.EqualTo(a.name, v)))
+ case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) => new
+ Some(sources.Not(sources.EqualTo(a.name, v)))
+ case IsNotNull(a: Attribute) => Some(sources.IsNotNull(a.name))
+ case IsNull(a: Attribute) => Some(sources.IsNull(a.name))
+ case Not(In(a: Attribute, list)) if !list.exists(!_.isInstanceOf[Literal]) =>
+ val hSet = list.map(e => e.eval(EmptyRow))
+ Some(sources.Not(sources.In(a.name, hSet.toArray)))
+ case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
+ val hSet = list.map(e => e.eval(EmptyRow))
+ Some(sources.In(a.name, hSet.toArray))
+ case Not(In(Cast(a: Attribute, _), list))
+ if !list.exists(!_.isInstanceOf[Literal]) =>
+ val hSet = list.map(e => e.eval(EmptyRow))
+ Some(sources.Not(sources.In(a.name, hSet.toArray)))
+ case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
+ val hSet = list.map(e => e.eval(EmptyRow))
+ Some(sources.In(a.name, hSet.toArray))
+
+ case GreaterThan(a: Attribute, Literal(v, t)) =>
+ Some(sources.GreaterThan(a.name, v))
+ case GreaterThan(Literal(v, t), a: Attribute) =>
+ Some(sources.LessThan(a.name, v))
+ case GreaterThan(Cast(a: Attribute, _), Literal(v, t)) =>
+ Some(sources.GreaterThan(a.name, v))
+ case GreaterThan(Literal(v, t), Cast(a: Attribute, _)) =>
+ Some(sources.LessThan(a.name, v))
+
+ case LessThan(a: Attribute, Literal(v, t)) =>
+ Some(sources.LessThan(a.name, v))
+ case LessThan(Literal(v, t), a: Attribute) =>
+ Some(sources.GreaterThan(a.name, v))
+ case LessThan(Cast(a: Attribute, _), Literal(v, t)) =>
+ Some(sources.LessThan(a.name, v))
+ case LessThan(Literal(v, t), Cast(a: Attribute, _)) =>
+ Some(sources.GreaterThan(a.name, v))
+
+ case GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
+ Some(sources.GreaterThanOrEqual(a.name, v))
+ case GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
+ Some(sources.LessThanOrEqual(a.name, v))
+ case GreaterThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
+ Some(sources.GreaterThanOrEqual(a.name, v))
+ case GreaterThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
+ Some(sources.LessThanOrEqual(a.name, v))
+
+ case LessThanOrEqual(a: Attribute, Literal(v, t)) =>
+ Some(sources.LessThanOrEqual(a.name, v))
+ case LessThanOrEqual(Literal(v, t), a: Attribute) =>
+ Some(sources.GreaterThanOrEqual(a.name, v))
+ case LessThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
+ Some(sources.LessThanOrEqual(a.name, v))
+ case LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
+ Some(sources.GreaterThanOrEqual(a.name, v))
+
+ case others => None
}
}
}
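The rewritten translateFilter above drops the CatalystTypeConverters step and, more importantly for the decimal fix, also matches attributes wrapped in Cast, so a predicate such as decimalField > 40 (where Spark wraps the decimal column in a cast) can still be pushed down to Carbon. A minimal, self-contained sketch of that Cast-unwrapping pattern, using toy stand-ins rather than the real Catalyst classes:

    // Toy stand-ins for Catalyst expressions; illustration only.
    sealed trait Expr
    case class Attr(name: String) extends Expr
    case class Lit(value: Any) extends Expr
    case class CastExpr(child: Expr) extends Expr
    case class Gt(left: Expr, right: Expr) extends Expr

    // Simplified data-source filter.
    case class GreaterThanFilter(attribute: String, value: Any)

    // Match both the plain attribute and the Cast-wrapped attribute,
    // so the predicate is pushed down in either form.
    def translate(e: Expr): Option[GreaterThanFilter] = e match {
      case Gt(Attr(a), Lit(v))           => Some(GreaterThanFilter(a, v))
      case Gt(CastExpr(Attr(a)), Lit(v)) => Some(GreaterThanFilter(a, v))
      case _                             => None
    }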
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a9553e6b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 6b6960d..fb9df70 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -162,7 +162,38 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
} else {
Sort(sort.order, sort.global, child)
}
-
+ case union: Union
+ if !(union.children(0).isInstanceOf[CarbonDictionaryTempDecoder] ||
+ union.children(1).isInstanceOf[CarbonDictionaryTempDecoder]) =>
+ val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper]
+ val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper]
+ union.children(0).output.foreach(attr =>
+ leftCondAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))))
+ union.children(1).output.foreach(attr =>
+ rightCondAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))))
+ var leftPlan = union.children(0)
+ var rightPlan = union.children(1)
+ if (hasCarbonRelation(leftPlan) && leftCondAttrs.size() > 0 &&
+ !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
+ leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
+ new util.HashSet[AttributeReferenceWrapper](),
+ union.children(0))
+ }
+ if (hasCarbonRelation(rightPlan) && rightCondAttrs.size() > 0 &&
+ !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
+ rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
+ new util.HashSet[AttributeReferenceWrapper](),
+ union.children(1))
+ }
+ if (!decoder) {
+ decoder = true
+ CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+ new util.HashSet[AttributeReferenceWrapper](),
+ Union(leftPlan, rightPlan),
+ isOuter = true)
+ } else {
+ Union(leftPlan, rightPlan)
+ }
case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper]
agg.aggregateExpressions.map {
@@ -456,39 +487,71 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
case cd: CarbonDictionaryCatalystDecoder =>
cd
case sort: Sort =>
+ val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
+ if (sort.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
+ val tempDecoder = sort.child.asInstanceOf[CarbonDictionaryTempDecoder]
+ tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
+ }
val sortExprs = sort.order.map { s =>
s.transform {
case attr: AttributeReference =>
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+ val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+ if(tempAttr.isDefined) {
+ tempAttr.get
+ } else {
+ updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+ }
}.asInstanceOf[SortOrder]
}
Sort(sortExprs, sort.global, sort.child)
case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryCatalystDecoder] =>
+ val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
+ if (agg.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
+ val tempDecoder = agg.child.asInstanceOf[CarbonDictionaryTempDecoder]
+ tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
+ }
+
val aggExps = agg.aggregateExpressions.map { aggExp =>
aggExp.transform {
case attr: AttributeReference =>
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+ val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+ if(tempAttr.isDefined) {
+ tempAttr.get
+ } else {
+ updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+ }
}
}.asInstanceOf[Seq[NamedExpression]]
val grpExps = agg.groupingExpressions.map { gexp =>
gexp.transform {
case attr: AttributeReference =>
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+ val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+ if(tempAttr.isDefined) {
+ tempAttr.get
+ } else {
+ updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+ }
}
}
Aggregate(grpExps, aggExps, agg.child)
case expand: Expand =>
+ val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
+ if (expand.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
+ val tempDecoder = expand.child.asInstanceOf[CarbonDictionaryTempDecoder]
+ tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
+ }
expand.transformExpressions {
case attr: AttributeReference =>
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+ val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+ if(tempAttr.isDefined) {
+ tempAttr.get
+ } else {
+ updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+ }
}
case filter: Filter =>
- val filterExps = filter.condition transform {
- case attr: AttributeReference =>
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
- }
- Filter(filterExps, filter.child)
+ filter
case j: Join =>
marker.pushBinaryMarker(allAttrsNotDecode)
j
@@ -496,36 +559,71 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
marker.pushBinaryMarker(allAttrsNotDecode)
u
case p: Project if relations.nonEmpty =>
+ val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
+ if (p.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
+ val tempDecoder = p.child.asInstanceOf[CarbonDictionaryTempDecoder]
+ tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
+ }
val prExps = p.projectList.map { prExp =>
prExp.transform {
case attr: AttributeReference =>
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+ val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+ if(tempAttr.isDefined) {
+ tempAttr.get
+ } else {
+ updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+ }
}
}.asInstanceOf[Seq[NamedExpression]]
Project(prExps, p.child)
case wd: Window if relations.nonEmpty =>
+ val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
+ if (wd.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
+ val tempDecoder = wd.child.asInstanceOf[CarbonDictionaryTempDecoder]
+ tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
+ }
val prExps = wd.output.map { prExp =>
prExp.transform {
case attr: AttributeReference =>
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+ val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+ if(tempAttr.isDefined) {
+ tempAttr.get
+ } else {
+ updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+ }
}
}.asInstanceOf[Seq[Attribute]]
val wdExps = wd.windowExpressions.map { gexp =>
gexp.transform {
case attr: AttributeReference =>
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+ val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+ if(tempAttr.isDefined) {
+ tempAttr.get
+ } else {
+ updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+ }
}
}.asInstanceOf[Seq[NamedExpression]]
val partitionSpec = wd.partitionSpec.map{ exp =>
exp.transform {
case attr: AttributeReference =>
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+ val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+ if(tempAttr.isDefined) {
+ tempAttr.get
+ } else {
+ updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+ }
}
}
val orderSpec = wd.orderSpec.map { exp =>
exp.transform {
case attr: AttributeReference =>
- updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+ val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+ if(tempAttr.isDefined) {
+ tempAttr.get
+ } else {
+ updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+ }
}
}.asInstanceOf[Seq[SortOrder]]
Window(wdExps, partitionSpec, orderSpec, wd.child)
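Each of the cases above (Sort, Aggregate, Expand, Project, Window) repeats the same precedence rule: if the operator's child is a CarbonDictionaryTempDecoder, attributes listed by that decoder keep their original data type, and only the remaining attributes go through updateDataType, which rewrites them to the dictionary-key type. A small sketch of that rule with plain Scala stand-ins (hypothetical names; the real rule keys the map by AttributeReferenceWrapper and inlines this lookup in every case):

    import scala.collection.mutable

    // Stand-in for a Catalyst attribute: just a name and a data type.
    case class Attr(name: String, dataType: String)

    // Prefer the attribute captured from the child CarbonDictionaryTempDecoder, if any;
    // otherwise fall back to the type rewrite (updateDataType in the real rule).
    def resolve(attr: Attr,
                fromTempDecoder: mutable.Map[String, Attr],
                rewriteType: Attr => Attr): Attr =
      fromTempDecoder.getOrElse(attr.name, rewriteType(attr))

    // Example: a column already handled by the temp decoder keeps its string type,
    // while an undecoded dictionary column is rewritten to the surrogate-key (int) type.
    val tmp = mutable.Map("stringField" -> Attr("stringField", "string"))
    resolve(Attr("stringField", "string"), tmp, a => a.copy(dataType = "int"))
    resolve(Attr("charField", "string"), tmp, a => a.copy(dataType = "int"))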
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a9553e6b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
index aaa0a20..13c0257 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
@@ -26,7 +26,7 @@ class CarbonDataSourceSuite extends FunSuite with BeforeAndAfterAll {
override def beforeAll(): Unit = {
spark = SparkSession
.builder()
- .master("local[4]")
+ .master("local")
.appName("CarbonExample")
.enableHiveSupport()
.config(CarbonCommonConstants.STORE_LOCATION,
@@ -48,10 +48,23 @@ class CarbonDataSourceSuite extends FunSuite with BeforeAndAfterAll {
| bigintField long,
| doubleField double,
| stringField string,
- | decimalField decimal(13, 0)
- | )
+ | decimalField decimal(13, 0),
+ | timestampField string)
| USING org.apache.spark.sql.CarbonSource
""".stripMargin)
+
+ spark.sql(
+ s"""
+ | CREATE TABLE csv_table
+ | ( shortField short,
+ | intField int,
+ | bigintField long,
+ | doubleField double,
+ | stringField string,
+ | decimalField decimal(13, 0),
+ | timestampField string)
+ | ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+ """.stripMargin)
}
override def afterAll(): Unit = {
@@ -64,10 +77,17 @@ class CarbonDataSourceSuite extends FunSuite with BeforeAndAfterAll {
spark.sql("select * from carbon_testtable").collect()
}
-
test("agg") {
spark.sql("select stringField, sum(intField) , sum(decimalField) " +
"from carbon_testtable group by stringField").collect()
+
+ spark.sql(
+ s"""
+ | INSERT INTO TABLE carbon_testtable
+ | SELECT shortField, intField, bigintField, doubleField, stringField,
+ | decimalField, from_unixtime(unix_timestamp(timestampField,'yyyy/M/dd')) timestampField
+ | FROM csv_table
+ """.stripMargin)
}
}
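Like the example program, the new test inserts from csv_table and converts the CSV's yyyy/M/dd strings with from_unixtime(unix_timestamp(...)). That conversion can be checked in isolation (a sketch, assuming the same `spark` session):

    // Expected to print 2015-07-23 00:00:00 in the session time zone.
    spark.sql("SELECT from_unixtime(unix_timestamp('2015/7/23', 'yyyy/M/dd'))").show(false)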
[2/2] incubator-carbondata git commit: [CARBONDATA-497][SPARK2]fix datatype issue of CarbonLateDecoderRule This closes #403
Posted by ja...@apache.org.
[CARBONDATA-497][SPARK2]fix datatype issue of CarbonLateDecoderRule This closes #403
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/47658b17
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/47658b17
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/47658b17
Branch: refs/heads/master
Commit: 47658b17d9dfd9bb150f8e3ea38bcb1c26dd89e4
Parents: ac45755 a9553e6
Author: jackylk <ja...@huawei.com>
Authored: Wed Dec 7 15:22:15 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Wed Dec 7 15:22:15 2016 +0800
----------------------------------------------------------------------
examples/spark2/src/main/resources/data.csv | 20 +--
.../carbondata/examples/CarbonExample.scala | 10 +-
.../spark/sql/CarbonDictionaryDecoder.scala | 1 -
.../execution/CarbonLateDecodeStrategy.scala | 157 ++++++++++---------
.../sql/optimizer/CarbonLateDecodeRule.scala | 128 +++++++++++++--
.../carbondata/CarbonDataSourceSuite.scala | 28 +++-
6 files changed, 236 insertions(+), 108 deletions(-)
----------------------------------------------------------------------