You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by jb...@apache.org on 2016/06/23 14:15:50 UTC

[02/56] [abbrv] incubator-carbondata git commit: [Issue 618]Supported Spark 1.6 in Carbondata (#670)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
index 84f050e..30a1070 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
@@ -30,6 +30,7 @@ import scala.util.parsing.combinator.RegexParsers
 
 import org.apache.spark
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.{AggregateTableAttributes, Partitioner}
@@ -120,28 +121,13 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
     false
   }
 
-  def lookupRelation1(
-      databaseName: Option[String],
-      tableName: String,
-      alias: Option[String] = None)(sqlContext: SQLContext): LogicalPlan = {
-    val db = databaseName match {
-      case Some(name) => name
-      case _ => null
-    }
-    if (db == null) {
-      lookupRelation2(Seq(tableName), alias)(sqlContext)
-    } else {
-      lookupRelation2(Seq(db, tableName), alias)(sqlContext)
-    }
-  }
-
-  override def lookupRelation(tableIdentifier: Seq[String],
+  override def lookupRelation(tableIdentifier: TableIdentifier,
       alias: Option[String] = None): LogicalPlan = {
     try {
       super.lookupRelation(tableIdentifier, alias)
     } catch {
       case s: java.lang.Exception =>
-        lookupRelation2(tableIdentifier, alias)(hive.asInstanceOf[SQLContext])
+        lookupRelation1(tableIdentifier, alias)(hive.asInstanceOf[SQLContext])
     }
   }
 
@@ -153,64 +139,34 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
     cubeCreationTime
   }
 
+  def lookupRelation1(dbName: Option[String],
+      tableName: String)(sqlContext: SQLContext): LogicalPlan = {
+    lookupRelation1(TableIdentifier(tableName, dbName))(sqlContext)
+  }
 
