Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/09/27 06:34:07 UTC
[4/6] carbondata git commit: [CARBONDATA-1151] Refactor all carbon commands to separate files in spark2 integration
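For orientation: the refactor does not change the command pattern itself. Every command in the deleted carbonTableSchema.scala below is a case class extending Spark's RunnableCommand together with one or both of the SchemaProcessCommand and DataProcessCommand traits, and the change only moves each case class into its own file (for example, management/AlterTableCompactionCommand.scala further down). A minimal, self-contained sketch of that shape follows; ExampleCleanCommand is a hypothetical stand-in for any moved command, and Spark's RunnableCommand is omitted so the sketch compiles on its own:

import org.apache.spark.sql.{Row, SparkSession}

// The two traits exactly as the deleted file defines them
trait SchemaProcessCommand {
  def processSchema(sparkSession: SparkSession): Seq[Row]
}

trait DataProcessCommand {
  def processData(sparkSession: SparkSession): Seq[Row]
}

// Hypothetical command showing the shape each extracted file keeps:
// run() simply delegates to processSchema()/processData()
case class ExampleCleanCommand(tableName: String) extends DataProcessCommand {
  def run(sparkSession: SparkSession): Seq[Row] = processData(sparkSession)

  override def processData(sparkSession: SparkSession): Seq[Row] = {
    // the real commands validate the table, acquire locks, and delegate
    // to CarbonDataRDDFactory / CarbonStore at this point
    Seq.empty
  }
}
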
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
deleted file mode 100644
index 7ed280e..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ /dev/null
@@ -1,1315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command
-
-import java.text.SimpleDateFormat
-import java.util
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, ListBuffer}
-import scala.language.implicitConversions
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation}
-import org.apache.spark.util.{AlterTableUtil, CausedBy, FileUtils, PartitionUtils}
-import org.codehaus.jackson.map.ObjectMapper
-
-import org.apache.carbondata.api.CarbonStore
-import org.apache.carbondata.common.constants.LoggerAction
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.cache.CacheProvider
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.dictionary.server.DictionaryServer
-import org.apache.carbondata.core.exception.InvalidConfigurationException
-import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.PartitionInfo
-import org.apache.carbondata.core.metadata.schema.partition.PartitionType
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.metadata.schema.table.TableInfo
-import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.format
-import org.apache.carbondata.processing.constants.TableOptionConstant
-import org.apache.carbondata.processing.etl.DataLoadingException
-import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants
-import org.apache.carbondata.processing.newflow.exception.NoRetryException
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load.ValidateUtil
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
-import org.apache.carbondata.spark.util.{CarbonSparkUtil, CommonUtil, GlobalDictionaryUtil}
-
-object Checker {
- def validateTableExists(
- dbName: Option[String],
- tableName: String,
- session: SparkSession): Unit = {
- val identifier = TableIdentifier(tableName, dbName)
- if (!CarbonEnv.getInstance(session).carbonMetastore.tableExists(identifier)(session)) {
- val err = s"table $dbName.$tableName not found"
- LogServiceFactory.getLogService(this.getClass.getName).error(err)
- throw new IllegalArgumentException(err)
- }
- }
-}
-
-/**
- * Interface for command that modifies schema
- */
-trait SchemaProcessCommand {
- def processSchema(sparkSession: SparkSession): Seq[Row]
-}
-
-/**
- * Interface for command that need to process data in file system
- */
-trait DataProcessCommand {
- def processData(sparkSession: SparkSession): Seq[Row]
-}
-
-/**
- * Command for show table partitions Command
- *
- * @param tableIdentifier
- */
-private[sql] case class ShowCarbonPartitionsCommand(
- tableIdentifier: TableIdentifier) extends RunnableCommand with SchemaProcessCommand {
-
- override val output = CommonUtil.partitionInfoOutput
- override def run(sparkSession: SparkSession): Seq[Row] = {
- processSchema(sparkSession)
- }
-
- override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(tableIdentifier)(sparkSession).
- asInstanceOf[CarbonRelation]
- val carbonTable = relation.tableMeta.carbonTable
- val tableName = carbonTable.getFactTableName
- val partitionInfo = carbonTable.getPartitionInfo(
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
- if (partitionInfo == null) {
- throw new AnalysisException(
- s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableName")
- }
- val partitionType = partitionInfo.getPartitionType
- val columnName = partitionInfo.getColumnSchemaList.get(0).getColumnName
- val LOGGER = LogServiceFactory.getLogService(ShowCarbonPartitionsCommand.getClass.getName)
- LOGGER.info("partition column name:" + columnName)
- CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo)
- }
-}
-
-/**
- * Command for the compaction in alter table command
- *
- * @param alterTableModel
- */
-case class AlterTableCompaction(alterTableModel: AlterTableModel) extends RunnableCommand
- with DataProcessCommand {
-
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-
- def run(sparkSession: SparkSession): Seq[Row] = {
- processData(sparkSession)
- }
-
- override def processData(sparkSession: SparkSession): Seq[Row] = {
- val tableName = alterTableModel.tableName.toLowerCase
- val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
- val relation =
- CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(Option(databaseName), tableName)(sparkSession)
- .asInstanceOf[CarbonRelation]
- if (relation == null) {
- sys.error(s"Table $databaseName.$tableName does not exist")
- }
- if (null == relation.tableMeta.carbonTable) {
- LOGGER.error(s"alter table failed. table not found: $databaseName.$tableName")
- sys.error(s"alter table failed. table not found: $databaseName.$tableName")
- }
-
- val carbonLoadModel = new CarbonLoadModel()
-
- val table = relation.tableMeta.carbonTable
- carbonLoadModel.setTableName(table.getFactTableName)
- val dataLoadSchema = new CarbonDataLoadSchema(table)
- // Need to fill dimension relation
- carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
- carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath)
-
- var storeLocation = CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
- System.getProperty("java.io.tmpdir")
- )
- storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
- try {
- CarbonDataRDDFactory
- .alterTableForCompaction(sparkSession.sqlContext,
- alterTableModel,
- carbonLoadModel,
- storeLocation
- )
- } catch {
- case e: Exception =>
- if (null != e.getMessage) {
- sys.error(s"Compaction failed. Please check logs for more info. ${ e.getMessage }")
- } else {
- sys.error("Exception in compaction. Please check logs for more info.")
- }
- }
- Seq.empty
- }
-}
-
-/**
- * Command for Alter Table Add & Split partition
- * Add is a special case of Splitting the default partition (part0)
- * @param splitPartitionModel
- */
-case class AlterTableSplitPartitionCommand(splitPartitionModel: AlterTableSplitPartitionModel)
- extends RunnableCommand with DataProcessCommand with SchemaProcessCommand {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val tableName = splitPartitionModel.tableName
- val splitInfo = splitPartitionModel.splitInfo
- val partitionId = splitPartitionModel.partitionId.toInt
- var partitionInfo: PartitionInfo = null
- var carbonMetaStore: CarbonMetaStore = null
- var relation: CarbonRelation = null
- var dbName: String = null
- var storePath: String = null
- var table: CarbonTable = null
- var carbonTableIdentifier: CarbonTableIdentifier = null
- val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
- val timestampFormatter = new SimpleDateFormat(CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
- CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
- val dateFormatter = new SimpleDateFormat(CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
- CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
- val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
- LockUsage.COMPACTION_LOCK,
- LockUsage.DELETE_SEGMENT_LOCK,
- LockUsage.DROP_TABLE_LOCK,
- LockUsage.CLEAN_FILES_LOCK,
- LockUsage.ALTER_PARTITION_LOCK)
-
- // TODO: add a rollback function in case the data process fails
- def run(sparkSession: SparkSession): Seq[Row] = {
- processSchema(sparkSession)
- processData(sparkSession)
- }
-
- override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- dbName = splitPartitionModel.databaseName
- .getOrElse(sparkSession.catalog.currentDatabase)
- carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
- .asInstanceOf[CarbonRelation]
- carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
- storePath = relation.tableMeta.storePath
- if (relation == null) {
- sys.error(s"Table $dbName.$tableName does not exist")
- }
- carbonMetaStore.checkSchemasModifiedTimeAndReloadTables(storePath)
- if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
- LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
- sys.error(s"Alter table failed. table not found: $dbName.$tableName")
- }
- table = relation.tableMeta.carbonTable
- partitionInfo = table.getPartitionInfo(tableName)
- val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
- // keep a copy of partitionIdList before updating partitionInfo;
- // it will be used in the partition data scan
- oldPartitionIds.addAll(partitionIds.asJava)
-
- if (partitionInfo == null) {
- sys.error(s"Table $tableName is not a partition table.")
- }
- if (partitionInfo.getPartitionType == PartitionType.HASH) {
- sys.error(s"Hash partition table cannot be added or split!")
- }
- PartitionUtils.updatePartitionInfo(partitionInfo, partitionIds, partitionId,
- splitInfo, timestampFormatter, dateFormatter)
-
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
- val schemaFilePath = carbonTablePath.getSchemaFilePath
- // read TableInfo
- val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
- val schemaConverter = new ThriftWrapperSchemaConverterImpl()
- val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
- dbName, tableName, storePath)
- val tableSchema = wrapperTableInfo.getFactTable
- tableSchema.setPartitionInfo(partitionInfo)
- wrapperTableInfo.setFactTable(tableSchema)
- wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
- val thriftTable =
- schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
- carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
- dbName, tableName, storePath)
- CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
- // update the schema modified time
- carbonMetaStore.updateAndTouchSchemasUpdatedTime(storePath)
- sparkSession.catalog.refreshTable(tableName)
- Seq.empty
- }
-
- override def processData(sparkSession: SparkSession): Seq[Row] = {
- var locks = List.empty[ICarbonLock]
- var success = false
- try {
- locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
- locksToBeAcquired)(sparkSession)
- val carbonLoadModel = new CarbonLoadModel()
- val dataLoadSchema = new CarbonDataLoadSchema(table)
- carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
- carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setStorePath(storePath)
- val loadStartTime = CarbonUpdateUtil.readCurrentTime
- carbonLoadModel.setFactTimeStamp(loadStartTime)
- CarbonDataRDDFactory.alterTableSplitPartition(sparkSession.sqlContext,
- partitionId.toString,
- carbonLoadModel,
- oldPartitionIds.asScala.toList
- )
- success = true
- } catch {
- case e: Exception =>
- success = false
- sys.error(s"Add/Split Partition failed. Please check logs for more info. ${ e.getMessage }")
- } finally {
- AlterTableUtil.releaseLocks(locks)
- CacheProvider.getInstance().dropAllCache()
- LOGGER.info("Locks released after alter table add/split partition action.")
- LOGGER.audit("Locks released after alter table add/split partition action.")
- if (success) {
- LOGGER.info(s"Alter table add/split partition is successful for table $dbName.$tableName")
- LOGGER.audit(s"Alter table add/split partition is successful for table $dbName.$tableName")
- }
- }
- Seq.empty
- }
-}
-
-case class AlterTableDropPartition(alterTableDropPartitionModel: AlterTableDropPartitionModel)
- extends RunnableCommand with DataProcessCommand with SchemaProcessCommand {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val tableName = alterTableDropPartitionModel.tableName
- var dbName: String = null
- val partitionId = alterTableDropPartitionModel.partitionId
- val dropWithData = alterTableDropPartitionModel.dropWithData
- if (partitionId == 0) {
- sys.error(s"Cannot drop default partition! Please use delete statement!")
- }
- var partitionInfo: PartitionInfo = null
- var carbonMetaStore: CarbonMetaStore = null
- var relation: CarbonRelation = null
- var storePath: String = null
- var table: CarbonTable = null
- var carbonTableIdentifier: CarbonTableIdentifier = null
- val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
- val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
- LockUsage.COMPACTION_LOCK,
- LockUsage.DELETE_SEGMENT_LOCK,
- LockUsage.DROP_TABLE_LOCK,
- LockUsage.CLEAN_FILES_LOCK,
- LockUsage.ALTER_PARTITION_LOCK)
-
- def run(sparkSession: SparkSession): Seq[Row] = {
- processSchema(sparkSession)
- processData(sparkSession)
- Seq.empty
- }
-
- override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- dbName = alterTableDropPartitionModel.databaseName
- .getOrElse(sparkSession.catalog.currentDatabase)
- carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
- .asInstanceOf[CarbonRelation]
- carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
- storePath = relation.tableMeta.storePath
- carbonMetaStore.checkSchemasModifiedTimeAndReloadTables(storePath)
- if (relation == null) {
- sys.error(s"Table $dbName.$tableName does not exist")
- }
- if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
- LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
- sys.error(s"Alter table failed. table not found: $dbName.$tableName")
- }
- table = relation.tableMeta.carbonTable
- partitionInfo = table.getPartitionInfo(tableName)
- if (partitionInfo == null) {
- sys.error(s"Table $tableName is not a partition table.")
- }
- val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
- // keep a copy of partitionIdList before updating partitionInfo;
- // it will be used in the partition data scan
- oldPartitionIds.addAll(partitionIds.asJava)
- val partitionIndex = partitionIds.indexOf(Integer.valueOf(partitionId))
- partitionInfo.getPartitionType match {
- case PartitionType.HASH => sys.error(s"Hash partition cannot be dropped!")
- case PartitionType.RANGE =>
- val rangeInfo = new util.ArrayList(partitionInfo.getRangeInfo)
- val rangeToRemove = partitionInfo.getRangeInfo.get(partitionIndex - 1)
- rangeInfo.remove(rangeToRemove)
- partitionInfo.setRangeInfo(rangeInfo)
- case PartitionType.LIST =>
- val listInfo = new util.ArrayList(partitionInfo.getListInfo)
- val listToRemove = partitionInfo.getListInfo.get(partitionIndex - 1)
- listInfo.remove(listToRemove)
- partitionInfo.setListInfo(listInfo)
- case PartitionType.RANGE_INTERVAL =>
- sys.error(s"Dropping range interval partition isn't support yet!")
- }
- partitionInfo.dropPartition(partitionIndex)
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
- val schemaFilePath = carbonTablePath.getSchemaFilePath
- // read TableInfo
- val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
-
- val schemaConverter = new ThriftWrapperSchemaConverterImpl()
- val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
- dbName, tableName, storePath)
- val tableSchema = wrapperTableInfo.getFactTable
- tableSchema.setPartitionInfo(partitionInfo)
- wrapperTableInfo.setFactTable(tableSchema)
- wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
- val thriftTable =
- schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
- thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
- .setTime_stamp(System.currentTimeMillis)
- carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
- dbName, tableName, storePath)
- CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
- // update the schema modified time
- carbonMetaStore.updateAndTouchSchemasUpdatedTime(storePath)
- // sparkSession.catalog.refreshTable(tableName)
- Seq.empty
- }
-
- override def processData(sparkSession: SparkSession): Seq[Row] = {
- var locks = List.empty[ICarbonLock]
- var success = false
- try {
- locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
- locksToBeAcquired)(sparkSession)
- val carbonLoadModel = new CarbonLoadModel()
- val dataLoadSchema = new CarbonDataLoadSchema(table)
- // Need to fill dimension relation
- carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
- carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setStorePath(storePath)
- val loadStartTime = CarbonUpdateUtil.readCurrentTime
- carbonLoadModel.setFactTimeStamp(loadStartTime)
- CarbonDataRDDFactory.alterTableDropPartition(sparkSession.sqlContext,
- partitionId,
- carbonLoadModel,
- dropWithData,
- oldPartitionIds.asScala.toList
- )
- success = true
- } catch {
- case e: Exception =>
- sys.error(s"Drop Partition failed. Please check logs for more info. ${ e.getMessage } ")
- success = false
- } finally {
- CacheProvider.getInstance().dropAllCache()
- AlterTableUtil.releaseLocks(locks)
- LOGGER.info("Locks released after alter table drop partition action.")
- LOGGER.audit("Locks released after alter table drop partition action.")
- }
- LOGGER.info(s"Alter table drop partition is successful for table $dbName.$tableName")
- LOGGER.audit(s"Alter table drop partition is successful for table $dbName.$tableName")
- Seq.empty
- }
-}
-
- case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends RunnableCommand
- with SchemaProcessCommand {
-
- def run(sparkSession: SparkSession): Seq[Row] = {
- processSchema(sparkSession)
- }
-
- def setV(ref: Any, name: String, value: Any): Unit = {
- ref.getClass.getFields.find(_.getName == name).get
- .set(ref, value.asInstanceOf[AnyRef])
- }
-
- override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- val storePath = CarbonEnv.getInstance(sparkSession).storePath
- CarbonEnv.getInstance(sparkSession).carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables(storePath)
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sparkSession)
- val tbName = cm.tableName
- val dbName = cm.databaseName
- LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
-
- val tableInfo: TableInfo = TableNewProcessor(cm)
-
- // Add validation for sort scope when create table
- val sortScope = tableInfo.getFactTable.getTableProperties
- .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
- if (!CarbonUtil.isValidSortOption(sortScope)) {
- throw new InvalidConfigurationException(s"Passing invalid SORT_SCOPE '$sortScope'," +
- s" valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT', 'LOCAL_SORT' and 'GLOBAL_SORT' ")
- }
-
- if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
- sys.error("No Dimensions found. Table should have at least one dimesnion !")
- }
-
- if (sparkSession.sessionState.catalog.listTables(dbName)
- .exists(_.table.equalsIgnoreCase(tbName))) {
- if (!cm.ifNotExistsSet) {
- LOGGER.audit(
- s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
- s"Table [$tbName] already exists under database [$dbName]")
- sys.error(s"Table [$tbName] already exists under database [$dbName]")
- }
- } else {
- val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName)
- // Add Database to catalog and persist
- val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val tablePath = tableIdentifier.getTablePath
- val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath)
- if (createDSTable) {
- try {
- val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
- cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f)
- cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
-
- sparkSession.sql(
- s"""CREATE TABLE $dbName.$tbName
- |(${ fields.map(f => f.rawSchema).mkString(",") })
- |USING org.apache.spark.sql.CarbonSource""".stripMargin +
- s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
- s""""$tablePath"$carbonSchemaString) """)
- } catch {
- case e: Exception =>
- val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
- // call the drop table to delete the created table.
- CarbonEnv.getInstance(sparkSession).carbonMetastore
- .dropTable(tablePath, identifier)(sparkSession)
-
- LOGGER.audit(s"Table creation with Database name [$dbName] " +
- s"and Table name [$tbName] failed")
- throw e
- }
- }
-
- LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
- }
- Seq.empty
- }
-}
-
-case class DeleteLoadsById(
- loadids: Seq[String],
- databaseNameOp: Option[String],
- tableName: String) extends RunnableCommand with DataProcessCommand {
-
- def run(sparkSession: SparkSession): Seq[Row] = {
- processData(sparkSession)
- }
-
- override def processData(sparkSession: SparkSession): Seq[Row] = {
- Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
- val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
- CarbonStore.deleteLoadById(
- loadids,
- getDB.getDatabaseName(databaseNameOp, sparkSession),
- tableName,
- carbonTable
- )
- Seq.empty
- }
-}
-
-case class DeleteLoadsByLoadDate(
- databaseNameOp: Option[String],
- tableName: String,
- dateField: String,
- loadDate: String) extends RunnableCommand with DataProcessCommand {
-
- def run(sparkSession: SparkSession): Seq[Row] = {
- processData(sparkSession)
- }
-
- override def processData(sparkSession: SparkSession): Seq[Row] = {
- Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
- val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
- CarbonStore.deleteLoadByDate(
- loadDate,
- getDB.getDatabaseName(databaseNameOp, sparkSession),
- tableName,
- carbonTable
- )
- Seq.empty
- }
-}
-
-object LoadTable {
-
- def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
- sqlContext: SQLContext,
- model: DictionaryLoadModel,
- noDictDimension: Array[CarbonDimension]): Unit = {
- val sparkSession = sqlContext.sparkSession
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
- model.table)
-
- val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
- // read TableInfo
- val tableInfo: format.TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
-
- // modify TableInfo
- val columns = tableInfo.getFact_table.getTable_columns
- for (i <- 0 until columns.size) {
- if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
- columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
- }
- }
- val entry = tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
- entry.setTime_stamp(System.currentTimeMillis())
-
- // write TableInfo
- metastore.updateTableSchema(carbonTablePath.getCarbonTableIdentifier,
- carbonTablePath.getCarbonTableIdentifier,
- tableInfo, entry, carbonTablePath.getPath)(sparkSession)
-
- // update the schema modified time
- metastore.updateAndTouchSchemasUpdatedTime(model.hdfsLocation)
-
- // update CarbonDataLoadSchema
- val carbonTable = metastore.lookupRelation(Option(model.table.getDatabaseName),
- model.table.getTableName)(sqlContext.sparkSession).asInstanceOf[CarbonRelation].tableMeta
- .carbonTable
- carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
- }
-
-}
-
-case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation,
- child: LogicalPlan,
- overwrite: Boolean)
- extends RunnableCommand with DataProcessCommand {
-
- def run(sparkSession: SparkSession): Seq[Row] = {
- processData(sparkSession)
- }
-
- override def processData(sparkSession: SparkSession): Seq[Row] = {
- val df = Dataset.ofRows(sparkSession, child)
- val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
- val load = LoadTable(
- Some(relation.carbonRelation.databaseName),
- relation.carbonRelation.tableName,
- null,
- Seq(),
- scala.collection.immutable.Map("fileheader" -> header),
- overwrite,
- null,
- Some(df)).run(sparkSession)
- // updating relation metadata. This is needed in case of auto-detected high cardinality
- relation.carbonRelation.metaData =
- CarbonSparkUtil.createSparkMeta(relation.carbonRelation.tableMeta.carbonTable)
- load
- }
-}
-
-case class LoadTable(
- databaseNameOp: Option[String],
- tableName: String,
- factPathFromUser: String,
- dimFilesPath: Seq[DataLoadTableFileMapping],
- options: scala.collection.immutable.Map[String, String],
- isOverwriteTable: Boolean,
- var inputSqlString: String = null,
- dataFrame: Option[DataFrame] = None,
- updateModel: Option[UpdateTableModel] = None) extends RunnableCommand with DataProcessCommand {
-
- private def getFinalOptions(carbonProperty: CarbonProperties):
- scala.collection.mutable.Map[String, String] = {
- var optionsFinal = scala.collection.mutable.Map[String, String]()
- optionsFinal.put("delimiter", options.getOrElse("delimiter", ","))
- optionsFinal.put("quotechar", options.getOrElse("quotechar", "\""))
- optionsFinal.put("fileheader", options.getOrElse("fileheader", ""))
- optionsFinal.put("escapechar", options.getOrElse("escapechar", "\\"))
- optionsFinal.put("commentchar", options.getOrElse("commentchar", "#"))
- optionsFinal.put("columndict", options.getOrElse("columndict", null))
- optionsFinal
- .put("serialization_null_format", options.getOrElse("serialization_null_format", "\\N"))
- optionsFinal.put("bad_records_logger_enable", options.getOrElse("bad_records_logger_enable",
- carbonProperty
- .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
- CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)))
- val badRecordActionValue = carbonProperty
- .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
- optionsFinal.put("bad_records_action", options.getOrElse("bad_records_action", carbonProperty
- .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
- badRecordActionValue)))
- optionsFinal
- .put("is_empty_data_bad_record", options.getOrElse("is_empty_data_bad_record", carbonProperty
- .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
- CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)))
- optionsFinal.put("all_dictionary_path", options.getOrElse("all_dictionary_path", ""))
- optionsFinal
- .put("complex_delimiter_level_1", options.getOrElse("complex_delimiter_level_1", "\\$"))
- optionsFinal
- .put("complex_delimiter_level_2", options.getOrElse("complex_delimiter_level_2", "\\:"))
- optionsFinal.put("dateformat", options.getOrElse("dateformat",
- carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
- CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)))
-
- optionsFinal.put("global_sort_partitions", options.getOrElse("global_sort_partitions",
- carbonProperty
- .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null)))
-
- optionsFinal.put("maxcolumns", options.getOrElse("maxcolumns", null))
-
- optionsFinal.put("batch_sort_size_inmb", options.getOrElse("batch_sort_size_inmb",
- carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
- carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
- CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))))
-
- optionsFinal.put("bad_record_path", options.getOrElse("bad_record_path",
- carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
- carbonProperty.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
- CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))))
-
- val useOnePass = options.getOrElse("single_pass",
- carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
- CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match {
- case "true" =>
- true
- case "false" =>
- // when single_pass = false, if either all_dictionary_path
- // or columndict is configured, then do not allow the load
- if (StringUtils.isNotEmpty(optionsFinal("all_dictionary_path")) ||
- StringUtils.isNotEmpty(optionsFinal("columndict"))) {
- throw new MalformedCarbonCommandException(
- "Can not use all_dictionary_path or columndict without single_pass.")
- } else {
- false
- }
- case illegal =>
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- LOGGER.error(s"Can't use single_pass, because illegal syntax found: [" + illegal + "] " +
- "Please set it as 'true' or 'false'")
- false
- }
- optionsFinal.put("single_pass", useOnePass.toString)
- optionsFinal
- }
-
- private def checkDefaultValue(value: String, default: String) = {
- if (StringUtils.isEmpty(value)) {
- default
- } else {
- value
- }
- }
-
- def run(sparkSession: SparkSession): Seq[Row] = {
- processData(sparkSession)
- }
-
- override def processData(sparkSession: SparkSession): Seq[Row] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- if (dataFrame.isDefined && updateModel.isEmpty) {
- val rdd = dataFrame.get.rdd
- if (rdd.partitions == null || rdd.partitions.length == 0) {
- LOGGER.warn("DataLoading finished. No data was loaded.")
- return Seq.empty
- }
- }
-
- val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
-
- val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
- if (relation == null) {
- sys.error(s"Table $dbName.$tableName does not exist")
- }
- if (null == relation.tableMeta.carbonTable) {
- LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
- LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
- sys.error(s"Data loading failed. table not found: $dbName.$tableName")
- }
-
- val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
- carbonProperty.addProperty("zookeeper.enable.lock", "false")
- val optionsFinal = getFinalOptions(carbonProperty)
-
- val tableProperties = relation.tableMeta.carbonTable.getTableInfo
- .getFactTable.getTableProperties
-
- optionsFinal.put("sort_scope", tableProperties.getOrDefault("sort_scope",
- carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
- carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
- CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
-
- try {
- val factPath = if (dataFrame.isDefined) {
- ""
- } else {
- FileUtils.getPaths(
- CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
- }
- val carbonLoadModel = new CarbonLoadModel()
- carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
- carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
- carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath)
-
- val table = relation.tableMeta.carbonTable
- carbonLoadModel.setTableName(table.getFactTableName)
- val dataLoadSchema = new CarbonDataLoadSchema(table)
- // Need to fill dimension relation
- carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-
- val partitionLocation = relation.tableMeta.storePath + "/partition/" +
- relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
- relation.tableMeta.carbonTableIdentifier.getTableName + "/"
- val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
- val sort_scope = optionsFinal("sort_scope")
- val single_pass = optionsFinal("single_pass")
- val bad_records_logger_enable = optionsFinal("bad_records_logger_enable")
- val bad_records_action = optionsFinal("bad_records_action")
- val bad_record_path = optionsFinal("bad_record_path")
- val global_sort_partitions = optionsFinal("global_sort_partitions")
- val dateFormat = optionsFinal("dateformat")
- val delimeter = optionsFinal("delimiter")
- val complex_delimeter_level1 = optionsFinal("complex_delimiter_level_1")
- val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2")
- val all_dictionary_path = optionsFinal("all_dictionary_path")
- val column_dict = optionsFinal("columndict")
- ValidateUtil.validateDateFormat(dateFormat, table, tableName)
- ValidateUtil.validateSortScope(table, sort_scope)
-
- if (bad_records_logger_enable.toBoolean ||
- LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
- if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
- sys.error("Invalid bad records location.")
- }
- }
- carbonLoadModel.setBadRecordsLocation(bad_record_path)
-
- ValidateUtil.validateGlobalSortPartitions(global_sort_partitions)
- carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal("escapechar"), "\\"))
- carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal("quotechar"), "\""))
- carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal("commentchar"), "#"))
-
- // if there isn't a file header in the csv file and the load sql doesn't provide the
- // FILEHEADER option, we should use the table schema to generate the file header.
- var fileHeader = optionsFinal("fileheader")
- val headerOption = options.get("header")
- if (headerOption.isDefined) {
- // whether the csv file has a file header;
- // the default value is true
- val header = try {
- headerOption.get.toBoolean
- } catch {
- case ex: IllegalArgumentException =>
- throw new MalformedCarbonCommandException(
- "'header' option should be either 'true' or 'false'. " + ex.getMessage)
- }
- header match {
- case true =>
- if (fileHeader.nonEmpty) {
- throw new MalformedCarbonCommandException(
- "When 'header' option is true, 'fileheader' option is not required.")
- }
- case false =>
- // generate file header
- if (fileHeader.isEmpty) {
- fileHeader = table.getCreateOrderColumn(table.getFactTableName)
- .asScala.map(_.getColName).mkString(",")
- }
- }
- }
-
- carbonLoadModel.setDateFormat(dateFormat)
- carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty(
- CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
- CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
- carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty(
- CarbonCommonConstants.CARBON_DATE_FORMAT,
- CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
- carbonLoadModel
- .setSerializationNullFormat(
- TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," +
- optionsFinal("serialization_null_format"))
- carbonLoadModel
- .setBadRecordsLoggerEnable(
- TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable)
- carbonLoadModel
- .setBadRecordsAction(
- TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action)
- carbonLoadModel
- .setIsEmptyDataBadRecord(
- DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
- optionsFinal("is_empty_data_bad_record"))
- carbonLoadModel.setSortScope(sort_scope)
- carbonLoadModel.setBatchSortSizeInMb(optionsFinal("batch_sort_size_inmb"))
- carbonLoadModel.setGlobalSortPartitions(global_sort_partitions)
- carbonLoadModel.setUseOnePass(single_pass.toBoolean)
- if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
- complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) ||
- delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
- sys.error(s"Field Delimiter & Complex types delimiter are same")
- }
- else {
- carbonLoadModel.setComplexDelimiterLevel1(
- CarbonUtil.delimiterConverter(complex_delimeter_level1))
- carbonLoadModel.setComplexDelimiterLevel2(
- CarbonUtil.delimiterConverter(complex_delimeter_level2))
- }
- // set the all_dictionary_path on the load model
- carbonLoadModel.setAllDictPath(all_dictionary_path)
-
- val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
- try {
- // The system has to partition the data first and then call the data load
- LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
- carbonLoadModel.setFactFilePath(factPath)
- carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter))
- carbonLoadModel.setCsvHeader(fileHeader)
- carbonLoadModel.setColDictFilePath(column_dict)
- carbonLoadModel.setDirectLoad(true)
- carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
- val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns,
- optionsFinal("maxcolumns"))
- carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
- GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata
- val storePath = relation.tableMeta.storePath
- // add the start entry for the new load in the table status file
- if (!updateModel.isDefined) {
- CommonUtil.
- readAndUpdateLoadProgressInTableMeta(carbonLoadModel, storePath, isOverwriteTable)
- }
- if (isOverwriteTable) {
- LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
- }
- if (null == carbonLoadModel.getLoadMetadataDetails) {
- CommonUtil.readLoadMetadataDetails(carbonLoadModel)
- }
- if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass &&
- StringUtils.isEmpty(column_dict) && StringUtils.isEmpty(all_dictionary_path)) {
- LOGGER.info(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
- LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
- carbonLoadModel.setUseOnePass(false)
- }
- // Create table and metadata folders if not exist
- val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(storePath, table.getCarbonTableIdentifier)
- val metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath
- val fileType = FileFactory.getFileType(metadataDirectoryPath)
- if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
- FileFactory.mkdirs(metadataDirectoryPath, fileType)
- }
- if (carbonLoadModel.getUseOnePass) {
- val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
- .getCarbonTableIdentifier
- val carbonTablePath = CarbonStorePath
- .getCarbonTablePath(storePath, carbonTableIdentifier)
- val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
- val dimensions = carbonTable.getDimensionByTableName(
- carbonTable.getFactTableName).asScala.toArray
- val colDictFilePath = carbonLoadModel.getColDictFilePath
- if (!StringUtils.isEmpty(colDictFilePath)) {
- carbonLoadModel.initPredefDictMap()
- // generate predefined dictionary
- GlobalDictionaryUtil
- .generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
- dimensions, carbonLoadModel, sparkSession.sqlContext, storePath, dictFolderPath)
- }
- if (!StringUtils.isEmpty(all_dictionary_path)) {
- carbonLoadModel.initPredefDictMap()
- GlobalDictionaryUtil
- .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext,
- carbonLoadModel,
- storePath,
- carbonTableIdentifier,
- dictFolderPath,
- dimensions,
- all_dictionary_path)
- }
- // dictionaryServerClient dictionary generator
- val dictionaryServerPort = carbonProperty
- .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
- CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
- val sparkDriverHost = sparkSession.sqlContext.sparkContext.
- getConf.get("spark.driver.host")
- carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
- // start the dictionary server when one-pass load is used and a dimension with DICTIONARY
- // encoding is present.
- val allDimensions = table.getAllDimensions.asScala.toList
- val createDictionary = allDimensions.exists {
- carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
- !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
- }
- val server: Option[DictionaryServer] = if (createDictionary) {
- val dictionaryServer = DictionaryServer
- .getInstance(dictionaryServerPort.toInt, carbonTable)
- carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
- sparkSession.sparkContext.addSparkListener(new SparkListener() {
- override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
- dictionaryServer.shutdown()
- }
- })
- Some(dictionaryServer)
- } else {
- None
- }
- CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
- carbonLoadModel,
- relation.tableMeta.storePath,
- columnar,
- partitionStatus,
- server,
- isOverwriteTable,
- dataFrame,
- updateModel)
- } else {
- val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
- val fields = dataFrame.get.schema.fields
- import org.apache.spark.sql.functions.udf
- // extracting only segment from tupleId
- val getSegIdUDF = udf((tupleId: String) =>
- CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
- // getting all fields except tupleId field as it is not required in the value
- var otherFields = fields.toSeq
- .filter(field => !field.name
- .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
- .map(field => new Column(field.name))
-
- // extract tupleId field which will be used as a key
- val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
- .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
- as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
- // use dataFrameWithoutTupleId as dictionaryDataFrame
- val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
- otherFields = otherFields :+ segIdColumn
- // use dataFrameWithTupleId as loadDataFrame
- val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
- (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
- } else {
- (dataFrame, dataFrame)
- }
-
- GlobalDictionaryUtil.generateGlobalDictionary(
- sparkSession.sqlContext,
- carbonLoadModel,
- relation.tableMeta.storePath,
- dictionaryDataFrame)
- CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
- carbonLoadModel,
- relation.tableMeta.storePath,
- columnar,
- partitionStatus,
- None,
- isOverwriteTable,
- loadDataFrame,
- updateModel)
- }
- } catch {
- case CausedBy(ex: NoRetryException) =>
- LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")
- throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
- case ex: Exception =>
- LOGGER.error(ex)
- LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
- throw ex
- } finally {
- // Once the data load is successful delete the unwanted partition files
- try {
- val fileType = FileFactory.getFileType(partitionLocation)
- if (FileFactory.isFileExist(partitionLocation, fileType)) {
- val file = FileFactory
- .getCarbonFile(partitionLocation, fileType)
- CarbonUtil.deleteFoldersAndFiles(file)
- }
- } catch {
- case ex: Exception =>
- LOGGER.error(ex)
- LOGGER.audit(s"Dataload failure for $dbName.$tableName. " +
- "Problem deleting the partition folder")
- throw ex
- }
-
- }
- } catch {
- case dle: DataLoadingException =>
- LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + dle.getMessage)
- throw dle
- case mce: MalformedCarbonCommandException =>
- LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + mce.getMessage)
- throw mce
- }
- Seq.empty
- }
-}
-
-case class CleanFiles(
- databaseNameOp: Option[String],
- tableName: String, forceTableClean: Boolean = false)
- extends RunnableCommand with DataProcessCommand {
-
- def run(sparkSession: SparkSession): Seq[Row] = {
- processData(sparkSession)
- }
-
- override def processData(sparkSession: SparkSession): Seq[Row] = {
- Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
- if (forceTableClean) {
- CarbonStore.cleanFiles(
- getDB.getDatabaseName(databaseNameOp, sparkSession),
- tableName,
- CarbonEnv.getInstance(sparkSession).storePath,
- null,
- forceTableClean)
- } else {
- val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val relation = catalog
- .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]
- val carbonTable = relation.tableMeta.carbonTable
- CarbonStore.cleanFiles(
- getDB.getDatabaseName(databaseNameOp, sparkSession),
- tableName,
- relation.asInstanceOf[CarbonRelation].tableMeta.storePath,
- carbonTable,
- forceTableClean)
- }
- Seq.empty
- }
-}
-
-case class ShowLoads(
- databaseNameOp: Option[String],
- tableName: String,
- limit: Option[String],
- override val output: Seq[Attribute]) extends RunnableCommand with DataProcessCommand {
-
- def run(sparkSession: SparkSession): Seq[Row] = {
- processData(sparkSession)
- }
-
- override def processData(sparkSession: SparkSession): Seq[Row] = {
- Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
- val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
- lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
- tableMeta.carbonTable
- CarbonStore.showSegments(
- getDB.getDatabaseName(databaseNameOp, sparkSession),
- tableName,
- limit,
- carbonTable.getMetaDataFilepath
- )
- }
-}
-
-case class CarbonDropTableCommand(ifExistsSet: Boolean,
- databaseNameOp: Option[String],
- tableName: String)
- extends RunnableCommand with SchemaProcessCommand with DataProcessCommand {
-
- def run(sparkSession: SparkSession): Seq[Row] = {
- processSchema(sparkSession)
- processData(sparkSession)
- }
-
- override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession)
- val identifier = TableIdentifier(tableName, Option(dbName))
- val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
- val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
- val carbonEnv = CarbonEnv.getInstance(sparkSession)
- val catalog = carbonEnv.carbonMetastore
- val storePath = carbonEnv.storePath
- val tableIdentifier =
- AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath,
- dbName.toLowerCase, tableName.toLowerCase)
- catalog.checkSchemasModifiedTimeAndReloadTables(tableIdentifier.getStorePath)
- val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
- try {
- locksToBeAcquired foreach {
- lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock)
- }
- LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
-
- CarbonEnv.getInstance(sparkSession).carbonMetastore
- .dropTable(tableIdentifier.getTablePath, identifier)(sparkSession)
- LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
- } catch {
- case ex: Exception =>
- LOGGER.error(ex, s"Dropping table $dbName.$tableName failed")
- sys.error(s"Dropping table $dbName.$tableName failed: ${ex.getMessage}")
- } finally {
- if (carbonLocks.nonEmpty) {
- val unlocked = carbonLocks.forall(_.unlock())
- if (unlocked) {
- logInfo("Table MetaData Unlocked Successfully")
- }
- }
- }
- Seq.empty
- }
-
- override def processData(sparkSession: SparkSession): Seq[Row] = {
- // delete the table folder
- val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession)
- val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val tableIdentifier =
- AbsoluteTableIdentifier.from(CarbonEnv.getInstance(sparkSession).storePath, dbName, tableName)
- val metadataFilePath =
- CarbonStorePath.getCarbonTablePath(tableIdentifier).getMetadataDirectoryPath
- val fileType = FileFactory.getFileType(metadataFilePath)
- if (FileFactory.isFileExist(metadataFilePath, fileType)) {
- val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
- CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
- }
- Seq.empty
- }
-}
-
-private[sql] case class DescribeCommandFormatted(
- child: SparkPlan,
- override val output: Seq[Attribute],
- tblIdentifier: TableIdentifier)
- extends RunnableCommand with SchemaProcessCommand {
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- processSchema(sparkSession)
- }
-
- private def getColumnGroups(dimensions: List[CarbonDimension]): Seq[(String, String, String)] = {
- var results: Seq[(String, String, String)] =
- Seq(("", "", ""), ("##Column Group Information", "", ""))
- val groupedDimensions = dimensions.groupBy(x => x.columnGroupId()).filter {
- case (groupId, _) => groupId != -1
- }.toSeq.sortBy(_._1)
- val groups = groupedDimensions.map(colGroups => {
- colGroups._2.map(dim => dim.getColName).mkString(", ")
- })
- var index = 1
- groups.foreach { x =>
- results = results :+ (s"Column Group $index", x, "")
- index = index + 1
- }
- results
- }
-
- override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(tblIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
- val mapper = new ObjectMapper()
- val colProps = StringBuilder.newBuilder
- val dims = relation.metaData.dims.map(x => x.toLowerCase)
- var results: Seq[(String, String, String)] = child.schema.fields.map { field =>
- val fieldName = field.name.toLowerCase
- val comment = if (dims.contains(fieldName)) {
- val dimension = relation.metaData.carbonTable.getDimensionByName(
- relation.tableMeta.carbonTableIdentifier.getTableName, fieldName)
- if (null != dimension.getColumnProperties && !dimension.getColumnProperties.isEmpty) {
- colProps.append(fieldName).append(".")
- .append(mapper.writeValueAsString(dimension.getColumnProperties))
- .append(",")
- }
- if (dimension.hasEncoding(Encoding.DICTIONARY) &&
- !dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
- "DICTIONARY, KEY COLUMN" + (dimension.hasEncoding(Encoding.INVERTED_INDEX) match {
- case false => ",NOINVERTEDINDEX"
- case _ => ""
- })
- } else {
- "KEY COLUMN" + (dimension.hasEncoding(Encoding.INVERTED_INDEX) match {
- case false => ",NOINVERTEDINDEX"
- case _ => ""
- })
- }
- } else {
- "MEASURE"
- }
- (field.name, field.dataType.simpleString, comment)
- }
- val colPropStr = if (colProps.toString().trim().length() > 0) {
- // drops additional comma at end
- colProps.toString().dropRight(1)
- } else {
- colProps.toString()
- }
- results ++= Seq(("", "", ""), ("##Detailed Table Information", "", ""))
- results ++= Seq(("Database Name: ", relation.tableMeta.carbonTableIdentifier
- .getDatabaseName, "")
- )
- results ++= Seq(("Table Name: ", relation.tableMeta.carbonTableIdentifier.getTableName, ""))
- results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, ""))
- val carbonTable = relation.tableMeta.carbonTable
- results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", ""))
- results ++= Seq(("SORT_SCOPE", carbonTable.getTableInfo.getFactTable
- .getTableProperties.getOrDefault("sort_scope", CarbonCommonConstants
- .LOAD_SORT_SCOPE_DEFAULT), CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
- results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
- if (colPropStr.length() > 0) {
- results ++= Seq((colPropStr, "", ""))
- } else {
- results ++= Seq(("ADAPTIVE", "", ""))
- }
- results ++= Seq(("SORT_COLUMNS", relation.metaData.carbonTable.getSortColumns(
- relation.tableMeta.carbonTableIdentifier.getTableName).asScala
- .map(column => column).mkString(","), ""))
- val dimension = carbonTable
- .getDimensionByTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
- results ++= getColumnGroups(dimension.asScala.toList)
- if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
- results ++=
- Seq(("Partition Columns: ", carbonTable.getPartitionInfo(carbonTable.getFactTableName)
- .getColumnSchemaList.asScala.map(_.getColumnName).mkString(","), ""))
- }
- results.map { case (name, dataType, comment) =>
- Row(f"$name%-36s", f"$dataType%-80s", f"$comment%-72s")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
new file mode 100644
index 0000000..28c53a1
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{AlterTableModel, DataProcessCommand, RunnableCommand}
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+
+/**
+ * Command that handles compaction triggered through ALTER TABLE ... COMPACT
+ */
+case class AlterTableCompactionCommand(
+ alterTableModel: AlterTableModel)
+ extends RunnableCommand with DataProcessCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ processData(sparkSession)
+ }
+
+ override def processData(sparkSession: SparkSession): Seq[Row] = {
+ val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ val tableName = alterTableModel.tableName.toLowerCase
+ val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
+ val relation =
+ CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .lookupRelation(Option(databaseName), tableName)(sparkSession)
+ .asInstanceOf[CarbonRelation]
+ if (relation == null) {
+ sys.error(s"Table $databaseName.$tableName does not exist")
+ }
+ if (null == relation.tableMeta.carbonTable) {
+ LOGGER.error(s"alter table failed. table not found: $databaseName.$tableName")
+ sys.error(s"alter table failed. table not found: $databaseName.$tableName")
+ }
+
+ val carbonLoadModel = new CarbonLoadModel()
+
+ val table = relation.tableMeta.carbonTable
+ carbonLoadModel.setTableName(table.getFactTableName)
+ // the data load schema carries the dimension relations needed by the load model
+ val dataLoadSchema = new CarbonDataLoadSchema(table)
+ carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+ carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
+ // the store path is resolved from the carbon table itself
+ carbonLoadModel.setStorePath(table.getStorePath)
+
+ var storeLocation = CarbonProperties.getInstance
+ .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
+ System.getProperty("java.io.tmpdir")
+ )
+ storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
+ try {
+ CarbonDataRDDFactory
+ .alterTableForCompaction(sparkSession.sqlContext,
+ alterTableModel,
+ carbonLoadModel,
+ storeLocation
+ )
+ } catch {
+ case e: Exception =>
+ if (null != e.getMessage) {
+ sys.error(s"Compaction failed. Please check logs for more info. ${ e.getMessage }")
+ } else {
+ sys.error("Exception in compaction. Please check logs for more info.")
+ }
+ }
+ Seq.empty
+ }
+}
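
A minimal usage sketch for the new command, reusing the hypothetical `carbon` session and `default.sales` table from the DESCRIBE sketch earlier: the SQL parser turns the COMPACT statement into an AlterTableModel, and this command drives the merge through CarbonDataRDDFactory.

    // 'MINOR' merges recent small segments; 'MAJOR' compacts up to the configured size threshold
    carbon.sql("ALTER TABLE default.sales COMPACT 'MINOR'")
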
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
new file mode 100644
index 0000000..2003bb1
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, RunnableCommand}
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.api.CarbonStore
+
+case class CarbonShowLoadsCommand(
+ databaseNameOp: Option[String],
+ tableName: String,
+ limit: Option[String],
+ override val output: Seq[Attribute])
+ extends RunnableCommand with DataProcessCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ processData(sparkSession)
+ }
+
+ override def processData(sparkSession: SparkSession): Seq[Row] = {
+ Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+ val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ tableMeta.carbonTable
+ CarbonStore.showSegments(
+ GetDB.getDatabaseName(databaseNameOp, sparkSession),
+ tableName,
+ limit,
+ carbonTable.getMetaDataFilepath
+ )
+ }
+}
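
SHOW SEGMENTS maps onto this command: the optional LIMIT clause arrives as the `limit` string, and CarbonStore.showSegments reads the per-load status rows from the table's metadata path. A sketch against the same hypothetical session and table:

    carbon.sql("SHOW SEGMENTS FOR TABLE default.sales LIMIT 10").show(truncate = false)
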
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
new file mode 100644
index 0000000..9406335
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, RunnableCommand}
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.api.CarbonStore
+
+case class CleanFilesCommand(
+ databaseNameOp: Option[String],
+ tableName: String, forceTableClean: Boolean = false)
+ extends RunnableCommand with DataProcessCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ processData(sparkSession)
+ }
+
+ override def processData(sparkSession: SparkSession): Seq[Row] = {
+ Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+ if (forceTableClean) {
+ CarbonStore.cleanFiles(
+ GetDB.getDatabaseName(databaseNameOp, sparkSession),
+ tableName,
+ CarbonEnv.getInstance(sparkSession).storePath,
+ null,
+ forceTableClean)
+ } else {
+ val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val relation = catalog
+ .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]
+ val carbonTable = relation.tableMeta.carbonTable
+ CarbonStore.cleanFiles(
+ GetDB.getDatabaseName(databaseNameOp, sparkSession),
+ tableName,
+ relation.tableMeta.storePath,
+ carbonTable,
+ forceTableClean)
+ }
+ Seq.empty
+ }
+}
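
CLEAN FILES physically removes segments that were earlier marked for delete or merged away by compaction; with `forceTableClean` set, the table path is cleaned without consulting the table metadata. A sketch (same hypothetical session and table):

    carbon.sql("CLEAN FILES FOR TABLE default.sales")
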
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
new file mode 100644
index 0000000..1ea4508
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, RunnableCommand}
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.api.CarbonStore
+
+case class DeleteLoadByIdCommand(
+ loadIds: Seq[String],
+ databaseNameOp: Option[String],
+ tableName: String) extends RunnableCommand with DataProcessCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ processData(sparkSession)
+ }
+
+ override def processData(sparkSession: SparkSession): Seq[Row] = {
+ Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+ val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ tableMeta.carbonTable
+ CarbonStore.deleteLoadById(
+ loadIds,
+ GetDB.getDatabaseName(databaseNameOp, sparkSession),
+ tableName,
+ carbonTable
+ )
+ Seq.empty
+ }
+}
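
Segment ids reported by SHOW SEGMENTS can be passed back to drop specific loads; the parsed id list reaches this command as `loadIds`. A sketch with hypothetical segment ids, against the same hypothetical session:

    carbon.sql("DELETE FROM TABLE default.sales WHERE SEGMENT.ID IN (0, 1)")
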
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
new file mode 100644
index 0000000..3d06b18
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, RunnableCommand}
+import org.apache.spark.sql.hive.CarbonRelation
+
+import org.apache.carbondata.api.CarbonStore
+
+case class DeleteLoadByLoadDateCommand(
+ databaseNameOp: Option[String],
+ tableName: String,
+ dateField: String,
+ loadDate: String)
+ extends RunnableCommand with DataProcessCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ processData(sparkSession)
+ }
+
+ override def processData(sparkSession: SparkSession): Seq[Row] = {
+ Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+ val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+ lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
+ tableMeta.carbonTable
+ CarbonStore.deleteLoadByDate(
+ loadDate,
+ GetDB.getDatabaseName(databaseNameOp, sparkSession),
+ tableName,
+ carbonTable
+ )
+ Seq.empty
+ }
+}
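
Loads whose start time predates a given timestamp can be dropped in one statement; the literal after BEFORE is handed through as the `loadDate` string. A sketch with a hypothetical cutoff, same hypothetical session:

    carbon.sql("DELETE FROM TABLE default.sales WHERE SEGMENT.STARTTIME BEFORE '2017-06-01 12:05:06'")
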
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6627cac0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
new file mode 100644
index 0000000..3f0e093
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableByInsertCommand.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.{DataProcessCommand, RunnableCommand}
+
+import org.apache.carbondata.spark.util.CarbonSparkUtil
+
+case class LoadTableByInsertCommand(
+ relation: CarbonDatasourceHadoopRelation,
+ child: LogicalPlan,
+ overwrite: Boolean)
+ extends RunnableCommand with DataProcessCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ processData(sparkSession)
+ }
+
+ override def processData(sparkSession: SparkSession): Seq[Row] = {
+ val df = Dataset.ofRows(sparkSession, child)
+ val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
+ val load = LoadTableCommand(
+ Some(relation.carbonRelation.databaseName),
+ relation.carbonRelation.tableName,
+ null,
+ Seq(),
+ scala.collection.immutable.Map("fileheader" -> header),
+ overwrite,
+ null,
+ Some(df)).run(sparkSession)
+ // update the relation metadata, needed when high cardinality columns are auto-detected during load
+ relation.carbonRelation.metaData =
+ CarbonSparkUtil.createSparkMeta(relation.carbonRelation.tableMeta.carbonTable)
+ load
+ }
+}
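
INSERT INTO a carbon relation is rewritten into this command, which materializes the child plan as a DataFrame and delegates to LoadTableCommand with the relation's own column list as the file header; an INSERT OVERWRITE arrives here with overwrite set to true. A sketch, assuming a hypothetical staging table with a matching schema and the same hypothetical session:

    carbon.sql("INSERT INTO TABLE default.sales SELECT * FROM default.sales_staging")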