Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/09/27 06:34:05 UTC
[2/6] carbondata git commit: [CARBONDATA-1151] Refactor all carbon command to separate file in spark2 integration
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
new file mode 100644
index 0000000..21b974a
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.partition
+
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{AlterTableSplitPartitionModel, DataProcessCommand, RunnableCommand, SchemaProcessCommand}
+import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation}
+import org.apache.spark.util.{AlterTableUtil, PartitionUtils}
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema.PartitionInfo
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+
+/**
+ * Command for Alter Table Add & Split partition
+ * Add is a special case of Splitting the default partition (part0)
+ */
+case class AlterTableSplitCarbonPartitionCommand(
+ splitPartitionModel: AlterTableSplitPartitionModel)
+ extends RunnableCommand with DataProcessCommand with SchemaProcessCommand {
+
+ val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
+
+ // TODO will add rollback function in case of process data failure
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ processSchema(sparkSession)
+ processData(sparkSession)
+ }
+
+ override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+ val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ val dbName = splitPartitionModel.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
+ val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val tableName = splitPartitionModel.tableName
+    val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+    val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
+    val storePath = relation.tableMeta.storePath
+ carbonMetaStore.checkSchemasModifiedTimeAndReloadTables(storePath)
+ if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
+ LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
+ sys.error(s"Alter table failed. table not found: $dbName.$tableName")
+ }
+ val table = relation.tableMeta.carbonTable
+    val partitionInfo = table.getPartitionInfo(tableName)
+    if (partitionInfo == null) {
+      sys.error(s"Table $tableName is not a partition table.")
+    }
+    val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
+    // keep a copy of the partition id list before updating partitionInfo;
+    // it is used later when scanning the partition data
+    oldPartitionIds.addAll(partitionIds.asJava)
+
+ if (partitionInfo.getPartitionType == PartitionType.HASH) {
+ sys.error(s"Hash partition table cannot be added or split!")
+ }
+
+ updatePartitionInfo(partitionInfo, partitionIds)
+
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+ val schemaFilePath = carbonTablePath.getSchemaFilePath
+ // read TableInfo
+ val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
+ val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+ val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
+ dbName, tableName, storePath)
+ val tableSchema = wrapperTableInfo.getFactTable
+ tableSchema.setPartitionInfo(partitionInfo)
+ wrapperTableInfo.setFactTable(tableSchema)
+ wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
+ val thriftTable =
+ schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+ carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
+ dbName, tableName, storePath)
+ CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
+ // update the schema modified time
+ carbonMetaStore.updateAndTouchSchemasUpdatedTime(storePath)
+ sparkSession.catalog.refreshTable(tableName)
+ Seq.empty
+ }
+
+  private def updatePartitionInfo(partitionInfo: PartitionInfo,
+      partitionIds: List[Int]): Unit = {
+ val dateFormatter = new SimpleDateFormat(CarbonProperties.getInstance
+ .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+ CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
+
+ val timestampFormatter = new SimpleDateFormat(CarbonProperties.getInstance
+ .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
+
+ PartitionUtils.updatePartitionInfo(
+ partitionInfo,
+ partitionIds,
+ splitPartitionModel.partitionId.toInt,
+ splitPartitionModel.splitInfo,
+ timestampFormatter,
+ dateFormatter)
+ }
+
+ override def processData(sparkSession: SparkSession): Seq[Row] = {
+ val dbName = splitPartitionModel.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
+ val tableName = splitPartitionModel.tableName
+ var locks = List.empty[ICarbonLock]
+ var success = false
+ try {
+ val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+ LockUsage.COMPACTION_LOCK,
+ LockUsage.DELETE_SEGMENT_LOCK,
+ LockUsage.DROP_TABLE_LOCK,
+ LockUsage.CLEAN_FILES_LOCK,
+ LockUsage.ALTER_PARTITION_LOCK)
+ locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
+ locksToBeAcquired)(sparkSession)
+ val carbonLoadModel = new CarbonLoadModel()
+ val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
+ .asInstanceOf[CarbonRelation]
+ val storePath = relation.tableMeta.storePath
+ val table = relation.tableMeta.carbonTable
+ val carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
+ val dataLoadSchema = new CarbonDataLoadSchema(table)
+ carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+ carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
+ carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
+ carbonLoadModel.setStorePath(storePath)
+ val loadStartTime = CarbonUpdateUtil.readCurrentTime
+ carbonLoadModel.setFactTimeStamp(loadStartTime)
+ CarbonDataRDDFactory.alterTableSplitPartition(
+ sparkSession.sqlContext,
+ splitPartitionModel.partitionId.toInt.toString,
+ carbonLoadModel,
+ oldPartitionIds.asScala.toList
+ )
+ success = true
+ } catch {
+ case e: Exception =>
+ success = false
+ sys.error(s"Add/Split Partition failed. Please check logs for more info. ${ e.getMessage }")
+ } finally {
+ AlterTableUtil.releaseLocks(locks)
+ CacheProvider.getInstance().dropAllCache()
+ val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ LOGGER.info("Locks released after alter table add/split partition action.")
+ LOGGER.audit("Locks released after alter table add/split partition action.")
+ if (success) {
+ LOGGER.info(s"Alter table add/split partition is successful for table $dbName.$tableName")
+ LOGGER.audit(s"Alter table add/split partition is successful for table $dbName.$tableName")
+ }
+ }
+ Seq.empty
+ }
+}
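
For reference, a minimal usage sketch of the DDL that reaches this command
(illustrative only: the session setup, store path, table name `sales` and the
partition bound values are assumptions, and an existing range-partitioned
table is presumed):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.CarbonSession._

    // Assumed local session; in a deployment the store path would be on HDFS.
    val spark = SparkSession.builder()
      .master("local")
      .appName("SplitPartitionSketch")
      .getOrCreateCarbonSession("/tmp/carbonstore")

    // ADD is routed through the same command: it splits the default partition (part0).
    spark.sql("ALTER TABLE sales ADD PARTITION ('2017-06-01')")
    // SPLIT divides the existing partition with id 2 into two new ranges.
    spark.sql("ALTER TABLE sales SPLIT PARTITION (2) INTO ('2017-01-15', '2017-02-01')")
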
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
new file mode 100644
index 0000000..224304a
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.partition
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.command.{RunnableCommand, SchemaProcessCommand}
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * Command to show the partitions of a table
+ */
+private[sql] case class ShowCarbonPartitionsCommand(
+ tableIdentifier: TableIdentifier)
+ extends RunnableCommand with SchemaProcessCommand {
+
+ override val output: Seq[Attribute] = CommonUtil.partitionInfoOutput
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ processSchema(sparkSession)
+ }
+
+ override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+ val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .lookupRelation(tableIdentifier)(sparkSession).
+ asInstanceOf[CarbonRelation]
+ val carbonTable = relation.tableMeta.carbonTable
+ val tableName = carbonTable.getFactTableName
+ val partitionInfo = carbonTable.getPartitionInfo(
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
+ if (partitionInfo == null) {
+ throw new AnalysisException(
+ s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableName")
+ }
+ val partitionType = partitionInfo.getPartitionType
+ val columnName = partitionInfo.getColumnSchemaList.get(0).getColumnName
+ val LOGGER = LogServiceFactory.getLogService(ShowCarbonPartitionsCommand.getClass.getName)
+ LOGGER.info("partition column name:" + columnName)
+ CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo)
+ }
+}
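
A short usage sketch for the command above (same assumed session and table as
in the earlier sketch):

    // Prints the partition column, partition type and configured ranges;
    // throws AnalysisException if `sales` is not a partitioned table.
    spark.sql("SHOW PARTITIONS sales").show(truncate = false)
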
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableAddColumnCommand.scala
new file mode 100644
index 0000000..6e6a4b1
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableAddColumnCommand.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.schema
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableColumnSchemaGenerator, RunnableCommand}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.TableInfo
+import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}
+
+private[sql] case class AlterTableAddColumnCommand(
+ alterTableAddColumnsModel: AlterTableAddColumnsModel)
+ extends RunnableCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val tableName = alterTableAddColumnsModel.tableName
+ val dbName = alterTableAddColumnsModel.databaseName
+ .getOrElse(sparkSession.catalog.currentDatabase)
+ LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName")
+ val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
+ var locks = List.empty[ICarbonLock]
+ var timeStamp = 0L
+ var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
+ var carbonTable: CarbonTable = null
+ try {
+ locks = AlterTableUtil
+ .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
+      // Consider a concurrent scenario where two alter operations run in parallel. The first
+      // operation succeeds and updates the schema file. The second operation gets the lock only
+      // after the first completes, but if lookupRelation were called before acquiring the lock
+      // it would hold the stale carbon table, leaving the system in an inconsistent state.
+      // Therefore lookupRelation must be called after the lock is acquired.
+ val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ carbonTable = metastore
+ .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+ .tableMeta.carbonTable
+ // get the latest carbon table and check for column existence
+ // read the latest schema file
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ carbonTable.getCarbonTableIdentifier)
+ val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+ val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+ val wrapperTableInfo = schemaConverter
+ .fromExternalToWrapperTableInfo(thriftTableInfo,
+ dbName,
+ tableName,
+ carbonTable.getStorePath)
+ newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel,
+ dbName,
+ wrapperTableInfo,
+ carbonTablePath,
+ carbonTable.getCarbonTableIdentifier,
+ carbonTable.getStorePath, sparkSession.sparkContext).process
+ // generate dictionary files for the newly added columns
+ new AlterTableAddColumnRDD(sparkSession.sparkContext,
+ newCols,
+ carbonTable.getCarbonTableIdentifier,
+ carbonTable.getStorePath).collect()
+ timeStamp = System.currentTimeMillis
+ val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
+ schemaEvolutionEntry.setTimeStamp(timeStamp)
+ schemaEvolutionEntry.setAdded(newCols.toList.asJava)
+ val thriftTable = schemaConverter
+ .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+ AlterTableUtil
+ .updateSchemaInfo(carbonTable,
+ schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
+ thriftTable)(sparkSession,
+ sparkSession.sessionState.asInstanceOf[CarbonSessionState])
+ LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
+ LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName")
+ } catch {
+ case e: Exception =>
+ LOGGER.error(e, "Alter table add columns failed")
+ if (newCols.nonEmpty) {
+ LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
+ new AlterTableDropColumnRDD(sparkSession.sparkContext,
+ newCols,
+ carbonTable.getCarbonTableIdentifier,
+ carbonTable.getStorePath).collect()
+ AlterTableUtil.revertAddColumnChanges(dbName, tableName, timeStamp)(sparkSession)
+ }
+ sys.error(s"Alter table add operation failed: ${e.getMessage}")
+ } finally {
+ // release lock after command execution completion
+ AlterTableUtil.releaseLocks(locks)
+ }
+ Seq.empty
+ }
+}
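
An illustrative invocation for the add-column command (assumed session and
table as before; the column names, the DICTIONARY_INCLUDE property and the
default value are made-up examples of the options this command accepts):

    // Dictionary files for the new dictionary column are generated by
    // AlterTableAddColumnRDD; on failure they are cleaned up again.
    spark.sql(
      "ALTER TABLE sales ADD COLUMNS (discount DOUBLE, region STRING) " +
      "TBLPROPERTIES ('DICTIONARY_INCLUDE'='region', 'DEFAULT.VALUE.region'='NA')")
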
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala
new file mode 100644
index 0000000..be87bbb
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.schema
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, RunnableCommand}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
+
+private[sql] case class AlterTableDataTypeChangeCommand(
+ alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
+ extends RunnableCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val tableName = alterTableDataTypeChangeModel.tableName
+ val dbName = alterTableDataTypeChangeModel.databaseName
+ .getOrElse(sparkSession.catalog.currentDatabase)
+ LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName")
+ val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
+ var locks = List.empty[ICarbonLock]
+ // get the latest carbon table and check for column existence
+ var carbonTable: CarbonTable = null
+ var timeStamp = 0L
+ try {
+ locks = AlterTableUtil
+ .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
+ val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ carbonTable = metastore
+ .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+ .tableMeta.carbonTable
+ val columnName = alterTableDataTypeChangeModel.columnName
+ val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
+ if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
+ LOGGER.audit(s"Alter table change data type request has failed. " +
+ s"Column $columnName does not exist")
+ sys.error(s"Column does not exist: $columnName")
+ }
+ val carbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(columnName))
+ if (carbonColumn.size == 1) {
+ CarbonScalaUtil
+ .validateColumnDataType(alterTableDataTypeChangeModel.dataTypeInfo, carbonColumn.head)
+ } else {
+ LOGGER.audit(s"Alter table change data type request has failed. " +
+ s"Column $columnName is invalid")
+ sys.error(s"Invalid Column: $columnName")
+ }
+ // read the latest schema file
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ carbonTable.getCarbonTableIdentifier)
+ val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+ // maintain the added column for schema evolution history
+ var addColumnSchema: ColumnSchema = null
+ var deletedColumnSchema: ColumnSchema = null
+ val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible)
+ columnSchemaList.foreach { columnSchema =>
+ if (columnSchema.column_name.equalsIgnoreCase(columnName)) {
+ deletedColumnSchema = columnSchema.deepCopy
+ columnSchema.setData_type(DataTypeConverterUtil
+ .convertToThriftDataType(alterTableDataTypeChangeModel.dataTypeInfo.dataType))
+ columnSchema.setPrecision(alterTableDataTypeChangeModel.dataTypeInfo.precision)
+ columnSchema.setScale(alterTableDataTypeChangeModel.dataTypeInfo.scale)
+ addColumnSchema = columnSchema
+ }
+ }
+ timeStamp = System.currentTimeMillis
+ val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
+ schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava)
+ schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
+      tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+        .setTime_stamp(timeStamp)
+ AlterTableUtil
+ .updateSchemaInfo(carbonTable,
+ schemaEvolutionEntry,
+ tableInfo)(sparkSession,
+ sparkSession.sessionState.asInstanceOf[CarbonSessionState])
+ LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
+ LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
+ } catch {
+      case e: Exception =>
+        LOGGER.error("Alter table change datatype failed: " + e.getMessage)
+ if (carbonTable != null) {
+ AlterTableUtil.revertDataTypeChanges(dbName, tableName, timeStamp)(sparkSession)
+ }
+ sys.error(s"Alter table data type change operation failed: ${e.getMessage}")
+ } finally {
+ // release lock after command execution completion
+ AlterTableUtil.releaseLocks(locks)
+ }
+ Seq.empty
+ }
+}
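
An illustrative invocation (assumed session and table as before). Only
compatible changes pass validateColumnDataType, e.g. widening INT to BIGINT
or raising a DECIMAL's precision without data loss:

    // Assumes `quantity` is an existing INT column on the `sales` table.
    spark.sql("ALTER TABLE sales CHANGE quantity quantity BIGINT")
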
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala
new file mode 100644
index 0000000..2f1e3d9
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.schema
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, RunnableCommand}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.SchemaEvolutionEntry
+import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
+
+private[sql] case class AlterTableDropColumnCommand(
+ alterTableDropColumnModel: AlterTableDropColumnModel)
+ extends RunnableCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val tableName = alterTableDropColumnModel.tableName
+ val dbName = alterTableDropColumnModel.databaseName
+ .getOrElse(sparkSession.catalog.currentDatabase)
+ LOGGER.audit(s"Alter table drop columns request has been received for $dbName.$tableName")
+ var locks = List.empty[ICarbonLock]
+ var timeStamp = 0L
+ val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
+ // get the latest carbon table and check for column existence
+ var carbonTable: CarbonTable = null
+ try {
+ locks = AlterTableUtil
+ .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
+ val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ carbonTable = metastore
+ .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+ .tableMeta.carbonTable
+ val partitionInfo = carbonTable.getPartitionInfo(tableName)
+ if (partitionInfo != null) {
+ val partitionColumnSchemaList = partitionInfo.getColumnSchemaList.asScala
+ .map(_.getColumnName)
+ // check each column existence in the table
+ val partitionColumns = alterTableDropColumnModel.columns.filter {
+ tableColumn => partitionColumnSchemaList.contains(tableColumn)
+ }
+ if (partitionColumns.nonEmpty) {
+ throw new UnsupportedOperationException("Partition columns cannot be dropped: " +
+ s"$partitionColumns")
+ }
+ }
+ val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
+ var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
+ .ColumnSchema]()
+ var keyColumnCountToBeDeleted = 0
+      // TODO: if the deleted column list includes a bucketed column, throw an error
+ alterTableDropColumnModel.columns.foreach { column =>
+ var columnExist = false
+ tableColumns.foreach { tableColumn =>
+ // column should not be already deleted and should exist in the table
+ if (!tableColumn.isInvisible && column.equalsIgnoreCase(tableColumn.getColName)) {
+ if (tableColumn.isDimension) {
+ keyColumnCountToBeDeleted += 1
+ if (tableColumn.hasEncoding(Encoding.DICTIONARY)) {
+ dictionaryColumns ++= Seq(tableColumn.getColumnSchema)
+ }
+ }
+ columnExist = true
+ }
+ }
+ if (!columnExist) {
+ sys.error(s"Column $column does not exists in the table $dbName.$tableName")
+ }
+ }
+      // count the key columns in the schema; the number of key columns to be
+      // deleted must be less than this total
+      val totalKeyColumnInSchema = tableColumns.count {
+        tableColumn => !tableColumn.isInvisible && tableColumn.isDimension
+      }
+      if (keyColumnCountToBeDeleted >= totalKeyColumnInSchema) {
+        sys.error("Alter drop operation failed. At least one key column must remain after drop.")
+ }
+ // read the latest schema file
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ carbonTable.getCarbonTableIdentifier)
+ val tableInfo: org.apache.carbondata.format.TableInfo =
+ metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+ // maintain the deleted columns for schema evolution history
+ var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
+ val columnSchemaList = tableInfo.fact_table.table_columns.asScala
+ alterTableDropColumnModel.columns.foreach { column =>
+ columnSchemaList.foreach { columnSchema =>
+ if (!columnSchema.invisible && column.equalsIgnoreCase(columnSchema.column_name)) {
+ deletedColumnSchema += columnSchema.deepCopy
+ columnSchema.invisible = true
+ }
+ }
+ }
+ // add deleted columns to schema evolution history and update the schema
+ timeStamp = System.currentTimeMillis
+ val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
+ schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava)
+ AlterTableUtil
+ .updateSchemaInfo(carbonTable,
+ schemaEvolutionEntry,
+ tableInfo)(sparkSession,
+ sparkSession.sessionState.asInstanceOf[CarbonSessionState])
+ // TODO: 1. add check for deletion of index tables
+ // delete dictionary files for dictionary column and clear dictionary cache from memory
+ new AlterTableDropColumnRDD(sparkSession.sparkContext,
+ dictionaryColumns,
+ carbonTable.getCarbonTableIdentifier,
+ carbonTable.getStorePath).collect()
+ LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
+ LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName")
+ } catch {
+      case e: Exception =>
+        LOGGER.error("Alter table drop columns failed: " + e.getMessage)
+ if (carbonTable != null) {
+ AlterTableUtil.revertDropColumnChanges(dbName, tableName, timeStamp)(sparkSession)
+ }
+ sys.error(s"Alter table drop column operation failed: ${e.getMessage}")
+ } finally {
+ // release lock after command execution completion
+ AlterTableUtil.releaseLocks(locks)
+ }
+ Seq.empty
+ }
+}
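
An illustrative invocation (assumed session and table as before); the checks
above reject dropping a partition column or the last remaining key column:

    // Assumes `discount` is an existing non-partition column of `sales`.
    spark.sql("ALTER TABLE sales DROP COLUMNS (discount)")
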
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala
new file mode 100644
index 0000000..af361d5
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.schema
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.{AlterTableRenameModel, RunnableCommand}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.SchemaEvolutionEntry
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+private[sql] case class AlterTableRenameTableCommand(
+ alterTableRenameModel: AlterTableRenameModel)
+ extends RunnableCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val oldTableIdentifier = alterTableRenameModel.oldTableIdentifier
+ val newTableIdentifier = alterTableRenameModel.newTableIdentifier
+ val oldDatabaseName = oldTableIdentifier.database
+ .getOrElse(sparkSession.catalog.currentDatabase)
+ val newDatabaseName = newTableIdentifier.database
+ .getOrElse(sparkSession.catalog.currentDatabase)
+ if (!oldDatabaseName.equalsIgnoreCase(newDatabaseName)) {
+ throw new MalformedCarbonCommandException("Database name should be same for both tables")
+ }
+ val tableExists = sparkSession.catalog.tableExists(oldDatabaseName, newTableIdentifier.table)
+ if (tableExists) {
+ throw new MalformedCarbonCommandException(s"Table with name $newTableIdentifier " +
+ s"already exists")
+ }
+ val oldTableName = oldTableIdentifier.table.toLowerCase
+ val newTableName = newTableIdentifier.table.toLowerCase
+ LOGGER.audit(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
+ LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
+ val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val relation: CarbonRelation =
+ metastore.lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession)
+ .asInstanceOf[CarbonRelation]
+ if (relation == null) {
+ LOGGER.audit(s"Rename table request has failed. " +
+ s"Table $oldDatabaseName.$oldTableName does not exist")
+ sys.error(s"Table $oldDatabaseName.$oldTableName does not exist")
+ }
+ val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+ LockUsage.COMPACTION_LOCK,
+ LockUsage.DELETE_SEGMENT_LOCK,
+ LockUsage.CLEAN_FILES_LOCK,
+ LockUsage.DROP_TABLE_LOCK)
+ var locks = List.empty[ICarbonLock]
+ var timeStamp = 0L
+ var carbonTable: CarbonTable = null
+ try {
+ locks = AlterTableUtil
+ .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
+ sparkSession)
+ val tableMeta = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
+ .asInstanceOf[CarbonRelation].tableMeta
+ carbonTable = tableMeta.carbonTable
+ // get the latest carbon table and check for column existence
+ val carbonTablePath = CarbonStorePath.
+ getCarbonTablePath(AbsoluteTableIdentifier.fromTablePath(tableMeta.tablePath))
+ val tableMetadataFile = carbonTablePath.getPath
+ val tableInfo: org.apache.carbondata.format.TableInfo =
+ metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+      timeStamp = System.currentTimeMillis()
+      val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
+      schemaEvolutionEntry.setTableName(newTableName)
+ renameBadRecords(oldTableName, newTableName, oldDatabaseName)
+ val fileType = FileFactory.getFileType(tableMetadataFile)
+ if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+ val rename = FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
+ .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
+ newTableName)
+ if (!rename) {
+ renameBadRecords(newTableName, oldTableName, oldDatabaseName)
+ sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
+ }
+ }
+ val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
+ newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
+ val newTablePath = metastore.updateTableSchema(newTableIdentifier,
+ carbonTable.getCarbonTableIdentifier,
+ tableInfo,
+ schemaEvolutionEntry,
+ tableMeta.tablePath)(sparkSession)
+ metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
+ sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
+ .runSqlHive(
+ s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
+ sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
+ .runSqlHive(
+ s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
+ s"('tableName'='$newTableName', " +
+ s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
+ sparkSession.catalog.refreshTable(TableIdentifier(newTableName,
+ Some(oldDatabaseName)).quotedString)
+ LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
+ LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
+ } catch {
+ case e: Exception =>
+ LOGGER.error(e, "Rename table failed: " + e.getMessage)
+ if (carbonTable != null) {
+ AlterTableUtil
+ .revertRenameTableChanges(oldTableIdentifier,
+ newTableName,
+ carbonTable.getStorePath,
+ carbonTable.getCarbonTableIdentifier.getTableId,
+ timeStamp)(
+ sparkSession)
+ renameBadRecords(newTableName, oldTableName, oldDatabaseName)
+ }
+ sys.error(s"Alter table rename table operation failed: ${e.getMessage}")
+ } finally {
+ // release lock after command execution completion
+ AlterTableUtil.releaseLocks(locks)
+      // specific to rename: the old table path no longer exists after the rename,
+      // so the remaining locks must be released manually at the new location
+ if (carbonTable != null) {
+ AlterTableUtil
+ .releaseLocksManually(locks,
+ locksToBeAcquired,
+ oldDatabaseName,
+ newTableName,
+ carbonTable.getStorePath)
+ }
+ }
+ Seq.empty
+ }
+
+ private def renameBadRecords(
+ oldTableName: String,
+ newTableName: String,
+ dataBaseName: String): Unit = {
+ val oldPath = CarbonUtil
+ .getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + oldTableName)
+ val newPath = CarbonUtil
+ .getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + newTableName)
+ val fileType = FileFactory.getFileType(oldPath)
+ if (FileFactory.isFileExist(oldPath, fileType)) {
+ val renameSuccess = FileFactory.getCarbonFile(oldPath, fileType)
+ .renameForce(newPath)
+ if (!renameSuccess) {
+ sys.error(s"BadRecords Folder Rename Failed for table $dataBaseName.$oldTableName")
+ }
+ }
+ }
+
+}
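
An illustrative invocation (assumed session and table as before); the rename
must stay within one database, and the command also renames the store folder
and any bad-records folder:

    spark.sql("ALTER TABLE sales RENAME TO sales_archive")
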
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
new file mode 100644
index 0000000..c6ca950
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.strategy
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.optimizer.CarbonDecoderRelation
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.types.{AtomicType, IntegerType, StringType}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.BucketingInfo
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.CarbonAliasDecoderRelation
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * Carbon specific optimization for late decode (convert dictionary key to value as late as
+ * possible), which can improve the aggregation performance and reduce memory usage
+ */
+private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
+ val PUSHED_FILTERS = "PushedFilters"
+
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+ plan match {
+ case PhysicalOperation(projects, filters, l: LogicalRelation)
+ if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ val relation = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+ pruneFilterProject(
+ l,
+ projects,
+ filters,
+ (a, f, needDecoder) => toCatalystRDD(l, a, relation.buildScan(
+ a.map(_.name).toArray, f), needDecoder)) :: Nil
+ case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, child) =>
+ if ((profile.isInstanceOf[IncludeProfile] && profile.isEmpty) ||
+ !CarbonDictionaryDecoder.
+ isRequiredToDecode(CarbonDictionaryDecoder.
+ getDictionaryColumnMapping(child.output, relations, profile, aliasMap))) {
+ planLater(child) :: Nil
+ } else {
+ CarbonDictionaryDecoder(relations,
+ profile,
+ aliasMap,
+ planLater(child),
+ SparkSession.getActiveSession.get
+ ) :: Nil
+ }
+ case _ => Nil
+ }
+ }
+
+
+ def getDecoderRDD(
+ logicalRelation: LogicalRelation,
+ projectExprsNeedToDecode: ArrayBuffer[AttributeReference],
+ rdd: RDD[InternalRow],
+ output: Seq[Attribute]): RDD[InternalRow] = {
+ val table = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+ val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation])
+ val attrs = projectExprsNeedToDecode.map { attr =>
+ val newAttr = AttributeReference(attr.name,
+ attr.dataType,
+ attr.nullable,
+ attr.metadata)(attr.exprId, Option(table.carbonRelation.tableName))
+ relation.addAttribute(newAttr)
+ newAttr
+ }
+
+ new CarbonDecoderRDD(
+ Seq(relation),
+ IncludeProfile(attrs),
+ CarbonAliasDecoderRelation(),
+ rdd,
+ output,
+ CarbonEnv.getInstance(SparkSession.getActiveSession.get).storePath,
+ table.carbonTable.getTableInfo.serialize())
+ }
+
+ private[this] def toCatalystRDD(
+ relation: LogicalRelation,
+ output: Seq[Attribute],
+ rdd: RDD[InternalRow],
+ needDecode: ArrayBuffer[AttributeReference]):
+ RDD[InternalRow] = {
+ if (needDecode.nonEmpty) {
+ rdd.asInstanceOf[CarbonScanRDD].setVectorReaderSupport(false)
+ getDecoderRDD(relation, needDecode, rdd, output)
+ } else {
+ rdd.asInstanceOf[CarbonScanRDD]
+ .setVectorReaderSupport(supportBatchedDataSource(relation.relation.sqlContext, output))
+ rdd
+ }
+ }
+
+ protected def pruneFilterProject(
+ relation: LogicalRelation,
+ projects: Seq[NamedExpression],
+ filterPredicates: Seq[Expression],
+ scanBuilder: (Seq[Attribute], Array[Filter],
+ ArrayBuffer[AttributeReference]) => RDD[InternalRow]) = {
+ pruneFilterProjectRaw(
+ relation,
+ projects,
+ filterPredicates,
+ (requestedColumns, _, pushedFilters, a) => {
+ scanBuilder(requestedColumns, pushedFilters.toArray, a)
+ })
+ }
+
+ protected def pruneFilterProjectRaw(
+ relation: LogicalRelation,
+ rawProjects: Seq[NamedExpression],
+ filterPredicates: Seq[Expression],
+ scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
+ ArrayBuffer[AttributeReference]) => RDD[InternalRow]) = {
+ val projects = rawProjects.map {p =>
+ p.transform {
+ case CustomDeterministicExpression(exp) => exp
+ }
+ }.asInstanceOf[Seq[NamedExpression]]
+
+ val projectSet = AttributeSet(projects.flatMap(_.references))
+ val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
+
+ val candidatePredicates = filterPredicates.map {
+ _ transform {
+ case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
+ }
+ }
+
+ val (unhandledPredicates, pushedFilters) =
+ selectFilters(relation.relation, candidatePredicates)
+
+ // A set of column attributes that are only referenced by pushed down filters. We can eliminate
+ // them from requested columns.
+ val handledSet = {
+ val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
+ val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
+ AttributeSet(handledPredicates.flatMap(_.references)) --
+ (projectSet ++ unhandledSet).map(relation.attributeMap)
+ }
+
+ // Combines all Catalyst filter `Expression`s that are either not convertible to data source
+ // `Filter`s or cannot be handled by `relation`.
+ val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
+ val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+ val map = table.carbonRelation.metaData.dictionaryMap
+
+ val metadata: Map[String, String] = {
+ val pairs = ArrayBuffer.empty[(String, String)]
+
+ if (pushedFilters.nonEmpty) {
+ pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
+ }
+ pairs.toMap
+ }
+
+
+ val needDecoder = ArrayBuffer[AttributeReference]()
+ filterCondition match {
+ case Some(exp: Expression) =>
+ exp.references.collect {
+ case attr: AttributeReference =>
+ val dict = map.get(attr.name)
+ if (dict.isDefined && dict.get) {
+ needDecoder += attr
+ }
+ }
+ case None =>
+ }
+
+    projects.foreach {
+      case attr: AttributeReference =>
+      case Alias(attr: AttributeReference, _) =>
+      case others =>
+        others.references.foreach { f =>
+          val dictionary = map.get(f.name)
+          if (dictionary.isDefined && dictionary.get) {
+            needDecoder += f.asInstanceOf[AttributeReference]
+          }
+        }
+    }
+
+ if (projects.map(_.toAttribute) == projects &&
+ projectSet.size == projects.size &&
+ filterSet.subsetOf(projectSet)) {
+ // When it is possible to just use column pruning to get the right projection and
+ // when the columns of this projection are enough to evaluate all filter conditions,
+ // just do a scan followed by a filter, with no extra project.
+ val requestedColumns = projects
+ // Safe due to if above.
+ .asInstanceOf[Seq[Attribute]]
+ // Match original case of attributes.
+ .map(relation.attributeMap)
+ // Don't request columns that are only referenced by pushed filters.
+ .filterNot(handledSet.contains)
+ val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
+
+ val updateProject = projects.map { expr =>
+ var attr = expr.toAttribute.asInstanceOf[AttributeReference]
+ if (!needDecoder.exists(_.name.equalsIgnoreCase(attr.name))) {
+ val dict = map.get(attr.name)
+ if (dict.isDefined && dict.get) {
+ attr = AttributeReference(attr.name, IntegerType, attr.nullable, attr.metadata)(attr
+ .exprId, attr.qualifier)
+ }
+ }
+ attr
+ }
+ val scan = getDataSourceScan(relation,
+ updateProject,
+ scanBuilder,
+ candidatePredicates,
+ pushedFilters,
+ metadata,
+ needDecoder,
+ updateRequestedColumns.asInstanceOf[Seq[Attribute]])
+ filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
+ } else {
+
+ var newProjectList: Seq[Attribute] = Seq.empty
+ val updatedProjects = projects.map {
+ case a@Alias(s: ScalaUDF, name)
+ if name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID) ||
+ name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID) =>
+ val reference = AttributeReference(name, StringType, true)().withExprId(a.exprId)
+ newProjectList :+= reference
+ reference
+ case a@Alias(s: ScalaUDF, name)
+ if name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID) =>
+ val reference =
+ AttributeReference(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID,
+ StringType, true)().withExprId(a.exprId)
+ newProjectList :+= reference
+ a.transform {
+ case s: ScalaUDF =>
+ ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes)
+ }
+ case other => other
+ }
+ // Don't request columns that are only referenced by pushed filters.
+ val requestedColumns =
+ (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq ++ newProjectList
+ val updateRequestedColumns = updateRequestedColumnsFunc(requestedColumns, table, needDecoder)
+ val scan = getDataSourceScan(relation,
+ updateRequestedColumns.asInstanceOf[Seq[Attribute]],
+ scanBuilder,
+ candidatePredicates,
+ pushedFilters,
+ metadata,
+ needDecoder,
+ updateRequestedColumns.asInstanceOf[Seq[Attribute]])
+ execution.ProjectExec(
+ updateRequestedColumnsFunc(updatedProjects, table,
+ needDecoder).asInstanceOf[Seq[NamedExpression]],
+ filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
+ }
+ }
+
+ def getDataSourceScan(relation: LogicalRelation,
+ output: Seq[Attribute],
+ scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
+ ArrayBuffer[AttributeReference]) => RDD[InternalRow],
+ candidatePredicates: Seq[Expression],
+ pushedFilters: Seq[Filter],
+ metadata: Map[String, String],
+ needDecoder: ArrayBuffer[AttributeReference],
+ updateRequestedColumns: Seq[Attribute]): DataSourceScanExec = {
+ val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+ if (supportBatchedDataSource(relation.relation.sqlContext, updateRequestedColumns) &&
+ needDecoder.isEmpty) {
+ BatchedDataSourceScanExec(
+ output,
+ scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
+ relation.relation,
+ getPartitioning(table.carbonTable, updateRequestedColumns),
+ metadata,
+ relation.catalogTable.map(_.identifier))
+ } else {
+ RowDataSourceScanExec(output,
+ scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
+ relation.relation,
+ getPartitioning(table.carbonTable, updateRequestedColumns),
+ metadata,
+ relation.catalogTable.map(_.identifier))
+ }
+ }
+
+ def updateRequestedColumnsFunc(requestedColumns: Seq[Expression],
+ relation: CarbonDatasourceHadoopRelation,
+ needDecoder: ArrayBuffer[AttributeReference]): Seq[Expression] = {
+ val map = relation.carbonRelation.metaData.dictionaryMap
+ requestedColumns.map {
+ case attr: AttributeReference =>
+ if (needDecoder.exists(_.name.equalsIgnoreCase(attr.name))) {
+ attr
+ } else {
+ val dict = map.get(attr.name)
+ if (dict.isDefined && dict.get) {
+ AttributeReference(attr.name,
+ IntegerType,
+ attr.nullable,
+ attr.metadata)(attr.exprId, attr.qualifier)
+ } else {
+ attr
+ }
+ }
+ case alias @ Alias(attr: AttributeReference, name) =>
+ if (needDecoder.exists(_.name.equalsIgnoreCase(attr.name))) {
+ alias
+ } else {
+ val dict = map.get(attr.name)
+ if (dict.isDefined && dict.get) {
+ alias.transform {
+ case attrLocal: AttributeReference =>
+ AttributeReference(attr.name,
+ IntegerType,
+ attr.nullable,
+ attr.metadata)(attr.exprId, attr.qualifier)
+ }
+ } else {
+ alias
+ }
+ }
+ case others => others
+ }
+ }
+
+ private def getPartitioning(carbonTable: CarbonTable,
+ output: Seq[Attribute]): Partitioning = {
+ val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getFactTableName)
+ if (info != null) {
+ val cols = info.getListOfColumns.asScala
+ val sortColumn = carbonTable.
+ getDimensionByTableName(carbonTable.getFactTableName).get(0).getColName
+ val numBuckets = info.getNumberOfBuckets
+ val bucketColumns = cols.flatMap { n =>
+ val attrRef = output.find(_.name.equalsIgnoreCase(n.getColumnName))
+ attrRef match {
+ case Some(attr: AttributeReference) =>
+ Some(AttributeReference(attr.name,
+ CarbonScalaUtil.convertCarbonToSparkDataType(n.getDataType),
+ attr.nullable,
+ attr.metadata)(attr.exprId, attr.qualifier))
+ case _ => None
+ }
+ }
+ if (bucketColumns.size == cols.size) {
+ HashPartitioning(bucketColumns, numBuckets)
+ } else {
+ UnknownPartitioning(0)
+ }
+ } else {
+ UnknownPartitioning(0)
+ }
+ }
+
+ protected[sql] def selectFilters(
+ relation: BaseRelation,
+ predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = {
+
+ // For conciseness, all Catalyst filter expressions of type `expressions.Expression` below are
+ // called `predicate`s, while all data source filters of type `sources.Filter` are simply called
+ // `filter`s.
+
+ val translated: Seq[(Expression, Filter)] =
+ for {
+ predicate <- predicates
+ filter <- translateFilter(predicate)
+ } yield predicate -> filter
+
+ // A map from original Catalyst expressions to corresponding translated data source filters.
+ val translatedMap: Map[Expression, Filter] = translated.toMap
+
+ // Catalyst predicate expressions that cannot be translated to data source filters.
+ val unrecognizedPredicates = predicates.filterNot(translatedMap.contains)
+
+    // Data source filters that cannot be handled by `relation`. An unhandled
+    // filter here means that the data source may not be able to apply it to
+    // every row of the underlying dataset.
+ val unhandledFilters = relation.unhandledFilters(translatedMap.values.toArray).toSet
+
+ val (unhandled, handled) = translated.partition {
+ case (_, filter) => unhandledFilters.contains(filter)
+ }
+
+ // Catalyst predicate expressions that can be translated to data source filters, but cannot be
+ // handled by `relation`.
+ val (unhandledPredicates, _) = unhandled.unzip
+
+ // Translated data source filters that can be handled by `relation`.
+ val (_, handledFilters) = handled.unzip
+
+ // `translated` contains every predicate that could be converted to the public
+ // Filter interface. We always push these to the data source, whether or not
+ // it can apply each filter to every row.
+ val (_, translatedFilters) = translated.unzip
+
+ (unrecognizedPredicates ++ unhandledPredicates, translatedFilters)
+ }
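+
+ // Worked example (hypothetical predicates): given `a = 1` (translatable and
+ // handled by the relation) and `someUdf(b) > 2` (not translatable), this
+ // returns (Seq(someUdf(b) > 2), Seq(EqualTo("a", 1))): the untranslatable
+ // predicate stays with Spark for re-evaluation, while every translated filter
+ // is pushed down even if the source applies it only partially.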
+
+ /**
+ * Tries to translate a Catalyst [[Expression]] into a data source [[Filter]].
+ *
+ * @param or true when this predicate is a child of an `Or`, in which case a
+ * partial translation of a nested `And` must be rejected
+ * @return `Some[Filter]` if the input [[Expression]] is convertible, otherwise `None`.
+ */
+ protected[sql] def translateFilter(predicate: Expression, or: Boolean = false): Option[Filter] = {
+ predicate match {
+ case Or(left, right) =>
+ val leftFilter = translateFilter(left, true)
+ val rightFilter = translateFilter(right, true)
+ if (leftFilter.isDefined && rightFilter.isDefined) {
+ Some(sources.Or(leftFilter.get, rightFilter.get))
+ } else {
+ None
+ }
+
+ case And(left, right) =>
+ // Under an Or (or = true), an And may only be pushed down when both sides
+ // translate; pushing a partial And there would weaken the filter while the
+ // predicate is reported as handled. Outside an Or, pushing whichever side
+ // translates is a safe, strictly weaker pre-filter.
+ val leftFilter = translateFilter(left, or)
+ val rightFilter = translateFilter(right, or)
+ if (or && (leftFilter.isEmpty || rightFilter.isEmpty)) {
+ None
+ } else {
+ (leftFilter ++ rightFilter).reduceOption(sources.And)
+ }
+ case EqualTo(a: Attribute, Literal(v, t)) =>
+ Some(sources.EqualTo(a.name, v))
+ case EqualTo(l@Literal(v, t), a: Attribute) =>
+ Some(sources.EqualTo(a.name, v))
+ case c@EqualTo(Cast(a: Attribute, _), Literal(v, t)) =>
+ CastExpressionOptimization.checkIfCastCanBeRemove(c)
+ case c@EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
+ CastExpressionOptimization.checkIfCastCanBeRemove(c)
+ case Not(EqualTo(a: Attribute, Literal(v, t))) =>
+ Some(sources.Not(sources.EqualTo(a.name, v)))
+ case Not(EqualTo(Literal(v, t), a: Attribute)) =>
+ Some(sources.Not(sources.EqualTo(a.name, v)))
+ case c@Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) =>
+ CastExpressionOptimization.checkIfCastCanBeRemove(c)
+ case c@Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) =>
+ CastExpressionOptimization.checkIfCastCanBeRemove(c)
+ case IsNotNull(a: Attribute) => Some(sources.IsNotNull(a.name))
+ case IsNull(a: Attribute) => Some(sources.IsNull(a.name))
+ case Not(In(a: Attribute, list)) if list.forall(_.isInstanceOf[Literal]) =>
+ val hSet = list.map(e => e.eval(EmptyRow))
+ Some(sources.Not(sources.In(a.name, hSet.toArray)))
+ case In(a: Attribute, list) if list.forall(_.isInstanceOf[Literal]) =>
+ val hSet = list.map(e => e.eval(EmptyRow))
+ Some(sources.In(a.name, hSet.toArray))
+ case c@Not(In(Cast(a: Attribute, _), list))
+ if list.forall(_.isInstanceOf[Literal]) =>
+ Some(CastExpr(c))
+ case c@In(Cast(a: Attribute, _), list) if list.forall(_.isInstanceOf[Literal]) =>
+ Some(CastExpr(c))
+ case InSet(a: Attribute, set) =>
+ Some(sources.In(a.name, set.toArray))
+ case Not(InSet(a: Attribute, set)) =>
+ Some(sources.Not(sources.In(a.name, set.toArray)))
+ case GreaterThan(a: Attribute, Literal(v, t)) =>
+ Some(sources.GreaterThan(a.name, v))
+ case GreaterThan(Literal(v, t), a: Attribute) =>
+ Some(sources.LessThan(a.name, v))
+ case c@GreaterThan(Cast(a: Attribute, _), Literal(v, t)) =>
+ CastExpressionOptimization.checkIfCastCanBeRemove(c)
+ case c@GreaterThan(Literal(v, t), Cast(a: Attribute, _)) =>
+ CastExpressionOptimization.checkIfCastCanBeRemove(c)
+ case LessThan(a: Attribute, Literal(v, t)) =>
+ Some(sources.LessThan(a.name, v))
+ case LessThan(Literal(v, t), a: Attribute) =>
+ Some(sources.GreaterThan(a.name, v))
+ case c@LessThan(Cast(a: Attribute, _), Literal(v, t)) =>
+ CastExpressionOptimization.checkIfCastCanBeRemove(c)
+ case c@LessThan(Literal(v, t), Cast(a: Attribute, _)) =>
+ CastExpressionOptimization.checkIfCastCanBeRemove(c)
+ case GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
+ Some(sources.GreaterThanOrEqual(a.name, v))
+ case GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
+ Some(sources.LessThanOrEqual(a.name, v))
+ case c@GreaterThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
+ CastExpressionOptimization.checkIfCastCanBeRemove(c)
+ case c@GreaterThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
+ CastExpressionOptimization.checkIfCastCanBeRemove(c)
+ case LessThanOrEqual(a: Attribute, Literal(v, t)) =>
+ Some(sources.LessThanOrEqual(a.name, v))
+ case LessThanOrEqual(Literal(v, t), a: Attribute) =>
+ Some(sources.GreaterThanOrEqual(a.name, v))
+ case c@LessThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
+ CastExpressionOptimization.checkIfCastCanBeRemove(c)
+ case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
+ CastExpressionOptimization.checkIfCastCanBeRemove(c)
+ case StartsWith(a: Attribute, Literal(v, t)) =>
+ Some(sources.StringStartsWith(a.name, v.toString))
+ case c@EndsWith(a: Attribute, Literal(v, t)) =>
+ Some(CarbonEndsWith(c))
+ case c@Contains(a: Attribute, Literal(v, t)) =>
+ Some(CarbonContainsWith(c))
+ case _ => None
+ }
+ }
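+
+ // Sample translations (hypothetical attributes a, b):
+ //   a = 5               -> Some(sources.EqualTo("a", 5))
+ //   5 > a               -> Some(sources.LessThan("a", 5)) (operands flipped)
+ //   a IN (1, 2)         -> Some(sources.In("a", Array(1, 2)))
+ //   a = 1 OR udf(b) = 2 -> None, since both sides of an Or must translate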
+
+ def supportBatchedDataSource(sqlContext: SQLContext, cols: Seq[Attribute]): Boolean = {
+ val vectorizedReader = {
+ if (sqlContext.sparkSession.conf.contains(CarbonCommonConstants.ENABLE_VECTOR_READER)) {
+ sqlContext.sparkSession.conf.get(CarbonCommonConstants.ENABLE_VECTOR_READER)
+ } else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) {
+ System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER)
+ } else {
+ CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+ CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
+ }
+ }
+ val supportCodegen =
+ sqlContext.conf.wholeStageEnabled && sqlContext.conf.wholeStageMaxNumFields >= cols.size
+ supportCodegen && vectorizedReader.toBoolean &&
+ cols.forall(_.dataType.isInstanceOf[AtomicType])
+ }
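+
+ // The vector reader flag is resolved in precedence order: Spark session conf,
+ // then JVM system property, then carbon.properties (with its default). For
+ // example, spark.conf.set(CarbonCommonConstants.ENABLE_VECTOR_READER, "false")
+ // disables the batched path for the current session only.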
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
new file mode 100644
index 0000000..715af1d
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.strategy
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CarbonShowLoadsCommand, LoadTableByInsertCommand, LoadTableCommand}
+import org.apache.spark.sql.execution.command.partition.ShowCarbonPartitionsCommand
+import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand, AlterTableRenameTableCommand}
+import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
+
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+/**
+ * Carbon strategies for DDL commands.
+ */
+class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
+
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+ plan match {
+ case LoadDataCommand(identifier, path, _, isOverwrite, _)
+ if CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .tableExists(identifier)(sparkSession) =>
+ ExecutedCommandExec(
+ LoadTableCommand(
+ identifier.database,
+ identifier.table.toLowerCase,
+ path,
+ Seq(),
+ Map(),
+ isOverwrite)) :: Nil
+ case alter@AlterTableRenameCommand(oldTableIdentifier, newTableIdentifier, _) =>
+ val dbOption = oldTableIdentifier.database.map(_.toLowerCase)
+ val tableIdentifier = TableIdentifier(oldTableIdentifier.table.toLowerCase(), dbOption)
+ val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .tableExists(tableIdentifier)(
+ sparkSession)
+ if (isCarbonTable) {
+ val renameModel = AlterTableRenameModel(tableIdentifier, newTableIdentifier)
+ ExecutedCommandExec(AlterTableRenameTableCommand(renameModel)) :: Nil
+ } else {
+ ExecutedCommandExec(alter) :: Nil
+ }
+ case DropTableCommand(identifier, ifExists, isView, _)
+ if CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .isTablePathExists(identifier)(sparkSession) =>
+ ExecutedCommandExec(
+ CarbonDropTableCommand(ifExists, identifier.database,
+ identifier.table.toLowerCase)) :: Nil
+ case ShowLoadsCommand(databaseName, table, limit) =>
+ ExecutedCommandExec(
+ CarbonShowLoadsCommand(
+ databaseName,
+ table.toLowerCase,
+ limit,
+ plan.output)) :: Nil
+ case InsertIntoCarbonTable(relation: CarbonDatasourceHadoopRelation,
+ _, child: LogicalPlan, overwrite, _) =>
+ ExecutedCommandExec(LoadTableByInsertCommand(relation, child, overwrite.enabled)) :: Nil
+ case createDb@CreateDatabaseCommand(dbName, _, _, _, _) =>
+ CarbonUtil.createDatabaseDirectory(dbName, CarbonEnv.getInstance(sparkSession).storePath)
+ ExecutedCommandExec(createDb) :: Nil
+ case drop: DropDatabaseCommand =>
+ ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil
+ case alterTable@AlterTableCompactionCommand(alterTableModel) =>
+ val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .tableExists(TableIdentifier(alterTableModel.tableName,
+ alterTableModel.dbName))(sparkSession)
+ if (isCarbonTable) {
+ if (alterTableModel.compactionType.equalsIgnoreCase("minor") ||
+ alterTableModel.compactionType.equalsIgnoreCase("major")) {
+ ExecutedCommandExec(alterTable) :: Nil
+ } else {
+ throw new MalformedCarbonCommandException(
+ "Unsupported alter operation on carbon table")
+ }
+ } else {
+ throw new MalformedCarbonCommandException(
+ "Operation not allowed: " + alterTableModel.alterSql)
+ }
+ case dataTypeChange@AlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel) =>
+ val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .tableExists(TableIdentifier(alterTableChangeDataTypeModel.tableName,
+ alterTableChangeDataTypeModel.databaseName))(sparkSession)
+ if (isCarbonTable) {
+ ExecutedCommandExec(dataTypeChange) :: Nil
+ } else {
+ throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
+ }
+ case addColumn@AlterTableAddColumnCommand(alterTableAddColumnsModel) =>
+ val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .tableExists(TableIdentifier(alterTableAddColumnsModel.tableName,
+ alterTableAddColumnsModel.databaseName))(sparkSession)
+ if (isCarbonTable) {
+ ExecutedCommandExec(addColumn) :: Nil
+ } else {
+ throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
+ }
+ case dropColumn@AlterTableDropColumnCommand(alterTableDropColumnModel) =>
+ val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .tableExists(TableIdentifier(alterTableDropColumnModel.tableName,
+ alterTableDropColumnModel.databaseName))(sparkSession)
+ if (isCarbonTable) {
+ ExecutedCommandExec(dropColumn) :: Nil
+ } else {
+ throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
+ }
+ case DescribeTableCommand(identifier, _, _, isFormatted)
+ if CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .tableExists(identifier)(sparkSession) && isFormatted =>
+ val resolvedTable =
+ sparkSession.sessionState.executePlan(UnresolvedRelation(identifier, None)).analyzed
+ val resultPlan = sparkSession.sessionState.executePlan(resolvedTable).executedPlan
+ ExecutedCommandExec(
+ CarbonDescribeFormattedCommand(
+ resultPlan,
+ plan.output,
+ identifier)) :: Nil
+ case ShowPartitionsCommand(t, cols) =>
+ val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .tableExists(t)(sparkSession)
+ if (isCarbonTable) {
+ ExecutedCommandExec(ShowCarbonPartitionsCommand(t)) :: Nil
+ } else {
+ ExecutedCommandExec(ShowPartitionsCommand(t, cols)) :: Nil
+ }
+ case set: SetCommand =>
+ ExecutedCommandExec(CarbonSetCommand(set)) :: Nil
+ case ResetCommand =>
+ ExecutedCommandExec(CarbonResetCommand()) :: Nil
+ case org.apache.spark.sql.execution.datasources.CreateTable(tableDesc, mode, None)
+ if tableDesc.provider.get != DDLUtils.HIVE_PROVIDER
+ && tableDesc.provider.get.equals("org.apache.spark.sql.CarbonSource") =>
+ val updatedCatalog =
+ CarbonSource.updateCatalogTableWithCarbonSchema(tableDesc, sparkSession)
+ val cmd =
+ CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists = mode == SaveMode.Ignore)
+ ExecutedCommandExec(cmd) :: Nil
+ case _ => Nil
+ }
+ }
+
+}
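+
+ // A minimal sketch of how this strategy is wired in (see the CarbonSessionState
+ // diff below, which imports it from execution.strategy):
+ //   sparkSession.experimental.extraStrategies =
+ //     Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))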
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index c9fc46c..f61ab84 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -20,12 +20,12 @@ package org.apache.spark.sql.hive
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpression}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.{ProjectExec, SparkSqlParser, SubqueryExec}
-import org.apache.spark.sql.execution.command.ProjectForDeleteCommand
+import org.apache.spark.sql.execution.SparkSqlParser
+import org.apache.spark.sql.execution.command.mutation.ProjectForDeleteCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -167,13 +167,13 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
updatedSelectPlan
}
val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString()))
- val tidSeq = Seq(getDB.getDatabaseName(tid.database, sparkSession))
+ val tidSeq = Seq(GetDB.getDatabaseName(tid.database, sparkSession))
val destinationTable = UnresolvedRelation(table.tableIdentifier, table.alias)
ProjectForUpdate(destinationTable, columns, Seq(finalPlan))
}
def processDeleteRecordsQuery(selectStmt: String, table: UnresolvedRelation): LogicalPlan = {
- val tidSeq = Seq(getDB.getDatabaseName(table.tableIdentifier.database, sparkSession),
+ val tidSeq = Seq(GetDB.getDatabaseName(table.tableIdentifier.database, sparkSession),
table.tableIdentifier.table)
var addedTupleId = false
val parsePlan = parser.parsePlan(selectStmt)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 1fe6c83..478b178 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -25,16 +25,13 @@ import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubqu
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
-import org.apache.spark.sql.execution.command.DDLStrategy
-import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.SparkOptimizer
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
import org.apache.spark.sql.parser.CarbonSparkSqlParser
-import org.apache.carbondata.processing.merger.TableMeta
-
/**
* This class will have carbon catalog and refresh the relation from cache if the carbontable in
* carbon catalog is not same as cached carbon relation's carbon table
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index f435fa6..1d8bb8a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.execution.command
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.command._
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
@@ -26,7 +27,7 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
extends RunnableCommand {
- override val output = command.output
+ override val output: Seq[Attribute] = command.output
override def run(sparkSession: SparkSession): Seq[Row] = {
val dbName = command.databaseName
@@ -50,7 +51,7 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
case class CarbonSetCommand(command: SetCommand)
extends RunnableCommand {
- override val output = command.output
+ override val output: Seq[Attribute] = command.output
override def run(sparkSession: SparkSession): Seq[Row] = {
val sessionParms = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.getSessionParams
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index c6dd905..2ddde7a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -27,7 +27,8 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.{ProjectForUpdateCommand, RunnableCommand}
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.command.mutation.ProjectForUpdateCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.types.{IntegerType, StringType}