You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/03/16 09:28:45 UTC
[02/14] incubator-carbondata git commit: code review fixes
code review fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/c230b8cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/c230b8cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/c230b8cc
Branch: refs/heads/master
Commit: c230b8ccab74a01a0112728cd31e1adb8e6a750a
Parents: 6b3b16c
Author: nareshpr <pr...@gmail.com>
Authored: Wed Mar 15 18:37:03 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Thu Mar 16 14:50:43 2017 +0530
----------------------------------------------------------------------
.../carbondata/spark/util/CarbonScalaUtil.scala | 44 --
.../carbondata/spark/util/CommonUtil.scala | 7 -
.../spark/util/DataTypeConverterUtil.scala | 36 +-
.../spark/util/GlobalDictionaryUtil.scala | 10 +
.../readsupport/SparkRowReadSupportImpl.java | 3 +-
.../spark/sql/CarbonDictionaryDecoder.scala | 42 +-
.../execution/command/AlterTableCommands.scala | 383 +++++++++++++++++
.../execution/command/carbonTableSchema.scala | 411 +------------------
.../apache/spark/sql/hive/CarbonMetastore.scala | 2 +-
.../sql/parser/CarbonSpark2SqlParser.scala | 5 +-
.../org/apache/spark/util/AlterTableUtil.scala | 99 +++++
11 files changed, 537 insertions(+), 505 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c230b8cc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 39b27ca..3a0e395 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -20,11 +20,8 @@ package org.apache.carbondata.spark.util
import java.io.File
import java.text.SimpleDateFormat
-import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.execution.command.DataTypeInfo
-import org.apache.spark.sql.hive.HiveExternalCatalog._
-import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.types._
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -270,45 +267,4 @@ object CarbonScalaUtil {
} cannot be modified. Only Int and Decimal data types are allowed for modification")
}
}
-
- /**
- * This method will create a copy of the same object
- *
- * @param thriftColumnSchema object to be cloned
- * @return
- */
- def createColumnSchemaCopyObject(thriftColumnSchema: org.apache.carbondata.format.ColumnSchema)
- : org.apache.carbondata.format.ColumnSchema = {
- val columnSchema = new org.apache.carbondata.format.ColumnSchema
- columnSchema.column_group_id = thriftColumnSchema.column_group_id
- columnSchema.column_name = thriftColumnSchema.column_name
- columnSchema.columnProperties = thriftColumnSchema.columnProperties
- columnSchema.columnReferenceId = thriftColumnSchema.columnReferenceId
- columnSchema.column_id = thriftColumnSchema.column_id
- columnSchema.data_type = thriftColumnSchema.data_type
- columnSchema.default_value = thriftColumnSchema.default_value
- columnSchema.encoders = thriftColumnSchema.encoders
- columnSchema.invisible = thriftColumnSchema.invisible
- columnSchema.columnar = thriftColumnSchema.columnar
- columnSchema.dimension = thriftColumnSchema.dimension
- columnSchema.num_child = thriftColumnSchema.num_child
- columnSchema.precision = thriftColumnSchema.precision
- columnSchema.scale = thriftColumnSchema.scale
- columnSchema.schemaOrdinal = thriftColumnSchema.schemaOrdinal
- columnSchema
- }
-
- def prepareSchemaJsonForAlterTable(sparkConf: SparkConf, schemaJsonString: String): String = {
- val threshold = sparkConf
- .getInt(CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD,
- CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD_DEFAULT)
- // Split the JSON string.
- val parts = schemaJsonString.grouped(threshold).toSeq
- var schemaParts: Seq[String] = Seq.empty
- schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_NUMPARTS'='${ parts.size }'"
- parts.zipWithIndex.foreach { case (part, index) =>
- schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_PART_PREFIX$index'='$part'"
- }
- schemaParts.mkString(",")
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c230b8cc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index cf88b8c..7592e4e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -345,11 +345,4 @@ object CommonUtil {
csvColumns
}
-
- def validateColumnNames(columnName: String, columnNameCopy: String): Unit = {
- if (!columnName.equalsIgnoreCase(columnNameCopy)) {
- throw new MalformedCarbonCommandException(
- "Column names provided are different. Both the column names should be same")
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c230b8cc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
index 475650f..6a43440 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
@@ -18,6 +18,7 @@
package org.apache.carbondata.spark.util
import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.format.{DataType => ThriftDataType}
object DataTypeConverterUtil {
def convertToCarbonType(dataType: String): DataType = {
@@ -89,33 +90,22 @@ object DataTypeConverterUtil {
* @param dataType
* @return
*/
- def convertToThriftDataType(dataType: String): org.apache.carbondata.format.DataType = {
+ def convertToThriftDataType(dataType: String): ThriftDataType = {
if (null == dataType) {
return null
}
dataType match {
- case "string" =>
- org.apache.carbondata.format.DataType.STRING
- case "int" =>
- org.apache.carbondata.format.DataType.INT
- case "short" =>
- org.apache.carbondata.format.DataType.SHORT
- case "long" | "bigint" =>
- org.apache.carbondata.format.DataType.LONG
- case "double" =>
- org.apache.carbondata.format.DataType.DOUBLE
- case "decimal" =>
- org.apache.carbondata.format.DataType.DECIMAL
- case "date" =>
- org.apache.carbondata.format.DataType.DATE
- case "timestamp" =>
- org.apache.carbondata.format.DataType.TIMESTAMP
- case "array" =>
- org.apache.carbondata.format.DataType.ARRAY
- case "struct" =>
- org.apache.carbondata.format.DataType.STRUCT
- case _ =>
- org.apache.carbondata.format.DataType.STRING
+ case "string" => ThriftDataType.STRING
+ case "int" => ThriftDataType.INT
+ case "short" => ThriftDataType.SHORT
+ case "long" | "bigint" => ThriftDataType.LONG
+ case "double" => ThriftDataType.DOUBLE
+ case "decimal" => ThriftDataType.DECIMAL
+ case "date" => ThriftDataType.DATE
+ case "timestamp" => ThriftDataType.TIMESTAMP
+ case "array" => ThriftDataType.ARRAY
+ case "struct" => ThriftDataType.STRUCT
+ case _ => ThriftDataType.STRING
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c230b8cc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index dc131b1..13c90fb 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -789,6 +789,16 @@ object GlobalDictionaryUtil {
}
}
+ /**
+ * This method will write dictionary file, sortindex file and dictionary meta for new dictionary
+ * column with default value
+ *
+ * @param carbonTablePath
+ * @param columnSchema
+ * @param tableIdentifier
+ * @param storePath
+ * @param defaultValue
+ */
def loadDefaultDictionaryValueForNewColumn(carbonTablePath: CarbonTablePath,
columnSchema: ColumnSchema,
tableIdentifier: CarbonTableIdentifier,
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c230b8cc/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index d7bab75..62767fd 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
@@ -47,7 +48,7 @@ public class SparkRowReadSupportImpl extends DictionaryDecodeReadSupport<Row> {
if (dictionaries[i] != null) {
data[i] = DataTypeUtil
.getDataBasedOnDataType(dictionaries[i].getDictionaryValueForKeyInBytes((int) data[i]),
- dataTypes[i]);
+ (CarbonDimension) carbonColumns[i]);
if (data[i] == null) {
continue;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c230b8cc/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 126f0c5..621a960 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -123,27 +123,28 @@ case class CarbonDictionaryDecoder(
val getDictionaryColumnIds = {
val attributes = child.output
- val dictIds: Array[(String, ColumnIdentifier, DataType)] = attributes.map { a =>
- val attr = aliasMap.getOrElse(a, a)
- val relation = relations.find(p => p.contains(attr))
- if (relation.isDefined && canBeDecoded(attr)) {
- val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
- val carbonDimension =
- carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
- if (carbonDimension != null &&
- carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
- !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
- !carbonDimension.isComplex()) {
- (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
- carbonDimension.getDataType)
+ val dictIds: Array[(String, ColumnIdentifier, DataType, CarbonDimension)] =
+ attributes.map { a =>
+ val attr = aliasMap.getOrElse(a, a)
+ val relation = relations.find(p => p.contains(attr))
+ if (relation.isDefined && canBeDecoded(attr)) {
+ val carbonTable = relation.get.carbonRelation.carbonRelation.metaData.carbonTable
+ val carbonDimension =
+ carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
+ if (carbonDimension != null &&
+ carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+ !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+ !carbonDimension.isComplex()) {
+ (carbonTable.getFactTableName, carbonDimension.getColumnIdentifier,
+ carbonDimension.getDataType, carbonDimension)
+ } else {
+ (null, null, null, null)
+ }
} else {
- (null, null, null)
+ (null, null, null, null)
}
- } else {
- (null, null, null)
- }
- }.toArray
+ }.toArray
dictIds
}
@@ -184,6 +185,7 @@ case class CarbonDictionaryDecoder(
)
new Iterator[InternalRow] {
val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
+
override final def hasNext: Boolean = {
iter.hasNext
}
@@ -195,7 +197,7 @@ case class CarbonDictionaryDecoder(
if (data(index) != null) {
data(index) = DataTypeUtil.getDataBasedOnDataType(dicts(index)
.getDictionaryValueForKeyInBytes(data(index).asInstanceOf[Int]),
- getDictionaryColumnIds(index)._3)
+ getDictionaryColumnIds(index)._4)
}
}
val result = unsafeProjection(new GenericMutableRow(data))
@@ -217,7 +219,7 @@ case class CarbonDictionaryDecoder(
}
private def getDictionary(atiMap: Map[String, AbsoluteTableIdentifier],
- cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
+ cache: Cache[DictionaryColumnUniqueIdentifier, Dictionary]) = {
val dictionaryColumnIds = getDictionaryColumnIds.map { dictionaryId =>
if (dictionaryId._2 != null) {
new DictionaryColumnUniqueIdentifier(
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c230b8cc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
new file mode 100644
index 0000000..003899f
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.language.implicitConversions
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog}
+import org.apache.spark.util.AlterTableUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.dictionary.ManageDictionary
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
+
+private[sql] case class AlterTableAddColumns(
+ alterTableAddColumnsModel: AlterTableAddColumnsModel) extends RunnableCommand {
+
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ def run(sparkSession: SparkSession): Seq[Row] = {
+ val tableName = alterTableAddColumnsModel.tableName
+ val dbName = alterTableAddColumnsModel.databaseName
+ .getOrElse(sparkSession.catalog.currentDatabase)
+ LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName")
+ val carbonLock = AlterTableUtil
+ .validateTableAndAcquireLock(dbName, tableName, LOGGER)(sparkSession)
+ try {
+ // get the latest carbon table and check for column existence
+ val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+ // read the latest schema file
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ carbonTable.getCarbonTableIdentifier)
+ val tableMetadataFile = carbonTablePath.getSchemaFilePath
+ val thriftTableInfo: TableInfo = CarbonEnv.get.carbonMetastore
+ .readSchemaFile(tableMetadataFile)
+ val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+ val wrapperTableInfo = schemaConverter
+ .fromExternalToWrapperTableInfo(thriftTableInfo,
+ dbName,
+ tableName,
+ carbonTable.getStorePath)
+ val newCols = new AlterTableProcessor(alterTableAddColumnsModel,
+ dbName,
+ wrapperTableInfo,
+ carbonTablePath,
+ carbonTable.getCarbonTableIdentifier,
+ carbonTable.getStorePath).process
+ val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
+ schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis)
+ schemaEvolutionEntry.setAdded(newCols.toList.asJava)
+ val thriftTable = schemaConverter
+ .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+ thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+ .setTime_stamp(System.currentTimeMillis)
+ AlterTableUtil
+ .updateSchemaInfo(carbonTable,
+ schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
+ thriftTable)(sparkSession,
+ sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog])
+ LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
+ LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName")
+ } catch {
+ case e: Exception =>
+ LOGGER.error("Alter table add columns failed : " + e.getMessage)
+ throw e
+ } finally {
+ // release lock after command execution completion
+ if (carbonLock != null) {
+ if (carbonLock.unlock()) {
+ LOGGER.info("Alter table add columns lock released successfully")
+ } else {
+ LOGGER.error("Unable to release lock during alter table add columns operation")
+ }
+ }
+ }
+ Seq.empty
+ }
+}
+
+private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableRenameModel)
+ extends RunnableCommand {
+
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ def run(sparkSession: SparkSession): Seq[Row] = {
+ val oldTableIdentifier = alterTableRenameModel.oldTableIdentifier
+ val newTableIdentifier = alterTableRenameModel.newTableIdentifier
+ val oldDatabaseName = oldTableIdentifier.database
+ .getOrElse(sparkSession.catalog.currentDatabase)
+ val newDatabaseName = newTableIdentifier.database
+ .getOrElse(sparkSession.catalog.currentDatabase)
+ if (!oldDatabaseName.equalsIgnoreCase(newDatabaseName)) {
+ throw new MalformedCarbonCommandException("Database name should be same for both tables")
+ }
+ val tableExists = sparkSession.catalog.tableExists(oldDatabaseName, newTableIdentifier.table)
+ if (tableExists) {
+ throw new MalformedCarbonCommandException(s"Table with name $newTableIdentifier " +
+ s"already exists")
+ }
+ val oldTableName = oldTableIdentifier.table.toLowerCase
+ val newTableName = newTableIdentifier.table.toLowerCase
+ LOGGER.audit(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
+ LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
+ val relation: CarbonRelation =
+ CarbonEnv.get.carbonMetastore
+ .lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession)
+ .asInstanceOf[CarbonRelation]
+ if (relation == null) {
+ LOGGER.audit(s"Rename table request has failed. " +
+ s"Table $oldDatabaseName.$oldTableName does not exist")
+ }
+ val carbonTable = relation.tableMeta.carbonTable
+ val carbonLock = CarbonLockFactory
+ .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+ LockUsage.METADATA_LOCK)
+ if (carbonLock.lockWithRetries()) {
+ LOGGER.info("Successfully able to get the table metadata file lock")
+ } else {
+ sys.error("Table is locked for updation. Please try after some time")
+ }
+ try {
+      // read the latest schema file of the table being renamed
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ carbonTable.getCarbonTableIdentifier)
+ val tableMetadataFile = carbonTablePath.getSchemaFilePath
+ val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
+ .readSchemaFile(tableMetadataFile)
+ val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
+ schemaEvolutionEntry.setTableName(newTableName)
+ val fileType = FileFactory.getFileType(tableMetadataFile)
+ if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+ val rename = FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
+ .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
+ newTableName)
+ if (!rename) {
+ sys.error(s"Rename failed for table $oldTableName")
+ }
+ }
+ val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
+ newTableName,
+ carbonTable.getCarbonTableIdentifier.getTableId)
+ val newTablePath = CarbonEnv.get.carbonMetastore.updateTableSchema(newTableIdentifier,
+ tableInfo,
+ schemaEvolutionEntry,
+ carbonTable.getStorePath)(sparkSession)
+ CarbonEnv.get.carbonMetastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
+ sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+ .runSqlHive(
+ s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $newTableName")
+ sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+ .runSqlHive(
+ s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
+ s"('tableName'='$newTableName', " +
+ s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
+ LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
+ LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
+ } catch {
+ case e: Exception => LOGGER.error("Rename table failed: " + e.getMessage)
+ throw e
+ } finally {
+ // release lock after command execution completion
+ if (carbonLock != null) {
+ if (carbonLock.unlock()) {
+ LOGGER.info("Lock released successfully")
+ } else {
+ LOGGER.error("Unable to release lock during rename table")
+ }
+ }
+ }
+ Seq.empty
+ }
+}
+
+private[sql] case class AlterTableDropColumns(
+ alterTableDropColumnModel: AlterTableDropColumnModel) extends RunnableCommand {
+
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ def run(sparkSession: SparkSession): Seq[Row] = {
+ val tableName = alterTableDropColumnModel.tableName
+ val dbName = alterTableDropColumnModel.databaseName
+ .getOrElse(sparkSession.catalog.currentDatabase)
+ LOGGER.audit(s"Alter table drop columns request has been received for $dbName.$tableName")
+ val carbonLock = AlterTableUtil
+ .validateTableAndAcquireLock(dbName, tableName, LOGGER)(sparkSession)
+ try {
+ // get the latest carbon table and check for column existence
+ val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+ // check each column existence in the table
+ val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
+ var dictionaryColumns = ListBuffer[CarbonColumn]()
+ var keyColumnCountToBeDeleted = 0
+      // TODO: if deleted column list includes shared dictionary/bucketed column throw an error
+ alterTableDropColumnModel.columns.foreach { column =>
+ var columnExist = false
+ tableColumns.foreach { tableColumn =>
+ // column should not be already deleted and should exist in the table
+ if (!tableColumn.isInvisible && column.equalsIgnoreCase(tableColumn.getColName)) {
+ if (tableColumn.isDimesion) {
+ keyColumnCountToBeDeleted += 1
+ if (tableColumn.hasEncoding(Encoding.DICTIONARY)) {
+ dictionaryColumns += tableColumn
+ }
+ }
+ columnExist = true
+ }
+ }
+ if (!columnExist) {
+ sys.error(s"Column $column does not exists in the table $dbName.$tableName")
+ }
+ }
+ // take the total key column count. key column to be deleted should not
+ // be >= key columns in schema
+ var totalKeyColumnInSchema = 0
+ tableColumns.foreach { tableColumn =>
+        // count only dimension columns that are not already deleted (invisible)
+ if (!tableColumn.isInvisible && tableColumn.isDimesion) {
+ totalKeyColumnInSchema += 1
+ }
+ }
+ if (keyColumnCountToBeDeleted >= totalKeyColumnInSchema) {
+ sys.error(s"Alter drop operation failed. AtLeast one key column should exist after drop.")
+ }
+ // read the latest schema file
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ carbonTable.getCarbonTableIdentifier)
+ val tableMetadataFile = carbonTablePath.getSchemaFilePath
+ val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
+ .readSchemaFile(tableMetadataFile)
+ // maintain the deleted columns for schema evolution history
+ var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
+ val columnSchemaList = tableInfo.fact_table.table_columns.asScala
+ alterTableDropColumnModel.columns.foreach { column =>
+ columnSchemaList.foreach { columnSchema =>
+ if (!columnSchema.invisible && column.equalsIgnoreCase(columnSchema.column_name)) {
+ deletedColumnSchema += columnSchema.deepCopy
+ columnSchema.invisible = true
+ }
+ }
+ }
+ // add deleted columns to schema evolution history and update the schema
+ tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+ .setTime_stamp(System.currentTimeMillis)
+ val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
+ schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava)
+ AlterTableUtil
+ .updateSchemaInfo(carbonTable,
+ schemaEvolutionEntry,
+ tableInfo)(sparkSession,
+ sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog])
+ // TODO: 1. add check for deletion of index tables
+ // delete dictionary files for dictionary column and clear dictionary cache from memory
+ ManageDictionary.deleteDictionaryFileAndCache(dictionaryColumns.toList.asJava, carbonTable)
+ LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
+ LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName")
+ } catch {
+ case e: Exception =>
+ LOGGER.error("Alter table drop columns failed : " + e.getMessage)
+ throw e
+ } finally {
+ // release lock after command execution completion
+ if (carbonLock != null) {
+ if (carbonLock.unlock()) {
+ LOGGER.info("Alter table drop columns lock released successfully")
+ } else {
+ LOGGER.error("Unable to release lock during alter table drop columns operation")
+ }
+ }
+ }
+ Seq.empty
+ }
+}
+
+private[sql] case class AlterTableDataTypeChange(
+ alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel) extends RunnableCommand {
+
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ def run(sparkSession: SparkSession): Seq[Row] = {
+ val tableName = alterTableDataTypeChangeModel.tableName
+ val dbName = alterTableDataTypeChangeModel.databaseName
+ .getOrElse(sparkSession.catalog.currentDatabase)
+ LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName")
+ val carbonLock = AlterTableUtil
+ .validateTableAndAcquireLock(dbName, tableName, LOGGER)(sparkSession)
+ try {
+ // get the latest carbon table and check for column existence
+ val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+ val columnName = alterTableDataTypeChangeModel.columnName
+ var carbonColumnToBeModified: CarbonColumn = null
+ val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
+
+ if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
+ LOGGER.audit(s"Alter table change data type request has failed. " +
+ s"Column $columnName does not exist")
+ sys.error(s"Column does not exist: $columnName")
+ }
+ val carbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(columnName))
+ if (carbonColumn.size == 1) {
+ CarbonScalaUtil
+ .validateColumnDataType(alterTableDataTypeChangeModel.dataTypeInfo, carbonColumn(0))
+ } else {
+ LOGGER.audit(s"Alter table change data type request has failed. " +
+ s"Column $columnName is invalid")
+ sys.error(s"Invalid Column: $columnName")
+ }
+ // read the latest schema file
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
+ carbonTable.getCarbonTableIdentifier)
+ val tableMetadataFile = carbonTablePath.getSchemaFilePath
+ val tableInfo: TableInfo = CarbonEnv.get.carbonMetastore
+ .readSchemaFile(tableMetadataFile)
+      // keep the updated column and a copy of its old schema for schema evolution history
+ var addColumnSchema: ColumnSchema = null
+ var deletedColumnSchema: ColumnSchema = null
+ val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible)
+ columnSchemaList.foreach { columnSchema =>
+ if (columnSchema.column_name.equalsIgnoreCase(columnName)) {
+ deletedColumnSchema = columnSchema.deepCopy
+ columnSchema.setData_type(DataTypeConverterUtil
+ .convertToThriftDataType(alterTableDataTypeChangeModel.dataTypeInfo.dataType))
+ columnSchema.setPrecision(alterTableDataTypeChangeModel.dataTypeInfo.precision)
+ columnSchema.setScale(alterTableDataTypeChangeModel.dataTypeInfo.scale)
+ addColumnSchema = columnSchema
+ }
+ }
+ val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
+ schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava)
+ schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
+ tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+ .setTime_stamp(System.currentTimeMillis)
+ AlterTableUtil
+ .updateSchemaInfo(carbonTable,
+ schemaEvolutionEntry,
+ tableInfo)(sparkSession,
+ sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog])
+ LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
+ LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
+ } catch {
+ case e: Exception =>
+ LOGGER.error("Alter table change datatype failed : " + e.getMessage)
+ throw e
+ } finally {
+ // release lock after command execution completion
+ if (carbonLock != null) {
+ if (carbonLock.unlock()) {
+ LOGGER.info("Alter table change data type lock released successfully")
+ } else {
+ LOGGER.error("Unable to release lock during alter table change data type operation")
+ }
+ }
+ }
+ Seq.empty
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c230b8cc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index b9160b3..2b75c56 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -137,411 +137,6 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
}
}
-private[sql] case class AlterTableDataTypeChange(
- alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel) extends RunnableCommand {
-
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- def run(sparkSession: SparkSession): Seq[Row] = {
- val tableName = alterTableDataTypeChangeModel.tableName
- val dbName = alterTableDataTypeChangeModel.databaseName
- .getOrElse(sparkSession.catalog.currentDatabase)
- LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName")
- val relation =
- CarbonEnv.get.carbonMetastore
- .lookupRelation(Option(dbName), tableName)(sparkSession)
- .asInstanceOf[CarbonRelation]
- if (relation == null) {
- LOGGER.audit(s"Alter table change data type request has failed. " +
- s"Table $dbName.$tableName does not exist")
- sys.error(s"Table $dbName.$tableName does not exist")
- }
- // acquire the lock first
- val table = relation.tableMeta.carbonTable
- val carbonLock = CarbonLockFactory
- .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.METADATA_LOCK)
- try {
- // get the latest carbon table and check for column existence
- val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
- val columnName = alterTableDataTypeChangeModel.columnName
- var carbonColumnToBeModified: CarbonColumn = null
- val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
-
- if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
- LOGGER.audit(s"Alter table change data type request has failed. " +
- s"Column $columnName does not exist")
- sys.error(s"Column does not exist: $columnName")
- }
- val carbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(columnName))
- if (carbonColumn.size == 1) {
- CarbonScalaUtil
- .validateColumnDataType(alterTableDataTypeChangeModel.dataTypeInfo, carbonColumn(0))
- } else {
- LOGGER.audit(s"Alter table change data type request has failed. " +
- s"Column $columnName is invalid")
- sys.error(s"Invalid Column: $columnName")
- }
- // read the latest schema file
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
- carbonTable.getCarbonTableIdentifier)
- val tableMetadataFile = carbonTablePath.getSchemaFilePath
- val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
- .readSchemaFile(tableMetadataFile)
- // maintain the added column for schema evolution history
- var addColumnSchema: org.apache.carbondata.format.ColumnSchema = null
- var deletedColumnSchema: org.apache.carbondata.format.ColumnSchema = null
- val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible)
- columnSchemaList.foreach { columnSchema =>
- if (columnSchema.column_name.equalsIgnoreCase(columnName)) {
- deletedColumnSchema = CarbonScalaUtil.createColumnSchemaCopyObject(columnSchema)
- columnSchema.setData_type(DataTypeConverterUtil
- .convertToThriftDataType(alterTableDataTypeChangeModel.dataTypeInfo.dataType))
- columnSchema.setPrecision(alterTableDataTypeChangeModel.dataTypeInfo.precision)
- columnSchema.setScale(alterTableDataTypeChangeModel.dataTypeInfo.scale)
- addColumnSchema = columnSchema
- }
- }
- val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
- schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava)
- schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
- tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
- .setTime_stamp(System.currentTimeMillis)
- CarbonEnv.get.carbonMetastore
- .updateTableSchema(carbonTable.getCarbonTableIdentifier,
- tableInfo,
- schemaEvolutionEntry,
- carbonTable.getStorePath)(sparkSession)
-
- val tableIdentifier = TableIdentifier(tableName, Some(dbName))
- val schema = CarbonEnv.get.carbonMetastore
- .lookupRelation(tableIdentifier)(sparkSession).schema.json
- val schemaParts = CarbonScalaUtil
- .prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema)
- sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(
- s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES($schemaParts)")
- sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
- LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
- LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
- } catch {
- case e: Exception =>
- LOGGER.error("Alter table change datatype failed : " + e.getMessage)
- throw e
- } finally {
- // release lock after command execution completion
- if (carbonLock != null) {
- if (carbonLock.unlock()) {
- LOGGER.info("Alter table change data type lock released successfully")
- } else {
- LOGGER.error("Unable to release lock during alter table change data type operation")
- }
- }
- }
- Seq.empty
- }
-}
-
-private[sql] case class AlterTableAddColumns(
- alterTableAddColumnsModel: AlterTableAddColumnsModel) extends RunnableCommand {
-
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- def run(sparkSession: SparkSession): Seq[Row] = {
- val tableName = alterTableAddColumnsModel.tableName
- val dbName = alterTableAddColumnsModel.databaseName
- .getOrElse(sparkSession.catalog.currentDatabase)
- LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName")
- val relation =
- CarbonEnv.get.carbonMetastore
- .lookupRelation(Option(dbName), tableName)(sparkSession)
- .asInstanceOf[CarbonRelation]
- if (relation == null) {
- LOGGER.audit(s"Alter table add columns request has failed. " +
- s"Table $dbName.$tableName does not exist")
- sys.error(s"Table $dbName.$tableName does not exist")
- }
- // acquire the lock first
- val table = relation.tableMeta.carbonTable
- val carbonLock = CarbonLockFactory
- .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.METADATA_LOCK)
- try {
- // get the latest carbon table and check for column existence
- val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
- // read the latest schema file
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
- carbonTable.getCarbonTableIdentifier)
- val tableMetadataFile = carbonTablePath.getSchemaFilePath
- val thriftTableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
- .readSchemaFile(tableMetadataFile)
- val schemaConverter = new ThriftWrapperSchemaConverterImpl()
- val wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(thriftTableInfo,
- dbName,
- tableName,
- carbonTable.getStorePath)
- val newCols = new AlterTableProcessor(alterTableAddColumnsModel,
- dbName,
- wrapperTableInfo,
- carbonTablePath,
- carbonTable.getCarbonTableIdentifier,
- carbonTable.getStorePath).process
- val schemaEvolutionEntry = new org.apache.carbondata.core.metadata
- .schema.SchemaEvolutionEntry()
- schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis)
- schemaEvolutionEntry.setAdded(newCols.toList.asJava)
- val thriftTable = schemaConverter
- .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
- thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
- .setTime_stamp(System.currentTimeMillis)
- CarbonEnv.get.carbonMetastore
- .updateTableSchema(carbonTable.getCarbonTableIdentifier,
- thriftTable,
- schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
- carbonTable.getStorePath)(sparkSession)
- val tableIdentifier = TableIdentifier(tableName, Some(dbName))
- val schema = CarbonEnv.get.carbonMetastore
- .lookupRelation(tableIdentifier)(sparkSession).schema.json
- val schemaParts = CarbonScalaUtil
- .prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema)
- sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(
- s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES($schemaParts)")
- sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
- LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
- LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName")
- } catch {
- case e: Exception =>
- LOGGER.error("Alter table add columns failed : " + e.getMessage)
- throw e
- } finally {
- // release lock after command execution completion
- if (carbonLock != null) {
- if (carbonLock.unlock()) {
- LOGGER.info("Alter table add columns lock released successfully")
- } else {
- LOGGER.error("Unable to release lock during alter table add columns operation")
- }
- }
- }
- Seq.empty
- }
-}
-
-private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableRenameModel)
- extends RunnableCommand {
-
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- def run(sparkSession: SparkSession): Seq[Row] = {
- val oldTableIdentifier = alterTableRenameModel.oldTableIdentifier
- val newTableIdentifier = alterTableRenameModel.newTableIdentifier
- val oldDatabaseName = oldTableIdentifier.database
- .getOrElse(sparkSession.catalog.currentDatabase)
- val newDatabaseName = newTableIdentifier.database
- .getOrElse(sparkSession.catalog.currentDatabase)
- if (!oldDatabaseName.equalsIgnoreCase(newDatabaseName)) {
- throw new MalformedCarbonCommandException("Database name should be same for both tables")
- }
- val tableExists = sparkSession.catalog.tableExists(oldDatabaseName, newTableIdentifier.table)
- if (tableExists) {
- throw new MalformedCarbonCommandException(s"Table with name $newTableIdentifier " +
- s"already exists")
- }
- val oldTableName = oldTableIdentifier.table.toLowerCase
- val newTableName = newTableIdentifier.table.toLowerCase
- LOGGER.audit(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
- LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
- val relation: CarbonRelation =
- CarbonEnv.get.carbonMetastore
- .lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession)
- .asInstanceOf[CarbonRelation]
- if (relation == null) {
- LOGGER.audit(s"Rename table request has failed. " +
- s"Table $oldDatabaseName.$oldTableName does not exist")
- }
- val carbonTable = relation.tableMeta.carbonTable
- val carbonLock = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.METADATA_LOCK)
- if (carbonLock.lockWithRetries()) {
- LOGGER.info("Successfully able to get the table metadata file lock")
- } else {
- sys.error("Table is locked for updation. Please try after some time")
- }
- try {
- // get the latest carbon table and check for column existence
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
- carbonTable.getCarbonTableIdentifier)
- val tableMetadataFile = carbonTablePath.getSchemaFilePath
- val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
- .readSchemaFile(tableMetadataFile)
- val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
- schemaEvolutionEntry.setTableName(newTableName)
- val fileType = FileFactory.getFileType(tableMetadataFile)
- if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
- val rename = FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
- .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
- newTableName)
- if (!rename) {
- sys.error(s"Rename failed for table $oldTableName")
- }
- }
- val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
- newTableName,
- carbonTable.getCarbonTableIdentifier.getTableId)
- val newTablePath = CarbonEnv.get.carbonMetastore.updateTableSchema(newTableIdentifier,
- tableInfo,
- schemaEvolutionEntry,
- carbonTable.getStorePath)(sparkSession)
- CarbonEnv.get.carbonMetastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
- sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
- .runSqlHive(
- s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $newTableName")
- sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
- .runSqlHive(
- s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
- s"('tableName'='$newTableName', " +
- s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
- LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
- LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
- } catch {
- case e: Exception => LOGGER.error("Rename table failed: " + e.getMessage)
- throw e
- } finally {
- // release lock after command execution completion
- if (carbonLock != null) {
- if (carbonLock.unlock()) {
- LOGGER.info("Lock released successfully")
- } else {
- LOGGER.error("Unable to release lock during rename table")
- }
- }
- }
- Seq.empty
- }
-}
-
-private[sql] case class AlterTableDropColumns(
- alterTableDropColumnModel: AlterTableDropColumnModel) extends RunnableCommand {
-
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- def run(sparkSession: SparkSession): Seq[Row] = {
- val tableName = alterTableDropColumnModel.tableName
- val dbName = alterTableDropColumnModel.databaseName
- .getOrElse(sparkSession.catalog.currentDatabase)
- LOGGER.audit(s"Alter table drop columns request has been received for $dbName.$tableName")
- val relation =
- CarbonEnv.get.carbonMetastore
- .lookupRelation(Option(dbName), tableName)(sparkSession)
- .asInstanceOf[CarbonRelation]
- if (relation == null) {
- LOGGER.audit(s"Alter table drop columns request has failed. " +
- s"Table $dbName.$tableName does not exist")
- sys.error(s"Table $dbName.$tableName does not exist")
- }
- // acquire the lock first
- val table = relation.tableMeta.carbonTable
- val carbonLock = CarbonLockFactory
- .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- LockUsage.METADATA_LOCK)
- try {
- // get the latest carbon table and check for column existence
- val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
- // check each column existence in the table
- val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
- var dictionaryColumns = ListBuffer[CarbonColumn]()
- var keyColumnCountToBeDeleted = 0
- // TODO: if deleted column list includes shared dictionary/bucketted column throw an error
- alterTableDropColumnModel.columns.foreach { column =>
- var columnExist = false
- tableColumns.foreach { tableColumn =>
- // column should not be already deleted and should exist in the table
- if (!tableColumn.isInvisible && column.equalsIgnoreCase(tableColumn.getColName)) {
- if (tableColumn.isDimesion) {
- keyColumnCountToBeDeleted += 1
- if (tableColumn.hasEncoding(Encoding.DICTIONARY)) {
- dictionaryColumns += tableColumn
- }
- }
- columnExist = true
- }
- }
- if (!columnExist) {
- sys.error(s"Column $column does not exists in the table $dbName.$tableName")
- }
- }
- // take the total key column count. key column to be deleted should not
- // be >= key columns in schema
- var totalKeyColumnInSchema = 0
- tableColumns.foreach { tableColumn =>
- // column should not be already deleted and should exist in the table
- if (!tableColumn.isInvisible && tableColumn.isDimesion) {
- totalKeyColumnInSchema += 1
- }
- }
- if (keyColumnCountToBeDeleted >= totalKeyColumnInSchema) {
- sys.error(s"Alter drop operation failed. AtLeast one key column should exist after drop.")
- }
- // read the latest schema file
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
- carbonTable.getCarbonTableIdentifier)
- val tableMetadataFile = carbonTablePath.getSchemaFilePath
- val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
- .readSchemaFile(tableMetadataFile)
- // maintain the deleted columns for schema evolution history
- var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
- val columnSchemaList = tableInfo.fact_table.table_columns.asScala
- alterTableDropColumnModel.columns.foreach { column =>
- columnSchemaList.foreach { columnSchema =>
- if (!columnSchema.invisible && column.equalsIgnoreCase(columnSchema.column_name)) {
- deletedColumnSchema += CarbonScalaUtil.createColumnSchemaCopyObject(columnSchema)
- columnSchema.invisible = true
- }
- }
- }
- // add deleted columns to schema evolution history and update the schema
- tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
- .setTime_stamp(System.currentTimeMillis)
- val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
- schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava)
- CarbonEnv.get.carbonMetastore
- .updateTableSchema(carbonTable.getCarbonTableIdentifier,
- tableInfo,
- schemaEvolutionEntry,
- carbonTable.getStorePath)(sparkSession)
-
- val tableIdentifier = TableIdentifier(tableName, Some(dbName))
- val schema = CarbonEnv.get.carbonMetastore
- .lookupRelation(tableIdentifier)(sparkSession).schema.json
- val schemaParts = CarbonScalaUtil
- .prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema)
- sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(
- s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES($schemaParts)")
- sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
- // TODO: 1. add check for deletion of index tables
- // delete dictionary files for dictionary column and clear dictionary cache from memory
- ManageDictionary.deleteDictionaryFileAndCache(dictionaryColumns.toList.asJava, carbonTable)
- LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
- LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName")
- } catch {
- case e: Exception =>
- LOGGER.error("Alter table drop columns failed : " + e.getMessage)
- throw e
- } finally {
- // release lock after command execution completion
- if (carbonLock != null) {
- if (carbonLock.unlock()) {
- LOGGER.info("Alter table drop columns lock released successfully")
- } else {
- LOGGER.error("Unable to release lock during alter table drop columns operation")
- }
- }
- }
- Seq.empty
- }
-}
-
case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends RunnableCommand {
def run(sparkSession: SparkSession): Seq[Row] = {
@@ -786,8 +381,8 @@ case class LoadTable(
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
val partitionLocation = relation.tableMeta.storePath + "/partition/" +
- relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
- relation.tableMeta.carbonTableIdentifier.getTableName + "/"
+ relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
+ relation.tableMeta.carbonTableIdentifier.getTableName + "/"
val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
@@ -857,7 +452,7 @@ case class LoadTable(
true
} else {
LOGGER.error("Can't use single_pass, because SINGLE_PASS and ALL_DICTIONARY_PATH" +
- "can not be used together, and USE_KETTLE must be set as false")
+ "can not be used together, and USE_KETTLE must be set as false")
false
}
case "false" =>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c230b8cc/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 8c4f9f1..3645f95 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -350,7 +350,6 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
thriftTableInfo,
dbName,
tableName)(sparkSession)
- updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
carbonTablePath
}
@@ -390,6 +389,7 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
val tableMeta = new TableMeta(carbonTableIdentifier, storePath,
CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName))
metadata.tablesMeta += tableMeta
+ updateSchemasUpdatedTime(touchSchemaFileSystemTime(dbName, tableName))
carbonTablePath.getPath
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c230b8cc/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 59b9f63..75ad559 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -141,7 +141,10 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
ident ~ opt("(" ~> rep1sep(valueOptions, ",") <~ ")") <~ opt(";") ^^ {
case dbName ~ table ~ change ~ columnName ~ columnNameCopy ~ dataType ~ values =>
// both the column names should be same
- CommonUtil.validateColumnNames(columnName, columnNameCopy)
+ if (!columnName.equalsIgnoreCase(columnNameCopy)) {
+ throw new MalformedCarbonCommandException(
+ "Column names provided are different. Both the column names should be same")
+ }
val alterTableChangeDataTypeModel =
AlterTableDataTypeChangeModel(parseDataType(dataType.toLowerCase, values),
convertDbNameToLowerCase(dbName),
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/c230b8cc/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
new file mode 100644
index 0000000..0611e99
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog}
+import org.apache.spark.sql.hive.HiveExternalCatalog._
+
+import org.apache.carbondata.common.logging.LogService
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
+
+/** Helpers used by the alter table commands: metadata locking and schema publication. */
+object AlterTableUtil {
+  /**
+   * Looks up the relation for `dbName.tableName` and takes its metadata lock. Raises through
+   * `sys.error` if the relation is null or the lock is not obtained; the caller must unlock.
+   */
+  def validateTableAndAcquireLock(dbName: String, tableName: String, LOGGER: LogService)
+    (sparkSession: SparkSession): ICarbonLock = {
+    val relation = CarbonEnv.get.carbonMetastore
+      .lookupRelation(Option(dbName), tableName)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      LOGGER.audit(s"Alter table request has failed. Table $dbName.$tableName does not exist")
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+    // acquire the lock first
+    val table = relation.tableMeta.carbonTable
+    val carbonLock = CarbonLockFactory.getCarbonLockObj(
+      table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, LockUsage.METADATA_LOCK)
+    if (!carbonLock.lockWithRetries()) {
+      sys.error("Table is locked for updation. Please try after some time")
+    }
+    LOGGER.info("Successfully able to get the table metadata file lock")
+    carbonLock
+  }
+
+  /**
+   * Writes the altered schema through the carbon metastore, republishes the relation's schema
+   * JSON as Hive table properties and refreshes the table in the Spark catalog.
+   */
+  def updateSchemaInfo(carbonTable: CarbonTable,
+      schemaEvolutionEntry: SchemaEvolutionEntry,
+      thriftTable: TableInfo)(sparkSession: SparkSession, catalog: HiveExternalCatalog): Unit = {
+    val dbName = carbonTable.getDatabaseName
+    val tableName = carbonTable.getFactTableName
+    CarbonEnv.get.carbonMetastore
+      .updateTableSchema(carbonTable.getCarbonTableIdentifier, thriftTable, schemaEvolutionEntry,
+        carbonTable.getStorePath)(sparkSession)
+    val tableIdentifier = TableIdentifier(tableName, Some(dbName))
+    val schema = CarbonEnv.get.carbonMetastore
+      .lookupRelation(tableIdentifier)(sparkSession).schema.json
+    val schemaParts = prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema)
+    catalog.client.runSqlHive(
+      s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES($schemaParts)")
+    sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
+  }
+
+  /**
+   * Splits the schema JSON into chunks of the configured length and renders them as
+   * table-property assignments: one part-count entry followed by one entry per indexed chunk.
+   *
+   * @param sparkConf supplies the chunk length (SPARK_SCHEMA_STRING_LENGTH_THRESHOLD)
+   * @param schemaJsonString schema JSON to split
+   * @return comma-separated 'key'='value' pairs for a SET TBLPROPERTIES clause
+   */
+  private def prepareSchemaJsonForAlterTable(sparkConf: SparkConf,
+      schemaJsonString: String): String = {
+    val threshold = sparkConf.getInt(CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD,
+      CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD_DEFAULT)
+    // Split the JSON string.
+    val parts = schemaJsonString.grouped(threshold).toSeq
+    val numPartsEntry = s"'$DATASOURCE_SCHEMA_NUMPARTS'='${ parts.size }'"
+    val partEntries = parts.zipWithIndex.map { case (part, index) =>
+      s"'$DATASOURCE_SCHEMA_PART_PREFIX$index'='$part'"
+    }
+    (numPartsEntry +: partEntries).mkString(",")
+  }
+}