You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/08/08 06:46:18 UTC

[1/2] incubator-carbondata git commit: Fixed aggregate queries that use subqueries

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master ffd911fb3 -> ea3169e8b


Fixed aggregate queries that use subqueries


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/685c2780
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/685c2780
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/685c2780

Branch: refs/heads/master
Commit: 685c278014399bde78df55a068e74b7a61df544c
Parents: ffd911f
Author: ravipesala <ra...@gmail.com>
Authored: Sun Aug 7 17:27:25 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sun Aug 7 17:27:25 2016 +0530

----------------------------------------------------------------------
 .../spark/sql/CarbonDictionaryDecoder.scala     |   5 +-
 .../CarbonDecoderOptimizerHelper.scala          |  40 ++++++-
 .../spark/sql/optimizer/CarbonOptimizer.scala   | 116 ++++++++++---------
 .../org/carbondata/spark/CarbonFilters.scala    |   9 +-
 .../AllDataTypesTestCaseAggregate.scala         |   6 +
 5 files changed, 106 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/685c2780/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index c47e4f7..3faf0c7 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -121,14 +121,13 @@ case class CarbonDictionaryDecoder(
     val dictIds: Array[(String, ColumnIdentifier, DataType)] = attributes.map { a =>
       val attr = aliasMap.getOrElse(a, a)
       val relation = relations.find(p => p.contains(attr))
-      if(relation.isDefined) {
+      if(relation.isDefined && canBeDecoded(attr)) {
         val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
         val carbonDimension =
           carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
         if (carbonDimension != null &&
             carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
-            canBeDecoded(attr)) {
+            !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
           (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
               carbonDimension.getDataType)
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/685c2780/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
index 3784f84..6be8369 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
@@ -33,12 +33,24 @@ case class BinaryCarbonNode(left: util.List[AbstractNode], right: util.List[Abst
   extends AbstractNode
 
 case class CarbonDictionaryTempDecoder(
-    attrList: util.Set[Attribute],
-    attrsNotDecode: util.Set[Attribute],
+    attrList: util.Set[AttributeReferenceWrapper],
+    attrsNotDecode: util.Set[AttributeReferenceWrapper],
     child: LogicalPlan,
     isOuter: Boolean = false) extends UnaryNode {
   var processed = false
 
+  def getAttrsNotDecode: util.Set[Attribute] = {
+    val set = new util.HashSet[Attribute]()
+    attrsNotDecode.asScala.foreach(f => set.add(f.attr))
+    set
+  }
+
+  def getAttrList: util.Set[Attribute] = {
+    val set = new util.HashSet[Attribute]()
+    attrList.asScala.foreach(f => set.add(f.attr))
+    set
+  }
+
   override def output: Seq[Attribute] = child.output
 }
 
@@ -68,20 +80,20 @@ class CarbonDecoderProcessor {
 
   def updateDecoders(nodeList: util.List[AbstractNode]): Unit = {
     val scalaList = nodeList.asScala
-    val decoderNotDecode = new util.HashSet[Attribute]
+    val decoderNotDecode = new util.HashSet[AttributeReferenceWrapper]
     updateDecoderInternal(scalaList, decoderNotDecode)
   }
 
   private def updateDecoderInternal(scalaList: mutable.Buffer[AbstractNode],
-      decoderNotDecode: util.HashSet[Attribute]): Unit = {
+      decoderNotDecode: util.HashSet[AttributeReferenceWrapper]): Unit = {
     scalaList.reverseMap {
       case Node(cd: CarbonDictionaryTempDecoder) =>
         decoderNotDecode.asScala.foreach(cd.attrsNotDecode.add)
         decoderNotDecode.asScala.foreach(cd.attrList.remove)
         decoderNotDecode.addAll(cd.attrList)
       case BinaryCarbonNode(left: util.List[AbstractNode], right: util.List[AbstractNode]) =>
-        val leftNotDecode = new util.HashSet[Attribute]
-        val rightNotDecode = new util.HashSet[Attribute]
+        val leftNotDecode = new util.HashSet[AttributeReferenceWrapper]
+        val rightNotDecode = new util.HashSet[AttributeReferenceWrapper]
         updateDecoderInternal(left.asScala, leftNotDecode)
         updateDecoderInternal(right.asScala, rightNotDecode)
         decoderNotDecode.addAll(leftNotDecode)
@@ -91,6 +103,22 @@ class CarbonDecoderProcessor {
 
 }
 
+case class AttributeReferenceWrapper(attr: Attribute) {
+
+  override def equals(other: Any): Boolean = other match {
+    case ar: AttributeReferenceWrapper =>
+      attr.name == ar.attr.name && attr.exprId == ar.attr.exprId
+    case _ => false
+  }
+  override def hashCode: Int = {
+    var h = 17
+    h = h * 37 + attr.exprId.hashCode()
+    h
+  }
+
+
+}
+
 case class Marker(set: util.Set[Attribute], binary: Boolean = false)
 
 class CarbonPlanMarker {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/685c2780/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
index 2d20b73..d75eeda 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
@@ -96,23 +96,23 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
           decoder = true
           cd
         case sort: Sort if !sort.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
-          val attrsOnSort = new util.HashSet[Attribute]()
+          val attrsOnSort = new util.HashSet[AttributeReferenceWrapper]()
           sort.order.map { s =>
             s.collect {
               case attr: AttributeReference
                 if isDictionaryEncoded(attr, relations, aliasMap) =>
-                attrsOnSort.add(aliasMap.getOrElse(attr, attr))
+                attrsOnSort.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
             }
           }
           var child = sort.child
           if (attrsOnSort.size() > 0 && !child.isInstanceOf[Sort]) {
             child = CarbonDictionaryTempDecoder(attrsOnSort,
-              new util.HashSet[Attribute](), sort.child)
+              new util.HashSet[AttributeReferenceWrapper](), sort.child)
           }
           if (!decoder) {
             decoder = true
-            CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
-              new util.HashSet[Attribute](),
+            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+              new util.HashSet[AttributeReferenceWrapper](),
               Sort(sort.order, sort.global, child),
               isOuter = true)
           } else {
@@ -122,28 +122,30 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
         case union: Union
           if !(union.left.isInstanceOf[CarbonDictionaryTempDecoder] ||
                union.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
-          val leftCondAttrs = new util.HashSet[Attribute]
-          val rightCondAttrs = new util.HashSet[Attribute]
-          union.left.output.foreach(attr => leftCondAttrs.add(aliasMap.getOrElse(attr, attr)))
-          union.right.output.foreach(attr => rightCondAttrs.add(aliasMap.getOrElse(attr, attr)))
+          val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper]
+          val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper]
+          union.left.output.foreach(attr =>
+            leftCondAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))))
+          union.right.output.foreach(attr =>
+            rightCondAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))))
           var leftPlan = union.left
           var rightPlan = union.right
           if (leftCondAttrs.size() > 0 &&
               !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
             leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
-              new util.HashSet[Attribute](),
+              new util.HashSet[AttributeReferenceWrapper](),
               union.left)
           }
           if (rightCondAttrs.size() > 0 &&
               !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
             rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
-              new util.HashSet[Attribute](),
+              new util.HashSet[AttributeReferenceWrapper](),
               union.right)
           }
           if (!decoder) {
             decoder = true
-            CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
-              new util.HashSet[Attribute](),
+            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+              new util.HashSet[AttributeReferenceWrapper](),
               Union(leftPlan, rightPlan),
               isOuter = true)
           } else {
@@ -151,7 +153,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
           }
 
         case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
