Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/04/20 15:13:11 UTC
[1/2] incubator-carbondata git commit: changed CarbonEnv object to a class and used SparkSession to fetch the Carbon catalog that will be used in that Spark session.
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 804af9303 -> 9c575bf76
changed CarbonEnv object to a class and used SparkSession to fetch the Carbon catalog that will be used in that Spark session.
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/049b8aac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/049b8aac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/049b8aac
Branch: refs/heads/master
Commit: 049b8aacc95b8a2468990d0a96b819230ae08546
Parents: 804af93
Author: nareshpr <pr...@gmail.com>
Authored: Thu Apr 13 13:03:54 2017 +0530
Committer: nareshpr <pr...@gmail.com>
Committed: Thu Apr 20 19:41:36 2017 +0530
----------------------------------------------------------------------
.../spark/rdd/CarbonDataRDDFactory.scala | 26 ++---
.../spark/sql/CarbonDataFrameWriter.scala | 2 +-
.../sql/CarbonDatasourceHadoopRelation.scala | 16 ++-
.../spark/sql/CarbonDictionaryDecoder.scala | 12 ++-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 34 +++++--
.../org/apache/spark/sql/CarbonSession.scala | 2 -
.../org/apache/spark/sql/CarbonSource.scala | 36 +++----
.../execution/CarbonLateDecodeStrategy.scala | 5 +-
.../execution/command/AlterTableCommands.scala | 40 +++++---
.../execution/command/CarbonHiveCommands.scala | 4 +-
.../sql/execution/command/DDLStrategy.scala | 26 +++--
.../execution/command/carbonTableSchema.scala | 71 +++++++------
.../spark/sql/hive/CarbonSessionState.scala | 102 +++++++++++++++++--
.../org/apache/spark/util/AlterTableUtil.scala | 56 +++++-----
.../org/apache/spark/util/CleanFiles.scala | 2 +-
.../org/apache/spark/util/Compaction.scala | 2 +-
.../apache/spark/util/DeleteSegmentByDate.scala | 2 +-
.../apache/spark/util/DeleteSegmentById.scala | 2 +-
.../org/apache/spark/util/ShowSegments.scala | 2 +-
.../org/apache/spark/util/TableAPIUtil.scala | 3 +-
.../org/apache/spark/util/TableLoader.scala | 2 +-
21 files changed, 292 insertions(+), 155 deletions(-)
----------------------------------------------------------------------
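Before the per-file diffs, a minimal before/after sketch of the pattern this commit applies throughout: the process-global CarbonEnv.get is replaced by CarbonEnv.getInstance(sparkSession), so every metastore access is scoped to the calling Spark session. The wrapper function below is illustrative only; CarbonEnv, carbonMetastore and getTableCreationTime are taken from the diffs that follow, and the Long return type is an assumption.

import org.apache.spark.sql.{CarbonEnv, SparkSession}

// Old, process-wide lookup (removed by this commit):
//   val creationTime = CarbonEnv.get.carbonMetastore
//     .getTableCreationTime(dbName, tableName)

// New, session-scoped lookup:
def tableCreationTime(sparkSession: SparkSession, dbName: String, tableName: String): Long =
  CarbonEnv.getInstance(sparkSession).carbonMetastore
    .getTableCreationTime(dbName, tableName)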
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index ca96a17..631b2a7 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -98,7 +98,7 @@ object CarbonDataRDDFactory {
LOGGER.audit(s"Compaction request received for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val tableCreationTime = CarbonEnv.get.carbonMetastore
+ val tableCreationTime = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
.getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
if (null == carbonLoadModel.getLoadMetadataDetails) {
@@ -277,8 +277,9 @@ object CarbonDataRDDFactory {
LOGGER.info("System level compaction lock is enabled.")
val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
var tableForCompaction = CarbonCompactionUtil
- .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata.tablesMeta.toArray,
- skipCompactionTables.toList.asJava)
+ .getNextTableToCompact(CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
+ .metadata.tablesMeta.toArray,
+ skipCompactionTables.toList.asJava)
while (null != tableForCompaction) {
LOGGER.info("Compaction request has been identified for table " +
s"${ tableForCompaction.carbonTable.getDatabaseName }." +
@@ -289,10 +290,10 @@ object CarbonDataRDDFactory {
val newCarbonLoadModel = new CarbonLoadModel()
DataManagementFunc.prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
- val tableCreationTime = CarbonEnv.get.carbonMetastore
- .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
- newCarbonLoadModel.getTableName
- )
+ val tableCreationTime = CarbonEnv.getInstance(sqlContext.sparkSession)
+ .carbonMetastore.getTableCreationTime(newCarbonLoadModel.getDatabaseName,
+ newCarbonLoadModel.getTableName
+ )
val compactionSize = CarbonDataMergerUtil
.getCompactionSize(CompactionType.MAJOR_COMPACTION)
@@ -331,9 +332,10 @@ object CarbonDataRDDFactory {
}
// ********* check again for all the tables.
tableForCompaction = CarbonCompactionUtil
- .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
- .tablesMeta.toArray, skipCompactionTables.asJava
- )
+ .getNextTableToCompact(CarbonEnv.getInstance(sqlContext.sparkSession)
+ .carbonMetastore.metadata
+ .tablesMeta.toArray, skipCompactionTables.asJava
+ )
}
}
// giving the user his error for telling in the beeline if his triggered table
@@ -487,9 +489,9 @@ object CarbonDataRDDFactory {
// reading the start time of data load.
val loadStartTime = CarbonUpdateUtil.readCurrentTime();
carbonLoadModel.setFactTimeStamp(loadStartTime)
- val tableCreationTime = CarbonEnv.get.carbonMetastore
+ val tableCreationTime = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
.getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
- val schemaLastUpdatedTime = CarbonEnv.get.carbonMetastore
+ val schemaLastUpdatedTime = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
.getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
// get partition way from configuration
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 9ad9504..465fad0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -58,7 +58,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
*/
private def loadTempCSV(options: CarbonOption): Unit = {
// temporary solution: write to csv file, then load the csv into carbon
- val storePath = CarbonEnv.get.carbonMetastore.storePath
+ val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.storePath
val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
.append("tempCSV")
.append(CarbonCommonConstants.UNDERSCORE).append(options.dbName)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 322e1ae..ea3e40d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -46,16 +46,11 @@ case class CarbonDatasourceHadoopRelation(
extends BaseRelation {
lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
- lazy val carbonTable = SchemaReader.readCarbonTableFromStore(absIdentifier)
- lazy val carbonRelation: CarbonRelation = {
- CarbonRelation(
- carbonTable.getDatabaseName,
- carbonTable.getFactTableName,
- CarbonSparkUtil.createSparkMeta(carbonTable),
- new TableMeta(carbonTable.getCarbonTableIdentifier, paths.head, carbonTable),
- None
- )
- }
+ lazy val carbonTable = carbonRelation.tableMeta.carbonTable
+ lazy val carbonRelation: CarbonRelation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .lookupRelation(Some(absIdentifier.getCarbonTableIdentifier.getDatabaseName),
+ absIdentifier.getCarbonTableIdentifier.getTableName)(sparkSession)
+ .asInstanceOf[CarbonRelation]
override def sqlContext: SQLContext = sparkSession.sqlContext
@@ -72,6 +67,7 @@ case class CarbonDatasourceHadoopRelation(
new CarbonScanRDD(sqlContext.sparkContext, projection, filterExpression.orNull,
absIdentifier, carbonTable)
}
+
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)
override def toString: String = {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index d47ff1a..d897d9a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -46,7 +46,8 @@ case class CarbonDictionaryDecoder(
relations: Seq[CarbonDecoderRelation],
profile: CarbonProfile,
aliasMap: CarbonAliasDecoderRelation,
- child: SparkPlan)
+ child: SparkPlan,
+ sparkSession: SparkSession)
extends UnaryExecNode with CodegenSupport {
override val output: Seq[Attribute] =
@@ -62,7 +63,7 @@ case class CarbonDictionaryDecoder(
override def doExecute(): RDD[InternalRow] = {
attachTree(this, "execute") {
- val storePath = CarbonEnv.get.carbonMetastore.storePath
+ val storePath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath
val absoluteTableIdentifiers = relations.map { relation =>
val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
(carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
@@ -116,7 +117,7 @@ case class CarbonDictionaryDecoder(
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
- val storePath = CarbonEnv.get.carbonMetastore.storePath
+ val storePath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath
val absoluteTableIdentifiers = relations.map { relation =>
val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
(carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
@@ -436,10 +437,11 @@ class CarbonDecoderRDD(
profile: CarbonProfile,
aliasMap: CarbonAliasDecoderRelation,
prev: RDD[InternalRow],
- output: Seq[Attribute])
+ output: Seq[Attribute],
+ sparkSession: SparkSession)
extends RDD[InternalRow](prev) {
- private val storepath = CarbonEnv.get.carbonMetastore.storePath
+ private val storepath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath
def canBeDecoded(attr: Attribute): Boolean = {
profile match {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index d426348..a286e56 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,7 +17,10 @@
package org.apache.spark.sql
-import org.apache.spark.sql.hive.CarbonMetastore
+import java.util.Map
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.spark.sql.hive.{CarbonMetastore, CarbonSessionCatalog}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -28,14 +31,12 @@ import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
/**
* Carbon Environment for unified context
*/
-case class CarbonEnv(carbonMetastore: CarbonMetastore)
+class CarbonEnv {
-object CarbonEnv {
+ var carbonMetastore: CarbonMetastore = _
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- @volatile private var carbonEnv: CarbonEnv = _
-
// set readsupport class global so that the executor can get it.
SparkReadSupport.readSupportClass = classOf[SparkRowReadSupportImpl]
@@ -43,20 +44,35 @@ object CarbonEnv {
def init(sparkSession: SparkSession): Unit = {
if (!initialized) {
- val catalog = {
+ carbonMetastore = {
val storePath =
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
LOGGER.info(s"carbon env initial: $storePath")
new CarbonMetastore(sparkSession.conf, storePath)
}
CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
- carbonEnv = CarbonEnv(catalog)
initialized = true
}
}
+}
- def get: CarbonEnv = {
- carbonEnv
+object CarbonEnv {
+
+ val carbonEnvMap: Map[SparkSession, CarbonEnv] =
+ new ConcurrentHashMap[SparkSession, CarbonEnv]
+
+ def getInstance(sparkSession: SparkSession): CarbonEnv = {
+ if (sparkSession.isInstanceOf[CarbonSession]) {
+ sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].carbonEnv
+ } else {
+ var carbonEnv: CarbonEnv = carbonEnvMap.get(sparkSession)
+ if (carbonEnv == null) {
+ carbonEnv = new CarbonEnv
+ carbonEnv.init(sparkSession)
+ carbonEnvMap.put(sparkSession, carbonEnv)
+ }
+ carbonEnv
+ }
}
}
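The new getInstance above gives each SparkSession its own CarbonEnv: a CarbonSession carries it inside its CarbonSessionCatalog, while any other session is lazily initialized and cached in the ConcurrentHashMap. A hedged usage sketch (assumes spark is an already-built session with a valid carbon store location configured; note that newSession() returns a plain SparkSession sharing the same SparkContext):

// Repeated calls for the same session return the cached instance.
val env1 = CarbonEnv.getInstance(spark)
val env2 = CarbonEnv.getInstance(spark)
assert(env1 eq env2)

// A different session resolves to a different CarbonEnv (and metastore).
val sibling = spark.newSession()
assert(CarbonEnv.getInstance(sibling) ne env1)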
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 23040b3..1ce170c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -40,8 +40,6 @@ class CarbonSession(@transient val sc: SparkContext,
this(sc, None)
}
- CarbonEnv.init(this)
-
@transient
override private[sql] lazy val sessionState: SessionState = new CarbonSessionState(this)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index fce9b4c..536c19f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -45,7 +45,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
// will be called if hive supported create table command is provided
override def createRelation(sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
- CarbonEnv.init(sqlContext.sparkSession)
+ CarbonEnv.getInstance(sqlContext.sparkSession)
// if path is provided we can directly create Hadoop relation. \
// Otherwise create datasource relation
parameters.get("tablePath") match {
@@ -61,13 +61,14 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(tablePath), parameters, None)
}
}
+
// called by any write operation like INSERT INTO DDL or DataFrame.write API
override def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
- CarbonEnv.init(sqlContext.sparkSession)
+ CarbonEnv.getInstance(sqlContext.sparkSession)
// User should not specify path since only one store is supported in carbon currently,
// after we support multi-store, we can remove this limitation
require(!parameters.contains("path"), "'path' should not be specified, " +
@@ -109,7 +110,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
sqlContext: SQLContext,
parameters: Map[String, String],
dataSchema: StructType): BaseRelation = {
- CarbonEnv.init(sqlContext.sparkSession)
+ CarbonEnv.getInstance(sqlContext.sparkSession)
addLateDecodeOptimization(sqlContext.sparkSession)
val path = createTableIfNotExists(sqlContext.sparkSession, parameters, dataSchema)
CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), parameters,
@@ -138,8 +139,9 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
}
val options = new CarbonOption(parameters)
try {
- CarbonEnv.get.carbonMetastore.lookupRelation(Option(dbName), tableName)(sparkSession)
- CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName"
+ CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .lookupRelation(Option(dbName), tableName)(sparkSession)
+ CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
} catch {
case ex: NoSuchTableException =>
val fields = dataSchema.map { col =>
@@ -162,23 +164,23 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
val map = scala.collection.mutable.Map[String, String]()
parameters.foreach { parameter => map.put(parameter._1, parameter._2.toLowerCase) }
val bucketFields = if (options.isBucketingEnabled) {
- if (options.bucketNumber.toString.contains("-") ||
- options.bucketNumber.toString.contains("+") ) {
- throw new MalformedCarbonCommandException("INVALID NUMBER OF BUCKETS SPECIFIED" +
- options.bucketNumber.toString)
- }
- else {
- Some(BucketFields(options.bucketColumns.toLowerCase.split(",").map(_.trim),
- options.bucketNumber))
- }
- } else {
- None
+ if (options.bucketNumber.toString.contains("-") ||
+ options.bucketNumber.toString.contains("+")) {
+ throw new MalformedCarbonCommandException("INVALID NUMBER OF BUCKETS SPECIFIED" +
+ options.bucketNumber.toString)
+ }
+ else {
+ Some(BucketFields(options.bucketColumns.toLowerCase.split(",").map(_.trim),
+ options.bucketNumber))
}
+ } else {
+ None
+ }
val cm = TableCreator.prepareTableModel(ifNotExistPresent = false, Option(dbName),
tableName, fields, Nil, bucketFields, map)
CreateTable(cm, false).run(sparkSession)
- CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName"
+ CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
case ex: Exception =>
throw new Exception("do not have dbname and tablename for carbon table", ex)
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 976759a..39fbb09 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -70,7 +70,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
CarbonDictionaryDecoder(relations,
profile,
aliasMap,
- planLater(child)
+ planLater(child),
+ SparkSession.getActiveSession.get
) :: Nil
}
case _ => Nil
@@ -95,7 +96,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
newAttr
}
new CarbonDecoderRDD(Seq(relation), IncludeProfile(attrs),
- CarbonAliasDecoderRelation(), rdd, output)
+ CarbonAliasDecoderRelation(), rdd, output, SparkSession.getActiveSession.get)
}
private[this] def toCatalystRDD(
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
index 8b194da..6097231 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -53,7 +53,9 @@ private[sql] case class AlterTableAddColumns(
var locks = List.empty[ICarbonLock]
var lastUpdatedTime = 0L
var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
- val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+ val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
+ .carbonTable
try {
locks = AlterTableUtil
.validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
@@ -63,7 +65,7 @@ private[sql] case class AlterTableAddColumns(
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
carbonTable.getCarbonTableIdentifier)
val tableMetadataFile = carbonTablePath.getSchemaFilePath
- val thriftTableInfo: TableInfo = CarbonEnv.get.carbonMetastore
+ val thriftTableInfo: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore
.readSchemaFile(tableMetadataFile)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
val wrapperTableInfo = schemaConverter
@@ -139,7 +141,7 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
LOGGER.audit(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
val relation: CarbonRelation =
- CarbonEnv.get.carbonMetastore
+ CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession)
.asInstanceOf[CarbonRelation]
if (relation == null) {
@@ -164,8 +166,8 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
carbonTable.getCarbonTableIdentifier)
val tableMetadataFile = carbonTablePath.getSchemaFilePath
- val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
- .readSchemaFile(tableMetadataFile)
+ val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.getInstance(sparkSession)
+ .carbonMetastore.readSchemaFile(tableMetadataFile)
val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
schemaEvolutionEntry.setTableName(newTableName)
schemaEvolutionEntry.setTime_stamp(System.currentTimeMillis())
@@ -183,11 +185,13 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
newTableName,
carbonTable.getCarbonTableIdentifier.getTableId)
- val newTablePath = CarbonEnv.get.carbonMetastore.updateTableSchema(newTableIdentifier,
- tableInfo,
- schemaEvolutionEntry,
- carbonTable.getStorePath)(sparkSession)
- CarbonEnv.get.carbonMetastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
+ val newTablePath = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .updateTableSchema(newTableIdentifier,
+ tableInfo,
+ schemaEvolutionEntry,
+ carbonTable.getStorePath)(sparkSession)
+ CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .removeTableFromMetadata(oldDatabaseName, oldTableName)
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
.runSqlHive(
s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
@@ -202,7 +206,7 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
case e: Exception => LOGGER
.error("Rename table failed: " + e.getMessage)
AlterTableUtil.revertRenameTableChanges(oldTableIdentifier, newTableName, lastUpdatedTime)(
- sparkSession)
+ sparkSession)
renameBadRecords(newTableName, oldTableName, oldDatabaseName)
sys.error(s"Alter table rename table operation failed: ${e.getMessage}")
} finally {
@@ -253,7 +257,9 @@ private[sql] case class AlterTableDropColumns(
var lastUpdatedTime = 0L
val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
// get the latest carbon table and check for column existence
- val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+ val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
+ .carbonTable
try {
locks = AlterTableUtil
.validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
@@ -294,8 +300,8 @@ private[sql] case class AlterTableDropColumns(
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
carbonTable.getCarbonTableIdentifier)
val tableMetadataFile = carbonTablePath.getSchemaFilePath
- val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
- .readSchemaFile(tableMetadataFile)
+ val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.getInstance(sparkSession)
+ .carbonMetastore.readSchemaFile(tableMetadataFile)
// maintain the deleted columns for schema evolution history
var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
val columnSchemaList = tableInfo.fact_table.table_columns.asScala
@@ -349,7 +355,9 @@ private[sql] case class AlterTableDataTypeChange(
val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
var locks = List.empty[ICarbonLock]
// get the latest carbon table and check for column existence
- val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+ val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
+ .carbonTable
var lastUpdatedTime = 0L
try {
locks = AlterTableUtil
@@ -375,7 +383,7 @@ private[sql] case class AlterTableDataTypeChange(
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
carbonTable.getCarbonTableIdentifier)
val tableMetadataFile = carbonTablePath.getSchemaFilePath
- val tableInfo: TableInfo = CarbonEnv.get.carbonMetastore
+ val tableInfo: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore
.readSchemaFile(tableMetadataFile)
// maintain the added column for schema evolution history
var addColumnSchema: ColumnSchema = null
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
index 95f6639..2786620 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
@@ -30,13 +30,13 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
// DropHiveDB command will fail if cascade is false and one or more table exists in database
val rows = command.run(sparkSession)
if (command.cascade) {
- val tablesInDB = CarbonEnv.get.carbonMetastore.getAllTables()
+ val tablesInDB = CarbonEnv.getInstance(sparkSession).carbonMetastore.getAllTables()
.filterNot(_.database.exists(_.equalsIgnoreCase(dbName)))
tablesInDB.foreach { tableName =>
CarbonDropTableCommand(true, Some(dbName), tableName.table).run(sparkSession)
}
}
- CarbonEnv.get.carbonMetastore.dropDatabaseDirectory(dbName)
+ CarbonEnv.getInstance(sparkSession).carbonMetastore.dropDatabaseDirectory(dbName)
rows
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
index 55148d2..8afadf0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -16,7 +16,8 @@
*/
package org.apache.spark.sql.execution.command
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, InsertIntoCarbonTable, ShowLoadsCommand, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, InsertIntoCarbonTable,
+ShowLoadsCommand, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -33,11 +34,12 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = {
plan match {
case LoadDataCommand(identifier, path, isLocal, isOverwrite, partition)
- if CarbonEnv.get.carbonMetastore.tableExists(identifier)(sparkSession) =>
+ if CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .tableExists(identifier)(sparkSession) =>
ExecutedCommandExec(LoadTable(identifier.database, identifier.table.toLowerCase, path,
Seq(), Map(), isOverwrite)) :: Nil
case alter@AlterTableRenameCommand(oldTableIdentifier, newTableIdentifier, _) =>
- val isCarbonTable = CarbonEnv.get.carbonMetastore
+ val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
.tableExists(oldTableIdentifier)(
sparkSession)
if (isCarbonTable) {
@@ -47,7 +49,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
ExecutedCommandExec(alter) :: Nil
}
case DropTableCommand(identifier, ifNotExists, isView, _)
- if CarbonEnv.get.carbonMetastore
+ if CarbonEnv.getInstance(sparkSession).carbonMetastore
.isTablePathExists(identifier)(sparkSession) =>
ExecutedCommandExec(
CarbonDropTableCommand(ifNotExists, identifier.database,
@@ -55,15 +57,15 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
case ShowLoadsCommand(databaseName, table, limit) =>
ExecutedCommandExec(ShowLoads(databaseName, table.toLowerCase, limit, plan.output)) :: Nil
case InsertIntoCarbonTable(relation: CarbonDatasourceHadoopRelation,
- _, child: LogicalPlan, _, _) =>
+ _, child: LogicalPlan, _, _) =>
ExecutedCommandExec(LoadTableByInsert(relation, child)) :: Nil
case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
- CarbonEnv.get.carbonMetastore.createDatabaseDirectory(dbName)
+ CarbonEnv.getInstance(sparkSession).carbonMetastore.createDatabaseDirectory(dbName)
ExecutedCommandExec(createDb) :: Nil
case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil
case alterTable@AlterTableCompaction(altertablemodel) =>
- val isCarbonTable = CarbonEnv.get.carbonMetastore
+ val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
.tableExists(TableIdentifier(altertablemodel.tableName,
altertablemodel.dbName))(sparkSession)
if (isCarbonTable) {
@@ -79,7 +81,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
"Operation not allowed : " + altertablemodel.alterSql)
}
case dataTypeChange@AlterTableDataTypeChange(alterTableChangeDataTypeModel) =>
- val isCarbonTable = CarbonEnv.get.carbonMetastore
+ val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
.tableExists(TableIdentifier(alterTableChangeDataTypeModel.tableName,
alterTableChangeDataTypeModel.databaseName))(sparkSession)
if (isCarbonTable) {
@@ -88,7 +90,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
}
case addColumn@AlterTableAddColumns(alterTableAddColumnsModel) =>
- val isCarbonTable = CarbonEnv.get.carbonMetastore
+ val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
.tableExists(TableIdentifier(alterTableAddColumnsModel.tableName,
alterTableAddColumnsModel.databaseName))(sparkSession)
if (isCarbonTable) {
@@ -97,7 +99,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
}
case dropColumn@AlterTableDropColumns(alterTableDropColumnModel) =>
- val isCarbonTable = CarbonEnv.get.carbonMetastore
+ val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
.tableExists(TableIdentifier(alterTableDropColumnModel.tableName,
alterTableDropColumnModel.databaseName))(sparkSession)
if (isCarbonTable) {
@@ -106,7 +108,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
}
case desc@DescribeTableCommand(identifier, partitionSpec, isExtended, isFormatted)
- if CarbonEnv.get.carbonMetastore.tableExists(identifier)(sparkSession) && isFormatted =>
+ if
+ CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .tableExists(identifier)(sparkSession) && isFormatted =>
val resolvedTable =
sparkSession.sessionState.executePlan(UnresolvedRelation(identifier, None)).analyzed
val resultPlan = sparkSession.sessionState.executePlan(resolvedTable).executedPlan
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 37fac95..06ab47d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -57,7 +57,8 @@ import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadM
import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, CommonUtil, DataTypeConverterUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, CommonUtil,
+DataTypeConverterUtil, GlobalDictionaryUtil}
object Checker {
def validateTableExists(
@@ -65,7 +66,7 @@ object Checker {
tableName: String,
session: SparkSession): Unit = {
val identifier = TableIdentifier(tableName, dbName)
- if (!CarbonEnv.get.carbonMetastore.tableExists(identifier)(session)) {
+ if (!CarbonEnv.getInstance(session).carbonMetastore.tableExists(identifier)(session)) {
val err = s"table $dbName.$tableName not found"
LogServiceFactory.getLogService(this.getClass.getName).error(err)
throw new IllegalArgumentException(err)
@@ -85,20 +86,19 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
def run(sparkSession: SparkSession): Seq[Row] = {
val tableName = alterTableModel.tableName.toLowerCase
val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
- if (null == CarbonMetadata.getInstance.getCarbonTable(databaseName + "_" + tableName)) {
- LOGGER.error(s"alter table failed. table not found: $databaseName.$tableName")
- sys.error(s"alter table failed. table not found: $databaseName.$tableName")
- }
-
val relation =
- CarbonEnv.get.carbonMetastore
+ CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(Option(databaseName), tableName)(sparkSession)
.asInstanceOf[CarbonRelation]
if (relation == null) {
sys.error(s"Table $databaseName.$tableName does not exist")
}
- val carbonLoadModel = new CarbonLoadModel()
+ if (null == CarbonMetadata.getInstance.getCarbonTable(databaseName + "_" + tableName)) {
+ LOGGER.error(s"alter table failed. table not found: $databaseName.$tableName")
+ sys.error(s"alter table failed. table not found: $databaseName.$tableName")
+ }
+ val carbonLoadModel = new CarbonLoadModel()
val table = relation.tableMeta.carbonTable
carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
@@ -138,7 +138,7 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends RunnableCommand {
def run(sparkSession: SparkSession): Seq[Row] = {
- CarbonEnv.init(sparkSession)
+ CarbonEnv.getInstance(sparkSession).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sparkSession)
val tbName = cm.tableName
@@ -161,7 +161,7 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
}
} else {
// Add Database to catalog and persist
- val catalog = CarbonEnv.get.carbonMetastore
+ val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
// Need to fill partitioner class when we support partition
val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
if (createDSTable) {
@@ -171,15 +171,15 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
sparkSession.sql(
s"""CREATE TABLE $dbName.$tbName
- |(${fields.map(f => f.rawSchema).mkString(",")})
- |USING org.apache.spark.sql.CarbonSource""".stripMargin +
+ |(${ fields.map(f => f.rawSchema).mkString(",") })
+ |USING org.apache.spark.sql.CarbonSource""".stripMargin +
s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath "$tablePath") """)
} catch {
case e: Exception =>
val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
// call the drop table to delete the created table.
- CarbonEnv.get.carbonMetastore
+ CarbonEnv.getInstance(sparkSession).carbonMetastore
.dropTable(catalog.storePath, identifier)(sparkSession)
LOGGER.audit(s"Table creation with Database name [$dbName] " +
@@ -266,7 +266,7 @@ object LoadTable {
// write TableInfo
CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
- val catalog = CarbonEnv.get.carbonMetastore
+ val catalog = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
// upate the schema modified time
catalog.updateSchemasUpdatedTime(catalog.touchSchemaFileSystemTime(
@@ -289,6 +289,7 @@ object LoadTable {
case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: LogicalPlan)
extends RunnableCommand {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
def run(sparkSession: SparkSession): Seq[Row] = {
val df = Dataset.ofRows(sparkSession, child)
val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
@@ -321,10 +322,12 @@ case class LoadTable(
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- private def checkDefaultValue(value: String, default: String) = if (StringUtils.isEmpty(value)) {
- default
- } else {
- value
+ private def checkDefaultValue(value: String, default: String) = {
+ if (StringUtils.isEmpty(value)) {
+ default
+ } else {
+ value
+ }
}
def run(sparkSession: SparkSession): Seq[Row] = {
@@ -340,17 +343,17 @@ case class LoadTable(
if (isOverwriteExist) {
sys.error(s"Overwrite is not supported for carbon table with $dbName.$tableName")
}
+ val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+ if (relation == null) {
+ sys.error(s"Table $dbName.$tableName does not exist")
+ }
if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
sys.error(s"Data loading failed. table not found: $dbName.$tableName")
}
- val relation = CarbonEnv.get.carbonMetastore
- .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
- if (relation == null) {
- sys.error(s"Table $dbName.$tableName does not exist")
- }
CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
val carbonLock = CarbonLockFactory
.getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
@@ -386,8 +389,8 @@ case class LoadTable(
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
val partitionLocation = relation.tableMeta.storePath + "/partition/" +
- relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
- relation.tableMeta.carbonTableIdentifier.getTableName + "/"
+ relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
+ relation.tableMeta.carbonTableIdentifier.getTableName + "/"
val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
@@ -439,7 +442,7 @@ case class LoadTable(
true
} else {
LOGGER.error("Can't use single_pass, because SINGLE_PASS and ALL_DICTIONARY_PATH" +
- "can not be used together")
+ "can not be used together")
false
}
case "false" =>
@@ -679,7 +682,7 @@ case class CleanFiles(
def run(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
- val relation = CarbonEnv.get.carbonMetastore
+ val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]
CarbonStore.cleanFiles(
getDB.getDatabaseName(databaseNameOp, sparkSession),
@@ -718,8 +721,10 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
val carbonLock = CarbonLockFactory
.getCarbonLockObj(carbonTableIdentifier, LockUsage.DROP_TABLE_LOCK)
- val storePath = CarbonEnv.get.carbonMetastore.storePath
+ val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ val storePath = catalog.storePath
var isLocked = false
+ catalog.checkSchemasModifiedTimeAndReloadTables()
try {
isLocked = carbonLock.lockWithRetries()
if (isLocked) {
@@ -730,13 +735,15 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
sys.error("Table is locked for deletion. Please try after some time")
}
LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
- val carbonTable = CarbonEnv.get.carbonMetastore.getTableFromMetadata(dbName, tableName)
+ val carbonTable = catalog
+ .getTableFromMetadata(dbName, tableName)
.map(_.carbonTable).getOrElse(null)
if (null != carbonTable) {
// clear driver B-tree and dictionary cache
ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
}
- CarbonEnv.get.carbonMetastore.dropTable(storePath, identifier)(sparkSession)
+ CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .dropTable(storePath, identifier)(sparkSession)
LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
} finally {
if (carbonLock != null && isLocked) {
@@ -771,7 +778,7 @@ private[sql] case class DescribeCommandFormatted(
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
- val relation = CarbonEnv.get.carbonMetastore
+ val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(tblIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
val mapper = new ObjectMapper()
val colProps = StringBuilder.newBuilder
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 38c7f34..687afc4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -16,22 +16,98 @@
*/
package org.apache.spark.sql.hive
-import org.apache.spark.sql.CarbonDatasourceHadoopRelation
-import org.apache.spark.sql.catalyst.analysis.Analyzer
-import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubquery}
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
import org.apache.spark.sql.execution.command.DDLStrategy
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.SparkOptimizer
-import org.apache.spark.sql.ExperimentalMethods
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
import org.apache.spark.sql.parser.CarbonSparkSqlParser
-import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.processing.merger.TableMeta
+
+/**
+ * This class will have carbon catalog and refresh the relation from cache if the carbontable in
+ * carbon catalog is not same as cached carbon relation's carbon table
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class CarbonSessionCatalog(
+ externalCatalog: HiveExternalCatalog,
+ globalTempViewManager: GlobalTempViewManager,
+ sparkSession: SparkSession,
+ functionResourceLoader: FunctionResourceLoader,
+ functionRegistry: FunctionRegistry,
+ conf: SQLConf,
+ hadoopConf: Configuration)
+ extends HiveSessionCatalog(
+ externalCatalog,
+ globalTempViewManager,
+ sparkSession,
+ functionResourceLoader,
+ functionRegistry,
+ conf,
+ hadoopConf) {
+
+ lazy val carbonEnv = {
+ val env = new CarbonEnv
+ env.init(sparkSession)
+ env
+ }
+
+ /**
+ * This method will invalidate carbonrelation from cache if carbon table is updated in
+ * carbon catalog
+ *
+ * @param name
+ * @param alias
+ * @return
+ */
+ override def lookupRelation(name: TableIdentifier,
+ alias: Option[String]): LogicalPlan = {
+ super.lookupRelation(name, alias) match {
+ case SubqueryAlias(_,
+ LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
+ _) =>
+ refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
+ case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
+ refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
+ case relation => relation
+ }
+ }
+
+ private def refreshRelationFromCache(name: TableIdentifier,
+ alias: Option[String],
+ carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): LogicalPlan = {
+ carbonEnv.carbonMetastore.checkSchemasModifiedTimeAndReloadTables
+ carbonEnv.carbonMetastore
+ .getTableFromMetadata(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
+ carbonDatasourceHadoopRelation.carbonTable.getFactTableName) match {
+ case tableMeta: TableMeta =>
+ if (tableMeta.carbonTable.getTableLastUpdatedTime !=
+ carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime) {
+ refreshTable(name)
+ }
+ case _ => refreshTable(name)
+ }
+ super.lookupRelation(name, alias)
+ }
+}
/**
* Session state implementation to override sql parser and adding strategies
@@ -66,6 +142,20 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
PreWriteCheck(conf, catalog))
}
}
+
+ /**
+ * Internal catalog for managing table and database states.
+ */
+ override lazy val catalog = {
+ new CarbonSessionCatalog(
+ sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
+ sparkSession.sharedState.globalTempViewManager,
+ sparkSession,
+ functionResourceLoader,
+ functionRegistry,
+ conf,
+ newHadoopConf())
+ }
}
class CarbonOptimizer(
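The refresh rule in refreshRelationFromCache above boils down to a timestamp comparison between the metastore's copy of the table and the CarbonTable captured inside the cached relation; when they differ, refreshTable(name) drops the cached plan and lookupRelation re-resolves it. Restated in isolation (TableMeta comes from the import in this diff; CarbonTable's package path is assumed from related imports in this commit):

import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.processing.merger.TableMeta

// A cached relation is stale once the metastore holds a different
// last-updated timestamp than the one cached with the relation.
def isStale(cached: CarbonTable, latest: TableMeta): Boolean =
  latest.carbonTable.getTableLastUpdatedTime != cached.getTableLastUpdatedTime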
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index f5248f5..052edce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -54,7 +54,7 @@ object AlterTableUtil {
LOGGER: LogService)
(sparkSession: SparkSession): List[ICarbonLock] = {
val relation =
- CarbonEnv.get.carbonMetastore
+ CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(Option(dbName), tableName)(sparkSession)
.asInstanceOf[CarbonRelation]
if (relation == null) {
@@ -158,13 +158,13 @@ object AlterTableUtil {
thriftTable: TableInfo)(sparkSession: SparkSession, catalog: HiveExternalCatalog): Unit = {
val dbName = carbonTable.getDatabaseName
val tableName = carbonTable.getFactTableName
- CarbonEnv.get.carbonMetastore
+ CarbonEnv.getInstance(sparkSession).carbonMetastore
.updateTableSchema(carbonTable.getCarbonTableIdentifier,
thriftTable,
schemaEvolutionEntry,
carbonTable.getStorePath)(sparkSession)
val tableIdentifier = TableIdentifier(tableName, Some(dbName))
- val schema = CarbonEnv.get.carbonMetastore
+ val schema = CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(tableIdentifier)(sparkSession).schema.json
val schemaParts = prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema)
catalog.client.runSqlHive(
@@ -209,28 +209,30 @@ object AlterTableUtil {
lastUpdatedTime: Long)
(sparkSession: SparkSession): Unit = {
val database = oldTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
- val carbonTable: CarbonTable = CarbonMetadata.getInstance
- .getCarbonTable(database + "_" + newTableName)
+ val carbonTable: CarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .lookupRelation(Some(database), newTableName)(sparkSession).asInstanceOf[CarbonRelation]
+ .tableMeta.carbonTable
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
carbonTable.getCarbonTableIdentifier)
val tableMetadataFile = carbonTablePath.getSchemaFilePath
val fileType = FileFactory.getFileType(tableMetadataFile)
- val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
- .readSchemaFile(tableMetadataFile)
+ val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.getInstance(sparkSession)
+ .carbonMetastore.readSchemaFile(tableMetadataFile)
val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history
val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
if (updatedTime > lastUpdatedTime) {
- LOGGER.error(s"Reverting changes for $database.${oldTableIdentifier.table}")
+ LOGGER.error(s"Reverting changes for $database.${ oldTableIdentifier.table }")
FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
.renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
oldTableIdentifier.table)
val tableIdentifier = new CarbonTableIdentifier(database,
oldTableIdentifier.table,
carbonTable.getCarbonTableIdentifier.getTableId)
- CarbonEnv.get.carbonMetastore.revertTableSchema(tableIdentifier,
+ CarbonEnv.getInstance(sparkSession).carbonMetastore.revertTableSchema(tableIdentifier,
tableInfo,
carbonTable.getStorePath)(sparkSession)
- CarbonEnv.get.carbonMetastore.removeTableFromMetadata(database, newTableName)
+ CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .removeTableFromMetadata(database, newTableName)
}
}
@@ -244,13 +246,14 @@ object AlterTableUtil {
*/
def revertAddColumnChanges(dbName: String, tableName: String, lastUpdatedTime: Long)
(sparkSession: SparkSession): Unit = {
- val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
-
+ val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
+ .carbonTable
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
carbonTable.getCarbonTableIdentifier)
val tableMetadataFile = carbonTablePath.getSchemaFilePath
- val thriftTable: TableInfo = CarbonEnv.get.carbonMetastore
+ val thriftTable: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore
.readSchemaFile(tableMetadataFile)
val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
@@ -258,8 +261,9 @@ object AlterTableUtil {
LOGGER.info(s"Reverting changes for $dbName.$tableName")
val addedSchemas = evolutionEntryList.get(evolutionEntryList.size() - 1).added
thriftTable.fact_table.table_columns.removeAll(addedSchemas)
- CarbonEnv.get.carbonMetastore.revertTableSchema(carbonTable.getCarbonTableIdentifier,
- thriftTable, carbonTable.getStorePath)(sparkSession)
+ CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .revertTableSchema(carbonTable.getCarbonTableIdentifier,
+ thriftTable, carbonTable.getStorePath)(sparkSession)
}
}
@@ -273,11 +277,13 @@ object AlterTableUtil {
*/
def revertDropColumnChanges(dbName: String, tableName: String, lastUpdatedTime: Long)
(sparkSession: SparkSession): Unit = {
- val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+ val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
+ .carbonTable
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
carbonTable.getCarbonTableIdentifier)
val tableMetadataFile = carbonTablePath.getSchemaFilePath
- val thriftTable: TableInfo = CarbonEnv.get.carbonMetastore
+ val thriftTable: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore
.readSchemaFile(tableMetadataFile)
val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
@@ -291,8 +297,9 @@ object AlterTableUtil {
}
}
}
- CarbonEnv.get.carbonMetastore.revertTableSchema(carbonTable.getCarbonTableIdentifier,
- thriftTable, carbonTable.getStorePath)(sparkSession)
+ CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .revertTableSchema(carbonTable.getCarbonTableIdentifier,
+ thriftTable, carbonTable.getStorePath)(sparkSession)
}
}
@@ -306,11 +313,13 @@ object AlterTableUtil {
*/
def revertDataTypeChanges(dbName: String, tableName: String, lastUpdatedTime: Long)
(sparkSession: SparkSession): Unit = {
- val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+ val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
+ .carbonTable
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
carbonTable.getCarbonTableIdentifier)
val tableMetadataFile = carbonTablePath.getSchemaFilePath
- val thriftTable: TableInfo = CarbonEnv.get.carbonMetastore
+ val thriftTable: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore
.readSchemaFile(tableMetadataFile)
val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
@@ -327,8 +336,9 @@ object AlterTableUtil {
}
}
}
- CarbonEnv.get.carbonMetastore.revertTableSchema(carbonTable.getCarbonTableIdentifier,
- thriftTable, carbonTable.getStorePath)(sparkSession)
+ CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .revertTableSchema(carbonTable.getCarbonTableIdentifier,
+ thriftTable, carbonTable.getStorePath)(sparkSession)
}
}
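For readers following the change above: the three revert methods no longer resolve the CarbonTable through the process-global CarbonMetadata cache; they look it up via the metastore owned by the given SparkSession. A minimal sketch of that per-session pattern, assuming a concurrent map keyed by SparkSession (class and member names here are illustrative, not the actual CarbonEnv API):

import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.sql.SparkSession

// Illustrative stand-in for a session-scoped environment; the real CarbonEnv
// also holds the metastore catalog and other per-session state.
class SessionEnvSketch(val sparkSession: SparkSession)

object SessionEnvSketch {
  private val envs = new ConcurrentHashMap[SparkSession, SessionEnvSketch]()

  // One environment per SparkSession, created lazily and race-free.
  def getInstance(sparkSession: SparkSession): SessionEnvSketch = {
    var env = envs.get(sparkSession)
    if (env == null) {
      env = new SessionEnvSketch(sparkSession)
      val previous = envs.putIfAbsent(sparkSession, env)
      if (previous != null) {
        env = previous
      }
    }
    env
  }
}

Scoping the environment this way is what lets two drivers on the same store each keep a consistent catalog instead of sharing one mutable singleton.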
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index e72abd7..b9a6708 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -43,7 +43,7 @@ object CleanFiles {
val storePath = TableAPIUtil.escape(args(0))
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
- CarbonEnv.init(spark)
+ CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
cleanFiles(spark, dbName, tableName, storePath)
}
}
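The same one-line change shown for CleanFiles repeats in each standalone tool below (Compaction, DeleteSegmentByDate, DeleteSegmentById, ShowSegments, TableLoader): the removed CarbonEnv.init(spark) is replaced by fetching the session-scoped environment and asking its metastore to reload any schemas whose on-disk timestamp has advanced, so a tool started by one driver sees DDL committed by another. A rough sketch of such a check, under the assumption that the metastore tracks one schema-modified timestamp per store (names here are hypothetical):

import java.io.File

// Hypothetical reload check: refresh cached table schemas only when another
// driver has written a newer schema timestamp to the store.
class SchemaCacheSketch(timestampFile: File) {
  private var lastKnownModifiedTime: Long = timestampFile.lastModified()

  def checkSchemasModifiedTimeAndReload(reloadAll: () => Unit): Unit = {
    val onDisk = timestampFile.lastModified()
    if (onDisk > lastKnownModifiedTime) {
      reloadAll() // re-read table schemas from the store
      lastKnownModifiedTime = onDisk
    }
  }
}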
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index 0d85062..d78fd5f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -56,7 +56,7 @@ object Compaction {
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val compactionType = TableAPIUtil.escape(args(2))
val spark = TableAPIUtil.spark(storePath, s"Compaction: $dbName.$tableName")
- CarbonEnv.init(spark)
+ CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
compaction(spark, dbName, tableName, compactionType)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index 6219f1e..7815417 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -43,7 +43,7 @@ object DeleteSegmentByDate {
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val dateValue = TableAPIUtil.escape(args(2))
val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentByDate: $dbName.$tableName")
- CarbonEnv.init(spark)
+ CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
deleteSegmentByDate(spark, dbName, tableName, dateValue)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index 303a062..65b76b2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -48,7 +48,7 @@ object DeleteSegmentById {
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val segmentIds = extractSegmentIds(TableAPIUtil.escape(args(2)))
val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentById: $dbName.$tableName")
- CarbonEnv.init(spark)
+ CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
deleteSegmentById(spark, dbName, tableName, segmentIds)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
index c7286ee..d918381 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
@@ -75,7 +75,7 @@ object ShowSegments {
None
}
val spark = TableAPIUtil.spark(storePath, s"ShowSegments: $dbName.$tableName")
- CarbonEnv.init(spark)
+ CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
val rows = showSegments(spark, dbName, tableName, limit)
System.out.println(showString(rows))
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
index cb444d6..a57ab10 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
@@ -62,7 +62,8 @@ object TableAPIUtil {
spark: SparkSession,
dbName: String,
tableName: String): Unit = {
- if (!CarbonEnv.get.carbonMetastore.tableExists(tableName, Some(dbName))(spark)) {
+ if (!CarbonEnv.getInstance(spark).carbonMetastore
+ .tableExists(tableName, Some(dbName))(spark)) {
val err = s"table $dbName.$tableName not found"
LOGGER.error(err)
throw new MalformedCarbonCommandException(err)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index 324d3a2..82d8da2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -80,7 +80,7 @@ object TableLoader {
val spark = TableAPIUtil.spark(storePath, s"TableLoader: $dbName.$tableName")
- CarbonEnv.init(spark)
+ CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
loadTable(spark, Option(dbName), tableName, inputPaths, map)
}
[2/2] incubator-carbondata git commit: [CARBONDATA-925] Synchronize
schema metadata for concurrent drivers and users. This closes #792
Posted by gv...@apache.org.
[CARBONDATA-925] Synchronize schema metadata for concurrent drivers and users. This closes #792
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/9c575bf7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/9c575bf7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/9c575bf7
Branch: refs/heads/master
Commit: 9c575bf76cefaf7e67f686a66428d8d39b641765
Parents: 804af93 049b8aa
Author: Venkata Ramana G <ra...@huawei.com>
Authored: Thu Apr 20 20:42:52 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Thu Apr 20 20:42:52 2017 +0530
----------------------------------------------------------------------