You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/08/08 06:46:18 UTC
[1/2] incubator-carbondata git commit: Fixed aggregate queries
using subqueries
Repository: incubator-carbondata
Updated Branches:
refs/heads/master ffd911fb3 -> ea3169e8b
Fixed aggregate queries using subqueries
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/685c2780
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/685c2780
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/685c2780
Branch: refs/heads/master
Commit: 685c278014399bde78df55a068e74b7a61df544c
Parents: ffd911f
Author: ravipesala <ra...@gmail.com>
Authored: Sun Aug 7 17:27:25 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sun Aug 7 17:27:25 2016 +0530
----------------------------------------------------------------------
.../spark/sql/CarbonDictionaryDecoder.scala | 5 +-
.../CarbonDecoderOptimizerHelper.scala | 40 ++++++-
.../spark/sql/optimizer/CarbonOptimizer.scala | 116 ++++++++++---------
.../org/carbondata/spark/CarbonFilters.scala | 9 +-
.../AllDataTypesTestCaseAggregate.scala | 6 +
5 files changed, 106 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/685c2780/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index c47e4f7..3faf0c7 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -121,14 +121,13 @@ case class CarbonDictionaryDecoder(
val dictIds: Array[(String, ColumnIdentifier, DataType)] = attributes.map { a =>
val attr = aliasMap.getOrElse(a, a)
val relation = relations.find(p => p.contains(attr))
- if(relation.isDefined) {
+ if(relation.isDefined && canBeDecoded(attr)) {
val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
val carbonDimension =
carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
if (carbonDimension != null &&
carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
- !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
- canBeDecoded(attr)) {
+ !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
(carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
carbonDimension.getDataType)
} else {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/685c2780/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
index 3784f84..6be8369 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
@@ -33,12 +33,24 @@ case class BinaryCarbonNode(left: util.List[AbstractNode], right: util.List[Abst
extends AbstractNode
case class CarbonDictionaryTempDecoder(
- attrList: util.Set[Attribute],
- attrsNotDecode: util.Set[Attribute],
+ attrList: util.Set[AttributeReferenceWrapper],
+ attrsNotDecode: util.Set[AttributeReferenceWrapper],
child: LogicalPlan,
isOuter: Boolean = false) extends UnaryNode {
var processed = false
+ def getAttrsNotDecode: util.Set[Attribute] = {
+ val set = new util.HashSet[Attribute]()
+ attrsNotDecode.asScala.foreach(f => set.add(f.attr))
+ set
+ }
+
+ def getAttrList: util.Set[Attribute] = {
+ val set = new util.HashSet[Attribute]()
+ attrList.asScala.foreach(f => set.add(f.attr))
+ set
+ }
+
override def output: Seq[Attribute] = child.output
}
@@ -68,20 +80,20 @@ class CarbonDecoderProcessor {
def updateDecoders(nodeList: util.List[AbstractNode]): Unit = {
val scalaList = nodeList.asScala
- val decoderNotDecode = new util.HashSet[Attribute]
+ val decoderNotDecode = new util.HashSet[AttributeReferenceWrapper]
updateDecoderInternal(scalaList, decoderNotDecode)
}
private def updateDecoderInternal(scalaList: mutable.Buffer[AbstractNode],
- decoderNotDecode: util.HashSet[Attribute]): Unit = {
+ decoderNotDecode: util.HashSet[AttributeReferenceWrapper]): Unit = {
scalaList.reverseMap {
case Node(cd: CarbonDictionaryTempDecoder) =>
decoderNotDecode.asScala.foreach(cd.attrsNotDecode.add)
decoderNotDecode.asScala.foreach(cd.attrList.remove)
decoderNotDecode.addAll(cd.attrList)
case BinaryCarbonNode(left: util.List[AbstractNode], right: util.List[AbstractNode]) =>
- val leftNotDecode = new util.HashSet[Attribute]
- val rightNotDecode = new util.HashSet[Attribute]
+ val leftNotDecode = new util.HashSet[AttributeReferenceWrapper]
+ val rightNotDecode = new util.HashSet[AttributeReferenceWrapper]
updateDecoderInternal(left.asScala, leftNotDecode)
updateDecoderInternal(right.asScala, rightNotDecode)
decoderNotDecode.addAll(leftNotDecode)
@@ -91,6 +103,22 @@ class CarbonDecoderProcessor {
}
+case class AttributeReferenceWrapper(attr: Attribute) {
+
+ override def equals(other: Any): Boolean = other match {
+ case ar: AttributeReferenceWrapper =>
+ attr.name == ar.attr.name && attr.exprId == ar.attr.exprId
+ case _ => false
+ }
+ override def hashCode: Int = {
+ var h = 17
+ h = h * 37 + attr.exprId.hashCode()
+ h
+ }
+
+
+}
+
case class Marker(set: util.Set[Attribute], binary: Boolean = false)
class CarbonPlanMarker {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/685c2780/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
index 2d20b73..d75eeda 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
@@ -96,23 +96,23 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
decoder = true
cd
case sort: Sort if !sort.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
- val attrsOnSort = new util.HashSet[Attribute]()
+ val attrsOnSort = new util.HashSet[AttributeReferenceWrapper]()
sort.order.map { s =>
s.collect {
case attr: AttributeReference
if isDictionaryEncoded(attr, relations, aliasMap) =>
- attrsOnSort.add(aliasMap.getOrElse(attr, attr))
+ attrsOnSort.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
}
}
var child = sort.child
if (attrsOnSort.size() > 0 && !child.isInstanceOf[Sort]) {
child = CarbonDictionaryTempDecoder(attrsOnSort,
- new util.HashSet[Attribute](), sort.child)
+ new util.HashSet[AttributeReferenceWrapper](), sort.child)
}
if (!decoder) {
decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
- new util.HashSet[Attribute](),
+ CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+ new util.HashSet[AttributeReferenceWrapper](),
Sort(sort.order, sort.global, child),
isOuter = true)
} else {
@@ -122,28 +122,30 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
case union: Union
if !(union.left.isInstanceOf[CarbonDictionaryTempDecoder] ||
union.right.isInstanceOf[CarbonDictionaryTempDecoder]) =>
- val leftCondAttrs = new util.HashSet[Attribute]
- val rightCondAttrs = new util.HashSet[Attribute]
- union.left.output.foreach(attr => leftCondAttrs.add(aliasMap.getOrElse(attr, attr)))
- union.right.output.foreach(attr => rightCondAttrs.add(aliasMap.getOrElse(attr, attr)))
+ val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper]
+ val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper]
+ union.left.output.foreach(attr =>
+ leftCondAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))))
+ union.right.output.foreach(attr =>
+ rightCondAttrs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))))
var leftPlan = union.left
var rightPlan = union.right
if (leftCondAttrs.size() > 0 &&
!leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
- new util.HashSet[Attribute](),
+ new util.HashSet[AttributeReferenceWrapper](),
union.left)
}
if (rightCondAttrs.size() > 0 &&
!rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
- new util.HashSet[Attribute](),
+ new util.HashSet[AttributeReferenceWrapper](),
union.right)
}
if (!decoder) {
decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
- new util.HashSet[Attribute](),
+ CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+ new util.HashSet[AttributeReferenceWrapper](),
Union(leftPlan, rightPlan),
isOuter = true)
} else {
@@ -151,7 +153,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
}
case agg: Aggregate if !agg.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
- val attrsOndimAggs = new util.HashSet[Attribute]
+ val attrsOndimAggs = new util.HashSet[AttributeReferenceWrapper]
agg.aggregateExpressions.map {
case attr: AttributeReference =>
case a@Alias(attr: AttributeReference, name) =>
@@ -165,27 +167,27 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
others.collect {
case attr: AttributeReference
if isDictionaryEncoded(attr, relations, aliasMap) =>
- attrsOndimAggs.add(aliasMap.getOrElse(attr, attr))
+ attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
}
}
var child = agg.child
// Incase if the child also aggregate then push down decoder to child
if (attrsOndimAggs.size() > 0 && !child.equals(agg)) {
child = CarbonDictionaryTempDecoder(attrsOndimAggs,
- new util.HashSet[Attribute](),
+ new util.HashSet[AttributeReferenceWrapper](),
agg.child)
}
if (!decoder) {
decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
- new util.HashSet[Attribute](),
+ CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+ new util.HashSet[AttributeReferenceWrapper](),
Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child),
isOuter = true)
} else {
Aggregate(agg.groupingExpressions, agg.aggregateExpressions, child)
}
case expand: Expand if !expand.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
- val attrsOnExpand = new util.HashSet[Attribute]
+ val attrsOnExpand = new util.HashSet[AttributeReferenceWrapper]
expand.projections.map {s =>
s.map {
case attr: AttributeReference =>
@@ -194,41 +196,41 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
others.collect {
case attr: AttributeReference
if isDictionaryEncoded(attr, relations, aliasMap) =>
- attrsOnExpand.add(aliasMap.getOrElse(attr, attr))
+ attrsOnExpand.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
}
}
}
var child = expand.child
if (attrsOnExpand.size() > 0 && !child.isInstanceOf[Expand]) {
child = CarbonDictionaryTempDecoder(attrsOnExpand,
- new util.HashSet[Attribute](),
+ new util.HashSet[AttributeReferenceWrapper](),
expand.child)
}
if (!decoder) {
decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
- new util.HashSet[Attribute](),
+ CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+ new util.HashSet[AttributeReferenceWrapper](),
CodeGenerateFactory.getInstance().expandFactory.createExpand(expand, child),
isOuter = true)
} else {
CodeGenerateFactory.getInstance().expandFactory.createExpand(expand, child)
}
case filter: Filter if !filter.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
- val attrsOnConds = new util.HashSet[Attribute]
+ val attrsOnConds = new util.HashSet[AttributeReferenceWrapper]
CarbonFilters
.selectFilters(splitConjunctivePredicates(filter.condition), attrsOnConds, aliasMap)
var child = filter.child
if (attrsOnConds.size() > 0 && !child.isInstanceOf[Filter]) {
child = CarbonDictionaryTempDecoder(attrsOnConds,
- new util.HashSet[Attribute](),
+ new util.HashSet[AttributeReferenceWrapper](),
filter.child)
}
if (!decoder) {
decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
- new util.HashSet[Attribute](),
+ CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+ new util.HashSet[AttributeReferenceWrapper](),
Filter(filter.condition, child),
isOuter = true)
} else {
@@ -249,16 +251,16 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
case _ =>
}
- val leftCondAttrs = new util.HashSet[Attribute]
- val rightCondAttrs = new util.HashSet[Attribute]
+ val leftCondAttrs = new util.HashSet[AttributeReferenceWrapper]
+ val rightCondAttrs = new util.HashSet[AttributeReferenceWrapper]
if (attrsOnJoin.size() > 0) {
attrsOnJoin.asScala.map { attr =>
if (qualifierPresence(j.left, attr)) {
- leftCondAttrs.add(attr)
+ leftCondAttrs.add(AttributeReferenceWrapper(attr))
}
if (qualifierPresence(j.right, attr)) {
- rightCondAttrs.add(attr)
+ rightCondAttrs.add(AttributeReferenceWrapper(attr))
}
}
var leftPlan = j.left
@@ -266,19 +268,19 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
if (leftCondAttrs.size() > 0 &&
!leftPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
leftPlan = CarbonDictionaryTempDecoder(leftCondAttrs,
- new util.HashSet[Attribute](),
+ new util.HashSet[AttributeReferenceWrapper](),
j.left)
}
if (rightCondAttrs.size() > 0 &&
!rightPlan.isInstanceOf[CarbonDictionaryCatalystDecoder]) {
rightPlan = CarbonDictionaryTempDecoder(rightCondAttrs,
- new util.HashSet[Attribute](),
+ new util.HashSet[AttributeReferenceWrapper](),
j.right)
}
if (!decoder) {
decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
- new util.HashSet[Attribute](),
+ CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+ new util.HashSet[AttributeReferenceWrapper](),
Join(leftPlan, rightPlan, j.joinType, j.condition),
isOuter = true)
} else {
@@ -290,7 +292,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
case p: Project
if relations.nonEmpty && !p.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
- val attrsOnProjects = new util.HashSet[Attribute]
+ val attrsOnProjects = new util.HashSet[AttributeReferenceWrapper]
p.projectList.map {
case attr: AttributeReference =>
case a@Alias(attr: AttributeReference, name) =>
@@ -298,19 +300,19 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
others.collect {
case attr: AttributeReference
if isDictionaryEncoded(attr, relations, aliasMap) =>
- attrsOnProjects.add(aliasMap.getOrElse(attr, attr))
+ attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
}
}
var child = p.child
if (attrsOnProjects.size() > 0 && !child.isInstanceOf[Project]) {
child = CarbonDictionaryTempDecoder(attrsOnProjects,
- new util.HashSet[Attribute](),
+ new util.HashSet[AttributeReferenceWrapper](),
p.child)
}
if (!decoder) {
decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
- new util.HashSet[Attribute](),
+ CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+ new util.HashSet[AttributeReferenceWrapper](),
Project(p.projectList, child),
isOuter = true)
} else {
@@ -318,14 +320,14 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
}
case wd: Window if !wd.child.isInstanceOf[CarbonDictionaryTempDecoder] =>
- val attrsOnProjects = new util.HashSet[Attribute]
+ val attrsOnProjects = new util.HashSet[AttributeReferenceWrapper]
wd.projectList.map {
case attr: AttributeReference =>
case others =>
others.collect {
case attr: AttributeReference
if isDictionaryEncoded(attr, relations, aliasMap) =>
- attrsOnProjects.add(aliasMap.getOrElse(attr, attr))
+ attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
}
}
wd.windowExpressions.map {
@@ -333,7 +335,7 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
others.collect {
case attr: AttributeReference
if isDictionaryEncoded(attr, relations, aliasMap) =>
- attrsOnProjects.add(aliasMap.getOrElse(attr, attr))
+ attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
}
}
wd.partitionSpec.map{
@@ -342,33 +344,33 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
others.collect {
case attr: AttributeReference
if isDictionaryEncoded(attr, relations, aliasMap) =>
- attrsOnProjects.add(aliasMap.getOrElse(attr, attr))
+ attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
}
}
wd.orderSpec.map { s =>
s.collect {
case attr: AttributeReference
if isDictionaryEncoded(attr, relations, aliasMap) =>
- attrsOnProjects.add(aliasMap.getOrElse(attr, attr))
+ attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
}
}
wd.partitionSpec.map { s =>
s.collect {
case attr: AttributeReference
if isDictionaryEncoded(attr, relations, aliasMap) =>
- attrsOnProjects.add(aliasMap.getOrElse(attr, attr))
+ attrsOnProjects.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
}
}
var child = wd.child
if (attrsOnProjects.size() > 0 && !child.isInstanceOf[Project]) {
child = CarbonDictionaryTempDecoder(attrsOnProjects,
- new util.HashSet[Attribute](),
+ new util.HashSet[AttributeReferenceWrapper](),
wd.child)
}
if (!decoder) {
decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
- new util.HashSet[Attribute](),
+ CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+ new util.HashSet[AttributeReferenceWrapper](),
Window(wd.projectList, wd.windowExpressions, wd.partitionSpec, wd.orderSpec, child),
isOuter = true)
} else {
@@ -378,8 +380,8 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceRelation] =>
if (!decoder) {
decoder = true
- CarbonDictionaryTempDecoder(new util.HashSet[Attribute](),
- new util.HashSet[Attribute](), l, isOuter = true)
+ CarbonDictionaryTempDecoder(new util.HashSet[AttributeReferenceWrapper](),
+ new util.HashSet[AttributeReferenceWrapper](), l, isOuter = true)
} else {
l
}
@@ -399,17 +401,17 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
plan transformDown {
case cd: CarbonDictionaryTempDecoder if !cd.processed =>
cd.processed = true
- allAttrsNotDecode = cd.attrsNotDecode
- marker.pushMarker(cd.attrsNotDecode)
+ allAttrsNotDecode = cd.getAttrsNotDecode
+ marker.pushMarker(cd.getAttrsNotDecode)
if (cd.isOuter) {
CarbonDictionaryCatalystDecoder(relations,
- ExcludeProfile(cd.attrsNotDecode.asScala.toSeq),
+ ExcludeProfile(cd.getAttrsNotDecode.asScala.toSeq),
aliasMap,
isOuter = true,
cd.child)
} else {
CarbonDictionaryCatalystDecoder(relations,
- IncludeProfile(cd.attrList.asScala.toSeq),
+ IncludeProfile(cd.getAttrList.asScala.toSeq),
aliasMap,
isOuter = false,
cd.child)
@@ -531,11 +533,11 @@ class ResolveCarbonFunctions(relations: Seq[CarbonDecoderRelation])
// Collect aggregates on dimensions so that we can add decoder to it.
private def collectDimensionAggregates(aggExp: AggregateExpression,
- attrsOndimAggs: util.HashSet[Attribute],
+ attrsOndimAggs: util.HashSet[AttributeReferenceWrapper],
aliasMap: CarbonAliasDecoderRelation) {
aggExp collect {
case attr: AttributeReference if isDictionaryEncoded(attr, relations, aliasMap) =>
- attrsOndimAggs.add(aliasMap.getOrElse(attr, attr))
+ attrsOndimAggs.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/685c2780/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala b/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
index 689b88a..230bf31 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.optimizer.CarbonAliasDecoderRelation
+import org.apache.spark.sql.optimizer.{AttributeReferenceWrapper, CarbonAliasDecoderRelation}
import org.apache.spark.sql.sources
import org.apache.spark.sql.types.StructType
@@ -114,7 +114,7 @@ object CarbonFilters {
// Check out which filters can be pushed down to carbon, remaining can be handled in spark layer.
// Mostly dimension filters are only pushed down since it is faster in carbon.
def selectFilters(filters: Seq[Expression],
- attrList: java.util.HashSet[Attribute],
+ attrList: java.util.HashSet[AttributeReferenceWrapper],
aliasMap: CarbonAliasDecoderRelation): Unit = {
def translate(expr: Expression, or: Boolean = false): Option[sources.Filter] = {
expr match {
@@ -126,7 +126,8 @@ object CarbonFilters {
Some( sources.Or(leftFilter.get, rightFilter.get))
} else {
or.collect {
- case attr: AttributeReference => attrList.add(aliasMap.getOrElse(attr, attr))
+ case attr: AttributeReference =>
+ attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
}
None
}
@@ -206,7 +207,7 @@ object CarbonFilters {
if (!or) {
others.collect {
case attr: AttributeReference =>
- attrList.add(aliasMap.getOrElse(attr, attr))
+ attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
}
}
None
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/685c2780/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
index 95f36bb..7b9a910 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
@@ -1102,4 +1102,10 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists carbonunion")
})
+ test("select Min(imei) from (select imei from Carbon_automation_test order by imei) t")({
+ checkAnswer(
+ sql("select Min(imei) from (select imei from Carbon_automation_test order by imei) t"),
+ sql("select Min(imei) from (select imei from Carbon_automation_hive order by imei) t"))
+ })
+
}
\ No newline at end of file
[2/2] incubator-carbondata git commit: Fixed aggregate queries
using subqueries in Spark 1.6.2. This closes #65
Posted by ch...@apache.org.
Fixed aggregate queries using subqueries in Spark 1.6.2. This closes #65
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/ea3169e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/ea3169e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/ea3169e8
Branch: refs/heads/master
Commit: ea3169e8b8fc84b9df499022a08d48dcb99234d5
Parents: ffd911f 685c278
Author: chenliang613 <ch...@apache.org>
Authored: Mon Aug 8 14:45:57 2016 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Mon Aug 8 14:45:57 2016 +0800
----------------------------------------------------------------------
.../spark/sql/CarbonDictionaryDecoder.scala | 5 +-
.../CarbonDecoderOptimizerHelper.scala | 40 ++++++-
.../spark/sql/optimizer/CarbonOptimizer.scala | 116 ++++++++++---------
.../org/carbondata/spark/CarbonFilters.scala | 9 +-
.../AllDataTypesTestCaseAggregate.scala | 6 +
5 files changed, 106 insertions(+), 70 deletions(-)
----------------------------------------------------------------------