Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/27 22:48:48 UTC
[3/4] carbondata git commit: [CARBONDATA-1552][Spark-2.2 Integration]
Spark-2.2 Carbon Integration
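
The changes below share one device: version-agnostic extractor objects (CarbonSubqueryAlias, MatchCast, CarbonUnresolvedRelation) plus the CarbonReflectionUtils helper, so the same rules and parsers compile against both Spark 2.1 and Spark 2.2 even though several catalyst case classes changed shape between the releases. A minimal sketch of such an extractor, assuming only that SubqueryAlias keeps its alias and child members in both versions (an illustration, not the committed CarbonExpressions code):

    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}

    // Matches a SubqueryAlias through its stable members instead of its
    // constructor, whose arity differs between Spark 2.1 (alias, child, view)
    // and Spark 2.2 (alias, child).
    object SubqueryAliasCompat {
      def unapply(plan: LogicalPlan): Option[(String, LogicalPlan)] = plan match {
        case s: SubqueryAlias => Some((s.alias, s.child))
        case _ => None
      }
    }

With such an extractor in scope, a rule can write case Aggregate(g, a, SubqueryAliasCompat(_, rel: LogicalRelation)) and the pattern compiles unchanged on either Spark profile, which is how CarbonSubqueryAlias is used throughout the rewrite rules below.
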
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c481485/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 7c23e5e..1bb6ab0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -19,16 +19,19 @@ package org.apache.spark.sql.hive
import scala.collection.JavaConverters._
-import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, InsertIntoCarbonTable, SparkSession}
+import org.apache.spark.sql._
+import org.apache.spark.sql.CarbonExpressions.CarbonSubqueryAlias
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, NamedExpression, PredicateSubquery, ScalaUDF}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, NamedExpression, ScalaUDF}
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.sql.CarbonExpressions.MatchCast
+import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema}
@@ -100,7 +103,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
// subquery
case Aggregate(groupingExp,
aggregateExp,
- SubqueryAlias(_, logicalRelation: LogicalRelation, _))
+ CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
// only carbon query plan is supported, so check whether the logical relation
// is for carbon
if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
@@ -120,7 +123,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
// filter expression
case Aggregate(groupingExp, aggregateExp,
Filter(filterExp,
- SubqueryAlias(_, logicalRelation: LogicalRelation, _)))
+ CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
// only carbon query plan is supported, so check whether the logical relation
// is for carbon
if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
@@ -134,7 +137,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
tableName,
list)
// TODO need to handle filter predicate subquery scenario
- isValidPlan = !PredicateSubquery.hasPredicateSubquery(filterExp)
+ // isValidPlan = !PredicateSubquery.hasPredicateSubquery(filterExp)
// getting the columns from filter expression
if(isValidPlan) {
filterExp.transform {
@@ -187,19 +190,19 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
// if it does not match any pre aggregate table, return the same plan
if (!selectedDataMapSchemas.isEmpty) {
// sort the selected child schema based on size to select smallest pre aggregate table
- val (aggDataMapSchema, carbonRelation) =
+ val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val (aggDataMapSchema, carbonRelation, relation) =
selectedDataMapSchemas.asScala.map { selectedDataMapSchema =>
- val catalog = sparkSession.sessionState.catalog
- val carbonRelation = catalog
- .lookupRelation(TableIdentifier(selectedDataMapSchema.getRelationIdentifier
- .getTableName,
- Some(selectedDataMapSchema.getRelationIdentifier
- .getDatabaseName))).asInstanceOf[SubqueryAlias].child
- .asInstanceOf[LogicalRelation]
- (selectedDataMapSchema, carbonRelation)
- }.minBy(f => f._2.relation.asInstanceOf[CarbonDatasourceHadoopRelation].sizeInBytes)
+ val identifier = TableIdentifier(
+ selectedDataMapSchema.getRelationIdentifier.getTableName,
+ Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName))
+ val carbonRelation =
+ catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
+ val relation = sparkSession.sessionState.catalog.lookupRelation(identifier)
+ (selectedDataMapSchema, carbonRelation, relation)
+ }.minBy(f => f._2.sizeInBytes)
// transform the query plan based on selected child schema
- transformPreAggQueryPlan(plan, aggDataMapSchema, carbonRelation)
+ transformPreAggQueryPlan(plan, aggDataMapSchema, relation)
} else {
plan
}
@@ -217,7 +220,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
* child schema
* @param attributeReference
* parent attribute reference
- * @param childCarbonRelation
+ * @param attributes
* child table attribute references
* @param aggFunction
* aggregation function applied on child
@@ -225,7 +228,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
*/
def getChildAttributeReference(dataMapSchema: DataMapSchema,
attributeReference: AttributeReference,
- childCarbonRelation: LogicalRelation,
+ attributes: Seq[AttributeReference],
aggFunction: String = ""): AttributeReference = {
val aggregationDataMapSchema = dataMapSchema.asInstanceOf[AggregationDataMapSchema];
val columnSchema = if (aggFunction.isEmpty) {
@@ -240,7 +243,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
throw new AnalysisException("Column does not exists in Pre Aggregate table")
}
// finding the child attribute from child logical relation
- childCarbonRelation.attributeMap.find(p => p._2.name.equals(columnSchema.getColumnName)).get._2
+ attributes.find(p => p.name.equals(columnSchema.getColumnName)).get
}
/**
@@ -268,14 +271,16 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
* parent logical plan
* @param aggDataMapSchema
* select data map schema
- * @param childCarbonRelation
+ * @param childPlan
* child carbon table relation
* @return transformed plan
*/
def transformPreAggQueryPlan(logicalPlan: LogicalPlan,
- aggDataMapSchema: DataMapSchema, childCarbonRelation: LogicalRelation): LogicalPlan = {
+ aggDataMapSchema: DataMapSchema,
+ childPlan: LogicalPlan): LogicalPlan = {
+ val attributes = childPlan.output.asInstanceOf[Seq[AttributeReference]]
logicalPlan.transform {
- case Aggregate(grExp, aggExp, child@SubqueryAlias(_, l: LogicalRelation, _))
+ case Aggregate(grExp, aggExp, child@CarbonSubqueryAlias(_, l: LogicalRelation))
if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
val (updatedGroupExp, updatedAggExp, newChild, None) =
@@ -284,13 +289,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
child,
None,
aggDataMapSchema,
- childCarbonRelation)
+ attributes,
+ childPlan)
Aggregate(updatedGroupExp,
updatedAggExp,
newChild)
case Aggregate(grExp,
aggExp,
- Filter(expression, child@SubqueryAlias(_, l: LogicalRelation, _)))
+ Filter(expression, child@CarbonSubqueryAlias(_, l: LogicalRelation)))
if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
@@ -299,7 +305,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
child,
Some(expression),
aggDataMapSchema,
- childCarbonRelation)
+ attributes,
+ childPlan)
Aggregate(updatedGroupExp,
updatedAggExp,
Filter(updatedFilterExpression.get,
@@ -313,7 +320,8 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
l,
None,
aggDataMapSchema,
- childCarbonRelation)
+ attributes,
+ childPlan)
Aggregate(updatedGroupExp,
updatedAggExp,
newChild)
@@ -339,7 +347,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
* filter expression
* @param aggDataMapSchema
* pre aggregate table schema
- * @param childCarbonRelation
+ * @param attributes
* pre aggregate table attribute references
* @return tuple of(updated grouping expression,
* updated aggregate expression,
@@ -350,13 +358,14 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
aggregateExpressions: Seq[NamedExpression],
child: LogicalPlan, filterExpression: Option[Expression] = None,
aggDataMapSchema: DataMapSchema,
- childCarbonRelation: LogicalRelation): (Seq[Expression], Seq[NamedExpression], LogicalPlan,
+ attributes: Seq[AttributeReference],
+ aggPlan: LogicalPlan): (Seq[Expression], Seq[NamedExpression], LogicalPlan,
Option[Expression]) = {
// transforming the group by expression attributes with child attributes
val updatedGroupExp = groupingExpressions.map { exp =>
exp.transform {
case attr: AttributeReference =>
- getChildAttributeReference(aggDataMapSchema, attr, childCarbonRelation)
+ getChildAttributeReference(aggDataMapSchema, attr, attributes)
}
}
// below code is for updating the aggregate expression.
@@ -379,7 +388,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
case attr: AttributeReference =>
val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
attr,
- childCarbonRelation)
+ attributes)
// returning the alias to show proper column name in output
Alias(childAttributeReference,
attr.name)(NamedExpression.newExprId,
@@ -388,7 +397,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
case Alias(attr: AttributeReference, name) =>
val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
attr,
- childCarbonRelation)
+ attributes)
// returning alias with child attribute reference
Alias(childAttributeReference,
name)(NamedExpression.newExprId,
@@ -398,7 +407,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
// get the updated aggregate function
val aggExp = getUpdatedAggregateExpressionForChild(attr,
aggDataMapSchema,
- childCarbonRelation)
+ attributes)
// returning alias with child attribute reference
Alias(aggExp,
name)(NamedExpression.newExprId,
@@ -407,16 +416,19 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
// transforming the logical relation
val newChild = child.transform {
case _: LogicalRelation =>
- childCarbonRelation
+ aggPlan
case _: SubqueryAlias =>
- childCarbonRelation
+ aggPlan match {
+ case s: SubqueryAlias => s.child
+ case others => others
+ }
}
// updating the filter expression if present
val updatedFilterExpression = if (filterExpression.isDefined) {
val filterExp = filterExpression.get
Some(filterExp.transform {
case attr: AttributeReference =>
- getChildAttributeReference(aggDataMapSchema, attr, childCarbonRelation)
+ getChildAttributeReference(aggDataMapSchema, attr, attributes)
})
} else {
None
@@ -441,13 +453,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
* aggregate expression
* @param dataMapSchema
* child data map schema
- * @param childCarbonRelation
+ * @param attributes
* child table attribute references
* @return updated expression
*/
def getUpdatedAggregateExpressionForChild(aggExp: AggregateExpression,
dataMapSchema: DataMapSchema,
- childCarbonRelation: LogicalRelation):
+ attributes: Seq[AttributeReference]):
Expression = {
aggExp.aggregateFunction match {
// Change the count AggregateExpression to Sum as count
@@ -456,7 +468,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
case count@Count(Seq(attr: AttributeReference)) =>
AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
attr,
- childCarbonRelation,
+ attributes,
count.prettyName),
LongType)),
aggExp.mode,
@@ -464,44 +476,44 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
case sum@Sum(attr: AttributeReference) =>
AggregateExpression(Sum(getChildAttributeReference(dataMapSchema,
attr,
- childCarbonRelation,
+ attributes,
sum.prettyName)),
aggExp.mode,
isDistinct = false)
case max@Max(attr: AttributeReference) =>
AggregateExpression(Max(getChildAttributeReference(dataMapSchema,
attr,
- childCarbonRelation,
+ attributes,
max.prettyName)),
aggExp.mode,
isDistinct = false)
case min@Min(attr: AttributeReference) =>
AggregateExpression(Min(getChildAttributeReference(dataMapSchema,
attr,
- childCarbonRelation,
+ attributes,
min.prettyName)),
aggExp.mode,
isDistinct = false)
- case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+ case sum@Sum(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
attr,
- childCarbonRelation,
+ attributes,
sum.prettyName),
changeDataType)),
aggExp.mode,
isDistinct = false)
- case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+ case min@Min(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
AggregateExpression(Min(Cast(getChildAttributeReference(dataMapSchema,
attr,
- childCarbonRelation,
+ attributes,
min.prettyName),
changeDataType)),
aggExp.mode,
isDistinct = false)
- case max@Max(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+ case max@Max(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
AggregateExpression(Max(Cast(getChildAttributeReference(dataMapSchema,
attr,
- childCarbonRelation,
+ attributes,
max.prettyName),
changeDataType)),
aggExp.mode,
@@ -513,13 +525,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
case Average(attr: AttributeReference) =>
Divide(AggregateExpression(Sum(getChildAttributeReference(dataMapSchema,
attr,
- childCarbonRelation,
+ attributes,
"sum")),
aggExp.mode,
isDistinct = false),
AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
attr,
- childCarbonRelation,
+ attributes,
"count"),
LongType)),
aggExp.mode,
@@ -527,17 +539,17 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
// In case of average aggregate function select 2 columns from aggregate table
// with aggregation sum and count.
// Then add divide(sum(column with sum), sum(column with count)).
- case Average(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+ case Average(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
Divide(AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
attr,
- childCarbonRelation,
+ attributes,
"sum"),
changeDataType)),
aggExp.mode,
isDistinct = false),
AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
attr,
- childCarbonRelation,
+ attributes,
"count"),
LongType)),
aggExp.mode,
@@ -632,7 +644,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
carbonTable,
tableName,
sum.prettyName))
- case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+ case sum@Sum(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
Seq(getQueryColumn(attr.name,
carbonTable,
tableName,
@@ -649,7 +661,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
carbonTable,
tableName,
min.prettyName))
- case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+ case min@Min(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
Seq(getQueryColumn(attr.name,
carbonTable,
tableName,
@@ -661,7 +673,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
carbonTable,
tableName,
max.prettyName))
- case max@Max(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+ case max@Max(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
Seq(getQueryColumn(attr.name,
carbonTable,
tableName,
@@ -682,7 +694,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
))
// in case of average we need to return two columns:
// sum and count of the column, added during table creation to support rollup
- case Average(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+ case Average(MatchCast(attr: AttributeReference, changeDataType: DataType)) =>
Seq(getQueryColumn(attr.name,
carbonTable,
tableName,
@@ -742,7 +754,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
/**
* Insert into carbon table from other source
*/
-object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
+case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
plan.transform {
// Wait until children are resolved.
@@ -781,12 +793,22 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
case attr => attr
}
}
+ val version = sparkSession.version
val newChild: LogicalPlan = if (newChildOutput == childPlan.output) {
- p.child
+ if (version.startsWith("2.1")) {
+ CarbonReflectionUtils.getField("child", p).asInstanceOf[LogicalPlan]
+ } else if (version.startsWith("2.2")) {
+ CarbonReflectionUtils.getField("query", p).asInstanceOf[LogicalPlan]
+ } else {
+ throw new UnsupportedOperationException(s"Spark version $version is not supported")
+ }
} else {
Project(newChildOutput, childPlan)
}
- InsertIntoCarbonTable(relation, p.partition, newChild, p.overwrite, p.ifNotExists)
+
+ val overwrite = CarbonReflectionUtils.getOverWriteOption("overwrite", p)
+
+ InsertIntoCarbonTable(relation, p.partition, newChild, overwrite, true)
} else {
CarbonException.analysisException(
"Cannot insert into target table because number of columns mismatch")
@@ -812,7 +834,7 @@ object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
Alias(attrExpression
.copy(aggregateFunction = Count(attr),
resultId = NamedExpression.newExprId), attr.name + "_count")())
- case Average(cast@Cast(attr: AttributeReference, _)) =>
+ case Average(cast@MatchCast(attr: AttributeReference, _)) =>
Seq(Alias(attrExpression
.copy(aggregateFunction = Sum(cast),
resultId = NamedExpression.newExprId),
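
The insert-into rewrite above reads the child plan out of InsertIntoTable by field name, "child" on Spark 2.1 and "query" on Spark 2.2, because the case class field was renamed between the releases; the overwrite flag changed shape as well, hence the separate getOverWriteOption call. A rough sketch of how a name-based getField helper can be written with plain Java reflection (the committed CarbonReflectionUtils may be implemented differently):

    object FieldReflectionSketch {
      // Reads a declared field by name; case class fields are private under
      // the hood, so the field is made accessible before it is read.
      def getField[T](name: String, obj: AnyRef): T = {
        val field = obj.getClass.getDeclaredFields
          .find(_.getName == name)
          .getOrElse(throw new NoSuchFieldException(name))
        field.setAccessible(true)
        field.get(obj).asInstanceOf[T]
      }
    }

The caller chooses the field name from sparkSession.version, exactly as the rule does above, and rejects any other version with an UnsupportedOperationException.
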
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c481485/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 69b8d50..aadce98 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -171,7 +171,13 @@ case class CarbonRelation(
}
// TODO: Use data from the footers.
- override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
+ // TODO For 2.1
+ // override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
+ // Todo for 2.2
+ // override def computeStats(conf: SQLConf): Statistics = Statistics(sizeInBytes =
+ // this.sizeInBytes)
+
+ // override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes)
override def equals(other: Any): Boolean = {
other match {
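
The commented-out lines record why a single statistics override no longer works: Spark 2.1 relations override a statistics val, while Spark 2.2 replaces it with a computeStats(conf) method. A hedged sketch of the 2.2-flavoured variant, assuming it only reports the estimated size (the trait name is illustrative, not the committed code):

    import org.apache.spark.sql.catalyst.plans.logical.Statistics
    import org.apache.spark.sql.internal.SQLConf

    // Spark 2.2-style statistics hook: expose only sizeInBytes so the planner
    // can still choose broadcast joins for small carbon tables.
    trait SizeOnlyStatsSketch {
      def sizeInBytes: Long
      def computeStats(conf: SQLConf): Statistics =
        Statistics(sizeInBytes = BigInt(sizeInBytes))
    }

The 2.1 variant keeps the original override lazy val statistics = Statistics(sizeInBytes = this.sizeInBytes); presumably each form ends up in the per-version source folders this patch introduces (see the new spark2.1 directory at the end of this mail).
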
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c481485/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
deleted file mode 100644
index b0aecd7..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
-import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubquery}
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.SparkOptimizer
-import org.apache.spark.sql.execution.command.datamap.{DataMapDropTablePostListener, DropDataMapPostListener}
-import org.apache.spark.sql.execution.command.preaaggregate._
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
-import org.apache.spark.sql.parser.CarbonSparkSqlParser
-
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.events._
-
-/**
- * This class will have carbon catalog and refresh the relation from cache if the carbontable in
- * carbon catalog is not same as cached carbon relation's carbon table
- *
- * @param externalCatalog
- * @param globalTempViewManager
- * @param sparkSession
- * @param functionResourceLoader
- * @param functionRegistry
- * @param conf
- * @param hadoopConf
- */
-class CarbonSessionCatalog(
- externalCatalog: HiveExternalCatalog,
- globalTempViewManager: GlobalTempViewManager,
- sparkSession: SparkSession,
- functionResourceLoader: FunctionResourceLoader,
- functionRegistry: FunctionRegistry,
- conf: SQLConf,
- hadoopConf: Configuration)
- extends HiveSessionCatalog(
- externalCatalog,
- globalTempViewManager,
- sparkSession,
- functionResourceLoader,
- functionRegistry,
- conf,
- hadoopConf) {
-
- lazy val carbonEnv = {
- val env = new CarbonEnv
- env.init(sparkSession)
- env
- }
-
- /**
- * This method will invalidate carbonrelation from cache if carbon table is updated in
- * carbon catalog
- *
- * @param name
- * @param alias
- * @return
- */
- override def lookupRelation(name: TableIdentifier,
- alias: Option[String]): LogicalPlan = {
- val rtnRelation = super.lookupRelation(name, alias)
- var toRefreshRelation = false
- rtnRelation match {
- case SubqueryAlias(_,
- LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
- _) =>
- toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
- case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
- toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
- case _ =>
- }
-
- if (toRefreshRelation) {
- super.lookupRelation(name, alias)
- } else {
- rtnRelation
- }
- }
-
- private def refreshRelationFromCache(identifier: TableIdentifier,
- alias: Option[String],
- carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
- var isRefreshed = false
- val storePath = CarbonProperties.getStorePath
- carbonEnv.carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables()
-
- val table = carbonEnv.carbonMetastore.getTableFromMetadataCache(
- carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
- carbonDatasourceHadoopRelation.carbonTable.getTableName)
- if (table.isEmpty || (table.isDefined &&
- table.get.getTableLastUpdatedTime !=
- carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
- refreshTable(identifier)
- DataMapStoreManager.getInstance().
- clearDataMaps(AbsoluteTableIdentifier.from(storePath,
- identifier.database.getOrElse("default"), identifier.table))
- isRefreshed = true
- logInfo(s"Schema changes have been detected for table: $identifier")
- }
- isRefreshed
- }
-}
-
-/**
- * Session state implementation to override sql parser and adding strategies
- * @param sparkSession
- */
-class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) {
-
- override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
-
- experimentalMethods.extraStrategies =
- Seq(
- new StreamingTableStrategy(sparkSession),
- new CarbonLateDecodeStrategy,
- new DDLStrategy(sparkSession)
- )
- experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
-
- override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
-
- override lazy val analyzer: Analyzer = {
- new Analyzer(catalog, conf) {
- override val extendedResolutionRules =
- catalog.ParquetConversions ::
- catalog.OrcConversions ::
- CarbonPreInsertionCasts ::
- CarbonPreAggregateQueryRules(sparkSession) ::
- CarbonIUDAnalysisRule(sparkSession) ::
- AnalyzeCreateTable(sparkSession) ::
- PreprocessTableInsertion(conf) ::
- DataSourceAnalysis(conf) ::
- (if (conf.runSQLonFile) {
- new ResolveDataSource(sparkSession) :: Nil
- } else { Nil }
- )
-
- override val extendedCheckRules = Seq(
- PreWriteCheck(conf, catalog))
- }
- }
-
- /**
- * Internal catalog for managing table and database states.
- */
- override lazy val catalog = {
- new CarbonSessionCatalog(
- sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
- sparkSession.sharedState.globalTempViewManager,
- sparkSession,
- functionResourceLoader,
- functionRegistry,
- conf,
- newHadoopConf())
- }
-}
-
-class CarbonOptimizer(
- catalog: SessionCatalog,
- conf: SQLConf,
- experimentalMethods: ExperimentalMethods)
- extends SparkOptimizer(catalog, conf, experimentalMethods) {
-
- override def execute(plan: LogicalPlan): LogicalPlan = {
- // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule, And
- // optimize whole plan at once.
- val transFormedPlan = plan.transform {
- case filter: Filter =>
- filter.transformExpressions {
- case s: ScalarSubquery =>
- val tPlan = s.plan.transform {
- case lr: LogicalRelation
- if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
- lr
- }
- ScalarSubquery(tPlan, s.children, s.exprId)
- case p: PredicateSubquery =>
- val tPlan = p.plan.transform {
- case lr: LogicalRelation
- if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
- lr
- }
- PredicateSubquery(tPlan, p.children, p.nullAware, p.exprId)
- }
- }
- super.execute(transFormedPlan)
- }
-}
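
CarbonSessionState and its catalog are deleted from the shared sources because they override APIs that changed incompatibly: SessionCatalog.lookupRelation loses its alias parameter in Spark 2.2, and PredicateSubquery (used by the old CarbonOptimizer above) no longer exists there. Version-specific replacements are expected to live in the per-version source folders this patch introduces, dispatched on the running Spark version much like the checks used elsewhere in the diff; a small sketch of that dispatch (the helper name is an assumption):

    import org.apache.spark.sql.SparkSession

    // Mirrors the sparkSession.version.startsWith checks used in this patch.
    object SparkVersionSketch {
      def isSpark21(session: SparkSession): Boolean = session.version.startsWith("2.1")
      def isSpark22(session: SparkSession): Boolean = session.version.startsWith("2.2")

      def validate(session: SparkSession): Unit =
        if (!isSpark21(session) && !isSpark22(session)) {
          throw new UnsupportedOperationException(
            s"Spark version ${session.version} is not supported")
        }
    }
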
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c481485/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
deleted file mode 100644
index 6c91e7e..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.internal
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder
-
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * To initialize dynamic values default param
- */
-class CarbonSQLConf(sparkSession: SparkSession) {
-
- val carbonProperties = CarbonProperties.getInstance()
-
- /**
- * To initialize dynamic param defaults along with usage docs
- */
- def addDefaultCarbonParams(): Unit = {
- val ENABLE_UNSAFE_SORT =
- SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
- .doc("To enable/ disable unsafe sort.")
- .booleanConf
- .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
- CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
- val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
- SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
- .doc("To enable/ disable carbon custom block distribution.")
- .booleanConf
- .createWithDefault(carbonProperties
- .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
- CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean)
- val BAD_RECORDS_LOGGER_ENABLE =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
- .doc("To enable/ disable carbon bad record logger.")
- .booleanConf
- .createWithDefault(CarbonLoadOptionConstants
- .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
- val BAD_RECORDS_ACTION =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
- .doc("To configure the bad records action.")
- .stringConf
- .createWithDefault(carbonProperties
- .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
- val IS_EMPTY_DATA_BAD_RECORD =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
- .doc("Property to decide weather empty data to be considered bad/ good record.")
- .booleanConf
- .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
- .toBoolean)
- val SORT_SCOPE =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
- .doc("Property to specify sort scope.")
- .stringConf
- .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
- CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
- val BATCH_SORT_SIZE_INMB =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
- .doc("Property to specify batch sort size in MB.")
- .stringConf
- .createWithDefault(carbonProperties
- .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
- CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
- val SINGLE_PASS =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
- .doc("Property to enable/disable single_pass.")
- .booleanConf
- .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
- val BAD_RECORD_PATH =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
- .doc("Property to configure the bad record location.")
- .stringConf
- .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
- CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
- val GLOBAL_SORT_PARTITIONS =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
- .doc("Property to configure the global sort partitions.")
- .stringConf
- .createWithDefault(carbonProperties
- .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
- CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
- val DATEFORMAT =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
- .doc("Property to configure data format for date type columns.")
- .stringConf
- .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
- val CARBON_INPUT_SEGMENTS = SQLConfigBuilder(
- "carbon.input.segments.<database_name>.<table_name>")
- .doc("Property to configure the list of segments to query.").stringConf
- .createWithDefault(carbonProperties
- .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
- }
-
- /**
- * to set the dynamic properties default values
- */
- def addDefaultCarbonSessionParams(): Unit = {
- sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
- carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
- CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
- sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
- carbonProperties
- .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
- CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean)
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
- CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
- carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
- CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
- carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
- CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
- carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
- CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
- CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
- carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
- CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
- carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
- CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
- carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
- CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
- CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
- }
-}
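
CarbonSQLConf is likewise removed from the shared sources and re-added under src/main/spark2.1 (the new file at the end of this mail) because SQLConfigBuilder is a Spark 2.1 construct; on Spark 2.2 the equivalent entries are normally declared through SQLConf.buildConf. A hedged sketch of what a 2.2-flavoured declaration could look like for one of the entries above (assumed, not part of this patch):

    import org.apache.spark.sql.internal.SQLConf

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    object CarbonSQLConfSketch {
      // Assumed Spark 2.2 style: SQLConf.buildConf replaces SQLConfigBuilder.
      val ENABLE_UNSAFE_SORT =
        SQLConf.buildConf(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
          .doc("To enable/ disable unsafe sort.")
          .booleanConf
          .createWithDefault(
            CarbonProperties.getInstance()
              .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
                CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
    }
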
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c481485/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index dbc807d..75cc128 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.sources
import org.apache.spark.sql.types._
import org.apache.spark.sql.CarbonContainsWith
import org.apache.spark.sql.CarbonEndsWith
+import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -42,7 +43,6 @@ import org.apache.carbondata.spark.util.CarbonScalaUtil
*/
object CarbonFilters {
-
/**
* Converts data sources filters to carbon filter predicates.
*/
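
CarbonFilters imports MatchCast under the name Cast so its existing case Cast(attr, dataType) patterns keep compiling even though the real Cast case class gained a timeZoneId parameter in Spark 2.2. A minimal sketch of such an extractor, assuming it only needs the child expression and the target type (the committed CarbonExpressions.MatchCast may be stricter, for example requiring an AttributeReference child):

    import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
    import org.apache.spark.sql.types.DataType

    // Matches a Cast through its stable members instead of its constructor,
    // so the same pattern works against both Spark 2.1 and Spark 2.2.
    object MatchCastSketch {
      def unapply(expr: Expression): Option[(Expression, DataType)] = expr match {
        case cast: Cast => Some((cast.child, cast.dataType))
        case _ => None
      }
    }
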
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c481485/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 0187489..ee2c422 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -20,10 +20,9 @@ package org.apache.spark.sql.parser
import scala.collection.mutable
import scala.language.implicitConversions
-import org.apache.spark.sql.{AnalysisException, DeleteRecords, ShowLoadsCommand, UpdateTable}
+import org.apache.spark.sql.{AnalysisException, DeleteRecords, ShowLoadsCommand, SparkSession, UpdateTable}
import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier}
import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.datamap.{CarbonCreateDataMapCommand, CarbonDataMapShowCommand, CarbonDropDataMapCommand}
@@ -31,7 +30,10 @@ import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCo
import org.apache.spark.sql.execution.command.partition.{AlterTableDropCarbonPartitionCommand, AlterTableSplitCarbonPartitionCommand}
import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
import org.apache.spark.sql.types.StructField
+import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.spark.CarbonOption
@@ -164,15 +166,27 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
}
protected lazy val deleteRecords: Parser[LogicalPlan] =
- (DELETE ~> FROM ~> table) ~ restInput.? <~ opt(";") ^^ {
+ (DELETE ~> FROM ~> aliasTable) ~ restInput.? <~ opt(";") ^^ {
case table ~ rest =>
- val tableName = getTableName(table.tableIdentifier)
- val alias = table.alias.getOrElse("")
- DeleteRecords("select tupleId from " + tableName + " " + alias + rest.getOrElse(""), table)
+ val tableName = getTableName(table._2)
+ val relation: LogicalPlan = table._3 match {
+ case Some(a) =>
+ DeleteRecords(
+ "select tupleId from " + tableName + " " + table._3.getOrElse("")
+ + rest.getOrElse(""),
+ Some(table._3.get),
+ table._1)
+ case None =>
+ DeleteRecords(
+ "select tupleId from " + tableName + " " + rest.getOrElse(""),
+ None,
+ table._1)
+ }
+ relation
}
protected lazy val updateTable: Parser[LogicalPlan] =
- UPDATE ~> table ~
+ UPDATE ~> aliasTable ~
(SET ~> "(" ~> repsep(element, ",") <~ ")") ~
("=" ~> restInput) <~ opt(";") ^^ {
case tab ~ columns ~ rest =>
@@ -184,31 +198,58 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
}
// only a list of expressions is given, need to convert that list of expressions into
// select statement on destination table
- val relation = tab match {
- case r@UnresolvedRelation(tableIdentifier, alias) =>
- updateRelation(r, tableIdentifier, alias)
- case _ => tab
+ val relation : UnresolvedRelation = tab._1 match {
+ case r@CarbonUnresolvedRelation(tableIdentifier) =>
+ tab._3 match {
+ case Some(a) => (updateRelation(r, tableIdentifier, tab._4, Some(tab._3.get)))
+ case None => (updateRelation(r, tableIdentifier, tab._4, None))
+ }
+ case _ => tab._1
+ }
+
+ tab._3 match {
+ case Some(a) =>
+ ("select " + sel + " from " + getTableName(tab._2) + " " + tab._3.get, relation)
+ case None =>
+ ("select " + sel + " from " + getTableName(tab._2), relation)
}
- ("select " + sel + " from " + getTableName(relation.tableIdentifier) + " " +
- relation.alias.get, relation)
+
} else {
- (sel, updateRelation(tab, tab.tableIdentifier, tab.alias))
+ (sel, updateRelation(tab._1, tab._2, tab._4, Some(tab._3.get)))
}
- UpdateTable(relation, columns, selectStmt, where)
+ val rel = tab._3 match {
+ case Some(a) => UpdateTable(relation, columns, selectStmt, Some(tab._3.get), where)
+ case None => UpdateTable(relation,
+ columns,
+ selectStmt,
+ Some(tab._1.tableIdentifier.table),
+ where)
+ }
+ rel
}
+
+
private def updateRelation(
r: UnresolvedRelation,
- tableIdentifier: Seq[String],
+ tableIdent: Seq[String],
+ tableIdentifier: TableIdentifier,
alias: Option[String]): UnresolvedRelation = {
alias match {
case Some(_) => r
case _ =>
- val tableAlias = tableIdentifier match {
+ val tableAlias = tableIdent match {
case Seq(dbName, tableName) => Some(tableName)
case Seq(tableName) => Some(tableName)
}
- UnresolvedRelation(tableIdentifier, tableAlias)
+ // Use Reflection to choose between Spark2.1 and Spark2.2
+ // Move UnresolvedRelation(tableIdentifier, tableAlias) to reflection.
+ val unresolvedrelation =
+ CarbonReflectionUtils.getUnresolvedRelation(
+ tableIdentifier,
+ SparkSession.getActiveSession.get.version,
+ tableAlias)
+ unresolvedrelation
}
}
@@ -219,7 +260,26 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
protected lazy val table: Parser[UnresolvedRelation] = {
rep1sep(attributeName, ".") ~ opt(ident) ^^ {
- case tableIdent ~ alias => UnresolvedRelation(tableIdent, alias)
+ case tableIdent ~ alias => UnresolvedRelation(tableIdent)
+ }
+ }
+
+ protected lazy val aliasTable: Parser[(UnresolvedRelation, List[String], Option[String],
+ TableIdentifier)] = {
+ rep1sep(attributeName, ".") ~ opt(ident) ^^ {
+ case tableIdent ~ alias =>
+
+ val tableIdentifier: TableIdentifier = toTableIdentifier(tableIdent)
+
+ // Use Reflection to choose between Spark2.1 and Spark2.2
+ // Move (UnresolvedRelation(tableIdent, alias), tableIdent, alias) to reflection.
+ val unresolvedRelation =
+ CarbonReflectionUtils.getUnresolvedRelation(
+ tableIdentifier,
+ SparkSession.getActiveSession.get.version,
+ alias)
+
+ (unresolvedRelation, tableIdent, alias, tableIdentifier)
}
}
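
The parser builds UnresolvedRelation through CarbonReflectionUtils because the case class drops its alias parameter in Spark 2.2: 2.1 exposes UnresolvedRelation(tableIdentifier, alias) while 2.2 only has UnresolvedRelation(tableIdentifier). A rough sketch of such a helper, assuming constructor lookup by parameter types (the committed implementation may differ):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

    object UnresolvedRelationSketch {
      // Instantiates UnresolvedRelation reflectively so one source file
      // compiles against both the two-argument (2.1) and one-argument (2.2)
      // constructors.
      def create(tableIdentifier: TableIdentifier,
          sparkVersion: String,
          alias: Option[String]): UnresolvedRelation = {
        val clazz = classOf[UnresolvedRelation]
        if (sparkVersion.startsWith("2.1")) {
          clazz.getConstructor(classOf[TableIdentifier], classOf[Option[String]])
            .newInstance(tableIdentifier, alias)
        } else if (sparkVersion.startsWith("2.2")) {
          clazz.getConstructor(classOf[TableIdentifier]).newInstance(tableIdentifier)
        } else {
          throw new UnsupportedOperationException(
            s"Spark version $sparkVersion is not supported")
        }
      }
    }
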
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c481485/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 8e53927..ec20c49 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -18,17 +18,17 @@ package org.apache.spark.sql.parser
import scala.collection.mutable
-import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{CarbonSession, SparkSession}
import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
import org.apache.spark.sql.catalyst.parser.ParserUtils._
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, TablePropertyListContext}
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{BucketFields, CarbonCreateTableCommand, PartitionerField, TableModel}
+import org.apache.spark.sql.execution.command.{CarbonCreateTableCommand, PartitionerField, TableModel}
import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -40,7 +40,8 @@ import org.apache.carbondata.spark.util.CommonUtil
*/
class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser {
- val astBuilder = new CarbonSqlAstBuilder(conf)
+ val parser = new CarbonSpark2SqlParser
+ val astBuilder = CarbonReflectionUtils.getAstBuilder(conf, parser, sparkSession)
private val substitutor = new VariableSubstitution(conf)
@@ -53,7 +54,7 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends Ab
throw ce
case ex =>
try {
- astBuilder.parser.parse(sqlText)
+ parser.parse(sqlText)
} catch {
case mce: MalformedCarbonCommandException =>
throw mce
@@ -73,13 +74,11 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends Ab
}
}
-class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
-
- val parser = new CarbonSpark2SqlParser
+class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
+ extends SparkSqlAstBuilder(conf) {
- override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
- Option(ctx.query()).map(plan)
- val fileStorage = Option(ctx.createFileFormat) match {
+ def getFileStorage(createFileFormat: CreateFileFormatContext): String = {
+ Option(createFileFormat) match {
case Some(value) =>
if (value.children.get(1).getText.equalsIgnoreCase("by")) {
value.storageHandler().STRING().getSymbol.getText
@@ -89,63 +88,6 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
}
case _ => ""
}
- if (fileStorage.equalsIgnoreCase("'carbondata'") ||
- fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
- val (name, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
- // TODO: implement temporary tables
- if (temp) {
- throw new ParseException(
- "CREATE TEMPORARY TABLE is not supported yet. " +
- "Please use CREATE TEMPORARY VIEW as an alternative.", ctx)
- }
- if (ctx.skewSpec != null) {
- operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx)
- }
- if (ctx.bucketSpec != null) {
- operationNotAllowed("CREATE TABLE ... CLUSTERED BY", ctx)
- }
-
- // validate schema
- val (colsStructFields, colNames) = validateSchema(ctx, name)
-
- val tableProperties = mutable.Map[String, String]()
- val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues)
- .getOrElse(Map.empty)
- properties.foreach{property => tableProperties.put(property._1, property._2)}
-
- val options = new CarbonOption(properties)
-
- // validate streaming table property
- validateStreamingProperty(ctx, options)
-
- // validate partition clause
- val (partitionByStructFields, partitionFields) =
- validateParitionFields(ctx, colNames, tableProperties)
-
- val fields = parser.getFields(colsStructFields ++ partitionByStructFields)
-
- // validate bucket fields
- val bucketFields: Option[BucketFields] =
- parser.getBucketFields(tableProperties, fields, options)
-
- val tableComment = Option(ctx.STRING()).map(string)
-
- // prepare table model of the collected tokens
- val tableModel: TableModel = parser.prepareTableModel(
- ifNotExists,
- convertDbNameToLowerCase(name.database),
- name.table.toLowerCase,
- fields,
- partitionFields,
- tableProperties,
- bucketFields,
- isAlterFlow = false,
- tableComment)
-
- CarbonCreateTableCommand(tableModel)
- } else {
- super.visitCreateTable(ctx)
- }
}
/**
@@ -154,17 +96,24 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
* @param dbName
* @return Option of String
*/
- protected def convertDbNameToLowerCase(dbName: Option[String]): Option[String] = {
+ def convertDbNameToLowerCase(dbName: Option[String]): Option[String] = {
dbName match {
case Some(databaseName) => Some(databaseName.toLowerCase)
case None => dbName
}
}
+
+
+ def needToConvertToLowerCase(key: String): Boolean = {
+ val noConvertList = Array("LIST_INFO", "RANGE_INFO")
+ !noConvertList.exists(x => x.equalsIgnoreCase(key));
+ }
+
/**
* Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified.
*/
- private def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
+ def visitPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String] = {
val props = visitTablePropertyList(ctx)
val badKeys = props.filter { case (_, v) => v == null }.keys
if (badKeys.nonEmpty) {
@@ -180,52 +129,93 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
}
}
- private def needToConvertToLowerCase(key: String): Boolean = {
- val noConvertList = Array("LIST_INFO", "RANGE_INFO")
- !noConvertList.exists(x => x.equalsIgnoreCase(key))
+ def getPropertyKeyValues(ctx: TablePropertyListContext): Map[String, String]
+ = {
+ Option(ctx).map(visitPropertyKeyValues)
+ .getOrElse(Map.empty)
}
- private def validateParitionFields(
- ctx: CreateTableContext,
- colNames: Seq[String],
- tableProperties: mutable.Map[String, String]): (Seq[StructField], Seq[PartitionerField]) = {
- val partitionByStructFields = Option(ctx.partitionColumns).toSeq.flatMap(visitColTypeList)
- val partitionerFields = partitionByStructFields.map { structField =>
- PartitionerField(structField.name, Some(structField.dataType.toString), null)
+ def createCarbonTable(tableHeader: CreateTableHeaderContext,
+ skewSpecContext: SkewSpecContext,
+ bucketSpecContext: BucketSpecContext,
+ partitionColumns: ColTypeListContext,
+ columns : ColTypeListContext,
+ tablePropertyList : TablePropertyListContext,
+ tableComment : Option[String]) : LogicalPlan = {
+ // val parser = new CarbonSpark2SqlParser
+
+ val (name, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader)
+ // TODO: implement temporary tables
+ if (temp) {
+ throw new ParseException(
+ "CREATE TEMPORARY TABLE is not supported yet. " +
+ "Please use CREATE TEMPORARY VIEW as an alternative.", tableHeader)
}
- if (partitionerFields.nonEmpty) {
- if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) {
- throw new MalformedCarbonCommandException("Error: Invalid partition definition")
- }
- // partition columns should not be part of the schema
- val badPartCols = partitionerFields.map(_.partitionColumn).toSet.intersect(colNames.toSet)
- if (badPartCols.nonEmpty) {
- operationNotAllowed(s"Partition columns should not be specified in the schema: " +
- badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
- }
+ if (skewSpecContext != null) {
+ operationNotAllowed("CREATE TABLE ... SKEWED BY", skewSpecContext)
+ }
+ if (bucketSpecContext != null) {
+ operationNotAllowed("CREATE TABLE ... CLUSTERED BY", bucketSpecContext)
}
- (partitionByStructFields, partitionerFields)
- }
- private def validateSchema(
- ctx: CreateTableContext,
- name: TableIdentifier): (Seq[StructField], Seq[String]) = {
- // Validate schema, ensuring whether no duplicate name is used in table definition
- val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
+
+ val cols = Option(columns).toSeq.flatMap(visitColTypeList)
+ val properties = getPropertyKeyValues(tablePropertyList)
+
+ // Ensure that no duplicate name is used in the table definition
val colNames = cols.map(_.name)
if (colNames.length != colNames.distinct.length) {
val duplicateColumns = colNames.groupBy(identity).collect {
case (x, ys) if ys.length > 1 => "\"" + x + "\""
}
operationNotAllowed(s"Duplicated column names found in table definition of $name: " +
- duplicateColumns.mkString("[", ",", "]"), ctx)
+ duplicateColumns.mkString("[", ",", "]"), columns)
}
- (cols, colNames)
+
+
+
+ val tableProperties = mutable.Map[String, String]()
+ properties.foreach{property => tableProperties.put(property._1, property._2)}
+
+ // validate partition clause
+ val (partitionByStructFields, partitionFields) =
+ validateParitionFields(partitionColumns, colNames, tableProperties)
+
+ // validate partition clause
+ if (partitionFields.nonEmpty) {
+ if (!CommonUtil.validatePartitionColumns(tableProperties, partitionFields)) {
+ throw new MalformedCarbonCommandException("Error: Invalid partition definition")
+ }
+ // partition columns should not be part of the schema
+ val badPartCols = partitionFields.map(_.partitionColumn).toSet.intersect(colNames.toSet)
+ if (badPartCols.nonEmpty) {
+ operationNotAllowed(s"Partition columns should not be specified in the schema: " +
+ badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"),
+ partitionColumns)
+ }
+ }
+ val fields = parser.getFields(cols ++ partitionByStructFields)
+ val options = new CarbonOption(properties)
+ // validate tblProperties
+ val bucketFields = parser.getBucketFields(tableProperties, fields, options)
+
+ validateStreamingProperty(options)
+
+ // prepare table model of the collected tokens
+ val tableModel: TableModel = parser.prepareTableModel(ifNotExists,
+ convertDbNameToLowerCase(name.database),
+ name.table.toLowerCase,
+ fields,
+ partitionFields,
+ tableProperties,
+ bucketFields,
+ isAlterFlow = false,
+ tableComment)
+
+ CarbonCreateTableCommand(tableModel)
}
- private def validateStreamingProperty(
- ctx: CreateTableContext,
- carbonOption: CarbonOption): Unit = {
+ private def validateStreamingProperty(carbonOption: CarbonOption): Unit = {
try {
carbonOption.isStreaming
} catch {
@@ -234,4 +224,34 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
"Table property 'streaming' should be either 'true' or 'false'")
}
}
+
+ private def validateParitionFields(
+ partitionColumns: ColTypeListContext,
+ colNames: Seq[String],
+ tableProperties: mutable.Map[String, String]): (Seq[StructField], Seq[PartitionerField]) = {
+ val partitionByStructFields = Option(partitionColumns).toSeq.flatMap(visitColTypeList)
+ val partitionerFields = partitionByStructFields.map { structField =>
+ PartitionerField(structField.name, Some(structField.dataType.toString), null)
+ }
+ if (partitionerFields.nonEmpty) {
+ if (!CommonUtil.validatePartitionColumns(tableProperties, partitionerFields)) {
+ throw new MalformedCarbonCommandException("Error: Invalid partition definition")
+ }
+ // partition columns should not be part of the schema
+ val badPartCols = partitionerFields.map(_.partitionColumn).toSet.intersect(colNames.toSet)
+ if (badPartCols.nonEmpty) {
+ operationNotAllowed(s"Partition columns should not be specified in the schema: " +
+ badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]")
+ , partitionColumns: ColTypeListContext)
+ }
+ }
+ (partitionByStructFields, partitionerFields)
+ }
+
+}
+
+trait CarbonAstTrait {
+ def getFileStorage (createFileFormat : CreateFileFormatContext): String
}
+
+
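
CarbonSparkSqlParser no longer instantiates a single CarbonSqlAstBuilder; it asks CarbonReflectionUtils.getAstBuilder for a version-specific builder, while the version-independent pieces (getFileStorage, createCarbonTable, the property-key handling) stay in CarbonHelperSqlAstBuilder behind the CarbonAstTrait contract. A hedged sketch of how that lookup could be wired; the class name below is an assumption used for illustration, not necessarily the one shipped in the per-version source folders:

    import org.apache.spark.sql.execution.SparkSqlAstBuilder
    import org.apache.spark.sql.internal.SQLConf
    import org.apache.spark.sql.parser.CarbonSpark2SqlParser

    object AstBuilderSketch {
      // Loads whichever SparkSqlAstBuilder subclass the active Spark profile
      // compiled in and constructs it with the shared conf and carbon parser.
      def load(conf: SQLConf, parser: CarbonSpark2SqlParser): SparkSqlAstBuilder = {
        val clazz = Class.forName("org.apache.spark.sql.hive.CarbonSqlAstBuilder")
        clazz.getConstructor(classOf[SQLConf], classOf[CarbonSpark2SqlParser])
          .newInstance(conf, parser)
          .asInstanceOf[SparkSqlAstBuilder]
      }
    }
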
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c481485/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 07491d1..a1742de 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -23,9 +23,9 @@ import scala.collection.mutable.ListBuffer
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog}
import org.apache.spark.sql.hive.HiveExternalCatalog._
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -130,8 +130,7 @@ object AlterTableUtil {
*/
def updateSchemaInfo(carbonTable: CarbonTable,
schemaEvolutionEntry: SchemaEvolutionEntry,
- thriftTable: TableInfo)(sparkSession: SparkSession,
- sessionState: CarbonSessionState): Unit = {
+ thriftTable: TableInfo)(sparkSession: SparkSession): Unit = {
val dbName = carbonTable.getDatabaseName
val tableName = carbonTable.getTableName
CarbonEnv.getInstance(sparkSession).carbonMetastore
@@ -145,8 +144,9 @@ object AlterTableUtil {
val schema = CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(tableIdentifier)(sparkSession).schema.json
val schemaParts = prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema)
- sessionState.metadataHive.runSqlHive(
- s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES($schemaParts)")
+ val hiveClient = sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
+ .asInstanceOf[HiveExternalCatalog].client
+ hiveClient.runSqlHive(s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES($schemaParts)")
sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
}
@@ -329,7 +329,7 @@ object AlterTableUtil {
*/
def modifyTableComment(tableIdentifier: TableIdentifier, properties: Map[String, String],
propKeys: Seq[String], set: Boolean)
- (sparkSession: SparkSession, sessionState: CarbonSessionState): Unit = {
+ (sparkSession: SparkSession): Unit = {
val tableName = tableIdentifier.table
val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
LOGGER.audit(s"Alter table comment request has been received for $dbName.$tableName")
@@ -380,7 +380,7 @@ object AlterTableUtil {
}
updateSchemaInfo(carbonTable,
schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
- thriftTable)(sparkSession, sessionState)
+ thriftTable)(sparkSession)
LOGGER.info(s"Alter table comment is successful for table $dbName.$tableName")
LOGGER.audit(s"Alter table comment is successful for table $dbName.$tableName")
} catch {
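
The refactor above removes the CarbonSessionState parameter: the Hive client is now resolved from the shared external catalog of the CarbonSession itself. A minimal sketch of that lookup, mirroring the diff; it assumes the code sits alongside AlterTableUtil under an org.apache.spark package, since the catalog's client field is not visible from arbitrary user packages:

package org.apache.spark.util

import org.apache.spark.sql.{CarbonSession, SparkSession}
import org.apache.spark.sql.hive.HiveExternalCatalog

object HiveClientSketch {
  // Resolve the HiveClient the same way the refactored updateSchemaInfo does,
  // then push a DDL statement (e.g. ALTER TABLE ... SET TBLPROPERTIES) through it.
  def runSqlHive(sparkSession: SparkSession, sql: String): Unit = {
    val hiveClient = sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
      .asInstanceOf[HiveExternalCatalog].client
    hiveClient.runSqlHive(sql)
  }
}
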
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c481485/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala b/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala
new file mode 100644
index 0000000..837b21f
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Initializes default values for dynamic parameters.
+ */
+class CarbonSQLConf(sparkSession: SparkSession) {
+
+ val carbonProperties = CarbonProperties.getInstance()
+
+ /**
+ * Registers dynamic parameter defaults along with their usage documentation.
+ */
+ def addDefaultCarbonParams(): Unit = {
+ val ENABLE_UNSAFE_SORT =
+ SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
+ .doc("To enable/ disable unsafe sort.")
+ .booleanConf
+ .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+ CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+ val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
+ SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
+ .doc("To enable/ disable carbon custom block distribution.")
+ .booleanConf
+ .createWithDefault(carbonProperties
+ .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+ CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean)
+ val BAD_RECORDS_LOGGER_ENABLE =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
+ .doc("To enable/ disable carbon bad record logger.")
+ .booleanConf
+ .createWithDefault(CarbonLoadOptionConstants
+ .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+ val BAD_RECORDS_ACTION =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
+ .doc("To configure the bad records action.")
+ .stringConf
+ .createWithDefault(carbonProperties
+ .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+ val IS_EMPTY_DATA_BAD_RECORD =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
+ .doc("Property to decide weather empty data to be considered bad/ good record.")
+ .booleanConf
+ .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
+ .toBoolean)
+ val SORT_SCOPE =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
+ .doc("Property to specify sort scope.")
+ .stringConf
+ .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+ val BATCH_SORT_SIZE_INMB =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
+ .doc("Property to specify batch sort size in MB.")
+ .stringConf
+ .createWithDefault(carbonProperties
+ .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+ CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
+ val SINGLE_PASS =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
+ .doc("Property to enable/disable single_pass.")
+ .booleanConf
+ .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
+ val BAD_RECORD_PATH =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
+ .doc("Property to configure the bad record location.")
+ .stringConf
+ .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+ CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+ val GLOBAL_SORT_PARTITIONS =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
+ .doc("Property to configure the global sort partitions.")
+ .stringConf
+ .createWithDefault(carbonProperties
+ .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+ CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+ val DATEFORMAT =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
+ .doc("Property to configure data format for date type columns.")
+ .stringConf
+ .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+ val CARBON_INPUT_SEGMENTS = SQLConfigBuilder(
+ "carbon.input.segments.<database_name>.<table_name>")
+ .doc("Property to configure the list of segments to query.").stringConf
+ .createWithDefault(carbonProperties
+ .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
+ }
+ /**
+ * Sets the default values of the dynamic session properties.
+ */
+ def addDefaultCarbonSessionParams(): Unit = {
+ sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+ carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+ CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+ sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+ carbonProperties
+ .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+ CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean)
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+ carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+ carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+ carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+ CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+ carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+ CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+ carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+ CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+ }
+}
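
Once addDefaultCarbonParams and addDefaultCarbonSessionParams have been invoked for a session, the registered keys behave like ordinary dynamic SQL conf entries. A small usage sketch (the object and method names are illustrative):

import org.apache.spark.sql.SparkSession

import org.apache.carbondata.core.constants.CarbonCommonConstants

object CarbonConfSketch {
  // Override one of the dynamic parameters for the current session and read it
  // back; the SQL form `SET <key>=<value>` has the same effect.
  def toggleUnsafeSort(spark: SparkSession, enabled: Boolean): Unit = {
    spark.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT, enabled)
    println(spark.conf.get(CarbonCommonConstants.ENABLE_UNSAFE_SORT))
  }
}
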
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c481485/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
new file mode 100644
index 0000000..eadae6a
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.parser.ParserUtils._
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.CreateTableContext
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
+import org.apache.spark.sql.execution.command.datamap.{DataMapDropTablePostListener, DropDataMapPostListener}
+import org.apache.spark.sql.execution.command.preaaggregate._
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
+import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
+
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events._
+
+/**
+ * This class holds the Carbon session catalog and refreshes a relation from the
+ * cache when the CarbonTable in the Carbon catalog differs from the CarbonTable
+ * held by the cached Carbon relation.
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class CarbonSessionCatalog(
+ externalCatalog: HiveExternalCatalog,
+ globalTempViewManager: GlobalTempViewManager,
+ sparkSession: SparkSession,
+ functionResourceLoader: FunctionResourceLoader,
+ functionRegistry: FunctionRegistry,
+ conf: SQLConf,
+ hadoopConf: Configuration)
+ extends HiveSessionCatalog(
+ externalCatalog,
+ globalTempViewManager,
+ sparkSession,
+ functionResourceLoader,
+ functionRegistry,
+ conf,
+ hadoopConf) {
+
+ lazy val carbonEnv = {
+ val env = new CarbonEnv
+ env.init(sparkSession)
+ env
+ }
+
+ /**
+ * Invalidates the cached Carbon relation if the Carbon table has been updated
+ * in the Carbon catalog.
+ *
+ * @param name
+ * @param alias
+ * @return
+ */
+ override def lookupRelation(name: TableIdentifier,
+ alias: Option[String]): LogicalPlan = {
+ val rtnRelation = super.lookupRelation(name, alias)
+ var toRefreshRelation = false
+ rtnRelation match {
+ case SubqueryAlias(_,
+ LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), _) =>
+ toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
+ case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
+ toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
+ case _ =>
+ }
+
+ if (toRefreshRelation) {
+ super.lookupRelation(name, alias)
+ } else {
+ rtnRelation
+ }
+ }
+
+ private def refreshRelationFromCache(identifier: TableIdentifier,
+ alias: Option[String],
+ carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
+ var isRefreshed = false
+ val storePath = CarbonProperties.getStorePath
+ carbonEnv.carbonMetastore.
+ checkSchemasModifiedTimeAndReloadTables()
+
+ val table = carbonEnv.carbonMetastore.getTableFromMetadataCache(
+ carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
+ carbonDatasourceHadoopRelation.carbonTable.getTableName)
+ if (table.isEmpty || (table.isDefined &&
+ table.get.getTableLastUpdatedTime !=
+ carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
+ refreshTable(identifier)
+ DataMapStoreManager.getInstance().
+ clearDataMaps(AbsoluteTableIdentifier.from(storePath,
+ identifier.database.getOrElse("default"), identifier.table))
+ isRefreshed = true
+ logInfo(s"Schema changes have been detected for table: $identifier")
+ }
+ isRefreshed
+ }
+}
+
+/**
+ * Session state implementation that overrides the SQL parser and adds Carbon-specific strategies.
+ * @param sparkSession
+ */
+class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) {
+
+ override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+ experimentalMethods.extraStrategies =
+ Seq(
+ new StreamingTableStrategy(sparkSession),
+ new CarbonLateDecodeStrategy,
+ new DDLStrategy(sparkSession)
+ )
+ experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+
+ override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+
+ override lazy val analyzer: Analyzer = {
+ new Analyzer(catalog, conf) {
+ override val extendedResolutionRules =
+ catalog.ParquetConversions ::
+ catalog.OrcConversions ::
+ CarbonPreInsertionCasts(sparkSession) ::
+ CarbonPreAggregateQueryRules(sparkSession) ::
+ CarbonIUDAnalysisRule(sparkSession) ::
+ AnalyzeCreateTable(sparkSession) ::
+ PreprocessTableInsertion(conf) ::
+ DataSourceAnalysis(conf) ::
+ (if (conf.runSQLonFile) {
+ new ResolveDataSource(sparkSession) :: Nil
+ } else { Nil }
+ )
+
+ override val extendedCheckRules = Seq(
+ PreWriteCheck(conf, catalog))
+ }
+ }
+
+ /**
+ * Internal catalog for managing table and database states.
+ */
+ override lazy val catalog = {
+ new CarbonSessionCatalog(
+ sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
+ sparkSession.sharedState.globalTempViewManager,
+ sparkSession,
+ functionResourceLoader,
+ functionRegistry,
+ conf,
+ newHadoopConf())
+ }
+}
+
+class CarbonOptimizer(
+ catalog: SessionCatalog,
+ conf: SQLConf,
+ experimentalMethods: ExperimentalMethods)
+ extends SparkOptimizer(catalog, conf, experimentalMethods) {
+
+ override def execute(plan: LogicalPlan): LogicalPlan = {
+ // For scalar and predicate subqueries, flag the Carbon relation so the decoder
+ // plan is skipped in the optimizer rule and the whole plan is optimized at once.
+ val transFormedPlan = plan.transform {
+ case filter: Filter =>
+ filter.transformExpressions {
+ case s: ScalarSubquery =>
+ val tPlan = s.plan.transform {
+ case lr: LogicalRelation
+ if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+ lr
+ }
+ ScalarSubquery(tPlan, s.children, s.exprId)
+ case p: PredicateSubquery =>
+ val tPlan = p.plan.transform {
+ case lr: LogicalRelation
+ if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+ lr
+ }
+ PredicateSubquery(tPlan, p.children, p.nullAware, p.exprId)
+ }
+ }
+ super.execute(transFormedPlan)
+ }
+}
+
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends
+ SparkSqlAstBuilder(conf) {
+
+ val helper = new CarbonHelperSqlAstBuilder(conf, parser)
+
+ override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+ val fileStorage = helper.getFileStorage(ctx.createFileFormat)
+
+ if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+ fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
+ helper.createCarbonTable(ctx.createTableHeader,
+ ctx.skewSpec,
+ ctx.bucketSpec,
+ ctx.partitionColumns,
+ ctx.columns,
+ ctx.tablePropertyList,
+ Option(ctx.STRING()).map(string))
+ } else {
+ super.visitCreateTable(ctx)
+ }
+ }
+}
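
For reference, a DDL sketch of what this builder routes to the Carbon helper: a CREATE TABLE whose file format resolves to 'carbondata' (or 'org.apache.carbondata.format') goes through createCarbonTable, while any other format falls back to the stock SparkSqlAstBuilder. The store path, table name, and columns below are illustrative:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

object CreateCarbonTableSketch {
  def main(args: Array[String]): Unit = {
    // getOrCreateCarbonSession installs CarbonSessionState and, with it, the
    // Carbon SQL parser and the CarbonSqlAstBuilder shown above.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("CreateCarbonTableSketch")
      .getOrCreateCarbonSession("/tmp/carbon/store")

    // Routed to CarbonHelperSqlAstBuilder.createCarbonTable; the 'streaming'
    // property is validated by validateStreamingProperty during parsing.
    spark.sql(
      """
        |CREATE TABLE IF NOT EXISTS sample_table (
        |  id INT,
        |  name STRING
        |)
        |STORED BY 'carbondata'
        |TBLPROPERTIES ('streaming'='false')
      """.stripMargin)
  }
}
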