You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/04/20 15:09:37 UTC
carbondata git commit: [CARBONDATA-2312]Support In Memory Catalog
Repository: carbondata
Updated Branches:
refs/heads/master a9d5e9dec -> e6d03d112
[CARBONDATA-2312]Support In Memory Catalog
Support storing the catalog in memory (not in Hive) for each session; after a session restart, the user can create an external table and run select queries on it.
This closes #2103
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e6d03d11
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e6d03d11
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e6d03d11
Branch: refs/heads/master
Commit: e6d03d11212f8c5c66052b0f6e97144cf327bea6
Parents: a9d5e9d
Author: kumarvishal <ku...@gmail.com>
Authored: Wed Apr 4 12:03:47 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Apr 20 23:09:09 2018 +0800
----------------------------------------------------------------------
.../spark/sql/common/util/QueryTest.scala | 2 +-
.../spark/util/CarbonReflectionUtils.scala | 25 +-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 5 +-
.../org/apache/spark/sql/CarbonSession.scala | 23 +-
.../CarbonAlterTableCompactionCommand.scala | 5 +-
.../CarbonAlterTableAddColumnCommand.scala | 11 +-
.../CarbonAlterTableDataTypeChangeCommand.scala | 11 +-
.../CarbonAlterTableDropColumnCommand.scala | 17 +-
.../schema/CarbonAlterTableRenameCommand.scala | 40 +--
.../schema/CarbonAlterTableSetCommand.scala | 4 +-
.../schema/CarbonAlterTableUnsetCommand.scala | 4 +-
.../spark/sql/hive/CarbonHiveMetaStore.scala | 9 +-
.../spark/sql/hive/CarbonSessionCatalog.scala | 56 +++-
.../org/apache/spark/util/AlterTableUtil.scala | 27 +-
.../spark/sql/hive/CarbonSessionState.scala | 50 +++-
.../apache/spark/sql/hive/CarbonAnalyzer.scala | 34 +++
.../sql/hive/CarbonInMemorySessionState.scala | 276 +++++++++++++++++++
.../apache/spark/sql/hive/CarbonOptimizer.scala | 77 ++++++
.../spark/sql/hive/CarbonSessionState.scala | 231 ++++------------
.../spark/sql/hive/CarbonSqlAstBuilder.scala | 124 +++++++++
20 files changed, 777 insertions(+), 254 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 9c5bc38..d45e759 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -28,7 +28,7 @@ import scala.collection.JavaConversions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.command.LoadDataCommand
-import org.apache.spark.sql.hive.{CarbonSessionCatalog, HiveExternalCatalog}
+import org.apache.spark.sql.hive.{CarbonSessionCatalog}
import org.apache.spark.sql.test.{ResourceRegisterAndCopier, TestQueryExecutor}
import org.apache.spark.sql.{CarbonSession, DataFrame, Row, SQLContext}
import org.scalatest.Suite
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 69eb021..4264aa1 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -194,19 +194,30 @@ object CarbonReflectionUtils {
}
}
- def getSessionState(sparkContext: SparkContext, carbonSession: Object): Any = {
+ def getSessionState(sparkContext: SparkContext,
+ carbonSession: Object,
+ useHiveMetaStore: Boolean): Any = {
if (SPARK_VERSION.startsWith("2.1")) {
val className = sparkContext.conf.get(
CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
"org.apache.spark.sql.hive.CarbonSessionState")
createObject(className, carbonSession)._1
} else if (SPARK_VERSION.startsWith("2.2")) {
- val className = sparkContext.conf.get(
- CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
- "org.apache.spark.sql.hive.CarbonSessionStateBuilder")
- val tuple = createObject(className, carbonSession, None)
- val method = tuple._2.getMethod("build")
- method.invoke(tuple._1)
+ if (useHiveMetaStore) {
+ val className = sparkContext.conf.get(
+ CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
+ "org.apache.spark.sql.hive.CarbonSessionStateBuilder")
+ val tuple = createObject(className, carbonSession, None)
+ val method = tuple._2.getMethod("build")
+ method.invoke(tuple._1)
+ } else {
+ val className = sparkContext.conf.get(
+ CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
+ "org.apache.spark.sql.hive.CarbonInMemorySessionStateBuilder")
+ val tuple = createObject(className, carbonSession, None)
+ val method = tuple._2.getMethod("build")
+ method.invoke(tuple._1)
+ }
} else {
throw new UnsupportedOperationException("Spark version not supported")
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 593ecce..00e0aed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -23,6 +23,7 @@ import scala.util.Try
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.execution.command.preaaggregate._
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
import org.apache.spark.sql.hive.{HiveSessionCatalog, _}
@@ -123,7 +124,7 @@ object CarbonEnv {
def getInstance(sparkSession: SparkSession): CarbonEnv = {
if (sparkSession.isInstanceOf[CarbonSession]) {
- sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getCarbonEnv()
+ sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getCarbonEnv
} else {
var carbonEnv: CarbonEnv = carbonEnvMap.get(sparkSession)
if (carbonEnv == null) {
@@ -249,7 +250,7 @@ object CarbonEnv {
*/
def getDatabaseLocation(dbName: String, sparkSession: SparkSession): String = {
var databaseLocation =
- sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog].getDatabaseMetadata(dbName)
+ sparkSession.sessionState.catalog.asInstanceOf[SessionCatalog].getDatabaseMetadata(dbName)
.locationUri.toString
// for default database and db ends with .db
// check whether the carbon store and hive store is same or different.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 7ee3038..1d5a82d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -43,7 +43,8 @@ import org.apache.carbondata.streaming.CarbonStreamingQueryListener
* User needs to use {CarbonSession.getOrCreateCarbon} to create Carbon session.
*/
class CarbonSession(@transient val sc: SparkContext,
- @transient private val existingSharedState: Option[SharedState]
+ @transient private val existingSharedState: Option[SharedState],
+ @transient useHiveMetaStore: Boolean = true
) extends SparkSession(sc) { self =>
def this(sc: SparkContext) {
@@ -51,8 +52,10 @@ class CarbonSession(@transient val sc: SparkContext,
}
@transient
- override lazy val sessionState: SessionState =
- CarbonReflectionUtils.getSessionState(sparkContext, this).asInstanceOf[SessionState]
+ override lazy val sessionState: SessionState = {
+ CarbonReflectionUtils.getSessionState(sparkContext, this, useHiveMetaStore)
+ .asInstanceOf[SessionState]
+ }
/**
* State shared across sessions, including the `SparkContext`, cached data, listener,
@@ -74,7 +77,7 @@ class CarbonSession(@transient val sc: SparkContext,
}
override def newSession(): SparkSession = {
- new CarbonSession(sparkContext, Some(sharedState))
+ new CarbonSession(sparkContext, Some(sharedState), useHiveMetaStore)
}
override def sql(sqlText: String): DataFrame = {
@@ -116,10 +119,16 @@ object CarbonSession {
private val statementId = new AtomicLong(0)
+ private var enableInMemCatlog: Boolean = false
+
private[sql] val threadStatementId = new ThreadLocal[Long]()
implicit class CarbonBuilder(builder: Builder) {
+ def enableInMemoryCatalog(): Builder = {
+ enableInMemCatlog = true
+ builder
+ }
def getOrCreateCarbonSession(): SparkSession = {
getOrCreateCarbonSession(null, null)
}
@@ -132,7 +141,9 @@ object CarbonSession {
def getOrCreateCarbonSession(storePath: String,
metaStorePath: String): SparkSession = synchronized {
- builder.enableHiveSupport()
+ if (!enableInMemCatlog) {
+ builder.enableHiveSupport()
+ }
val options =
getValue("options", builder).asInstanceOf[scala.collection.mutable.HashMap[String, String]]
val userSuppliedContext: Option[SparkContext] =
@@ -205,7 +216,7 @@ object CarbonSession {
sc
}
- session = new CarbonSession(sparkContext)
+ session = new CarbonSession(sparkContext, None, !enableInMemCatlog)
val carbonProperties = CarbonProperties.getInstance()
if (storePath != null) {
carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index a7b5f7e..c462c9e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel}
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
+import org.apache.spark.sql.hive.{CarbonRelation}
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.AlterTableUtil
@@ -299,8 +299,7 @@ case class CarbonAlterTableCompactionCommand(
tableIdentifier,
Map("streaming" -> "false"),
Seq.empty,
- true)(sparkSession,
- sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
+ true)(sparkSession)
// 5. remove checkpoint
FileFactory.deleteAllFilesOfDir(
new File(CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath)))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 4962c3a..d33fc6d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -85,11 +85,14 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
schemaEvolutionEntry.setAdded(newCols.toList.asJava)
val thriftTable = schemaConverter
.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
- AlterTableUtil
- .updateSchemaInfo(carbonTable,
+ val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo(
+ carbonTable,
schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
- thriftTable)(sparkSession,
- sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
+ thriftTable,
+ Some(newCols))(sparkSession)
+ sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].alterAddColumns(
+ tableIdentifier, schemaParts, cols)
+ sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
val alterTablePostExecutionEvent: AlterTableAddColumnPostEvent =
new AlterTableAddColumnPostEvent(sparkSession,
carbonTable, alterTableAddColumnsModel)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index ff17cfd..accaa27 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -26,6 +26,7 @@ import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
import org.apache.carbondata.events.{AlterTableDataTypeChangePostEvent, AlterTableDataTypeChangePreEvent, OperationContext, OperationListenerBus}
@@ -96,9 +97,13 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
.setTime_stamp(System.currentTimeMillis)
- AlterTableUtil.updateSchemaInfo(
- carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession,
- sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
+ val schemaConverter = new ThriftWrapperSchemaConverterImpl
+ val a = List(schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
+ val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo(
+ carbonTable, schemaEvolutionEntry, tableInfo, Some(a))(sparkSession)
+ sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
+ .alterColumnChangeDataType(tableIdentifier, schemaParts, cols)
+ sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
val alterTablePostExecutionEvent: AlterTableDataTypeChangePostEvent =
new AlterTableDataTypeChangePostEvent(sparkSession, carbonTable,
alterTableDataTypeChangeModel)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index a64bdb9..ff1541b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -27,9 +27,9 @@ import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{AlterTableDropColumnPostEvent, AlterTableDropColumnPreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format.SchemaEvolutionEntry
import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
@@ -116,9 +116,18 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
timeStamp = System.currentTimeMillis
val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava)
- AlterTableUtil
- .updateSchemaInfo(carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession,
- sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
+ val schemaConverter = new ThriftWrapperSchemaConverterImpl
+ val delCols = deletedColumnSchema.map { deleteCols =>
+ schemaConverter.fromExternalToWrapperColumnSchema(deleteCols)
+ }
+ val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo(
+ carbonTable,
+ schemaEvolutionEntry,
+ tableInfo,
+ Some(delCols))(sparkSession)
+ sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
+ .alterDropColumns(tableIdentifier, schemaParts, cols)
+ sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
// TODO: 1. add check for deletion of index tables
// delete dictionary files for dictionary column and clear dictionary cache from memory
new AlterTableDropColumnRDD(sparkSession.sparkContext,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index e349e93..af52d6b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -125,26 +125,21 @@ private[sql] case class CarbonAlterTableRenameCommand(
val fileType = FileFactory.getFileType(tableMetadataFile)
val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
+ val oldIdentifier = TableIdentifier(oldTableName, Some(oldDatabaseName))
+ val newIdentifier = TableIdentifier(newTableName, Some(oldDatabaseName))
var newTablePath = CarbonTablePath.getNewTablePath(
oldTableIdentifier.getTablePath, newTableIdentifier.getTableName)
metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
- val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
- .getClient()
var partitions: Seq[CatalogTablePartition] = Seq.empty
if (carbonTable.isHivePartitionTable) {
partitions =
- sparkSession.sessionState.catalog.listPartitions(
- TableIdentifier(oldTableName, Some(oldDatabaseName)))
+ sparkSession.sessionState.catalog.listPartitions(oldIdentifier)
}
- sparkSession.catalog.refreshTable(TableIdentifier(oldTableName,
- Some(oldDatabaseName)).quotedString)
- hiveClient.runSqlHive(
- s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
- hiveClient.runSqlHive(
- s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
- s"('tableName'='$newTableName', " +
- s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
-
+ sparkSession.catalog.refreshTable(oldIdentifier.quotedString)
+ sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].alterTableRename(
+ oldIdentifier,
+ newIdentifier,
+ newTablePath)
// changed the rename order to deal with situation when carbon table and hive table
// will point to the same tablePath
if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
@@ -160,16 +155,19 @@ private[sql] case class CarbonAlterTableRenameCommand(
partitions,
oldTableIdentifier.getTablePath,
newTablePath,
- sparkSession)
+ sparkSession,
+ newIdentifier.table,
+ oldDatabaseName)
- val newIdentifier = TableIdentifier(newTableName, Some(oldDatabaseName))
val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(newIdentifier)
// Update the storage location with new path
sparkSession.sessionState.catalog.alterTable(
catalogTable.copy(storage = sparkSession.sessionState.catalog.
asInstanceOf[CarbonSessionCatalog].updateStorageLocation(
new Path(newTablePath),
- catalogTable.storage)))
+ catalogTable.storage,
+ newIdentifier.table,
+ oldDatabaseName)))
if (updatedParts.nonEmpty) {
// Update the new updated partitions specs with new location.
sparkSession.sessionState.catalog.alterPartitions(
@@ -232,14 +230,20 @@ private[sql] case class CarbonAlterTableRenameCommand(
partitions: Seq[CatalogTablePartition],
oldTablePath: String,
newTablePath: String,
- sparkSession: SparkSession): Seq[CatalogTablePartition] = {
+ sparkSession: SparkSession,
+ newTableName: String,
+ dbName: String): Seq[CatalogTablePartition] = {
partitions.map{ part =>
if (part.storage.locationUri.isDefined) {
val path = new Path(part.location)
if (path.toString.contains(oldTablePath)) {
val newPath = new Path(path.toString.replace(oldTablePath, newTablePath))
part.copy(storage = sparkSession.sessionState.catalog.
- asInstanceOf[CarbonSessionCatalog].updateStorageLocation(newPath, part.storage))
+ asInstanceOf[CarbonSessionCatalog].updateStorageLocation(
+ newPath,
+ part.storage,
+ newTableName,
+ dbName))
} else {
part
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala
index 51c0e6e..ffd69df 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.command.schema
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.hive.CarbonSessionCatalog
import org.apache.spark.util.AlterTableUtil
private[sql] case class CarbonAlterTableSetCommand(
@@ -38,8 +37,7 @@ private[sql] case class CarbonAlterTableSetCommand(
tableIdentifier,
properties,
Nil,
- set = true)(sparkSession,
- sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
+ set = true)(sparkSession)
Seq.empty
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
index 2490f9e..d5bdd80 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.command.schema
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.hive.CarbonSessionCatalog
import org.apache.spark.util.AlterTableUtil
@@ -37,8 +36,7 @@ private[sql] case class CarbonAlterTableUnsetCommand(
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
AlterTableUtil.modifyTableProperties(tableIdentifier, Map.empty[String, String],
- propKeys, false)(sparkSession,
- sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
+ propKeys, false)(sparkSession)
Seq.empty
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 96ef473..76aa73e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -18,8 +18,7 @@ package org.apache.spark.sql.hive
import scala.collection.JavaConverters._
-import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{CarbonSession, SparkSession}
+import org.apache.spark.sql.{SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
@@ -168,10 +167,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
val dbName = newTableIdentifier.getDatabaseName
val tableName = newTableIdentifier.getTableName
val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "")
- val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
- .getClient()
- hiveClient.runSqlHive(s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)")
-
+ sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
+ .alterTable(TableIdentifier(tableName, Some(dbName)), schemaParts, None)
sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
removeTableFromMetadata(dbName, tableName)
CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
index 1c69309..f00739e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
@@ -63,5 +63,59 @@ trait CarbonSessionCatalog {
/**
* Update the storageformat with new location information
*/
- def updateStorageLocation(path: Path, storage: CatalogStorageFormat): CatalogStorageFormat
+ def updateStorageLocation(
+ path: Path,
+ storage: CatalogStorageFormat,
+ newTableName: String,
+ dbName: String): CatalogStorageFormat
+
+ /**
+ * Method used to update the table name
+ * @param oldTableIdentifier old table identifier
+ * @param newTableIdentifier new table identifier
+ * @param newTablePath new table path
+ */
+ def alterTableRename(oldTableIdentifier: TableIdentifier,
+ newTableIdentifier: TableIdentifier,
+ newTablePath: String): Unit
+
+ /**
+ * Below method will be used to update serde properties
+ * @param tableIdentifier table identifier
+ * @param schemaParts schema parts
+ * @param cols cols
+ */
+ def alterTable(tableIdentifier: TableIdentifier,
+ schemaParts: String,
+ cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]): Unit
+
+ /**
+ * Below method will be used to add new column
+ * @param tableIdentifier table identifier
+ * @param schemaParts schema parts
+ * @param cols cols
+ */
+ def alterAddColumns(tableIdentifier: TableIdentifier,
+ schemaParts: String,
+ cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]): Unit
+
+ /**
+ * Below method will be used to drop column
+ * @param tableIdentifier table identifier
+ * @param schemaParts schema parts
+ * @param cols cols
+ */
+ def alterDropColumns(tableIdentifier: TableIdentifier,
+ schemaParts: String,
+ cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]): Unit
+
+ /**
+ * Below method will be used to alter data type of column in schema
+ * @param tableIdentifier table identifier
+ * @param schemaParts schema parts
+ * @param cols cols
+ */
+ def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
+ schemaParts: String,
+ cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]): Unit
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 1c6756b..ac2bf9f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -21,7 +21,6 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
-import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -36,9 +35,7 @@ import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
object AlterTableUtil {
@@ -127,11 +124,16 @@ object AlterTableUtil {
* @param schemaEvolutionEntry
* @param thriftTable
* @param sparkSession
- * @param catalog
*/
def updateSchemaInfo(carbonTable: CarbonTable,
schemaEvolutionEntry: SchemaEvolutionEntry,
- thriftTable: TableInfo)(sparkSession: SparkSession, catalog: CarbonSessionCatalog): Unit = {
+ thriftTable: TableInfo,
+ cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]] =
+ None)
+ (sparkSession: SparkSession):
+ (TableIdentifier,
+ String,
+ Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) = {
val dbName = carbonTable.getDatabaseName
val tableName = carbonTable.getTableName
CarbonEnv.getInstance(sparkSession).carbonMetastore
@@ -145,9 +147,7 @@ object AlterTableUtil {
val schema = CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(tableIdentifier)(sparkSession).schema.json
val schemaParts = prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema)
- val hiveClient = catalog.getClient();
- hiveClient.runSqlHive(s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES($schemaParts)")
- sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
+ (tableIdentifier, schemaParts, cols)
}
/**
@@ -306,11 +306,10 @@ object AlterTableUtil {
* @param propKeys
* @param set
* @param sparkSession
- * @param catalog
*/
def modifyTableProperties(tableIdentifier: TableIdentifier, properties: Map[String, String],
propKeys: Seq[String], set: Boolean)
- (sparkSession: SparkSession, catalog: CarbonSessionCatalog): Unit = {
+ (sparkSession: SparkSession): Unit = {
val tableName = tableIdentifier.table
val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
LOGGER.audit(s"Alter table properties request has been received for $dbName.$tableName")
@@ -360,10 +359,12 @@ object AlterTableUtil {
}
}
}
-
- updateSchemaInfo(carbonTable,
+ val (tableIdentifier, schemParts, cols) = updateSchemaInfo(carbonTable,
schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
- thriftTable)(sparkSession, catalog)
+ thriftTable)(sparkSession)
+ sparkSession.asInstanceOf[CarbonSession].sessionState.catalog
+ .asInstanceOf[CarbonSessionCatalog].alterTable(tableIdentifier, schemParts, cols)
+ sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
LOGGER.info(s"Alter table properties is successful for table $dbName.$tableName")
LOGGER.audit(s"Alter table properties is successful for table $dbName.$tableName")
} catch {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index c37e6fa..9fe7241 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -72,7 +72,7 @@ class CarbonHiveSessionCatalog(
conf,
hadoopConf) with CarbonSessionCatalog {
- lazy val carbonEnv = {
+ private lazy val carbonEnv = {
val env = new CarbonEnv
env.init(sparkSession)
env
@@ -84,6 +84,50 @@ class CarbonHiveSessionCatalog(
override def getCarbonEnv() : CarbonEnv = {
carbonEnv
}
+
+ /**
+ * Renames a table through the Hive client, then rewrites its serde properties
+ * (tableName, dbName, tablePath) so they describe the renamed table.
+ * NOTE(review): the target database is always the old table's database
+ * (newTableIdentifier.database is ignored); `database.get` throws if
+ * oldTableIdentifier has no database, so callers presumably always qualify it; confirm.
+ * Identifiers are interpolated into HiveQL without quoting.
+ */
+ def alterTableRename(oldTableIdentifier: TableIdentifier,
+ newTableIdentifier: TableIdentifier,
+ newTablePath: String): Unit = {
+ getClient().runSqlHive(
+ s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table }" +
+ s" RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
+ // Second statement targets the table under its new name.
+ getClient().runSqlHive(
+ s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }" +
+ s" SET SERDEPROPERTIES" +
+ s"('tableName'='${ newTableIdentifier.table }', " +
+ s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
+ }
+
+ /**
+ * Persists an altered schema by writing `schemaParts` into the table's TBLPROPERTIES
+ * through the Hive client. `cols` is not used by this implementation.
+ * NOTE(review): `database.get` throws if the identifier is not database-qualified.
+ */
+ def alterTable(tableIdentifier: TableIdentifier,
+ schemaParts: String,
+ cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+ : Unit = {
+ getClient()
+ .runSqlHive(s"ALTER TABLE ${tableIdentifier.database.get}.${tableIdentifier.table } " +
+ s"SET TBLPROPERTIES(${ schemaParts })")
+ }
+
+  /**
+   * Records added columns by delegating to [[alterTable]], which stores `schemaParts`
+   * as table properties; `cols` is passed through unchanged.
+   */
+  def alterAddColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+    : Unit = alterTable(tableIdentifier, schemaParts, cols)
+
+  /**
+   * Records dropped columns by delegating to [[alterTable]], which stores `schemaParts`
+   * as table properties; `cols` is passed through unchanged.
+   */
+  def alterDropColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+    : Unit = alterTable(tableIdentifier, schemaParts, cols)
+
+  /**
+   * Records a column data-type change by delegating to [[alterTable]], which stores
+   * `schemaParts` as table properties; `cols` is passed through unchanged.
+   */
+  def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+    : Unit = alterTable(tableIdentifier, schemaParts, cols)
+
// Initialize all listeners to the Operation bus.
CarbonEnv.init(sparkSession)
@@ -195,7 +239,9 @@ class CarbonHiveSessionCatalog(
*/
override def updateStorageLocation(
path: Path,
- storage: CatalogStorageFormat): CatalogStorageFormat = {
+ storage: CatalogStorageFormat,
+ newTableName: String,
+ dbName: String): CatalogStorageFormat = {
+ // Only the location URI is replaced; newTableName and dbName are accepted for the
+ // widened signature but are not used by this implementation.
storage.copy(locationUri = Some(path.toString))
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala
new file mode 100644
index 0000000..88beb68
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Analyzer that delegates resolution to the supplied Spark analyzer and then applies the
+ * CarbonData pre-aggregate data-loading rules followed by the pre-aggregate query rules.
+ */
+class CarbonAnalyzer(catalog: SessionCatalog,
+    conf: SQLConf,
+    sparkSession: SparkSession,
+    analyzer: Analyzer) extends Analyzer(catalog, conf) {
+
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    val resolved = analyzer.execute(plan)
+    val loadRulesApplied = CarbonPreAggregateDataLoadingRules(sparkSession).apply(resolved)
+    CarbonPreAggregateQueryRules(sparkSession).apply(loadRulesApplied)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
new file mode 100644
index 0000000..e8ab84a
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.internal.{SQLConf, SessionResourceLoader, SessionState, SessionStateBuilder}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.format.TableInfo
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * Session catalog used when CarbonData keeps table metadata in memory instead of in Hive.
+ * lookupRelation re-resolves a relation whenever CarbonSessionUtil.refreshRelation reports
+ * that the cached carbon relation was refreshed.
+ *
+ * @param externalCatalog        catalog holding database/table metadata
+ * @param globalTempViewManager  manager for global temporary views
+ * @param functionRegistry       registry of SQL functions
+ * @param sparkSession           owning Spark session
+ * @param conf                   SQL configuration
+ * @param hadoopConf             Hadoop configuration
+ * @param parser                 SQL parser
+ * @param functionResourceLoader loader for function resources
+ */
+class InMemorySessionCatalog(
+    externalCatalog: ExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
+    functionRegistry: FunctionRegistry,
+    sparkSession: SparkSession,
+    conf: SQLConf,
+    hadoopConf: Configuration,
+    parser: ParserInterface,
+    functionResourceLoader: FunctionResourceLoader)
+  extends SessionCatalog(
+    externalCatalog,
+    globalTempViewManager,
+    functionRegistry,
+    conf,
+    hadoopConf,
+    parser,
+    functionResourceLoader
+  ) with CarbonSessionCatalog {
+
+  /**
+   * Renames the table through the session catalog; `newTablePath` is not used here.
+   */
+  override def alterTableRename(oldTableIdentifier: TableIdentifier,
+      newTableIdentifier: TableIdentifier,
+      newTablePath: String): Unit = {
+    sparkSession.sessionState.catalog.renameTable(oldTableIdentifier, newTableIdentifier)
+  }
+
+  /**
+   * Intentionally a no-op: there are no Hive table properties to update for the in-memory
+   * catalog. The column-level alter methods in this class edit the cached schema directly.
+   */
+  override def alterTable(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+    : Unit = {
+    // Not required in case of in-memory catalog.
+  }
+
+  /**
+   * Appends every column in `newColumns` to the table's schema, in order.
+   * `None` leaves the schema unchanged.
+   */
+  override def alterAddColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      newColumns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+    : Unit = {
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+    // Thread the growing schema through the fold so every added column is retained.
+    val newStructType = newColumns.getOrElse(Seq.empty).foldLeft(catalogTable.schema) {
+      (schema, col) =>
+        schema.add(col.getColumnName,
+          CarbonScalaUtil.convertCarbonToSparkDataType(col.getDataType))
+    }
+    alterSchema(newStructType, catalogTable, tableIdentifier)
+  }
+
+  /**
+   * Removes the named columns (matched case-insensitively) from the table's schema.
+   * `None` leaves the schema unchanged.
+   */
+  override def alterDropColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      dropCols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+    : Unit = {
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+    val droppedNames = dropCols.getOrElse(Seq.empty).map(_.getColumnName)
+    val fields = catalogTable.schema.fields.filterNot { field =>
+      droppedNames.exists(_.equalsIgnoreCase(field.name))
+    }
+    alterSchema(new StructType(fields), catalogTable, tableIdentifier)
+  }
+
+  /**
+   * Replaces each field whose name matches (case-insensitively) one of `columns` with a field
+   * carrying that column's name and converted Spark data type. Other fields are kept as-is.
+   */
+  override def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      columns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+    : Unit = {
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+    val changedCols = columns.getOrElse(Seq.empty)
+    // Map each existing field to exactly one output field so the schema keeps its shape
+    // no matter how many columns are changed.
+    val fields = catalogTable.schema.fields.map { field =>
+      changedCols.find(_.getColumnName.equalsIgnoreCase(field.name)) match {
+        case Some(col) =>
+          StructField(col.getColumnName,
+            CarbonScalaUtil.convertCarbonToSparkDataType(col.getDataType))
+        case None => field
+      }
+    }
+    alterSchema(new StructType(fields), catalogTable, tableIdentifier)
+  }
+
+  /** Stores `structType` as the table's schema in the session catalog and refreshes the table. */
+  private def alterSchema(structType: StructType,
+      catalogTable: CatalogTable,
+      tableIdentifier: TableIdentifier): Unit = {
+    val copy = catalogTable.copy(schema = structType)
+    sparkSession.sessionState.catalog.alterTable(copy)
+    sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
+  }
+
+  lazy val carbonEnv = {
+    val env = new CarbonEnv
+    env.init(sparkSession)
+    env
+  }
+
+  def getCarbonEnv() : CarbonEnv = {
+    carbonEnv
+  }
+
+  // Initialize all listeners to the Operation bus.
+  CarbonEnv.initListeners()
+
+  /** Reads the thrift TableInfo from the schema file under `tablePath`. */
+  def getThriftTableInfo(tablePath: String): TableInfo = {
+    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
+    CarbonUtil.readSchemaFile(tableMetadataFile)
+  }
+
+  /** Looks up the relation, resolving it again if CarbonSessionUtil refreshed it. */
+  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
+    val rtnRelation = super.lookupRelation(name)
+    val isRelationRefreshed =
+      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
+    if (isRelationRefreshed) {
+      super.lookupRelation(name)
+    } else {
+      rtnRelation
+    }
+  }
+
+  /**
+   * There is no Hive client for the in-memory catalog: this always returns null, so callers
+   * must not dereference it.
+   *
+   * @return null
+   */
+  def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+    null
+  }
+
+  /**
+   * Creates partitions, first letting CarbonScalaUtil.updatePartitions adjust them for the
+   * carbon table. If that step fails, falls back to creating the partitions unchanged.
+   */
+  override def createPartitions(
+      tableName: TableIdentifier,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = {
+    try {
+      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+      val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
+      super.createPartitions(tableName, updatedParts, ignoreIfExists)
+    } catch {
+      // Deliberate best-effort fallback; the exception itself is not inspected.
+      case _: Exception =>
+        super.createPartitions(tableName, parts, ignoreIfExists)
+    }
+  }
+
+  /**
+   * This is alternate way of getting partition information. It first fetches all partitions
+   * and then applies the filters, instead of querying the catalog along with the filters.
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   * @return
+   */
+  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier) = {
+    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
+  }
+
+  /**
+   * Update the storage format with new location information; newTableName and dbName are
+   * not used by this implementation.
+   */
+  override def updateStorageLocation(
+      path: Path,
+      storage: CatalogStorageFormat,
+      newTableName: String,
+      dbName: String): CatalogStorageFormat = {
+    storage.copy(locationUri = Some(path.toUri))
+  }
+}
+
+/**
+ * Session state builder that installs CarbonData's SQL parser, planner strategies, optimizer
+ * rules and analyzer, backed by an [[InMemorySessionCatalog]] rather than a Hive catalog.
+ */
+class CarbonInMemorySessionStateBuilder (sparkSession: SparkSession,
+ parentState: Option[SessionState] = None)
+ extends SessionStateBuilder(sparkSession, parentState) {
+
+ // Carbon SQL parser replaces the default parser.
+ override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+ // Carbon-specific physical planning strategies.
+ experimentalMethods.extraStrategies =
+ Seq(new StreamingTableStrategy(sparkSession),
+ new CarbonLateDecodeStrategy,
+ new DDLStrategy(sparkSession)
+ )
+ // Carbon-specific logical optimizer rules.
+ experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
+ new CarbonUDFTransformRule,
+ new CarbonLateDecodeRule)
+
+ /**
+ * Internal catalog for managing table and database states.
+ * When a parent session state is given, its catalog state is copied into the new catalog.
+ */
+ override protected lazy val catalog: InMemorySessionCatalog = {
+ val catalog = new InMemorySessionCatalog(
+ externalCatalog,
+ session.sharedState.globalTempViewManager,
+ functionRegistry,
+ sparkSession,
+ conf,
+ SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
+ sqlParser,
+ resourceLoader)
+ parentState.foreach(_.catalog.copyStateTo(catalog))
+ catalog
+ }
+
+ // External catalog shared across sessions via sharedState.
+ // NOTE(review): the cast looks redundant if sharedState.externalCatalog is already typed
+ // ExternalCatalog in this Spark version; confirm.
+ private def externalCatalog: ExternalCatalog =
+ session.sharedState.externalCatalog.asInstanceOf[ExternalCatalog]
+
+ override protected lazy val resourceLoader: SessionResourceLoader = {
+ new SessionResourceLoader(session)
+ }
+
+ // Optimizer that marks Carbon relations inside subqueries before running Spark's batches.
+ override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+
+ // A Spark Analyzer extended with Carbon resolution rules, wrapped in CarbonAnalyzer,
+ // which applies the pre-aggregate rules to its output.
+ override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession,
+ new Analyzer(catalog, conf) {
+ override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+ new FindDataSourceTable(session) +:
+ new ResolveSQLOnFile(session) +:
+ new CarbonIUDAnalysisRule(sparkSession) +:
+ new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
+ override val extendedCheckRules: Seq[LogicalPlan => Unit] =
+ PreWriteCheck :: HiveOnlyCheck :: Nil
+ override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+ PreprocessTableCreation(session) +:
+ PreprocessTableInsertion(conf) +:
+ DataSourceAnalysis(conf) +:
+ customPostHocResolutionRules
+ }
+ )
+ // Derived session states are built by this Carbon builder as well.
+ override protected def newBuilder: NewBuilder = new CarbonInMemorySessionStateBuilder(_, _)
+}
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
new file mode 100644
index 0000000..a046763
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, ExperimentalMethods}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.expressions.{Exists, In, ListQuery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.execution.SparkOptimizer
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+
+
+/**
+ * Spark optimizer that first flags Carbon relations read inside subqueries (via
+ * CarbonOptimizerUtil.transformForScalarSubQuery) and then delegates to SparkOptimizer.
+ */
+class CarbonOptimizer(
+    catalog: SessionCatalog,
+    conf: SQLConf,
+    experimentalMethods: ExperimentalMethods)
+  extends SparkOptimizer(catalog, conf, experimentalMethods) {
+
+  override def execute(plan: LogicalPlan): LogicalPlan =
+    super.execute(CarbonOptimizerUtil.transformForScalarSubQuery(plan))
+}
+
+object CarbonOptimizerUtil {
+
+  /** Flags every CarbonDatasourceHadoopRelation in `subPlan` via `isSubquery += true`. */
+  private def markCarbonRelations(subPlan: LogicalPlan): LogicalPlan = {
+    subPlan.transform {
+      case lr: LogicalRelation
+        if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+        lr
+    }
+  }
+
+  /**
+   * For scalar, EXISTS and IN subqueries appearing in Filter expressions, flags the Carbon
+   * relations they read so the optimizer rule can skip the decoder plan for them and the
+   * whole plan is optimized at once.
+   */
+  def transformForScalarSubQuery(plan: LogicalPlan): LogicalPlan = {
+    plan.transform {
+      case filter: Filter =>
+        filter.transformExpressions {
+          case s: ScalarSubquery =>
+            ScalarSubquery(markCarbonRelations(s.plan), s.children, s.exprId)
+          case e: Exists =>
+            Exists(markCarbonRelations(e.plan), e.children.map(_.canonicalized), e.exprId)
+          case In(value, Seq(l @ ListQuery(sub, _, exprId))) =>
+            In(value, Seq(ListQuery(markCarbonRelations(sub), l.children, exprId)))
+        }
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index 479c9ce..de37a35 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -22,29 +22,18 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Exists, Expression, In, InterpretedPredicate, ListQuery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext, ShowTablesContext}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
-import org.apache.spark.sql.execution.command.table.CarbonShowTablesCommand
-import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _}
+import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, PreWriteCheck, ResolveSQLOnFile, _}
import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
-import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.{SQLConf, SessionState}
import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
-import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
-import org.apache.spark.sql.types.DecimalType
-import org.apache.spark.util.CarbonReflectionUtils
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.util.CarbonScalaUtil
/**
@@ -79,7 +68,7 @@ class CarbonHiveSessionCatalog(
functionResourceLoader
) with CarbonSessionCatalog {
- lazy val carbonEnv = {
+ private lazy val carbonEnv = {
val env = new CarbonEnv
env.init(sparkSession)
env
@@ -95,9 +84,6 @@ class CarbonHiveSessionCatalog(
// Initialize all listeners to the Operation bus.
CarbonEnv.initListeners()
-
-
-
override def lookupRelation(name: TableIdentifier): LogicalPlan = {
val rtnRelation = super.lookupRelation(name)
val isRelationRefreshed =
@@ -119,6 +105,49 @@ class CarbonHiveSessionCatalog(
.asInstanceOf[HiveExternalCatalog].client
}
+ /**
+ * Renames a table through the Hive client, then rewrites its serde properties
+ * (tableName, dbName, tablePath) so they describe the renamed table.
+ * NOTE(review): the target database is always the old table's database
+ * (newTableIdentifier.database is ignored); `database.get` throws if
+ * oldTableIdentifier has no database, so callers presumably always qualify it; confirm.
+ * Identifiers are interpolated into HiveQL without quoting.
+ */
+ def alterTableRename(oldTableIdentifier: TableIdentifier,
+ newTableIdentifier: TableIdentifier,
+ newTablePath: String): Unit = {
+ getClient().runSqlHive(
+ s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
+ s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }")
+ // Second statement targets the table under its new name.
+ getClient().runSqlHive(
+ s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table} " +
+ s"SET SERDEPROPERTIES" +
+ s"('tableName'='${ newTableIdentifier.table }', " +
+ s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
+ }
+
+ /**
+ * Persists an altered schema by writing `schemaParts` into the table's TBLPROPERTIES
+ * through the Hive client. `cols` is not used by this implementation.
+ * NOTE(review): `database.get` throws if the identifier is not database-qualified.
+ */
+ override def alterTable(tableIdentifier: TableIdentifier,
+ schemaParts: String,
+ cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+ : Unit = {
+ getClient()
+ .runSqlHive(s"ALTER TABLE ${tableIdentifier.database.get}.${ tableIdentifier.table } " +
+ s"SET TBLPROPERTIES(${ schemaParts })")
+ }
+
+  /**
+   * Records added columns by delegating to [[alterTable]], which stores `schemaParts`
+   * as table properties; `cols` is passed through unchanged.
+   */
+  override def alterAddColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+    : Unit = alterTable(tableIdentifier, schemaParts, cols)
+
+  /**
+   * Records dropped columns by delegating to [[alterTable]], which stores `schemaParts`
+   * as table properties; `cols` is passed through unchanged.
+   */
+  override def alterDropColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+    : Unit = alterTable(tableIdentifier, schemaParts, cols)
+
+  /**
+   * Records a column data-type change by delegating to [[alterTable]], which stores
+   * `schemaParts` as table properties; `cols` is passed through unchanged.
+   */
+  override def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+    : Unit = alterTable(tableIdentifier, schemaParts, cols)
+
override def createPartitions(
tableName: TableIdentifier,
parts: Seq[CatalogTablePartition],
@@ -152,24 +181,13 @@ class CarbonHiveSessionCatalog(
*/
override def updateStorageLocation(
path: Path,
- storage: CatalogStorageFormat): CatalogStorageFormat = {
+ storage: CatalogStorageFormat,
+ newTableName: String,
+ dbName: String): CatalogStorageFormat = {
+ // Only the location URI is replaced; newTableName and dbName are accepted for the
+ // widened signature but are not used by this implementation.
storage.copy(locationUri = Some(path.toUri))
}
}
-
-class CarbonAnalyzer(catalog: SessionCatalog,
- conf: SQLConf,
- sparkSession: SparkSession,
- analyzer: Analyzer) extends Analyzer(catalog, conf) {
- override def execute(plan: LogicalPlan): LogicalPlan = {
- var logicalPlan = analyzer.execute(plan)
- logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
- CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
- }
-}
-
-
/**
* Session state implementation to override sql parser and adding strategies
*
@@ -194,7 +212,7 @@ class CarbonSessionStateBuilder(sparkSession: SparkSession,
* Internal catalog for managing table and database states.
*/
/**
- * Create a [[CarbonSessionCatalogBuild]].
+ * Create a [[CarbonHiveSessionCatalog]].
*/
override protected lazy val catalog: CarbonHiveSessionCatalog = {
val catalog = new CarbonHiveSessionCatalog(
@@ -248,147 +266,4 @@ class CarbonSessionStateBuilder(sparkSession: SparkSession,
)
override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _)
-
-}
-
-
-class CarbonOptimizer(
- catalog: SessionCatalog,
- conf: SQLConf,
- experimentalMethods: ExperimentalMethods)
- extends SparkOptimizer(catalog, conf, experimentalMethods) {
-
- override def execute(plan: LogicalPlan): LogicalPlan = {
- val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
- super.execute(transFormedPlan)
- }
-}
-
-object CarbonOptimizerUtil {
- def transformForScalarSubQuery(plan: LogicalPlan): LogicalPlan = {
- // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule, And
- // optimize whole plan at once.
- val transFormedPlan = plan.transform {
- case filter: Filter =>
- filter.transformExpressions {
- case s: ScalarSubquery =>
- val tPlan = s.plan.transform {
- case lr: LogicalRelation
- if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
- lr
- }
- ScalarSubquery(tPlan, s.children, s.exprId)
- case e: Exists =>
- val tPlan = e.plan.transform {
- case lr: LogicalRelation
- if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
- lr
- }
- Exists(tPlan, e.children.map(_.canonicalized), e.exprId)
-
- case In(value, Seq(l@ListQuery(sub, _, exprId))) =>
- val tPlan = sub.transform {
- case lr: LogicalRelation
- if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
- lr
- }
- In(value, Seq(ListQuery(tPlan, l.children, exprId)))
- }
- }
- transFormedPlan
- }
-}
-
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
- extends SparkSqlAstBuilder(conf) {
-
- val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
-
- override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
- val fileStorage = helper.getFileStorage(ctx.createFileFormat)
-
- if (fileStorage.equalsIgnoreCase("'carbondata'") ||
- fileStorage.equalsIgnoreCase("carbondata") ||
- fileStorage.equalsIgnoreCase("'carbonfile'") ||
- fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
- val createTableTuple = (ctx.createTableHeader, ctx.skewSpec,
- ctx.bucketSpec, ctx.partitionColumns, ctx.columns, ctx.tablePropertyList,ctx.locationSpec(),
- Option(ctx.STRING()).map(string), ctx.AS, ctx.query, fileStorage)
- helper.createCarbonTable(createTableTuple)
- } else {
- super.visitCreateHiveTable(ctx)
- }
- }
-
- override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = {
-
- val newColumn = visitColType(ctx.colType)
- if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
- throw new MalformedCarbonCommandException(
- "Column names provided are different. Both the column names should be same")
- }
-
- val (typeString, values) : (String, Option[List[(Int, Int)]]) = newColumn.dataType match {
- case d:DecimalType => ("decimal", Some(List((d.precision, d.scale))))
- case _ => (newColumn.dataType.typeName.toLowerCase, None)
- }
-
- val alterTableChangeDataTypeModel =
- AlterTableDataTypeChangeModel(new CarbonSpark2SqlParser().parseDataType(typeString, values),
- new CarbonSpark2SqlParser()
- .convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)),
- ctx.tableIdentifier().table.getText.toLowerCase,
- ctx.identifier.getText.toLowerCase,
- newColumn.name.toLowerCase)
-
- CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
- }
-
-
- override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
-
- val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
- val fields = parser.getFields(cols)
- val tblProperties = scala.collection.mutable.Map.empty[String, String]
- val tableModel = new CarbonSpark2SqlParser().prepareTableModel (false,
- new CarbonSpark2SqlParser().convertDbNameToLowerCase(Option(ctx.tableIdentifier().db)
- .map(_.getText)),
- ctx.tableIdentifier.table.getText.toLowerCase,
- fields,
- Seq.empty,
- tblProperties,
- None,
- true)
-
- val alterTableAddColumnsModel = AlterTableAddColumnsModel(
- Option(ctx.tableIdentifier().db).map(_.getText),
- ctx.tableIdentifier.table.getText,
- tblProperties.toMap,
- tableModel.dimCols,
- tableModel.msrCols,
- tableModel.highcardinalitydims.getOrElse(Seq.empty))
-
- CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
- }
-
- override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
- super.visitCreateTable(ctx)
- }
-
- override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = {
- withOrigin(ctx) {
- if (CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,
- CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT).toBoolean) {
- super.visitShowTables(ctx)
- } else {
- CarbonShowTablesCommand(
- Option(ctx.db).map(_.getText),
- Option(ctx.pattern).map(string))
- }
- }
- }
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
new file mode 100644
index 0000000..b0702ae
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext, ShowTablesContext}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkSqlAstBuilder
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel}
+import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
+import org.apache.spark.sql.execution.command.table.CarbonShowTablesCommand
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser}
+import org.apache.spark.sql.types.DecimalType
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Spark 2.2 SQL AST builder with Carbon-specific handling.
+ *
+ * Extends Spark's [[SparkSqlAstBuilder]] and overrides the visitors for Hive-style CREATE TABLE,
+ * column data-type change, ADD COLUMNS and SHOW TABLES. Every other statement is handled by the
+ * parent builder.
+ *
+ * @param conf         Spark SQL configuration, passed to the parent builder and the helper
+ * @param parser       Carbon DDL parser, used to convert column definitions into Carbon fields
+ * @param sparkSession session handed to the shared [[CarbonHelperSqlAstBuilder]]
+ */
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
+  extends SparkSqlAstBuilder(conf) {
+
+  /** Shared helper that resolves the declared file storage and builds Carbon CREATE TABLE plans. */
+  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
+
+  /** File-storage values (as returned by `helper.getFileStorage`) that select a Carbon table. */
+  private val carbonFileStorages =
+    Seq("'carbondata'", "carbondata", "'carbonfile'", "'org.apache.carbondata.format'")
+
+  /**
+   * Builds a Carbon CREATE TABLE plan when the declared file storage names a Carbon format
+   * (compared case-insensitively); otherwise falls back to Spark's Hive CREATE TABLE handling.
+   */
+  override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
+    val fileStorage = helper.getFileStorage(ctx.createFileFormat)
+
+    if (carbonFileStorages.exists(storage => fileStorage.equalsIgnoreCase(storage))) {
+      // Tuple layout expected by CarbonHelperSqlAstBuilder.createCarbonTable.
+      val createTableTuple = (ctx.createTableHeader, ctx.skewSpec, ctx.bucketSpec,
+        ctx.partitionColumns, ctx.columns, ctx.tablePropertyList, ctx.locationSpec(),
+        Option(ctx.STRING()).map(string), ctx.AS, ctx.query, fileStorage)
+      helper.createCarbonTable(createTableTuple)
+    } else {
+      super.visitCreateHiveTable(ctx)
+    }
+  }
+
+  /**
+   * Translates a column change into a CarbonAlterTableDataTypeChangeCommand.
+   *
+   * Database, table and column names are lower-cased before being stored in the model.
+   *
+   * @throws MalformedCarbonCommandException if the old and new column names differ
+   */
+  override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = {
+    val newColumn = visitColType(ctx.colType)
+    // Only the data type can change here, so the column must keep its name.
+    if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
+      throw new MalformedCarbonCommandException(
+        "Column names provided are different. Both the column names should be same")
+    }
+
+    // Decimal carries its (precision, scale); any other type is passed by its lower-cased name.
+    val (typeString, values): (String, Option[List[(Int, Int)]]) = newColumn.dataType match {
+      case d: DecimalType => ("decimal", Some(List((d.precision, d.scale))))
+      case _ => (newColumn.dataType.typeName.toLowerCase, None)
+    }
+
+    val ddlParser = new CarbonSpark2SqlParser()
+    val alterTableChangeDataTypeModel =
+      AlterTableDataTypeChangeModel(ddlParser.parseDataType(typeString, values),
+        ddlParser.convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)),
+        ctx.tableIdentifier().table.getText.toLowerCase,
+        ctx.identifier.getText.toLowerCase,
+        newColumn.name.toLowerCase)
+
+    CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
+  }
+
+  /**
+   * Translates ALTER TABLE ... ADD COLUMNS into a CarbonAlterTableAddColumnCommand.
+   *
+   * The new column definitions are converted into Carbon fields and split into dimension and
+   * measure columns by prepareTableModel. The database and table names are lower-cased once
+   * and used for both the table model and the ALTER model, matching visitChangeColumn.
+   */
+  override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
+    val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
+    val fields = parser.getFields(cols)
+    val tblProperties = scala.collection.mutable.Map.empty[String, String]
+    val ddlParser = new CarbonSpark2SqlParser()
+    val dbName =
+      ddlParser.convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText))
+    val tableName = ctx.tableIdentifier.table.getText.toLowerCase
+
+    val tableModel = ddlParser.prepareTableModel(false,
+      dbName,
+      tableName,
+      fields,
+      Seq.empty,
+      tblProperties,
+      None,
+      true)
+
+    // tblProperties is handed to prepareTableModel as a mutable map and only snapshotted
+    // afterwards; keep that order in case it is populated there.
+    val alterTableAddColumnsModel = AlterTableAddColumnsModel(
+      dbName,
+      tableName,
+      tblProperties.toMap,
+      tableModel.dimCols,
+      tableModel.msrCols,
+      tableModel.highcardinalitydims.getOrElse(Seq.empty))
+
+    CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
+  }
+
+  /** Delegates CREATE TABLE (non-Hive syntax) to Spark's default handling unchanged. */
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+    super.visitCreateTable(ctx)
+  }
+
+  /**
+   * Uses Spark's SHOW TABLES when CARBON_SHOW_DATAMAPS is true (default taken from
+   * CARBON_SHOW_DATAMAPS_DEFAULT). Otherwise returns a CarbonShowTablesCommand for the optional
+   * database and pattern, presumably to hide datamap tables (TODO confirm).
+   */
+  override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = {
+    withOrigin(ctx) {
+      val showDataMaps = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,
+          CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT).toBoolean
+      if (showDataMaps) {
+        super.visitShowTables(ctx)
+      } else {
+        CarbonShowTablesCommand(
+          Option(ctx.db).map(_.getText),
+          Option(ctx.pattern).map(string))
+      }
+    }
+  }
+}