-  def lookupRelation2(tableIdentifier: Seq[String],
+  def lookupRelation1(tableIdentifier: TableIdentifier,
       alias: Option[String] = None)(sqlContext: SQLContext): LogicalPlan = {
     checkSchemasModifiedTimeAndReloadCubes()
-    tableIdentifier match {
-      case Seq(schemaName, cubeName) =>
-        val cubes = metadata.cubesMeta.filter(
-          c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(schemaName) &&
-               c.carbonTableIdentifier.getTableName.equalsIgnoreCase(cubeName))
-        if (cubes.nonEmpty) {
-          CarbonRelation(schemaName, cubeName,
-            CarbonSparkUtil.createSparkMeta(cubes.head.carbonTable), cubes.head, alias)(sqlContext)
-        } else {
-          LOGGER.audit(s"Table Not Found: $schemaName $cubeName")
-          throw new NoSuchTableException
-        }
-      case Seq(cubeName) =>
-        val currentDatabase = getDB.getDatabaseName(None, sqlContext)
-        val cubes = metadata.cubesMeta.filter(
-          c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(currentDatabase) &&
-               c.carbonTableIdentifier.getTableName.equalsIgnoreCase(cubeName))
-        if (cubes.nonEmpty) {
-          CarbonRelation(currentDatabase, cubeName,
-            CarbonSparkUtil.createSparkMeta(cubes.head.carbonTable), cubes.head, alias)(sqlContext)
-        } else {
-          LOGGER.audit(s"Table Not Found: $cubeName")
-          throw new NoSuchTableException
-        }
-      case _ =>
-        LOGGER.audit(s"Table Not Found: $tableIdentifier")
-        throw new NoSuchTableException
-    }
-  }
-
-  def cubeExists(db: Option[String], tableName: String)(sqlContext: SQLContext): Boolean = {
-    if (db.isEmpty || db.get == null || db.get == "") {
-      cubeExists(Seq(tableName))(sqlContext)
+    val database = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
+    val cubes = metadata.cubesMeta.filter(
+      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
+           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
+    if (cubes.nonEmpty) {
+      CarbonRelation(database, tableIdentifier.table,
+        CarbonSparkUtil.createSparkMeta(cubes.head.carbonTable), cubes.head, alias)(sqlContext)
     } else {
-      cubeExists(Seq(db.get, tableName))(sqlContext)
+      LOGGER.audit(s"Table Not Found: ${tableIdentifier.table}")
+      throw new NoSuchTableException
     }
   }
 
-  def cubeExists(tableIdentifier: Seq[String])(sqlContext: SQLContext): Boolean = {
+  def tableExists(tableIdentifier: TableIdentifier)(sqlContext: SQLContext): Boolean = {
     checkSchemasModifiedTimeAndReloadCubes()
-    tableIdentifier match {
-      case Seq(schemaName, cubeName) =>
-        val cubes = metadata.cubesMeta.filter(
-          c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(schemaName) &&
-               c.carbonTableIdentifier.getTableName.equalsIgnoreCase(cubeName))
-        cubes.nonEmpty
-      case Seq(cubeName) =>
-        val currentDatabase = getDB.getDatabaseName(None, sqlContext)
-        val cubes = metadata.cubesMeta.filter(
-          c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(currentDatabase) &&
-               c.carbonTableIdentifier.getTableName.equalsIgnoreCase(cubeName))
-        cubes.nonEmpty
-      case _ => false
-    }
+    val database = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
+    val cubes = metadata.cubesMeta.filter(
+      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
+           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
+    cubes.nonEmpty
   }
 
   def loadMetadata(metadataPath: String): MetaData = {
@@ -338,7 +294,7 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
       dbName: String, tableName: String, partitioner: Partitioner)
     (sqlContext: SQLContext): String = {
 
-    if (cubeExists(Seq(dbName, tableName))(sqlContext)) {
+    if (tableExists(TableIdentifier(tableName, Some(dbName)))(sqlContext)) {
       sys.error(s"Table [$tableName] already exists under Database [$dbName]")
     }
 
@@ -506,7 +462,7 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
   /**
    * Shows all schemas which has Database name like
    */
-  def showSchemas(schemaLike: Option[String]): Seq[String] = {
+  def showDatabases(schemaLike: Option[String]): Seq[String] = {
     checkSchemasModifiedTimeAndReloadCubes()
     metadata.cubesMeta.map { c =>
       schemaLike match {
@@ -526,7 +482,7 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
   /**
    * Shows all cubes for given schema.
    */
-  def getCubes(databaseName: Option[String])(sqlContext: SQLContext): Seq[(String, Boolean)] = {
+  def getTables(databaseName: Option[String])(sqlContext: SQLContext): Seq[(String, Boolean)] = {
 
     val schemaName = databaseName
       .getOrElse(sqlContext.asInstanceOf[HiveContext].catalog.client.currentDatabase)
@@ -539,21 +495,25 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
   /**
    * Shows all cubes in all schemas.
    */
-  def getAllCubes()(sqlContext: SQLContext): Seq[(String, String)] = {
+  def getAllTables()(sqlContext: SQLContext): Seq[TableIdentifier] = {
     checkSchemasModifiedTimeAndReloadCubes()
-    metadata.cubesMeta
-      .map { c => (c.carbonTableIdentifier.getDatabaseName, c.carbonTableIdentifier.getTableName) }
+    metadata.cubesMeta.map { c =>
+        TableIdentifier(c.carbonTableIdentifier.getTableName,
+          Some(c.carbonTableIdentifier.getDatabaseName))
+    }
   }
 
-  def dropCube(partitionCount: Int, tableStorePath: String, schemaName: String, cubeName: String)
+  def dropTable(partitionCount: Int, tableStorePath: String, tableIdentifier: TableIdentifier)
     (sqlContext: SQLContext) {
-    if (!cubeExists(Seq(schemaName, cubeName))(sqlContext)) {
-      LOGGER.audit(s"Drop Table failed. Table with $schemaName.$cubeName does not exist")
-      sys.error(s"Table with $schemaName.$cubeName does not exist")
+    val dbName = tableIdentifier.database.get
+    val tableName = tableIdentifier.table
+    if (!tableExists(tableIdentifier)(sqlContext)) {
+      LOGGER.audit(s"Drop Table failed. Table with ${dbName}.$tableName does not exist")
+      sys.error(s"Table with $dbName.$tableName does not exist")
     }
 
     val carbonTable = org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
-      .getCarbonTable(schemaName + "_" + cubeName)
+      .getCarbonTable(dbName + "_" + tableName)
 
     if (null != carbonTable) {
       val metadatFilePath = carbonTable.getMetaDataFilepath
@@ -561,12 +521,12 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
 
       if (FileFactory.isFileExist(metadatFilePath, fileType)) {
         val file = FileFactory.getCarbonFile(metadatFilePath, fileType)
-        CarbonUtil.renameCubeForDeletion(partitionCount, tableStorePath, schemaName, cubeName)
+        CarbonUtil.renameCubeForDeletion(partitionCount, tableStorePath, dbName, tableName)
         CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
       }
 
       val partitionLocation = tableStorePath + File.separator + "partition" + File.separator +
-                              schemaName + File.separator + cubeName
+                              dbName + File.separator + tableName
       val partitionFileType = FileFactory.getFileType(partitionLocation)
       if (FileFactory.isFileExist(partitionLocation, partitionFileType)) {
         CarbonUtil
@@ -575,20 +535,20 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
     }
 
     try {
-      sqlContext.sql(s"DROP TABLE $schemaName.$cubeName").collect()
+      sqlContext.sql(s"DROP TABLE $dbName.$tableName").collect()
     } catch {
       case e: Exception =>
         LOGGER.audit(
-          s"Error While deleting the table $schemaName.$cubeName during drop Table" + e.getMessage)
+          s"Error While deleting the table $dbName.$tableName during drop Table" + e.getMessage)
     }
 
     metadata.cubesMeta -= metadata.cubesMeta.filter(
-      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(schemaName) &&
-           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(cubeName))(0)
+      c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(dbName) &&
+           c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))(0)
     org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
-      .removeTable(schemaName + "_" + cubeName)
-    logInfo(s"Table $cubeName of $schemaName Database dropped syccessfully.")
-    LOGGER.info("Table " + cubeName + " of " + schemaName + " Database dropped syccessfully.")
+      .removeTable(dbName + "_" + tableName)
+    logInfo(s"Table $tableName of $dbName Database dropped syccessfully.")
+    LOGGER.info("Table " + tableName + " of " + dbName + " Database dropped syccessfully.")
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRawStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRawStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRawStrategies.scala
index b1c3924..c01a937 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRawStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRawStrategies.scala
@@ -63,19 +63,7 @@ class CarbonRawStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan
               detailQuery = true,
               useBinaryAggregation = false)(sqlContext)._1 :: Nil
           }
-
-        case catalyst.planning.PartialAggregation(
-        namedGroupingAttributes,
-        rewrittenAggregateExpressions,
-        groupingExpressions,
-        partialComputation,
-        PhysicalOperation(projectList, predicates,
-        l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _))) =>
-          handleRawAggregation(plan, plan, projectList, predicates, carbonRelation,
-            l, partialComputation, groupingExpressions, namedGroupingAttributes,
-            rewrittenAggregateExpressions)
-        case CarbonDictionaryCatalystDecoder(relations, profile,
-               aliasMap, _, child) =>
+        case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
           CarbonDictionaryDecoder(relations,
             profile,
             aliasMap,
@@ -85,47 +73,6 @@ class CarbonRawStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan
       }
     }
 
-
-    def handleRawAggregation(plan: LogicalPlan,
-        aggPlan: LogicalPlan,
-        projectList: Seq[NamedExpression],
-        predicates: Seq[Expression],
-        carbonRelation: CarbonDatasourceRelation,
-        logicalRelation: LogicalRelation,
-        partialComputation: Seq[NamedExpression],
-        groupingExpressions: Seq[Expression],
-        namedGroupingAttributes: Seq[Attribute],
-        rewrittenAggregateExpressions: Seq[NamedExpression]):
-    Seq[SparkPlan] = {
-      val groupByPresentOnMsr = isGroupByPresentOnMeasures(groupingExpressions,
-        carbonRelation.carbonRelation.metaData.carbonTable)
-      if(!groupByPresentOnMsr) {
-        val s = carbonRawScan(projectList,
-          predicates,
-          carbonRelation,
-          logicalRelation,
-          Some(partialComputation),
-          detailQuery = false,
-          useBinaryAggregation = true)(sqlContext)
-        // If any aggregate function present on dimnesions then don't use this plan.
-        if (!s._2) {
-          CarbonAggregate(
-            partial = false,
-            namedGroupingAttributes,
-            rewrittenAggregateExpressions,
-            CarbonRawAggregate(
-              partial = true,
-              groupingExpressions,
-              partialComputation,
-              s._1))(sqlContext) :: Nil
-        } else {
-          Nil
-        }
-      } else {
-        Nil
-      }
-    }
-
     /**
      * Create carbon scan
      */
@@ -141,13 +88,6 @@ class CarbonRawStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan
         relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
       // Check out any expressions are there in project list. if they are present then we need to
       // decode them as well.
-      val projectExprsNeedToDecode = new java.util.HashSet[Attribute]()
-      projectList.map {
-        case attr: AttributeReference =>
-        case Alias(attr: AttributeReference, _) =>
-        case others =>
-          others.references.map(f => projectExprsNeedToDecode.add(f))
-      }
       val projectSet = AttributeSet(projectList.flatMap(_.references))
       val scan = CarbonRawTableScan(projectSet.toSeq,
         relation.carbonRelation,
@@ -155,13 +95,19 @@ class CarbonRawStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan
         groupExprs,
         useBinaryAggregation)(sqlContext)
       val dimAggrsPresence: Boolean = scan.buildCarbonPlan.getDimAggregatorInfos.size() > 0
-      projectExprsNeedToDecode.addAll(scan.attributesNeedToDecode)
+      projectList.map {
+        case attr: AttributeReference =>
+        case Alias(attr: AttributeReference, _) =>
+        case others =>
+          others.references
+            .map(f => scan.attributesNeedToDecode.add(f.asInstanceOf[AttributeReference]))
+      }
       if (!detailQuery) {
-        if (projectExprsNeedToDecode.size > 0) {
+        if (scan.attributesNeedToDecode.size > 0) {
           val decoder = getCarbonDecoder(logicalRelation,
             sc,
             tableName,
-            projectExprsNeedToDecode.asScala.toSeq,
+            scan.attributesNeedToDecode.asScala.toSeq,
             scan)
           if (scan.unprocessedExprs.nonEmpty) {
             val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
@@ -173,11 +119,11 @@ class CarbonRawStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan
           (scan, dimAggrsPresence)
         }
       } else {
-        if (projectExprsNeedToDecode.size() > 0) {
+        if (scan.attributesNeedToDecode.size() > 0) {
           val decoder = getCarbonDecoder(logicalRelation,
             sc,
             tableName,
-            projectExprsNeedToDecode.asScala.toSeq,
+            scan.attributesNeedToDecode.asScala.toSeq,
             scan)
           if (scan.unprocessedExprs.nonEmpty) {
             val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index 6c7e362..56deb8a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -21,12 +21,15 @@ package org.apache.spark.sql.hive
 import scala.math.BigInt.int2bigInt
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, PhysicalOperation, QueryPlanner}
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, Limit, LogicalPlan, Sort}
 import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand, ExecutedCommand, Filter, Project, SparkPlan}
+import org.apache.spark.sql.execution.aggregate.Utils
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, LogicalRelation}
 import org.apache.spark.sql.execution.joins.{BroadCastFilterPushJoin, BuildLeft, BuildRight}
@@ -75,44 +78,35 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
 
       case Limit(IntegerLiteral(limit),
       Sort(order, _,
-      p@PartialAggregation(namedGroupingAttributes,
-      rewrittenAggregateExpressions,
-      groupingExpressions,
-      partialComputation,
+      p@CarbonAggregation(groupingExpressions,
+      aggregateExpressions,
       PhysicalOperation(
       projectList,
       predicates,
       l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _))))) =>
         val aggPlan = handleAggregation(plan, p, projectList, predicates, carbonRelation,
-          partialComputation, groupingExpressions, namedGroupingAttributes,
-          rewrittenAggregateExpressions)
+          groupingExpressions, aggregateExpressions)
         org.apache.spark.sql.execution.TakeOrderedAndProject(limit,
           order,
           None,
           aggPlan.head) :: Nil
 
-      case Limit(IntegerLiteral(limit), p@PartialAggregation(
-      namedGroupingAttributes,
-      rewrittenAggregateExpressions,
+      case Limit(IntegerLiteral(limit), p@CarbonAggregation(
       groupingExpressions,
-      partialComputation,
+      aggregateExpressions,
       PhysicalOperation(projectList, predicates,
       l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)))) =>
         val aggPlan = handleAggregation(plan, p, projectList, predicates, carbonRelation,
-          partialComputation, groupingExpressions, namedGroupingAttributes,
-          rewrittenAggregateExpressions)
+          groupingExpressions, aggregateExpressions)
         org.apache.spark.sql.execution.Limit(limit, aggPlan.head) :: Nil
 
-      case PartialAggregation(
-      namedGroupingAttributes,
-      rewrittenAggregateExpressions,
+      case CarbonAggregation(
       groupingExpressions,
-      partialComputation,
+      aggregateExpressions,
       PhysicalOperation(projectList, predicates,
       l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _))) =>
         handleAggregation(plan, plan, projectList, predicates, carbonRelation,
-          partialComputation, groupingExpressions, namedGroupingAttributes,
-          rewrittenAggregateExpressions)
+          groupingExpressions, aggregateExpressions)
 
       case Limit(IntegerLiteral(limit),
       PhysicalOperation(projectList, predicates,
@@ -188,59 +182,138 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
         projectList: Seq[NamedExpression],
         predicates: Seq[Expression],
         carbonRelation: CarbonDatasourceRelation,
-        partialComputation: Seq[NamedExpression],
         groupingExpressions: Seq[Expression],
-        namedGroupingAttributes: Seq[Attribute],
-        rewrittenAggregateExpressions: Seq[NamedExpression]):
+        namedGroupingAttributes: Seq[NamedExpression]):
     Seq[SparkPlan] = {
       val (_, _, _, _, groupExprs, substitutesortExprs, limitExpr) = extractPlan(plan)
 
       val s =
         try {
           carbonScan(projectList, predicates, carbonRelation.carbonRelation,
-            Some(partialComputation), substitutesortExprs, limitExpr, groupingExpressions.nonEmpty)
+            Some(namedGroupingAttributes), substitutesortExprs,
+            limitExpr, groupingExpressions.nonEmpty)
         } catch {
           case e: Exception => null
         }
 
       if (s != null) {
-        CarbonAggregate(
-          partial = false,
-          namedGroupingAttributes,
-          rewrittenAggregateExpressions,
-          CarbonAggregate(
-            partial = true,
-            groupingExpressions,
-            partialComputation,
-            s)(sqlContext))(sqlContext) :: Nil
+        aggregatePlan(groupingExpressions, namedGroupingAttributes, s)
 
       } else {
         (aggPlan, true) match {
-          case PartialAggregation(
-          namedGroupingAttributes,
-          rewrittenAggregateExpressions,
+          case CarbonAggregation(
           groupingExpressions,
-          partialComputation,
+          namedGroupingAttributes,
           PhysicalOperation(projectList, predicates,
           l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _))) =>
             val (_, _, _, _, groupExprs, substitutesortExprs, limitExpr) = extractPlan(plan)
 
 
             val s = carbonScan(projectList, predicates, carbonRelation.carbonRelation,
-              Some(partialComputation), substitutesortExprs, limitExpr,
+              Some(namedGroupingAttributes), substitutesortExprs, limitExpr,
               groupingExpressions.nonEmpty, detailQuery = true)
 
-            CarbonAggregate(
-              partial = false,
-              namedGroupingAttributes,
-              rewrittenAggregateExpressions,
-              CarbonAggregate(
-                partial = true,
-                groupingExpressions,
-                partialComputation,
-                s)(sqlContext))(sqlContext) :: Nil
+            aggregatePlan(groupingExpressions, namedGroupingAttributes, s)
+        }
+      }
+    }
+
+    // TODO: It is duplicated code from spark. Need to find a way
+    private def aggregatePlan(groupingExpressions: Seq[Expression],
+        resultExpressions: Seq[NamedExpression],
+        child: SparkPlan) = {
+      // A single aggregate expression might appear multiple times in resultExpressions.
+      // In order to avoid evaluating an individual aggregate function multiple times, we'll
+      // build a set of the distinct aggregate expressions and build a function which can
+      // be used to re-write expressions so that they reference the single copy of the
+      // aggregate function which actually gets computed.
+      val aggregateExpressions = resultExpressions.flatMap { expr =>
+        expr.collect {
+          case agg: AggregateExpression => agg
         }
+      }.distinct
+      // For those distinct aggregate expressions, we create a map from the
+      // aggregate function to the corresponding attribute of the function.
+      val aggregateFunctionToAttribute = aggregateExpressions.map { agg =>
+        val aggregateFunction = agg.aggregateFunction
+        val attribute = Alias(aggregateFunction, aggregateFunction.toString)().toAttribute
+        (aggregateFunction, agg.isDistinct) -> attribute
+      }.toMap
+
+      val (functionsWithDistinct, functionsWithoutDistinct) =
+        aggregateExpressions.partition(_.isDistinct)
+      if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) {
+        // This is a sanity check. We should not reach here when we have multiple distinct
+        // column sets. Our MultipleDistinctRewriter should take care this case.
+        sys.error("You hit a query analyzer bug. Please report your query to " +
+                  "Spark user mailing list.")
       }
+
+      val namedGroupingExpressions = groupingExpressions.map {
+        case ne: NamedExpression => ne -> ne
+        // If the expression is not a NamedExpressions, we add an alias.
+        // So, when we generate the result of the operator, the Aggregate Operator
+        // can directly get the Seq of attributes representing the grouping expressions.
+        case other =>
+          val withAlias = Alias(other, other.toString)()
+          other -> withAlias
+      }
+      val groupExpressionMap = namedGroupingExpressions.toMap
+
+      // The original `resultExpressions` are a set of expressions which may reference
+      // aggregate expressions, grouping column values, and constants. When aggregate operator
+      // emits output rows, we will use `resultExpressions` to generate an output projection
+      // which takes the grouping columns and final aggregate result buffer as input.
+      // Thus, we must re-write the result expressions so that their attributes match up with
+      // the attributes of the final result projection's input row:
+      val rewrittenResultExpressions = resultExpressions.map { expr =>
+        expr.transformDown {
+          case AggregateExpression(aggregateFunction, _, isDistinct) =>
+            // The final aggregation buffer's attributes will be `finalAggregationAttributes`,
+            // so replace each aggregate expression by its corresponding attribute in the set:
+            aggregateFunctionToAttribute(aggregateFunction, isDistinct)
+          case expression =>
+            // Since we're using `namedGroupingAttributes` to extract the grouping key
+            // columns, we need to replace grouping key expressions with their corresponding
+            // attributes. We do not rely on the equality check at here since attributes may
+            // differ cosmetically. Instead, we use semanticEquals.
+            groupExpressionMap.collectFirst {
+              case (expr, ne) if expr semanticEquals expression => ne.toAttribute
+            }.getOrElse(expression)
+        }.asInstanceOf[NamedExpression]
+      }
+
+      val aggregateOperator =
+        if (aggregateExpressions.map(_.aggregateFunction).exists(!_.supportsPartial)) {
+          if (functionsWithDistinct.nonEmpty) {
+            sys.error("Distinct columns cannot exist in Aggregate operator containing " +
+                      "aggregate functions which don't support partial aggregation.")
+          } else {
+            Utils.planAggregateWithoutPartial(
+              namedGroupingExpressions.map(_._2),
+              aggregateExpressions,
+              aggregateFunctionToAttribute,
+              rewrittenResultExpressions,
+              child)
+          }
+        } else if (functionsWithDistinct.isEmpty) {
+          Utils.planAggregateWithoutDistinct(
+            namedGroupingExpressions.map(_._2),
+            aggregateExpressions,
+            aggregateFunctionToAttribute,
+            rewrittenResultExpressions,
+            child)
+        } else {
+          Utils.planAggregateWithOneDistinct(
+            namedGroupingExpressions.map(_._2),
+            functionsWithDistinct,
+            functionsWithoutDistinct,
+            aggregateFunctionToAttribute,
+            rewrittenResultExpressions,
+            child)
+        }
+
+      aggregateOperator
     }
 
     private def canPushDownJoin(otherRDDPlan: LogicalPlan,
@@ -333,8 +406,8 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
         ExecutedCommand(ShowAllTablesDetail(schemaName, plan.output)) :: Nil
       case DropTable(tableName, ifNotExists)
         if CarbonEnv.getInstance(sqlContext).carbonCatalog
-          .cubeExists(Seq(tableName.toLowerCase()))(sqlContext) =>
-        ExecutedCommand(DropCubeCommand(ifNotExists, None, tableName.toLowerCase())) :: Nil
+          .tableExists(TableIdentifier(tableName.toLowerCase))(sqlContext) =>
+        ExecutedCommand(DropCubeCommand(ifNotExists, None, tableName.toLowerCase)) :: Nil
       case ShowAggregateTablesCommand(schemaName) =>
         ExecutedCommand(ShowAggregateTables(schemaName, plan.output)) :: Nil
       case ShowLoadsCommand(schemaName, cube, limit) =>
@@ -342,7 +415,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
       case LoadCube(schemaNameOp, cubeName, factPathFromUser, dimFilesPath,
       partionValues, isOverwriteExist, inputSqlString) =>
         val isCarbonTable = CarbonEnv.getInstance(sqlContext).carbonCatalog
-          .cubeExists(schemaNameOp, cubeName)(sqlContext)
+          .tableExists(TableIdentifier(cubeName, schemaNameOp))(sqlContext)
         if (isCarbonTable || partionValues.nonEmpty) {
           ExecutedCommand(LoadCube(schemaNameOp, cubeName, factPathFromUser,
             dimFilesPath, partionValues, isOverwriteExist, inputSqlString)) :: Nil
@@ -360,7 +433,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
         }
       case DescribeFormattedCommand(sql, tblIdentifier) =>
         val isCube = CarbonEnv.getInstance(sqlContext).carbonCatalog
-          .cubeExists(tblIdentifier)(sqlContext)
+          .tableExists(tblIdentifier)(sqlContext)
         if (isCube) {
           val describe =
             LogicalDescribeCommand(UnresolvedRelation(tblIdentifier, None), isExtended = false)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala
index c4c214d..58c02ff 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala
@@ -34,7 +34,7 @@ private[sql] object CarbonStrategy {
   }
 }
 
-private[spark] class CarbonSQLDialect extends HiveQLDialect {
+private[spark] class CarbonSQLDialect(context: HiveContext) extends HiveQLDialect(context) {
 
   @transient
   protected val sqlParser = new CarbonSqlParser

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
index a975317..d80c065 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
@@ -24,7 +24,8 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.sql.catalyst.expressions.{AggregateExpression, Attribute, _}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -122,7 +123,7 @@ class CarbonOptimizer(optimizer: Optimizer, conf: CatalystConf)
             }
             var child = agg.child
             // Incase if the child also aggregate then push down decoder to child
-            if (attrsOndimAggs.size() > 0 && !child.isInstanceOf[Aggregate]) {
+            if (attrsOndimAggs.size() > 0 && !(child.equals(agg))) {
               child = CarbonDictionaryTempDecoder(attrsOndimAggs,
                 new util.HashSet[Attribute](),
                 agg.child)
@@ -398,10 +399,10 @@ class CarbonOptimizer(optimizer: Optimizer, conf: CatalystConf)
         allAttrsNotDecode: util.Set[Attribute],
         aliasMap: CarbonAliasDecoderRelation) = {
       val uAttr = aliasMap.getOrElse(attr, attr)
-      val relation = relations.find(p => p.attributeMap.contains(uAttr))
+      val relation = relations.find(p => p.contains(uAttr))
       if (relation.isDefined) {
         relation.get.carbonRelation.carbonRelation.metaData.dictionaryMap.get(uAttr.name) match {
-          case Some(true) if !allAttrsNotDecode.contains(uAttr) =>
+          case Some(true) if !allAttrsNotDecode.asScala.exists(p => p.name.equals(uAttr.name)) =>
             val newAttr = AttributeReference(attr.name,
               IntegerType,
               attr.nullable,
@@ -419,7 +420,7 @@ class CarbonOptimizer(optimizer: Optimizer, conf: CatalystConf)
         relations: Seq[CarbonDecoderRelation],
         aliasMap: CarbonAliasDecoderRelation): Boolean = {
       val uAttr = aliasMap.getOrElse(attr, attr)
-      val relation = relations.find(p => p.attributeMap.contains(uAttr))
+      val relation = relations.find(p => p.contains(uAttr))
       if (relation.isDefined) {
         relation.get.carbonRelation.carbonRelation.metaData.dictionaryMap.get(uAttr.name) match {
           case Some(true) => true
@@ -459,9 +460,16 @@ case class CarbonDecoderRelation(
   }
 
   def contains(attr: Attribute): Boolean = {
-    attributeMap
-       .exists(entry => entry._1.name.equals(attr.name) && entry._1.exprId.equals(attr.exprId)) ||
-     extraAttrs.exists(entry => entry.name.equals(attr.name) && entry.exprId.equals(attr.exprId))
+    var exists =
+      attributeMap.exists(entry => entry._1.name.equalsIgnoreCase(attr.name) &&
+                        entry._1.exprId.equals(attr.exprId)) ||
+      extraAttrs.exists(entry => entry.name.equalsIgnoreCase(attr.name) &&
+                                entry.exprId.equals(attr.exprId))
+    if(!exists) {
+      exists = attributeMap.exists(entry => entry._1.name.equalsIgnoreCase(attr.name)) ||
+        extraAttrs.exists(entry => entry.name.equalsIgnoreCase(attr.name) )
+    }
+    exists
   }
 }
 
@@ -474,7 +482,8 @@ case class CarbonAliasDecoderRelation() {
   }
 
   def getOrElse(key: Attribute, default: Attribute): Attribute = {
-    val value = attrMap.find(p => p._1.name.equals(key.name) && p._1.exprId.equals(key.exprId))
+    val value = attrMap.find(p =>
+      p._1.name.equalsIgnoreCase(key.name) && p._1.exprId.equals(key.exprId))
     value match {
       case Some((k, v)) => v
       case _ => default

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala b/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
index 8c6df4c..aee375a 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
@@ -27,6 +27,9 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.optimizer.CarbonAliasDecoderRelation
 import org.apache.spark.sql.types.StructType
 
+import org.carbondata.core.carbon.metadata.datatype.DataType
+import org.carbondata.core.carbon.metadata.schema.table.CarbonTable
+import org.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn
 import org.carbondata.query.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
 import org.carbondata.query.expression.conditional._
 import org.carbondata.query.expression.logical.{AndExpression, OrExpression}
@@ -160,7 +163,8 @@ object CarbonFilters {
 
   def processExpression(exprs: Seq[Expression],
       attributesNeedToDecode: java.util.HashSet[AttributeReference],
-      unprocessedExprs: ArrayBuffer[Expression]): Option[CarbonExpression] = {
+      unprocessedExprs: ArrayBuffer[Expression],
+      carbonTable: CarbonTable): Option[CarbonExpression] = {
     def transformExpression(expr: Expression): Option[CarbonExpression] = {
       expr match {
         case Or(left, right) =>
@@ -208,8 +212,9 @@ object CarbonFilters {
             new ListExpression(list.map(transformExpression(_).get).asJava)))
 
         case AttributeReference(name, dataType, _, _) =>
-          Some(new CarbonColumnExpression(name.toString,
-            CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
+          Some(new CarbonColumnExpression(name,
+            CarbonScalaUtil.convertSparkToCarbonDataType(
+              getActualCarbonDataType(name, carbonTable))))
         case FakeCarbonCast(literal, dataType) => transformExpression(literal)
         case Literal(name, dataType) => Some(new
             CarbonLiteralExpression(name, CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
@@ -225,4 +230,19 @@ object CarbonFilters {
     exprs.flatMap(transformExpression).reduceOption(new AndExpression(_, _))
   }
 
+  private def getActualCarbonDataType(column: String, carbonTable: CarbonTable) = {
+    var carbonColumn: CarbonColumn =
+      carbonTable.getDimensionByName(carbonTable.getFactTableName, column)
+    val dataType = if (carbonColumn != null) {
+      carbonColumn.getDataType
+    } else {
+      carbonColumn = carbonTable.getMeasureByName(carbonTable.getFactTableName, column)
+      carbonColumn.getDataType match {
+        case DataType.LONG => DataType.LONG
+        case DataType.DECIMAL => DataType.DECIMAL
+        case _ => DataType.DOUBLE
+      }
+    }
+    CarbonScalaUtil.convertCarbonToSparkDataType(dataType)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/CarbonOption.scala b/integration/spark/src/main/scala/org/carbondata/spark/CarbonOption.scala
index b91cf98..12f3dc4 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/CarbonOption.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/CarbonOption.scala
@@ -21,7 +21,7 @@ package org.carbondata.spark
  * Contains all options for Spark data source
  */
 class CarbonOption(options: Map[String, String]) {
-  def tableIdentifier: String = options.getOrElse("cubeName", s"$dbName.$tableName")
+  def tableIdentifier: String = options.getOrElse("tableName", s"$dbName.$tableName")
 
   def dbName: String = options.getOrElse("dbName", "default")
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/KeyVal.scala b/integration/spark/src/main/scala/org/carbondata/spark/KeyVal.scala
index d906745..6ee882b 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/KeyVal.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/KeyVal.scala
@@ -47,6 +47,14 @@ class RawKeyValImpl extends RawKeyVal[BatchRawResult, Any] {
   override def getKey(key: BatchRawResult, value: Any): (BatchRawResult, Any) = (key, value)
 }
 
+trait RawKey[K, V] extends Serializable {
+  def getKey(key: Array[Any], value: Any): (K, V)
+
+}
+
+class RawKeyImpl extends RawKey[Array[Any], Any] {
+  override def getKey(key: Array[Any], value: Any): (Array[Any], Any) = (key, value)
+}
 trait Result[K, V] extends Serializable {
   def getKey(key: Int, value: LoadMetadataDetails): (K, V)
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala b/integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala
deleted file mode 100644
index 65e7538..0000000
--- a/integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala
+++ /dev/null
@@ -1,807 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.carbondata.spark.agg
-
-import scala.language.implicitConversions
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.types._
-
-import org.carbondata.query.aggregator.MeasureAggregator
-import org.carbondata.query.aggregator.impl._
-
-case class CountCarbon(child: Expression) extends UnaryExpression with PartialAggregate1 {
-  override def references: AttributeSet = child.references
-
-  override def nullable: Boolean = false
-
-  override def dataType: DataType = MeasureAggregatorUDT
-
-  override def toString: String = s"COUNT($child)"
-
-  override def asPartial: SplitEvaluation = {
-    val partialCount = Alias(CountCarbon(child), "PartialCount")()
-    SplitEvaluation(CountCarbonFinal(partialCount.toAttribute, LongType), partialCount :: Nil)
-  }
-
-  override def newInstance(): AggregateFunction1 = new CountFunctionCarbon(child, this, false)
-
-  implicit def toAggregates(aggregate: MeasureAggregator): Double = aggregate.getDoubleValue()
-}
-
-case class CountCarbonFinal(child: Expression, origDataType: DataType)
-  extends AggregateExpression1 {
-
-  override def children: Seq[Expression] = child :: Nil
-
-  override def references: AttributeSet = child.references
-
-  override def nullable: Boolean = false
-
-  override def dataType: DataType = origDataType
-
-  override def toString: String = s"COUNT($child)"
-
-  override def newInstance(): AggregateFunction1 = new CountFunctionCarbon(child, this, true)
-}
-
-
-case class CountDistinctCarbon(child: Expression) extends PartialAggregate1 {
-  override def children: Seq[Expression] = child :: Nil
-
-  override def nullable: Boolean = false
-
-  override def dataType: DataType = MeasureAggregatorUDT
-
-  override def toString: String = s"COUNT(DISTINCT ($child)"
-
-  override def asPartial: SplitEvaluation = {
-    val partialSet = Alias(CountDistinctCarbon(child), "partialSets")()
-    SplitEvaluation(
-      CountDistinctCarbonFinal(partialSet.toAttribute, LongType),
-      partialSet :: Nil)
-  }
-
-  override def newInstance(): AggregateFunction1 = new CountDistinctFunctionCarbon(child, this)
-}
-
-case class CountDistinctCarbonFinal(inputSet: Expression, origDataType: DataType)
-  extends AggregateExpression1 {
-  override def children: Seq[Expression] = inputSet :: Nil
-
-  override def nullable: Boolean = false
-
-  override def dataType: DataType = origDataType
-
-  override def toString: String = s"COUNTFINAL(DISTINCT ${ inputSet }})"
-
-  override def newInstance(): AggregateFunction1 = {
-    new CountDistinctFunctionCarbonFinal(inputSet, this)
-  }
-}
-
-case class AverageCarbon(child: Expression, castedDataType: DataType = null)
-  extends UnaryExpression with PartialAggregate1 {
-  override def references: AttributeSet = child.references
-
-  override def nullable: Boolean = false
-
-  override def dataType: DataType = MeasureAggregatorUDT
-
-  override def toString: String = s"AVGCarbon($child)"
-
-  override def asPartial: SplitEvaluation = {
-    val partialSum = Alias(AverageCarbon(child), "PartialAverage")()
-    SplitEvaluation(
-      AverageCarbonFinal(partialSum.toAttribute,
-        child.dataType match {
-          case IntegerType | StringType | LongType | TimestampType => DoubleType
-          case _ => child.dataType
-        }),
-      partialSum :: Nil)
-  }
-
-  override def newInstance(): AggregateFunction1 = new AverageFunctionCarbon(child, this, false)
-}
-
-case class AverageCarbonFinal(child: Expression, origDataType: DataType)
-  extends AggregateExpression1 {
-  override def children: Seq[Expression] = child :: Nil
-
-  override def references: AttributeSet = child.references
-
-  override def nullable: Boolean = false
-
-  override def dataType: DataType = origDataType
-
-  override def toString: String = s"AVG($child)"
-
-  override def newInstance(): AggregateFunction1 = new AverageFunctionCarbon(child, this, true)
-}
-
-case class SumCarbon(child: Expression, castedDataType: DataType = null)
-  extends UnaryExpression with PartialAggregate1 {
-  override def references: AttributeSet = child.references
-
-  override def nullable: Boolean = false
-
-  override def dataType: DataType = MeasureAggregatorUDT
-
-  override def toString: String = s"SUMCarbon($child)"
-
-  override def asPartial: SplitEvaluation = {
-    val partialSum = Alias(SumCarbon(child), "PartialSum")()
-    SplitEvaluation(
-      SumCarbonFinal(partialSum.toAttribute,
-        if (castedDataType != null) castedDataType else child.dataType),
-      partialSum :: Nil)
-  }
-
-  override def newInstance(): AggregateFunction1 = new SumFunctionCarbon(child, this, false)
-
-  implicit def toAggregates(aggregate: MeasureAggregator): Double = aggregate.getDoubleValue()
-}
-
-case class SumCarbonFinal(child: Expression, origDataType: DataType) extends AggregateExpression1 {
-  override def children: Seq[Expression] = child :: Nil
-
-  override def references: AttributeSet = child.references
-
-  override def nullable: Boolean = false
-
-  override def dataType: DataType = origDataType
-
-  override def toString: String = s"SUMCarbonFinal($child)"
-
-  override def newInstance(): AggregateFunction1 = new SumFunctionCarbon(child, this, true)
-}
-
-case class MaxCarbon(child: Expression, castedDataType: DataType = null)
-  extends UnaryExpression with PartialAggregate1 {
-  override def references: AttributeSet = child.references
-
-  override def nullable: Boolean = false
-
-  override def dataType: DataType = MeasureAggregatorUDT
-
-  override def toString: String = s"MaxCarbon($child)"
-
-  // to do partialSum to PartialMax many places
-  override def asPartial: SplitEvaluation = {
-    val partialSum = Alias(MaxCarbon(child), "PartialMax")()
-    SplitEvaluation(
-      MaxCarbonFinal(partialSum.toAttribute,
-        if (castedDataType != null) castedDataType else child.dataType),
-      partialSum :: Nil)
-  }
-
-  override def newInstance(): AggregateFunction1 = new MaxFunctionCarbon(child, this, false)
-}
-
-case class MaxCarbonFinal(child: Expression, origDataType: DataType) extends AggregateExpression1 {
-  override def children: Seq[Expression] = child :: Nil
-
-  override def references: AttributeSet = child.references
-
-  override def nullable: Boolean = false
-
-  override def dataType: DataType = origDataType
-
-  override def toString: String = s"MaxCarbonFinal($child)"
-
-  override def newInstance(): AggregateFunction1 = new MaxFunctionCarbon(child, this, true)
-}
-
-case class MinCarbon(child: Expression, castedDataType: DataType = null)
-  extends UnaryExpression with PartialAggregate1 {
-  override def references: AttributeSet = child.references
-
-  override def nullable: Boolean = false
-
-  override def dataType: DataType = MeasureAggregatorUDT
-
-  override def toString: String = s"MinCarbon($child)"
-
-  override def asPartial: SplitEvaluation = {
-    val partialSum = Alias(MinCarbon(child), "PartialMin")()
-    SplitEvaluation(
-      MinCarbonFinal(partialSum.toAttribute,
-        if (castedDataType != null) castedDataType else child.dataType),
-      partialSum :: Nil)
-  }
-
-  override def newInstance(): AggregateFunction1 = new MinFunctionCarbon(child, this, false)
-}
-
-case class MinCarbonFinal(child: Expression, origDataType: DataType) extends AggregateExpression1 {
-  override def children: Seq[Expression] = child :: Nil
-
-  override def references: AttributeSet = child.references
-
-  override def nullable: Boolean = false
-
-  override def dataType: DataType = origDataType
-
-  override def toString: String = s"MinCarbonFinal($child)"
-
-  override def newInstance(): AggregateFunction1 = new MinFunctionCarbon(child, this, true)
-}
-
-case class SumDistinctCarbon(child: Expression, castedDataType: DataType = null)
-  extends UnaryExpression with PartialAggregate1 {
-
-  override def references: AttributeSet = child.references
-
-  override def nullable: Boolean = false
-
-  override def dataType: DataType = MeasureAggregatorUDT
-
-  override def toString: String = s"PARTIAL_SUM_DISTINCT($child)"
-
-  override def asPartial: SplitEvaluation = {
-    val partialSum = Alias(SumDistinctCarbon(child), "PartialSumDistinct")()
-    SplitEvaluation(
-      SumDistinctFinalCarbon(partialSum.toAttribute,
-        if (castedDataType != null) {
-          castedDataType
-        } else {
-          child.dataType
-        }),
-      partialSum :: Nil)
-  }
-
-  override def newInstance(): AggregateFunction1 = {
-    new SumDisctinctFunctionCarbon(child, this, false)
-  }
-}
-
-case class SumDistinctFinalCarbon(child: Expression, origDataType: DataType)
-  extends AggregateExpression1 {
-  override def children: Seq[Expression] = child :: Nil
-
-  override def references: AttributeSet = child.references
-
-  override def nullable: Boolean = false
-
-  override def dataType: DataType = origDataType
-
-  override def toString: String = s"FINAL_SUM_DISTINCT($child)"
-
-  override def newInstance(): AggregateFunction1 = new SumDisctinctFunctionCarbon(child, this, true)
-}
-
-case class FirstCarbon(child: Expression, origDataType: DataType = MeasureAggregatorUDT)
-  extends UnaryExpression with PartialAggregate1 {
-  override def references: AttributeSet = child.references
-
-  override def nullable: Boolean = child.nullable
-
-  override def dataType: DataType = MeasureAggregatorUDT
-
-  override def toString: String = s"FIRST($child)"
-
-  override def asPartial: SplitEvaluation = {
-    val partialFirst = Alias(FirstCarbon(child), "PartialFirst")()
-    SplitEvaluation(
-      FirstCarbon(partialFirst.toAttribute, child.dataType),
-      partialFirst :: Nil)
-  }
-
-  override def newInstance(): AggregateFunction1 = new FirstFunctionCarbon(child, this)
-}
-
-case class AverageFunctionCarbon(expr: Expression, base: AggregateExpression1, finalAgg: Boolean)
-  extends AggregateFunction1 {
-
-  def this() = this(null, null, false) // Required for serialization.
-
-  //  var count: Int = _
-  private var avg: MeasureAggregator = null
-
-  override def update(input: InternalRow): Unit = {
-    val br = expr.collectFirst({ case a@BoundReference(_, _, _) => a })
-    val resolution =
-      if (br.isDefined) {
-        input.get(br.get.ordinal, MeasureAggregatorUDT)
-      } else {
-        expr.eval(input)
-      }
-    val agg = resolution match {
-      case s: MeasureAggregator => s
-      case s =>
-        var dc: MeasureAggregator = null
-        if (s != null) {
-          s match {
-            case v: java.math.BigDecimal =>
-              dc = new AvgBigDecimalAggregator
-              dc.agg(new java.math.BigDecimal(s.toString))
-              dc.setNewValue(new java.math.BigDecimal(s.toString))
-            case l: Long =>
-              dc = new AvgLongAggregator
-              dc.agg(s.toString.toLong)
-              dc.setNewValue(s.toString.toLong)
-            case _ =>
-              dc = new AvgDoubleAggregator
-              dc.agg(s.toString.toDouble)
-              dc.setNewValue(s.toString.toDouble)
-          }
-        }
-        else {
-          dc = new AvgDoubleAggregator()
-        }
-        dc
-    }
-    if (avg == null) {
-      avg = agg
-    } else {
-      avg.merge(agg)
-    }
-  }
-
-  override def eval(input: InternalRow): Any = {
-    if (finalAgg) {
-      if (avg.isFirstTime) {
-        null
-      } else {
-        avg match {
-          case avg: AvgBigDecimalAggregator =>
-            Cast(Literal(avg.getBigDecimalValue), base.dataType).eval(null)
-          case avg: AvgLongAggregator =>
-            Cast(Literal(avg.getLongValue), base.dataType).eval(null)
-          case _ =>
-            Cast(Literal(avg.getDoubleValue), base.dataType).eval(null)
-        }
-      }
-    } else {
-      avg
-    }
-  }
-}
-
-case class CountFunctionCarbon(expr: Expression, base: AggregateExpression1, finalAgg: Boolean)
-  extends AggregateFunction1 {
-  def this() = this(null, null, false) // Required for serialization.
-
-  private var count: MeasureAggregator = null
-
-  override def update(input: InternalRow): Unit = {
-    val br = expr.collectFirst({ case a@BoundReference(_, _, _) => a })
-    val resolution =
-      if (br.isDefined) {
-        input.get(br.get.ordinal, MeasureAggregatorUDT)
-      } else {
-        expr.eval(input)
-      }
-    val agg = resolution match {
-      case m: MeasureAggregator => m
-      case others =>
-        val agg1: MeasureAggregator = new CountAggregator
-        if (others != null) {
-          agg1.agg(0)
-        }
-        agg1
-    }
-    if (count == null) {
-      count = agg
-    } else {
-      count.merge(agg)
-    }
-  }
-
-  override def eval(input: InternalRow): Any = {
-    if (finalAgg && count != null) {
-      if (count.isFirstTime) {
-        0L
-      } else {
-        Cast(Literal(count.getDoubleValue), base.dataType).eval(null)
-      }
-    } else {
-      count
-    }
-  }
-
-}
-
-
-case class SumFunctionCarbon(expr: Expression, base: AggregateExpression1, finalAgg: Boolean)
-  extends AggregateFunction1 {
-  def this() = this(null, null, false) // Required for serialization.
-
-  private var sum: MeasureAggregator = null
-
-  override def update(input: InternalRow): Unit = {
-    val br = expr.collectFirst({ case a@BoundReference(_, _, _) => a })
-    val resolution =
-      if (br.isDefined) {
-        input.get(br.get.ordinal, MeasureAggregatorUDT)
-      } else {
-        expr.eval(input)
-      }
-    val agg = resolution match {
-      case s: MeasureAggregator => s
-      case s =>
-        var dc: MeasureAggregator = null
-        if (s != null) {
-          s match {
-            case bd: java.math.BigDecimal =>
-              dc = new SumBigDecimalAggregator
-              dc.agg(new java.math.BigDecimal(s.toString))
-              dc.setNewValue(new java.math.BigDecimal(s.toString))
-            case l: Long =>
-              dc = new SumLongAggregator
-              dc.agg(s.toString.toLong)
-              dc.setNewValue(s.toString.toLong)
-            case _ =>
-              dc = new SumDoubleAggregator
-              dc.agg(s.toString.toDouble)
-              dc.setNewValue(s.toString.toDouble)
-          }
-        }
-        else {
-          dc = new SumDoubleAggregator
-        }
-        dc
-    }
-    if (sum == null) {
-      sum = agg
-    } else {
-      sum.merge(agg)
-    }
-  }
-
-  override def eval(input: InternalRow): Any = {
-    if (finalAgg && sum != null) {
-      if (sum.isFirstTime) {
-        null
-      } else {
-        sum match {
-          case s: SumBigDecimalAggregator =>
-            Cast(Literal(sum.getBigDecimalValue), base.dataType).eval(input)
-          case s: SumLongAggregator =>
-            Cast(Literal(sum.getLongValue), base.dataType).eval(input)
-          case _ =>
-            Cast(Literal(sum.getDoubleValue), base.dataType).eval(input)
-        }
-      }
-    } else {
-      sum
-    }
-  }
-}
-
-case class MaxFunctionCarbon(expr: Expression, base: AggregateExpression1, finalAgg: Boolean)
-  extends AggregateFunction1 {
-  def this() = this(null, null, false) // Required for serialization.
-
-  private var max: MeasureAggregator = null
-
-  override def update(input: InternalRow): Unit = {
-    val br = expr.collectFirst({ case a@BoundReference(_, _, _) => a })
-    val resolution =
-      if (br.isDefined) {
-        input.get(br.get.ordinal, MeasureAggregatorUDT)
-      } else {
-        expr.eval(input)
-      }
-    val agg = resolution match {
-      case s: MeasureAggregator => s
-      case s =>
-        val dc = new MaxAggregator
-        if (s != null) {
-          dc.agg(s.toString.toDouble)
-          dc.setNewValue(s.toString.toDouble)
-        }
-        dc
-    }
-    if (max == null) {
-      max = agg
-    } else {
-      max.merge(agg)
-    }
-  }
-
-  override def eval(input: InternalRow): Any = {
-    if (finalAgg && max != null) {
-      if (max.isFirstTime) {
-        null
-      } else {
-        Cast(Literal(max.getValueObject), base.dataType).eval(null)
-      }
-    } else {
-      max
-    }
-  }
-}
-
-case class MinFunctionCarbon(expr: Expression, base: AggregateExpression1, finalAgg: Boolean)
-  extends AggregateFunction1 {
-  def this() = this(null, null, false) // Required for serialization.
-
-  private var min: MeasureAggregator = null
-
-  override def update(input: InternalRow): Unit = {
-    val br = expr.collectFirst({ case a@BoundReference(_, _, _) => a })
-    val resolution =
-      if (br.isDefined) {
-        input.get(br.get.ordinal, MeasureAggregatorUDT)
-      } else {
-        expr.eval(input)
-      }
-    val agg = resolution match {
-      case s: MeasureAggregator => s
-      case s =>
-        val dc: MeasureAggregator = new MinAggregator
-        if (s != null) {
-          dc.agg(s.toString.toDouble)
-          dc.setNewValue(s.toString.toDouble)
-        }
-        dc
-    }
-    if (min == null) {
-      min = agg
-    } else {
-      min.merge(agg)
-    }
-  }
-
-  override def eval(input: InternalRow): Any = {
-    if (finalAgg && min != null) {
-      if (min.isFirstTime) {
-        null
-      } else {
-        Cast(Literal(min.getValueObject), base.dataType).eval(null)
-      }
-    } else {
-      min
-    }
-  }
-}
-
-case class SumDisctinctFunctionCarbon(expr: Expression, base: AggregateExpression1,
-    isFinal: Boolean)
-  extends AggregateFunction1 {
-
-  def this() = this(null, null, false) // Required for serialization.
-
-  private var distinct: MeasureAggregator = null
-
-  override def update(input: InternalRow): Unit = {
-
-    val br = expr.collectFirst({ case a@BoundReference(_, _, _) => a })
-    val resolution =
-      if (br.isDefined) {
-        input.get(br.get.ordinal, MeasureAggregatorUDT)
-      } else {
-        expr.eval(input)
-      }
-    val agg = resolution match {
-      case s: MeasureAggregator => s
-      case null => null
-      case s =>
-        var dc: MeasureAggregator = null
-        s match {
-          case Double =>
-            dc = new SumDistinctDoubleAggregator
-            dc.setNewValue(s.toString.toDouble)
-          case Int =>
-            dc = new SumDistinctLongAggregator
-            dc.setNewValue(s.toString.toLong)
-          case bd: java.math.BigDecimal =>
-            dc = new SumDistinctBigDecimalAggregator
-            dc.setNewValue(new java.math.BigDecimal(s.toString))
-          case _ =>
-        }
-        dc
-    }
-    if (distinct == null) {
-      distinct = agg
-    } else {
-      distinct.merge(agg)
-    }
-  }
-
-  override def eval(input: InternalRow): Any =
-  // in case of empty load it was failing so added null check.
-  {
-    if (isFinal && distinct != null) {
-      if (distinct.isFirstTime) {
-        null
-      }
-      else {
-      Cast(Literal(distinct.getValueObject), base.dataType).eval(null)
-      }
-    }
-    else {
-      distinct
-    }
-  }
-}
-
-case class CountDistinctFunctionCarbon(expr: Expression, base: AggregateExpression1)
-  extends AggregateFunction1 {
-
-  def this() = this(null, null) // Required for serialization.
-
-  private var count: MeasureAggregator = null
-
-  override def update(input: InternalRow): Unit = {
-    val br = expr.collectFirst({ case a@BoundReference(_, _, _) => a })
-    val resolution =
-      if (br.isDefined) {
-        input.get(br.get.ordinal, MeasureAggregatorUDT)
-      } else {
-        expr.eval(input)
-      }
-    val agg = resolution match {
-      case s: MeasureAggregator => s
-      case null => null
-      case s =>
-        val dc = new DistinctCountAggregatorObjectSet
-        dc.setNewValue(s.toString)
-        dc
-    }
-    if (count == null) {
-      count = agg
-    } else {
-      count.merge(agg)
-    }
-  }
-
-  override def eval(input: InternalRow): Any = count
-}
-
-case class CountDistinctFunctionCarbonFinal(expr: Expression, base: AggregateExpression1)
-  extends AggregateFunction1 {
-
-  def this() = this(null, null) // Required for serialization.
-
-  private var count: MeasureAggregator = null
-
-  override def update(input: InternalRow): Unit = {
-    val br = expr.collectFirst({ case a@BoundReference(_, _, _) => a })
-    val resolution =
-      if (br.isDefined) {
-        input.get(br.get.ordinal, MeasureAggregatorUDT)
-      } else {
-        expr.eval(input)
-      }
-    val agg = resolution match {
-      case s: MeasureAggregator => s
-      case null => null
-      case s =>
-        val dc = new DistinctCountAggregatorObjectSet
-        dc.setNewValue(s.toString)
-        dc
-    }
-    if (count == null) {
-      count = agg
-    } else {
-      count.merge(agg)
-    }
-  }
-
-  override def eval(input: InternalRow): Any = {
-    if (count == null) {
-      Cast(Literal(0), base.dataType).eval(null)
-    } else if (count.isFirstTime) {
-      Cast(Literal(0), base.dataType).eval(null)
-    } else {
-      Cast(Literal(count.getDoubleValue), base.dataType).eval(null)
-    }
-  }
-}
-
-case class FirstFunctionCarbon(expr: Expression, base: AggregateExpression1)
-  extends AggregateFunction1 {
-  def this() = this(null, null) // Required for serialization.
-
-  var result: Any = null
-
-  override def update(input: InternalRow): Unit = {
-    if (result == null) {
-      val br = expr.collectFirst({ case a@BoundReference(_, _, _) => a })
-      val resolution =
-        if (br.isDefined) {
-          input.get(br.get.ordinal, MeasureAggregatorUDT)
-        } else {
-          expr.eval(input)
-        }
-
-      result = resolution
-    }
-  }
-
-  override def eval(input: InternalRow): Any = Cast(Literal(result), base.dataType).eval(null)
-}
-
-case class FlattenExpr(expr: Expression) extends Expression with CodegenFallback {
-  self: Product =>
-
-  override def children: Seq[Expression] = Seq(expr)
-
-  override def dataType: DataType = expr.dataType
-
-  override def nullable: Boolean = expr.nullable
-
-  override def references: AttributeSet = AttributeSet(expr.flatMap(_.references.iterator))
-
-  override def foldable: Boolean = expr.foldable
-
-  override def toString: String = "Flatten(" + expr.toString + ")"
-
-  type EvaluatedType = Any
-
-  override def eval(input: InternalRow): Any = {
-    expr.eval(input) match {
-      case d: MeasureAggregator => d.getDoubleValue
-      case others => others
-    }
-  }
-}
-
-case class FlatAggregatorsExpr(expr: Expression) extends Expression with CodegenFallback {
-  self: Product =>
-
-  override def children: Seq[Expression] = Seq(expr)
-
-  override def dataType: DataType = expr.dataType
-
-  override def nullable: Boolean = expr.nullable
-
-  override def references: AttributeSet = AttributeSet(expr.flatMap(_.references.iterator))
-
-  override def foldable: Boolean = expr.foldable
-
-  override def toString: String = "FlattenAggregators(" + expr.toString + ")"
-
-  type EvaluatedType = Any
-
-  override def eval(input: InternalRow): Any = {
-    expr.eval(input) match {
-      case d: MeasureAggregator =>
-        d.setNewValue(d.getDoubleValue)
-        d
-      case others => others
-    }
-  }
-}
-
-case class PositionLiteral(expr: Expression, intermediateDataType: DataType)
-  extends LeafExpression with CodegenFallback {
-  override def dataType: DataType = expr.dataType
-
-  override def nullable: Boolean = false
-
-  type EvaluatedType = Any
-  var position = -1
-
-  def setPosition(pos: Int): Unit = position = pos
-
-  override def toString: String = s"PositionLiteral($position : $expr)"
-
-  override def eval(input: InternalRow): Any = {
-    if (position != -1) {
-      input.get(position, intermediateDataType)
-    } else {
-      expr.eval(input)
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/carbondata/spark/agg/MeasureAggregatorUDT.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/agg/MeasureAggregatorUDT.scala b/integration/spark/src/main/scala/org/carbondata/spark/agg/MeasureAggregatorUDT.scala
deleted file mode 100644
index ea0c8af..0000000
--- a/integration/spark/src/main/scala/org/carbondata/spark/agg/MeasureAggregatorUDT.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.carbondata.spark.agg
-
-import org.apache.spark.sql.types._
-
-import org.carbondata.query.aggregator.MeasureAggregator
-
-/**
- * class to support user defined type for carbon measure aggregators
- * from spark 1.5, spark has made the data type strict and ANY is no more supported
- * for every data, we need to give the data type
- */
-class MeasureAggregatorUDT extends UserDefinedType[MeasureAggregator] {
-  // the default DoubleType is Ok as we are not going to pass to spark sql to
-  // evaluate,need to add this for compilation errors
-  override def sqlType: DataType = {
-    ArrayType(DoubleType, containsNull = false)
-  }
-
-  override def serialize(obj: Any): Any = {
-    obj match {
-      case p: MeasureAggregator => p
-    }
-  }
-
-  override def deserialize(datum: Any): MeasureAggregator = {
-    datum match {
-      case values =>
-        val xy = values.asInstanceOf[MeasureAggregator]
-        xy
-    }
-  }
-
-  override def userClass: Class[MeasureAggregator] = classOf[MeasureAggregator]
-
-  override def asNullable: MeasureAggregatorUDT = this
-}
-
-case object MeasureAggregatorUDT extends MeasureAggregatorUDT

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index ded665e..0672281 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -22,7 +22,7 @@ import java.util
 import java.util.concurrent.{Executors, ExecutorService}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.mutable.ListBuffer
 import scala.util.control.Breaks._
 
 import org.apache.hadoop.conf.{Configurable, Configuration}
@@ -189,9 +189,7 @@ object CarbonDataRDDFactory extends Logging {
         if (carbonLock.lockWithRetries()) {
           logInfo("Successfully got the table metadata file lock")
           if (updatedLoadMetadataDetailsList.nonEmpty) {
-            LoadAggregateTabAfterRetention(schemaName, cube.getFactTableName, cube.getFactTableName,
-              sqlContext, schema, updatedLoadMetadataDetailsList
-            )
+            // TODO: Load Aggregate tables after retention.
           }
 
           // write
@@ -217,56 +215,6 @@ object CarbonDataRDDFactory extends Logging {
     }
   }
 
-  def LoadAggregateTabAfterRetention(
-      schemaName: String,
-      cubeName: String,
-      factTableName: String,
-      sqlContext: SQLContext,
-      schema: CarbonDataLoadSchema,
-      list: ListBuffer[LoadMetadataDetails]) {
-    val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog.lookupRelation1(
-      Option(schemaName),
-      cubeName,
-      None
-    )(sqlContext).asInstanceOf[CarbonRelation]
-    if (relation == null) {
-      sys.error(s"Table $schemaName.$cubeName does not exist")
-    }
-    val carbonLoadModel = new CarbonLoadModel()
-    carbonLoadModel.setTableName(cubeName)
-    carbonLoadModel.setDatabaseName(schemaName)
-    val table = relation.cubeMeta.carbonTable
-    val aggTables = schema.getCarbonTable.getAggregateTablesName
-    if (null != aggTables && !aggTables.isEmpty) {
-      carbonLoadModel.setRetentionRequest(true)
-      carbonLoadModel.setLoadMetadataDetails(list.asJava)
-      carbonLoadModel.setTableName(table.getFactTableName)
-      carbonLoadModel
-        .setCarbonDataLoadSchema(new CarbonDataLoadSchema(relation.cubeMeta.carbonTable))
-      // TODO: need to fill dimension relation from data load sql command
-      var storeLocation = CarbonProperties.getInstance
-        .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
-          System.getProperty("java.io.tmpdir")
-        )
-      storeLocation = storeLocation + "/carbonstore/" + System.currentTimeMillis()
-      val columinar = sqlContext.getConf("carbon.is.columnar.storage", "true").toBoolean
-      var kettleHomePath = sqlContext.getConf("carbon.kettle.home", null)
-      if (null == kettleHomePath) {
-        kettleHomePath = CarbonProperties.getInstance.getProperty("carbon.kettle.home")
-      }
-      if (kettleHomePath == null) {
-        sys.error(s"carbon.kettle.home is not set")
-      }
-      CarbonDataRDDFactory.loadCarbonData(
-        sqlContext,
-        carbonLoadModel,
-        storeLocation,
-        relation.cubeMeta.storePath,
-        kettleHomePath,
-        relation.cubeMeta.partitioner, columinar, isAgg = true)
-    }
-  }
-
   def configSplitMaxSize(context: SparkContext, filePaths: String,
     hadoopConfiguration: Configuration): Unit = {
     val defaultParallelism = if (context.defaultParallelism < 1) 1 else context.defaultParallelism

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonRawQueryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonRawQueryRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonRawQueryRDD.scala
index 7dcb33b..5993677 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonRawQueryRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonRawQueryRDD.scala
@@ -25,8 +25,9 @@ import org.carbondata.core.iterator.CarbonIterator
 import org.carbondata.query.carbon.executor.QueryExecutorFactory
 import org.carbondata.query.carbon.model.QueryModel
 import org.carbondata.query.carbon.result.BatchRawResult
+import org.carbondata.query.carbon.result.iterator.ChunkRawRowIterartor
 import org.carbondata.query.expression.Expression
-import org.carbondata.spark.RawKeyVal
+import org.carbondata.spark.{RawKey, RawKeyVal}
 
 
 /**
@@ -48,7 +49,7 @@ class CarbonRawQueryRDD[K, V](
     sc: SparkContext,
     queryModel: QueryModel,
     filterExpression: Expression,
-    keyClass: RawKeyVal[K, V],
+    keyClass: RawKey[K, V],
     @transient conf: Configuration,
     cubeCreationTime: Long,
     schemaLastUpdatedTime: Long,
@@ -66,7 +67,7 @@ class CarbonRawQueryRDD[K, V](
   override def compute(thepartition: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass().getName());
     val iter = new Iterator[(K, V)] {
-      var rowIterator: CarbonIterator[BatchRawResult] = _
+      var rowIterator: CarbonIterator[Array[Any]] = _
       var queryStartTime: Long = 0
       try {
         val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition]
@@ -83,8 +84,9 @@ class CarbonRawQueryRDD[K, V](
               System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties");
           }
           // execute query
-          rowIterator = QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel)
-            .asInstanceOf[CarbonIterator[BatchRawResult]]
+          rowIterator = new ChunkRawRowIterartor(
+            QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel)
+            .asInstanceOf[CarbonIterator[BatchRawResult]]).asInstanceOf[CarbonIterator[Array[Any]]]
         }
       } catch {
         case e: Exception =>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
index 6cd9986..9bc1a64 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
@@ -105,18 +105,22 @@ object CarbonScalaUtil {
     }
   }
 
+  def convertValueToSparkDataType(value: Any,
+      dataType: org.apache.spark.sql.types.DataType): Any = {
+    dataType match {
+      case StringType => value.toString
+      case IntegerType => value.toString.toInt
+      case LongType => value.toString.toLong
+      case DoubleType => value.toString.toDouble
+      case FloatType => value.toString.toFloat
+      case _ => value.toString.toDouble
+    }
+  }
+
 
   case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
 
   object CarbonSparkUtil {
-    def createBaseRDD(carbonContext: CarbonContext, carbonTable: CarbonTable): TransformHolder = {
-      val relation = CarbonEnv.getInstance(carbonContext).carbonCatalog
-        .lookupRelation1(Option(carbonTable.getDatabaseName),
-          carbonTable.getFactTableName, None)(carbonContext).asInstanceOf[CarbonRelation]
-      val rdd = new SchemaRDD(carbonContext, relation)
-      rdd.registerTempTable(carbonTable.getFactTableName)
-      TransformHolder(rdd, createSparkMeta(carbonTable))
-    }
 
     def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
       val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
index 121a3a5..ebc8ead 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -395,7 +395,7 @@ object GlobalDictionaryUtil extends Logging {
 
     // update CarbonDataLoadSchema
     val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
-          model.table.getTableName, None)(sqlContext)
+          model.table.getTableName)(sqlContext)
         .asInstanceOf[CarbonRelation].cubeMeta.carbonTable
     carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/test/resources/datawithmaxinteger.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/datawithmaxinteger.csv b/integration/spark/src/test/resources/datawithmaxinteger.csv
new file mode 100644
index 0000000..52dfdfc
--- /dev/null
+++ b/integration/spark/src/test/resources/datawithmaxinteger.csv
@@ -0,0 +1,12 @@
+imei,age
+1AA1,10
+1AA2,26
+1AA3,10
+1AA4,10
+1AA5,20
+1AA6,10
+1AA7,10
+1AA8,10
+1AA9,10
+1AA10,10
+1AA12,2147483647
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/test/resources/datawithmaxmininteger.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/datawithmaxmininteger.csv b/integration/spark/src/test/resources/datawithmaxmininteger.csv
new file mode 100644
index 0000000..5677a40
--- /dev/null
+++ b/integration/spark/src/test/resources/datawithmaxmininteger.csv
@@ -0,0 +1,13 @@
+imei,age
+1AA1,10
+1AA2,26
+1AA3,10
+1AA4,10
+1AA5,20
+1AA6,10
+1AA7,10
+1AA8,10
+1AA9,10
+1AA10,10
+1AA11,-2147483648
+1AA12,2147483647
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/test/resources/datawithmininteger.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/datawithmininteger.csv b/integration/spark/src/test/resources/datawithmininteger.csv
new file mode 100644
index 0000000..cc34efa
--- /dev/null
+++ b/integration/spark/src/test/resources/datawithmininteger.csv
@@ -0,0 +1,12 @@
+imei,age
+1AA1,10
+1AA2,26
+1AA3,10
+1AA4,10
+1AA5,20
+1AA6,10
+1AA7,10
+1AA8,10
+1AA9,10
+1AA10,10
+1AA11,-2147483648
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index eaa249e..5811d3a 100644
--- a/integration/spark/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -19,12 +19,12 @@ package org.apache.spark.sql.common.util
 
 import java.util.{Locale, TimeZone}
 
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import scala.collection.JavaConversions._
+
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.columnar.InMemoryRelation
-
-import scala.collection.JavaConversions._
+import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 
 class QueryTest extends PlanTest {
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithMaxMinInteger.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithMaxMinInteger.scala b/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithMaxMinInteger.scala
new file mode 100644
index 0000000..26c88f8
--- /dev/null
+++ b/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithMaxMinInteger.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.integration.spark.testsuite.dataload
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Test Class for data loading when there are min integer value in int column
+ *
+ */
+class TestLoadDataWithMaxMinInteger extends QueryTest with BeforeAndAfterAll {
+  override def beforeAll {
+    sql("drop table if exists integer_table_01")
+    sql("drop table if exists integer_table_02")
+    sql("drop table if exists integer_table_03")
+  }
+  test("test carbon table data loading when the int column " +
+    "contains min integer value") {
+    sql(
+      """
+        CREATE TABLE integer_table_01(imei string,age int)
+        STORED BY 'org.apache.carbondata.format'
+      """)
+    sql(
+      """
+        LOAD DATA INPATH './src/test/resources/datawithmininteger.csv'
+        INTO table integer_table_01 options ('DELIMITER'=',',
+        'QUOTECHAR'='"', 'FILEHEADER'= 'imei,age')
+      """)
+    checkAnswer(sql("select age from integer_table_01"),
+      Seq(Row(10.0), Row(26.0), Row(10.0), Row(10.0), Row(20.0),
+        Row(10.0), Row(10.0), Row(10.0), Row(10.0), Row(10.0),
+        Row(-2147483648.0)))
+  }
+
+  test("test carbon table data loading when the int column " +
+    "contains max integer value") {
+    sql(
+      """
+        CREATE TABLE integer_table_02(imei string,age int)
+        STORED BY 'org.apache.carbondata.format'
+      """)
+    sql(
+      """
+        LOAD DATA INPATH './src/test/resources/datawithmaxinteger.csv'
+        INTO table integer_table_02 options ('DELIMITER'=',',
+        'QUOTECHAR'='"', 'FILEHEADER'= 'imei,age')
+      """)
+    checkAnswer(sql("select age from integer_table_02"),
+      Seq(Row(10.0), Row(26.0), Row(10.0), Row(10.0), Row(20.0),
+        Row(10.0), Row(10.0), Row(10.0), Row(10.0), Row(10.0),
+        Row(2147483647.0)))
+  }
+
+  test("test carbon table data loading when the int column " +
+    "contains min and max integer value") {
+    sql(
+      """
+        CREATE TABLE integer_table_03(imei string,age int)
+        STORED BY 'org.apache.carbondata.format'
+      """)
+    sql(
+      """
+        LOAD DATA INPATH './src/test/resources/datawithmaxmininteger.csv'
+        INTO table integer_table_03 options ('DELIMITER'=',',
+        'QUOTECHAR'='"', 'FILEHEADER'= 'imei,age')
+      """)
+    checkAnswer(sql("select age from integer_table_03"),
+      Seq(Row(10.0), Row(26.0), Row(10.0), Row(10.0), Row(20.0),
+        Row(10.0), Row(10.0), Row(10.0), Row(10.0), Row(10.0),
+        Row(-2147483648.0), Row(2147483647.0)))
+  }
+  override def afterAll {
+    sql("drop table if exists integer_table_01")
+    sql("drop table if exists integer_table_02")
+    sql("drop table if exists integer_table_03")
+  }
+}