-          val attrsOndimAggs = new util.HashSet[Attribute]
+          val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper]
           agg.aggregateExpressions.map {
             case attr: AttributeReference =>
             case a@Alias(attr: AttributeReference, name) =>
@@ -165,27 +167,27 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
               others.collect {
                 case attr: AttributeReference
                   if isDictionaryEncoded(attr, relations, aliasMap) =>
-                  attrsOndimAggs.add(aliasMap.getOrElse(attr, attr))
+                  attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
               }
           }
           var child = agg.child
           // Incase if the child also aggregate then push down decoder to child
           if (attrsOndimAggs.size() > 0 && !child.equals(agg)) {
             child = CarbonDictionaryTempDecoder(attrsOndimAggs,
-              new util.HashSet[Attribute](),
+              new util.HashSet[AttributeReferenceWrapper](),
               agg.child)
           }
           if (!decoder) {
             decoder = true
-            CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
-              new util.HashSet[Attribute](),
+            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+              new util.HashSet[AttributeReferenceWrapper](),
               Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child),
               isOuter = true)
           } else {
             Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child)
           }
         case expand: Expand if !expand.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
-          val attrsOnExpand = new util.HashSet[Attribute]
+          val attrsOnExpand = new util.HashSet[AttributeReferenceWrapper]
           expand.projections.map {s =>
             s.map {
               case attr: AttributeReference =>
@@ -194,41 +196,41 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
                 others.collect {
                   case attr: AttributeReference
                     if isDictionaryEncoded(attr, relations, aliasMap) =>
-                    attrsOnExpand.add(aliasMap.getOrElse(attr, attr))
+                    attrsOnExpand.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
                 }
             }
           }
           var child = expand.child
           if (attrsOnExpand.size() > 0 && !child.isInstanceOf[Expand]) {
             child = CarbonDictionaryTempDecoder(attrsOnExpand,
-              new util.HashSet[Attribute](),
+              new util.HashSet[AttributeReferenceWrapper](),
               expand.child)
           }
           if (!decoder) {
             decoder = true
-            CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
-              new util.HashSet[Attribute](),
+            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+              new util.HashSet[AttributeReferenceWrapper](),
               CodeGenerateFactory.getInstance().expandFactory.createExpand(expand, child),
               isOuter = true)
           } else {
             CodeGenerateFactory.getInstance().expandFactory.createExpand(expand, child)
           }
         case filter: Filter if !filter.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
