Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/05 15:02:27 UTC
[09/50] [abbrv] carbondata git commit: [CARBONDATA-1987] Make package name and directory paths consistent; remove duplicate file CarbonColumnValidator
[CARBONDATA-1987] Make package name and directory paths consistent; remove duplicate file CarbonColumnValidator
Add the coveralls token to the spark-2.2 profile; synchronize file paths with package names; delete the duplicate class CarbonColumnValidator present in the spark2 module.
This closes #1764
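For context on the renames: on the JVM a Scala file's package declaration is expected to mirror its directory under the source root, which the old layout broke (e.g. CarbonSQLConf.scala sat directly under src/main/spark2.1 while declaring package org.apache.spark.sql.hive). A minimal sketch of the convention this commit restores, with the path taken from the diff below:

// integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala
// The package declaration now mirrors the directory path under the source root.
package org.apache.spark.sql.hive

class CarbonSQLConf // body elided; the full file appears in the diff below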
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4d3f3989
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4d3f3989
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4d3f3989
Branch: refs/heads/fgdatamap
Commit: 4d3f3989b5b3aef6ed44e3c67c4102bea4505013
Parents: 94011c3
Author: Raghunandan S <ca...@gmail.com>
Authored: Thu Jan 4 20:18:07 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Jan 31 08:19:22 2018 +0530
----------------------------------------------------------------------
.../spark/CarbonColumnValidator.scala | 36 --
.../src/main/spark2.1/CarbonSQLConf.scala | 149 -------
.../src/main/spark2.1/CarbonSessionState.scala | 339 ----------------
.../apache/spark/sql/hive/CarbonSQLConf.scala | 149 +++++++
.../spark/sql/hive/CarbonSessionState.scala | 339 ++++++++++++++++
.../src/main/spark2.2/CarbonSessionState.scala | 398 -------------------
.../src/main/spark2.2/CarbonSqlConf.scala | 148 -------
.../spark/sql/hive/CarbonSessionState.scala | 398 +++++++++++++++++++
.../apache/spark/sql/hive/CarbonSqlConf.scala | 148 +++++++
pom.xml | 8 +-
10 files changed, 1039 insertions(+), 1073 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
deleted file mode 100644
index 03c4764..0000000
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark
-
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-
- /**
- * Carbon column validator
- */
-class CarbonColumnValidator extends ColumnValidator {
- def validateColumns(allColumns: Seq[ColumnSchema]): Unit = {
- allColumns.foreach { columnSchema =>
- val colWithSameId = allColumns.filter { x =>
- x.getColumnUniqueId.equals(columnSchema.getColumnUniqueId)
- }
- if (colWithSameId.size > 1) {
- throw new MalformedCarbonCommandException("Two column can not have same columnId")
- }
- }
- }
-}
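The spark2 copy deleted above duplicated a validator that already exists elsewhere in the project. As a minimal standalone sketch of the same duplicate-columnId rule, here is the check rewritten around a groupBy (one pass instead of a filter per column); ColumnInfo and IllegalArgumentException are hypothetical stand-ins for ColumnSchema and MalformedCarbonCommandException:

final case class ColumnInfo(name: String, columnUniqueId: String)

object ColumnIdCheck {
  // Same rule as the deleted class: no two columns may share a columnUniqueId.
  def validateColumns(allColumns: Seq[ColumnInfo]): Unit = {
    val duplicated = allColumns.groupBy(_.columnUniqueId).filter(_._2.size > 1)
    if (duplicated.nonEmpty) {
      throw new IllegalArgumentException(
        s"Two columns cannot have the same columnId: ${duplicated.keys.mkString(", ")}")
    }
  }
}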
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala b/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala
deleted file mode 100644
index 15ccb0c..0000000
--- a/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder
-
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * To initialize dynamic values default param
- */
-class CarbonSQLConf(sparkSession: SparkSession) {
-
- val carbonProperties = CarbonProperties.getInstance()
-
- /**
- * To initialize dynamic param defaults along with usage docs
- */
- def addDefaultCarbonParams(): Unit = {
- val ENABLE_UNSAFE_SORT =
- SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
- .doc("To enable/ disable unsafe sort.")
- .booleanConf
- .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
- CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
- val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
- SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
- .doc("To set carbon task distribution.")
- .stringConf
- .createWithDefault(carbonProperties
- .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
- CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
- val BAD_RECORDS_LOGGER_ENABLE =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
- .doc("To enable/ disable carbon bad record logger.")
- .booleanConf
- .createWithDefault(CarbonLoadOptionConstants
- .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
- val BAD_RECORDS_ACTION =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
- .doc("To configure the bad records action.")
- .stringConf
- .createWithDefault(carbonProperties
- .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
- val IS_EMPTY_DATA_BAD_RECORD =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
- .doc("Property to decide weather empty data to be considered bad/ good record.")
- .booleanConf
- .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
- .toBoolean)
- val SORT_SCOPE =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
- .doc("Property to specify sort scope.")
- .stringConf
- .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
- CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
- val BATCH_SORT_SIZE_INMB =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
- .doc("Property to specify batch sort size in MB.")
- .stringConf
- .createWithDefault(carbonProperties
- .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
- CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
- val SINGLE_PASS =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
- .doc("Property to enable/disable single_pass.")
- .booleanConf
- .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
- val BAD_RECORD_PATH =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
- .doc("Property to configure the bad record location.")
- .stringConf
- .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
- CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
- val GLOBAL_SORT_PARTITIONS =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
- .doc("Property to configure the global sort partitions.")
- .stringConf
- .createWithDefault(carbonProperties
- .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
- CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
- val DATEFORMAT =
- SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
- .doc("Property to configure data format for date type columns.")
- .stringConf
- .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
- val CARBON_INPUT_SEGMENTS = SQLConfigBuilder(
- "carbon.input.segments.<database_name>.<table_name>")
- .doc("Property to configure the list of segments to query.").stringConf
- .createWithDefault(carbonProperties
- .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
- }
- /**
- * to set the dynamic properties default values
- */
- def addDefaultCarbonSessionParams(): Unit = {
- sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
- carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
- CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
- sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
- carbonProperties
- .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
- CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
- CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
- carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
- CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
- carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
- CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
- carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
- CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
- CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
- carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
- CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
- carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
- CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
- carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
- CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
- CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
deleted file mode 100644
index 0fe0f96..0000000
--- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
+++ /dev/null
@@ -1,339 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery}
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.parser.ParserUtils._
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.CreateTableContext
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
-import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
-import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession, Strategy}
-
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-
-/**
- * This class will have carbon catalog and refresh the relation from cache if the carbontable in
- * carbon catalog is not same as cached carbon relation's carbon table
- *
- * @param externalCatalog
- * @param globalTempViewManager
- * @param sparkSession
- * @param functionResourceLoader
- * @param functionRegistry
- * @param conf
- * @param hadoopConf
- */
-class CarbonSessionCatalog(
- externalCatalog: HiveExternalCatalog,
- globalTempViewManager: GlobalTempViewManager,
- sparkSession: SparkSession,
- functionResourceLoader: FunctionResourceLoader,
- functionRegistry: FunctionRegistry,
- conf: SQLConf,
- hadoopConf: Configuration)
- extends HiveSessionCatalog(
- externalCatalog,
- globalTempViewManager,
- sparkSession,
- functionResourceLoader,
- functionRegistry,
- conf,
- hadoopConf) {
-
- lazy val carbonEnv = {
- val env = new CarbonEnv
- env.init(sparkSession)
- env
- }
-
- // Initialize all listeners to the Operation bus.
- CarbonEnv.initListeners()
-
- /**
- * This method will invalidate carbonrelation from cache if carbon table is updated in
- * carbon catalog
- *
- * @param name
- * @param alias
- * @return
- */
- override def lookupRelation(name: TableIdentifier,
- alias: Option[String]): LogicalPlan = {
- val rtnRelation = super.lookupRelation(name, alias)
- var toRefreshRelation = false
- rtnRelation match {
- case SubqueryAlias(_,
- LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), _) =>
- toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
- case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
- toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
- case _ =>
- }
-
- if (toRefreshRelation) {
- super.lookupRelation(name, alias)
- } else {
- rtnRelation
- }
- }
-
- private def refreshRelationFromCache(identifier: TableIdentifier,
- alias: Option[String],
- carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
- var isRefreshed = false
- val storePath = CarbonProperties.getStorePath
- carbonEnv.carbonMetastore.
- checkSchemasModifiedTimeAndReloadTable(identifier)
-
- val table = carbonEnv.carbonMetastore.getTableFromMetadataCache(
- carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
- carbonDatasourceHadoopRelation.carbonTable.getTableName)
- if (table.isEmpty || (table.isDefined &&
- table.get.getTableLastUpdatedTime !=
- carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
- refreshTable(identifier)
- DataMapStoreManager.getInstance().
- clearDataMaps(AbsoluteTableIdentifier.from(storePath,
- identifier.database.getOrElse("default"), identifier.table))
- isRefreshed = true
- logInfo(s"Schema changes have been detected for table: $identifier")
- }
- isRefreshed
- }
-
- /**
- * returns hive client from session state
- *
- * @return
- */
- def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
- sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
- }
-
- override def createPartitions(
- tableName: TableIdentifier,
- parts: Seq[CatalogTablePartition],
- ignoreIfExists: Boolean): Unit = {
- try {
- val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
- // Get the properties from thread local
- val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
- if (carbonSessionInfo != null) {
- val updatedParts = CarbonScalaUtil.updatePartitions(carbonSessionInfo, parts, table)
- super.createPartitions(tableName, updatedParts, ignoreIfExists)
- } else {
- super.createPartitions(tableName, parts, ignoreIfExists)
- }
- } catch {
- case e: Exception =>
- super.createPartitions(tableName, parts, ignoreIfExists)
- }
- }
-
- /**
- * This is alternate way of getting partition information. It first fetches all partitions from
- * hive and then apply filter instead of querying hive along with filters.
- * @param partitionFilters
- * @param sparkSession
- * @param identifier
- * @return
- */
- def getPartitionsAlternate(
- partitionFilters: Seq[Expression],
- sparkSession: SparkSession,
- identifier: TableIdentifier) = {
- val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
- val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(identifier)
- val partitionSchema = catalogTable.partitionSchema
- if (partitionFilters.nonEmpty) {
- val boundPredicate =
- InterpretedPredicate.create(partitionFilters.reduce(And).transform {
- case att: AttributeReference =>
- val index = partitionSchema.indexWhere(_.name == att.name)
- BoundReference(index, partitionSchema(index).dataType, nullable = true)
- })
- allPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) }
- } else {
- allPartitions
- }
- }
-}
-
-/**
- * Session state implementation to override sql parser and adding strategies
- * @param sparkSession
- */
-class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) {
-
- override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
-
- experimentalMethods.extraStrategies = extraStrategies
-
- experimentalMethods.extraOptimizations = extraOptimizations
-
- def extraStrategies: Seq[Strategy] = {
- Seq(
- new StreamingTableStrategy(sparkSession),
- new CarbonLateDecodeStrategy,
- new DDLStrategy(sparkSession)
- )
- }
-
- def extraOptimizations: Seq[Rule[LogicalPlan]] = {
- Seq(new CarbonIUDRule,
- new CarbonUDFTransformRule,
- new CarbonLateDecodeRule)
- }
-
- override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
-
- def extendedAnalyzerRules: Seq[Rule[LogicalPlan]] = Nil
- def internalAnalyzerRules: Seq[Rule[LogicalPlan]] = {
- catalog.ParquetConversions ::
- catalog.OrcConversions ::
- CarbonPreInsertionCasts(sparkSession) ::
- CarbonIUDAnalysisRule(sparkSession) ::
- AnalyzeCreateTable(sparkSession) ::
- PreprocessTableInsertion(conf) ::
- DataSourceAnalysis(conf) ::
- (if (conf.runSQLonFile) {
- new ResolveDataSource(sparkSession) :: Nil
- } else { Nil })
- }
-
- override lazy val analyzer: Analyzer =
- new CarbonAnalyzer(catalog, conf, sparkSession,
- new Analyzer(catalog, conf) {
- override val extendedResolutionRules =
- if (extendedAnalyzerRules.nonEmpty) {
- extendedAnalyzerRules ++ internalAnalyzerRules
- } else {
- internalAnalyzerRules
- }
- override val extendedCheckRules = Seq(
- PreWriteCheck(conf, catalog))
- }
- )
-
- /**
- * Internal catalog for managing table and database states.
- */
- override lazy val catalog = {
- new CarbonSessionCatalog(
- sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
- sparkSession.sharedState.globalTempViewManager,
- sparkSession,
- functionResourceLoader,
- functionRegistry,
- conf,
- newHadoopConf())
- }
-}
-
-class CarbonAnalyzer(catalog: SessionCatalog,
- conf: CatalystConf,
- sparkSession: SparkSession,
- analyzer: Analyzer) extends Analyzer(catalog, conf) {
- override def execute(plan: LogicalPlan): LogicalPlan = {
- var logicalPlan = analyzer.execute(plan)
- logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
- CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
- }
-}
-
-class CarbonOptimizer(
- catalog: SessionCatalog,
- conf: SQLConf,
- experimentalMethods: ExperimentalMethods)
- extends SparkOptimizer(catalog, conf, experimentalMethods) {
-
- override def execute(plan: LogicalPlan): LogicalPlan = {
- val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
- super.execute(transFormedPlan)
- }
-}
-
-object CarbonOptimizerUtil {
- def transformForScalarSubQuery(plan: LogicalPlan) : LogicalPlan = {
- // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule,
- // And optimize whole plan at once.
- val transFormedPlan = plan.transform {
- case filter: Filter =>
- filter.transformExpressions {
- case s: ScalarSubquery =>
- val tPlan = s.plan.transform {
- case lr: LogicalRelation
- if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
- lr
- }
- ScalarSubquery(tPlan, s.children, s.exprId)
- case p: PredicateSubquery =>
- val tPlan = p.plan.transform {
- case lr: LogicalRelation
- if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
- lr
- }
- PredicateSubquery(tPlan, p.children, p.nullAware, p.exprId)
- }
- }
- transFormedPlan
- }
-}
-
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
- extends SparkSqlAstBuilder(conf) {
-
- val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
-
- override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
- val fileStorage = helper.getFileStorage(ctx.createFileFormat)
-
- if (fileStorage.equalsIgnoreCase("'carbondata'") ||
- fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
- helper.createCarbonTable(
- tableHeader = ctx.createTableHeader,
- skewSpecContext = ctx.skewSpec,
- bucketSpecContext = ctx.bucketSpec,
- partitionColumns = ctx.partitionColumns,
- columns = ctx.columns,
- tablePropertyList = ctx.tablePropertyList,
- locationSpecContext = ctx.locationSpec(),
- tableComment = Option(ctx.STRING()).map(string),
- ctas = ctx.AS,
- query = ctx.query)
- } else {
- super.visitCreateTable(ctx)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala
new file mode 100644
index 0000000..15ccb0c
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * To initialize dynamic values default param
+ */
+class CarbonSQLConf(sparkSession: SparkSession) {
+
+ val carbonProperties = CarbonProperties.getInstance()
+
+ /**
+ * To initialize dynamic param defaults along with usage docs
+ */
+ def addDefaultCarbonParams(): Unit = {
+ val ENABLE_UNSAFE_SORT =
+ SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
+ .doc("To enable/ disable unsafe sort.")
+ .booleanConf
+ .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+ CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+ val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
+ SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
+ .doc("To set carbon task distribution.")
+ .stringConf
+ .createWithDefault(carbonProperties
+ .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+ CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
+ val BAD_RECORDS_LOGGER_ENABLE =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
+ .doc("To enable/ disable carbon bad record logger.")
+ .booleanConf
+ .createWithDefault(CarbonLoadOptionConstants
+ .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+ val BAD_RECORDS_ACTION =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
+ .doc("To configure the bad records action.")
+ .stringConf
+ .createWithDefault(carbonProperties
+ .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+ val IS_EMPTY_DATA_BAD_RECORD =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
+ .doc("Property to decide weather empty data to be considered bad/ good record.")
+ .booleanConf
+ .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
+ .toBoolean)
+ val SORT_SCOPE =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
+ .doc("Property to specify sort scope.")
+ .stringConf
+ .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+ val BATCH_SORT_SIZE_INMB =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
+ .doc("Property to specify batch sort size in MB.")
+ .stringConf
+ .createWithDefault(carbonProperties
+ .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+ CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
+ val SINGLE_PASS =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
+ .doc("Property to enable/disable single_pass.")
+ .booleanConf
+ .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
+ val BAD_RECORD_PATH =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
+ .doc("Property to configure the bad record location.")
+ .stringConf
+ .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+ CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+ val GLOBAL_SORT_PARTITIONS =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
+ .doc("Property to configure the global sort partitions.")
+ .stringConf
+ .createWithDefault(carbonProperties
+ .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+ CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+ val DATEFORMAT =
+ SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
+ .doc("Property to configure data format for date type columns.")
+ .stringConf
+ .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+ val CARBON_INPUT_SEGMENTS = SQLConfigBuilder(
+ "carbon.input.segments.<database_name>.<table_name>")
+ .doc("Property to configure the list of segments to query.").stringConf
+ .createWithDefault(carbonProperties
+ .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
+ }
+ /**
+ * to set the dynamic properties default values
+ */
+ def addDefaultCarbonSessionParams(): Unit = {
+ sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+ carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+ CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+ sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+ carbonProperties
+ .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+ CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+ carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+ carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+ carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+ CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+ carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+ CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+ carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+ CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+ carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+ CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+ sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+ }
+}
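The file above only moved (the blob index 15ccb0c is unchanged), so its behaviour is as before: addDefaultCarbonParams registers the dynamic properties with documentation and defaults, and addDefaultCarbonSessionParams seeds their per-session values, which users can then override through the normal Spark conf API. A hedged usage sketch; the literal key is assumed to be the value behind CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE:

import org.apache.spark.sql.SparkSession

object SortScopeExample {
  def configure(spark: SparkSession): Unit = {
    // Override the session-level default seeded by addDefaultCarbonSessionParams.
    spark.conf.set("carbon.options.sort.scope", "LOCAL_SORT")
    // Reads back the overridden value rather than the carbon.properties default.
    println(spark.conf.get("carbon.options.sort.scope"))
  }
}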
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
new file mode 100644
index 0000000..0fe0f96
--- /dev/null
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.parser.ParserUtils._
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.CreateTableContext
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession, Strategy}
+
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * This class will have carbon catalog and refresh the relation from cache if the carbontable in
+ * carbon catalog is not same as cached carbon relation's carbon table
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class CarbonSessionCatalog(
+ externalCatalog: HiveExternalCatalog,
+ globalTempViewManager: GlobalTempViewManager,
+ sparkSession: SparkSession,
+ functionResourceLoader: FunctionResourceLoader,
+ functionRegistry: FunctionRegistry,
+ conf: SQLConf,
+ hadoopConf: Configuration)
+ extends HiveSessionCatalog(
+ externalCatalog,
+ globalTempViewManager,
+ sparkSession,
+ functionResourceLoader,
+ functionRegistry,
+ conf,
+ hadoopConf) {
+
+ lazy val carbonEnv = {
+ val env = new CarbonEnv
+ env.init(sparkSession)
+ env
+ }
+
+ // Initialize all listeners to the Operation bus.
+ CarbonEnv.initListeners()
+
+ /**
+ * This method will invalidate carbonrelation from cache if carbon table is updated in
+ * carbon catalog
+ *
+ * @param name
+ * @param alias
+ * @return
+ */
+ override def lookupRelation(name: TableIdentifier,
+ alias: Option[String]): LogicalPlan = {
+ val rtnRelation = super.lookupRelation(name, alias)
+ var toRefreshRelation = false
+ rtnRelation match {
+ case SubqueryAlias(_,
+ LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), _) =>
+ toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
+ case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
+ toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
+ case _ =>
+ }
+
+ if (toRefreshRelation) {
+ super.lookupRelation(name, alias)
+ } else {
+ rtnRelation
+ }
+ }
+
+ private def refreshRelationFromCache(identifier: TableIdentifier,
+ alias: Option[String],
+ carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
+ var isRefreshed = false
+ val storePath = CarbonProperties.getStorePath
+ carbonEnv.carbonMetastore.
+ checkSchemasModifiedTimeAndReloadTable(identifier)
+
+ val table = carbonEnv.carbonMetastore.getTableFromMetadataCache(
+ carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
+ carbonDatasourceHadoopRelation.carbonTable.getTableName)
+ if (table.isEmpty || (table.isDefined &&
+ table.get.getTableLastUpdatedTime !=
+ carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
+ refreshTable(identifier)
+ DataMapStoreManager.getInstance().
+ clearDataMaps(AbsoluteTableIdentifier.from(storePath,
+ identifier.database.getOrElse("default"), identifier.table))
+ isRefreshed = true
+ logInfo(s"Schema changes have been detected for table: $identifier")
+ }
+ isRefreshed
+ }
+
+ /**
+ * returns hive client from session state
+ *
+ * @return
+ */
+ def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+ sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
+ }
+
+ override def createPartitions(
+ tableName: TableIdentifier,
+ parts: Seq[CatalogTablePartition],
+ ignoreIfExists: Boolean): Unit = {
+ try {
+ val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+ // Get the properties from thread local
+ val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+ if (carbonSessionInfo != null) {
+ val updatedParts = CarbonScalaUtil.updatePartitions(carbonSessionInfo, parts, table)
+ super.createPartitions(tableName, updatedParts, ignoreIfExists)
+ } else {
+ super.createPartitions(tableName, parts, ignoreIfExists)
+ }
+ } catch {
+ case e: Exception =>
+ super.createPartitions(tableName, parts, ignoreIfExists)
+ }
+ }
+
+ /**
+ * This is alternate way of getting partition information. It first fetches all partitions from
+ * hive and then apply filter instead of querying hive along with filters.
+ * @param partitionFilters
+ * @param sparkSession
+ * @param identifier
+ * @return
+ */
+ def getPartitionsAlternate(
+ partitionFilters: Seq[Expression],
+ sparkSession: SparkSession,
+ identifier: TableIdentifier) = {
+ val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
+ val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(identifier)
+ val partitionSchema = catalogTable.partitionSchema
+ if (partitionFilters.nonEmpty) {
+ val boundPredicate =
+ InterpretedPredicate.create(partitionFilters.reduce(And).transform {
+ case att: AttributeReference =>
+ val index = partitionSchema.indexWhere(_.name == att.name)
+ BoundReference(index, partitionSchema(index).dataType, nullable = true)
+ })
+ allPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) }
+ } else {
+ allPartitions
+ }
+ }
+}
+
+/**
+ * Session state implementation to override sql parser and adding strategies
+ * @param sparkSession
+ */
+class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) {
+
+ override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+ experimentalMethods.extraStrategies = extraStrategies
+
+ experimentalMethods.extraOptimizations = extraOptimizations
+
+ def extraStrategies: Seq[Strategy] = {
+ Seq(
+ new StreamingTableStrategy(sparkSession),
+ new CarbonLateDecodeStrategy,
+ new DDLStrategy(sparkSession)
+ )
+ }
+
+ def extraOptimizations: Seq[Rule[LogicalPlan]] = {
+ Seq(new CarbonIUDRule,
+ new CarbonUDFTransformRule,
+ new CarbonLateDecodeRule)
+ }
+
+ override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+
+ def extendedAnalyzerRules: Seq[Rule[LogicalPlan]] = Nil
+ def internalAnalyzerRules: Seq[Rule[LogicalPlan]] = {
+ catalog.ParquetConversions ::
+ catalog.OrcConversions ::
+ CarbonPreInsertionCasts(sparkSession) ::
+ CarbonIUDAnalysisRule(sparkSession) ::
+ AnalyzeCreateTable(sparkSession) ::
+ PreprocessTableInsertion(conf) ::
+ DataSourceAnalysis(conf) ::
+ (if (conf.runSQLonFile) {
+ new ResolveDataSource(sparkSession) :: Nil
+ } else { Nil })
+ }
+
+ override lazy val analyzer: Analyzer =
+ new CarbonAnalyzer(catalog, conf, sparkSession,
+ new Analyzer(catalog, conf) {
+ override val extendedResolutionRules =
+ if (extendedAnalyzerRules.nonEmpty) {
+ extendedAnalyzerRules ++ internalAnalyzerRules
+ } else {
+ internalAnalyzerRules
+ }
+ override val extendedCheckRules = Seq(
+ PreWriteCheck(conf, catalog))
+ }
+ )
+
+ /**
+ * Internal catalog for managing table and database states.
+ */
+ override lazy val catalog = {
+ new CarbonSessionCatalog(
+ sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
+ sparkSession.sharedState.globalTempViewManager,
+ sparkSession,
+ functionResourceLoader,
+ functionRegistry,
+ conf,
+ newHadoopConf())
+ }
+}
+
+class CarbonAnalyzer(catalog: SessionCatalog,
+ conf: CatalystConf,
+ sparkSession: SparkSession,
+ analyzer: Analyzer) extends Analyzer(catalog, conf) {
+ override def execute(plan: LogicalPlan): LogicalPlan = {
+ var logicalPlan = analyzer.execute(plan)
+ logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
+ CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
+ }
+}
+
+class CarbonOptimizer(
+ catalog: SessionCatalog,
+ conf: SQLConf,
+ experimentalMethods: ExperimentalMethods)
+ extends SparkOptimizer(catalog, conf, experimentalMethods) {
+
+ override def execute(plan: LogicalPlan): LogicalPlan = {
+ val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
+ super.execute(transFormedPlan)
+ }
+}
+
+object CarbonOptimizerUtil {
+ def transformForScalarSubQuery(plan: LogicalPlan) : LogicalPlan = {
+ // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule,
+ // And optimize whole plan at once.
+ val transFormedPlan = plan.transform {
+ case filter: Filter =>
+ filter.transformExpressions {
+ case s: ScalarSubquery =>
+ val tPlan = s.plan.transform {
+ case lr: LogicalRelation
+ if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+ lr
+ }
+ ScalarSubquery(tPlan, s.children, s.exprId)
+ case p: PredicateSubquery =>
+ val tPlan = p.plan.transform {
+ case lr: LogicalRelation
+ if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+ lr
+ }
+ PredicateSubquery(tPlan, p.children, p.nullAware, p.exprId)
+ }
+ }
+ transFormedPlan
+ }
+}
+
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
+ extends SparkSqlAstBuilder(conf) {
+
+ val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
+
+ override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+ val fileStorage = helper.getFileStorage(ctx.createFileFormat)
+
+ if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+ fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
+ helper.createCarbonTable(
+ tableHeader = ctx.createTableHeader,
+ skewSpecContext = ctx.skewSpec,
+ bucketSpecContext = ctx.bucketSpec,
+ partitionColumns = ctx.partitionColumns,
+ columns = ctx.columns,
+ tablePropertyList = ctx.tablePropertyList,
+ locationSpecContext = ctx.locationSpec(),
+ tableComment = Option(ctx.STRING()).map(string),
+ ctas = ctx.AS,
+ query = ctx.query)
+ } else {
+ super.visitCreateTable(ctx)
+ }
+ }
+}
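Worth noting in the file above (also an unchanged move, blob index 0fe0f96): getPartitionsAlternate deliberately lists all partitions from Hive and prunes on the Spark side, binding each filter column to its position in the partition schema before evaluating. A simplified, Spark-free sketch of that bind-then-filter idea; PartitionRow and the (column, predicate) pairs are illustrative stand-ins for Catalyst's AttributeReference/BoundReference machinery:

final case class PartitionRow(values: Map[String, Any])

object PartitionPruneSketch {
  def prune(
      partitions: Seq[PartitionRow],
      partitionSchema: Seq[String],
      filters: Seq[(String, Any => Boolean)]): Seq[PartitionRow] = {
    // No filters: return everything, as the original does.
    if (filters.isEmpty) {
      partitions
    } else {
      partitions.filter { row =>
        filters.forall { case (column, predicate) =>
          // Resolve the filter column against the partition schema, then test the row.
          require(partitionSchema.contains(column), s"unknown partition column: $column")
          predicate(row.values(column))
        }
      }
    }
  }
}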
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
deleted file mode 100644
index 3c151f0..0000000
--- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
+++ /dev/null
@@ -1,398 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-
-import scala.collection.generic.SeqFactory
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Exists, Expression, In, InterpretedPredicate, ListQuery, ScalarSubquery}
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.parser.ParserUtils.string
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
-import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _}
-import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
-import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
-import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SQLConf, SessionState}
-import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
-import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
-import org.apache.spark.sql.types.DecimalType
-import org.apache.spark.util.CarbonReflectionUtils
-
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-
-/**
- * This class will have carbon catalog and refresh the relation from cache if the carbontable in
- * carbon catalog is not same as cached carbon relation's carbon table
- *
- * @param externalCatalog
- * @param globalTempViewManager
- * @param sparkSession
- * @param functionResourceLoader
- * @param functionRegistry
- * @param conf
- * @param hadoopConf
- */
-class CarbonSessionCatalog(
- externalCatalog: HiveExternalCatalog,
- globalTempViewManager: GlobalTempViewManager,
- functionRegistry: FunctionRegistry,
- sparkSession: SparkSession,
- conf: SQLConf,
- hadoopConf: Configuration,
- parser: ParserInterface,
- functionResourceLoader: FunctionResourceLoader)
- extends HiveSessionCatalog(
- externalCatalog,
- globalTempViewManager,
- new HiveMetastoreCatalog(sparkSession),
- functionRegistry,
- conf,
- hadoopConf,
- parser,
- functionResourceLoader
- ) {
-
- lazy val carbonEnv = {
- val env = new CarbonEnv
- env.init(sparkSession)
- env
- }
-
- def getCarbonEnv() : CarbonEnv = {
- carbonEnv
- }
-
- // Initialize all listeners to the Operation bus.
- CarbonEnv.initListeners()
-
-
-
-
- override def lookupRelation(name: TableIdentifier): LogicalPlan = {
- val rtnRelation = super.lookupRelation(name)
- var toRefreshRelation = false
- rtnRelation match {
- case SubqueryAlias(_,
- LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)) =>
- toRefreshRelation = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
- case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
- toRefreshRelation = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
- case SubqueryAlias(_, relation) if
- relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
- relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
- relation.getClass.getName.equals(
- "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation") =>
- val catalogTable =
- CarbonReflectionUtils.getFieldOfCatalogTable(
- "tableMeta",
- relation).asInstanceOf[CatalogTable]
- toRefreshRelation =
- CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
- case _ =>
- }
-
- if (toRefreshRelation) {
- super.lookupRelation(name)
- } else {
- rtnRelation
- }
- }
-
- /**
- * returns hive client from HiveExternalCatalog
- *
- * @return
- */
- def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
- sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
- .asInstanceOf[HiveExternalCatalog].client
- }
-
- override def createPartitions(
- tableName: TableIdentifier,
- parts: Seq[CatalogTablePartition],
- ignoreIfExists: Boolean): Unit = {
- try {
- val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
- // Get the properties from thread local
- val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
- if (carbonSessionInfo != null) {
- val updatedParts = CarbonScalaUtil.updatePartitions(carbonSessionInfo, parts, table)
- super.createPartitions(tableName, updatedParts, ignoreIfExists)
- } else {
- super.createPartitions(tableName, parts, ignoreIfExists)
- }
- } catch {
- case e: Exception =>
- super.createPartitions(tableName, parts, ignoreIfExists)
- }
- }
-
- /**
- * This is alternate way of getting partition information. It first fetches all partitions from
- * hive and then apply filter instead of querying hive along with filters.
- * @param partitionFilters
- * @param sparkSession
- * @param identifier
- * @return
- */
- def getPartitionsAlternate(partitionFilters: Seq[Expression],
- sparkSession: SparkSession,
- identifier: TableIdentifier) = {
- val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
- ExternalCatalogUtils.prunePartitionsByFilter(
- sparkSession.sessionState.catalog.getTableMetadata(identifier),
- allPartitions,
- partitionFilters,
- sparkSession.sessionState.conf.sessionLocalTimeZone)
- }
-}
-
-
-class CarbonAnalyzer(catalog: SessionCatalog,
- conf: SQLConf,
- sparkSession: SparkSession,
- analyzer: Analyzer) extends Analyzer(catalog, conf) {
- override def execute(plan: LogicalPlan): LogicalPlan = {
- var logicalPlan = analyzer.execute(plan)
- logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
- CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
- }
-}
-
-
-/**
- * Session state implementation to override sql parser and adding strategies
- *
- * @param sparkSession
- */
-class CarbonSessionStateBuilder(sparkSession: SparkSession,
- parentState: Option[SessionState] = None)
- extends HiveSessionStateBuilder(sparkSession, parentState) {
-
- override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
-
- experimentalMethods.extraStrategies =
- Seq(new StreamingTableStrategy(sparkSession),
- new CarbonLateDecodeStrategy,
- new DDLStrategy(sparkSession)
- )
- experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
- new CarbonUDFTransformRule,
- new CarbonLateDecodeRule)
-
- /**
- * Internal catalog for managing table and database states.
- */
- /**
- * Create a [[CarbonSessionCatalogBuild]].
- */
- override protected lazy val catalog: CarbonSessionCatalog = {
- val catalog = new CarbonSessionCatalog(
- externalCatalog,
- session.sharedState.globalTempViewManager,
- functionRegistry,
- sparkSession,
- conf,
- SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
- sqlParser,
- resourceLoader)
- parentState.foreach(_.catalog.copyStateTo(catalog))
- catalog
- }
-
- private def externalCatalog: HiveExternalCatalog =
- session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
-
- /**
- * Create a Hive aware resource loader.
- */
- override protected lazy val resourceLoader: HiveSessionResourceLoader = {
- val client: HiveClient = externalCatalog.client.newSession()
- new HiveSessionResourceLoader(session, client)
- }
-
- override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
-
- override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession,
- new Analyzer(catalog, conf) {
-
- override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
- new ResolveHiveSerdeTable(session) +:
- new FindDataSourceTable(session) +:
- new ResolveSQLOnFile(session) +:
- new CarbonIUDAnalysisRule(sparkSession) +:
- new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
-
- override val extendedCheckRules: Seq[LogicalPlan => Unit] =
- PreWriteCheck :: HiveOnlyCheck :: Nil
-
- override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
- new DetermineTableStats(session) +:
- RelationConversions(conf, catalog) +:
- PreprocessTableCreation(session) +:
- PreprocessTableInsertion(conf) +:
- DataSourceAnalysis(conf) +:
- HiveAnalysis +:
- customPostHocResolutionRules
- }
- )
-
- override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _)
-
-}
-
-
-class CarbonOptimizer(
- catalog: SessionCatalog,
- conf: SQLConf,
- experimentalMethods: ExperimentalMethods)
- extends SparkOptimizer(catalog, conf, experimentalMethods) {
-
- override def execute(plan: LogicalPlan): LogicalPlan = {
- // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule, And
- // optimize whole plan at once.
- val transFormedPlan = plan.transform {
- case filter: Filter =>
- filter.transformExpressions {
- case s: ScalarSubquery =>
- val tPlan = s.plan.transform {
- case lr: LogicalRelation
- if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
- lr
- }
- ScalarSubquery(tPlan, s.children, s.exprId)
- case e: Exists =>
- val tPlan = e.plan.transform {
- case lr: LogicalRelation
- if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
- lr
- }
- Exists(tPlan, e.children.map(_.canonicalized), e.exprId)
-
- case In(value, Seq(l@ListQuery(sub, _, exprId))) =>
- val tPlan = sub.transform {
- case lr: LogicalRelation
- if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
- lr
- }
- In(value, Seq(ListQuery(tPlan, l.children, exprId)))
- }
- }
- super.execute(transFormedPlan)
- }
-}
-
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
- extends SparkSqlAstBuilder(conf) {
-
- val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
-
- override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
- val fileStorage = helper.getFileStorage(ctx.createFileFormat)
-
- if (fileStorage.equalsIgnoreCase("'carbondata'") ||
- fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
- helper.createCarbonTable(
- tableHeader = ctx.createTableHeader,
- skewSpecContext = ctx.skewSpec,
- bucketSpecContext = ctx.bucketSpec,
- partitionColumns = ctx.partitionColumns,
- columns = ctx.columns,
- tablePropertyList = ctx.tablePropertyList,
- locationSpecContext = ctx.locationSpec(),
- tableComment = Option(ctx.STRING()).map(string),
- ctas = ctx.AS,
- query = ctx.query)
- } else {
- super.visitCreateHiveTable(ctx)
- }
- }
-
- override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = {
-
- val newColumn = visitColType(ctx.colType)
- if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
- throw new MalformedCarbonCommandException(
- "Column names provided are different. Both the column names should be same")
- }
-
- val (typeString, values) : (String, Option[List[(Int, Int)]]) = newColumn.dataType match {
- case d: DecimalType => ("decimal", Some(List((d.precision, d.scale))))
- case _ => (newColumn.dataType.typeName.toLowerCase, None)
- }
-
- val alterTableChangeDataTypeModel =
- AlterTableDataTypeChangeModel(new CarbonSpark2SqlParser().parseDataType(typeString, values),
- new CarbonSpark2SqlParser()
- .convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)),
- ctx.tableIdentifier().table.getText.toLowerCase,
- ctx.identifier.getText.toLowerCase,
- newColumn.name.toLowerCase)
-
- CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
- }
-
-
- override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
-
- val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
- val fields = parser.getFields(cols)
- val tblProperties = scala.collection.mutable.Map.empty[String, String]
- val tableModel = new CarbonSpark2SqlParser().prepareTableModel (false,
- new CarbonSpark2SqlParser().convertDbNameToLowerCase(Option(ctx.tableIdentifier().db)
- .map(_.getText)),
- ctx.tableIdentifier.table.getText.toLowerCase,
- fields,
- Seq.empty,
- tblProperties,
- None,
- true)
-
- val alterTableAddColumnsModel = AlterTableAddColumnsModel(
- Option(ctx.tableIdentifier().db).map(_.getText),
- ctx.tableIdentifier.table.getText,
- tblProperties.toMap,
- tableModel.dimCols,
- tableModel.msrCols,
- tableModel.highcardinalitydims.getOrElse(Seq.empty))
-
- CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
- }
-
- override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
- super.visitCreateTable(ctx)
- }
-}
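The three visitor overrides above route Carbon-specific DDL to Carbon commands. A hedged sketch of statements that exercise each path (assuming a CarbonSession named `spark`; the database, table, and column names are illustrative):

// Routed to helper.createCarbonTable by visitCreateHiveTable, because the
// declared file format matches 'carbondata':
spark.sql(
  "CREATE TABLE IF NOT EXISTS db.sales (id INT, amount DECIMAL(10,2)) " +
  "STORED BY 'carbondata'")

// Handled by visitChangeColumn; the column name must stay the same and
// only the data type changes:
spark.sql("ALTER TABLE db.sales CHANGE amount amount DECIMAL(18,2)")

// Handled by visitAddTableColumns:
spark.sql("ALTER TABLE db.sales ADD COLUMNS (country STRING)")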
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala b/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala
deleted file mode 100644
index 2128ffd..0000000
--- a/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.SQLConf.buildConf
-
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * Initializes default values for the dynamic session parameters.
- */
-class CarbonSQLConf(sparkSession: SparkSession) {
-
- val carbonProperties = CarbonProperties.getInstance()
-
- /**
- * Registers the dynamic parameter defaults along with their usage docs.
- */
- def addDefaultCarbonParams(): Unit = {
- val ENABLE_UNSAFE_SORT =
- buildConf(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
- .doc("To enable/ disable unsafe sort.")
- .booleanConf
- .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
- CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
- val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
- buildConf(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
- .doc("To set carbon task distribution.")
- .stringConf
- .createWithDefault(carbonProperties
- .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
- CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
- val BAD_RECORDS_LOGGER_ENABLE =
- buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
- .doc("To enable/ disable carbon bad record logger.")
- .booleanConf
- .createWithDefault(CarbonLoadOptionConstants
- .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
- val BAD_RECORDS_ACTION =
- buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
- .doc("To configure the bad records action.")
- .stringConf
- .createWithDefault(carbonProperties
- .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
- val IS_EMPTY_DATA_BAD_RECORD =
- buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
- .doc("Property to decide weather empty data to be considered bad/ good record.")
- .booleanConf
- .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
- .toBoolean)
- val SORT_SCOPE =
- buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
- .doc("Property to specify sort scope.")
- .stringConf
- .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
- CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
- val BATCH_SORT_SIZE_INMB =
- buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
- .doc("Property to specify batch sort size in MB.")
- .stringConf
- .createWithDefault(carbonProperties
- .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
- CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
- val SINGLE_PASS =
- buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
- .doc("Property to enable/disable single_pass.")
- .booleanConf
- .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
- val BAD_RECORD_PATH =
- buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
- .doc("Property to configure the bad record location.")
- .stringConf
- .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
- CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
- val GLOBAL_SORT_PARTITIONS =
- buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
- .doc("Property to configure the global sort partitions.")
- .stringConf
- .createWithDefault(carbonProperties
- .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
- CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
- val DATEFORMAT =
- buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
- .doc("Property to configure data format for date type columns.")
- .stringConf
- .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
- val CARBON_INPUT_SEGMENTS = buildConf(
- "carbon.input.segments.<database_name>.<table_name>")
- .doc("Property to configure the list of segments to query.").stringConf
- .createWithDefault(carbonProperties
- .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
- }
- /**
- * Sets the default values of the dynamic session properties.
- */
- def addDefaultCarbonSessionParams(): Unit = {
- sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
- carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
- CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
- sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
- carbonProperties
- .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
- CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
- CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
- carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
- CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
- carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
- CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
- carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
- CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
- CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
- carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
- CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
- carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
- CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
- sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
- CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
- }
-}
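For context, a minimal usage sketch of this class (the `spark` session variable and the literal property key shown in the SET command are assumptions for illustration; the real call sites live in the Carbon session state setup):

val carbonSQLConf = new CarbonSQLConf(spark)
carbonSQLConf.addDefaultCarbonParams()        // register defaults plus usage docs
carbonSQLConf.addDefaultCarbonSessionParams() // seed the session conf with defaults
// Once registered, a dynamic parameter can be overridden per session:
spark.sql("SET carbon.options.bad.records.logger.enable=true")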
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
new file mode 100644
index 0000000..3c151f0
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+
+import scala.collection.generic.SeqFactory
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Exists, Expression, In, InterpretedPredicate, ListQuery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.parser.ParserUtils.string
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
+import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _}
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.internal.{SQLConf, SessionState}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
+import org.apache.spark.sql.types.DecimalType
+import org.apache.spark.util.CarbonReflectionUtils
+
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * This class holds the Carbon catalog and refreshes the relation from the cache if the
+ * CarbonTable in the Carbon catalog is not the same as the cached Carbon relation's table.
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class CarbonSessionCatalog(
+ externalCatalog: HiveExternalCatalog,
+ globalTempViewManager: GlobalTempViewManager,
+ functionRegistry: FunctionRegistry,
+ sparkSession: SparkSession,
+ conf: SQLConf,
+ hadoopConf: Configuration,
+ parser: ParserInterface,
+ functionResourceLoader: FunctionResourceLoader)
+ extends HiveSessionCatalog(
+ externalCatalog,
+ globalTempViewManager,
+ new HiveMetastoreCatalog(sparkSession),
+ functionRegistry,
+ conf,
+ hadoopConf,
+ parser,
+ functionResourceLoader
+ ) {
+
+ lazy val carbonEnv = {
+ val env = new CarbonEnv
+ env.init(sparkSession)
+ env
+ }
+
+ def getCarbonEnv() : CarbonEnv = {
+ carbonEnv
+ }
+
+ // Initialize all listeners to the Operation bus.
+ CarbonEnv.initListeners()
+
+
+ override def lookupRelation(name: TableIdentifier): LogicalPlan = {
+ val rtnRelation = super.lookupRelation(name)
+ var toRefreshRelation = false
+ rtnRelation match {
+ case SubqueryAlias(_,
+ LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)) =>
+ toRefreshRelation = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+ case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
+ toRefreshRelation = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+ case SubqueryAlias(_, relation) if
+ relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+ relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
+ relation.getClass.getName.equals(
+ "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation") =>
+ val catalogTable =
+ CarbonReflectionUtils.getFieldOfCatalogTable(
+ "tableMeta",
+ relation).asInstanceOf[CatalogTable]
+ toRefreshRelation =
+ CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
+ case _ =>
+ }
+
+ if (toRefreshRelation) {
+ super.lookupRelation(name)
+ } else {
+ rtnRelation
+ }
+ }
+
+ /**
+ * Returns the Hive client from the HiveExternalCatalog.
+ *
+ * @return the underlying HiveClient
+ */
+ def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+ sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
+ .asInstanceOf[HiveExternalCatalog].client
+ }
+
+ override def createPartitions(
+ tableName: TableIdentifier,
+ parts: Seq[CatalogTablePartition],
+ ignoreIfExists: Boolean): Unit = {
+ try {
+ val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+ // Get the properties from thread local
+ val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+ if (carbonSessionInfo != null) {
+ val updatedParts = CarbonScalaUtil.updatePartitions(carbonSessionInfo, parts, table)
+ super.createPartitions(tableName, updatedParts, ignoreIfExists)
+ } else {
+ super.createPartitions(tableName, parts, ignoreIfExists)
+ }
+ } catch {
+ case e: Exception =>
+ super.createPartitions(tableName, parts, ignoreIfExists)
+ }
+ }
+
+ /**
+ * This is an alternate way of getting partition information. It first fetches all partitions
+ * from Hive and then applies the filter, instead of querying Hive with the filters.
+ * @param partitionFilters
+ * @param sparkSession
+ * @param identifier
+ * @return
+ */
+ def getPartitionsAlternate(partitionFilters: Seq[Expression],
+ sparkSession: SparkSession,
+ identifier: TableIdentifier) = {
+ val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
+ ExternalCatalogUtils.prunePartitionsByFilter(
+ sparkSession.sessionState.catalog.getTableMetadata(identifier),
+ allPartitions,
+ partitionFilters,
+ sparkSession.sessionState.conf.sessionLocalTimeZone)
+ }
+}
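A hedged usage sketch of this catalog (the CarbonSession named `spark` and the db/table names are assumptions for illustration):

import org.apache.spark.sql.catalyst.TableIdentifier

val catalog = spark.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
// lookupRelation transparently refreshes a stale cached Carbon relation:
val plan = catalog.lookupRelation(TableIdentifier("sales", Some("db")))
// getPartitionsAlternate lists all partitions, then prunes client-side:
val parts = catalog.getPartitionsAlternate(
  Seq.empty, spark, TableIdentifier("sales", Some("db")))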
+
+
+class CarbonAnalyzer(catalog: SessionCatalog,
+ conf: SQLConf,
+ sparkSession: SparkSession,
+ analyzer: Analyzer) extends Analyzer(catalog, conf) {
+ override def execute(plan: LogicalPlan): LogicalPlan = {
+ var logicalPlan = analyzer.execute(plan)
+ logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
+ CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
+ }
+}
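CarbonAnalyzer decorates the stock analyzer: it runs the wrapped analyzer first, then applies the two pre-aggregate rewrite rules in sequence. A minimal sketch of that composition (all names in scope here are assumed):

val analyzed = new CarbonAnalyzer(catalog, conf, spark, baseAnalyzer).execute(parsedPlan)
// equivalent to:
//   CarbonPreAggregateQueryRules(spark).apply(
//     CarbonPreAggregateDataLoadingRules(spark).apply(baseAnalyzer.execute(parsedPlan)))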
+
+
+/**
+ * Session state implementation that overrides the SQL parser and adds Carbon strategies.
+ *
+ * @param sparkSession
+ */
+class CarbonSessionStateBuilder(sparkSession: SparkSession,
+ parentState: Option[SessionState] = None)
+ extends HiveSessionStateBuilder(sparkSession, parentState) {
+
+ override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+ experimentalMethods.extraStrategies =
+ Seq(new StreamingTableStrategy(sparkSession),
+ new CarbonLateDecodeStrategy,
+ new DDLStrategy(sparkSession)
+ )
+ experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
+ new CarbonUDFTransformRule,
+ new CarbonLateDecodeRule)
+
+ /**
+ * Internal catalog for managing table and database states.
+ * Creates a [[CarbonSessionCatalog]].
+ */
+ override protected lazy val catalog: CarbonSessionCatalog = {
+ val catalog = new CarbonSessionCatalog(
+ externalCatalog,
+ session.sharedState.globalTempViewManager,
+ functionRegistry,
+ sparkSession,
+ conf,
+ SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
+ sqlParser,
+ resourceLoader)
+ parentState.foreach(_.catalog.copyStateTo(catalog))
+ catalog
+ }
+
+ private def externalCatalog: HiveExternalCatalog =
+ session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
+
+ /**
+ * Create a Hive aware resource loader.
+ */
+ override protected lazy val resourceLoader: HiveSessionResourceLoader = {
+ val client: HiveClient = externalCatalog.client.newSession()
+ new HiveSessionResourceLoader(session, client)
+ }
+
+ override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+
+ override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession,
+ new Analyzer(catalog, conf) {
+
+ override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+ new ResolveHiveSerdeTable(session) +:
+ new FindDataSourceTable(session) +:
+ new ResolveSQLOnFile(session) +:
+ new CarbonIUDAnalysisRule(sparkSession) +:
+ new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
+
+ override val extendedCheckRules: Seq[LogicalPlan => Unit] =
+ PreWriteCheck :: HiveOnlyCheck :: Nil
+
+ override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+ new DetermineTableStats(session) +:
+ RelationConversions(conf, catalog) +:
+ PreprocessTableCreation(session) +:
+ PreprocessTableInsertion(conf) +:
+ DataSourceAnalysis(conf) +:
+ HiveAnalysis +:
+ customPostHocResolutionRules
+ }
+ )
+
+ override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _)
+
+}
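A hedged sketch of what building this session state yields; in the real code path CarbonSession instantiates the builder itself, and `spark` is assumed to be a Hive-enabled SparkSession:

// build() comes from Spark's BaseSessionStateBuilder and returns a
// SessionState wired with the Carbon parser, strategies, and optimizer:
val sessionState = new CarbonSessionStateBuilder(spark).build()
assert(sessionState.sqlParser.isInstanceOf[CarbonSparkSqlParser])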
+
+
+class CarbonOptimizer(
+ catalog: SessionCatalog,
+ conf: SQLConf,
+ experimentalMethods: ExperimentalMethods)
+ extends SparkOptimizer(catalog, conf, experimentalMethods) {
+
+ override def execute(plan: LogicalPlan): LogicalPlan = {
+ // For scalar subqueries, set a flag on the relation so the optimizer rule skips the decoder
+ // plan, and optimize the whole plan at once.
+ val transFormedPlan = plan.transform {
+ case filter: Filter =>
+ filter.transformExpressions {
+ case s: ScalarSubquery =>
+ val tPlan = s.plan.transform {
+ case lr: LogicalRelation
+ if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+ lr
+ }
+ ScalarSubquery(tPlan, s.children, s.exprId)
+ case e: Exists =>
+ val tPlan = e.plan.transform {
+ case lr: LogicalRelation
+ if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+ lr
+ }
+ Exists(tPlan, e.children.map(_.canonicalized), e.exprId)
+
+ case In(value, Seq(l@ListQuery(sub, _, exprId))) =>
+ val tPlan = sub.transform {
+ case lr: LogicalRelation
+ if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+ lr
+ }
+ In(value, Seq(ListQuery(tPlan, l.children, exprId)))
+ }
+ }
+ super.execute(transFormedPlan)
+ }
+}
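The transform above only marks Carbon relations that sit inside subquery expressions. Queries of the following shapes (table names illustrative, `spark` assumed) are the ones whose relations get the isSubquery flag set before optimization:

// ScalarSubquery inside a filter:
spark.sql("SELECT * FROM db.sales WHERE amount > (SELECT avg(amount) FROM db.sales)")
// ListQuery via an IN predicate:
spark.sql("SELECT * FROM db.sales WHERE id IN (SELECT id FROM db.returns)")
// Exists:
spark.sql("SELECT * FROM db.sales s WHERE EXISTS (SELECT 1 FROM db.returns r WHERE r.id = s.id)")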
+
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
+ extends SparkSqlAstBuilder(conf) {
+
+ val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
+
+ override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
+ val fileStorage = helper.getFileStorage(ctx.createFileFormat)
+
+ if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+ fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
+ helper.createCarbonTable(
+ tableHeader = ctx.createTableHeader,
+ skewSpecContext = ctx.skewSpec,
+ bucketSpecContext = ctx.bucketSpec,
+ partitionColumns = ctx.partitionColumns,
+ columns = ctx.columns,
+ tablePropertyList = ctx.tablePropertyList,
+ locationSpecContext = ctx.locationSpec(),
+ tableComment = Option(ctx.STRING()).map(string),
+ ctas = ctx.AS,
+ query = ctx.query)
+ } else {
+ super.visitCreateHiveTable(ctx)
+ }
+ }
+
+ override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = {
+
+ val newColumn = visitColType(ctx.colType)
+ if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
+ throw new MalformedCarbonCommandException(
+ "Column names provided are different. Both the column names should be same")
+ }
+
+ val (typeString, values) : (String, Option[List[(Int, Int)]]) = newColumn.dataType match {
+ case d: DecimalType => ("decimal", Some(List((d.precision, d.scale))))
+ case _ => (newColumn.dataType.typeName.toLowerCase, None)
+ }
+
+ val alterTableChangeDataTypeModel =
+ AlterTableDataTypeChangeModel(new CarbonSpark2SqlParser().parseDataType(typeString, values),
+ new CarbonSpark2SqlParser()
+ .convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)),
+ ctx.tableIdentifier().table.getText.toLowerCase,
+ ctx.identifier.getText.toLowerCase,
+ newColumn.name.toLowerCase)
+
+ CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
+ }
+
+
+ override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
+
+ val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
+ val fields = parser.getFields(cols)
+ val tblProperties = scala.collection.mutable.Map.empty[String, String]
+ val tableModel = new CarbonSpark2SqlParser().prepareTableModel (false,
+ new CarbonSpark2SqlParser().convertDbNameToLowerCase(Option(ctx.tableIdentifier().db)
+ .map(_.getText)),
+ ctx.tableIdentifier.table.getText.toLowerCase,
+ fields,
+ Seq.empty,
+ tblProperties,
+ None,
+ true)
+
+ val alterTableAddColumnsModel = AlterTableAddColumnsModel(
+ Option(ctx.tableIdentifier().db).map(_.getText),
+ ctx.tableIdentifier.table.getText,
+ tblProperties.toMap,
+ tableModel.dimCols,
+ tableModel.msrCols,
+ tableModel.highcardinalitydims.getOrElse(Seq.empty))
+
+ CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
+ }
+
+ override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+ super.visitCreateTable(ctx)
+ }
+}
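As a complement to the happy-path DDL shown earlier, visitChangeColumn rejects renames: a statement like the following (names illustrative, `spark` assumed) raises MalformedCarbonCommandException because the old and new column names differ:

spark.sql("ALTER TABLE db.sales CHANGE amount amt DECIMAL(18,2)")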