Posted to commits@carbondata.apache.org by jb...@apache.org on 2016/06/23 14:15:50 UTC
[02/56] [abbrv] incubator-carbondata git commit: [Issue 618] Supported Spark 1.6 in Carbondata (#670)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
index 84f050e..30a1070 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
@@ -30,6 +30,7 @@ import scala.util.parsing.combinator.RegexParsers
import org.apache.spark
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.{AggregateTableAttributes, Partitioner}
@@ -120,28 +121,13 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
false
}
- def lookupRelation1(
- databaseName: Option[String],
- tableName: String,
- alias: Option[String] = None)(sqlContext: SQLContext): LogicalPlan = {
- val db = databaseName match {
- case Some(name) => name
- case _ => null
- }
- if (db == null) {
- lookupRelation2(Seq(tableName), alias)(sqlContext)
- } else {
- lookupRelation2(Seq(db, tableName), alias)(sqlContext)
- }
- }
-
- override def lookupRelation(tableIdentifier: Seq[String],
+ override def lookupRelation(tableIdentifier: TableIdentifier,
alias: Option[String] = None): LogicalPlan = {
try {
super.lookupRelation(tableIdentifier, alias)
} catch {
case s: java.lang.Exception =>
- lookupRelation2(tableIdentifier, alias)(hive.asInstanceOf[SQLContext])
+ lookupRelation1(tableIdentifier, alias)(hive.asInstanceOf[SQLContext])
}
}
@@ -153,64 +139,34 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
cubeCreationTime
}
+ def lookupRelation1(dbName: Option[String],
+ tableName: String)(sqlContext: SQLContext): LogicalPlan = {
+ lookupRelation1(TableIdentifier(tableName, dbName))(sqlContext)
+ }
- def lookupRelation2(tableIdentifier: Seq[String],
+ def lookupRelation1(tableIdentifier: TableIdentifier,
alias: Option[String] = None)(sqlContext: SQLContext): LogicalPlan = {
checkSchemasModifiedTimeAndReloadCubes()
- tableIdentifier match {
- case Seq(schemaName, cubeName) =>
- val cubes = metadata.cubesMeta.filter(
- c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(schemaName) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(cubeName))
- if (cubes.nonEmpty) {
- CarbonRelation(schemaName, cubeName,
- CarbonSparkUtil.createSparkMeta(cubes.head.carbonTable), cubes.head, alias)(sqlContext)
- } else {
- LOGGER.audit(s"Table Not Found: $schemaName $cubeName")
- throw new NoSuchTableException
- }
- case Seq(cubeName) =>
- val currentDatabase = getDB.getDatabaseName(None, sqlContext)
- val cubes = metadata.cubesMeta.filter(
- c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(currentDatabase) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(cubeName))
- if (cubes.nonEmpty) {
- CarbonRelation(currentDatabase, cubeName,
- CarbonSparkUtil.createSparkMeta(cubes.head.carbonTable), cubes.head, alias)(sqlContext)
- } else {
- LOGGER.audit(s"Table Not Found: $cubeName")
- throw new NoSuchTableException
- }
- case _ =>
- LOGGER.audit(s"Table Not Found: $tableIdentifier")
- throw new NoSuchTableException
- }
- }
-
- def cubeExists(db: Option[String], tableName: String)(sqlContext: SQLContext): Boolean = {
- if (db.isEmpty || db.get == null || db.get == "") {
- cubeExists(Seq(tableName))(sqlContext)
+ val database = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
+ val cubes = metadata.cubesMeta.filter(
+ c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
+ c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
+ if (cubes.nonEmpty) {
+ CarbonRelation(database, tableIdentifier.table,
+ CarbonSparkUtil.createSparkMeta(cubes.head.carbonTable), cubes.head, alias)(sqlContext)
} else {
- cubeExists(Seq(db.get, tableName))(sqlContext)
+ LOGGER.audit(s"Table Not Found: ${tableIdentifier.table}")
+ throw new NoSuchTableException
}
}
- def cubeExists(tableIdentifier: Seq[String])(sqlContext: SQLContext): Boolean = {
+ def tableExists(tableIdentifier: TableIdentifier)(sqlContext: SQLContext): Boolean = {
checkSchemasModifiedTimeAndReloadCubes()
- tableIdentifier match {
- case Seq(schemaName, cubeName) =>
- val cubes = metadata.cubesMeta.filter(
- c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(schemaName) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(cubeName))
- cubes.nonEmpty
- case Seq(cubeName) =>
- val currentDatabase = getDB.getDatabaseName(None, sqlContext)
- val cubes = metadata.cubesMeta.filter(
- c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(currentDatabase) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(cubeName))
- cubes.nonEmpty
- case _ => false
- }
+ val database = tableIdentifier.database.getOrElse(getDB.getDatabaseName(None, sqlContext))
+ val cubes = metadata.cubesMeta.filter(
+ c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(database) &&
+ c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableIdentifier.table))
+ cubes.nonEmpty
}
def loadMetadata(metadataPath: String): MetaData = {
@@ -338,7 +294,7 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
dbName: String, tableName: String, partitioner: Partitioner)
(sqlContext: SQLContext): String = {
- if (cubeExists(Seq(dbName, tableName))(sqlContext)) {
+ if (tableExists(TableIdentifier(tableName, Some(dbName)))(sqlContext)) {
sys.error(s"Table [$tableName] already exists under Database [$dbName]")
}
@@ -506,7 +462,7 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
/**
* Shows all schemas which has Database name like
*/
- def showSchemas(schemaLike: Option[String]): Seq[String] = {
+ def showDatabases(schemaLike: Option[String]): Seq[String] = {
checkSchemasModifiedTimeAndReloadCubes()
metadata.cubesMeta.map { c =>
schemaLike match {
@@ -526,7 +482,7 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
/**
* Shows all cubes for given schema.
*/
- def getCubes(databaseName: Option[String])(sqlContext: SQLContext): Seq[(String, Boolean)] = {
+ def getTables(databaseName: Option[String])(sqlContext: SQLContext): Seq[(String, Boolean)] = {
val schemaName = databaseName
.getOrElse(sqlContext.asInstanceOf[HiveContext].catalog.client.currentDatabase)
@@ -539,21 +495,25 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
/**
* Shows all cubes in all schemas.
*/
- def getAllCubes()(sqlContext: SQLContext): Seq[(String, String)] = {
+ def getAllTables()(sqlContext: SQLContext): Seq[TableIdentifier] = {
checkSchemasModifiedTimeAndReloadCubes()
- metadata.cubesMeta
- .map { c => (c.carbonTableIdentifier.getDatabaseName, c.carbonTableIdentifier.getTableName) }
+ metadata.cubesMeta.map { c =>
+ TableIdentifier(c.carbonTableIdentifier.getTableName,
+ Some(c.carbonTableIdentifier.getDatabaseName))
+ }
}
- def dropCube(partitionCount: Int, tableStorePath: String, schemaName: String, cubeName: String)
+ def dropTable(partitionCount: Int, tableStorePath: String, tableIdentifier: TableIdentifier)
(sqlContext: SQLContext) {
- if (!cubeExists(Seq(schemaName, cubeName))(sqlContext)) {
- LOGGER.audit(s"Drop Table failed. Table with $schemaName.$cubeName does not exist")
- sys.error(s"Table with $schemaName.$cubeName does not exist")
+ val dbName = tableIdentifier.database.get
+ val tableName = tableIdentifier.table
+ if (!tableExists(tableIdentifier)(sqlContext)) {
+ LOGGER.audit(s"Drop Table failed. Table with ${dbName}.$tableName does not exist")
+ sys.error(s"Table with $dbName.$tableName does not exist")
}
val carbonTable = org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
- .getCarbonTable(schemaName + "_" + cubeName)
+ .getCarbonTable(dbName + "_" + tableName)
if (null != carbonTable) {
val metadatFilePath = carbonTable.getMetaDataFilepath
@@ -561,12 +521,12 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
if (FileFactory.isFileExist(metadatFilePath, fileType)) {
val file = FileFactory.getCarbonFile(metadatFilePath, fileType)
- CarbonUtil.renameCubeForDeletion(partitionCount, tableStorePath, schemaName, cubeName)
+ CarbonUtil.renameCubeForDeletion(partitionCount, tableStorePath, dbName, tableName)
CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
}
val partitionLocation = tableStorePath + File.separator + "partition" + File.separator +
- schemaName + File.separator + cubeName
+ dbName + File.separator + tableName
val partitionFileType = FileFactory.getFileType(partitionLocation)
if (FileFactory.isFileExist(partitionLocation, partitionFileType)) {
CarbonUtil
@@ -575,20 +535,20 @@ class CarbonMetastoreCatalog(hive: HiveContext, val storePath: String, client: C
}
try {
- sqlContext.sql(s"DROP TABLE $schemaName.$cubeName").collect()
+ sqlContext.sql(s"DROP TABLE $dbName.$tableName").collect()
} catch {
case e: Exception =>
LOGGER.audit(
- s"Error While deleting the table $schemaName.$cubeName during drop Table" + e.getMessage)
+ s"Error While deleting the table $dbName.$tableName during drop Table" + e.getMessage)
}
metadata.cubesMeta -= metadata.cubesMeta.filter(
- c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(schemaName) &&
- c.carbonTableIdentifier.getTableName.equalsIgnoreCase(cubeName))(0)
+ c => c.carbonTableIdentifier.getDatabaseName.equalsIgnoreCase(dbName) &&
+ c.carbonTableIdentifier.getTableName.equalsIgnoreCase(tableName))(0)
org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance
- .removeTable(schemaName + "_" + cubeName)
- logInfo(s"Table $cubeName of $schemaName Database dropped syccessfully.")
- LOGGER.info("Table " + cubeName + " of " + schemaName + " Database dropped syccessfully.")
+ .removeTable(dbName + "_" + tableName)
+ logInfo(s"Table $tableName of $dbName Database dropped syccessfully.")
+ LOGGER.info("Table " + tableName + " of " + dbName + " Database dropped syccessfully.")
}
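
The net effect of this hunk: the catalog stops pattern-matching on Seq[String] table paths and keys every lookup off Spark 1.6's TableIdentifier, falling back to the current database when none is given. A minimal, self-contained sketch of that lookup shape (TableMeta, metadata, and currentDatabase are illustrative stand-ins, not Carbon classes):

    // Sketch only: models the TableIdentifier-based lookup introduced above.
    case class TableIdentifier(table: String, database: Option[String] = None)
    case class TableMeta(database: String, table: String)

    object LookupSketch {
      val currentDatabase = "default"
      val metadata = Seq(TableMeta("default", "t1"), TableMeta("sales", "t2"))

      // One code path instead of matching Seq(db, table) vs Seq(table):
      // a missing database simply resolves to the current one.
      def tableExists(id: TableIdentifier): Boolean = {
        val db = id.database.getOrElse(currentDatabase)
        metadata.exists(m => m.database.equalsIgnoreCase(db) &&
          m.table.equalsIgnoreCase(id.table))
      }

      def main(args: Array[String]): Unit = {
        println(tableExists(TableIdentifier("t1")))                // true: current db
        println(tableExists(TableIdentifier("t2", Some("sales")))) // true: explicit db
      }
    }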
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRawStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRawStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRawStrategies.scala
index b1c3924..c01a937 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRawStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonRawStrategies.scala
@@ -63,19 +63,7 @@ class CarbonRawStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan
detailQuery = true,
useBinaryAggregation = false)(sqlContext)._1 :: Nil
}
-
- case catalyst.planning.PartialAggregation(
- namedGroupingAttributes,
- rewrittenAggregateExpressions,
- groupingExpressions,
- partialComputation,
- PhysicalOperation(projectList, predicates,
- l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _))) =>
- handleRawAggregation(plan, plan, projectList, predicates, carbonRelation,
- l, partialComputation, groupingExpressions, namedGroupingAttributes,
- rewrittenAggregateExpressions)
- case CarbonDictionaryCatalystDecoder(relations, profile,
- aliasMap, _, child) =>
+ case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
CarbonDictionaryDecoder(relations,
profile,
aliasMap,
@@ -85,47 +73,6 @@ class CarbonRawStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan
}
}
-
- def handleRawAggregation(plan: LogicalPlan,
- aggPlan: LogicalPlan,
- projectList: Seq[NamedExpression],
- predicates: Seq[Expression],
- carbonRelation: CarbonDatasourceRelation,
- logicalRelation: LogicalRelation,
- partialComputation: Seq[NamedExpression],
- groupingExpressions: Seq[Expression],
- namedGroupingAttributes: Seq[Attribute],
- rewrittenAggregateExpressions: Seq[NamedExpression]):
- Seq[SparkPlan] = {
- val groupByPresentOnMsr = isGroupByPresentOnMeasures(groupingExpressions,
- carbonRelation.carbonRelation.metaData.carbonTable)
- if(!groupByPresentOnMsr) {
- val s = carbonRawScan(projectList,
- predicates,
- carbonRelation,
- logicalRelation,
- Some(partialComputation),
- detailQuery = false,
- useBinaryAggregation = true)(sqlContext)
- // If any aggregate function present on dimnesions then don't use this plan.
- if (!s._2) {
- CarbonAggregate(
- partial = false,
- namedGroupingAttributes,
- rewrittenAggregateExpressions,
- CarbonRawAggregate(
- partial = true,
- groupingExpressions,
- partialComputation,
- s._1))(sqlContext) :: Nil
- } else {
- Nil
- }
- } else {
- Nil
- }
- }
-
/**
* Create carbon scan
*/
@@ -141,13 +88,6 @@ class CarbonRawStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan
relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
// Check whether any expressions are present in the project list. If they are, we need to
// decode them as well.
- val projectExprsNeedToDecode = new java.util.HashSet[Attribute]()
- projectList.map {
- case attr: AttributeReference =>
- case Alias(attr: AttributeReference, _) =>
- case others =>
- others.references.map(f => projectExprsNeedToDecode.add(f))
- }
val projectSet = AttributeSet(projectList.flatMap(_.references))
val scan = CarbonRawTableScan(projectSet.toSeq,
relation.carbonRelation,
@@ -155,13 +95,19 @@ class CarbonRawStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan
groupExprs,
useBinaryAggregation)(sqlContext)
val dimAggrsPresence: Boolean = scan.buildCarbonPlan.getDimAggregatorInfos.size() > 0
- projectExprsNeedToDecode.addAll(scan.attributesNeedToDecode)
+ projectList.map {
+ case attr: AttributeReference =>
+ case Alias(attr: AttributeReference, _) =>
+ case others =>
+ others.references
+ .map(f => scan.attributesNeedToDecode.add(f.asInstanceOf[AttributeReference]))
+ }
if (!detailQuery) {
- if (projectExprsNeedToDecode.size > 0) {
+ if (scan.attributesNeedToDecode.size > 0) {
val decoder = getCarbonDecoder(logicalRelation,
sc,
tableName,
- projectExprsNeedToDecode.asScala.toSeq,
+ scan.attributesNeedToDecode.asScala.toSeq,
scan)
if (scan.unprocessedExprs.nonEmpty) {
val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
@@ -173,11 +119,11 @@ class CarbonRawStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan
(scan, dimAggrsPresence)
}
} else {
- if (projectExprsNeedToDecode.size() > 0) {
+ if (scan.attributesNeedToDecode.size() > 0) {
val decoder = getCarbonDecoder(logicalRelation,
sc,
tableName,
- projectExprsNeedToDecode.asScala.toSeq,
+ scan.attributesNeedToDecode.asScala.toSeq,
scan)
if (scan.unprocessedExprs.nonEmpty) {
val filterCondToAdd = scan.unprocessedExprs.reduceLeftOption(expressions.And)
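
The rework above also drops the separate projectExprsNeedToDecode set: the references of any computed project expression now go straight into scan.attributesNeedToDecode, so the scan owns the full decode set. A toy model of that classification pass (Expr, Attr, Alias, and Plus are simplified stand-ins for Catalyst expressions):

    import scala.collection.mutable

    sealed trait Expr { def references: Seq[Attr] }
    case class Attr(name: String) extends Expr { def references: Seq[Attr] = Seq(this) }
    case class Alias(child: Expr, as: String) extends Expr { def references: Seq[Attr] = child.references }
    case class Plus(l: Expr, r: Expr) extends Expr { def references: Seq[Attr] = l.references ++ r.references }

    object DecodeSketch {
      def attributesNeedingDecode(projectList: Seq[Expr]): mutable.Set[Attr] = {
        val toDecode = mutable.Set[Attr]()
        projectList.foreach {
          case _: Attr           => // bare column: can stay dictionary-encoded
          case Alias(_: Attr, _) => // simple rename: likewise
          case other             => // computed expression: its inputs must be decoded
            other.references.foreach(toDecode += _)
        }
        toDecode
      }

      def main(args: Array[String]): Unit = {
        val projectList = Seq(Attr("a"), Alias(Attr("b"), "b2"), Plus(Attr("c"), Attr("d")))
        println(attributesNeedingDecode(projectList)) // Set(Attr(c), Attr(d))
      }
    }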
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index 6c7e362..56deb8a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -21,12 +21,15 @@ package org.apache.spark.sql.hive
import scala.math.BigInt.int2bigInt
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, PhysicalOperation, QueryPlanner}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, Limit, LogicalPlan, Sort}
import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand, ExecutedCommand, Filter, Project, SparkPlan}
+import org.apache.spark.sql.execution.aggregate.Utils
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, LogicalRelation}
import org.apache.spark.sql.execution.joins.{BroadCastFilterPushJoin, BuildLeft, BuildRight}
@@ -75,44 +78,35 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
case Limit(IntegerLiteral(limit),
Sort(order, _,
- p@PartialAggregation(namedGroupingAttributes,
- rewrittenAggregateExpressions,
- groupingExpressions,
- partialComputation,
+ p@CarbonAggregation(groupingExpressions,
+ aggregateExpressions,
PhysicalOperation(
projectList,
predicates,
l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _))))) =>
val aggPlan = handleAggregation(plan, p, projectList, predicates, carbonRelation,
- partialComputation, groupingExpressions, namedGroupingAttributes,
- rewrittenAggregateExpressions)
+ groupingExpressions, aggregateExpressions)
org.apache.spark.sql.execution.TakeOrderedAndProject(limit,
order,
None,
aggPlan.head) :: Nil
- case Limit(IntegerLiteral(limit), p@PartialAggregation(
- namedGroupingAttributes,
- rewrittenAggregateExpressions,
+ case Limit(IntegerLiteral(limit), p@CarbonAggregation(
groupingExpressions,
- partialComputation,
+ aggregateExpressions,
PhysicalOperation(projectList, predicates,
l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)))) =>
val aggPlan = handleAggregation(plan, p, projectList, predicates, carbonRelation,
- partialComputation, groupingExpressions, namedGroupingAttributes,
- rewrittenAggregateExpressions)
+ groupingExpressions, aggregateExpressions)
org.apache.spark.sql.execution.Limit(limit, aggPlan.head) :: Nil
- case PartialAggregation(
- namedGroupingAttributes,
- rewrittenAggregateExpressions,
+ case CarbonAggregation(
groupingExpressions,
- partialComputation,
+ aggregateExpressions,
PhysicalOperation(projectList, predicates,
l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _))) =>
handleAggregation(plan, plan, projectList, predicates, carbonRelation,
- partialComputation, groupingExpressions, namedGroupingAttributes,
- rewrittenAggregateExpressions)
+ groupingExpressions, aggregateExpressions)
case Limit(IntegerLiteral(limit),
PhysicalOperation(projectList, predicates,
@@ -188,59 +182,138 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
projectList: Seq[NamedExpression],
predicates: Seq[Expression],
carbonRelation: CarbonDatasourceRelation,
- partialComputation: Seq[NamedExpression],
groupingExpressions: Seq[Expression],
- namedGroupingAttributes: Seq[Attribute],
- rewrittenAggregateExpressions: Seq[NamedExpression]):
+ namedGroupingAttributes: Seq[NamedExpression]):
Seq[SparkPlan] = {
val (_, _, _, _, groupExprs, substitutesortExprs, limitExpr) = extractPlan(plan)
val s =
try {
carbonScan(projectList, predicates, carbonRelation.carbonRelation,
- Some(partialComputation), substitutesortExprs, limitExpr, groupingExpressions.nonEmpty)
+ Some(namedGroupingAttributes), substitutesortExprs,
+ limitExpr, groupingExpressions.nonEmpty)
} catch {
case e: Exception => null
}
if (s != null) {
- CarbonAggregate(
- partial = false,
- namedGroupingAttributes,
- rewrittenAggregateExpressions,
- CarbonAggregate(
- partial = true,
- groupingExpressions,
- partialComputation,
- s)(sqlContext))(sqlContext) :: Nil
+ aggregatePlan(groupingExpressions, namedGroupingAttributes, s)
} else {
(aggPlan, true) match {
- case PartialAggregation(
- namedGroupingAttributes,
- rewrittenAggregateExpressions,
+ case CarbonAggregation(
groupingExpressions,
- partialComputation,
+ namedGroupingAttributes,
PhysicalOperation(projectList, predicates,
l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _))) =>
val (_, _, _, _, groupExprs, substitutesortExprs, limitExpr) = extractPlan(plan)
val s = carbonScan(projectList, predicates, carbonRelation.carbonRelation,
- Some(partialComputation), substitutesortExprs, limitExpr,
+ Some(namedGroupingAttributes), substitutesortExprs, limitExpr,
groupingExpressions.nonEmpty, detailQuery = true)
- CarbonAggregate(
- partial = false,
- namedGroupingAttributes,
- rewrittenAggregateExpressions,
- CarbonAggregate(
- partial = true,
- groupingExpressions,
- partialComputation,
- s)(sqlContext))(sqlContext) :: Nil
+ aggregatePlan(groupingExpressions, namedGroupingAttributes, s)
+ }
+ }
+ }
+
+ // TODO: This is code duplicated from Spark. Need to find a way to reuse it.
+ private def aggregatePlan(groupingExpressions: Seq[Expression],
+ resultExpressions: Seq[NamedExpression],
+ child: SparkPlan) = {
+ // A single aggregate expression might appear multiple times in resultExpressions.
+ // In order to avoid evaluating an individual aggregate function multiple times, we'll
+ // build a set of the distinct aggregate expressions and build a function which can
+ // be used to re-write expressions so that they reference the single copy of the
+ // aggregate function which actually gets computed.
+ val aggregateExpressions = resultExpressions.flatMap { expr =>
+ expr.collect {
+ case agg: AggregateExpression => agg
}
+ }.distinct
+ // For those distinct aggregate expressions, we create a map from the
+ // aggregate function to the corresponding attribute of the function.
+ val aggregateFunctionToAttribute = aggregateExpressions.map { agg =>
+ val aggregateFunction = agg.aggregateFunction
+ val attribute = Alias(aggregateFunction, aggregateFunction.toString)().toAttribute
+ (aggregateFunction, agg.isDistinct) -> attribute
+ }.toMap
+
+ val (functionsWithDistinct, functionsWithoutDistinct) =
+ aggregateExpressions.partition(_.isDistinct)
+ if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) {
+ // This is a sanity check. We should not reach here when we have multiple distinct
+ // column sets. Our MultipleDistinctRewriter should take care of this case.
+ sys.error("You hit a query analyzer bug. Please report your query to " +
+ "Spark user mailing list.")
}
+
+ val namedGroupingExpressions = groupingExpressions.map {
+ case ne: NamedExpression => ne -> ne
+ // If the expression is not a NamedExpressions, we add an alias.
+ // So, when we generate the result of the operator, the Aggregate Operator
+ // can directly get the Seq of attributes representing the grouping expressions.
+ case other =>
+ val withAlias = Alias(other, other.toString)()
+ other -> withAlias
+ }
+ val groupExpressionMap = namedGroupingExpressions.toMap
+
+ // The original `resultExpressions` are a set of expressions which may reference
+ // aggregate expressions, grouping column values, and constants. When aggregate operator
+ // emits output rows, we will use `resultExpressions` to generate an output projection
+ // which takes the grouping columns and final aggregate result buffer as input.
+ // Thus, we must re-write the result expressions so that their attributes match up with
+ // the attributes of the final result projection's input row:
+ val rewrittenResultExpressions = resultExpressions.map { expr =>
+ expr.transformDown {
+ case AggregateExpression(aggregateFunction, _, isDistinct) =>
+ // The final aggregation buffer's attributes will be `finalAggregationAttributes`,
+ // so replace each aggregate expression by its corresponding attribute in the set:
+ aggregateFunctionToAttribute(aggregateFunction, isDistinct)
+ case expression =>
+ // Since we're using `namedGroupingAttributes` to extract the grouping key
+ // columns, we need to replace grouping key expressions with their corresponding
+ // attributes. We do not rely on the equality check at here since attributes may
+ // differ cosmetically. Instead, we use semanticEquals.
+ groupExpressionMap.collectFirst {
+ case (expr, ne) if expr semanticEquals expression => ne.toAttribute
+ }.getOrElse(expression)
+ }.asInstanceOf[NamedExpression]
+ }
+
+ val aggregateOperator =
+ if (aggregateExpressions.map(_.aggregateFunction).exists(!_.supportsPartial)) {
+ if (functionsWithDistinct.nonEmpty) {
+ sys.error("Distinct columns cannot exist in Aggregate operator containing " +
+ "aggregate functions which don't support partial aggregation.")
+ } else {
+ Utils.planAggregateWithoutPartial(
+ namedGroupingExpressions.map(_._2),
+ aggregateExpressions,
+ aggregateFunctionToAttribute,
+ rewrittenResultExpressions,
+ child)
+ }
+ } else if (functionsWithDistinct.isEmpty) {
+ Utils.planAggregateWithoutDistinct(
+ namedGroupingExpressions.map(_._2),
+ aggregateExpressions,
+ aggregateFunctionToAttribute,
+ rewrittenResultExpressions,
+ child)
+ } else {
+ Utils.planAggregateWithOneDistinct(
+ namedGroupingExpressions.map(_._2),
+ functionsWithDistinct,
+ functionsWithoutDistinct,
+ aggregateFunctionToAttribute,
+ rewrittenResultExpressions,
+ child)
+ }
+
+ aggregateOperator
}
private def canPushDownJoin(otherRDDPlan: LogicalPlan,
@@ -333,8 +406,8 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
ExecutedCommand(ShowAllTablesDetail(schemaName, plan.output)) :: Nil
case DropTable(tableName, ifNotExists)
if CarbonEnv.getInstance(sqlContext).carbonCatalog
- .cubeExists(Seq(tableName.toLowerCase()))(sqlContext) =>
- ExecutedCommand(DropCubeCommand(ifNotExists, None, tableName.toLowerCase())) :: Nil
+ .tableExists(TableIdentifier(tableName.toLowerCase))(sqlContext) =>
+ ExecutedCommand(DropCubeCommand(ifNotExists, None, tableName.toLowerCase)) :: Nil
case ShowAggregateTablesCommand(schemaName) =>
ExecutedCommand(ShowAggregateTables(schemaName, plan.output)) :: Nil
case ShowLoadsCommand(schemaName, cube, limit) =>
@@ -342,7 +415,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
case LoadCube(schemaNameOp, cubeName, factPathFromUser, dimFilesPath,
partionValues, isOverwriteExist, inputSqlString) =>
val isCarbonTable = CarbonEnv.getInstance(sqlContext).carbonCatalog
- .cubeExists(schemaNameOp, cubeName)(sqlContext)
+ .tableExists(TableIdentifier(cubeName, schemaNameOp))(sqlContext)
if (isCarbonTable || partionValues.nonEmpty) {
ExecutedCommand(LoadCube(schemaNameOp, cubeName, factPathFromUser,
dimFilesPath, partionValues, isOverwriteExist, inputSqlString)) :: Nil
@@ -360,7 +433,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
}
case DescribeFormattedCommand(sql, tblIdentifier) =>
val isCube = CarbonEnv.getInstance(sqlContext).carbonCatalog
- .cubeExists(tblIdentifier)(sqlContext)
+ .tableExists(tblIdentifier)(sqlContext)
if (isCube) {
val describe =
LogicalDescribeCommand(UnresolvedRelation(tblIdentifier, None), isExtended = false)
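
The new aggregatePlan mirrors Spark 1.6's own physical planning: collect the distinct aggregate expressions, map each (function, isDistinct) pair to the single attribute that will carry its computed value, rewrite the result expressions against those attributes, and then dispatch to Utils.planAggregateWithoutPartial, planAggregateWithoutDistinct, or planAggregateWithOneDistinct. A toy illustration of just the dedup-and-map bookkeeping (AggExpr and the attribute names are invented for the example):

    case class AggExpr(function: String, column: String, isDistinct: Boolean = false)

    object AggDedupSketch {
      def main(args: Array[String]): Unit = {
        // sum(x) appears twice in the result expressions, e.g. sum(x) and sum(x) + 1
        val resultAggs = Seq(AggExpr("sum", "x"), AggExpr("sum", "x"), AggExpr("avg", "y"))

        // Compute each distinct aggregate only once...
        val aggregateExpressions = resultAggs.distinct

        // ...and map it to the one output attribute every usage is rewritten to.
        val aggregateFunctionToAttribute = aggregateExpressions.map { agg =>
          (agg.function, agg.column, agg.isDistinct) -> s"${agg.function}(${agg.column})#out"
        }.toMap

        println(aggregateExpressions.size)                         // 2, not 3
        println(aggregateFunctionToAttribute(("sum", "x", false))) // sum(x)#out
      }
    }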
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala
index c4c214d..58c02ff 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala
@@ -34,7 +34,7 @@ private[sql] object CarbonStrategy {
}
}
-private[spark] class CarbonSQLDialect extends HiveQLDialect {
+private[spark] class CarbonSQLDialect(context: HiveContext) extends HiveQLDialect(context) {
@transient
protected val sqlParser = new CarbonSqlParser
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
index a975317..d80c065 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonOptimizer.scala
@@ -24,7 +24,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.sql.catalyst.expressions.{AggregateExpression, Attribute, _}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
import org.apache.spark.sql.catalyst.rules.Rule
@@ -122,7 +123,7 @@ class CarbonOptimizer(optimizer: Optimizer, conf: CatalystConf)
}
var child = agg.child
// In case the child is also an aggregate, push the decoder down to the child
- if (attrsOndimAggs.size() > 0 && !child.isInstanceOf[Aggregate]) {
+ if (attrsOndimAggs.size() > 0 && !(child.equals(agg))) {
child = CarbonDictionaryTempDecoder(attrsOndimAggs,
new util.HashSet[Attribute](),
agg.child)
@@ -398,10 +399,10 @@ class CarbonOptimizer(optimizer: Optimizer, conf: CatalystConf)
allAttrsNotDecode: util.Set[Attribute],
aliasMap: CarbonAliasDecoderRelation) = {
val uAttr = aliasMap.getOrElse(attr, attr)
- val relation = relations.find(p => p.attributeMap.contains(uAttr))
+ val relation = relations.find(p => p.contains(uAttr))
if (relation.isDefined) {
relation.get.carbonRelation.carbonRelation.metaData.dictionaryMap.get(uAttr.name) match {
- case Some(true) if !allAttrsNotDecode.contains(uAttr) =>
+ case Some(true) if !allAttrsNotDecode.asScala.exists(p => p.name.equals(uAttr.name)) =>
val newAttr = AttributeReference(attr.name,
IntegerType,
attr.nullable,
@@ -419,7 +420,7 @@ class CarbonOptimizer(optimizer: Optimizer, conf: CatalystConf)
relations: Seq[CarbonDecoderRelation],
aliasMap: CarbonAliasDecoderRelation): Boolean = {
val uAttr = aliasMap.getOrElse(attr, attr)
- val relation = relations.find(p => p.attributeMap.contains(uAttr))
+ val relation = relations.find(p => p.contains(uAttr))
if (relation.isDefined) {
relation.get.carbonRelation.carbonRelation.metaData.dictionaryMap.get(uAttr.name) match {
case Some(true) => true
@@ -459,9 +460,16 @@ case class CarbonDecoderRelation(
}
def contains(attr: Attribute): Boolean = {
- attributeMap
- .exists(entry => entry._1.name.equals(attr.name) && entry._1.exprId.equals(attr.exprId)) ||
- extraAttrs.exists(entry => entry.name.equals(attr.name) && entry.exprId.equals(attr.exprId))
+ var exists =
+ attributeMap.exists(entry => entry._1.name.equalsIgnoreCase(attr.name) &&
+ entry._1.exprId.equals(attr.exprId)) ||
+ extraAttrs.exists(entry => entry.name.equalsIgnoreCase(attr.name) &&
+ entry.exprId.equals(attr.exprId))
+ if(!exists) {
+ exists = attributeMap.exists(entry => entry._1.name.equalsIgnoreCase(attr.name)) ||
+ extraAttrs.exists(entry => entry.name.equalsIgnoreCase(attr.name) )
+ }
+ exists
}
}
@@ -474,7 +482,8 @@ case class CarbonAliasDecoderRelation() {
}
def getOrElse(key: Attribute, default: Attribute): Attribute = {
- val value = attrMap.find(p => p._1.name.equals(key.name) && p._1.exprId.equals(key.exprId))
+ val value = attrMap.find(p =>
+ p._1.name.equalsIgnoreCase(key.name) && p._1.exprId.equals(key.exprId))
value match {
case Some((k, v)) => v
case _ => default
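
The loosened matching above is deliberate: contains and getOrElse first try a case-insensitive name plus exprId match, then fall back to a name-only match, so dictionary-decoder resolution survives the exprId churn that aliasing and re-analysis introduce. A self-contained sketch of the two-step match (Attr and attributeMap are simplified stand-ins):

    case class Attr(name: String, exprId: Long)

    object ContainsSketch {
      val attributeMap = Map(Attr("City", 1L) -> "dictionary", Attr("amount", 2L) -> "direct")

      def contains(attr: Attr): Boolean = {
        val exact = attributeMap.keys.exists(k =>
          k.name.equalsIgnoreCase(attr.name) && k.exprId == attr.exprId)
        // Fallback: rather than miss a dictionary column whose exprId changed,
        // accept a case-insensitive name-only match.
        exact || attributeMap.keys.exists(_.name.equalsIgnoreCase(attr.name))
      }

      def main(args: Array[String]): Unit = {
        println(contains(Attr("city", 1L)))  // true: exact match, names compared case-insensitively
        println(contains(Attr("CITY", 99L))) // true: name-only fallback
        println(contains(Attr("zip", 3L)))   // false
      }
    }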
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala b/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
index 8c6df4c..aee375a 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/CarbonFilters.scala
@@ -27,6 +27,9 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.optimizer.CarbonAliasDecoderRelation
import org.apache.spark.sql.types.StructType
+import org.carbondata.core.carbon.metadata.datatype.DataType
+import org.carbondata.core.carbon.metadata.schema.table.CarbonTable
+import org.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn
import org.carbondata.query.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
import org.carbondata.query.expression.conditional._
import org.carbondata.query.expression.logical.{AndExpression, OrExpression}
@@ -160,7 +163,8 @@ object CarbonFilters {
def processExpression(exprs: Seq[Expression],
attributesNeedToDecode: java.util.HashSet[AttributeReference],
- unprocessedExprs: ArrayBuffer[Expression]): Option[CarbonExpression] = {
+ unprocessedExprs: ArrayBuffer[Expression],
+ carbonTable: CarbonTable): Option[CarbonExpression] = {
def transformExpression(expr: Expression): Option[CarbonExpression] = {
expr match {
case Or(left, right) =>
@@ -208,8 +212,9 @@ object CarbonFilters {
new ListExpression(list.map(transformExpression(_).get).asJava)))
case AttributeReference(name, dataType, _, _) =>
- Some(new CarbonColumnExpression(name.toString,
- CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
+ Some(new CarbonColumnExpression(name,
+ CarbonScalaUtil.convertSparkToCarbonDataType(
+ getActualCarbonDataType(name, carbonTable))))
case FakeCarbonCast(literal, dataType) => transformExpression(literal)
case Literal(name, dataType) => Some(new
CarbonLiteralExpression(name, CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
@@ -225,4 +230,19 @@ object CarbonFilters {
exprs.flatMap(transformExpression).reduceOption(new AndExpression(_, _))
}
+ private def getActualCarbonDataType(column: String, carbonTable: CarbonTable) = {
+ var carbonColumn: CarbonColumn =
+ carbonTable.getDimensionByName(carbonTable.getFactTableName, column)
+ val dataType = if (carbonColumn != null) {
+ carbonColumn.getDataType
+ } else {
+ carbonColumn = carbonTable.getMeasureByName(carbonTable.getFactTableName, column)
+ carbonColumn.getDataType match {
+ case DataType.LONG => DataType.LONG
+ case DataType.DECIMAL => DataType.DECIMAL
+ case _ => DataType.DOUBLE
+ }
+ }
+ CarbonScalaUtil.convertCarbonToSparkDataType(dataType)
+ }
}
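
getActualCarbonDataType resolves a filter column against the fact table: a dimension keeps its declared type, while a measure stays LONG or DECIMAL if it already is one and is widened to DOUBLE otherwise, matching how measures are aggregated. A simplified model of that resolution over an invented two-column schema:

    object DataTypeSketch {
      sealed trait DType
      case object LONG extends DType
      case object DECIMAL extends DType
      case object DOUBLE extends DType
      case object STRING extends DType
      case object INT extends DType

      // Hypothetical table: one dimension, two measures.
      val dimensions: Map[String, DType] = Map("city" -> STRING)
      val measures: Map[String, DType]   = Map("qty" -> INT, "amount" -> DECIMAL)

      def actualType(column: String): DType =
        dimensions.getOrElse(column,
          measures(column) match {
            case LONG    => LONG
            case DECIMAL => DECIMAL
            case _       => DOUBLE // every other measure type aggregates as double
          })

      def main(args: Array[String]): Unit = {
        println(actualType("city"))   // STRING: dimensions keep their type
        println(actualType("qty"))    // DOUBLE: INT measure widened
        println(actualType("amount")) // DECIMAL preserved
      }
    }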
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/CarbonOption.scala b/integration/spark/src/main/scala/org/carbondata/spark/CarbonOption.scala
index b91cf98..12f3dc4 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/CarbonOption.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/CarbonOption.scala
@@ -21,7 +21,7 @@ package org.carbondata.spark
* Contains all options for Spark data source
*/
class CarbonOption(options: Map[String, String]) {
- def tableIdentifier: String = options.getOrElse("cubeName", s"$dbName.$tableName")
+ def tableIdentifier: String = options.getOrElse("tableName", s"$dbName.$tableName")
def dbName: String = options.getOrElse("dbName", "default")
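
The one-line change above renames the data source option key from "cubeName" to "tableName", in line with the cube-to-table renaming throughout this commit. A small sketch of how the option map resolves after the rename (class name and defaults here are illustrative):

    class CarbonOptionSketch(options: Map[String, String]) {
      def dbName: String = options.getOrElse("dbName", "default")
      def tableName: String = options.getOrElse("tableName", "default_table")
      // "tableName" is now the identifier key where "cubeName" was used before
      def tableIdentifier: String = options.getOrElse("tableName", s"$dbName.$tableName")
    }

    object CarbonOptionSketch {
      def main(args: Array[String]): Unit = {
        println(new CarbonOptionSketch(Map("tableName" -> "sales")).tableIdentifier) // sales
        println(new CarbonOptionSketch(Map("dbName" -> "shop")).tableIdentifier)     // shop.default_table
      }
    }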
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/KeyVal.scala b/integration/spark/src/main/scala/org/carbondata/spark/KeyVal.scala
index d906745..6ee882b 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/KeyVal.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/KeyVal.scala
@@ -47,6 +47,14 @@ class RawKeyValImpl extends RawKeyVal[BatchRawResult, Any] {
override def getKey(key: BatchRawResult, value: Any): (BatchRawResult, Any) = (key, value)
}
+trait RawKey[K, V] extends Serializable {
+ def getKey(key: Array[Any], value: Any): (K, V)
+
+}
+
+class RawKeyImpl extends RawKey[Array[Any], Any] {
+ override def getKey(key: Array[Any], value: Any): (Array[Any], Any) = (key, value)
+}
trait Result[K, V] extends Serializable {
def getKey(key: Int, value: LoadMetadataDetails): (K, V)
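
The new RawKey trait generalizes the existing RawKeyVal pattern to rows that arrive as plain Array[Any]; the default RawKeyImpl simply pairs the raw row with its value. A usage sketch (the row contents are made up):

    object RawKeySketch {
      trait RawKey[K, V] extends Serializable {
        def getKey(key: Array[Any], value: Any): (K, V)
      }

      class RawKeyImpl extends RawKey[Array[Any], Any] {
        override def getKey(key: Array[Any], value: Any): (Array[Any], Any) = (key, value)
      }

      def main(args: Array[String]): Unit = {
        val (k, v) = new RawKeyImpl().getKey(Array[Any]("c1", 42), "meta")
        println(k.mkString("[", ",", "]") + " -> " + v) // [c1,42] -> meta
      }
    }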
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala b/integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala
deleted file mode 100644
index 65e7538..0000000
--- a/integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala
+++ /dev/null
@@ -1,807 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.carbondata.spark.agg
-
-import scala.language.implicitConversions
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.types._
-
-import org.carbondata.query.aggregator.MeasureAggregator
-import org.carbondata.query.aggregator.impl._
-
-case class CountCarbon(child: Expression) extends UnaryExpression with PartialAggregate1 {
- override def references: AttributeSet = child.references
-
- override def nullable: Boolean = false
-
- override def dataType: DataType = MeasureAggregatorUDT
-
- override def toString: String = s"COUNT($child)"
-
- override def asPartial: SplitEvaluation = {
- val partialCount = Alias(CountCarbon(child), "PartialCount")()
- SplitEvaluation(CountCarbonFinal(partialCount.toAttribute, LongType), partialCount :: Nil)
- }
-
- override def newInstance(): AggregateFunction1 = new CountFunctionCarbon(child, this, false)
-
- implicit def toAggregates(aggregate: MeasureAggregator): Double = aggregate.getDoubleValue()
-}
-
-case class CountCarbonFinal(child: Expression, origDataType: DataType)
- extends AggregateExpression1 {
-
- override def children: Seq[Expression] = child :: Nil
-
- override def references: AttributeSet = child.references
-
- override def nullable: Boolean = false
-
- override def dataType: DataType = origDataType
-
- override def toString: String = s"COUNT($child)"
-
- override def newInstance(): AggregateFunction1 = new CountFunctionCarbon(child, this, true)
-}
-
-
-case class CountDistinctCarbon(child: Expression) extends PartialAggregate1 {
- override def children: Seq[Expression] = child :: Nil
-
- override def nullable: Boolean = false
-
- override def dataType: DataType = MeasureAggregatorUDT
-
- override def toString: String = s"COUNT(DISTINCT ($child)"
-
- override def asPartial: SplitEvaluation = {
- val partialSet = Alias(CountDistinctCarbon(child), "partialSets")()
- SplitEvaluation(
- CountDistinctCarbonFinal(partialSet.toAttribute, LongType),
- partialSet :: Nil)
- }
-
- override def newInstance(): AggregateFunction1 = new CountDistinctFunctionCarbon(child, this)
-}
-
-case class CountDistinctCarbonFinal(inputSet: Expression, origDataType: DataType)
- extends AggregateExpression1 {
- override def children: Seq[Expression] = inputSet :: Nil
-
- override def nullable: Boolean = false
-
- override def dataType: DataType = origDataType
-
- override def toString: String = s"COUNTFINAL(DISTINCT ${ inputSet }})"
-
- override def newInstance(): AggregateFunction1 = {
- new CountDistinctFunctionCarbonFinal(inputSet, this)
- }
-}
-
-case class AverageCarbon(child: Expression, castedDataType: DataType = null)
- extends UnaryExpression with PartialAggregate1 {
- override def references: AttributeSet = child.references
-
- override def nullable: Boolean = false
-
- override def dataType: DataType = MeasureAggregatorUDT
-
- override def toString: String = s"AVGCarbon($child)"
-
- override def asPartial: SplitEvaluation = {
- val partialSum = Alias(AverageCarbon(child), "PartialAverage")()
- SplitEvaluation(
- AverageCarbonFinal(partialSum.toAttribute,
- child.dataType match {
- case IntegerType | StringType | LongType | TimestampType => DoubleType
- case _ => child.dataType
- }),
- partialSum :: Nil)
- }
-
- override def newInstance(): AggregateFunction1 = new AverageFunctionCarbon(child, this, false)
-}
-
-case class AverageCarbonFinal(child: Expression, origDataType: DataType)
- extends AggregateExpression1 {
- override def children: Seq[Expression] = child :: Nil
-
- override def references: AttributeSet = child.references
-
- override def nullable: Boolean = false
-
- override def dataType: DataType = origDataType
-
- override def toString: String = s"AVG($child)"
-
- override def newInstance(): AggregateFunction1 = new AverageFunctionCarbon(child, this, true)
-}
-
-case class SumCarbon(child: Expression, castedDataType: DataType = null)
- extends UnaryExpression with PartialAggregate1 {
- override def references: AttributeSet = child.references
-
- override def nullable: Boolean = false
-
- override def dataType: DataType = MeasureAggregatorUDT
-
- override def toString: String = s"SUMCarbon($child)"
-
- override def asPartial: SplitEvaluation = {
- val partialSum = Alias(SumCarbon(child), "PartialSum")()
- SplitEvaluation(
- SumCarbonFinal(partialSum.toAttribute,
- if (castedDataType != null) castedDataType else child.dataType),
- partialSum :: Nil)
- }
-
- override def newInstance(): AggregateFunction1 = new SumFunctionCarbon(child, this, false)
-
- implicit def toAggregates(aggregate: MeasureAggregator): Double = aggregate.getDoubleValue()
-}
-
-case class SumCarbonFinal(child: Expression, origDataType: DataType) extends AggregateExpression1 {
- override def children: Seq[Expression] = child :: Nil
-
- override def references: AttributeSet = child.references
-
- override def nullable: Boolean = false
-
- override def dataType: DataType = origDataType
-
- override def toString: String = s"SUMCarbonFinal($child)"
-
- override def newInstance(): AggregateFunction1 = new SumFunctionCarbon(child, this, true)
-}
-
-case class MaxCarbon(child: Expression, castedDataType: DataType = null)
- extends UnaryExpression with PartialAggregate1 {
- override def references: AttributeSet = child.references
-
- override def nullable: Boolean = false
-
- override def dataType: DataType = MeasureAggregatorUDT
-
- override def toString: String = s"MaxCarbon($child)"
-
- // to do partialSum to PartialMax many places
- override def asPartial: SplitEvaluation = {
- val partialSum = Alias(MaxCarbon(child), "PartialMax")()
- SplitEvaluation(
- MaxCarbonFinal(partialSum.toAttribute,
- if (castedDataType != null) castedDataType else child.dataType),
- partialSum :: Nil)
- }
-
- override def newInstance(): AggregateFunction1 = new MaxFunctionCarbon(child, this, false)
-}
-
-case class MaxCarbonFinal(child: Expression, origDataType: DataType) extends AggregateExpression1 {
- override def children: Seq[Expression] = child :: Nil
-
- override def references: AttributeSet = child.references
-
- override def nullable: Boolean = false
-
- override def dataType: DataType = origDataType
-
- override def toString: String = s"MaxCarbonFinal($child)"
-
- override def newInstance(): AggregateFunction1 = new MaxFunctionCarbon(child, this, true)
-}
-
-case class MinCarbon(child: Expression, castedDataType: DataType = null)
- extends UnaryExpression with PartialAggregate1 {
- override def references: AttributeSet = child.references
-
- override def nullable: Boolean = false
-
- override def dataType: DataType = MeasureAggregatorUDT
-
- override def toString: String = s"MinCarbon($child)"
-
- override def asPartial: SplitEvaluation = {
- val partialSum = Alias(MinCarbon(child), "PartialMin")()
- SplitEvaluation(
- MinCarbonFinal(partialSum.toAttribute,
- if (castedDataType != null) castedDataType else child.dataType),
- partialSum :: Nil)
- }
-
- override def newInstance(): AggregateFunction1 = new MinFunctionCarbon(child, this, false)
-}
-
-case class MinCarbonFinal(child: Expression, origDataType: DataType) extends AggregateExpression1 {
- override def children: Seq[Expression] = child :: Nil
-
- override def references: AttributeSet = child.references
-
- override def nullable: Boolean = false
-
- override def dataType: DataType = origDataType
-
- override def toString: String = s"MinCarbonFinal($child)"
-
- override def newInstance(): AggregateFunction1 = new MinFunctionCarbon(child, this, true)
-}
-
-case class SumDistinctCarbon(child: Expression, castedDataType: DataType = null)
- extends UnaryExpression with PartialAggregate1 {
-
- override def references: AttributeSet = child.references
-
- override def nullable: Boolean = false
-
- override def dataType: DataType = MeasureAggregatorUDT
-
- override def toString: String = s"PARTIAL_SUM_DISTINCT($child)"
-
- override def asPartial: SplitEvaluation = {
- val partialSum = Alias(SumDistinctCarbon(child), "PartialSumDistinct")()
- SplitEvaluation(
- SumDistinctFinalCarbon(partialSum.toAttribute,
- if (castedDataType != null) {
- castedDataType
- } else {
- child.dataType
- }),
- partialSum :: Nil)
- }
-
- override def newInstance(): AggregateFunction1 = {
- new SumDisctinctFunctionCarbon(child, this, false)
- }
-}
-
-case class SumDistinctFinalCarbon(child: Expression, origDataType: DataType)
- extends AggregateExpression1 {
- override def children: Seq[Expression] = child :: Nil
-
- override def references: AttributeSet = child.references
-
- override def nullable: Boolean = false
-
- override def dataType: DataType = origDataType
-
- override def toString: String = s"FINAL_SUM_DISTINCT($child)"
-
- override def newInstance(): AggregateFunction1 = new SumDisctinctFunctionCarbon(child, this, true)
-}
-
-case class FirstCarbon(child: Expression, origDataType: DataType = MeasureAggregatorUDT)
- extends UnaryExpression with PartialAggregate1 {
- override def references: AttributeSet = child.references
-
- override def nullable: Boolean = child.nullable
-
- override def dataType: DataType = MeasureAggregatorUDT
-
- override def toString: String = s"FIRST($child)"
-
- override def asPartial: SplitEvaluation = {
- val partialFirst = Alias(FirstCarbon(child), "PartialFirst")()
- SplitEvaluation(
- FirstCarbon(partialFirst.toAttribute, child.dataType),
- partialFirst :: Nil)
- }
-
- override def newInstance(): AggregateFunction1 = new FirstFunctionCarbon(child, this)
-}
-
-case class AverageFunctionCarbon(expr: Expression, base: AggregateExpression1, finalAgg: Boolean)
- extends AggregateFunction1 {
-
- def this() = this(null, null, false) // Required for serialization.
-
- // var count: Int = _
- private var avg: MeasureAggregator = null
-
- override def update(input: InternalRow): Unit = {
- val br = expr.collectFirst({ case a@BoundReference(_, _, _) => a })
- val resolution =
- if (br.isDefined) {
- input.get(br.get.ordinal, MeasureAggregatorUDT)
- } else {
- expr.eval(input)
- }
- val agg = resolution match {
- case s: MeasureAggregator => s
- case s =>
- var dc: MeasureAggregator = null
- if (s != null) {
- s match {
- case v: java.math.BigDecimal =>
- dc = new AvgBigDecimalAggregator
- dc.agg(new java.math.BigDecimal(s.toString))
- dc.setNewValue(new java.math.BigDecimal(s.toString))
- case l: Long =>
- dc = new AvgLongAggregator
- dc.agg(s.toString.toLong)
- dc.setNewValue(s.toString.toLong)
- case _ =>
- dc = new AvgDoubleAggregator
- dc.agg(s.toString.toDouble)
- dc.setNewValue(s.toString.toDouble)
- }
- }
- else {
- dc = new AvgDoubleAggregator()
- }
- dc
- }
- if (avg == null) {
- avg = agg
- } else {
- avg.merge(agg)
- }
- }
-
- override def eval(input: InternalRow): Any = {
- if (finalAgg) {
- if (avg.isFirstTime) {
- null
- } else {
- avg match {
- case avg: AvgBigDecimalAggregator =>
- Cast(Literal(avg.getBigDecimalValue), base.dataType).eval(null)
- case avg: AvgLongAggregator =>
- Cast(Literal(avg.getLongValue), base.dataType).eval(null)
- case _ =>
- Cast(Literal(avg.getDoubleValue), base.dataType).eval(null)
- }
- }
- } else {
- avg
- }
- }
-}
-
-case class CountFunctionCarbon(expr: Expression, base: AggregateExpression1, finalAgg: Boolean)
- extends AggregateFunction1 {
- def this() = this(null, null, false) // Required for serialization.
-
- private var count: MeasureAggregator = null
-
- override def update(input: InternalRow): Unit = {
- val br = expr.collectFirst({ case a@BoundReference(_, _, _) => a })
- val resolution =
- if (br.isDefined) {
- input.get(br.get.ordinal, MeasureAggregatorUDT)
- } else {
- expr.eval(input)
- }
- val agg = resolution match {
- case m: MeasureAggregator => m
- case others =>
- val agg1: MeasureAggregator = new CountAggregator
- if (others != null) {
- agg1.agg(0)
- }
- agg1
- }
- if (count == null) {
- count = agg
- } else {
- count.merge(agg)
- }
- }
-
- override def eval(input: InternalRow): Any = {
- if (finalAgg && count != null) {
- if (count.isFirstTime) {
- 0L
- } else {
- Cast(Literal(count.getDoubleValue), base.dataType).eval(null)
- }
- } else {
- count
- }
- }
-
-}
-
-
-case class SumFunctionCarbon(expr: Expression, base: AggregateExpression1, finalAgg: Boolean)
- extends AggregateFunction1 {
- def this() = this(null, null, false) // Required for serialization.
-
- private var sum: MeasureAggregator = null
-
- override def update(input: InternalRow): Unit = {
- val br = expr.collectFirst({ case a@BoundReference(_, _, _) => a })
- val resolution =
- if (br.isDefined) {
- input.get(br.get.ordinal, MeasureAggregatorUDT)
- } else {
- expr.eval(input)
- }
- val agg = resolution match {
- case s: MeasureAggregator => s
- case s =>
- var dc: MeasureAggregator = null
- if (s != null) {
- s match {
- case bd: java.math.BigDecimal =>
- dc = new SumBigDecimalAggregator
- dc.agg(new java.math.BigDecimal(s.toString))
- dc.setNewValue(new java.math.BigDecimal(s.toString))
- case l: Long =>
- dc = new SumLongAggregator
- dc.agg(s.toString.toLong)
- dc.setNewValue(s.toString.toLong)
- case _ =>
- dc = new SumDoubleAggregator
- dc.agg(s.toString.toDouble)
- dc.setNewValue(s.toString.toDouble)
- }
- }
- else {
- dc = new SumDoubleAggregator
- }
- dc
- }
- if (sum == null) {
- sum = agg
- } else {
- sum.merge(agg)
- }
- }
-
- override def eval(input: InternalRow): Any = {
- if (finalAgg && sum != null) {
- if (sum.isFirstTime) {
- null
- } else {
- sum match {
- case s: SumBigDecimalAggregator =>
- Cast(Literal(sum.getBigDecimalValue), base.dataType).eval(input)
- case s: SumLongAggregator =>
- Cast(Literal(sum.getLongValue), base.dataType).eval(input)
- case _ =>
- Cast(Literal(sum.getDoubleValue), base.dataType).eval(input)
- }
- }
- } else {
- sum
- }
- }
-}
-
-case class MaxFunctionCarbon(expr: Expression, base: AggregateExpression1, finalAgg: Boolean)
- extends AggregateFunction1 {
- def this() = this(null, null, false) // Required for serialization.
-
- private var max: MeasureAggregator = null
-
- override def update(input: InternalRow): Unit = {
- val br = expr.collectFirst({ case a@BoundReference(_, _, _) => a })
- val resolution =
- if (br.isDefined) {
- input.get(br.get.ordinal, MeasureAggregatorUDT)
- } else {
- expr.eval(input)
- }
- val agg = resolution match {
- case s: MeasureAggregator => s
- case s =>
- val dc = new MaxAggregator
- if (s != null) {
- dc.agg(s.toString.toDouble)
- dc.setNewValue(s.toString.toDouble)
- }
- dc
- }
- if (max == null) {
- max = agg
- } else {
- max.merge(agg)
- }
- }
-
- override def eval(input: InternalRow): Any = {
- if (finalAgg && max != null) {
- if (max.isFirstTime) {
- null
- } else {
- Cast(Literal(max.getValueObject), base.dataType).eval(null)
- }
- } else {
- max
- }
- }
-}
-
-case class MinFunctionCarbon(expr: Expression, base: AggregateExpression1, finalAgg: Boolean)
- extends AggregateFunction1 {
- def this() = this(null, null, false) // Required for serialization.
-
- private var min: MeasureAggregator = null
-
- override def update(input: InternalRow): Unit = {
- val br = expr.collectFirst({ case a@BoundReference(_, _, _) => a })
- val resolution =
- if (br.isDefined) {
- input.get(br.get.ordinal, MeasureAggregatorUDT)
- } else {
- expr.eval(input)
- }
- val agg = resolution match {
- case s: MeasureAggregator => s
- case s =>
- val dc: MeasureAggregator = new MinAggregator
- if (s != null) {
- dc.agg(s.toString.toDouble)
- dc.setNewValue(s.toString.toDouble)
- }
- dc
- }
- if (min == null) {
- min = agg
- } else {
- min.merge(agg)
- }
- }
-
- override def eval(input: InternalRow): Any = {
- if (finalAgg && min != null) {
- if (min.isFirstTime) {
- null
- } else {
- Cast(Literal(min.getValueObject), base.dataType).eval(null)
- }
- } else {
- min
- }
- }
-}
-
-case class SumDisctinctFunctionCarbon(expr: Expression, base: AggregateExpression1,
- isFinal: Boolean)
- extends AggregateFunction1 {
-
- def this() = this(null, null, false) // Required for serialization.
-
- private var distinct: MeasureAggregator = null
-
- override def update(input: InternalRow): Unit = {
-
- val br = expr.collectFirst({ case a@BoundReference(_, _, _) => a })
- val resolution =
- if (br.isDefined) {
- input.get(br.get.ordinal, MeasureAggregatorUDT)
- } else {
- expr.eval(input)
- }
- val agg = resolution match {
- case s: MeasureAggregator => s
- case null => null
- case s =>
- var dc: MeasureAggregator = null
- s match {
- case Double =>
- dc = new SumDistinctDoubleAggregator
- dc.setNewValue(s.toString.toDouble)
- case Int =>
- dc = new SumDistinctLongAggregator
- dc.setNewValue(s.toString.toLong)
- case bd: java.math.BigDecimal =>
- dc = new SumDistinctBigDecimalAggregator
- dc.setNewValue(new java.math.BigDecimal(s.toString))
- case _ =>
- }
- dc
- }
- if (distinct == null) {
- distinct = agg
- } else {
- distinct.merge(agg)
- }
- }
-
- override def eval(input: InternalRow): Any =
- // in case of empty load it was failing so added null check.
- {
- if (isFinal && distinct != null) {
- if (distinct.isFirstTime) {
- null
- }
- else {
- Cast(Literal(distinct.getValueObject), base.dataType).eval(null)
- }
- }
- else {
- distinct
- }
- }
-}
-
-case class CountDistinctFunctionCarbon(expr: Expression, base: AggregateExpression1)
- extends AggregateFunction1 {
-
- def this() = this(null, null) // Required for serialization.
-
- private var count: MeasureAggregator = null
-
- override def update(input: InternalRow): Unit = {
- val br = expr.collectFirst({ case a@BoundReference(_, _, _) => a })
- val resolution =
- if (br.isDefined) {
- input.get(br.get.ordinal, MeasureAggregatorUDT)
- } else {
- expr.eval(input)
- }
- val agg = resolution match {
- case s: MeasureAggregator => s
- case null => null
- case s =>
- val dc = new DistinctCountAggregatorObjectSet
- dc.setNewValue(s.toString)
- dc
- }
- if (count == null) {
- count = agg
- } else {
- count.merge(agg)
- }
- }
-
- override def eval(input: InternalRow): Any = count
-}
-
-case class CountDistinctFunctionCarbonFinal(expr: Expression, base: AggregateExpression1)
- extends AggregateFunction1 {
-
- def this() = this(null, null) // Required for serialization.
-
- private var count: MeasureAggregator = null
-
- override def update(input: InternalRow): Unit = {
- val br = expr.collectFirst({ case a@BoundReference(_, _, _) => a })
- val resolution =
- if (br.isDefined) {
- input.get(br.get.ordinal, MeasureAggregatorUDT)
- } else {
- expr.eval(input)
- }
- val agg = resolution match {
- case s: MeasureAggregator => s
- case null => null
- case s =>
- val dc = new DistinctCountAggregatorObjectSet
- dc.setNewValue(s.toString)
- dc
- }
- if (count == null) {
- count = agg
- } else {
- count.merge(agg)
- }
- }
-
- override def eval(input: InternalRow): Any = {
- if (count == null) {
- Cast(Literal(0), base.dataType).eval(null)
- } else if (count.isFirstTime) {
- Cast(Literal(0), base.dataType).eval(null)
- } else {
- Cast(Literal(count.getDoubleValue), base.dataType).eval(null)
- }
- }
-}
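
The two count-distinct classes above split the work between partial and final
aggregation: the partial function folds input rows into a MeasureAggregator and
emits the aggregator object itself, while the final function merges those
partials and casts the finished count to the declared output type, yielding 0
when nothing was aggregated. A minimal sketch of that split, using an
illustrative Agg trait standing in for MeasureAggregator (not CarbonData code):

    trait Agg {
      def add(v: Any): Unit          // partial phase: fold one input value
      def merge(other: Agg): Unit    // final phase: combine partial results
      def isFirstTime: Boolean       // true when no value was ever added
      def value: Any                 // finished result
    }

    final class DistinctCount extends Agg {
      private val seen = scala.collection.mutable.Set.empty[Any]
      def add(v: Any): Unit = if (v != null) seen += v
      def merge(other: Agg): Unit = other match {
        case d: DistinctCount => seen ++= d.seen
        case _ =>
      }
      def isFirstTime: Boolean = seen.isEmpty
      def value: Any = if (isFirstTime) 0 else seen.size
    }
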
-
-case class FirstFunctionCarbon(expr: Expression, base: AggregateExpression1)
- extends AggregateFunction1 {
- def this() = this(null, null) // Required for serialization.
-
- var result: Any = null
-
- override def update(input: InternalRow): Unit = {
- if (result == null) {
- val br = expr.collectFirst({ case a@BoundReference(_, _, _) => a })
- val resolution =
- if (br.isDefined) {
- input.get(br.get.ordinal, MeasureAggregatorUDT)
- } else {
- expr.eval(input)
- }
-
- result = resolution
- }
- }
-
- override def eval(input: InternalRow): Any = Cast(Literal(result), base.dataType).eval(null)
-}
-
-case class FlattenExpr(expr: Expression) extends Expression with CodegenFallback {
- self: Product =>
-
- override def children: Seq[Expression] = Seq(expr)
-
- override def dataType: DataType = expr.dataType
-
- override def nullable: Boolean = expr.nullable
-
- override def references: AttributeSet = AttributeSet(expr.flatMap(_.references.iterator))
-
- override def foldable: Boolean = expr.foldable
-
- override def toString: String = "Flatten(" + expr.toString + ")"
-
- type EvaluatedType = Any
-
- override def eval(input: InternalRow): Any = {
- expr.eval(input) match {
- case d: MeasureAggregator => d.getDoubleValue
- case others => others
- }
- }
-}
-
-case class FlatAggregatorsExpr(expr: Expression) extends Expression with CodegenFallback {
- self: Product =>
-
- override def children: Seq[Expression] = Seq(expr)
-
- override def dataType: DataType = expr.dataType
-
- override def nullable: Boolean = expr.nullable
-
- override def references: AttributeSet = AttributeSet(expr.flatMap(_.references.iterator))
-
- override def foldable: Boolean = expr.foldable
-
- override def toString: String = "FlattenAggregators(" + expr.toString + ")"
-
- type EvaluatedType = Any
-
- override def eval(input: InternalRow): Any = {
- expr.eval(input) match {
- case d: MeasureAggregator =>
- d.setNewValue(d.getDoubleValue)
- d
- case others => others
- }
- }
-}
-
-case class PositionLiteral(expr: Expression, intermediateDataType: DataType)
- extends LeafExpression with CodegenFallback {
- override def dataType: DataType = expr.dataType
-
- override def nullable: Boolean = false
-
- type EvaluatedType = Any
- var position = -1
-
- def setPosition(pos: Int): Unit = position = pos
-
- override def toString: String = s"PositionLiteral($position : $expr)"
-
- override def eval(input: InternalRow): Any = {
- if (position != -1) {
- input.get(position, intermediateDataType)
- } else {
- expr.eval(input)
- }
- }
-}
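
PositionLiteral exists so that, once the planner has assigned the wrapped
aggregator a slot in the intermediate row, eval becomes a direct buffer read
instead of a re-evaluation of the wrapped expression. A usage sketch, assuming
someExpr and row are in scope and slot 2 is an illustrative assigned position:

    val lit = PositionLiteral(someExpr, MeasureAggregatorUDT)
    lit.setPosition(2)     // slot assigned during planning
    val v = lit.eval(row)  // reads row.get(2, MeasureAggregatorUDT),
                           // not someExpr.eval(row)
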
-
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/carbondata/spark/agg/MeasureAggregatorUDT.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/agg/MeasureAggregatorUDT.scala b/integration/spark/src/main/scala/org/carbondata/spark/agg/MeasureAggregatorUDT.scala
deleted file mode 100644
index ea0c8af..0000000
--- a/integration/spark/src/main/scala/org/carbondata/spark/agg/MeasureAggregatorUDT.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.carbondata.spark.agg
-
-import org.apache.spark.sql.types._
-
-import org.carbondata.query.aggregator.MeasureAggregator
-
-/**
- * User-defined type for Carbon measure aggregators.
- * Since Spark 1.5, data types are strict and ANY is no longer supported,
- * so every value must carry an explicit data type.
- */
-class MeasureAggregatorUDT extends UserDefinedType[MeasureAggregator] {
- // DoubleType is acceptable here: the value is never handed to Spark SQL for
- // evaluation, so sqlType only has to be defined to satisfy the compiler
- override def sqlType: DataType = {
- ArrayType(DoubleType, containsNull = false)
- }
-
- override def serialize(obj: Any): Any = {
- obj match {
- case p: MeasureAggregator => p
- }
- }
-
- override def deserialize(datum: Any): MeasureAggregator = {
- datum.asInstanceOf[MeasureAggregator]
- }
-
- override def userClass: Class[MeasureAggregator] = classOf[MeasureAggregator]
-
- override def asNullable: MeasureAggregatorUDT = this
-}
-
-case object MeasureAggregatorUDT extends MeasureAggregatorUDT
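
The case object gives callers a singleton DataType to pass to InternalRow.get,
which is exactly how the removed aggregate functions pull the aggregator out of
a row. That pattern, reassembled standalone (expr and input assumed in scope):

    val br = expr.collectFirst { case b: BoundReference => b }
    val value = br match {
      case Some(b) => input.get(b.ordinal, MeasureAggregatorUDT) // typed slot read
      case None    => expr.eval(input)                           // no bound slot
    }
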
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index ded665e..0672281 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -22,7 +22,7 @@ import java.util
import java.util.concurrent.{Executors, ExecutorService}
import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks._
import org.apache.hadoop.conf.{Configurable, Configuration}
@@ -189,9 +189,7 @@ object CarbonDataRDDFactory extends Logging {
if (carbonLock.lockWithRetries()) {
logInfo("Successfully got the table metadata file lock")
if (updatedLoadMetadataDetailsList.nonEmpty) {
- LoadAggregateTabAfterRetention(schemaName, cube.getFactTableName, cube.getFactTableName,
- sqlContext, schema, updatedLoadMetadataDetailsList
- )
+ // TODO: Load Aggregate tables after retention.
}
// write
@@ -217,56 +215,6 @@ object CarbonDataRDDFactory extends Logging {
}
}
- def LoadAggregateTabAfterRetention(
- schemaName: String,
- cubeName: String,
- factTableName: String,
- sqlContext: SQLContext,
- schema: CarbonDataLoadSchema,
- list: ListBuffer[LoadMetadataDetails]) {
- val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog.lookupRelation1(
- Option(schemaName),
- cubeName,
- None
- )(sqlContext).asInstanceOf[CarbonRelation]
- if (relation == null) {
- sys.error(s"Table $schemaName.$cubeName does not exist")
- }
- val carbonLoadModel = new CarbonLoadModel()
- carbonLoadModel.setTableName(cubeName)
- carbonLoadModel.setDatabaseName(schemaName)
- val table = relation.cubeMeta.carbonTable
- val aggTables = schema.getCarbonTable.getAggregateTablesName
- if (null != aggTables && !aggTables.isEmpty) {
- carbonLoadModel.setRetentionRequest(true)
- carbonLoadModel.setLoadMetadataDetails(list.asJava)
- carbonLoadModel.setTableName(table.getFactTableName)
- carbonLoadModel
- .setCarbonDataLoadSchema(new CarbonDataLoadSchema(relation.cubeMeta.carbonTable))
- // TODO: need to fill dimension relation from data load sql command
- var storeLocation = CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
- System.getProperty("java.io.tmpdir")
- )
- storeLocation = storeLocation + "/carbonstore/" + System.currentTimeMillis()
- val columinar = sqlContext.getConf("carbon.is.columnar.storage", "true").toBoolean
- var kettleHomePath = sqlContext.getConf("carbon.kettle.home", null)
- if (null == kettleHomePath) {
- kettleHomePath = CarbonProperties.getInstance.getProperty("carbon.kettle.home")
- }
- if (kettleHomePath == null) {
- sys.error(s"carbon.kettle.home is not set")
- }
- CarbonDataRDDFactory.loadCarbonData(
- sqlContext,
- carbonLoadModel,
- storeLocation,
- relation.cubeMeta.storePath,
- kettleHomePath,
- relation.cubeMeta.partitioner, columinar, isAgg = true)
- }
- }
-
def configSplitMaxSize(context: SparkContext, filePaths: String,
hadoopConfiguration: Configuration): Unit = {
val defaultParallelism = if (context.defaultParallelism < 1) 1 else context.defaultParallelism
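
The removed retention loader resolved carbon.kettle.home through a fallback
chain: the SQLContext conf first, then CarbonProperties, and finally a hard
error. The same chain in isolation, as a sketch:

    def kettleHome(sqlContext: SQLContext): String =
      Option(sqlContext.getConf("carbon.kettle.home", null))
        .orElse(Option(CarbonProperties.getInstance.getProperty("carbon.kettle.home")))
        .getOrElse(sys.error("carbon.kettle.home is not set"))
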
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonRawQueryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonRawQueryRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonRawQueryRDD.scala
index 7dcb33b..5993677 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonRawQueryRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonRawQueryRDD.scala
@@ -25,8 +25,9 @@ import org.carbondata.core.iterator.CarbonIterator
import org.carbondata.query.carbon.executor.QueryExecutorFactory
import org.carbondata.query.carbon.model.QueryModel
import org.carbondata.query.carbon.result.BatchRawResult
+import org.carbondata.query.carbon.result.iterator.ChunkRawRowIterartor
import org.carbondata.query.expression.Expression
-import org.carbondata.spark.RawKeyVal
+import org.carbondata.spark.{RawKey, RawKeyVal}
/**
@@ -48,7 +49,7 @@ class CarbonRawQueryRDD[K, V](
sc: SparkContext,
queryModel: QueryModel,
filterExpression: Expression,
- keyClass: RawKeyVal[K, V],
+ keyClass: RawKey[K, V],
@transient conf: Configuration,
cubeCreationTime: Long,
schemaLastUpdatedTime: Long,
@@ -66,7 +67,7 @@ class CarbonRawQueryRDD[K, V](
override def compute(thepartition: Partition, context: TaskContext): Iterator[(K, V)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass().getName());
val iter = new Iterator[(K, V)] {
- var rowIterator: CarbonIterator[BatchRawResult] = _
+ var rowIterator: CarbonIterator[Array[Any]] = _
var queryStartTime: Long = 0
try {
val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition]
@@ -83,8 +84,9 @@ class CarbonRawQueryRDD[K, V](
System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties");
}
// execute query
- rowIterator = QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel)
- .asInstanceOf[CarbonIterator[BatchRawResult]]
+ rowIterator = new ChunkRawRowIterartor(
+ QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel)
+ .asInstanceOf[CarbonIterator[BatchRawResult]]).asInstanceOf[CarbonIterator[Array[Any]]]
}
} catch {
case e: Exception =>
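
The compute change above stops consuming BatchRawResult batches directly and
instead wraps the executor's iterator in ChunkRawRowIterartor, which hands out
one row at a time. ChunkRawRowIterartor's implementation is not part of this
diff; the flattening idea it supplies, sketched with plain Scala iterators and
a batch modeled as Array[Array[Any]]:

    def flattenBatches(batches: Iterator[Array[Array[Any]]]): Iterator[Array[Any]] =
      batches.flatMap(_.iterator)
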
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
index 6cd9986..9bc1a64 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/util/CarbonScalaUtil.scala
@@ -105,18 +105,22 @@ object CarbonScalaUtil {
}
}
+ def convertValueToSparkDataType(value: Any,
+ dataType: org.apache.spark.sql.types.DataType): Any = {
+ dataType match {
+ case StringType => value.toString
+ case IntegerType => value.toString.toInt
+ case LongType => value.toString.toLong
+ case DoubleType => value.toString.toDouble
+ case FloatType => value.toString.toFloat
+ case _ => value.toString.toDouble
+ }
+ }
+
case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
object CarbonSparkUtil {
- def createBaseRDD(carbonContext: CarbonContext, carbonTable: CarbonTable): TransformHolder = {
- val relation = CarbonEnv.getInstance(carbonContext).carbonCatalog
- .lookupRelation1(Option(carbonTable.getDatabaseName),
- carbonTable.getFactTableName, None)(carbonContext).asInstanceOf[CarbonRelation]
- val rdd = new SchemaRDD(carbonContext, relation)
- rdd.registerTempTable(carbonTable.getFactTableName)
- TransformHolder(rdd, createSparkMeta(carbonTable))
- }
def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
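
A usage sketch for the added convertValueToSparkDataType helper; note the
catch-all branch coerces through toString.toDouble, so a non-numeric value of
an unlisted type will throw NumberFormatException:

    import org.apache.spark.sql.types._

    CarbonScalaUtil.convertValueToSparkDataType("42", IntegerType)   // 42: Int
    CarbonScalaUtil.convertValueToSparkDataType(7, LongType)         // 7L: Long
    CarbonScalaUtil.convertValueToSparkDataType("3.14", DoubleType)  // 3.14: Double
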
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
index 121a3a5..ebc8ead 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -395,7 +395,7 @@ object GlobalDictionaryUtil extends Logging {
// update CarbonDataLoadSchema
val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
- model.table.getTableName, None)(sqlContext)
+ model.table.getTableName)(sqlContext)
.asInstanceOf[CarbonRelation].cubeMeta.carbonTable
carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/test/resources/datawithmaxinteger.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/datawithmaxinteger.csv b/integration/spark/src/test/resources/datawithmaxinteger.csv
new file mode 100644
index 0000000..52dfdfc
--- /dev/null
+++ b/integration/spark/src/test/resources/datawithmaxinteger.csv
@@ -0,0 +1,12 @@
+imei,age
+1AA1,10
+1AA2,26
+1AA3,10
+1AA4,10
+1AA5,20
+1AA6,10
+1AA7,10
+1AA8,10
+1AA9,10
+1AA10,10
+1AA12,2147483647
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/test/resources/datawithmaxmininteger.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/datawithmaxmininteger.csv b/integration/spark/src/test/resources/datawithmaxmininteger.csv
new file mode 100644
index 0000000..5677a40
--- /dev/null
+++ b/integration/spark/src/test/resources/datawithmaxmininteger.csv
@@ -0,0 +1,13 @@
+imei,age
+1AA1,10
+1AA2,26
+1AA3,10
+1AA4,10
+1AA5,20
+1AA6,10
+1AA7,10
+1AA8,10
+1AA9,10
+1AA10,10
+1AA11,-2147483648
+1AA12,2147483647
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/test/resources/datawithmininteger.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/datawithmininteger.csv b/integration/spark/src/test/resources/datawithmininteger.csv
new file mode 100644
index 0000000..cc34efa
--- /dev/null
+++ b/integration/spark/src/test/resources/datawithmininteger.csv
@@ -0,0 +1,12 @@
+imei,age
+1AA1,10
+1AA2,26
+1AA3,10
+1AA4,10
+1AA5,20
+1AA6,10
+1AA7,10
+1AA8,10
+1AA9,10
+1AA10,10
+1AA11,-2147483648
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index eaa249e..5811d3a 100644
--- a/integration/spark/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -19,12 +19,12 @@ package org.apache.spark.sql.common.util
import java.util.{Locale, TimeZone}
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import scala.collection.JavaConversions._
+
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.columnar.InMemoryRelation
-
-import scala.collection.JavaConversions._
+import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
class QueryTest extends PlanTest {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/ead0076b/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithMaxMinInteger.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithMaxMinInteger.scala b/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithMaxMinInteger.scala
new file mode 100644
index 0000000..26c88f8
--- /dev/null
+++ b/integration/spark/src/test/scala/org/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithMaxMinInteger.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.integration.spark.testsuite.dataload
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+/**
+ * Test class for data loading when the int column contains the
+ * minimum and/or maximum integer value.
+ */
+class TestLoadDataWithMaxMinInteger extends QueryTest with BeforeAndAfterAll {
+ override def beforeAll {
+ sql("drop table if exists integer_table_01")
+ sql("drop table if exists integer_table_02")
+ sql("drop table if exists integer_table_03")
+ }
+ test("test carbon table data loading when the int column " +
+ "contains min integer value") {
+ sql(
+ """
+ CREATE TABLE integer_table_01(imei string,age int)
+ STORED BY 'org.apache.carbondata.format'
+ """)
+ sql(
+ """
+ LOAD DATA INPATH './src/test/resources/datawithmininteger.csv'
+ INTO table integer_table_01 options ('DELIMITER'=',',
+ 'QUOTECHAR'='"', 'FILEHEADER'= 'imei,age')
+ """)
+ checkAnswer(sql("select age from integer_table_01"),
+ Seq(Row(10.0), Row(26.0), Row(10.0), Row(10.0), Row(20.0),
+ Row(10.0), Row(10.0), Row(10.0), Row(10.0), Row(10.0),
+ Row(-2147483648.0)))
+ }
+
+ test("test carbon table data loading when the int column " +
+ "contains max integer value") {
+ sql(
+ """
+ CREATE TABLE integer_table_02(imei string,age int)
+ STORED BY 'org.apache.carbondata.format'
+ """)
+ sql(
+ """
+ LOAD DATA INPATH './src/test/resources/datawithmaxinteger.csv'
+ INTO table integer_table_02 options ('DELIMITER'=',',
+ 'QUOTECHAR'='"', 'FILEHEADER'= 'imei,age')
+ """)
+ checkAnswer(sql("select age from integer_table_02"),
+ Seq(Row(10.0), Row(26.0), Row(10.0), Row(10.0), Row(20.0),
+ Row(10.0), Row(10.0), Row(10.0), Row(10.0), Row(10.0),
+ Row(2147483647.0)))
+ }
+
+ test("test carbon table data loading when the int column " +
+ "contains min and max integer value") {
+ sql(
+ """
+ CREATE TABLE integer_table_03(imei string,age int)
+ STORED BY 'org.apache.carbondata.format'
+ """)
+ sql(
+ """
+ LOAD DATA INPATH './src/test/resources/datawithmaxmininteger.csv'
+ INTO table integer_table_03 options ('DELIMITER'=',',
+ 'QUOTECHAR'='"', 'FILEHEADER'= 'imei,age')
+ """)
+ checkAnswer(sql("select age from integer_table_03"),
+ Seq(Row(10.0), Row(26.0), Row(10.0), Row(10.0), Row(20.0),
+ Row(10.0), Row(10.0), Row(10.0), Row(10.0), Row(10.0),
+ Row(-2147483648.0), Row(2147483647.0)))
+ }
+ override def afterAll {
+ sql("drop table if exists integer_table_01")
+ sql("drop table if exists integer_table_02")
+ sql("drop table if exists integer_table_03")
+ }
+}