-          val attrsOnConds = new util.HashSet[Attribute]
+          val attrsOnConds = new util.HashSet[AttributeReferenceWrapper]
           CarbonFilters
             .selectFilters(splitConjunctivePredicates(filter.condition), attrsOnConds, aliasMap)
 
           var child = filter.child
           if (attrsOnConds.size() > 0 && !child.isInstanceOf[Filter]) {
             child = CarbonDictionaryTempDecoder(attrsOnConds,
-              new util.HashSet[Attribute](),
+              new util.HashSet[AttributeReferenceWrapper](),
               filter.child)
           }
 
           if (!decoder) {
             decoder = true
-            CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
-              new util.HashSet[Attribute](),
+            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+              new util.HashSet[AttributeReferenceWrapper](),
               Filter(filter.condition, child),
               isOuter = true)
           } else {
@@ -249,16 +251,16 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
             case _ =>
           }
 
-          val leftCondAttrs = new util.HashSet[Attribute]
-          val rightCondAttrs = new util.HashSet[Attribute]
+          val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper]
+          val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper]
           if (attrsOnJoin.size() > 0) {
 
             attrsOnJoin.asScala.map { attr =>
               if (qualifierPresence(j.left, attr)) {
-                leftCondAttrs.add(attr)
+                leftCondAttrs.add(AttributeReferenceWrapper(attr))
               }
               if (qualifierPresence(j.right, attr)) {
-                rightCondAttrs.add(attr)
+                rightCondAttrs.add(AttributeReferenceWrapper(attr))
               }
             }
             var leftPlan = j.left
@@ -266,19 +268,19 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
             if (leftCondAttrs.size() > 0 &&
                 !leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
               leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
-                new util.HashSet[Attribute](),
+                new util.HashSet[AttributeReferenceWrapper](),
                 j.left)
             }
             if (rightCondAttrs.size() > 0 &&
                 !rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
               rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
-                new util.HashSet[Attribute](),
+                new util.HashSet[AttributeReferenceWrapper](),
                 j.right)
             }
             if (!decoder) {
               decoder = true
-              CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
-                new util.HashSet[Attribute](),
+              CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+                new util.HashSet[AttributeReferenceWrapper](),
                 Join(leftPlan, rightPlan, j.joinType, j.condition),
                 isOuter = true)
             } else {
@@ -290,7 +292,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
 
         case p: Project
           if relations.nonEmpty && !p.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
-          val attrsOnProjects = new util.HashSet[Attribute]
+          val attrsOnProjects = new util.HashSet[AttributeReferenceWrapper]
           p.projectList.map {
             case attr: AttributeReference =>
             case a@Alias(attr: AttributeReference, name) =>
@@ -298,19 +300,19 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
               others.collect {
                 case attr: AttributeReference
                   if isDictionaryEncoded(attr, relations, aliasMap) =>
-                  attrsOnProjects.add(aliasMap.getOrElse(attr, attr))
+                  attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
               }
           }
           var child = p.child
           if (attrsOnProjects.size() > 0 && !child.isInstanceOf[Project]) {
             child = CarbonDictionaryTempDecoder(attrsOnProjects,
-              new util.HashSet[Attribute](),
+              new util.HashSet[AttributeReferenceWrapper](),
               p.child)
           }
           if (!decoder) {
             decoder = true
-            CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
-              new util.HashSet[Attribute](),
+            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+              new util.HashSet[AttributeReferenceWrapper](),
               Project(p.projectList, child),
               isOuter = true)
           } else {
@@ -318,14 +320,14 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
           }
 
         case wd: Window if !wd.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
