You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/07 07:22:27 UTC

[1/2] incubator-carbondata git commit: fixlatedecoder

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master ac4575536 -> 47658b17d


fixlatedecoder

fix comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/a9553e6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/a9553e6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/a9553e6b

Branch: refs/heads/master
Commit: a9553e6b8d086b98fab6df7b93a4e78150796fc9
Parents: ac45755
Author: QiangCai <qi...@qq.com>
Authored: Tue Dec 6 17:40:21 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Wed Dec 7 15:21:51 2016 +0800

----------------------------------------------------------------------
 examples/spark2/src/main/resources/data.csv     |  20 +--
 .../carbondata/examples/CarbonExample.scala     |  10 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |   1 -
 .../execution/CarbonLateDecodeStrategy.scala    | 157 ++++++++++---------
 .../sql/optimizer/CarbonLateDecodeRule.scala    | 128 +++++++++++++--
 .../carbondata/CarbonDataSourceSuite.scala      |  28 +++-
 6 files changed, 236 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a9553e6b/examples/spark2/src/main/resources/data.csv
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/resources/data.csv b/examples/spark2/src/main/resources/data.csv
index fcdf3c1..b44672f 100644
--- a/examples/spark2/src/main/resources/data.csv
+++ b/examples/spark2/src/main/resources/data.csv
@@ -1,10 +1,10 @@
-1,10,100,48.4,spark,2015/4/23
-5,17,140,43.4,spark,2015/7/27
-1,11,100,44.4,flink,2015/5/23
-1,10,150,43.4,spark,2015/7/24
-1,10,100,47.4,spark,2015/7/23
-3,14,160,43.4,hive,2015/7/26
-2,10,100,43.4,impala,2015/7/23
-1,10,100,43.4,spark,2015/5/23
-4,16,130,42.4,impala,2015/7/23
-1,10,100,43.4,spark,2015/7/23
\ No newline at end of file
+1,10,100,48.4,spark,2015/4/23,1.23
+5,17,140,43.4,spark,2015/7/27,3.45
+1,11,100,44.4,flink,2015/5/23,23.23
+1,10,150,43.4,spark,2015/7/24,254.12
+1,10,100,47.4,spark,2015/7/23,876.14
+3,14,160,43.4,hive,2015/7/26,3454.32
+2,10,100,43.4,impala,2015/7/23,456.98
+1,10,100,43.4,spark,2015/5/23,32.53
+4,16,130,42.4,impala,2015/7/23,67.23
+1,10,100,43.4,spark,2015/7/23,832.23
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a9553e6b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index 59cc4e9..17674ef 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -68,8 +68,8 @@ object CarbonExample {
          |    bigintField long,
          |    doubleField double,
          |    stringField string,
-         |    timestampField timestamp
-         | )
+         |    timestampField timestamp,
+         |    decimalField decimal(18,2))
          | USING org.apache.spark.sql.CarbonSource
        """.stripMargin)
 
@@ -86,7 +86,8 @@ object CarbonExample {
          |    bigintField long,
          |    doubleField double,
          |    stringField string,
-         |    timestampField string)
+         |    timestampField string,
+         |    decimalField decimal(18,2))
          |    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
        """.stripMargin)
 
