You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/04/14 10:54:48 UTC
[1/2] incubator-carbondata git commit: added AlterTableAddColumnRDD
to AlterTableCommands
Repository: incubator-carbondata
Updated Branches:
refs/heads/master f27b4918c -> 19b9223eb
added AlterTableAddColumnRDD to AlterTableCommands
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/36112fa4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/36112fa4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/36112fa4
Branch: refs/heads/master
Commit: 36112fa46d2a15e6403af97a7d8677231bedc8d0
Parents: f27b491
Author: kunal642 <ku...@knoldus.in>
Authored: Tue Apr 11 14:15:10 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Fri Apr 14 16:16:52 2017 +0530
----------------------------------------------------------------------
.../spark/rdd/AlterTableAddColumnRDD.scala | 6 +-
.../execution/command/carbonTableSchema.scala | 8 +--
.../execution/command/AlterTableCommands.scala | 62 ++++++++++++--------
.../org/apache/spark/util/AlterTableUtil.scala | 16 +++--
.../restructure/AlterTableRevertTestCase.scala | 19 +++++-
5 files changed, 68 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/36112fa4/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index bb65b0b..ab1fd9c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -19,7 +19,6 @@ package org.apache.carbondata.spark.rdd
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.AlterTableAddColumnsModel
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -49,13 +48,12 @@ class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Par
*/
class AlterTableAddColumnRDD[K, V](sc: SparkContext,
@transient newColumns: Seq[ColumnSchema],
- alterTableModel: AlterTableAddColumnsModel,
carbonTableIdentifier: CarbonTableIdentifier,
carbonStorePath: String) extends RDD[(Int, String)](sc, Nil) {
override def getPartitions: Array[Partition] = {
newColumns.zipWithIndex.map { column =>
- new DropColumnPartition(id, column._2, column._1)
+ new AddColumnPartition(id, column._2, column._1)
}.toArray
}
@@ -65,7 +63,7 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext,
val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
val iter = new Iterator[(Int, String)] {
try {
- val columnSchema = split.asInstanceOf[DropColumnPartition].columnSchema
+ val columnSchema = split.asInstanceOf[AddColumnPartition].columnSchema
// create dictionary file if it is a dictionary column
if (columnSchema.hasEncoding(Encoding.DICTIONARY) &&
!columnSchema.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/36112fa4/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 117b365..5108df8 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -152,7 +152,7 @@ case class AlterTableDropColumnModel(databaseName: Option[String],
tableName: String,
columns: List[String])
-class AlterTableProcessor(
+class AlterTableColumnSchemaGenerator(
alterTableModel: AlterTableAddColumnsModel,
dbName: String,
tableInfo: TableInfo,
@@ -253,12 +253,6 @@ class AlterTableProcessor(
}
}
}
- // generate dictionary files for the newly added columns
- new AlterTableAddColumnRDD(sc,
- newCols,
- alterTableModel,
- tableIdentifier,
- storePath).collect()
tableSchema.setListOfColumns(allColumns.asJava)
tableInfo.setLastUpdatedTime(System.currentTimeMillis())
tableInfo.setFactTable(tableSchema)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/36112fa4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
index e380217..8b194da 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -28,17 +28,15 @@ import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
+import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
private[sql] case class AlterTableAddColumns(
@@ -52,13 +50,15 @@ private[sql] case class AlterTableAddColumns(
.getOrElse(sparkSession.catalog.currentDatabase)
LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName")
val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
- val locks = AlterTableUtil
- .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
- // get the latest carbon table and check for column existence
- val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+ var locks = List.empty[ICarbonLock]
+ var lastUpdatedTime = 0L
var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
- val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
+ val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
try {
+ locks = AlterTableUtil
+ .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
+ // get the latest carbon table and check for column existence
+ lastUpdatedTime = carbonTable.getTableLastUpdatedTime
// read the latest schema file
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
carbonTable.getCarbonTableIdentifier)
@@ -71,12 +71,17 @@ private[sql] case class AlterTableAddColumns(
dbName,
tableName,
carbonTable.getStorePath)
- newCols = new AlterTableProcessor(alterTableAddColumnsModel,
+ newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel,
dbName,
wrapperTableInfo,
carbonTablePath,
carbonTable.getCarbonTableIdentifier,
carbonTable.getStorePath, sparkSession.sparkContext).process
+ // generate dictionary files for the newly added columns
+ new AlterTableAddColumnRDD(sparkSession.sparkContext,
+ newCols,
+ carbonTable.getCarbonTableIdentifier,
+ carbonTable.getStorePath).collect()
val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis)
schemaEvolutionEntry.setAdded(newCols.toList.asJava)
@@ -92,7 +97,7 @@ private[sql] case class AlterTableAddColumns(
} catch {
case e: Exception => LOGGER
.error("Alter table add columns failed :" + e.getMessage)
- if (!newCols.isEmpty) {
+ if (newCols.nonEmpty) {
LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
new AlterTableDropColumnRDD(sparkSession.sparkContext,
newCols,
@@ -100,7 +105,7 @@ private[sql] case class AlterTableAddColumns(
carbonTable.getStorePath).collect()
}
AlterTableUtil.revertAddColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
- sys.error("Alter table add column operation failed. Please check the logs")
+ sys.error(s"Alter table add operation failed: ${e.getMessage}")
} finally {
// release lock after command execution completion
AlterTableUtil.releaseLocks(locks, LOGGER)
@@ -147,12 +152,14 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
LockUsage.DELETE_SEGMENT_LOCK,
LockUsage.CLEAN_FILES_LOCK,
LockUsage.DROP_TABLE_LOCK)
- val locks = AlterTableUtil
- .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired, LOGGER)(
- sparkSession)
+ var locks = List.empty[ICarbonLock]
+ var lastUpdatedTime = 0L
val carbonTable = relation.tableMeta.carbonTable
- val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
try {
+ locks = AlterTableUtil
+ .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired, LOGGER)(
+ sparkSession)
+ lastUpdatedTime = carbonTable.getTableLastUpdatedTime
// get the latest carbon table and check for column existence
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
carbonTable.getCarbonTableIdentifier)
@@ -197,7 +204,7 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
AlterTableUtil.revertRenameTableChanges(oldTableIdentifier, newTableName, lastUpdatedTime)(
sparkSession)
renameBadRecords(newTableName, oldTableName, oldDatabaseName)
- sys.error("Alter table rename table operation failed. Please check the logs")
+ sys.error(s"Alter table rename table operation failed: ${e.getMessage}")
} finally {
// release lock after command execution completion
AlterTableUtil.releaseLocks(locks, LOGGER)
@@ -242,13 +249,15 @@ private[sql] case class AlterTableDropColumns(
val dbName = alterTableDropColumnModel.databaseName
.getOrElse(sparkSession.catalog.currentDatabase)
LOGGER.audit(s"Alter table drop columns request has been received for $dbName.$tableName")
+ var locks = List.empty[ICarbonLock]
+ var lastUpdatedTime = 0L
val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
- val locks = AlterTableUtil
- .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
// get the latest carbon table and check for column existence
val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
- val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
try {
+ locks = AlterTableUtil
+ .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
+ lastUpdatedTime = carbonTable.getTableLastUpdatedTime
// check each column existence in the table
val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
@@ -318,7 +327,7 @@ private[sql] case class AlterTableDropColumns(
case e: Exception => LOGGER
.error("Alter table drop columns failed : " + e.getMessage)
AlterTableUtil.revertDropColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
- sys.error("Alter table drop column operation failed. Please check the logs")
+ sys.error(s"Alter table drop column operation failed: ${e.getMessage}")
} finally {
// release lock after command execution completion
AlterTableUtil.releaseLocks(locks, LOGGER)
@@ -338,15 +347,16 @@ private[sql] case class AlterTableDataTypeChange(
.getOrElse(sparkSession.catalog.currentDatabase)
LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName")
val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
- val locks = AlterTableUtil
- .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
+ var locks = List.empty[ICarbonLock]
// get the latest carbon table and check for column existence
val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
- val lastUpdatedTime = carbonTable.getTableLastUpdatedTime
+ var lastUpdatedTime = 0L
try {
+ locks = AlterTableUtil
+ .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
+ lastUpdatedTime = carbonTable.getTableLastUpdatedTime
val columnName = alterTableDataTypeChangeModel.columnName
val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
-
if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
LOGGER.audit(s"Alter table change data type request has failed. " +
s"Column $columnName does not exist")
@@ -397,7 +407,7 @@ private[sql] case class AlterTableDataTypeChange(
case e: Exception => LOGGER
.error("Alter table change datatype failed : " + e.getMessage)
AlterTableUtil.revertDataTypeChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
- sys.error("Alter table data type change operation failed. Please check the logs")
+ sys.error(s"Alter table data type change operation failed: ${e.getMessage}")
} finally {
// release lock after command execution completion
AlterTableUtil.releaseLocks(locks, LOGGER)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/36112fa4/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 5057d75..f5248f5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -64,11 +64,17 @@ object AlterTableUtil {
}
// acquire the lock first
val table = relation.tableMeta.carbonTable
- var acquiredLocks = ListBuffer[ICarbonLock]()
- locksToBeAcquired.foreach { lock =>
- acquiredLocks += getLockObject(table, lock, LOGGER)
+ val acquiredLocks = ListBuffer[ICarbonLock]()
+ try {
+ locksToBeAcquired.foreach { lock =>
+ acquiredLocks += getLockObject(table, lock, LOGGER)
+ }
+ acquiredLocks.toList
+ } catch {
+ case e: Exception =>
+ releaseLocks(acquiredLocks.toList, LOGGER)
+ throw e
}
- acquiredLocks.toList
}
/**
@@ -249,7 +255,7 @@ object AlterTableUtil {
val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
if (updatedTime > lastUpdatedTime) {
- LOGGER.error(s"Reverting changes for $dbName.$tableName")
+ LOGGER.info(s"Reverting changes for $dbName.$tableName")
val addedSchemas = evolutionEntryList.get(evolutionEntryList.size() - 1).added
thriftTable.fact_table.table_columns.removeAll(addedSchemas)
CarbonEnv.get.carbonMetastore.revertTableSchema(carbonTable.getCarbonTableIdentifier,
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/36112fa4/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
index 05b79a8..c9244bc 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.common.util.QueryTest
import org.apache.spark.sql.test.TestQueryExecutor
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.core.metadata.CarbonMetadata
class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
@@ -41,7 +42,7 @@ class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
sql(
"Alter table reverttest add columns(newField string) TBLPROPERTIES" +
- "('DICTIONARY_EXCLUDE'='newField','DEFAULT.VALUE.charfield'='def')")
+ "('DEFAULT.VALUE.newField'='def')")
hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
intercept[AnalysisException] {
sql("select newField from reverttest")
@@ -78,6 +79,22 @@ class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
sql("select intfield from reverttest").schema.fields.apply(0).dataType.simpleString == "int")
}
+ test("test to check if dictionary files are deleted for new column if query fails") {
+ intercept[RuntimeException] {
+ hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
+ sql(
+ "Alter table reverttest add columns(newField string) TBLPROPERTIES" +
+ "('DEFAULT.VALUE.newField'='def')")
+ hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
+ intercept[AnalysisException] {
+ sql("select newField from reverttest")
+ }
+ val carbonTable = CarbonMetadata.getInstance.getCarbonTable("default_reverttest")
+
+ assert(new File(carbonTable.getMetaDataFilepath).listFiles().length < 6)
+ }
+ }
+
override def afterAll() {
hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
sql("drop table if exists reverttest")
[2/2] incubator-carbondata git commit: [CARBONDATA-863] Fixed column
cleanup during alter table add column failure. This closes #777
Posted by gv...@apache.org.
[CARBONDATA-863] Fixed column cleanup during alter table add column failure. This closes #777
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/19b9223e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/19b9223e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/19b9223e
Branch: refs/heads/master
Commit: 19b9223eb08b7673144ba549d0820adf243c01cc
Parents: f27b491 36112fa
Author: Venkata Ramana G <ra...@huawei.com>
Authored: Fri Apr 14 16:23:58 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Fri Apr 14 16:23:58 2017 +0530
----------------------------------------------------------------------
.../spark/rdd/AlterTableAddColumnRDD.scala | 6 +-
.../execution/command/carbonTableSchema.scala | 8 +--
.../execution/command/AlterTableCommands.scala | 62 ++++++++++++--------
.../org/apache/spark/util/AlterTableUtil.scala | 16 +++--
.../restructure/AlterTableRevertTestCase.scala | 19 +++++-
5 files changed, 68 insertions(+), 43 deletions(-)
----------------------------------------------------------------------