You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/12/07 11:52:04 UTC
carbondata git commit: [CARBONDATA-1552] Fixed Alter table commands issues with spark 2.2
Repository: carbondata
Updated Branches:
refs/heads/master 2bad144a2 -> f0f4d7d09
[CARBONDATA-1552] Fixed Alter table commands issues with spark 2.2
This closes #1628
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f0f4d7d0
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f0f4d7d0
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f0f4d7d0
Branch: refs/heads/master
Commit: f0f4d7d09a412475dcfde993341340fe25d2aad6
Parents: 2bad144
Author: Manohar <ma...@gmail.com>
Authored: Thu Dec 7 01:24:14 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Thu Dec 7 17:21:40 2017 +0530
----------------------------------------------------------------------
.../apache/spark/sql/common/util/QueryTest.scala | 6 +++---
.../schema/CarbonAlterTableAddColumnCommand.scala | 4 +++-
.../CarbonAlterTableDataTypeChangeCommand.scala | 5 ++++-
.../schema/CarbonAlterTableDropColumnCommand.scala | 5 ++++-
.../schema/CarbonAlterTableRenameCommand.scala | 6 +++---
.../schema/CarbonAlterTableSetCommand.scala | 4 +++-
.../schema/CarbonAlterTableUnsetCommand.scala | 4 +++-
.../spark/sql/hive/CarbonHiveMetaStore.scala | 4 ++--
.../org/apache/spark/util/AlterTableUtil.scala | 17 ++++++++---------
.../src/main/spark2.1/CarbonSessionState.scala | 9 +++++++++
.../src/main/spark2.2/CarbonSessionState.scala | 10 ++++++++++
.../spark/sql/common/util/Spark2QueryTest.scala | 6 +++---
12 files changed, 55 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index d482c1d..d80efb8 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -28,7 +28,7 @@ import scala.collection.JavaConversions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.command.LoadDataCommand
-import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.{CarbonSessionCatalog, HiveExternalCatalog}
import org.apache.spark.sql.test.{ResourceRegisterAndCopier, TestQueryExecutor}
import org.apache.spark.sql.{CarbonSession, DataFrame, Row, SQLContext}
import org.scalatest.Suite
@@ -138,8 +138,8 @@ class QueryTest extends PlanTest with Suite {
val resourcesPath = TestQueryExecutor.resourcesPath
- val hiveClient = sqlContext.sparkSession.asInstanceOf[CarbonSession].sharedState.
- externalCatalog.asInstanceOf[HiveExternalCatalog].client
+ val hiveClient = sqlContext.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
+ .getClient();
}
object QueryTest {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index c8f998b..f3f01bb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableColumnSchemaGenerator, MetadataCommand}
+import org.apache.spark.sql.hive.CarbonSessionCatalog
import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -91,7 +92,8 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
AlterTableUtil
.updateSchemaInfo(carbonTable,
schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
- thriftTable)(sparkSession)
+ thriftTable)(sparkSession,
+ sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
val alterTablePostExecutionEvent: AlterTableAddColumnPostEvent =
new AlterTableAddColumnPostEvent(sparkSession,
carbonTable, alterTableAddColumnsModel)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index dcee7c3..9bea935 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, MetadataCommand}
+import org.apache.spark.sql.hive.CarbonSessionCatalog
import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -96,7 +97,9 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
.setTime_stamp(System.currentTimeMillis)
- AlterTableUtil.updateSchemaInfo(carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession)
+ AlterTableUtil
+ .updateSchemaInfo(carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession,
+ sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
val alterTablePostExecutionEvent: AlterTableDataTypeChangePostEvent =
new AlterTableDataTypeChangePostEvent(sparkSession, carbonTable,
alterTableDataTypeChangeModel)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index c5924b6..0319d9e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, MetadataCommand}
+import org.apache.spark.sql.hive.CarbonSessionCatalog
import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -116,7 +117,9 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
timeStamp = System.currentTimeMillis
val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava)
- AlterTableUtil.updateSchemaInfo(carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession)
+ AlterTableUtil
+ .updateSchemaInfo(carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession,
+ sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
// TODO: 1. add check for deletion of index tables
// delete dictionary files for dictionary column and clear dictionary cache from memory
new AlterTableDropColumnRDD(sparkSession.sparkContext,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 9cf36fe..8ebaea1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command.schema
import org.apache.spark.sql.{CarbonEnv, CarbonSession, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand}
-import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog, HiveExternalCatalog}
import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -109,8 +109,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
var newTablePath = CarbonUtil.getNewTablePath(oldTablePath, newTableIdentifier.getTableName)
metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
- val hiveClient = sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
- .asInstanceOf[HiveExternalCatalog].client
+ val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
+ .getClient()
hiveClient.runSqlHive(
s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
hiveClient.runSqlHive(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala
index ffd69df..51c0e6e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command.schema
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.hive.CarbonSessionCatalog
import org.apache.spark.util.AlterTableUtil
private[sql] case class CarbonAlterTableSetCommand(
@@ -37,7 +38,8 @@ private[sql] case class CarbonAlterTableSetCommand(
tableIdentifier,
properties,
Nil,
- set = true)(sparkSession)
+ set = true)(sparkSession,
+ sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
Seq.empty
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
index 490dd61..5be5b2c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command.schema
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.hive.CarbonSessionCatalog
import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -38,7 +39,8 @@ private[sql] case class CarbonAlterTableUnsetCommand(
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
AlterTableUtil.modifyTableProperties(tableIdentifier, Map.empty[String, String],
- propKeys, false)(sparkSession)
+ propKeys, false)(sparkSession,
+ sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
Seq.empty
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index a41c51e..4b33c40 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -169,8 +169,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
val dbName = oldTableIdentifier.getDatabaseName
val tableName = oldTableIdentifier.getTableName
val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "")
- val hiveClient = sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
- .asInstanceOf[HiveExternalCatalog].client
+ val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
+ .getClient()
hiveClient.runSqlHive(s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)")
sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 58b3362..e757836 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog}
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog, HiveExternalCatalog}
import org.apache.spark.sql.hive.HiveExternalCatalog._
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -127,11 +127,11 @@ object AlterTableUtil {
* @param schemaEvolutionEntry
* @param thriftTable
* @param sparkSession
- * @param sessionState
+ * @param catalog
*/
def updateSchemaInfo(carbonTable: CarbonTable,
schemaEvolutionEntry: SchemaEvolutionEntry,
- thriftTable: TableInfo)(sparkSession: SparkSession): Unit = {
+ thriftTable: TableInfo)(sparkSession: SparkSession, catalog: CarbonSessionCatalog): Unit = {
val dbName = carbonTable.getDatabaseName
val tableName = carbonTable.getTableName
CarbonEnv.getInstance(sparkSession).carbonMetastore
@@ -145,8 +145,7 @@ object AlterTableUtil {
val schema = CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(tableIdentifier)(sparkSession).schema.json
val schemaParts = prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema)
- val hiveClient = sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
- .asInstanceOf[HiveExternalCatalog].client
+ val hiveClient = catalog.getClient();
hiveClient.runSqlHive(s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES($schemaParts)")
sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
}
@@ -326,11 +325,11 @@ object AlterTableUtil {
* @param propKeys
* @param set
* @param sparkSession
- * @param sessionState
+ * @param catalog
*/
def modifyTableProperties(tableIdentifier: TableIdentifier, properties: Map[String, String],
- propKeys: Seq[String], set: Boolean)
- (sparkSession: SparkSession): Unit = {
+ propKeys: Seq[String], set: Boolean)
+ (sparkSession: SparkSession, catalog: CarbonSessionCatalog): Unit = {
val tableName = tableIdentifier.table
val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
LOGGER.audit(s"Alter table properties request has been received for $dbName.$tableName")
@@ -385,7 +384,7 @@ object AlterTableUtil {
updateSchemaInfo(carbonTable,
schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
- thriftTable)(sparkSession)
+ thriftTable)(sparkSession, catalog)
LOGGER.info(s"Alter table properties is successful for table $dbName.$tableName")
LOGGER.audit(s"Alter table properties is successful for table $dbName.$tableName")
} catch {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
index 00de331..7113e63 100644
--- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
@@ -125,6 +125,15 @@ class CarbonSessionCatalog(
}
isRefreshed
}
+
+ /**
+ * returns hive client from session state
+ *
+ * @return
+ */
+ def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+ sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
index 87aebc0..61149eb 100644
--- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
@@ -123,6 +123,16 @@ class CarbonSessionCatalog(
rtnRelation
}
}
+
+ /**
+ * returns hive client from HiveExternalCatalog
+ *
+ * @return
+ */
+ def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+ sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
+ .asInstanceOf[HiveExternalCatalog].client
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala
index 73bcddd..169bcf2 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala
@@ -18,13 +18,13 @@
package org.apache.spark.sql.common.util
import org.apache.spark.sql.CarbonSession
-import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.{CarbonSessionCatalog, HiveExternalCatalog}
import org.apache.spark.sql.test.util.QueryTest
class Spark2QueryTest extends QueryTest {
- val hiveClient = sqlContext.sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
- .asInstanceOf[HiveExternalCatalog].client
+ val hiveClient = sqlContext.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
+ .getClient()
}
\ No newline at end of file