@@ -105,13 +106,14 @@ object CarbonExample {
       s"""
          | INSERT INTO TABLE carbon_table
          | SELECT shortField, intField, bigintField, doubleField, stringField,
-         | from_unixtime(unix_timestamp(timestampField,'yyyy/M/dd')) timestampField
+         | from_unixtime(unix_timestamp(timestampField,'yyyy/M/dd')) timestampField, decimalField
          | FROM csv_table
        """.stripMargin)
 
     spark.sql("""
              SELECT *
              FROM carbon_table
+             where stringfield = 'spark' and decimalField > 40
               """).show
 
     spark.sql("""

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a9553e6b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index c7ca61d..db864c7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -320,7 +320,6 @@ class CarbonDecoderRDD(relations: Seq[CarbonDecoderRelation],
       new Iterator[Row] {
         var flag = true
         var total = 0L
-
         override final def hasNext: Boolean = iter.hasNext
 
         override final def next(): Row = {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a9553e6b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index c73fde6..57b2139 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -21,8 +21,6 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.CatalystTypeConverters._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
@@ -31,14 +29,10 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
-import org.apache.spark.sql.types.{IntegerType, StringType}
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.sql.types.IntegerType
 
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 
-
-
-
 /**
  * Carbon strategy for late decode (convert dictionary key to value as late as possible), which
  * can improve the aggregation performance and reduce memory usage
@@ -92,10 +86,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
                                    relation: LogicalRelation,
                                    output: Seq[Attribute],
                                    rdd: RDD[Row],
-                                   needoDecode: ArrayBuffer[AttributeReference]):
+                                   needDecode: ArrayBuffer[AttributeReference]):
   RDD[InternalRow] = {
-    val newRdd = if (needoDecode.size > 0) {
-      getDecoderRDD(relation, needoDecode, rdd, output)
+    val newRdd = if (needDecode.size > 0) {
+      getDecoderRDD(relation, needDecode, rdd, output)
     } else {
       rdd
     }
@@ -310,74 +304,89 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
    */
   protected[sql] def translateFilter(predicate: Expression): Option[Filter] = {
     predicate match {
-      case expressions.EqualTo(a: Attribute, Literal(v, t)) =>
-        Some(sources.EqualTo(a.name, convertToScala(v, t)))
-      case expressions.EqualTo(Literal(v, t), a: Attribute) =>
-        Some(sources.EqualTo(a.name, convertToScala(v, t)))
-
-      case expressions.EqualNullSafe(a: Attribute, Literal(v, t)) =>
-        Some(sources.EqualNullSafe(a.name, convertToScala(v, t)))
-      case expressions.EqualNullSafe(Literal(v, t), a: Attribute) =>
-        Some(sources.EqualNullSafe(a.name, convertToScala(v, t)))
-
-      case expressions.GreaterThan(a: Attribute, Literal(v, t)) =>
-        Some(sources.GreaterThan(a.name, convertToScala(v, t)))
-      case expressions.GreaterThan(Literal(v, t), a: Attribute) =>
-        Some(sources.LessThan(a.name, convertToScala(v, t)))
-
-      case expressions.LessThan(a: Attribute, Literal(v, t)) =>
-        Some(sources.LessThan(a.name, convertToScala(v, t)))
-      case expressions.LessThan(Literal(v, t), a: Attribute) =>
-        Some(sources.GreaterThan(a.name, convertToScala(v, t)))
-
-      case expressions.GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
-        Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t)))
-      case expressions.GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
-        Some(sources.LessThanOrEqual(a.name, convertToScala(v, t)))
-
-      case expressions.LessThanOrEqual(a: Attribute, Literal(v, t)) =>
-        Some(sources.LessThanOrEqual(a.name, convertToScala(v, t)))
-      case expressions.LessThanOrEqual(Literal(v, t), a: Attribute) =>
-        Some(sources.GreaterThanOrEqual(a.name, convertToScala(v, t)))
-
-      case expressions.InSet(a: Attribute, set) =>
-        val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
-        Some(sources.In(a.name, set.toArray.map(toScala)))
-
-      // Because we only convert In to InSet in Optimizer when there are more than certain
-      // items. So it is possible we still get an In expression here that needs to be pushed
-      // down.
-      case expressions.In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
-        val hSet = list.map(e => e.eval(EmptyRow))
-        val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
-        Some(sources.In(a.name, hSet.toArray.map(toScala)))
+      case or@ Or(left, right) =>
 
-      case expressions.IsNull(a: Attribute) =>
-        Some(sources.IsNull(a.name))
-      case expressions.IsNotNull(a: Attribute) =>
-        Some(sources.IsNotNull(a.name))
+        val leftFilter = translateFilter(left)
+        val rightFilter = translateFilter(right)
+        if (leftFilter.isDefined && rightFilter.isDefined) {
+          Some( sources.Or(leftFilter.get, rightFilter.get))
+        } else {
+          None
+        }
 
-      case expressions.And(left, right) =>
+      case And(left, right) =>
         (translateFilter(left) ++ translateFilter(right)).reduceOption(sources.And)
 
-      case expressions.Or(left, right) =>
-        for {
-          leftFilter <- translateFilter(left)
-          rightFilter <- translateFilter(right)
-        } yield sources.Or(leftFilter, rightFilter)
-
-      case expressions.Not(child) =>
-        translateFilter(child).map(sources.Not)
-
-      case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
-        Some(sources.StringStartsWith(a.name, v.toString))
-
-      case expressions.EndsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
-        Some(sources.StringEndsWith(a.name, v.toString))
-
-      case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) =>
-        Some(sources.StringContains(a.name, v.toString))
-      case _ => None
+      case EqualTo(a: Attribute, Literal(v, t)) =>
+        Some(sources.EqualTo(a.name, v))
+      case EqualTo(l@Literal(v, t), a: Attribute) =>
+        Some(sources.EqualTo(a.name, v))
+      case EqualTo(Cast(a: Attribute, _), Literal(v, t)) =>
+        Some(sources.EqualTo(a.name, v))
+      case EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
+        Some(sources.EqualTo(a.name, v))
+
+      case Not(EqualTo(a: Attribute, Literal(v, t))) => new
+          Some(sources.Not(sources.EqualTo(a.name, v)))
+      case Not(EqualTo(Literal(v, t), a: Attribute)) => new
+          Some(sources.Not(sources.EqualTo(a.name, v)))
+      case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) => new
+          Some(sources.Not(sources.EqualTo(a.name, v)))
+      case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) => new
+          Some(sources.Not(sources.EqualTo(a.name, v)))
+      case IsNotNull(a: Attribute) => Some(sources.IsNotNull(a.name))
+      case IsNull(a: Attribute) => Some(sources.IsNull(a.name))
+      case Not(In(a: Attribute, list)) if !list.exists(!_.isInstanceOf[Literal]) =>
+        val hSet = list.map(e => e.eval(EmptyRow))
+        Some(sources.Not(sources.In(a.name, hSet.toArray)))
+      case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
+        val hSet = list.map(e => e.eval(EmptyRow))
+        Some(sources.In(a.name, hSet.toArray))
+      case Not(In(Cast(a: Attribute, _), list))
+        if !list.exists(!_.isInstanceOf[Literal]) =>
+        val hSet = list.map(e => e.eval(EmptyRow))
+        Some(sources.Not(sources.In(a.name, hSet.toArray)))
+      case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
+        val hSet = list.map(e => e.eval(EmptyRow))
+        Some(sources.In(a.name, hSet.toArray))
+
+      case GreaterThan(a: Attribute, Literal(v, t)) =>
+        Some(sources.GreaterThan(a.name, v))
+      case GreaterThan(Literal(v, t), a: Attribute) =>
+        Some(sources.LessThan(a.name, v))
+      case GreaterThan(Cast(a: Attribute, _), Literal(v, t)) =>
+        Some(sources.GreaterThan(a.name, v))
+      case GreaterThan(Literal(v, t), Cast(a: Attribute, _)) =>
+        Some(sources.LessThan(a.name, v))
+
+      case LessThan(a: Attribute, Literal(v, t)) =>
+        Some(sources.LessThan(a.name, v))
+      case LessThan(Literal(v, t), a: Attribute) =>
+        Some(sources.GreaterThan(a.name, v))
+      case LessThan(Cast(a: Attribute, _), Literal(v, t)) =>
+        Some(sources.LessThan(a.name, v))
+      case LessThan(Literal(v, t), Cast(a: Attribute, _)) =>
+        Some(sources.GreaterThan(a.name, v))
+
+      case GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
+        Some(sources.GreaterThanOrEqual(a.name, v))
+      case GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
+        Some(sources.LessThanOrEqual(a.name, v))
+      case GreaterThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
+        Some(sources.GreaterThanOrEqual(a.name, v))
+      case GreaterThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
+        Some(sources.LessThanOrEqual(a.name, v))
+
+      case LessThanOrEqual(a: Attribute, Literal(v, t)) =>
+        Some(sources.LessThanOrEqual(a.name, v))
+      case LessThanOrEqual(Literal(v, t), a: Attribute) =>
+        Some(sources.GreaterThanOrEqual(a.name, v))
+      case LessThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
+        Some(sources.LessThanOrEqual(a.name, v))
+      case LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
+        Some(sources.GreaterThanOrEqual(a.name, v))
+
+      case others => None
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a9553e6b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 6b6960d..fb9df70 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -162,7 +162,38 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
           } else {
             Sort(sort.order, sort.global, child)
           }
-
+        case union: Union
+          if !(union.children(0).isInstanceOf[CarbonDictionaryTempDecoder] ||
+            union.children(1).isInstanceOf[CarbonDictionaryTempDecoder]) =>
+          val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper]
+          val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper]
+          union.children(0).output.foreach(attr =>
+            leftCondAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))))
+          union.children(1).output.foreach(attr =>
+            rightCondAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))))
+          var leftPlan = union.children(0)
+          var rightPlan = union.children(1)
+          if (hasCarbonRelation(leftPlan) && leftCondAttrs.size() > 0 &&
+            !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
+            leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
+              new util.HashSet[AttributeReferenceWrapper](),
+              union.children(0))
+          }
+          if (hasCarbonRelation(rightPlan) && rightCondAttrs.size() > 0 &&
+            !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
+            rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
+              new util.HashSet[AttributeReferenceWrapper](),
+              union.children(1))
+          }
+          if (!decoder) {
+            decoder = true
+            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+              new util.HashSet[AttributeReferenceWrapper](),
+              Union(leftPlan, rightPlan),
+              isOuter = true)
+          } else {
+            Union(leftPlan, rightPlan)
+          }
         case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
           val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper]
           agg.aggregateExpressions.map {
@@ -456,39 +487,71 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
       case cd: CarbonDictionaryCatalystDecoder =>
         cd
       case sort: Sort =>
+        val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
+        if (sort.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
+          val tempDecoder = sort.child.asInstanceOf[CarbonDictionaryTempDecoder]
+          tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
+        }
         val sortExprs = sort.order.map { s =>
           s.transform {
             case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+              if(tempAttr.isDefined) {
+                tempAttr.get
+              } else {
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              }
           }.asInstanceOf[SortOrder]
         }
         Sort(sortExprs, sort.global, sort.child)
       case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryCatalystDecoder] =>
+        val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
+        if (agg.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
+          val tempDecoder = agg.child.asInstanceOf[CarbonDictionaryTempDecoder]
+          tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
+        }
+
         val aggExps = agg.aggregateExpressions.map { aggExp =>
           aggExp.transform {
             case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+              if(tempAttr.isDefined) {
+                tempAttr.get
+              } else {
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              }
           }
         }.asInstanceOf[Seq[NamedExpression]]
 
         val grpExps = agg.groupingExpressions.map { gexp =>
           gexp.transform {
             case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+              if(tempAttr.isDefined) {
+                tempAttr.get
+              } else {
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              }
           }
         }
         Aggregate(grpExps, aggExps, agg.child)
       case expand: Expand =>
+        val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
+        if (expand.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
+          val tempDecoder = expand.child.asInstanceOf[CarbonDictionaryTempDecoder]
+          tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
+        }
         expand.transformExpressions {
           case attr: AttributeReference =>
-            updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+            val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+            if(tempAttr.isDefined) {
+              tempAttr.get
+            } else {
+              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+            }
         }
       case filter: Filter =>
-        val filterExps = filter.condition transform {
-          case attr: AttributeReference =>
-            updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
-        }
-        Filter(filterExps, filter.child)
+        filter
       case j: Join =>
         marker.pushBinaryMarker(allAttrsNotDecode)
         j
@@ -496,36 +559,71 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
         marker.pushBinaryMarker(allAttrsNotDecode)
         u
       case p: Project if relations.nonEmpty =>
+        val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
+        if (p.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
+          val tempDecoder = p.child.asInstanceOf[CarbonDictionaryTempDecoder]
+          tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
+        }
         val prExps = p.projectList.map { prExp =>
           prExp.transform {
             case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+              if(tempAttr.isDefined) {
+                tempAttr.get
+              } else {
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              }
           }
         }.asInstanceOf[Seq[NamedExpression]]
         Project(prExps, p.child)
       case wd: Window if relations.nonEmpty =>
+        val tmpAttrMap = new mutable.HashMap[AttributeReferenceWrapper, Attribute]()
+        if (wd.child.isInstanceOf[CarbonDictionaryTempDecoder]) {
+          val tempDecoder = wd.child.asInstanceOf[CarbonDictionaryTempDecoder]
+          tempDecoder.attrList.asScala.foreach{attr => tmpAttrMap.put(attr, attr.attr)}
+        }
         val prExps = wd.output.map { prExp =>
           prExp.transform {
             case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+              if(tempAttr.isDefined) {
+                tempAttr.get
+              } else {
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              }
           }
         }.asInstanceOf[Seq[Attribute]]
         val wdExps = wd.windowExpressions.map { gexp =>
           gexp.transform {
             case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+              if(tempAttr.isDefined) {
+                tempAttr.get
+              } else {
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              }
           }
         }.asInstanceOf[Seq[NamedExpression]]
         val partitionSpec = wd.partitionSpec.map{ exp =>
           exp.transform {
             case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+              if(tempAttr.isDefined) {
+                tempAttr.get
+              } else {
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              }
           }
         }
         val orderSpec = wd.orderSpec.map { exp =>
           exp.transform {
             case attr: AttributeReference =>
-              updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              val tempAttr = tmpAttrMap.get(AttributeReferenceWrapper(attr))
+              if(tempAttr.isDefined) {
+                tempAttr.get
+              } else {
+                updateDataType(attr, attrMap, allAttrsNotDecode, aliasMap)
+              }
           }
         }.asInstanceOf[Seq[SortOrder]]
         Window(wdExps, partitionSpec, orderSpec, wd.child)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a9553e6b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
index aaa0a20..13c0257 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/CarbonDataSourceSuite.scala
@@ -26,7 +26,7 @@ class CarbonDataSourceSuite extends FunSuite with BeforeAndAfterAll {
   override def beforeAll(): Unit = {
     spark = SparkSession
       .builder()
-      .master("local[4]")
+      .master("local")
       .appName("CarbonExample")
       .enableHiveSupport()
       .config(CarbonCommonConstants.STORE_LOCATION,
@@ -48,10 +48,23 @@ class CarbonDataSourceSuite extends FunSuite with BeforeAndAfterAll {
          |    bigintField long,
          |    doubleField double,
          |    stringField string,
-         |    decimalField decimal(13, 0)
-         | )
+         |    decimalField decimal(13, 0),
+         |    timestampField string)
          | USING org.apache.spark.sql.CarbonSource
        """.stripMargin)