-          val attrsOnProjects = new util.HashSet[Attribute]
+          val attrsOnProjects = new util.HashSet[AttributeReferenceWrapper]
           wd.projectList.map {
             case attr: AttributeReference =>
             case others =>
               others.collect {
                 case attr: AttributeReference
                   if isDictionaryEncoded(attr, relations, aliasMap) =>
-                  attrsOnProjects.add(aliasMap.getOrElse(attr, attr))
+                  attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
               }
           }
           wd.windowExpressions.map {
@@ -333,7 +335,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
               others.collect {
                 case attr: AttributeReference
                   if isDictionaryEncoded(attr, relations, aliasMap) =>
-                  attrsOnProjects.add(aliasMap.getOrElse(attr, attr))
+                  attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
               }
           }
           wd.partitionSpec.map{
@@ -342,33 +344,33 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
               others.collect {
                 case attr: AttributeReference
                   if isDictionaryEncoded(attr, relations, aliasMap) =>
-                  attrsOnProjects.add(aliasMap.getOrElse(attr, attr))
+                  attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
               }
           }
           wd.orderSpec.map { s =>
             s.collect {
               case attr: AttributeReference
                 if isDictionaryEncoded(attr, relations, aliasMap) =>
-                attrsOnProjects.add(aliasMap.getOrElse(attr, attr))
+                attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
             }
           }
           wd.partitionSpec.map { s =>
             s.collect {
               case attr: AttributeReference
                 if isDictionaryEncoded(attr, relations, aliasMap) =>
-                attrsOnProjects.add(aliasMap.getOrElse(attr, attr))
+                attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
             }
           }
           var child = wd.child
           if (attrsOnProjects.size() > 0 && !child.isInstanceOf[Project]) {
             child = CarbonDictionaryTempDecoder(attrsOnProjects,
-              new util.HashSet[Attribute](),
+              new util.HashSet[AttributeReferenceWrapper](),
               wd.child)
           }
           if (!decoder) {
             decoder = true
-            CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
-              new util.HashSet[Attribute](),
+            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+              new util.HashSet[AttributeReferenceWrapper](),
               Window(wd.projectList, wd.windowExpressions, wd.partitionSpec, wd.orderSpec, child),
               isOuter = true)
           } else {
@@ -378,8 +380,8 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
         case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
           if (!decoder) {
             decoder = true
-            CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
-              new util.HashSet[Attribute](), l, isOuter = true)
+            CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+              new util.HashSet[AttributeReferenceWrapper](), l, isOuter = true)
           } else {
             l
           }
