Posted to commits@carbondata.apache.org by jb...@apache.org on 2016/06/23 14:15:59 UTC

[11/56] [abbrv] incubator-carbondata git commit: Update SQL planning in carbon-spark (#682)

Update SQL planning in carbon-spark (#682)

* [Issue-660] Show segments query should not fail when the table name is given in a different case (#662)

* Show segments should not fail when the table name case differs

* Corrected test case

* [issue-656] Fix data load when an int column contains Integer.MIN_VALUE (#657)

* Load data when an int column contains the minimum Integer value

* Fixed test case

* Fix bigint test

* Fix bigint test

* Removed unused DATA_BIGINT case

* Removed unused condition for unCompressMaxMin

* [issue-664] select count(joinDate) from table_x fails for a direct dictionary column (#665)

* Supported Spark 1.6 by changing aggregation interfaces

* Fixed compile issue after rebase

* Optimized the flow with unsafe rows

* Fixed bugs in push-up

* Fixed compiler issues after rebasing

* Fixed merging issue after rebase

* Fixed scan query pushdown

* Keep only the push-up strategy

* Keep only one QueryRDD

* Rename QueryRDD to ScanRDD and clean up code

* Fix scalastyle


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/a83dba34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/a83dba34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/a83dba34

Branch: refs/heads/master
Commit: a83dba3433525eaf2f255912184a6e1a6d7dbdea
Parents: ead0076
Author: Jacky Li <ja...@huawei.com>
Authored: Fri Jun 17 16:02:55 2016 +0800
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Fri Jun 17 13:32:55 2016 +0530

----------------------------------------------------------------------
 .../org/apache/spark/sql/CarbonContext.scala    |   5 +-
 .../org/apache/spark/sql/CarbonOperators.scala  | 676 ++++---------------
 .../apache/spark/sql/CarbonRawOperators.scala   | 332 ---------
 .../spark/sql/execution/joins/CarbonJoins.scala | 140 ----
 .../spark/sql/hive/CarbonRawStrategies.scala    | 217 ------
 .../spark/sql/hive/CarbonSQLDialect.scala       |  42 ++
 .../spark/sql/hive/CarbonStrategies.scala       | 505 +++++---------
 .../apache/spark/sql/hive/CarbonStrategy.scala  |  54 --
 .../carbondata/spark/rdd/CarbonQueryRDD.scala   | 241 -------
 .../spark/rdd/CarbonRawQueryRDD.scala           | 128 ----
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 219 ++++++
 .../testsuite/joinquery/EquiJoinTestCase.scala  |  41 --
 12 files changed, 566 insertions(+), 2034 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index 2bf50da..ffc5655 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -55,7 +55,10 @@ class CarbonContext(val sc: SparkContext, val storePath: String) extends HiveCon
 
   protected[sql] override def getSQLDialect(): ParserDialect = new CarbonSQLDialect(this)
 
-  experimental.extraStrategies = CarbonStrategy.getStrategy(self)
+  experimental.extraStrategies = {
+    val carbonStrategy = new CarbonStrategies(self)
+    Seq(carbonStrategy.CarbonTableScan, carbonStrategy.DDLStrategies)
+  }
 
   @transient
   val LOGGER = LogServiceFactory.getLogService(CarbonContext.getClass.getName)
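[Editor's note] A minimal usage sketch, not part of the commit: with this change the extra planner strategies come straight from CarbonStrategies, so a CarbonContext registers both the table-scan and DDL strategies on construction. The store path, master URL and table name below are illustrative assumptions only.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.CarbonContext

    // Hypothetical setup; CarbonContext takes a SparkContext and a store path (see the diff above).
    val sc = new SparkContext(new SparkConf().setAppName("carbon-example").setMaster("local[2]"))
    val cc = new CarbonContext(sc, "/tmp/carbon/store")   // store path is an assumption

    // Queries planned through cc are now routed through CarbonTableScan / DDLStrategies.
    cc.sql("SELECT count(*) FROM some_table").show()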

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
index 8796707..cb20246 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
@@ -23,278 +23,70 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.agg.{CarbonAverage, CarbonCount}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, _}
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Max, Min, Sum}
-import org.apache.spark.sql.catalyst.util.GenericArrayData
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.execution.LeafNode
 import org.apache.spark.sql.hive.CarbonMetastoreCatalog
 import org.apache.spark.sql.types.{DataType, Decimal}
 import org.apache.spark.unsafe.types.UTF8String
 
-import org.carbondata.common.logging.LogServiceFactory
-import org.carbondata.core.carbon.AbsoluteTableIdentifier
 import org.carbondata.core.constants.CarbonCommonConstants
 import org.carbondata.core.util.CarbonProperties
-import org.carbondata.hadoop.CarbonInputFormat
-import org.carbondata.query.aggregator.MeasureAggregator
-import org.carbondata.query.aggregator.impl.avg.AbstractAvgAggregator
-import org.carbondata.query.aggregator.impl.count.CountAggregator
-import org.carbondata.query.aggregator.impl.max.{MaxAggregator, MaxBigDecimalAggregator, MaxLongAggregator}
-import org.carbondata.query.aggregator.impl.min.{MinAggregator, MinBigDecimalAggregator, MinLongAggregator}
-import org.carbondata.query.aggregator.impl.sum.{SumBigDecimalAggregator, SumDoubleAggregator, SumLongAggregator}
-import org.carbondata.query.carbon.model.{CarbonQueryPlan, QueryDimension, QueryMeasure, QueryModel, SortOrderType}
-import org.carbondata.query.carbon.result.RowResult
-import org.carbondata.query.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
-import org.carbondata.query.expression.arithmetic.{AddExpression, DivideExpression, MultiplyExpression, SubstractExpression}
-import org.carbondata.query.expression.conditional._
-import org.carbondata.query.expression.logical.{AndExpression, OrExpression}
-import org.carbondata.query.scanner.impl.{CarbonKey, CarbonValue}
-import org.carbondata.spark.{KeyVal, KeyValImpl}
-import org.carbondata.spark.rdd.CarbonQueryRDD
-import org.carbondata.spark.util.{CarbonScalaUtil, QueryPlanUtil}
-
-case class CarbonTableScan(
-    var attributes: Seq[Attribute],
-    relation: CarbonRelation,
-    dimensionPredicates: Seq[Expression],
-    aggExprs: Option[Seq[Expression]],
-    sortExprs: Option[Seq[SortOrder]],
-    limitExpr: Option[Expression],
-    isGroupByPresent: Boolean,
-    detailQuery: Boolean = false)(@transient val oc: SQLContext)
-  extends LeafNode {
-
-  val cubeName = relation.cubeName
-  val carbonTable = relation.metaData.carbonTable
+import org.carbondata.query.carbon.model._
+import org.carbondata.spark.{CarbonFilters, RawKey, RawKeyImpl}
+import org.carbondata.spark.rdd.CarbonScanRDD
+
+case class CarbonScan(
+    var attributesRaw: Seq[Attribute],
+    relationRaw: CarbonRelation,
+    dimensionPredicatesRaw: Seq[Expression],
+    aggExprsRaw: Option[Seq[Expression]],
+    useBinaryAggregator: Boolean)(@transient val ocRaw: SQLContext) extends LeafNode {
+  val carbonTable = relationRaw.metaData.carbonTable
   val selectedDims = scala.collection.mutable.MutableList[QueryDimension]()
   val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]()
-  var outputColumns = scala.collection.mutable.MutableList[Attribute]()
-  var extraPreds: Seq[Expression] = Nil
-  val allDims = new scala.collection.mutable.HashSet[String]()
-  @transient val carbonCatalog = sqlContext.catalog.asInstanceOf[CarbonMetastoreCatalog]
-
-  def processAggregateExpr(plan: CarbonQueryPlan,
-      currentAggregate: AggregateExpression,
-      queryOrder: Int,
-      aggCount: Int): Int = {
-    currentAggregate match {
-      case AggregateExpression(Sum(p@PositionLiteral(attr: AttributeReference, _)), _, false) =>
-        outputColumns += attr
-        val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-        if (msrs.nonEmpty) {
-          val m1 = new QueryMeasure(attr.name)
-          m1.setAggregateFunction(CarbonCommonConstants.SUM)
-          m1.setQueryOrder(queryOrder)
-          plan.addMeasure(m1)
-        } else {
-          val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-          if (dims.nonEmpty) {
-            val d1 = new QueryDimension(attr.name)
-            d1.setQueryOrder(queryOrder)
-            plan.addAggDimAggInfo(d1.getColumnName, "sum", d1.getQueryOrder)
-          }
-        }
-        p.setPosition(queryOrder + aggCount)
-        queryOrder + 1
-
-      case AggregateExpression(
-        Sum(Cast(p@PositionLiteral(attr: AttributeReference, _), _)), _, false) =>
-        outputColumns += attr
-        val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-        if (msrs.nonEmpty) {
-          val m1 = new QueryMeasure(attr.name)
-          m1.setAggregateFunction(CarbonCommonConstants.SUM)
-          m1.setQueryOrder(queryOrder)
-          plan.addMeasure(m1)
-        } else {
-          val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-          if (dims.nonEmpty) {
-            val d1 = new QueryDimension(attr.name)
-            d1.setQueryOrder(queryOrder)
-            plan.addAggDimAggInfo(d1.getColumnName, "sum", d1.getQueryOrder)
-          }
-        }
-        p.setPosition(queryOrder + aggCount)
-        queryOrder + 1
-
-      case AggregateExpression(
-        CarbonCount(p@PositionLiteral(attr: AttributeReference, _), None), _, false) =>
-        outputColumns += attr
-        val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-        if (msrs.nonEmpty) {
-          val m1 = new QueryMeasure(attr.name)
-          m1.setAggregateFunction(CarbonCommonConstants.COUNT)
-          m1.setQueryOrder(queryOrder)
-          plan.addMeasure(m1)
-        } else {
-          val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-          if (dims.nonEmpty) {
-            val d1 = new QueryDimension(attr.name)
-            d1.setQueryOrder(queryOrder)
-            plan.addAggDimAggInfo(d1.getColumnName, "count", d1.getQueryOrder)
-          }
-        }
-        p.setPosition(queryOrder + aggCount)
-        queryOrder + 1
-
-      case AggregateExpression(
-        CarbonCount(lt: Literal, Some(p@PositionLiteral(attr: AttributeReference, _))), _, false)
-        if lt.value == "*" || lt.value == 1 =>
-        outputColumns += attr
-        val m1 = new QueryMeasure("count(*)")
-        m1.setAggregateFunction(CarbonCommonConstants.COUNT)
-        m1.setQueryOrder(queryOrder)
-        plan.addMeasure(m1)
-        plan.setCountStartQuery(true)
-        p.setPosition(queryOrder + aggCount)
-        queryOrder + 1
+  @transient val carbonCatalog = ocRaw.catalog.asInstanceOf[CarbonMetastoreCatalog]
 
-      case AggregateExpression(
-        CarbonAverage(p@PositionLiteral(attr: AttributeReference, _)), _, false) =>
-        outputColumns += attr
-        val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-        if (msrs.nonEmpty) {
-          val m1 = new QueryMeasure(attr.name)
-          m1.setAggregateFunction(CarbonCommonConstants.AVERAGE)
-          m1.setQueryOrder(queryOrder)
-          plan.addMeasure(m1)
-        } else {
-          val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-          if (dims.nonEmpty) {
-            val d1 = new QueryDimension(attr.name)
-            d1.setQueryOrder(queryOrder)
-            plan.addAggDimAggInfo(d1.getColumnName, "avg", d1.getQueryOrder)
-          }
-        }
-        p.setPosition(queryOrder + aggCount)
-        queryOrder + 1
-
-      case AggregateExpression(
-        CarbonAverage(Cast(p@PositionLiteral(attr: AttributeReference, _), _)), _, false) =>
-        outputColumns += attr
-        val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-        if (msrs.nonEmpty) {
-          val m1 = new QueryMeasure(attr.name)
-          m1.setAggregateFunction(CarbonCommonConstants.AVERAGE)
-          m1.setQueryOrder(queryOrder)
-          plan.addMeasure(m1)
-        } else {
-          val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-          if (dims.nonEmpty) {
-            val d1 = new QueryDimension(attr.name)
-            d1.setQueryOrder(queryOrder)
-            plan.addAggDimAggInfo(d1.getColumnName, "avg", d1.getQueryOrder)
-          }
-        }
-        p.setPosition(queryOrder + aggCount)
-        queryOrder + 1
-
-      case AggregateExpression(Min(p@PositionLiteral(attr: AttributeReference, _)), _, false) =>
-        outputColumns += attr
-        val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-        if (msrs.nonEmpty) {
-          val m1 = new QueryMeasure(attr.name)
-          m1.setAggregateFunction(CarbonCommonConstants.MIN)
-          m1.setQueryOrder(queryOrder)
-          plan.addMeasure(m1)
-        } else {
-          val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-          if (dims != null) {
-            val d1 = new QueryDimension(attr.name)
-            d1.setQueryOrder(queryOrder)
-            plan.addAggDimAggInfo(d1.getColumnName, "min", d1.getQueryOrder)
-          }
-        }
-        p.setPosition(queryOrder + aggCount)
-        queryOrder + 1
+  val attributesNeedToDecode = new java.util.HashSet[AttributeReference]()
+  val unprocessedExprs = new ArrayBuffer[Expression]()
 
-      case AggregateExpression(
-        Min(Cast(p@PositionLiteral(attr: AttributeReference, _), _)), _, false) =>
-        outputColumns += attr
-        val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-        if (msrs.nonEmpty) {
-          val m1 = new QueryMeasure(attr.name)
-          m1.setAggregateFunction(CarbonCommonConstants.MIN)
-          m1.setQueryOrder(queryOrder)
-          plan.addMeasure(m1)
-        } else {
-          val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-          if (dims != null) {
-            val d1 = new QueryDimension(attr.name)
-            d1.setQueryOrder(queryOrder)
-            plan.addAggDimAggInfo(d1.getColumnName, "min", d1.getQueryOrder)
-          }
-        }
-        p.setPosition(queryOrder + aggCount)
-        queryOrder + 1
-
-      case AggregateExpression(Max(p@PositionLiteral(attr: AttributeReference, _)), _, false) =>
-        outputColumns += attr
-        val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-        if (msrs.nonEmpty) {
-          val m1 = new QueryMeasure(attr.name)
-          m1.setAggregateFunction(CarbonCommonConstants.MAX)
-          m1.setQueryOrder(queryOrder)
-          plan.addMeasure(m1)
-        } else {
-          val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-          if (dims.nonEmpty) {
-            val d1 = new QueryDimension(attr.name)
-            d1.setQueryOrder(queryOrder)
-            plan.addAggDimAggInfo(d1.getColumnName, "max", d1.getQueryOrder)
-          }
-        }
-        p.setPosition(queryOrder + aggCount)
-        queryOrder + 1
-
-      case AggregateExpression(
-        Max(Cast(p@PositionLiteral(attr: AttributeReference, _), _)), _, false) =>
-        outputColumns += attr
-        val msrs = selectedMsrs.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-        if (msrs.nonEmpty) {
-          val m1 = new QueryMeasure(attr.name)
-          m1.setAggregateFunction(CarbonCommonConstants.MAX)
-          m1.setQueryOrder(queryOrder)
-          plan.addMeasure(m1)
-        } else {
-          val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-          if (dims.nonEmpty) {
-            val d1 = new QueryDimension(attr.name)
-            d1.setQueryOrder(queryOrder)
-            plan.addAggDimAggInfo(d1.getColumnName, "max", d1.getQueryOrder)
-          }
+  val buildCarbonPlan: CarbonQueryPlan = {
+    val plan: CarbonQueryPlan = new CarbonQueryPlan(relationRaw.schemaName, relationRaw.tableName)
+
+    val dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
+    val measures = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
+    val dimAttr = new Array[Attribute](dimensions.size())
+    val msrAttr = new Array[Attribute](measures.size())
+    attributesRaw.foreach { attr =>
+      val carbonDimension =
+        carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+      if(carbonDimension != null) {
+        dimAttr(dimensions.indexOf(carbonDimension)) = attr
+      } else {
+        val carbonMeasure =
+          carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
+        if(carbonMeasure != null) {
+          msrAttr(measures.indexOf(carbonMeasure)) = attr
         }
-        p.setPosition(queryOrder + aggCount)
-        queryOrder + 1
-
-      case _ => throw new
-          Exception("Some Aggregate functions cannot be pushed, force to detailequery")
+      }
     }
-  }
-
-  val buildCarbonPlan: CarbonQueryPlan = {
-    val plan: CarbonQueryPlan = new CarbonQueryPlan(relation.schemaName, relation.cubeName)
 
+    attributesRaw = dimAttr.filter(f => f != null) ++ msrAttr.filter(f => f != null)
 
-    var forceDetailedQuery = detailQuery
     var queryOrder: Integer = 0
-    attributes.map(
-      attr => {
-        val carbonDimension = carbonTable.getDimensionByName(carbonTable.getFactTableName
-          , attr.name)
+    attributesRaw.foreach { attr =>
+        val carbonDimension =
+          carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
         if (carbonDimension != null) {
-          allDims += attr.name
           val dim = new QueryDimension(attr.name)
           dim.setQueryOrder(queryOrder)
           queryOrder = queryOrder + 1
           selectedDims += dim
         } else {
-          val carbonMeasure = carbonTable.getMeasureByName(carbonTable.getFactTableName
-            , attr.name)
+          val carbonMeasure =
+            carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
           if (carbonMeasure != null) {
             val m1 = new QueryMeasure(attr.name)
             m1.setQueryOrder(queryOrder)
@@ -302,271 +94,108 @@ case class CarbonTableScan(
             selectedMsrs += m1
           }
         }
-      })
-    queryOrder = 0
-
-    // It is required to calculate as spark aggregators uses joined row with the current aggregates.
-    val aggCount = aggExprs match {
-      case Some(a: Seq[Expression]) =>
-        a.map {
-          case Alias(AggregateExpression(CarbonAverage(_), _, _), name) => 2
-          case Alias(agg: AggregateExpression, name) => 1
-          case _ => 0
-        }.reduceLeftOption((left, right) => left + right).getOrElse(0)
-      case _ => 0
-    }
-    // Separately handle group by columns, known or unknown partial aggregations and other
-    // expressions. All single column & known aggregate expressions will use native aggregates for
-    // measure and dimensions
-    // Unknown aggregates & Expressions will use custom aggregator
-
-    aggExprs match {
-      case Some(a: Seq[Expression]) if !forceDetailedQuery =>
-        a.foreach {
-          case attr@AttributeReference(_, _, _, _) => // Add all the references to carbon query
-            addCarbonColumn(attr)
-            outputColumns += attr
-          case al@ Alias(agg: AggregateExpression, name) =>
-            queryOrder = processAggregateExpr(plan, agg, queryOrder, aggCount)
-          case _ => forceDetailedQuery = true
-        }
-      case _ => forceDetailedQuery = true
-    }
-
-    def addCarbonColumn(attr: Attribute): Unit = {
-      val carbonDimension = selectedDims
-        .filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-      if (carbonDimension.nonEmpty) {
-        val dim = new QueryDimension(attr.name)
-        dim.setQueryOrder(queryOrder)
-        plan.addDimension(dim)
-        queryOrder = queryOrder + 1
-      } else {
-        val carbonMeasure = selectedMsrs
-          .filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-        if (carbonMeasure.nonEmpty) {
-          // added by vishal as we are adding for dimension so need to add to measure list
-          // Carbon does not support group by on measure column so throwing exception to
-          // make it detail query
-          throw new
-              Exception("Some Aggregate functions cannot be pushed, force to detailequery")
-        }
-        else {
-          // Some unknown attribute name is found. this may be a derived column.
-          // So, let's fall back to detailed query flow
-          throw new Exception(
-            "Some attributes referred looks derived columns. So, force to detailequery " +
-            attr.name)
-        }
-      }
-    }
-
-    if (forceDetailedQuery) {
-      // First clear the model if Msrs, Expressions and AggDimAggInfo filled
-      plan.getDimensions.clear()
-      plan.getMeasures.clear()
-      plan.getDimAggregatorInfos.clear()
-
-      // Fill the selected dimensions & measures obtained from
-      // attributes to query plan  for detailed query
-      selectedDims.foreach(plan.addDimension)
-      selectedMsrs.foreach(plan.addMeasure)
-    }
-    else {
-      attributes.foreach { attr =>
-        if (!outputColumns.exists(_.name.equals(attr.name))) {
-          addCarbonColumn(attr)
-          outputColumns += attr
-        }
       }
-      attributes = outputColumns
-    }
-
-    val orderList = new ArrayList[QueryDimension]()
-
-    var allSortExprPushed = true
-    sortExprs match {
-      case Some(a: Seq[SortOrder]) =>
-        a.foreach {
-          case SortOrder(Sum(attr: AttributeReference), order) => plan.getMeasures
-            .asScala.filter(m => m.getColumnName.equalsIgnoreCase(attr.name)).head
-            .setSortOrder(getSortDirection(order))
-          case SortOrder(CarbonCount(attr: AttributeReference, _), order) => plan.getMeasures
-            .asScala.filter(m => m.getColumnName.equalsIgnoreCase(attr.name)).head
-            .setSortOrder(getSortDirection(order))
-          case SortOrder(CarbonAverage(attr: AttributeReference), order) => plan.getMeasures
-            .asScala.filter(m => m.getColumnName.equalsIgnoreCase(attr.name)).head
-            .setSortOrder(getSortDirection(order))
-          case SortOrder(attr: AttributeReference, order) =>
-            val dim = plan.getDimensions
-              .asScala.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-            if (dim.nonEmpty) {
-              dim.head.setSortOrder(getSortDirection(order))
-              orderList.add(dim.head)
-            } else {
-              allSortExprPushed = false
+    // Just find out that any aggregation functions are present on dimensions.
+    aggExprsRaw match {
+      case Some(aggExprs) =>
+        aggExprs.foreach {
+          case Alias(agg: AggregateExpression, name) =>
+            agg.collect {
+              case attr: AttributeReference =>
+                val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
+                if(dims.nonEmpty) {
+                  plan.addAggDimAggInfo(dims.head.getColumnName,
+                    dims.head.getAggregateFunction,
+                    dims.head.getQueryOrder)
+                }
             }
-          case _ => allSortExprPushed = false;
+          case _ =>
         }
       case _ =>
     }
 
-    plan.setSortedDimemsions(orderList)
+    // Fill the selected dimensions & measures obtained from
+    // attributes to query plan  for detailed query
+    selectedDims.foreach(plan.addDimension)
+    selectedMsrs.foreach(plan.addMeasure)
 
-    // limit can be pushed down only if sort is not present or all sort expressions are pushed
-    if (sortExprs.isEmpty && forceDetailedQuery) {
-      limitExpr match {
-        case Some(IntegerLiteral(limit)) =>
-          // if (plan.getMeasures.size() == 0 && plan.getDimAggregatorInfos.size() == 0) {
-          plan.setLimit(limit)
-        // }
-        case _ =>
-      }
-    }
-    plan.setDetailQuery(forceDetailedQuery)
+    plan.setSortedDimemsions(new ArrayList[QueryDimension])
+
+    plan.setRawDetailQuery(true)
     plan.setOutLocationPath(
       CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION_HDFS))
     plan.setQueryId(System.nanoTime() + "")
-    if (dimensionPredicates.nonEmpty) {
-      val exps = preProcessExpressions(dimensionPredicates)
-      val expressionVal = transformExpression(exps.head)
-      // adding dimension used in expression in querystats
-      expressionVal.getChildren.asScala.filter { x => x.isInstanceOf[CarbonColumnExpression] }
-        .map { y => allDims += y.asInstanceOf[CarbonColumnExpression].getColumnName }
-      plan.setFilterExpression(expressionVal)
-    }
+    processFilterExpressions(plan)
     plan
   }
 
-  def preProcessExpressions(expressions: Seq[Expression]): Seq[Expression] = {
-    expressions match {
-      case left :: right :: rest => preProcessExpressions(List(And(left, right)) ::: rest)
-      case List(left, right) => List(And(left, right))
-
-      case _ => expressions
-    }
-  }
-
-  def transformExpression(expr: Expression): CarbonExpression = {
-    expr match {
-      case Or(left, right) => new
-          OrExpression(transformExpression(left), transformExpression(right))
-      case And(left, right) => new
-          AndExpression(transformExpression(left), transformExpression(right))
-      case EqualTo(left, right) => new
-          EqualToExpression(transformExpression(left), transformExpression(right))
-      case Not(EqualTo(left, right)) => new
-          NotEqualsExpression(transformExpression(left), transformExpression(right))
-      case IsNotNull(child) => new
-          NotEqualsExpression(transformExpression(child), transformExpression(Literal(null)))
-      case Not(In(left, right)) => new NotInExpression(transformExpression(left),
-        new ListExpression(right.map(transformExpression).asJava))
-      case In(left, right) => new InExpression(transformExpression(left),
-        new ListExpression(right.map(transformExpression).asJava))
-      case Add(left, right) => new
-          AddExpression(transformExpression(left), transformExpression(right))
-      case Subtract(left, right) => new
-          SubstractExpression(transformExpression(left), transformExpression(right))
-      case Multiply(left, right) => new
-          MultiplyExpression(transformExpression(left), transformExpression(right))
-      case Divide(left, right) => new
-          DivideExpression(transformExpression(left), transformExpression(right))
-      case GreaterThan(left, right) => new
-          GreaterThanExpression(transformExpression(left), transformExpression(right))
-      case LessThan(left, right) => new
-          LessThanExpression(transformExpression(left), transformExpression(right))
-      case GreaterThanOrEqual(left, right) => new
-          GreaterThanEqualToExpression(transformExpression(left), transformExpression(right))
-      case LessThanOrEqual(left, right) => new
-          LessThanEqualToExpression(transformExpression(left), transformExpression(right))
-      // convert StartWith('abc') or like(col 'abc%') to col >= 'abc' and col < 'abd'
-      case StartsWith(left, right @ Literal(pattern, dataType)) if (pattern.toString.size > 0) =>
-        val l = new GreaterThanEqualToExpression(
-          transformExpression(left), transformExpression(right))
-        val value = pattern.toString
-        val maxValueLimit = value.substring(0, value.length - 1) +
-          (value.charAt(value.length - 1).toInt + 1).toChar
-        val r = new LessThanExpression(
-          transformExpression(left),
-            new CarbonLiteralExpression(maxValueLimit,
-              CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
-        new AndExpression(l, r)
-      case AttributeReference(name, dataType, _, _) => new CarbonColumnExpression(name.toString,
-        CarbonScalaUtil.convertSparkToCarbonDataType(dataType))
-      case Literal(name, dataType) => new
-          CarbonLiteralExpression(name, CarbonScalaUtil.convertSparkToCarbonDataType(dataType))
-      case Cast(left, right) if !left.isInstanceOf[Literal] => transformExpression(left)
-      case aggExpr: AggregateExpression =>
-          throw new UnsupportedOperationException(s"Cannot evaluate expression: $aggExpr")
-      case _ =>
-        new SparkUnknownExpression(expr.transform {
-          case AttributeReference(name, dataType, _, _) =>
-            CarbonBoundReference(new CarbonColumnExpression(name.toString,
-              CarbonScalaUtil.convertSparkToCarbonDataType(dataType)), dataType, expr.nullable)
-        })
+  def processFilterExpressions(plan: CarbonQueryPlan) {
+    if (dimensionPredicatesRaw.nonEmpty) {
+      val expressionVal = CarbonFilters.processExpression(
+        dimensionPredicatesRaw,
+        attributesNeedToDecode,
+        unprocessedExprs,
+        carbonTable)
+      expressionVal match {
+        case Some(ce) =>
+          // adding dimension used in expression in querystats
+          plan.setFilterExpression(ce)
+        case _ =>
+      }
     }
+    processExtraAttributes(plan)
   }
 
-  private def getSortDirection(sort: SortDirection) = {
-    sort match {
-      case Ascending => SortOrderType.ASC
-      case Descending => SortOrderType.DSC
+  private def processExtraAttributes(plan: CarbonQueryPlan) {
+    if (attributesNeedToDecode.size() > 0) {
+      val attributeOut = new ArrayBuffer[Attribute]() ++ attributesRaw
+
+      attributesNeedToDecode.asScala.map { attr =>
+        val dims = plan.getDimensions.asScala.filter(f => f.getColumnName.equals(attr.name))
+        val msrs = plan.getMeasures.asScala.filter(f => f.getColumnName.equals(attr.name))
+        var order = plan.getDimensions.size() + plan.getMeasures.size()
+        if (dims.isEmpty && msrs.isEmpty) {
+          val dimension = carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+          if (dimension != null) {
+            val qDim = new QueryDimension(dimension.getColName)
+            qDim.setQueryOrder(order)
+            plan.addDimension(qDim)
+            attributeOut += attr
+            order += 1
+          } else {
+            val measure = carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
+            if (measure != null) {
+              val qMsr = new QueryMeasure(measure.getColName)
+              qMsr.setQueryOrder(order)
+              plan.addMeasure(qMsr)
+              order += 1
+              attributeOut += attr
+            }
+          }
+        }
+      }
+      attributesRaw = attributeOut
     }
   }
 
 
-  def addPushdownFilters(keys: Seq[Expression], filters: Array[Array[Expression]],
-      conditions: Option[Expression]) {
-
-    // TODO Values in the IN filter is duplicate. replace the list with set
-    val buffer = new ArrayBuffer[Expression]
-    keys.zipWithIndex.foreach { a =>
-      buffer += In(a._1, filters(a._2)).asInstanceOf[Expression]
-    }
-
-    // Let's not pushdown condition. Only filter push down is sufficient.
-    // Conditions can be applied on hash join result.
-    val cond = if (buffer.size > 1) {
-      val e = buffer.remove(0)
-      buffer.fold(e)(And(_, _))
-    } else {
-      buffer.asJava.get(0)
-    }
-
-    extraPreds = Seq(cond)
-  }
-
-  def inputRdd: CarbonQueryRDD[CarbonKey, CarbonValue] = {
-    val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    // Update the FilterExpressions with extra conditions added through join pushdown
-    if (extraPreds.nonEmpty) {attributes
-      val exps = preProcessExpressions(extraPreds)
-      val expressionVal = transformExpression(exps.head)
-      val oldExpressionVal = buildCarbonPlan.getFilterExpression
-      if (null == oldExpressionVal) {
-        buildCarbonPlan.setFilterExpression(expressionVal)
-      } else {
-        buildCarbonPlan.setFilterExpression(new AndExpression(oldExpressionVal, expressionVal))
-      }
-    }
+  def inputRdd: CarbonScanRDD[Array[Any], Any] = {
 
     val conf = new Configuration()
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-
+    buildCarbonPlan.getDimAggregatorInfos.clear()
     val model = QueryModel.createModel(
       absoluteTableIdentifier, buildCarbonPlan, carbonTable)
-    val kv: KeyVal[CarbonKey, CarbonValue] = new KeyValImpl()
+    val kv: RawKey[Array[Any], Any] = new RawKeyImpl()
     // setting queryid
-    buildCarbonPlan.setQueryId(oc.getConf("queryId", System.nanoTime() + ""))
-
-    LOG.info("Selected Table to Query ****** "
-             + model.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
-
-    val cubeCreationTime = carbonCatalog.getCubeCreationTime(relation.schemaName, cubeName)
-    val schemaLastUpdatedTime =
-      carbonCatalog.getSchemaLastUpdatedTime(relation.schemaName, cubeName)
-    val big = new CarbonQueryRDD(
-      oc.sparkContext,
+    buildCarbonPlan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
+
+    val cubeCreationTime = carbonCatalog
+      .getCubeCreationTime(relationRaw.schemaName, relationRaw.tableName)
+    val schemaLastUpdatedTime = carbonCatalog
+      .getSchemaLastUpdatedTime(relationRaw.schemaName, relationRaw.tableName)
+    val big = new CarbonScanRDD(
+      ocRaw.sparkContext,
       model,
       buildCarbonPlan.getFilterExpression,
       kv,
@@ -578,78 +207,35 @@ case class CarbonTableScan(
   }
 
 
-  override def outputsUnsafeRows: Boolean = false
+  override def outputsUnsafeRows: Boolean = attributesNeedToDecode.size() == 0
 
-  def doExecute(): RDD[InternalRow] = {
+  override def doExecute(): RDD[InternalRow] = {
     def toType(obj: Any): Any = {
       obj match {
         case s: String => UTF8String.fromString(s)
-        case avg: AbstractAvgAggregator =>
-          if (avg.isFirstTime) {
-            null
-          } else {
-            new GenericArrayData(avg.getAvgState.asInstanceOf[Array[Any]])
-          }
-        case c: CountAggregator => c.getLongValue
-        case s: SumDoubleAggregator => s.getDoubleValue
-        case s: SumBigDecimalAggregator => Decimal(s.getBigDecimalValue)
-        case s: SumLongAggregator => s.getLongValue
-        case m: MaxBigDecimalAggregator => Decimal(m.getBigDecimalValue)
-        case m: MaxLongAggregator => m.getLongValue
-        case m: MaxAggregator => toType(m.getValueObject)
-        case m: MinBigDecimalAggregator => Decimal(m.getBigDecimalValue)
-        case m: MinLongAggregator => m.getLongValue
-        case m: MinAggregator => toType(m.getValueObject)
-        case m: MeasureAggregator => toType(m.getValueObject)
         case _ => obj
       }
     }
+    val outUnsafeRows: Boolean = attributesNeedToDecode.size() == 0
+    inputRdd.mapPartitions { iter =>
+      val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
+      new Iterator[InternalRow] {
+        override def hasNext: Boolean = iter.hasNext
 
-//    val unsafeProjection = UnsafeProjection.create(attributes.map(_.dataType).toArray)
-    // count(*) query executed in driver by querying from Btree
-    if (isCountQuery) {
-      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-      val (carbonInputFormat: CarbonInputFormat[RowResult], job: Job) =
-        QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier)
-      // get row count
-      val rowCount = carbonInputFormat.getRowCount(job)
-      val countAgg = new CountAggregator()
-      countAgg.setNewValue(rowCount)
-      sparkContext.parallelize(
-        Seq(new GenericMutableRow(Seq(countAgg.getLongValue).toArray.asInstanceOf[Array[Any]]))
-      )
-    } else {
-      // all the other queries are sent to executor
-      inputRdd.mapPartitions { iter =>
-        new Iterator[InternalRow] {
-//          val unsafeProjection = UnsafeProjection.create(attributes.map(_.dataType).toArray)
-          override def hasNext: Boolean = iter.hasNext
-
-          override def next(): InternalRow = {
-            new GenericMutableRow(iter.next()._1.getKey.map(toType))
+        override def next(): InternalRow =
+          if (outUnsafeRows) {
+            unsafeProjection(new GenericMutableRow(iter.next()._1.map(toType)))
+          } else {
+            new GenericMutableRow(iter.next()._1.map(toType))
           }
-        }
       }
     }
   }
 
-  /**
-   * return true if query is count queryUtils
- *
-   * @return
-   */
-  def isCountQuery: Boolean = {
-    if (buildCarbonPlan.isCountStarQuery() && null == buildCarbonPlan.getFilterExpression &&
-        buildCarbonPlan.getDimensions.size() < 1 && buildCarbonPlan.getMeasures.size() < 2 &&
-        buildCarbonPlan.getDimAggregatorInfos.size() < 1) {
-      true
-    } else {
-      false
-    }
-  }
-
   def output: Seq[Attribute] = {
-    attributes
+    attributesRaw
   }
+
 }
 
+
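[Editor's note] A minimal sketch, not the committed code, of the row-conversion pattern the new CarbonScan.doExecute follows on Spark 1.5/1.6: each partition's raw key arrays are wrapped in GenericMutableRow, and an UnsafeProjection is applied only when no dictionary decoding is still pending. The names rawRdd, schemaTypes and toInternalRows are illustrative, not from the commit.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, UnsafeProjection}
    import org.apache.spark.sql.types.DataType

    def toInternalRows(rawRdd: RDD[(Array[Any], Any)],
        schemaTypes: Array[DataType],
        outUnsafeRows: Boolean): RDD[InternalRow] = {
      rawRdd.mapPartitions { iter =>
        // Create the projection inside the partition so it is not serialized with the closure.
        val unsafeProjection = UnsafeProjection.create(schemaTypes)
        iter.map { case (key, _) =>
          val row = new GenericMutableRow(key)
          // Emit unsafe rows only when no further decoding of dictionary columns is required.
          if (outUnsafeRows) unsafeProjection(row) else row
        }
      }
    }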

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/apache/spark/sql/CarbonRawOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonRawOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonRawOperators.scala
deleted file mode 100644
index 2005300..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonRawOperators.scala
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.util.ArrayList
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
-import org.apache.spark.sql.execution.LeafNode
-import org.apache.spark.sql.hive.CarbonMetastoreCatalog
-import org.apache.spark.sql.types.{DataType, Decimal}
-import org.apache.spark.unsafe.types.UTF8String
-
-import org.carbondata.core.carbon.AbsoluteTableIdentifier
-import org.carbondata.core.constants.CarbonCommonConstants
-import org.carbondata.core.util.CarbonProperties
-import org.carbondata.query.carbon.model._
-import org.carbondata.query.carbon.result.BatchRawResult
-import org.carbondata.query.carbon.wrappers.ByteArrayWrapper
-import org.carbondata.spark.{CarbonFilters, RawKey, RawKeyImpl, RawKeyVal, RawKeyValImpl}
-import org.carbondata.spark.rdd.CarbonRawQueryRDD
-
-
-case class CarbonRawTableScan(
-    var attributesRaw: Seq[Attribute],
-    relationRaw: CarbonRelation,
-    dimensionPredicatesRaw: Seq[Expression],
-    aggExprsRaw: Option[Seq[Expression]],
-    useBinaryAggregator: Boolean)(@transient val ocRaw: SQLContext) extends LeafNode
-{
-  val carbonTable = relationRaw.metaData.carbonTable
-  val selectedDims = scala.collection.mutable.MutableList[QueryDimension]()
-  val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]()
-  @transient val carbonCatalog = ocRaw.catalog.asInstanceOf[CarbonMetastoreCatalog]
-
-  val attributesNeedToDecode = new java.util.HashSet[AttributeReference]()
-  val unprocessedExprs = new ArrayBuffer[Expression]()
-
-  val buildCarbonPlan: CarbonQueryPlan = {
-    val plan: CarbonQueryPlan = new CarbonQueryPlan(relationRaw.schemaName, relationRaw.tableName)
-
-    val dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
-    val measures = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
-    val dimAttr = new Array[Attribute](dimensions.size())
-    val msrAttr = new Array[Attribute](measures.size())
-    attributesRaw.foreach { attr =>
-      val carbonDimension =
-        carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
-      if(carbonDimension != null) {
-        dimAttr(dimensions.indexOf(carbonDimension)) = attr
-      } else {
-        val carbonMeasure =
-          carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
-        if(carbonMeasure != null) {
-          msrAttr(measures.indexOf(carbonMeasure)) = attr
-        }
-      }
-    }
-
-    attributesRaw = dimAttr.filter(f => f != null) ++ msrAttr.filter(f => f != null)
-
-    var queryOrder: Integer = 0
-    attributesRaw.foreach { attr =>
-        val carbonDimension =
-          carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
-        if (carbonDimension != null) {
-          val dim = new QueryDimension(attr.name)
-          dim.setQueryOrder(queryOrder)
-          queryOrder = queryOrder + 1
-          selectedDims += dim
-        } else {
-          val carbonMeasure =
-            carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
-          if (carbonMeasure != null) {
-            val m1 = new QueryMeasure(attr.name)
-            m1.setQueryOrder(queryOrder)
-            queryOrder = queryOrder + 1
-            selectedMsrs += m1
-          }
-        }
-      }
-    // Just find out that any aggregation functions are present on dimensions.
-    aggExprsRaw match {
-      case Some(aggExprs) =>
-        aggExprs.foreach {
-          case Alias(agg: AggregateExpression, name) =>
-            agg.collect {
-              case attr: AttributeReference =>
-                val dims = selectedDims.filter(m => m.getColumnName.equalsIgnoreCase(attr.name))
-                if(dims.nonEmpty) {
-                  plan.addAggDimAggInfo(dims.head.getColumnName,
-                    dims.head.getAggregateFunction,
-                    dims.head.getQueryOrder)
-                }
-            }
-          case _ =>
-        }
-      case _ =>
-    }
-
-    // Fill the selected dimensions & measures obtained from
-    // attributes to query plan  for detailed query
-    selectedDims.foreach(plan.addDimension)
-    selectedMsrs.foreach(plan.addMeasure)
-
-    plan.setSortedDimemsions(new ArrayList[QueryDimension])
-
-    plan.setRawDetailQuery(true)
-    plan.setOutLocationPath(
-      CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION_HDFS))
-    plan.setQueryId(System.nanoTime() + "")
-    processFilterExpressions(plan)
-    plan
-  }
-
-  def processFilterExpressions(plan: CarbonQueryPlan) {
-    if (dimensionPredicatesRaw.nonEmpty) {
-      val expressionVal = CarbonFilters.processExpression(
-        dimensionPredicatesRaw,
-        attributesNeedToDecode,
-        unprocessedExprs,
-        carbonTable)
-      expressionVal match {
-        case Some(ce) =>
-          // adding dimension used in expression in querystats
-          plan.setFilterExpression(ce)
-        case _ =>
-      }
-    }
-    processExtraAttributes(plan)
-  }
-
-  private def processExtraAttributes(plan: CarbonQueryPlan) {
-    if (attributesNeedToDecode.size() > 0) {
-      val attributeOut = new ArrayBuffer[Attribute]() ++ attributesRaw
-
-      attributesNeedToDecode.asScala.map { attr =>
-        val dims = plan.getDimensions.asScala.filter(f => f.getColumnName.equals(attr.name))
-        val msrs = plan.getMeasures.asScala.filter(f => f.getColumnName.equals(attr.name))
-        var order = plan.getDimensions.size() + plan.getMeasures.size()
-        if (dims.isEmpty && msrs.isEmpty) {
-          val dimension = carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
-          if (dimension != null) {
-            val qDim = new QueryDimension(dimension.getColName)
-            qDim.setQueryOrder(order)
-            plan.addDimension(qDim)
-            attributeOut += attr
-            order += 1
-          } else {
-            val measure = carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name)
-            if (measure != null) {
-              val qMsr = new QueryMeasure(measure.getColName)
-              qMsr.setQueryOrder(order)
-              plan.addMeasure(qMsr)
-              order += 1
-              attributeOut += attr
-            }
-          }
-        }
-      }
-      attributesRaw = attributeOut
-    }
-  }
-
-
-  def inputRdd: CarbonRawQueryRDD[Array[Any], Any] = {
-
-    val conf = new Configuration()
-    val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-    buildCarbonPlan.getDimAggregatorInfos.clear()
-    val model = QueryModel.createModel(
-      absoluteTableIdentifier, buildCarbonPlan, carbonTable)
-    val kv: RawKey[Array[Any], Any] = new RawKeyImpl()
-    // setting queryid
-    buildCarbonPlan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
-
-    val cubeCreationTime = carbonCatalog
-      .getCubeCreationTime(relationRaw.schemaName, relationRaw.tableName)
-    val schemaLastUpdatedTime = carbonCatalog
-      .getSchemaLastUpdatedTime(relationRaw.schemaName, relationRaw.tableName)
-    val big = new CarbonRawQueryRDD(
-      ocRaw.sparkContext,
-      model,
-      buildCarbonPlan.getFilterExpression,
-      kv,
-      conf,
-      cubeCreationTime,
-      schemaLastUpdatedTime,
-      carbonCatalog.storePath)
-    big
-  }
-
-
-  override def outputsUnsafeRows: Boolean = attributesNeedToDecode.size() == 0
-
-  override def doExecute(): RDD[InternalRow] = {
-    def toType(obj: Any): Any = {
-      obj match {
-        case s: String => UTF8String.fromString(s)
-        case _ => obj
-      }
-    }
-    val outUnsafeRows: Boolean = attributesNeedToDecode.size() == 0
-    inputRdd.mapPartitions { iter =>
-      val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
-      new Iterator[InternalRow] {
-        override def hasNext: Boolean = iter.hasNext
-
-        override def next(): InternalRow =
-          if (outUnsafeRows) {
-            unsafeProjection(new GenericMutableRow(iter.next()._1.map(toType)))
-          } else {
-            new GenericMutableRow(iter.next()._1.map(toType))
-          }
-      }
-    }
-  }
-
-  def output: Seq[Attribute] = {
-    attributesRaw
-  }
-
-}
-
-class CarbonRawMutableRow(values: Array[Array[Object]],
-    val schema: QuerySchemaInfo) extends GenericMutableRow(values.asInstanceOf[Array[Any]]) {
-
-  val dimsLen = schema.getQueryDimensions.length - 1
-  val order = schema.getQueryOrder
-  var counter = 0
-  val size = {
-    if (values.nonEmpty) {
-      values.head.length
-    } else {
-      0
-    }
-  }
-
-  def getKey: ByteArrayWrapper = values.head(counter).asInstanceOf[ByteArrayWrapper]
-
-  def parseKey(key: ByteArrayWrapper, aggData: Array[Object], order: Array[Int]): Array[Object] = {
-    BatchRawResult.parseData(key, aggData, schema, order)
-  }
-
-  def hasNext: Boolean = {
-    counter < size
-  }
-
-  def next(): Unit = {
-    counter += 1
-  }
-
-  override def numFields: Int = dimsLen + schema.getQueryMeasures.length
-
-  override def anyNull: Boolean = true
-
-  override def get(ordinal: Int, dataType: DataType): AnyRef = {
-    values(order(ordinal) - dimsLen)(counter)
-      .asInstanceOf[AnyRef]
-  }
-
-  override def getUTF8String(ordinal: Int): UTF8String = {
-    UTF8String
-      .fromString(values(
-        order(ordinal) - dimsLen)(counter)
-        .asInstanceOf[String])
-  }
-
-  override def getDouble(ordinal: Int): Double = {
-    values(order(ordinal) - dimsLen)(counter)
-      .asInstanceOf[Double]
-  }
-
-  override def getFloat(ordinal: Int): Float = {
-    values(order(ordinal) - dimsLen)(counter)
-      .asInstanceOf[Float]
-  }
-
-  override def getLong(ordinal: Int): Long = {
-    values(order(ordinal) - dimsLen)(counter)
-      .asInstanceOf[Long]
-  }
-
-  override def getByte(ordinal: Int): Byte = {
-    values(order(ordinal) - dimsLen)(counter)
-      .asInstanceOf[Byte]
-  }
-
-  override def getDecimal(ordinal: Int,
-      precision: Int,
-      scale: Int): Decimal = {
-    values(order(ordinal) - dimsLen)(counter).asInstanceOf[Decimal]
-  }
-
-  override def getBoolean(ordinal: Int): Boolean = {
-    values(order(ordinal) - dimsLen)(counter)
-      .asInstanceOf[Boolean]
-  }
-
-  override def getShort(ordinal: Int): Short = {
-    values(order(ordinal) - dimsLen)(counter)
-      .asInstanceOf[Short]
-  }
-
-  override def getInt(ordinal: Int): Int = {
-    values(order(ordinal) - dimsLen)(counter)
-      .asInstanceOf[Int]
-  }
-
-  override def isNullAt(ordinal: Int): Boolean = values(order(ordinal) - dimsLen)(counter) == null
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/apache/spark/sql/execution/joins/CarbonJoins.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/joins/CarbonJoins.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/joins/CarbonJoins.scala
deleted file mode 100644
index 79ba157..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/joins/CarbonJoins.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.joins
-
-import scala.concurrent._
-import scala.concurrent.duration._
-import scala.Array.canBuildFrom
-
-import org.apache.spark.{InternalAccumulator, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.CarbonTableScan
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{BindReferences, Expression, Literal}
-import org.apache.spark.sql.execution.{BinaryNode, SparkPlan, SQLExecution}
-import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.ThreadUtils
-
-case class BroadCastFilterPushJoin(
-    leftKeys: Seq[Expression],
-    rightKeys: Seq[Expression],
-    buildSide: BuildSide,
-    left: SparkPlan,
-    right: SparkPlan,
-    condition: Option[Expression]) extends BinaryNode with HashJoin {
-
-  override private[sql] lazy val metrics = Map(
-    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
-    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
-
-  val timeout: Duration = {
-    val timeoutValue = sqlContext.conf.broadcastTimeout
-    if (timeoutValue < 0) {
-      Duration.Inf
-    } else {
-      timeoutValue.seconds
-    }
-  }
-  private lazy val (input: Array[InternalRow], inputCopy: Array[InternalRow]) = {
-    val numBuildRows = buildSide match {
-      case BuildLeft => longMetric("numLeftRows")
-      case BuildRight => longMetric("numRightRows")
-    }
-    val buildPlanOutput = buildPlan.execute()
-    val input: Array[InternalRow] = buildPlanOutput.map(_.copy()).collect()
-    val inputCopy: Array[InternalRow] = buildPlanOutput.map(_.copy()).collect()
-    (input, inputCopy)
-  }
-  // Use lazy so that we won't do broadcast when calling explain but still cache the broadcast value
-  // for the same query.
-  @transient
-  private lazy val broadcastFuture = {
-    // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
-    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    future {
-      // This will run in another thread. Set the execution id so that we can connect these jobs
-      // with the correct execution.
-      SQLExecution.withExecutionId(sparkContext, executionId) {
-        // The following line doesn't run in a job so we cannot track the metric value. However, we
-        // have already tracked it in the above lines. So here we can use
-        // `SQLMetrics.nullLongMetric` to ignore it.
-        val hashed = HashedRelation(
-          input.iterator, SQLMetrics.nullLongMetric, buildSideKeyGenerator, input.size)
-        sparkContext.broadcast(hashed)
-      }
-    }(BroadCastFilterPushJoin.broadcastHashJoinExecutionContext)
-  }
-
-  override def doExecute(): RDD[InternalRow] = {
-
-    val numOutputRows = longMetric("numOutputRows")
-    val (numBuildRows, numStreamedRows) = buildSide match {
-      case BuildLeft => (longMetric("numLeftRows"), longMetric("numRightRows"))
-      case BuildRight => (longMetric("numRightRows"), longMetric("numLeftRows"))
-    }
-
-    val keys = buildKeys.map { a =>
-      BindReferences.bindReference(a, buildPlan.output)
-    }.toArray
-    val filters = keys.map {
-      k =>
-        inputCopy.map(
-          r => {
-            val curr = k.eval(r)
-            if (curr.isInstanceOf[UTF8String]) {
-              Literal(curr.toString).asInstanceOf[Expression]
-            } else {
-              Literal(curr).asInstanceOf[Expression]
-            }
-          })
-    }
-    val carbonScan = buildSide match {
-      case BuildLeft => right
-      case BuildRight => left
-    }
-
-    val cubeScan = carbonScan.collectFirst { case a: CarbonTableScan => a }
-    if (cubeScan.isDefined) {
-      cubeScan.get.addPushdownFilters(streamedKeys, filters, condition)
-    }
-
-    val streamedPlanOutput = streamedPlan.execute()
-    // scalastyle:off
-    val broadcastRelation = Await.result(broadcastFuture, timeout)
-    // scalastyle:on
-    streamedPlanOutput.mapPartitions { streamedIter =>
-      val hashedRelation = broadcastRelation.value
-      hashedRelation match {
-        case unsafe: UnsafeHashedRelation =>
-          TaskContext.get().internalMetricsToAccumulators(
-            InternalAccumulator.PEAK_EXECUTION_MEMORY).add(unsafe.getUnsafeSize)
-        case _ =>
-      }
-      hashJoin(streamedIter, numStreamedRows, hashedRelation, numOutputRows)
-    }
-
-  }
-}
-
-object BroadCastFilterPushJoin {
-
-  private[joins] val broadcastHashJoinExecutionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonCachedThreadPool("filterpushhash-join", 128))
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRawStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRawStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRawStrategies.scala
deleted file mode 100644
index c01a937..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRawStrategies.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.spark.sql.hive
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions.{AttributeSet, _}
-import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter => LogicalFilter, LogicalPlan}
-import org.apache.spark.sql.execution.{Filter, Project, SparkPlan}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation}
-
-import org.carbondata.common.logging.LogServiceFactory
-import org.carbondata.core.carbon.metadata.schema.table.CarbonTable
-
-class CarbonRawStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
-
-  override def strategies: Seq[Strategy] = getStrategies
-
-  val LOGGER = LogServiceFactory.getLogService("CarbonRawStrategies")
-
-  def getStrategies: Seq[Strategy] = {
-    val total = sqlContext.planner.strategies :+ CarbonRawTableScans
-    total
-  }
-
-  /**
-   * Carbon strategies for Carbon cube scanning
-   */
-  private[sql] object CarbonRawTableScans extends Strategy {
-
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = {
-      plan match {
-        case PhysicalOperation(projectList, predicates,
-        l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)) =>
-          if (isStarQuery(plan)) {
-            carbonRawScanForStarQuery(projectList, predicates, carbonRelation, l)(sqlContext) :: Nil
-          } else {
-            carbonRawScan(projectList,
-              predicates,
-              carbonRelation,
-              l,
-              None,
-              detailQuery = true,
-              useBinaryAggregation = false)(sqlContext)._1 :: Nil
-          }
-        case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
-          CarbonDictionaryDecoder(relations,
-            profile,
-            aliasMap,
-            planLater(child))(sqlContext) :: Nil
-        case _ =>
-          Nil
-      }
-    }
-
-    /**
-     * Create carbon scan
-     */
-    private def carbonRawScan(projectList: Seq[NamedExpression],
-        predicates: Seq[Expression],
-        relation: CarbonDatasourceRelation,
-        logicalRelation: LogicalRelation,
-        groupExprs: Option[Seq[Expression]],
-        detailQuery: Boolean,
-        useBinaryAggregation: Boolean)(sc: SQLContext): (SparkPlan, Boolean) = {
-
-      val tableName: String =
-        relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
-      // Check out any expressions are there in project list. if they are present then we need to
-      // decode them as well.
-      val projectSet = AttributeSet(projectList.flatMap(_.references))
-      val scan = CarbonRawTableScan(projectSet.toSeq,
-        relation.carbonRelation,
-        predicates,
-        groupExprs,
-        useBinaryAggregation)(sqlContext)
-      val dimAggrsPresence: Boolean = scan.buildCarbonPlan.getDimAggregatorInfos.size() > 0
-      projectList.map {
-        case attr: AttributeReference =>
-        case Alias(attr: AttributeReference, _) =>
-        case others =>
-          others.references
-            .map(f => scan.attributesNeedToDecode.add(f.asInstanceOf[AttributeReference]))
-      }
-      if (!detailQuery) {
-        if (scan.attributesNeedToDecode.size > 0) {
-          val decoder = getCarbonDecoder(logicalRelation,
-            sc,
-            tableName,
-            scan.attributesNeedToDecode.asScala.toSeq,
-            scan)
-          if (scan.unprocessedExprs.nonEmpty) {
-            val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
-            (Project(projectList, filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder)), true)
-          } else {
-            (Project(projectList, decoder), true)
-          }
-        } else {
-          (scan, dimAggrsPresence)
-        }
-      } else {
-        if (scan.attributesNeedToDecode.size() > 0) {
-          val decoder = getCarbonDecoder(logicalRelation,
-            sc,
-            tableName,
-            scan.attributesNeedToDecode.asScala.toSeq,
-            scan)
-          if (scan.unprocessedExprs.nonEmpty) {
-            val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
-            (Project(projectList, filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder)), true)
-          } else {
-            (Project(projectList, decoder), true)
-          }
-        } else {
-          (Project(projectList, scan), dimAggrsPresence)
-        }
-      }
-    }
-
-    /**
-     * Create carbon scan for star query
-     */
-    private def carbonRawScanForStarQuery(projectList: Seq[NamedExpression],
-        predicates: Seq[Expression],
-        relation: CarbonDatasourceRelation,
-        logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
-
-      val tableName: String =
-        relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
-      // Check out any expressions are there in project list. if they are present then we need to
-      // decode them as well.
-      val projectExprsNeedToDecode = new java.util.HashSet[Attribute]()
-      val scan = CarbonRawTableScan(projectList.map(_.toAttribute),
-        relation.carbonRelation,
-        predicates,
-        None,
-        useBinaryAggregator = false)(sqlContext)
-      projectExprsNeedToDecode.addAll(scan.attributesNeedToDecode)
-      if (projectExprsNeedToDecode.size() > 0) {
-        val decoder = getCarbonDecoder(logicalRelation,
-          sc,
-          tableName,
-          projectExprsNeedToDecode.asScala.toSeq,
-          scan)
-        if (scan.unprocessedExprs.nonEmpty) {
-          val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
-          filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder)
-        } else {
-          decoder
-        }
-      } else {
-        scan
-      }
-    }
-
-    def getCarbonDecoder(logicalRelation: LogicalRelation,
-        sc: SQLContext,
-        tableName: String,
-        projectExprsNeedToDecode: Seq[Attribute],
-        scan: CarbonRawTableScan): CarbonDictionaryDecoder = {
-      val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
-        logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation])
-      val attrs = projectExprsNeedToDecode.map { attr =>
-        val newAttr = AttributeReference(attr.name,
-          attr.dataType,
-          attr.nullable,
-          attr.metadata)(attr.exprId, Seq(tableName))
-        relation.addAttribute(newAttr)
-        newAttr
-      }
-      CarbonDictionaryDecoder(Seq(relation), IncludeProfile(attrs),
-        CarbonAliasDecoderRelation(), scan)(sc)
-    }
-
-    private def isStarQuery(plan: LogicalPlan) = {
-      plan match {
-        case LogicalFilter(condition,
-        LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)) => true
-        case LogicalRelation(carbonRelation: CarbonDatasourceRelation, _) => true
-        case _ => false
-      }
-    }
-
-    private def isGroupByPresentOnMeasures(groupingExpressions: Seq[Expression],
-        carbonTable: CarbonTable): Boolean = {
-      groupingExpressions.map { g =>
-       g.collect {
-         case attr: AttributeReference
-           if carbonTable.getMeasureByName(carbonTable.getFactTableName, attr.name) != null =>
-           return true
-       }
-      }
-      false
-    }
-  }
-
-}
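CarbonRawStrategies, deleted above, was a planner extension hooked into Spark's QueryPlanner: each strategy maps a logical plan to zero or more candidate physical plans. As a rough sketch of that extension point only (assuming a Spark 1.6 spark-sql dependency; PassThroughStrategy and the commented registration line are made up for illustration and are not CarbonData code):

    import org.apache.spark.sql.Strategy
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.SparkPlan

    // A custom strategy is a function from a logical plan to candidate physical
    // plans; returning Nil defers the plan to the next strategy in the chain.
    object PassThroughStrategy extends Strategy {
      override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
    }

    // Registration against an existing SQLContext (named sqlContext here):
    //   sqlContext.experimental.extraStrategies = Seq(PassThroughStrategy)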

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala
new file mode 100644
index 0000000..edfaa90
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSQLDialect.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.CarbonSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.carbondata.spark.exception.MalformedCarbonCommandException
+
+private[spark] class CarbonSQLDialect(context: HiveContext) extends HiveQLDialect(context) {
+
+  @transient
+  protected val sqlParser = new CarbonSqlParser
+
+  override def parse(sqlText: String): LogicalPlan = {
+
+    try {
+      sqlParser.parse(sqlText)
+    } catch {
+      // MalformedCarbonCommandException needs to be rethrown directly,
+      // because Hive cannot parse Carbon commands
+      case ce: MalformedCarbonCommandException =>
+        throw ce
+      case _ => super.parse(sqlText)
+    }
+  }
+}
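The new dialect tries the Carbon parser first and falls back to the Hive parser for anything it cannot handle, except that MalformedCarbonCommandException is rethrown so Carbon syntax errors are not masked by the fallback. A minimal, self-contained sketch of that try-primary-then-fall-back shape follows; the Parser trait, FallbackDialect class and ownsError predicate are hypothetical names for illustration, not the Spark or CarbonData types.

    import scala.util.control.NonFatal

    trait Parser { def parse(sql: String): String }   // stand-in for a parser returning a LogicalPlan

    class FallbackDialect(primary: Parser, fallback: Parser,
        ownsError: Throwable => Boolean) {
      // Try the primary parser; rethrow errors it "owns" (e.g. malformed commands
      // in its own syntax), otherwise hand the statement to the fallback parser.
      def parse(sql: String): String =
        try primary.parse(sql)
        catch {
          case e if ownsError(e) => throw e
          case NonFatal(_)       => fallback.parse(sql)
        }
    }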