Posted to commits@carbondata.apache.org by jb...@apache.org on 2016/06/23 14:15:59 UTC
[11/56] [abbrv] incubator-carbondata git commit: Update SQL planning in carbon-spark (#682)
Update SQL planning in carbon-spark (#682)
* [Issue-660] Show segments query should not fail if the table name is case-insensitive (#662)
* Show segments should not fail if the table name is case-insensitive
* Corrected test case
* [issue-656] fix load data when an int column contains Integer.MIN_VALUE (#657)
* load data when an int column contains Integer.MIN_VALUE
* fixed test case
* fix test bigint
* fix test bigint
* removed unused DATA_BIGINT case
* removed unused condition for unCompressMaxMin
* [issue-664] select count(joinDate) from table_x is failing for direct dictionary column (#665)
* Supported Spark 1.6 by changing aggregation interfaces
* Fixed compile issue after rebase
* optimizing the flow with unsafe row
* Fixed bugs in push up
* Fixed compiler issues after rebasing
* Fixed merging issue after rebase
* Fixed scan query pushdown
* keep pushup strategy only
* keep only one QueryRDD
* rename QueryRDD to ScanRDD and clean up code
* fix scalastyle
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/a83dba34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/a83dba34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/a83dba34
Branch: refs/heads/master
Commit: a83dba3433525eaf2f255912184a6e1a6d7dbdea
Parents: ead0076
Author: Jacky Li <ja...@huawei.com>
Authored: Fri Jun 17 16:02:55 2016 +0800
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Fri Jun 17 13:32:55 2016 +0530
----------------------------------------------------------------------
.../org/apache/spark/sql/CarbonContext.scala | 5 +-
.../org/apache/spark/sql/CarbonOperators.scala | 676 ++++---------------
.../apache/spark/sql/CarbonRawOperators.scala | 332 ---------
.../spark/sql/execution/joins/CarbonJoins.scala | 140 ----
.../spark/sql/hive/CarbonRawStrategies.scala | 217 ------
.../spark/sql/hive/CarbonSQLDialect.scala | 42 ++
.../spark/sql/hive/CarbonStrategies.scala | 505 +++++---------
.../apache/spark/sql/hive/CarbonStrategy.scala | 54 --
.../carbondata/spark/rdd/CarbonQueryRDD.scala | 241 -------
.../spark/rdd/CarbonRawQueryRDD.scala | 128 ----
.../carbondata/spark/rdd/CarbonScanRDD.scala | 219 ++++++
.../testsuite/joinquery/EquiJoinTestCase.scala | 41 --
12 files changed, 566 insertions(+), 2034 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index 2bf50da..ffc5655 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -55,7 +55,10 @@ class CarbonContext(val sc: SparkContext, val storePath: String) extends HiveCon
protected[sql] override def getSQLDialect(): ParserDialect = new CarbonSQLDialect(this)
- experimental.extraStrategies = CarbonStrategy.getStrategy(self)
+ experimental.extraStrategies = {
+ val carbonStrategy = new CarbonStrategies(self)
+ Seq(carbonStrategy.CarbonTableScan, carbonStrategy.DDLStrategies)
+ }
@transient
val LOGGER = LogServiceFactory.getLogService(CarbonContext.getClass.getName)
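
The hunk above swaps the old single CarbonStrategy hook for an explicit list of
planner strategies. For context, experimental.extraStrategies is Spark's public
extension point: it accepts any org.apache.spark.sql.Strategy, and the planner
tries these before its built-in ones. A minimal sketch of the mechanism,
assuming Spark 1.6 APIs and a hypothetical no-op strategy:

    import org.apache.spark.sql.{SQLContext, Strategy}
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.SparkPlan

    // Returning Nil means "this strategy does not apply", so planning
    // falls through to the next strategy in the list.
    object NoopStrategy extends Strategy {
      def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
    }

    def register(sqlContext: SQLContext): Unit = {
      // Same registration shape CarbonContext uses above.
      sqlContext.experimental.extraStrategies = Seq(NoopStrategy)
    }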
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
index 8796707..cb20246 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
@@ -23,278 +23,70 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.agg.{CarbonAverage, CarbonCount}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, _}
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Max, Min, Sum}
-import org.apache.spark.sql.catalyst.util.GenericArrayData
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.execution.LeafNode
import org.apache.spark.sql.hive.CarbonMetastoreCatalog
import org.apache.spark.sql.types.{DataType, Decimal}
import org.apache.spark.unsafe.types.UTF8String
-import org.carbondata.common.logging.LogServiceFactory
-import org.carbondata.core.carbon.AbsoluteTableIdentifier
import org.carbondata.core.constants.CarbonCommonConstants
import org.carbondata.core.util.CarbonProperties
-import org.carbondata.hadoop.CarbonInputFormat
-import org.carbondata.query.aggregator.MeasureAggregator
-import org.carbondata.query.aggregator.impl.avg.AbstractAvgAggregator
-import org.carbondata.query.aggregator.impl.count.CountAggregator
-import org.carbondata.query.aggregator.impl.max.{MaxAggregator, MaxBigDecimalAggregator, MaxLongAggregator}
-import org.carbondata.query.aggregator.impl.min.{MinAggregator, MinBigDecimalAggregator, MinLongAggregator}
-import org.carbondata.query.aggregator.impl.sum.{SumBigDecimalAggregator, SumDoubleAggregator, SumLongAggregator}
-import org.carbondata.query.carbon.model.{CarbonQueryPlan, QueryDimension, QueryMeasure, QueryModel, SortOrderType}
-import org.carbondata.query.carbon.result.RowResult
-import org.carbondata.query.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
-import org.carbondata.query.expression.arithmetic.{AddExpression, DivideExpression, MultiplyExpression, SubstractExpression}
-import org.carbondata.query.expression.conditional._
-import org.carbondata.query.expression.logical.{AndExpression, OrExpression}
-import org.carbondata.query.scanner.impl.{CarbonKey, CarbonValue}
-import org.carbondata.spark.{KeyVal, KeyValImpl}
-import org.carbondata.spark.rdd.CarbonQueryRDD
-import org.carbondata.spark.util.{CarbonScalaUtil, QueryPlanUtil}
-
-case class CarbonTableScan(
- var attributes: Seq[Attribute],
- relation: CarbonRelation,
- dimensionPredicates: Seq[Expression],
- aggExprs: Option[Seq[Expression]],
- sortExprs: Option[Seq[SortOrder]],
- limitExpr: Option[Expression],
- isGroupByPresent: Boolean,
- detailQuery: Boolean = false)(@transient val oc: SQLContext)
- extends LeafNode {
-
- val cubeName = relation.cubeName
- val carbonTable = relation.metaData.carbonTable
+import org.carbondata.query.carbon.model._
+import org.carbondata.spark.{CarbonFilters, RawKey, RawKeyImpl}
+import org.carbondata.spark.rdd.CarbonScanRDD
+
+case class CarbonScan(
+ var attributesRaw: Seq[Attribute],
+ relationRaw: CarbonRelation,
+ dimensionPredicatesRaw: Seq[Expression],
+ aggExprsRaw: Option[Seq[Expression]],
+ useBinaryAggregator: Boolean)(@transient val ocRaw: SQLContext) extends LeafNode {
+ val carbonTable = relationRaw.metaData.carbonTable
val selectedDims = scala.collection.mutable.MutableList[QueryDimension]()
val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]()
- var outputColumns = scala.collection.mutable.MutableList[Attribute]()
- var extraPreds: Seq[Expression] = Nil
- val allDims = new scala.collection.mutable.HashSet[String]()
- @transient val carbonCatalog = sqlContext.catalog.asInstanceOf[CarbonMetastoreCatalog]
-
- def processAggregateExpr(plan: CarbonQueryPlan,
- currentAggregate: AggregateExpression,
- queryOrder: Int,
- aggCount: Int): Int = {
- currentAggregate match {
- case AggregateExpression(Sum(p@PositionLiteral(attr: AttributeReference, _)), _, false) =>
- outputColumns += attr
- val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (msrs.nonEmpty) {
- val m1 = new QueryMeasure(attr.name)
- m1.setAggregateFunction(CarbonCommonConstants.SUM)
- m1.setQueryOrder(queryOrder)
- plan.addMeasure(m1)
- } else {
- val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (dims.nonEmpty) {
- val d1 = new QueryDimension(attr.name)
- d1.setQueryOrder(queryOrder)
- plan.addAggDimAggInfo(d1.getColumnName, "sum", d1.getQueryOrder)
- }
- }
- p.setPosition(queryOrder + aggCount)
- queryOrder + 1
-
- case AggregateExpression(
- Sum(Cast(p@PositionLiteral(attr: AttributeReference, _), _)), _, false) =>
- outputColumns += attr
- val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (msrs.nonEmpty) {
- val m1 = new QueryMeasure(attr.name)
- m1.setAggregateFunction(CarbonCommonConstants.SUM)
- m1.setQueryOrder(queryOrder)
- plan.addMeasure(m1)
- } else {
- val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (dims.nonEmpty) {
- val d1 = new QueryDimension(attr.name)
- d1.setQueryOrder(queryOrder)
- plan.addAggDimAggInfo(d1.getColumnName, "sum", d1.getQueryOrder)
- }
- }
- p.setPosition(queryOrder + aggCount)
- queryOrder + 1
-
- case AggregateExpression(
- CarbonCount(p@PositionLiteral(attr: AttributeReference, _), None), _, false) =>
- outputColumns += attr
- val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (msrs.nonEmpty) {
- val m1 = new QueryMeasure(attr.name)
- m1.setAggregateFunction(CarbonCommonConstants.COUNT)
- m1.setQueryOrder(queryOrder)
- plan.addMeasure(m1)
- } else {
- val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (dims.nonEmpty) {
- val d1 = new QueryDimension(attr.name)
- d1.setQueryOrder(queryOrder)
- plan.addAggDimAggInfo(d1.getColumnName, "count", d1.getQueryOrder)
- }
- }
- p.setPosition(queryOrder + aggCount)
- queryOrder + 1
-
- case AggregateExpression(
- CarbonCount(lt: Literal, Some(p@PositionLiteral(attr: AttributeReference, _))), _, false)
- if lt.value == "*" || lt.value == 1 =>
- outputColumns += attr
- val m1 = new QueryMeasure("count(*)")
- m1.setAggregateFunction(CarbonCommonConstants.COUNT)
- m1.setQueryOrder(queryOrder)
- plan.addMeasure(m1)
- plan.setCountStartQuery(true)
- p.setPosition(queryOrder + aggCount)
- queryOrder + 1
+ @transient val carbonCatalog = ocRaw.catalog.asInstanceOf[CarbonMetastoreCatalog]
- case AggregateExpression(
- CarbonAverage(p@PositionLiteral(attr: AttributeReference, _)), _, false) =>
- outputColumns += attr
- val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (msrs.nonEmpty) {
- val m1 = new QueryMeasure(attr.name)
- m1.setAggregateFunction(CarbonCommonConstants.AVERAGE)
- m1.setQueryOrder(queryOrder)
- plan.addMeasure(m1)
- } else {
- val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (dims.nonEmpty) {
- val d1 = new QueryDimension(attr.name)
- d1.setQueryOrder(queryOrder)
- plan.addAggDimAggInfo(d1.getColumnName, "avg", d1.getQueryOrder)
- }
- }
- p.setPosition(queryOrder + aggCount)
- queryOrder + 1
-
- case AggregateExpression(
- CarbonAverage(Cast(p@PositionLiteral(attr: AttributeReference, _), _)), _, false) =>
- outputColumns += attr
- val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (msrs.nonEmpty) {
- val m1 = new QueryMeasure(attr.name)
- m1.setAggregateFunction(CarbonCommonConstants.AVERAGE)
- m1.setQueryOrder(queryOrder)
- plan.addMeasure(m1)
- } else {
- val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (dims.nonEmpty) {
- val d1 = new QueryDimension(attr.name)
- d1.setQueryOrder(queryOrder)
- plan.addAggDimAggInfo(d1.getColumnName, "avg", d1.getQueryOrder)
- }
- }
- p.setPosition(queryOrder + aggCount)
- queryOrder + 1
-
- case AggregateExpression(Min(p@PositionLiteral(attr: AttributeReference, _)), _, false) =>
- outputColumns += attr
- val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (msrs.nonEmpty) {
- val m1 = new QueryMeasure(attr.name)
- m1.setAggregateFunction(CarbonCommonConstants.MIN)
- m1.setQueryOrder(queryOrder)
- plan.addMeasure(m1)
- } else {
- val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (dims != null) {
- val d1 = new QueryDimension(attr.name)
- d1.setQueryOrder(queryOrder)
- plan.addAggDimAggInfo(d1.getColumnName, "min", d1.getQueryOrder)
- }
- }
- p.setPosition(queryOrder + aggCount)
- queryOrder + 1
+ val attributesNeedToDecode = new java.util.HashSet[AttributeReference]()
+ val unprocessedExprs = new ArrayBuffer[Expression]()
- case AggregateExpression(
- Min(Cast(p@PositionLiteral(attr: AttributeReference, _), _)), _, false) =>
- outputColumns += attr
- val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (msrs.nonEmpty) {
- val m1 = new QueryMeasure(attr.name)
- m1.setAggregateFunction(CarbonCommonConstants.MIN)
- m1.setQueryOrder(queryOrder)
- plan.addMeasure(m1)
- } else {
- val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (dims != null) {
- val d1 = new QueryDimension(attr.name)
- d1.setQueryOrder(queryOrder)
- plan.addAggDimAggInfo(d1.getColumnName, "min", d1.getQueryOrder)
- }
- }
- p.setPosition(queryOrder + aggCount)
- queryOrder + 1
-
- case AggregateExpression(Max(p@PositionLiteral(attr: AttributeReference, _)), _, false) =>
- outputColumns += attr
- val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (msrs.nonEmpty) {
- val m1 = new QueryMeasure(attr.name)
- m1.setAggregateFunction(CarbonCommonConstants.MAX)
- m1.setQueryOrder(queryOrder)
- plan.addMeasure(m1)
- } else {
- val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (dims.nonEmpty) {
- val d1 = new QueryDimension(attr.name)
- d1.setQueryOrder(queryOrder)
- plan.addAggDimAggInfo(d1.getColumnName, "max", d1.getQueryOrder)
- }
- }
- p.setPosition(queryOrder + aggCount)
- queryOrder + 1
-
- case AggregateExpression(
- Max(Cast(p@PositionLiteral(attr: AttributeReference, _), _)), _, false) =>
- outputColumns += attr
- val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (msrs.nonEmpty) {
- val m1 = new QueryMeasure(attr.name)
- m1.setAggregateFunction(CarbonCommonConstants.MAX)
- m1.setQueryOrder(queryOrder)
- plan.addMeasure(m1)
- } else {
- val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (dims.nonEmpty) {
- val d1 = new QueryDimension(attr.name)
- d1.setQueryOrder(queryOrder)
- plan.addAggDimAggInfo(d1.getColumnName, "max", d1.getQueryOrder)
- }
+ val buildCarbonPlan: CarbonQueryPlan = {
+ val plan: CarbonQueryPlan = new CarbonQueryPlan(relationRaw.schemaName, relationRaw.tableName)
+
+ val dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
+ val measures = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
+ val dimAttr = new Array[Attribute](dimensions.size())
+ val msrAttr = new Array[Attribute](measures.size())
+ attributesRaw.foreach { attr =>
+ val carbonDimension =
+ carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+ if(carbonDimension != null) {
+ dimAttr(dimensions.indexOf(carbonDimension)) = attr
+ } else {
+ val carbonMeasure =
+ carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
+ if(carbonMeasure != null) {
+ msrAttr(measures.indexOf(carbonMeasure)) = attr
}
- p.setPosition(queryOrder + aggCount)
- queryOrder + 1
-
- case _ => throw new
- Exception("Some Aggregate functions cannot be pushed, force to detailequery")
+ }
}
- }
-
- val buildCarbonPlan: CarbonQueryPlan = {
- val plan: CarbonQueryPlan = new CarbonQueryPlan(relation.schemaName, relation.cubeName)
+ attributesRaw = dimAttr.filter(f => f != null) ++ msrAttr.filter(f => f != null)
- var forceDetailedQuery = detailQuery
var queryOrder: Integer = 0
- attributes.map(
- attr => {
- val carbonDimension = carbonTable.getDimensionByName(carbonTable.getFactTableName
- , attr.name)
+ attributesRaw.foreach { attr =>
+ val carbonDimension =
+ carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
if (carbonDimension != null) {
- allDims += attr.name
val dim = new QueryDimension(attr.name)
dim.setQueryOrder(queryOrder)
queryOrder = queryOrder + 1
selectedDims += dim
} else {
- val carbonMeasure = carbonTable.getMeasureByName(carbonTable.getFactTableName
- , attr.name)
+ val carbonMeasure =
+ carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
if (carbonMeasure != null) {
val m1 = new QueryMeasure(attr.name)
m1.setQueryOrder(queryOrder)
@@ -302,271 +94,108 @@ case class CarbonTableScan(
selectedMsrs += m1
}
}
- })
- queryOrder = 0
-
- // It is required to calculate as spark aggregators uses joined row with the current aggregates.
- val aggCount = aggExprs match {
- case Some(a: Seq[Expression]) =>
- a.map {
- case Alias(AggregateExpression(CarbonAverage(_), _, _), name) => 2
- case Alias(agg: AggregateExpression, name) => 1
- case _ => 0
- }.reduceLeftOption((left, right) => left + right).getOrElse(0)
- case _ => 0
- }
- // Separately handle group by columns, known or unknown partial aggregations and other
- // expressions. All single column & known aggregate expressions will use native aggregates for
- // measure and dimensions
- // Unknown aggregates & Expressions will use custom aggregator
-
- aggExprs match {
- case Some(a: Seq[Expression]) if !forceDetailedQuery =>
- a.foreach {
- case attr@AttributeReference(_, _, _, _) => // Add all the references to carbon query
- addCarbonColumn(attr)
- outputColumns += attr
- case al@ Alias(agg: AggregateExpression, name) =>
- queryOrder = processAggregateExpr(plan, agg, queryOrder, aggCount)
- case _ => forceDetailedQuery = true
- }
- case _ => forceDetailedQuery = true
- }
-
- def addCarbonColumn(attr: Attribute): Unit = {
- val carbonDimension = selectedDims
- .filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (carbonDimension.nonEmpty) {
- val dim = new QueryDimension(attr.name)
- dim.setQueryOrder(queryOrder)
- plan.addDimension(dim)
- queryOrder = queryOrder + 1
- } else {
- val carbonMeasure = selectedMsrs
- .filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (carbonMeasure.nonEmpty) {
- // added by vishal as we are adding for dimension so need to add to measure list
- // Carbon does not support group by on measure column so throwing exception to
- // make it detail query
- throw new
- Exception("Some Aggregate functions cannot be pushed, force to detailequery")
- }
- else {
- // Some unknown attribute name is found. this may be a derived column.
- // So, let's fall back to detailed query flow
- throw new Exception(
- "Some attributes referred looks derived columns. So, force to detailequery " +
- attr.name)
- }
- }
- }
-
- if (forceDetailedQuery) {
- // First clear the model if Msrs, Expressions and AggDimAggInfo filled
- plan.getDimensions.clear()
- plan.getMeasures.clear()
- plan.getDimAggregatorInfos.clear()
-
- // Fill the selected dimensions & measures obtained from
- // attributes to query plan for detailed query
- selectedDims.foreach(plan.addDimension)
- selectedMsrs.foreach(plan.addMeasure)
- }
- else {
- attributes.foreach { attr =>
- if (!outputColumns.exists(_.name.equals(attr.name))) {
- addCarbonColumn(attr)
- outputColumns += attr
- }
}
- attributes = outputColumns
- }
-
- val orderList = new ArrayList[QueryDimension]()
-
- var allSortExprPushed = true
- sortExprs match {
- case Some(a: Seq[SortOrder]) =>
- a.foreach {
- case SortOrder(Sum(attr: AttributeReference), order) => plan.getMeasures
- .asScala.filter(m => m.getColumnName.equalsIgnoreCase(attr.name)).head
- .setSortOrder(getSortDirection(order))
- case SortOrder(CarbonCount(attr: AttributeReference, _), order) => plan.getMeasures
- .asScala.filter(m => m.getColumnName.equalsIgnoreCase(attr.name)).head
- .setSortOrder(getSortDirection(order))
- case SortOrder(CarbonAverage(attr: AttributeReference), order) => plan.getMeasures
- .asScala.filter(m => m.getColumnName.equalsIgnoreCase(attr.name)).head
- .setSortOrder(getSortDirection(order))
- case SortOrder(attr: AttributeReference, order) =>
- val dim = plan.getDimensions
- .asScala.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if (dim.nonEmpty) {
- dim.head.setSortOrder(getSortDirection(order))
- orderList.add(dim.head)
- } else {
- allSortExprPushed = false
+ // Just find out whether any aggregation functions are present on dimensions.
+ aggExprsRaw match {
+ case Some(aggExprs) =>
+ aggExprs.foreach {
+ case Alias(agg: AggregateExpression, name) =>
+ agg.collect {
+ case attr: AttributeReference =>
+ val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
+ if(dims.nonEmpty) {
+ plan.addAggDimAggInfo(dims.head.getColumnName,
+ dims.head.getAggregateFunction,
+ dims.head.getQueryOrder)
+ }
}
- case _ => allSortExprPushed = false;
+ case _ =>
}
case _ =>
}
- plan.setSortedDimemsions(orderList)
+ // Fill the selected dimensions & measures obtained from
+ // attributes to query plan for detailed query
+ selectedDims.foreach(plan.addDimension)
+ selectedMsrs.foreach(plan.addMeasure)
- // limit can be pushed down only if sort is not present or all sort expressions are pushed
- if (sortExprs.isEmpty && forceDetailedQuery) {
- limitExpr match {
- case Some(IntegerLiteral(limit)) =>
- // if (plan.getMeasures.size() == 0 && plan.getDimAggregatorInfos.size() == 0) {
- plan.setLimit(limit)
- // }
- case _ =>
- }
- }
- plan.setDetailQuery(forceDetailedQuery)
+ plan.setSortedDimemsions(new ArrayList[QueryDimension])
+
+ plan.setRawDetailQuery(true)
plan.setOutLocationPath(
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION_HDFS))
plan.setQueryId(System.nanoTime() + "")
- if (dimensionPredicates.nonEmpty) {
- val exps = preProcessExpressions(dimensionPredicates)
- val expressionVal = transformExpression(exps.head)
- // adding dimension used in expression in querystats
- expressionVal.getChildren.asScala.filter { x => x.isInstanceOf[CarbonColumnExpression] }
- .map { y => allDims += y.asInstanceOf[CarbonColumnExpression].getColumnName }
- plan.setFilterExpression(expressionVal)
- }
+ processFilterExpressions(plan)
plan
}
- def preProcessExpressions(expressions: Seq[Expression]): Seq[Expression] = {
- expressions match {
- case left :: right :: rest => preProcessExpressions(List(And(left, right)) ::: rest)
- case List(left, right) => List(And(left, right))
-
- case _ => expressions
- }
- }
-
- def transformExpression(expr: Expression): CarbonExpression = {
- expr match {
- case Or(left, right) => new
- OrExpression(transformExpression(left), transformExpression(right))
- case And(left, right) => new
- AndExpression(transformExpression(left), transformExpression(right))
- case EqualTo(left, right) => new
- EqualToExpression(transformExpression(left), transformExpression(right))
- case Not(EqualTo(left, right)) => new
- NotEqualsExpression(transformExpression(left), transformExpression(right))
- case IsNotNull(child) => new
- NotEqualsExpression(transformExpression(child), transformExpression(Literal(null)))
- case Not(In(left, right)) => new NotInExpression(transformExpression(left),
- new ListExpression(right.map(transformExpression).asJava))
- case In(left, right) => new InExpression(transformExpression(left),
- new ListExpression(right.map(transformExpression).asJava))
- case Add(left, right) => new
- AddExpression(transformExpression(left), transformExpression(right))
- case Subtract(left, right) => new
- SubstractExpression(transformExpression(left), transformExpression(right))
- case Multiply(left, right) => new
- MultiplyExpression(transformExpression(left), transformExpression(right))
- case Divide(left, right) => new
- DivideExpression(transformExpression(left), transformExpression(right))
- case GreaterThan(left, right) => new
- GreaterThanExpression(transformExpression(left), transformExpression(right))
- case LessThan(left, right) => new
- LessThanExpression(transformExpression(left), transformExpression(right))
- case GreaterThanOrEqual(left, right) => new
- GreaterThanEqualToExpression(transformExpression(left), transformExpression(right))
- case LessThanOrEqual(left, right) => new
- LessThanEqualToExpression(transformExpression(left), transformExpression(right))
- // convert StartWith('abc') or like(col 'abc%') to col >= 'abc' and col < 'abd'
- case StartsWith(left, right @ Literal(pattern, dataType)) if (pattern.toString.size > 0) =>
- val l = new GreaterThanEqualToExpression(
- transformExpression(left), transformExpression(right))
- val value = pattern.toString
- val maxValueLimit = value.substring(0, value.length - 1) +
- (value.charAt(value.length - 1).toInt + 1).toChar
- val r = new LessThanExpression(
- transformExpression(left),
- new CarbonLiteralExpression(maxValueLimit,
- CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
- new AndExpression(l, r)
- case AttributeReference(name, dataType, _, _) => new CarbonColumnExpression(name.toString,
- CarbonScalaUtil.convertSparkToCarbonDataType(dataType))
- case Literal(name, dataType) => new
- CarbonLiteralExpression(name, CarbonScalaUtil.convertSparkToCarbonDataType(dataType))
- case Cast(left, right) if !left.isInstanceOf[Literal] => transformExpression(left)
- case aggExpr: AggregateExpression =>
- throw new UnsupportedOperationException(s"Cannot evaluate expression: $aggExpr")
- case _ =>
- new SparkUnknownExpression(expr.transform {
- case AttributeReference(name, dataType, _, _) =>
- CarbonBoundReference(new CarbonColumnExpression(name.toString,
- CarbonScalaUtil.convertSparkToCarbonDataType(dataType)), dataType, expr.nullable)
- })
+ def processFilterExpressions(plan: CarbonQueryPlan) {
+ if (dimensionPredicatesRaw.nonEmpty) {
+ val expressionVal = CarbonFilters.processExpression(
+ dimensionPredicatesRaw,
+ attributesNeedToDecode,
+ unprocessedExprs,
+ carbonTable)
+ expressionVal match {
+ case Some(ce) =>
+ // adding dimension used in expression in querystats
+ plan.setFilterExpression(ce)
+ case _ =>
+ }
}
+ processExtraAttributes(plan)
}
- private def getSortDirection(sort: SortDirection) = {
- sort match {
- case Ascending => SortOrderType.ASC
- case Descending => SortOrderType.DSC
+ private def processExtraAttributes(plan: CarbonQueryPlan) {
+ if (attributesNeedToDecode.size() > 0) {
+ val attributeOut = new ArrayBuffer[Attribute]() ++ attributesRaw
+
+ attributesNeedToDecode.asScala.map { attr =>
+ val dims = plan.getDimensions.asScala.filter(f => f.getColumnName.equals(attr.name))
+ val msrs = plan.getMeasures.asScala.filter(f => f.getColumnName.equals(attr.name))
+ var order = plan.getDimensions.size() + plan.getMeasures.size()
+ if (dims.isEmpty && msrs.isEmpty) {
+ val dimension = carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+ if (dimension != null) {
+ val qDim = new QueryDimension(dimension.getColName)
+ qDim.setQueryOrder(order)
+ plan.addDimension(qDim)
+ attributeOut += attr
+ order += 1
+ } else {
+ val measure = carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
+ if (measure != null) {
+ val qMsr = new QueryMeasure(measure.getColName)
+ qMsr.setQueryOrder(order)
+ plan.addMeasure(qMsr)
+ order += 1
+ attributeOut += attr
+ }
+ }
+ }
+ }
+ attributesRaw = attributeOut
}
}
- def addPushdownFilters(keys: Seq[Expression], filters: Array[Array[Expression]],
- conditions: Option[Expression]) {
-
- // TODO Values in the IN filter is duplicate. replace the list with set
- val buffer = new ArrayBuffer[Expression]
- keys.zipWithIndex.foreach { a =>
- buffer += In(a._1, filters(a._2)).asInstanceOf[Expression]
- }
-
- // Let's not pushdown condition. Only filter push down is sufficient.
- // Conditions can be applied on hash join result.
- val cond = if (buffer.size > 1) {
- val e = buffer.remove(0)
- buffer.fold(e)(And(_, _))
- } else {
- buffer.asJava.get(0)
- }
-
- extraPreds = Seq(cond)
- }
-
- def inputRdd: CarbonQueryRDD[CarbonKey, CarbonValue] = {
- val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- // Update the FilterExpressions with extra conditions added through join pushdown
- if (extraPreds.nonEmpty) {
- val exps = preProcessExpressions(extraPreds)
- val expressionVal = transformExpression(exps.head)
- val oldExpressionVal = buildCarbonPlan.getFilterExpression
- if (null == oldExpressionVal) {
- buildCarbonPlan.setFilterExpression(expressionVal)
- } else {
- buildCarbonPlan.setFilterExpression(new AndExpression(oldExpressionVal, expressionVal))
- }
- }
+ def inputRdd: CarbonScanRDD[Array[Any], Any] = {
val conf = new Configuration()
val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-
+ buildCarbonPlan.getDimAggregatorInfos.clear()
val model = QueryModel.createModel(
absoluteTableIdentifier, buildCarbonPlan, carbonTable)
- val kv: KeyVal[CarbonKey, CarbonValue] = new KeyValImpl()
+ val kv: RawKey[Array[Any], Any] = new RawKeyImpl()
// setting queryid
- buildCarbonPlan.setQueryId(oc.getConf("queryId", System.nanoTime() + ""))
-
- LOG.info("Selected Table to Query ****** "
- + model.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
-
- val cubeCreationTime = carbonCatalog.getCubeCreationTime(relation.schemaName, cubeName)
- val schemaLastUpdatedTime =
- carbonCatalog.getSchemaLastUpdatedTime(relation.schemaName, cubeName)
- val big = new CarbonQueryRDD(
- oc.sparkContext,
+ buildCarbonPlan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
+
+ val cubeCreationTime = carbonCatalog
+ .getCubeCreationTime(relationRaw.schemaName, relationRaw.tableName)
+ val schemaLastUpdatedTime = carbonCatalog
+ .getSchemaLastUpdatedTime(relationRaw.schemaName, relationRaw.tableName)
+ val big = new CarbonScanRDD(
+ ocRaw.sparkContext,
model,
buildCarbonPlan.getFilterExpression,
kv,
@@ -578,78 +207,35 @@ case class CarbonTableScan(
}
- override def outputsUnsafeRows: Boolean = false
+ override def outputsUnsafeRows: Boolean = attributesNeedToDecode.size() == 0
- def doExecute(): RDD[InternalRow] = {
+ override def doExecute(): RDD[InternalRow] = {
def toType(obj: Any): Any = {
obj match {
case s: String => UTF8String.fromString(s)
- case avg: AbstractAvgAggregator =>
- if (avg.isFirstTime) {
- null
- } else {
- new GenericArrayData(avg.getAvgState.asInstanceOf[Array[Any]])
- }
- case c: CountAggregator => c.getLongValue
- case s: SumDoubleAggregator => s.getDoubleValue
- case s: SumBigDecimalAggregator => Decimal(s.getBigDecimalValue)
- case s: SumLongAggregator => s.getLongValue
- case m: MaxBigDecimalAggregator => Decimal(m.getBigDecimalValue)
- case m: MaxLongAggregator => m.getLongValue
- case m: MaxAggregator => toType(m.getValueObject)
- case m: MinBigDecimalAggregator => Decimal(m.getBigDecimalValue)
- case m: MinLongAggregator => m.getLongValue
- case m: MinAggregator => toType(m.getValueObject)
- case m: MeasureAggregator => toType(m.getValueObject)
case _ => obj
}
}
+ val outUnsafeRows: Boolean = attributesNeedToDecode.size() == 0
+ inputRdd.mapPartitions { iter =>
+ val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
+ new Iterator[InternalRow] {
+ override def hasNext: Boolean = iter.hasNext
-// val unsafeProjection = UnsafeProjection.create(attributes.map(_.dataType).toArray)
- // count(*) query executed in driver by querying from Btree
- if (isCountQuery) {
- val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
- val (carbonInputFormat: CarbonInputFormat[RowResult], job: Job) =
- QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier)
- // get row count
- val rowCount = carbonInputFormat.getRowCount(job)
- val countAgg = new CountAggregator()
- countAgg.setNewValue(rowCount)
- sparkContext.parallelize(
- Seq(new GenericMutableRow(Seq(countAgg.getLongValue).toArray.asInstanceOf[Array[Any]]))
- )
- } else {
- // all the other queries are sent to executor
- inputRdd.mapPartitions { iter =>
- new Iterator[InternalRow] {
-// val unsafeProjection = UnsafeProjection.create(attributes.map(_.dataType).toArray)
- override def hasNext: Boolean = iter.hasNext
-
- override def next(): InternalRow = {
- new GenericMutableRow(iter.next()._1.getKey.map(toType))
+ override def next(): InternalRow =
+ if (outUnsafeRows) {
+ unsafeProjection(new GenericMutableRow(iter.next()._1.map(toType)))
+ } else {
+ new GenericMutableRow(iter.next()._1.map(toType))
}
- }
}
}
}
- /**
- * return true if query is count queryUtils
- *
- * @return
- */
- def isCountQuery: Boolean = {
- if (buildCarbonPlan.isCountStarQuery() && null == buildCarbonPlan.getFilterExpression &&
- buildCarbonPlan.getDimensions.size() < 1 && buildCarbonPlan.getMeasures.size() < 2 &&
- buildCarbonPlan.getDimAggregatorInfos.size() < 1) {
- true
- } else {
- false
- }
- }
-
def output: Seq[Attribute] = {
- attributes
+ attributesRaw
}
+
}
+
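
Note how the new doExecute emits unsafe rows only when attributesNeedToDecode is
empty, i.e. when no late dictionary decoding is pending. The per-partition
projection pattern it relies on can be shown in isolation; a small sketch using
plain Spark 1.6 catalyst APIs (the column types and values here are illustrative):

    import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeProjection}
    import org.apache.spark.sql.types.{DataType, LongType, StringType}
    import org.apache.spark.unsafe.types.UTF8String

    // Build the projection once per partition, then reuse it for every row;
    // UnsafeProjection.create takes the output data types, as in doExecute.
    val dataTypes: Array[DataType] = Array(StringType, LongType)
    val toUnsafe = UnsafeProjection.create(dataTypes)

    // Strings must be converted to UTF8String before entering an InternalRow
    // (that is what the toType helper above does).
    val row = new GenericMutableRow(Array[Any](UTF8String.fromString("carbon"), 42L))
    val unsafeRow = toUnsafe(row) // copies the fields into Tungsten binary format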
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/apache/spark/sql/CarbonRawOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonRawOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonRawOperators.scala
deleted file mode 100644
index 2005300..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonRawOperators.scala
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.util.ArrayList
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
-import org.apache.spark.sql.execution.LeafNode
-import org.apache.spark.sql.hive.CarbonMetastoreCatalog
-import org.apache.spark.sql.types.{DataType, Decimal}
-import org.apache.spark.unsafe.types.UTF8String
-
-import org.carbondata.core.carbon.AbsoluteTableIdentifier
-import org.carbondata.core.constants.CarbonCommonConstants
-import org.carbondata.core.util.CarbonProperties
-import org.carbondata.query.carbon.model._
-import org.carbondata.query.carbon.result.BatchRawResult
-import org.carbondata.query.carbon.wrappers.ByteArrayWrapper
-import org.carbondata.spark.{CarbonFilters, RawKey, RawKeyImpl, RawKeyVal, RawKeyValImpl}
-import org.carbondata.spark.rdd.CarbonRawQueryRDD
-
-
-case class CarbonRawTableScan(
- var attributesRaw: Seq[Attribute],
- relationRaw: CarbonRelation,
- dimensionPredicatesRaw: Seq[Expression],
- aggExprsRaw: Option[Seq[Expression]],
- useBinaryAggregator: Boolean)(@transient val ocRaw: SQLContext) extends LeafNode
-{
- val carbonTable = relationRaw.metaData.carbonTable
- val selectedDims = scala.collection.mutable.MutableList[QueryDimension]()
- val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]()
- @transient val carbonCatalog = ocRaw.catalog.asInstanceOf[CarbonMetastoreCatalog]
-
- val attributesNeedToDecode = new java.util.HashSet[AttributeReference]()
- val unprocessedExprs = new ArrayBuffer[Expression]()
-
- val buildCarbonPlan: CarbonQueryPlan = {
- val plan: CarbonQueryPlan = new CarbonQueryPlan(relationRaw.schemaName, relationRaw.tableName)
-
- val dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
- val measures = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
- val dimAttr = new Array[Attribute](dimensions.size())
- val msrAttr = new Array[Attribute](measures.size())
- attributesRaw.foreach { attr =>
- val carbonDimension =
- carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
- if(carbonDimension != null) {
- dimAttr(dimensions.indexOf(carbonDimension)) = attr
- } else {
- val carbonMeasure =
- carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
- if(carbonMeasure != null) {
- msrAttr(measures.indexOf(carbonMeasure)) = attr
- }
- }
- }
-
- attributesRaw = dimAttr.filter(f => f != null) ++ msrAttr.filter(f => f != null)
-
- var queryOrder: Integer = 0
- attributesRaw.foreach { attr =>
- val carbonDimension =
- carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
- if (carbonDimension != null) {
- val dim = new QueryDimension(attr.name)
- dim.setQueryOrder(queryOrder)
- queryOrder = queryOrder + 1
- selectedDims += dim
- } else {
- val carbonMeasure =
- carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
- if (carbonMeasure != null) {
- val m1 = new QueryMeasure(attr.name)
- m1.setQueryOrder(queryOrder)
- queryOrder = queryOrder + 1
- selectedMsrs += m1
- }
- }
- }
- // Just find out that any aggregation functions are present on dimensions.
- aggExprsRaw match {
- case Some(aggExprs) =>
- aggExprs.foreach {
- case Alias(agg: AggregateExpression, name) =>
- agg.collect {
- case attr: AttributeReference =>
- val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
- if(dims.nonEmpty) {
- plan.addAggDimAggInfo(dims.head.getColumnName,
- dims.head.getAggregateFunction,
- dims.head.getQueryOrder)
- }
- }
- case _ =>
- }
- case _ =>
- }
-
- // Fill the selected dimensions & measures obtained from
- // attributes to query plan for detailed query
- selectedDims.foreach(plan.addDimension)
- selectedMsrs.foreach(plan.addMeasure)
-
- plan.setSortedDimemsions(new ArrayList[QueryDimension])
-
- plan.setRawDetailQuery(true)
- plan.setOutLocationPath(
- CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION_HDFS))
- plan.setQueryId(System.nanoTime() + "")
- processFilterExpressions(plan)
- plan
- }
-
- def processFilterExpressions(plan: CarbonQueryPlan) {
- if (dimensionPredicatesRaw.nonEmpty) {
- val expressionVal = CarbonFilters.processExpression(
- dimensionPredicatesRaw,
- attributesNeedToDecode,
- unprocessedExprs,
- carbonTable)
- expressionVal match {
- case Some(ce) =>
- // adding dimension used in expression in querystats
- plan.setFilterExpression(ce)
- case _ =>
- }
- }
- processExtraAttributes(plan)
- }
-
- private def processExtraAttributes(plan: CarbonQueryPlan) {
- if (attributesNeedToDecode.size() > 0) {
- val attributeOut = new ArrayBuffer[Attribute]() ++ attributesRaw
-
- attributesNeedToDecode.asScala.map { attr =>
- val dims = plan.getDimensions.asScala.filter(f => f.getColumnName.equals(attr.name))
- val msrs = plan.getMeasures.asScala.filter(f => f.getColumnName.equals(attr.name))
- var order = plan.getDimensions.size() + plan.getMeasures.size()
- if (dims.isEmpty && msrs.isEmpty) {
- val dimension = carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
- if (dimension != null) {
- val qDim = new QueryDimension(dimension.getColName)
- qDim.setQueryOrder(order)
- plan.addDimension(qDim)
- attributeOut += attr
- order += 1
- } else {
- val measure = carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
- if (measure != null) {
- val qMsr = new QueryMeasure(measure.getColName)
- qMsr.setQueryOrder(order)
- plan.addMeasure(qMsr)
- order += 1
- attributeOut += attr
- }
- }
- }
- }
- attributesRaw = attributeOut
- }
- }
-
-
- def inputRdd: CarbonRawQueryRDD[Array[Any], Any] = {
-
- val conf = new Configuration()
- val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
- buildCarbonPlan.getDimAggregatorInfos.clear()
- val model = QueryModel.createModel(
- absoluteTableIdentifier, buildCarbonPlan, carbonTable)
- val kv: RawKey[Array[Any], Any] = new RawKeyImpl()
- // setting queryid
- buildCarbonPlan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
-
- val cubeCreationTime = carbonCatalog
- .getCubeCreationTime(relationRaw.schemaName, relationRaw.tableName)
- val schemaLastUpdatedTime = carbonCatalog
- .getSchemaLastUpdatedTime(relationRaw.schemaName, relationRaw.tableName)
- val big = new CarbonRawQueryRDD(
- ocRaw.sparkContext,
- model,
- buildCarbonPlan.getFilterExpression,
- kv,
- conf,
- cubeCreationTime,
- schemaLastUpdatedTime,
- carbonCatalog.storePath)
- big
- }
-
-
- override def outputsUnsafeRows: Boolean = attributesNeedToDecode.size() == 0
-
- override def doExecute(): RDD[InternalRow] = {
- def toType(obj: Any): Any = {
- obj match {
- case s: String => UTF8String.fromString(s)
- case _ => obj
- }
- }
- val outUnsafeRows: Boolean = attributesNeedToDecode.size() == 0
- inputRdd.mapPartitions { iter =>
- val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
- new Iterator[InternalRow] {
- override def hasNext: Boolean = iter.hasNext
-
- override def next(): InternalRow =
- if (outUnsafeRows) {
- unsafeProjection(new GenericMutableRow(iter.next()._1.map(toType)))
- } else {
- new GenericMutableRow(iter.next()._1.map(toType))
- }
- }
- }
- }
-
- def output: Seq[Attribute] = {
- attributesRaw
- }
-
-}
-
-class CarbonRawMutableRow(values: Array[Array[Object]],
- val schema: QuerySchemaInfo) extends GenericMutableRow(values.asInstanceOf[Array[Any]]) {
-
- val dimsLen = schema.getQueryDimensions.length - 1
- val order = schema.getQueryOrder
- var counter = 0
- val size = {
- if (values.nonEmpty) {
- values.head.length
- } else {
- 0
- }
- }
-
- def getKey: ByteArrayWrapper = values.head(counter).asInstanceOf[ByteArrayWrapper]
-
- def parseKey(key: ByteArrayWrapper, aggData: Array[Object], order: Array[Int]): Array[Object] = {
- BatchRawResult.parseData(key, aggData, schema, order)
- }
-
- def hasNext: Boolean = {
- counter < size
- }
-
- def next(): Unit = {
- counter += 1
- }
-
- override def numFields: Int = dimsLen + schema.getQueryMeasures.length
-
- override def anyNull: Boolean = true
-
- override def get(ordinal: Int, dataType: DataType): AnyRef = {
- values(order(ordinal) - dimsLen)(counter)
- .asInstanceOf[AnyRef]
- }
-
- override def getUTF8String(ordinal: Int): UTF8String = {
- UTF8String
- .fromString(values(
- order(ordinal) - dimsLen)(counter)
- .asInstanceOf[String])
- }
-
- override def getDouble(ordinal: Int): Double = {
- values(order(ordinal) - dimsLen)(counter)
- .asInstanceOf[Double]
- }
-
- override def getFloat(ordinal: Int): Float = {
- values(order(ordinal) - dimsLen)(counter)
- .asInstanceOf[Float]
- }
-
- override def getLong(ordinal: Int): Long = {
- values(order(ordinal) - dimsLen)(counter)
- .asInstanceOf[Long]
- }
-
- override def getByte(ordinal: Int): Byte = {
- values(order(ordinal) - dimsLen)(counter)
- .asInstanceOf[Byte]
- }
-
- override def getDecimal(ordinal: Int,
- precision: Int,
- scale: Int): Decimal = {
- values(order(ordinal) - dimsLen)(counter).asInstanceOf[Decimal]
- }
-
- override def getBoolean(ordinal: Int): Boolean = {
- values(order(ordinal) - dimsLen)(counter)
- .asInstanceOf[Boolean]
- }
-
- override def getShort(ordinal: Int): Short = {
- values(order(ordinal) - dimsLen)(counter)
- .asInstanceOf[Short]
- }
-
- override def getInt(ordinal: Int): Int = {
- values(order(ordinal) - dimsLen)(counter)
- .asInstanceOf[Int]
- }
-
- override def isNullAt(ordinal: Int): Boolean = values(order(ordinal) - dimsLen)(counter) == null
-}
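
The deleted CarbonRawMutableRow wrapped a column-major batch, values(column)(row),
behind Spark's row interface and advanced an internal cursor instead of
allocating one row object per record. A stripped-down sketch of that cursor
pattern (hypothetical class, not CarbonData API):

    // One inner array per column; the cursor selects the current row.
    class ColumnarCursor(columns: Array[Array[Any]]) {
      private var counter = 0
      private val size = if (columns.nonEmpty) columns(0).length else 0

      def hasNext: Boolean = counter < size
      def advance(): Unit = counter += 1

      // Field access is column-first, then row, mirroring get(ordinal) above.
      def get(ordinal: Int): Any = columns(ordinal)(counter)
    }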
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/apache/spark/sql/execution/joins/CarbonJoins.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/joins/CarbonJoins.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/joins/CarbonJoins.scala
deleted file mode 100644
index 79ba157..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/joins/CarbonJoins.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.joins
-
-import scala.concurrent._
-import scala.concurrent.duration._
-import scala.Array.canBuildFrom
-
-import org.apache.spark.{InternalAccumulator, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.CarbonTableScan
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{BindReferences, Expression, Literal}
-import org.apache.spark.sql.execution.{BinaryNode, SparkPlan, SQLExecution}
-import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.ThreadUtils
-
-case class BroadCastFilterPushJoin(
- leftKeys: Seq[Expression],
- rightKeys: Seq[Expression],
- buildSide: BuildSide,
- left: SparkPlan,
- right: SparkPlan,
- condition: Option[Expression]) extends BinaryNode with HashJoin {
-
- override private[sql] lazy val metrics = Map(
- "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
- "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
- "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
-
- val timeout: Duration = {
- val timeoutValue = sqlContext.conf.broadcastTimeout
- if (timeoutValue < 0) {
- Duration.Inf
- } else {
- timeoutValue.seconds
- }
- }
- private lazy val (input: Array[InternalRow], inputCopy: Array[InternalRow]) = {
- val numBuildRows = buildSide match {
- case BuildLeft => longMetric("numLeftRows")
- case BuildRight => longMetric("numRightRows")
- }
- val buildPlanOutput = buildPlan.execute()
- val input: Array[InternalRow] = buildPlanOutput.map(_.copy()).collect()
- val inputCopy: Array[InternalRow] = buildPlanOutput.map(_.copy()).collect()
- (input, inputCopy)
- }
- // Use lazy so that we won't do broadcast when calling explain but still cache the broadcast value
- // for the same query.
- @transient
- private lazy val broadcastFuture = {
- // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
- val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
- future {
- // This will run in another thread. Set the execution id so that we can connect these jobs
- // with the correct execution.
- SQLExecution.withExecutionId(sparkContext, executionId) {
- // The following line doesn't run in a job so we cannot track the metric value. However, we
- // have already tracked it in the above lines. So here we can use
- // `SQLMetrics.nullLongMetric` to ignore it.
- val hashed = HashedRelation(
- input.iterator, SQLMetrics.nullLongMetric, buildSideKeyGenerator, input.size)
- sparkContext.broadcast(hashed)
- }
- }(BroadCastFilterPushJoin.broadcastHashJoinExecutionContext)
- }
-
- override def doExecute(): RDD[InternalRow] = {
-
- val numOutputRows = longMetric("numOutputRows")
- val (numBuildRows, numStreamedRows) = buildSide match {
- case BuildLeft => (longMetric("numLeftRows"), longMetric("numRightRows"))
- case BuildRight => (longMetric("numRightRows"), longMetric("numLeftRows"))
- }
-
- val keys = buildKeys.map { a =>
- BindReferences.bindReference(a, buildPlan.output)
- }.toArray
- val filters = keys.map {
- k =>
- inputCopy.map(
- r => {
- val curr = k.eval(r)
- if (curr.isInstanceOf[UTF8String]) {
- Literal(curr.toString).asInstanceOf[Expression]
- } else {
- Literal(curr).asInstanceOf[Expression]
- }
- })
- }
- val carbonScan = buildSide match {
- case BuildLeft => right
- case BuildRight => left
- }
-
- val cubeScan = carbonScan.collectFirst { case a: CarbonTableScan => a }
- if (cubeScan.isDefined) {
- cubeScan.get.addPushdownFilters(streamedKeys, filters, condition)
- }
-
- val streamedPlanOutput = streamedPlan.execute()
- // scalastyle:off
- val broadcastRelation = Await.result(broadcastFuture, timeout)
- // scalastyle:on
- streamedPlanOutput.mapPartitions { streamedIter =>
- val hashedRelation = broadcastRelation.value
- hashedRelation match {
- case unsafe: UnsafeHashedRelation =>
- TaskContext.get().internalMetricsToAccumulators(
- InternalAccumulator.PEAK_EXECUTION_MEMORY).add(unsafe.getUnsafeSize)
- case _ =>
- }
- hashJoin(streamedIter, numStreamedRows, hashedRelation, numOutputRows)
- }
-
- }
-}
-
-object BroadCastFilterPushJoin {
-
- private[joins] val broadcastHashJoinExecutionContext = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonCachedThreadPool("filterpushhash-join", 128))
-}
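
BroadCastFilterPushJoin, removed here in favour of the single push-up strategy,
materialized the build side first and turned its join-key values into IN filters
for the Carbon scan, so the hash join streamed over a pre-filtered RDD. A
schematic of that idea with hypothetical helper types (the real code built
catalyst In(...) expressions; note the TODO above about duplicate values):

    case class InFilter(column: String, values: Set[Any])

    // One IN filter per join key, built from the collected build-side rows.
    // Using Set de-duplicates values, which is what the removed TODO asked for.
    def buildPushdownFilters(buildRows: Seq[Map[String, Any]],
        joinKeys: Seq[String]): Seq[InFilter] =
      joinKeys.map { key =>
        InFilter(key, buildRows.flatMap(_.get(key)).toSet)
      }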
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRawStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRawStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRawStrategies.scala
deleted file mode 100644
index c01a937..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRawStrategies.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.spark.sql.hive
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions.{AttributeSet, _}
-import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter => LogicalFilter, LogicalPlan}
-import org.apache.spark.sql.execution.{Filter, Project, SparkPlan}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation}
-
-import org.carbondata.common.logging.LogServiceFactory
-import org.carbondata.core.carbon.metadata.schema.table.CarbonTable
-
-class CarbonRawStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
-
- override def strategies: Seq[Strategy] = getStrategies
-
- val LOGGER = LogServiceFactory.getLogService("CarbonRawStrategies")
-
- def getStrategies: Seq[Strategy] = {
- val total = sqlContext.planner.strategies :+ CarbonRawTableScans
- total
- }
-
- /**
- * Carbon strategies for Carbon cube scanning
- */
- private[sql] object CarbonRawTableScans extends Strategy {
-
- def apply(plan: LogicalPlan): Seq[SparkPlan] = {
- plan match {
- case PhysicalOperation(projectList, predicates,
- l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)) =>
- if (isStarQuery(plan)) {
- carbonRawScanForStarQuery(projectList, predicates, carbonRelation, l)(sqlContext) :: Nil
- } else {
- carbonRawScan(projectList,
- predicates,
- carbonRelation,
- l,
- None,
- detailQuery = true,
- useBinaryAggregation = false)(sqlContext)._1 :: Nil
- }
- case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
- CarbonDictionaryDecoder(relations,
- profile,
- aliasMap,
- planLater(child))(sqlContext) :: Nil
- case _ =>
- Nil
- }
- }
-
- /**
- * Create carbon scan
- */
- private def carbonRawScan(projectList: Seq[NamedExpression],
- predicates: Seq[Expression],
- relation: CarbonDatasourceRelation,
- logicalRelation: LogicalRelation,
- groupExprs: Option[Seq[Expression]],
- detailQuery: Boolean,
- useBinaryAggregation: Boolean)(sc: SQLContext): (SparkPlan, Boolean) = {
-
- val tableName: String =
- relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
- // Check out any expressions are there in project list. if they are present then we need to
- // decode them as well.
- val projectSet = AttributeSet(projectList.flatMap(_.references))
- val scan = CarbonRawTableScan(projectSet.toSeq,
- relation.carbonRelation,
- predicates,
- groupExprs,
- useBinaryAggregation)(sqlContext)
- val dimAggrsPresence: Boolean = scan.buildCarbonPlan.getDimAggregatorInfos.size() > 0
- projectList.map {
- case attr: AttributeReference =>
- case Alias(attr: AttributeReference, _) =>
- case others =>
- others.references
- .map(f => scan.attributesNeedToDecode.add(f.asInstanceOf[AttributeReference]))
- }
- if (!detailQuery) {
- if (scan.attributesNeedToDecode.size > 0) {
- val decoder = getCarbonDecoder(logicalRelation,
- sc,
- tableName,
- scan.attributesNeedToDecode.asScala.toSeq,
- scan)
- if (scan.unprocessedExprs.nonEmpty) {
- val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
- (Project(projectList, filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder)), true)
- } else {
- (Project(projectList, decoder), true)
- }
- } else {
- (scan, dimAggrsPresence)
- }
- } else {
- if (scan.attributesNeedToDecode.size() > 0) {
- val decoder = getCarbonDecoder(logicalRelation,
- sc,
- tableName,
- scan.attributesNeedToDecode.asScala.toSeq,
- scan)
- if (scan.unprocessedExprs.nonEmpty) {
- val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
- (Project(projectList, filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder)), true)
- } else {
- (Project(projectList, decoder), true)
- }
- } else {
- (Project(projectList, scan), dimAggrsPresence)
- }
- }
- }
-
- /**
- * Create carbon scan for star query
- */
- private def carbonRawScanForStarQuery(projectList: Seq[NamedExpression],
- predicates: Seq[Expression],
- relation: CarbonDatasourceRelation,
- logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
-
- val tableName: String =
- relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
- // Check out any expressions are there in project list. if they are present then we need to
- // decode them as well.
- val projectExprsNeedToDecode = new java.util.HashSet[Attribute]()
- val scan = CarbonRawTableScan(projectList.map(_.toAttribute),
- relation.carbonRelation,
- predicates,
- None,
- useBinaryAggregator = false)(sqlContext)
- projectExprsNeedToDecode.addAll(scan.attributesNeedToDecode)
- if (projectExprsNeedToDecode.size() > 0) {
- val decoder = getCarbonDecoder(logicalRelation,
- sc,
- tableName,
- projectExprsNeedToDecode.asScala.toSeq,
- scan)
- if (scan.unprocessedExprs.nonEmpty) {
- val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
- filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder)
- } else {
- decoder
- }
- } else {
- scan
- }
- }
-
- def getCarbonDecoder(logicalRelation: LogicalRelation,
- sc: SQLContext,
- tableName: String,
- projectExprsNeedToDecode: Seq[Attribute],
- scan: CarbonRawTableScan): CarbonDictionaryDecoder = {
- val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
- logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation])
- val attrs = projectExprsNeedToDecode.map { attr =>
- val newAttr = AttributeReference(attr.name,
- attr.dataType,
- attr.nullable,
- attr.metadata)(attr.exprId, Seq(tableName))
- relation.addAttribute(newAttr)
- newAttr
- }
- CarbonDictionaryDecoder(Seq(relation), IncludeProfile(attrs),
- CarbonAliasDecoderRelation(), scan)(sc)
- }
-
- private def isStarQuery(plan: LogicalPlan) = {
- plan match {
- case LogicalFilter(condition,
- LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)) => true
- case LogicalRelation(carbonRelation: CarbonDatasourceRelation, _) => true
- case _ => false
- }
- }
-
- private def isGroupByPresentOnMeasures(groupingExpressions: Seq[Expression],
- carbonTable: CarbonTable): Boolean = {
- groupingExpressions.map { g =>
- g.collect {
- case attr: AttributeReference
- if carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name) != null =>
- return true
- }
- }
- false
- }
- }
-
-}
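
The deleted carbonRawScan also documents the late-decode decision that this
commit consolidates into CarbonStrategies: wrap the scan in a
CarbonDictionaryDecoder (plus Project/Filter) only when some projected attribute
still carries dictionary surrogate values. Reduced to hypothetical plan nodes:

    sealed trait Plan
    case class Scan(needDecode: Set[String]) extends Plan
    case class Decode(attrs: Set[String], child: Plan) extends Plan
    case class Project(child: Plan) extends Plan

    // Pay the dictionary-decode cost only when the scan could not
    // produce decoded values itself.
    def planScan(scan: Scan): Plan =
      if (scan.needDecode.nonEmpty) Project(Decode(scan.needDecode, scan))
      else Project(scan)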
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala
new file mode 100644
index 0000000..edfaa90
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.CarbonSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.carbondata.spark.exception.MalformedCarbonCommandException
+
+private[spark] class CarbonSQLDialect(context: HiveContext) extends HiveQLDialect(context) {
+
+ @transient
+ protected val sqlParser = new CarbonSqlParser
+
+ override def parse(sqlText: String): LogicalPlan = {
+
+ try {
+ sqlParser.parse(sqlText)
+ } catch {
+ // MalformedCarbonCommandException needs to be thrown directly
+ // because Hive cannot parse Carbon commands
+ case ce: MalformedCarbonCommandException =>
+ throw ce
+ case _ => super.parse(sqlText)
+ }
+ }
+}
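
The new dialect tries the Carbon parser first and falls back to HiveQL only when
the failure is not a Carbon-specific syntax error. The control flow, reduced to
a generic sketch with hypothetical parser functions (CarbonSQLDialect wires these
to CarbonSqlParser.parse and HiveQLDialect.parse):

    def parseWithFallback[P](sql: String,
        primary: String => P,
        fallback: String => P,
        isFatal: Throwable => Boolean): P =
      try {
        primary(sql)
      } catch {
        // A recognized-but-malformed Carbon command must surface as-is...
        case t: Throwable if isFatal(t) => throw t
        // ...anything else means "not a Carbon command": let Hive parse it.
        case _: Throwable => fallback(sql)
      }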