You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/11/29 10:41:13 UTC
[2/4] carbondata git commit: [CARBONDATA-1815][PreAgg] Add
AtomicRunnableCommand abstraction
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
index e2bfbdc..4e983a2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
@@ -41,15 +41,66 @@ object Checker {
}
/**
- * Interface for command that modifies schema
+ * Operation that modifies metadata (schema, table_status, etc.)
*/
-trait SchemaProcessCommand {
- def processSchema(sparkSession: SparkSession): Seq[Row]
+trait MetadataProcessOpeation {
+ def processMetadata(sparkSession: SparkSession): Seq[Row]
}
/**
- * Interface for command that need to process data in file system
+ * Operation that processes data
*/
-trait DataProcessCommand {
+trait DataProcessOperation {
def processData(sparkSession: SparkSession): Seq[Row]
}
+
+/**
+ * Command that only modifies metadata (schema, table_status, etc.), without processing data
+ */
+abstract class MetadataCommand extends RunnableCommand with MetadataProcessOpeation {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ processMetadata(sparkSession)
+ }
+}
+
+/**
+ * Command that only processes data, without modifying metadata
+ */
+abstract class DataCommand extends RunnableCommand with DataProcessOperation {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ processData(sparkSession)
+ }
+}
+
+/**
+ * Subclasses of this command are executed in an atomic fashion: either all or nothing.
+ * Subclasses need to process both metadata and data; processMetadata should be undoable
+ * in case processData fails.
+ */
+abstract class AtomicRunnableCommand
+ extends RunnableCommand with MetadataProcessOpeation with DataProcessOperation {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ processMetadata(sparkSession)
+ try {
+ processData(sparkSession)
+ } catch {
+ case e: Exception =>
+ undoMetadata(sparkSession, e)
+ throw e
+ }
+ }
+
+ /**
+ * Developers should override this function to undo the changes made in processMetadata.
+ * @param sparkSession spark session
+ * @param exception exception raised when processing data
+ * @return rows to return to spark
+ */
+ def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
+ val msg = s"Got exception $exception when processing data. " +
+ s"But this command does not support undo yet, skipping the undo part."
+ LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(msg)
+ Seq.empty
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
deleted file mode 100644
index db87fc8..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.partition
-
-import java.util
-import java.util.concurrent.{Executors, ExecutorService, Future}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
-import org.apache.spark.sql.execution.command.{AlterTableDropPartitionModel, DataProcessCommand, DropPartitionCallableModel, RunnableCommand, SchemaProcessCommand}
-import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.util.AlterTableUtil
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.cache.CacheProvider
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.partition.PartitionType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.spark.partition.DropPartitionCallable
-
-case class AlterTableDropCarbonPartitionCommand(
- model: AlterTableDropPartitionModel)
- extends RunnableCommand with DataProcessCommand with SchemaProcessCommand {
-
- private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
- private val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- if (model.partitionId.equals("0")) {
- sys.error(s"Cannot drop default partition! Please use delete statement!")
- }
- processSchema(sparkSession)
- processData(sparkSession)
- Seq.empty
- }
-
- override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- val dbName = model.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
- val tableName = model.tableName
- val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
- .asInstanceOf[CarbonRelation]
- val tablePath = relation.carbonTable.getTablePath
- carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
- if (relation == null) {
- sys.error(s"Table $dbName.$tableName does not exist")
- }
- if (null == CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)) {
- LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
- sys.error(s"Alter table failed. table not found: $dbName.$tableName")
- }
- val table = relation.carbonTable
- val partitionInfo = table.getPartitionInfo(tableName)
- if (partitionInfo == null) {
- sys.error(s"Table $tableName is not a partition table.")
- }
- val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
- // keep a copy of partitionIdList before update partitionInfo.
- // will be used in partition data scan
- oldPartitionIds.addAll(partitionIds.asJava)
- val partitionIndex = partitionIds.indexOf(Integer.valueOf(model.partitionId))
- partitionInfo.getPartitionType match {
- case PartitionType.HASH => sys.error(s"Hash partition cannot be dropped!")
- case PartitionType.RANGE =>
- val rangeInfo = new util.ArrayList(partitionInfo.getRangeInfo)
- val rangeToRemove = partitionInfo.getRangeInfo.get(partitionIndex - 1)
- rangeInfo.remove(rangeToRemove)
- partitionInfo.setRangeInfo(rangeInfo)
- case PartitionType.LIST =>
- val listInfo = new util.ArrayList(partitionInfo.getListInfo)
- val listToRemove = partitionInfo.getListInfo.get(partitionIndex - 1)
- listInfo.remove(listToRemove)
- partitionInfo.setListInfo(listInfo)
- case PartitionType.RANGE_INTERVAL =>
- sys.error(s"Dropping range interval partition isn't support yet!")
- }
- partitionInfo.dropPartition(partitionIndex)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
- val schemaFilePath = carbonTablePath.getSchemaFilePath
- // read TableInfo
- val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
-
- val schemaConverter = new ThriftWrapperSchemaConverterImpl()
- val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
- dbName, tableName, tablePath)
- val tableSchema = wrapperTableInfo.getFactTable
- tableSchema.setPartitionInfo(partitionInfo)
- wrapperTableInfo.setFactTable(tableSchema)
- wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
- val thriftTable =
- schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
- thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
- .setTime_stamp(System.currentTimeMillis)
- carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
- dbName, tableName, tablePath)
- CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
- // update the schema modified time
- carbonMetaStore.updateAndTouchSchemasUpdatedTime()
- // sparkSession.catalog.refreshTable(tableName)
- Seq.empty
- }
-
- override def processData(sparkSession: SparkSession): Seq[Row] = {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
- val dbName = model.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
- val tableName = model.tableName
- var locks = List.empty[ICarbonLock]
- var success = false
- try {
- val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
- LockUsage.COMPACTION_LOCK,
- LockUsage.DELETE_SEGMENT_LOCK,
- LockUsage.DROP_TABLE_LOCK,
- LockUsage.CLEAN_FILES_LOCK,
- LockUsage.ALTER_PARTITION_LOCK)
- locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
- locksToBeAcquired)(sparkSession)
- val carbonLoadModel = new CarbonLoadModel()
- val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
- val dataLoadSchema = new CarbonDataLoadSchema(table)
- // Need to fill dimension relation
- carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- carbonLoadModel.setTableName(table.getTableName)
- carbonLoadModel.setDatabaseName(table.getDatabaseName)
- carbonLoadModel.setTablePath(table.getTablePath)
- val loadStartTime = CarbonUpdateUtil.readCurrentTime
- carbonLoadModel.setFactTimeStamp(loadStartTime)
- alterTableDropPartition(
- sparkSession.sqlContext,
- model.partitionId,
- carbonLoadModel,
- model.dropWithData,
- oldPartitionIds.asScala.toList
- )
- success = true
- } catch {
- case e: Exception =>
- sys.error(s"Drop Partition failed. Please check logs for more info. ${ e.getMessage } ")
- success = false
- } finally {
- CacheProvider.getInstance().dropAllCache()
- AlterTableUtil.releaseLocks(locks)
- LOGGER.info("Locks released after alter table drop partition action.")
- LOGGER.audit("Locks released after alter table drop partition action.")
- }
- LOGGER.info(s"Alter table drop partition is successful for table $dbName.$tableName")
- LOGGER.audit(s"Alter table drop partition is successful for table $dbName.$tableName")
- Seq.empty
- }
-
- private def alterTableDropPartition(sqlContext: SQLContext,
- partitionId: String,
- carbonLoadModel: CarbonLoadModel,
- dropWithData: Boolean,
- oldPartitionIds: List[Int]): Unit = {
- LOGGER.audit(s"Drop partition request received for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- try {
- startDropThreads(
- sqlContext,
- carbonLoadModel,
- partitionId,
- dropWithData,
- oldPartitionIds)
- } catch {
- case e: Exception =>
- LOGGER.error(s"Exception in start dropping partition thread. ${ e.getMessage }")
- throw e
- }
- }
-
- private def startDropThreads(sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- partitionId: String,
- dropWithData: Boolean,
- oldPartitionIds: List[Int]): Unit = {
- val numberOfCores = CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
- CarbonCommonConstants.DEFAULT_NUMBER_CORES)
- val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt)
- try {
- val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
- val segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier)
- val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
- val threadArray: Array[Thread] = new Array[Thread](validSegments.size)
- var i = 0
- for (segmentId: String <- validSegments) {
- threadArray(i) = dropPartitionThread(sqlContext, carbonLoadModel, executor,
- segmentId, partitionId, dropWithData, oldPartitionIds)
- threadArray(i).start()
- i += 1
- }
- for (thread <- threadArray) {
- thread.join()
- }
- val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
- carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
- val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
- refresher.refreshSegments(validSegments.asJava)
- } catch {
- case e: Exception =>
- LOGGER.error(s"Exception when dropping partition: ${ e.getMessage }")
- } finally {
- executor.shutdown()
- try {
- CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
- } catch {
- case e: Exception =>
- LOGGER.error(s"Exception in dropping partition thread while deleting partial load file" +
- s" ${ e.getMessage }")
- }
- }
- }
-}
-
-case class dropPartitionThread(sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- executor: ExecutorService,
- segmentId: String,
- partitionId: String,
- dropWithData: Boolean,
- oldPartitionIds: List[Int]) extends Thread {
-
- private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
-
- override def run(): Unit = {
- try {
- executeDroppingPartition(sqlContext, carbonLoadModel, executor,
- segmentId, partitionId, dropWithData, oldPartitionIds)
- } catch {
- case e: Exception =>
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
- LOGGER.error(s"Exception in dropping partition thread: ${ e.getMessage } }")
- }
- }
-
- private def executeDroppingPartition(sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- executor: ExecutorService,
- segmentId: String,
- partitionId: String,
- dropWithData: Boolean,
- oldPartitionIds: List[Int]): Unit = {
- val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val model = new DropPartitionCallableModel(carbonLoadModel,
- segmentId, partitionId, oldPartitionIds, dropWithData, carbonTable, sqlContext)
- val future: Future[Void] = executor.submit(new DropPartitionCallable(model))
- try {
- future.get
- } catch {
- case e: Exception =>
- LOGGER.error(e, s"Exception in partition drop thread ${ e.getMessage }")
- throw e
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
deleted file mode 100644
index 21871f3..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.partition
-
-import java.text.SimpleDateFormat
-import java.util
-import java.util.concurrent.{Executors, ExecutorService, Future}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.{AlterTableSplitPartitionModel, DataProcessCommand, RunnableCommand, SchemaProcessCommand, SplitPartitionCallableModel}
-import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.util.{AlterTableUtil, PartitionUtils}
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.cache.CacheProvider
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.PartitionInfo
-import org.apache.carbondata.core.metadata.schema.partition.PartitionType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.spark.partition.SplitPartitionCallable
-
-/**
- * Command for Alter Table Add & Split partition
- * Add is a special case of Splitting the default partition (part0)
- */
-case class AlterTableSplitCarbonPartitionCommand(
- splitPartitionModel: AlterTableSplitPartitionModel)
- extends RunnableCommand with DataProcessCommand with SchemaProcessCommand {
-
- private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
- private val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
-
- // TODO will add rollback function in case of process data failure
- override def run(sparkSession: SparkSession): Seq[Row] = {
- processSchema(sparkSession)
- processData(sparkSession)
- }
-
- override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- val dbName = splitPartitionModel.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
- val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val tableName = splitPartitionModel.tableName
- val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
- .asInstanceOf[CarbonRelation]
- val tablePath = relation.carbonTable.getTablePath
- if (relation == null) {
- sys.error(s"Table $dbName.$tableName does not exist")
- }
- carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
- if (null == CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)) {
- LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
- sys.error(s"Alter table failed. table not found: $dbName.$tableName")
- }
- val table = relation.carbonTable
- val partitionInfo = table.getPartitionInfo(tableName)
- val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
- // keep a copy of partitionIdList before update partitionInfo.
- // will be used in partition data scan
- oldPartitionIds.addAll(partitionIds.asJava)
-
- if (partitionInfo == null) {
- sys.error(s"Table $tableName is not a partition table.")
- }
- if (partitionInfo.getPartitionType == PartitionType.HASH) {
- sys.error(s"Hash partition table cannot be added or split!")
- }
-
- updatePartitionInfo(partitionInfo, partitionIds)
-
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
- val schemaFilePath = carbonTablePath.getSchemaFilePath
- // read TableInfo
- val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
- val schemaConverter = new ThriftWrapperSchemaConverterImpl()
- val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
- dbName, tableName, tablePath)
- val tableSchema = wrapperTableInfo.getFactTable
- tableSchema.setPartitionInfo(partitionInfo)
- wrapperTableInfo.setFactTable(tableSchema)
- wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
- val thriftTable =
- schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
- carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
- dbName, tableName, tablePath)
- CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
- // update the schema modified time
- carbonMetaStore.updateAndTouchSchemasUpdatedTime()
- sparkSession.sessionState.catalog.refreshTable(TableIdentifier(tableName, Option(dbName)))
- Seq.empty
- }
-
- private def updatePartitionInfo(partitionInfo: PartitionInfo, partitionIds: List[Int]): Unit = {
- val dateFormatter = new SimpleDateFormat(CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
- CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
-
- val timestampFormatter = new SimpleDateFormat(CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
- CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
-
- PartitionUtils.updatePartitionInfo(
- partitionInfo,
- partitionIds,
- splitPartitionModel.partitionId.toInt,
- splitPartitionModel.splitInfo,
- timestampFormatter,
- dateFormatter)
- }
-
- override def processData(sparkSession: SparkSession): Seq[Row] = {
- val dbName = splitPartitionModel.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
- val tableName = splitPartitionModel.tableName
- var locks = List.empty[ICarbonLock]
- var success = false
- try {
- val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
- LockUsage.COMPACTION_LOCK,
- LockUsage.DELETE_SEGMENT_LOCK,
- LockUsage.DROP_TABLE_LOCK,
- LockUsage.CLEAN_FILES_LOCK,
- LockUsage.ALTER_PARTITION_LOCK)
- locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
- locksToBeAcquired)(sparkSession)
- val carbonLoadModel = new CarbonLoadModel()
- val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
- val tablePath = table.getTablePath
- val dataLoadSchema = new CarbonDataLoadSchema(table)
- carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- carbonLoadModel.setTableName(table.getTableName)
- carbonLoadModel.setDatabaseName(table.getDatabaseName)
- carbonLoadModel.setTablePath(tablePath)
- val loadStartTime = CarbonUpdateUtil.readCurrentTime
- carbonLoadModel.setFactTimeStamp(loadStartTime)
- alterTableSplitPartition(
- sparkSession.sqlContext,
- splitPartitionModel.partitionId.toInt.toString,
- carbonLoadModel,
- oldPartitionIds.asScala.toList
- )
- success = true
- } catch {
- case e: Exception =>
- success = false
- sys.error(s"Add/Split Partition failed. Please check logs for more info. ${ e.getMessage }")
- } finally {
- AlterTableUtil.releaseLocks(locks)
- CacheProvider.getInstance().dropAllCache()
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
- LOGGER.info("Locks released after alter table add/split partition action.")
- LOGGER.audit("Locks released after alter table add/split partition action.")
- if (success) {
- LOGGER.info(s"Alter table add/split partition is successful for table $dbName.$tableName")
- LOGGER.audit(s"Alter table add/split partition is successful for table $dbName.$tableName")
- }
- }
- Seq.empty
- }
-
- private def alterTableSplitPartition(
- sqlContext: SQLContext,
- partitionId: String,
- carbonLoadModel: CarbonLoadModel,
- oldPartitionIdList: List[Int]
- ): Unit = {
- LOGGER.audit(s"Add partition request received for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- try {
- startSplitThreads(sqlContext,
- carbonLoadModel,
- partitionId,
- oldPartitionIdList)
- } catch {
- case e: Exception =>
- LOGGER.error(s"Exception in start splitting partition thread. ${ e.getMessage }")
- throw e
- }
- }
-
- private def startSplitThreads(sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- partitionId: String,
- oldPartitionIdList: List[Int]): Unit = {
- val numberOfCores = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
- CarbonCommonConstants.DEFAULT_NUMBER_CORES)
- val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt)
- try {
- val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
- val segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier)
- val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
- val threadArray: Array[SplitThread] = new Array[SplitThread](validSegments.size)
- var i = 0
- validSegments.foreach { segmentId =>
- threadArray(i) = SplitThread(sqlContext, carbonLoadModel, executor,
- segmentId, partitionId, oldPartitionIdList)
- threadArray(i).start()
- i += 1
- }
- threadArray.foreach {
- thread => thread.join()
- }
- val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
- carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
- val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
- refresher.refreshSegments(validSegments.asJava)
- } catch {
- case e: Exception =>
- LOGGER.error(s"Exception when split partition: ${ e.getMessage }")
- throw e
- } finally {
- executor.shutdown()
- try {
- CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
- } catch {
- case e: Exception =>
- LOGGER.error(s"Exception in add/split partition thread while deleting partial load file" +
- s" ${ e.getMessage }")
- }
- }
- }
-}
-
-case class SplitThread(sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- executor: ExecutorService,
- segmentId: String,
- partitionId: String,
- oldPartitionIdList: List[Int]) extends Thread {
-
- private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
-
- override def run(): Unit = {
- var triggeredSplitPartitionStatus = false
- var exception: Exception = null
- try {
- executePartitionSplit(sqlContext,
- carbonLoadModel, executor, segmentId, partitionId, oldPartitionIdList)
- triggeredSplitPartitionStatus = true
- } catch {
- case e: Exception =>
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
- LOGGER.error(s"Exception in partition split thread: ${ e.getMessage } }")
- exception = e
- }
- if (!triggeredSplitPartitionStatus) {
- throw new Exception("Exception in split partition " + exception.getMessage)
- }
- }
-
- private def executePartitionSplit( sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- executor: ExecutorService,
- segment: String,
- partitionId: String,
- oldPartitionIdList: List[Int]): Unit = {
- val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
- CarbonCommonConstants.DEFAULT_COLLECTION_SIZE
- )
- scanSegmentsForSplitPartition(futureList, executor, segment, partitionId,
- sqlContext, carbonLoadModel, oldPartitionIdList)
- try {
- futureList.asScala.foreach { future =>
- future.get
- }
- } catch {
- case e: Exception =>
- LOGGER.error(e, s"Exception in partition split thread ${ e.getMessage }")
- throw e
- }
- }
-
- private def scanSegmentsForSplitPartition(futureList: util.List[Future[Void]],
- executor: ExecutorService,
- segmentId: String,
- partitionId: String,
- sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- oldPartitionIdList: List[Int]): Unit = {
-
- val splitModel = SplitPartitionCallableModel(carbonLoadModel,
- segmentId,
- partitionId,
- oldPartitionIdList,
- sqlContext)
-
- val future: Future[Void] = executor.submit(new SplitPartitionCallable(splitModel))
- futureList.add(future)
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
new file mode 100644
index 0000000..69aa91a
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.partition
+
+import java.util
+import java.util.concurrent.{Executors, ExecutorService, Future}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.partition.DropPartitionCallable
+
+case class CarbonAlterTableDropPartitionCommand(
+ model: AlterTableDropPartitionModel)
+ extends AtomicRunnableCommand {
+
+ private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ private val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
+
+ /**
+  * Validates the drop request, removes the partition from the table's partition info and
+  * persists the updated schema through the metastore and the schema file.
+  *
+  * The partition ids present before the change are appended to `oldPartitionIds` so that
+  * `processData` can use them.
+  *
+  * Aborts with `sys.error` when the default partition ("0") is requested, the table cannot
+  * be found, the table has no partition info, the requested id has no matching entry, or
+  * the partition type is HASH or RANGE_INTERVAL.
+  *
+  * @param sparkSession session used for the metastore lookup and the schema update
+  * @return always an empty sequence
+  */
+ override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+   if (model.partitionId.equals("0")) {
+     sys.error(s"Cannot drop default partition! Please use delete statement!")
+   }
+   val dbName = model.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
+   val tableName = model.tableName
+   val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+   val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
+     .asInstanceOf[CarbonRelation]
+   // The null check must precede the first dereference of `relation`; otherwise a missing
+   // table surfaces as a NullPointerException instead of this message.
+   if (relation == null) {
+     sys.error(s"Table $dbName.$tableName does not exist")
+   }
+   val tablePath = relation.carbonTable.getTablePath
+   carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
+   if (null == CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)) {
+     LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
+     sys.error(s"Alter table failed. table not found: $dbName.$tableName")
+   }
+   val table = relation.carbonTable
+   val partitionInfo = table.getPartitionInfo(tableName)
+   if (partitionInfo == null) {
+     sys.error(s"Table $tableName is not a partition table.")
+   }
+   val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
+   // keep a copy of the partition id list before partitionInfo is updated;
+   // it is handed to the data phase (processData)
+   oldPartitionIds.addAll(partitionIds.asJava)
+   val partitionIndex = partitionIds.indexOf(Integer.valueOf(model.partitionId))
+
+   // indexOf yields -1 for an unknown id, and the range/list entry is read at
+   // `partitionIndex - 1`; anything below 1 would end in an out-of-bounds lookup.
+   def requireDroppableIndex(): Unit = {
+     if (partitionIndex < 1) {
+       sys.error(s"Partition id ${ model.partitionId } of table $dbName.$tableName " +
+                 "cannot be dropped: no matching partition entry found")
+     }
+   }
+
+   partitionInfo.getPartitionType match {
+     case PartitionType.HASH => sys.error(s"Hash partition cannot be dropped!")
+     case PartitionType.RANGE =>
+       requireDroppableIndex()
+       val rangeInfo = new util.ArrayList(partitionInfo.getRangeInfo)
+       val rangeToRemove = partitionInfo.getRangeInfo.get(partitionIndex - 1)
+       rangeInfo.remove(rangeToRemove)
+       partitionInfo.setRangeInfo(rangeInfo)
+     case PartitionType.LIST =>
+       requireDroppableIndex()
+       val listInfo = new util.ArrayList(partitionInfo.getListInfo)
+       val listToRemove = partitionInfo.getListInfo.get(partitionIndex - 1)
+       listInfo.remove(listToRemove)
+       partitionInfo.setListInfo(listInfo)
+     case PartitionType.RANGE_INTERVAL =>
+       sys.error(s"Dropping range interval partition isn't support yet!")
+   }
+   partitionInfo.dropPartition(partitionIndex)
+   val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
+   val schemaFilePath = carbonTablePath.getSchemaFilePath
+   // read TableInfo
+   val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
+
+   // round-trip thrift -> wrapper -> thrift so the updated partition info is carried over
+   val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+   val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
+     dbName, tableName, tablePath)
+   val tableSchema = wrapperTableInfo.getFactTable
+   tableSchema.setPartitionInfo(partitionInfo)
+   wrapperTableInfo.setFactTable(tableSchema)
+   wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
+   val thriftTable =
+     schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+   thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+     .setTime_stamp(System.currentTimeMillis)
+   carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
+     dbName, tableName, tablePath)
+   CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
+   // update the schema modified time
+   carbonMetaStore.updateAndTouchSchemasUpdatedTime()
+   // sparkSession.catalog.refreshTable(tableName)
+   Seq.empty
+ }
+
+ /**
+  * Runs the data phase of the drop: acquires the table locks, builds a `CarbonLoadModel`
+  * for the table and hands over to `alterTableDropPartition`.
+  *
+  * Any exception is logged with its stack trace and re-raised through `sys.error`
+  * (a RuntimeException). The cache is dropped and the locks are released in all cases.
+  *
+  * @param sparkSession session used to resolve the table and acquire the locks
+  * @return always an empty sequence
+  */
+ override def processData(sparkSession: SparkSession): Seq[Row] = {
+   val dbName = model.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
+   val tableName = model.tableName
+   var locks = List.empty[ICarbonLock]
+   try {
+     val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+       LockUsage.COMPACTION_LOCK,
+       LockUsage.DELETE_SEGMENT_LOCK,
+       LockUsage.DROP_TABLE_LOCK,
+       LockUsage.CLEAN_FILES_LOCK,
+       LockUsage.ALTER_PARTITION_LOCK)
+     locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
+       locksToBeAcquired)(sparkSession)
+     val carbonLoadModel = new CarbonLoadModel()
+     val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+     val dataLoadSchema = new CarbonDataLoadSchema(table)
+     // Need to fill dimension relation
+     carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+     carbonLoadModel.setTableName(table.getTableName)
+     carbonLoadModel.setDatabaseName(table.getDatabaseName)
+     carbonLoadModel.setTablePath(table.getTablePath)
+     val loadStartTime = CarbonUpdateUtil.readCurrentTime
+     carbonLoadModel.setFactTimeStamp(loadStartTime)
+     alterTableDropPartition(
+       sparkSession.sqlContext,
+       model.partitionId,
+       carbonLoadModel,
+       model.dropWithData,
+       oldPartitionIds.asScala.toList
+     )
+   } catch {
+     case e: Exception =>
+       // sys.error keeps only the message, so record the full stack trace first
+       LOGGER.error(e, s"Drop partition failed for table $dbName.$tableName")
+       sys.error(s"Drop Partition failed. Please check logs for more info. ${ e.getMessage } ")
+   } finally {
+     CacheProvider.getInstance().dropAllCache()
+     AlterTableUtil.releaseLocks(locks)
+     LOGGER.info("Locks released after alter table drop partition action.")
+     LOGGER.audit("Locks released after alter table drop partition action.")
+   }
+   // only reached when the try block completed without an exception
+   LOGGER.info(s"Alter table drop partition is successful for table $dbName.$tableName")
+   LOGGER.audit(s"Alter table drop partition is successful for table $dbName.$tableName")
+   Seq.empty
+ }
+
+ /**
+  * Audits the drop request and delegates to `startDropThreads`. An exception raised there
+  * is logged and rethrown unchanged.
+  */
+ private def alterTableDropPartition(sqlContext: SQLContext,
+     partitionId: String,
+     carbonLoadModel: CarbonLoadModel,
+     dropWithData: Boolean,
+     oldPartitionIds: List[Int]): Unit = {
+   val qualifiedTableName =
+     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }"
+   LOGGER.audit(s"Drop partition request received for table " + qualifiedTableName)
+   try {
+     startDropThreads(sqlContext, carbonLoadModel, partitionId, dropWithData, oldPartitionIds)
+   } catch {
+     case failure: Exception =>
+       LOGGER.error(s"Exception in start dropping partition thread. ${ failure.getMessage }")
+       throw failure
+   }
+ }
+
+ /**
+  * Starts one `dropPartitionThread` per valid segment, waits for all of them and then
+  * refreshes the segments in the data map store.
+  *
+  * The thread pool size is read from `NUM_CORES_ALT_PARTITION`. The pool is shut down and
+  * partial load data is cleaned up in all cases. Exceptions raised in this method are
+  * logged and rethrown so that the callers' error handling actually runs.
+  */
+ private def startDropThreads(sqlContext: SQLContext,
+     carbonLoadModel: CarbonLoadModel,
+     partitionId: String,
+     dropWithData: Boolean,
+     oldPartitionIds: List[Int]): Unit = {
+   val numberOfCores = CarbonProperties.getInstance().getProperty(
+     CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
+     CarbonCommonConstants.DEFAULT_NUMBER_CORES)
+   val executor: ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt)
+   try {
+     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+     val segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier)
+     val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
+     // start every worker first, then wait for all of them
+     val workers = validSegments.map { segmentId =>
+       val worker = dropPartitionThread(sqlContext, carbonLoadModel, executor,
+         segmentId, partitionId, dropWithData, oldPartitionIds)
+       worker.start()
+       worker
+     }
+     workers.foreach(_.join())
+     val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
+       carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+     val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+     refresher.refreshSegments(validSegments.asJava)
+   } catch {
+     case e: Exception =>
+       LOGGER.error(e, s"Exception when dropping partition: ${ e.getMessage }")
+       // propagate instead of swallowing, matching the split-partition command
+       throw e
+   } finally {
+     executor.shutdown()
+     try {
+       CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
+     } catch {
+       case e: Exception =>
+         // cleanup is best effort: log and continue
+         LOGGER.error(s"Exception in dropping partition thread while deleting partial load file" +
+                      s" ${ e.getMessage }")
+     }
+   }
+ }
+}
+
+/**
+ * Thread that drops one partition within a single segment by submitting a
+ * `DropPartitionCallable` to the given executor and waiting for its result.
+ *
+ * Failures are logged and not propagated out of `run`.
+ */
+case class dropPartitionThread(sqlContext: SQLContext,
+    carbonLoadModel: CarbonLoadModel,
+    executor: ExecutorService,
+    segmentId: String,
+    partitionId: String,
+    dropWithData: Boolean,
+    oldPartitionIds: List[Int]) extends Thread {
+
+  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+
+  override def run(): Unit = {
+    try {
+      executeDroppingPartition(sqlContext, carbonLoadModel, executor,
+        segmentId, partitionId, dropWithData, oldPartitionIds)
+    } catch {
+      case e: Exception =>
+        // keep the stack trace in the log; nothing is rethrown from this thread
+        LOGGER.error(e, s"Exception in dropping partition thread: ${ e.getMessage }")
+    }
+  }
+
+  /**
+   * Submits the drop task for this segment and blocks until it finishes.
+   * An exception from the task is logged and rethrown to `run`.
+   */
+  private def executeDroppingPartition(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      executor: ExecutorService,
+      segmentId: String,
+      partitionId: String,
+      dropWithData: Boolean,
+      oldPartitionIds: List[Int]): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val model = new DropPartitionCallableModel(carbonLoadModel,
+      segmentId, partitionId, oldPartitionIds, dropWithData, carbonTable, sqlContext)
+    val future: Future[Void] = executor.submit(new DropPartitionCallable(model))
+    try {
+      future.get
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, s"Exception in partition drop thread ${ e.getMessage }")
+        throw e
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
new file mode 100644
index 0000000..338ec5a
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.partition
+
+import java.text.SimpleDateFormat
+import java.util
+import java.util.concurrent.{Executors, ExecutorService, Future}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.util.{AlterTableUtil, PartitionUtils}
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.cache.CacheProvider
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.schema.PartitionInfo
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.partition.SplitPartitionCallable
+
+/**
+ * Command for Alter Table Add & Split partition
+ * Add is a special case of Splitting the default partition (part0)
+ */
+case class CarbonAlterTableSplitPartitionCommand(
+ splitPartitionModel: AlterTableSplitPartitionModel)
+ extends AtomicRunnableCommand {
+
+ private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ private val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
+
+ /**
+  * Validates the add/split request, updates the table's partition info and persists the
+  * updated schema through the metastore and the schema file, then refreshes the table in
+  * the session catalog.
+  *
+  * The partition ids present before the change are appended to `oldPartitionIds` so that
+  * `processData` can use them.
+  *
+  * Aborts with `sys.error` when the table cannot be found, has no partition info, or is
+  * hash partitioned.
+  *
+  * @param sparkSession session used for the metastore lookup and the schema update
+  * @return always an empty sequence
+  */
+ override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+   val dbName = splitPartitionModel.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
+   val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+   val tableName = splitPartitionModel.tableName
+   val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
+     .asInstanceOf[CarbonRelation]
+   // The null check must precede the first dereference of `relation`; otherwise a missing
+   // table surfaces as a NullPointerException instead of this message.
+   if (relation == null) {
+     sys.error(s"Table $dbName.$tableName does not exist")
+   }
+   val tablePath = relation.carbonTable.getTablePath
+   carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
+   if (null == CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)) {
+     LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
+     sys.error(s"Alter table failed. table not found: $dbName.$tableName")
+   }
+   val table = relation.carbonTable
+   val partitionInfo = table.getPartitionInfo(tableName)
+   // Validate partitionInfo before reading anything from it.
+   if (partitionInfo == null) {
+     sys.error(s"Table $tableName is not a partition table.")
+   }
+   if (partitionInfo.getPartitionType == PartitionType.HASH) {
+     sys.error(s"Hash partition table cannot be added or split!")
+   }
+   val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
+   // keep a copy of the partition id list before partitionInfo is updated;
+   // it is handed to the data phase (processData)
+   oldPartitionIds.addAll(partitionIds.asJava)
+
+   updatePartitionInfo(partitionInfo, partitionIds)
+
+   val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
+   val schemaFilePath = carbonTablePath.getSchemaFilePath
+   // read TableInfo
+   val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
+   // round-trip thrift -> wrapper -> thrift so the updated partition info is carried over
+   val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+   val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
+     dbName, tableName, tablePath)
+   val tableSchema = wrapperTableInfo.getFactTable
+   tableSchema.setPartitionInfo(partitionInfo)
+   wrapperTableInfo.setFactTable(tableSchema)
+   wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
+   val thriftTable =
+     schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+   carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
+     dbName, tableName, tablePath)
+   CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
+   // update the schema modified time
+   carbonMetaStore.updateAndTouchSchemasUpdatedTime()
+   sparkSession.sessionState.catalog.refreshTable(TableIdentifier(tableName, Option(dbName)))
+   Seq.empty
+ }
+
+ /**
+  * Builds the date and timestamp formatters from the configured carbon formats and
+  * forwards them, together with the requested partition id and split info, to
+  * `PartitionUtils.updatePartitionInfo`.
+  */
+ private def updatePartitionInfo(partitionInfo: PartitionInfo, partitionIds: List[Int]): Unit = {
+   val properties = CarbonProperties.getInstance
+   val dateFormatter = new SimpleDateFormat(
+     properties.getProperty(
+       CarbonCommonConstants.CARBON_DATE_FORMAT,
+       CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
+   val timestampFormatter = new SimpleDateFormat(
+     properties.getProperty(
+       CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+       CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
+   val targetPartitionId = splitPartitionModel.partitionId.toInt
+
+   PartitionUtils.updatePartitionInfo(
+     partitionInfo,
+     partitionIds,
+     targetPartitionId,
+     splitPartitionModel.splitInfo,
+     timestampFormatter,
+     dateFormatter)
+ }
+
+ /**
+  * Runs the data phase of add/split: acquires the table locks, builds a `CarbonLoadModel`
+  * for the table and hands over to `alterTableSplitPartition`.
+  *
+  * A failure is re-raised through `sys.error`. Locks are released and the cache is dropped
+  * in all cases; the success messages are only written when no exception occurred.
+  */
+ override def processData(sparkSession: SparkSession): Seq[Row] = {
+   val dbName = splitPartitionModel.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
+   val tableName = splitPartitionModel.tableName
+   var heldLocks = List.empty[ICarbonLock]
+   var succeeded = false
+   try {
+     heldLocks = AlterTableUtil.validateTableAndAcquireLock(
+       dbName,
+       tableName,
+       List(
+         LockUsage.METADATA_LOCK,
+         LockUsage.COMPACTION_LOCK,
+         LockUsage.DELETE_SEGMENT_LOCK,
+         LockUsage.DROP_TABLE_LOCK,
+         LockUsage.CLEAN_FILES_LOCK,
+         LockUsage.ALTER_PARTITION_LOCK))(sparkSession)
+     val loadModel = new CarbonLoadModel()
+     val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+     val tablePath = carbonTable.getTablePath
+     loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+     loadModel.setTableName(carbonTable.getTableName)
+     loadModel.setDatabaseName(carbonTable.getDatabaseName)
+     loadModel.setTablePath(tablePath)
+     loadModel.setFactTimeStamp(CarbonUpdateUtil.readCurrentTime)
+     alterTableSplitPartition(
+       sparkSession.sqlContext,
+       splitPartitionModel.partitionId.toInt.toString,
+       loadModel,
+       oldPartitionIds.asScala.toList)
+     succeeded = true
+   } catch {
+     case failure: Exception =>
+       sys.error(
+         s"Add/Split Partition failed. Please check logs for more info. ${ failure.getMessage }")
+   } finally {
+     AlterTableUtil.releaseLocks(heldLocks)
+     CacheProvider.getInstance().dropAllCache()
+     LOGGER.info("Locks released after alter table add/split partition action.")
+     LOGGER.audit("Locks released after alter table add/split partition action.")
+     if (succeeded) {
+       val successMessage =
+         s"Alter table add/split partition is successful for table $dbName.$tableName"
+       LOGGER.info(successMessage)
+       LOGGER.audit(successMessage)
+     }
+   }
+   Seq.empty
+ }
+
+ /**
+  * Audits the add/split request and delegates to `startSplitThreads`. An exception raised
+  * there is logged and rethrown unchanged.
+  */
+ private def alterTableSplitPartition(
+     sqlContext: SQLContext,
+     partitionId: String,
+     carbonLoadModel: CarbonLoadModel,
+     oldPartitionIdList: List[Int]
+ ): Unit = {
+   val qualifiedTableName =
+     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }"
+   LOGGER.audit(s"Add partition request received for table " + qualifiedTableName)
+   try {
+     startSplitThreads(sqlContext, carbonLoadModel, partitionId, oldPartitionIdList)
+   } catch {
+     case failure: Exception =>
+       LOGGER.error(s"Exception in start splitting partition thread. ${ failure.getMessage }")
+       throw failure
+   }
+ }
+
+ /**
+  * Starts one `SplitThread` per valid segment, waits for all of them, refreshes the
+  * segments in the data map store and fails if any worker thread failed.
+  *
+  * `Thread.join()` does not rethrow what a thread's `run` threw, so worker failures are
+  * collected through an uncaught-exception handler and raised here after all workers have
+  * finished. The pool is shut down and partial load data is cleaned up in all cases.
+  */
+ private def startSplitThreads(sqlContext: SQLContext,
+     carbonLoadModel: CarbonLoadModel,
+     partitionId: String,
+     oldPartitionIdList: List[Int]): Unit = {
+   val numberOfCores = CarbonProperties.getInstance()
+     .getProperty(CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
+       CarbonCommonConstants.DEFAULT_NUMBER_CORES)
+   val executor: ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt)
+   try {
+     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+     val segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier)
+     val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
+
+     // filled from the worker threads, hence a concurrent collection
+     val workerFailures = new java.util.concurrent.ConcurrentLinkedQueue[Throwable]()
+     val failureCollector = new Thread.UncaughtExceptionHandler {
+       override def uncaughtException(thread: Thread, cause: Throwable): Unit = {
+         workerFailures.add(cause)
+       }
+     }
+
+     // start every worker first, then wait for all of them
+     val workers = validSegments.map { segmentId =>
+       val worker = SplitThread(sqlContext, carbonLoadModel, executor,
+         segmentId, partitionId, oldPartitionIdList)
+       worker.setUncaughtExceptionHandler(failureCollector)
+       worker.start()
+       worker
+     }
+     workers.foreach(_.join())
+
+     val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
+       carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
+     val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+     refresher.refreshSegments(validSegments.asJava)
+
+     if (!workerFailures.isEmpty) {
+       val firstFailure = workerFailures.peek()
+       throw new Exception(
+         s"Split partition failed for ${ workerFailures.size } segment(s): " +
+         s"${ firstFailure.getMessage }", firstFailure)
+     }
+   } catch {
+     case e: Exception =>
+       LOGGER.error(e, s"Exception when split partition: ${ e.getMessage }")
+       throw e
+   } finally {
+     executor.shutdown()
+     try {
+       CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
+     } catch {
+       case e: Exception =>
+         // cleanup is best effort: log and continue
+         LOGGER.error(s"Exception in add/split partition thread while deleting partial load file" +
+                      s" ${ e.getMessage }")
+     }
+   }
+ }
+}
+
+/**
+ * Thread that splits one partition within a single segment by submitting a
+ * `SplitPartitionCallable` to the given executor and waiting for its result.
+ *
+ * A failure is logged and rethrown from `run` as an `Exception` that carries the original
+ * exception as its cause.
+ */
+case class SplitThread(sqlContext: SQLContext,
+    carbonLoadModel: CarbonLoadModel,
+    executor: ExecutorService,
+    segmentId: String,
+    partitionId: String,
+    oldPartitionIdList: List[Int]) extends Thread {
+
+  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+
+  override def run(): Unit = {
+    try {
+      executePartitionSplit(sqlContext,
+        carbonLoadModel, executor, segmentId, partitionId, oldPartitionIdList)
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, s"Exception in partition split thread: ${ e.getMessage }")
+        // same message as before, now with the original exception attached as cause
+        throw new Exception("Exception in split partition " + e.getMessage, e)
+    }
+  }
+
+  /**
+   * Submits the split task for the segment and blocks until every submitted future has
+   * completed. An exception from a future is logged and rethrown.
+   */
+  private def executePartitionSplit( sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      executor: ExecutorService,
+      segment: String,
+      partitionId: String,
+      oldPartitionIdList: List[Int]): Unit = {
+    val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
+      CarbonCommonConstants.DEFAULT_COLLECTION_SIZE
+    )
+    scanSegmentsForSplitPartition(futureList, executor, segment, partitionId,
+      sqlContext, carbonLoadModel, oldPartitionIdList)
+    try {
+      futureList.asScala.foreach { future =>
+        future.get
+      }
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, s"Exception in partition split thread ${ e.getMessage }")
+        throw e
+    }
+  }
+
+  /**
+   * Builds the callable model for the segment, submits a `SplitPartitionCallable` to the
+   * executor and records the resulting future in `futureList`.
+   */
+  private def scanSegmentsForSplitPartition(futureList: util.List[Future[Void]],
+      executor: ExecutorService,
+      segmentId: String,
+      partitionId: String,
+      sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      oldPartitionIdList: List[Int]): Unit = {
+
+    val splitModel = SplitPartitionCallableModel(carbonLoadModel,
+      segmentId,
+      partitionId,
+      oldPartitionIdList,
+      sqlContext)
+
+    val future: Future[Void] = executor.submit(new SplitPartitionCallable(splitModel))
+    futureList.add(future)
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala
new file mode 100644
index 0000000..ed50835
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.partition
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * Command that returns the partition information of a carbon table (SHOW PARTITIONS).
+ */
+private[sql] case class CarbonShowCarbonPartitionsCommand(
+    tableIdentifier: TableIdentifier)
+  extends MetadataCommand {
+
+  override val output: Seq[Attribute] = CommonUtil.partitionInfoOutput
+
+  /**
+   * Looks up the table, rejects tables without partition info with an `AnalysisException`,
+   * logs the partition column name and returns the rows built by
+   * `CommonUtil.getPartitionInfo`.
+   */
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val carbonTable = metastore
+      .lookupRelation(tableIdentifier)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+      .carbonTable
+    val tableName = carbonTable.getTableName
+    val identifierTableName =
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName
+    val partitionInfo = carbonTable.getPartitionInfo(identifierTableName)
+    if (partitionInfo == null) {
+      throw new AnalysisException(
+        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableName")
+    }
+    val partitionType = partitionInfo.getPartitionType
+    // only the first column of the partition's column schema list is reported
+    val columnName = partitionInfo.getColumnSchemaList.get(0).getColumnName
+    LogServiceFactory
+      .getLogService(CarbonShowCarbonPartitionsCommand.getClass.getName)
+      .info("partition column name:" + columnName)
+    CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo)
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
deleted file mode 100644
index 903e93b..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/ShowCarbonPartitionsCommand.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.partition
-
-import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.command.{RunnableCommand, SchemaProcessCommand}
-import org.apache.spark.sql.hive.CarbonRelation
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.spark.util.CommonUtil
-
-/**
- * Command for show table partitions Command
- */
-private[sql] case class ShowCarbonPartitionsCommand(
- tableIdentifier: TableIdentifier)
- extends RunnableCommand with SchemaProcessCommand {
-
- override val output: Seq[Attribute] = CommonUtil.partitionInfoOutput
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- processSchema(sparkSession)
- }
-
- override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
- val carbonTable = relation.carbonTable
- val tableName = carbonTable.getTableName
- val partitionInfo = carbonTable.getPartitionInfo(
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
- if (partitionInfo == null) {
- throw new AnalysisException(
- s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableName")
- }
- val partitionType = partitionInfo.getPartitionType
- val columnName = partitionInfo.getColumnSchemaList.get(0).getColumnName
- val LOGGER = LogServiceFactory.getLogService(ShowCarbonPartitionsCommand.getClass.getName)
- LOGGER.info("partition column name:" + columnName)
- CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo)
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 3854f76..ddc2bde 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -14,101 +14,116 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.spark.sql.execution.command.preaaggregate
import scala.collection.mutable
-import scala.collection.JavaConverters._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand}
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
/**
* Below command class will be used to create pre-aggregate table
* and updating the parent table about the child table information
- * Failure case:
+ * It is intended to either succeed or leave nothing changed when one of these failures occurs:
* 1. failed to create pre aggregate table.
* 2. failed to update main table
*
- * @param queryString
*/
case class CreatePreAggregateTableCommand(
dataMapName: String,
parentTableIdentifier: TableIdentifier,
dmClassName: String,
- dmproperties: Map[String, String],
+ dmProperties: Map[String, String],
queryString: String)
- extends RunnableCommand with SchemaProcessCommand {
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- processSchema(sparkSession)
- }
+ extends AtomicRunnableCommand {
- override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+ override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
val df = sparkSession.sql(updatedQuery)
val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
df.logicalPlan, queryString)
val fields = fieldRelationMap.keySet.toSeq
val tableProperties = mutable.Map[String, String]()
- dmproperties.foreach(t => tableProperties.put(t._1, t._2))
- // Create the aggregation table name with parent table name prefix
- val tableIdentifier = TableIdentifier(
- parentTableIdentifier.table +"_" + dataMapName, parentTableIdentifier.database)
+ dmProperties.foreach(t => tableProperties.put(t._1, t._2))
+
// prepare table model of the collected tokens
- val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(false,
+ val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(
+ ifNotExistPresent = false,
new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
tableIdentifier.table.toLowerCase,
fields,
Seq(),
tableProperties,
None,
- false,
+ isAlterFlow = false,
None)
- // getting the parent table
val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
- // getting the table name
- val parentTableName = parentTable.getTableName
- // getting the db name of parent table
- val parentDbName = parentTable.getDatabaseName
-
- assert(parentTableName.equalsIgnoreCase(parentTableIdentifier.table))
+ assert(parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table))
// updating the relation identifier, this will be stored in child table
// which can be used during dropping of pre-aggregate table as parent table will
// also get updated
tableModel.parentTable = Some(parentTable)
tableModel.dataMapRelation = Some(fieldRelationMap)
CarbonCreateTableCommand(tableModel).run(sparkSession)
- try {
- val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
- val tableInfo = table.getTableInfo
- // child schema object which will be updated on parent table about the
- val childSchema = tableInfo.getFactTable
- .buildChildSchema(dataMapName, CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA,
- tableInfo.getDatabaseName, queryString, "AGGREGATION")
- dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
- // updating the parent table about child table
- PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
- val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable)
- if (loadAvailable) {
- sparkSession.sql(
- s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString")
- }
- } catch {
- case e: Exception =>
- CarbonDropTableCommand(
- ifExistsSet = true,
- Some( tableModel.databaseName ), tableModel.tableName ).run(sparkSession)
- throw e
+ val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
+ val tableInfo = table.getTableInfo
+ // child schema object which will be registered on the parent table
+ val childSchema = tableInfo.getFactTable.buildChildSchema(
+ dataMapName,
+ CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA,
+ tableInfo.getDatabaseName,
+ queryString,
+ "AGGREGATION")
+ dmProperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
+
+ // updating the parent table about child table
+ PreAggregateUtil.updateMainTable(
+ CarbonEnv.getDatabaseName(parentTableIdentifier.database)(sparkSession),
+ parentTableIdentifier.table,
+ childSchema,
+ sparkSession)
+
+ Seq.empty
+ }
+
+ override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
+ // Roll back processMetadata by dropping the child (pre-aggregate) table, if it exists.
+ // NOTE: the child schema entry added to the main table's table info is NOT reverted here yet.
+ CarbonDropTableCommand(
+ ifExistsSet = true,
+ tableIdentifier.database,
+ tableIdentifier.table
+ ).run(sparkSession)
+
+ // TODO: undo the change in table info of main table
+
+ Seq.empty
+ }
+
+ /**
+ * Loads the newly created pre-aggregate (child) table from the parent table's existing data.
+ * The load is issued only when the parent table already has load metadata (segments).
+ *
+ * @param sparkSession session used to resolve the database name and to run the insert
+ * @return always an empty result
+ */
+ override def processData(sparkSession: SparkSession): Seq[Row] = {
+ // load child table if parent table has existing segments
+ val dbName = CarbonEnv.getDatabaseName(parentTableIdentifier.database)(sparkSession)
+ val tableName = tableIdentifier.table
+ // Read the load details of the PARENT table, not the child: the child table is created by
+ // processMetadata, so checking the child's own metadata path would skip the initial load.
+ val parentMetadataPath =
+ CarbonEnv.getMetadataPath(Some(dbName), parentTableIdentifier.table)(sparkSession)
+ val loadAvailable = SegmentStatusManager.readLoadMetadata(parentMetadataPath).nonEmpty
+ if (loadAvailable) {
+ sparkSession.sql(s"insert into $dbName.$tableName $queryString")
}
Seq.empty
}
+
+ // Create the aggregation table name with parent table name prefix
+ private lazy val tableIdentifier =
+ TableIdentifier(parentTableIdentifier.table + "_" + dataMapName, parentTableIdentifier.database)
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index f64deec..7bc120b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command.preaaggregate
import scala.collection.JavaConverters._
import org.apache.spark.sql.CarbonSession
-import org.apache.spark.sql.execution.command.CarbonDropTableCommand
+import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 1ee8dd6..a2809ed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -459,10 +459,6 @@ object PreAggregateUtil {
}
}
- def checkMainTableLoad(carbonTable: CarbonTable): Boolean = {
- SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).nonEmpty
- }
-
/**
* Below method will be used to update logical plan
* this is required for creating pre aggregate tables,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableSetCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableSetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableSetCommand.scala
deleted file mode 100644
index 26fe36b..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableSetCommand.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.schema
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.util.AlterTableUtil
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.format.TableInfo
-
-private[sql] case class AlterTableSetCommand(val tableIdentifier: TableIdentifier,
- val properties: Map[String, String],
- val isView: Boolean)
- extends RunnableCommand with SchemaProcessCommand {
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- processSchema(sparkSession)
- }
-
- override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- AlterTableUtil.modifyTableComment(tableIdentifier, properties, Nil,
- true)(sparkSession)
- Seq.empty
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableUnsetCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableUnsetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableUnsetCommand.scala
deleted file mode 100644
index 10367a3..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableUnsetCommand.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.schema
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.util.AlterTableUtil
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.format.TableInfo
-
-private[sql] case class AlterTableUnsetCommand(val tableIdentifier: TableIdentifier,
- val propKeys: Seq[String],
- val ifExists: Boolean,
- val isView: Boolean)
- extends RunnableCommand with SchemaProcessCommand {
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- processSchema(sparkSession)
- }
-
- override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- AlterTableUtil.modifyTableComment(tableIdentifier, Map.empty[String, String],
- propKeys, false)(sparkSession)
- Seq.empty
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 0a720da..c8f998b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -20,9 +20,7 @@ package org.apache.spark.sql.execution.command.schema
import scala.collection.JavaConverters._
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableColumnSchemaGenerator, RunnableCommand}
-import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableColumnSchemaGenerator, MetadataCommand}
import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -36,9 +34,9 @@ import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropCo
private[sql] case class CarbonAlterTableAddColumnCommand(
alterTableAddColumnsModel: AlterTableAddColumnsModel)
- extends RunnableCommand {
+ extends MetadataCommand {
- override def run(sparkSession: SparkSession): Seq[Row] = {
+ override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val tableName = alterTableAddColumnsModel.tableName
val dbName = alterTableAddColumnsModel.databaseName
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index 6e2c83d..dcee7c3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -20,8 +20,7 @@ package org.apache.spark.sql.execution.command.schema
import scala.collection.JavaConverters._
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, RunnableCommand}
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, MetadataCommand}
import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -34,9 +33,9 @@ import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
private[sql] case class CarbonAlterTableDataTypeChangeCommand(
alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
- extends RunnableCommand {
+ extends MetadataCommand {
- override def run(sparkSession: SparkSession): Seq[Row] = {
+ override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val tableName = alterTableDataTypeChangeModel.tableName
val dbName = alterTableDataTypeChangeModel.databaseName
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index bcc059f..40e1e15 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -21,8 +21,8 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, RunnableCommand}
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, MetadataCommand}
+import org.apache.spark.sql.hive.CarbonSessionState
import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -36,9 +36,9 @@ import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
private[sql] case class CarbonAlterTableDropColumnCommand(
alterTableDropColumnModel: AlterTableDropColumnModel)
- extends RunnableCommand {
+ extends MetadataCommand {
- override def run(sparkSession: SparkSession): Seq[Row] = {
+ override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val tableName = alterTableDropColumnModel.tableName
val dbName = alterTableDropColumnModel.databaseName
@@ -137,9 +137,9 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
// event will be fired before dropping the columns
val alterTableDropColumnPostEvent: AlterTableDropColumnPostEvent =
AlterTableDropColumnPostEvent(
- carbonTable,
- alterTableDropColumnModel,
- sparkSession)
+ carbonTable,
+ alterTableDropColumnModel,
+ sparkSession)
OperationListenerBus.getInstance().fireEvent(alterTableDropColumnPostEvent, operationContext)
LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90fb6baf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 594b92a..593a675 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command.schema
import org.apache.spark.sql.{CarbonEnv, CarbonSession, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.{AlterTableRenameModel, RunnableCommand}
+import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand}
import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog}
import org.apache.spark.util.AlterTableUtil
@@ -38,9 +38,9 @@ import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
private[sql] case class CarbonAlterTableRenameCommand(
alterTableRenameModel: AlterTableRenameModel)
- extends RunnableCommand {
+ extends MetadataCommand {
- override def run(sparkSession: SparkSession): Seq[Row] = {
+ override def processMetadata(sparkSession: SparkSession): Seq[Nothing] = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val oldTableIdentifier = alterTableRenameModel.oldTableIdentifier
val newTableIdentifier = alterTableRenameModel.newTableIdentifier