@@ -399,17 +401,17 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
     plan transformDown {
       case cd: CarbonDictionaryTempDecoder if !cd.processed =>
         cd.processed = true
-        allAttrsNotDecode = cd.attrsNotDecode
-        marker.pushMarker(cd.attrsNotDecode)
+        allAttrsNotDecode = cd.getAttrsNotDecode
+        marker.pushMarker(cd.getAttrsNotDecode)
         if (cd.isOuter) {
           CarbonDictionaryCatalystDecoder(relations,
-            ExcludeProfile(cd.attrsNotDecode.asScala.toSeq),
+            ExcludeProfile(cd.getAttrsNotDecode.asScala.toSeq),
             aliasMap,
             isOuter = true,
             cd.child)
         } else {
           CarbonDictionaryCatalystDecoder(relations,
-            IncludeProfile(cd.attrList.asScala.toSeq),
+            IncludeProfile(cd.getAttrList.asScala.toSeq),
             aliasMap,
             isOuter = false,
             cd.child)
@@ -531,11 +533,11 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
 
   // Collect aggregates on dimensions so that we can add decoder to it.
   private def collectDimensionAggregates(aggExp: AggregateExpression,
-      attrsOndimAggs: util.HashSet[Attribute],
+      attrsOndimAggs: util.HashSet[AttributeReferenceWrapper],
       aliasMap: CarbonAliasDecoderRelation) {
     aggExp collect {
       case attr: AttributeReference if isDictionaryEncoded(attr, relations, aliasMap) =>
-        attrsOndimAggs.add(aliasMap.getOrElse(attr, attr))
+        attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/685c2780/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala b/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
index 689b88a..230bf31 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.optimizer.CarbonAliasDecoderRelation
+import org.apache.spark.sql.optimizer.{AttributeReferenceWrapper, CarbonAliasDecoderRelation}
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types.StructType
 
@@ -114,7 +114,7 @@ object CarbonFilters {
   // Check out which filters can be pushed down to carbon, remaining can be handled in spark layer.
   // Mostly dimension filters are only pushed down since it is faster in carbon.
   def selectFilters(filters: Seq[Expression],
-      attrList: java.util.HashSet[Attribute],
+      attrList: java.util.HashSet[AttributeReferenceWrapper],
       aliasMap: CarbonAliasDecoderRelation): Unit = {
     def translate(expr: Expression, or: Boolean = false): Option[sources.Filter] = {
       expr match {
@@ -126,7 +126,8 @@ object CarbonFilters {
             Some( sources.Or(leftFilter.get, rightFilter.get))
           } else {
             or.collect {
-              case attr: AttributeReference => attrList.add(aliasMap.getOrElse(attr, attr))
+              case attr: AttributeReference =>
+                attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
             }
             None
           }
@@ -206,7 +207,7 @@ object CarbonFilters {
           if (!or) {
             others.collect {
               case attr: AttributeReference =>
-                attrList.add(aliasMap.getOrElse(attr, attr))
+                attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
             }
           }
           None

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/685c2780/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
index 95f36bb..7b9a910 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
@@ -1102,4 +1102,10 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists carbonunion")
   })
 
+  test("select Min(imei) from (select imei from Carbon_automation_test order by imei) t")({
+    checkAnswer(
+      sql("select Min(imei) from (select imei from Carbon_automation_test order by imei) t"),
+      sql("select Min(imei) from (select imei from Carbon_automation_hive order by imei) t"))
+  })
+
 }
\ No newline at end of file


[2/2] incubator-carbondata git commit: Fixed aggregate queries that use subqueries in Spark 1.6.2. This closes #65

Posted by ch...@apache.org.
Fixed aggregate queries that use subqueries in Spark 1.6.2. This closes #65


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/ea3169e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/ea3169e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/ea3169e8

Branch: refs/heads/master
Commit: ea3169e8b8fc84b9df499022a08d48dcb99234d5
Parents: ffd911f 685c278
Author: chenliang613 <ch...@apache.org>
Authored: Mon Aug 8 14:45:57 2016 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Mon Aug 8 14:45:57 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/CarbonDictionaryDecoder.scala     |   5 +-
 .../CarbonDecoderOptimizerHelper.scala          |  40 ++++++-
 .../spark/sql/optimizer/CarbonOptimizer.scala   | 116 ++++++++++---------
 .../org/carbondata/spark/CarbonFilters.scala    |   9 +-
 .../AllDataTypesTestCaseAggregate.scala         |   6 +
 5 files changed, 106 insertions(+), 70 deletions(-)
----------------------------------------------------------------------