+
+    spark.sql(
+      s"""
+         | CREATE TABLE csv_table
+         | (  shortField short,
+         |    intField int,
+         |    bigintField long,
+         |    doubleField double,
+         |    stringField string,
+         |    decimalField decimal(13, 0),
+         |    timestampField string)
+         | ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+       """.stripMargin)
   }
 
   override def afterAll(): Unit = {
@@ -64,10 +77,17 @@ class CarbonDataSourceSuite extends FunSuite with BeforeAndAfterAll {
     spark.sql("select * from carbon_testtable").collect()
   }
 
-
   test("agg") {
     spark.sql("select stringField, sum(intField) , sum(decimalField) " +
       "from carbon_testtable group by stringField").collect()
+
+    spark.sql(
+      s"""
+         | INSERT INTO TABLE carbon_testtable
+         | SELECT shortField, intField, bigintField, doubleField, stringField,
+         | decimalField, from_unixtime(unix_timestamp(timestampField,'yyyy/M/dd')) timestampField
+         | FROM csv_table
+       """.stripMargin)
   }
 
 }


[2/2] incubator-carbondata git commit: [CARBONDATA-497][SPARK2]fix datatype issue of CarbonLateDecoderRule This closes #403

Posted by ja...@apache.org.
[CARBONDATA-497][SPARK2]fix datatype issue of CarbonLateDecoderRule This closes #403


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/47658b17
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/47658b17
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/47658b17

Branch: refs/heads/master
Commit: 47658b17d9dfd9bb150f8e3ea38bcb1c26dd89e4
Parents: ac45755 a9553e6
Author: jackylk <ja...@huawei.com>
Authored: Wed Dec 7 15:22:15 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Wed Dec 7 15:22:15 2016 +0800

----------------------------------------------------------------------
 examples/spark2/src/main/resources/data.csv     |  20 +--
 .../carbondata/examples/CarbonExample.scala     |  10 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |   1 -
 .../execution/CarbonLateDecodeStrategy.scala    | 157 ++++++++++---------
 .../sql/optimizer/CarbonLateDecodeRule.scala    | 128 +++++++++++++--
 .../carbondata/CarbonDataSourceSuite.scala      |  28 +++-
 6 files changed, 236 insertions(+), 108 deletions(-)
----------------------------------------------------------------------