You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/04/20 15:13:11 UTC

[1/2] incubator-carbondata git commit: changed CarbonEnv object to class and used sparksession to fetch the carbon catalog which will be used in that spark session.

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 804af9303 -> 9c575bf76


changed CarbonEnv object to class and used sparksession to fetch the carbon catalog which will be used in that spark session.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/049b8aac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/049b8aac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/049b8aac

Branch: refs/heads/master
Commit: 049b8aacc95b8a2468990d0a96b819230ae08546
Parents: 804af93
Author: nareshpr <pr...@gmail.com>
Authored: Thu Apr 13 13:03:54 2017 +0530
Committer: nareshpr <pr...@gmail.com>
Committed: Thu Apr 20 19:41:36 2017 +0530

----------------------------------------------------------------------
 .../spark/rdd/CarbonDataRDDFactory.scala        |  26 ++---
 .../spark/sql/CarbonDataFrameWriter.scala       |   2 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |  16 ++-
 .../spark/sql/CarbonDictionaryDecoder.scala     |  12 ++-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  34 +++++--
 .../org/apache/spark/sql/CarbonSession.scala    |   2 -
 .../org/apache/spark/sql/CarbonSource.scala     |  36 +++----
 .../execution/CarbonLateDecodeStrategy.scala    |   5 +-
 .../execution/command/AlterTableCommands.scala  |  40 +++++---
 .../execution/command/CarbonHiveCommands.scala  |   4 +-
 .../sql/execution/command/DDLStrategy.scala     |  26 +++--
 .../execution/command/carbonTableSchema.scala   |  71 +++++++------
 .../spark/sql/hive/CarbonSessionState.scala     | 102 +++++++++++++++++--
 .../org/apache/spark/util/AlterTableUtil.scala  |  56 +++++-----
 .../org/apache/spark/util/CleanFiles.scala      |   2 +-
 .../org/apache/spark/util/Compaction.scala      |   2 +-
 .../apache/spark/util/DeleteSegmentByDate.scala |   2 +-
 .../apache/spark/util/DeleteSegmentById.scala   |   2 +-
 .../org/apache/spark/util/ShowSegments.scala    |   2 +-
 .../org/apache/spark/util/TableAPIUtil.scala    |   3 +-
 .../org/apache/spark/util/TableLoader.scala     |   2 +-
 21 files changed, 292 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index ca96a17..631b2a7 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -98,7 +98,7 @@ object CarbonDataRDDFactory {
     LOGGER.audit(s"Compaction request received for table " +
         s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val tableCreationTime = CarbonEnv.get.carbonMetastore
+    val tableCreationTime = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
         .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
 
     if (null == carbonLoadModel.getLoadMetadataDetails) {
@@ -277,8 +277,9 @@ object CarbonDataRDDFactory {
             LOGGER.info("System level compaction lock is enabled.")
             val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
             var tableForCompaction = CarbonCompactionUtil
-                .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata.tablesMeta.toArray,
-                  skipCompactionTables.toList.asJava)
+              .getNextTableToCompact(CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
+                .metadata.tablesMeta.toArray,
+                skipCompactionTables.toList.asJava)
             while (null != tableForCompaction) {
               LOGGER.info("Compaction request has been identified for table " +
                   s"${ tableForCompaction.carbonTable.getDatabaseName }." +
@@ -289,10 +290,10 @@ object CarbonDataRDDFactory {
 
               val newCarbonLoadModel = new CarbonLoadModel()
               DataManagementFunc.prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
-              val tableCreationTime = CarbonEnv.get.carbonMetastore
-                  .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
-                    newCarbonLoadModel.getTableName
-                  )
+              val tableCreationTime = CarbonEnv.getInstance(sqlContext.sparkSession)
+                .carbonMetastore.getTableCreationTime(newCarbonLoadModel.getDatabaseName,
+                newCarbonLoadModel.getTableName
+              )
 
               val compactionSize = CarbonDataMergerUtil
                   .getCompactionSize(CompactionType.MAJOR_COMPACTION)
@@ -331,9 +332,10 @@ object CarbonDataRDDFactory {
               }
               // ********* check again for all the tables.
               tableForCompaction = CarbonCompactionUtil
-                  .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
-                      .tablesMeta.toArray, skipCompactionTables.asJava
-                  )
+                .getNextTableToCompact(CarbonEnv.getInstance(sqlContext.sparkSession)
+                  .carbonMetastore.metadata
+                  .tablesMeta.toArray, skipCompactionTables.asJava
+                )
             }
           }
           // giving the user his error for telling in the beeline if his triggered table
@@ -487,9 +489,9 @@ object CarbonDataRDDFactory {
       // reading the start time of data load.
       val loadStartTime = CarbonUpdateUtil.readCurrentTime();
       carbonLoadModel.setFactTimeStamp(loadStartTime)
-      val tableCreationTime = CarbonEnv.get.carbonMetastore
+      val tableCreationTime = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
           .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
-      val schemaLastUpdatedTime = CarbonEnv.get.carbonMetastore
+      val schemaLastUpdatedTime = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
           .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
 
       // get partition way from configuration

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 9ad9504..465fad0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -58,7 +58,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
    */
   private def loadTempCSV(options: CarbonOption): Unit = {
     // temporary solution: write to csv file, then load the csv into carbon
-    val storePath = CarbonEnv.get.carbonMetastore.storePath
+    val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.storePath
     val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
       .append("tempCSV")
       .append(CarbonCommonConstants.UNDERSCORE).append(options.dbName)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 322e1ae..ea3e40d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -46,16 +46,11 @@ case class CarbonDatasourceHadoopRelation(
   extends BaseRelation {
 
   lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
-  lazy val carbonTable = SchemaReader.readCarbonTableFromStore(absIdentifier)
-  lazy val carbonRelation: CarbonRelation = {
-    CarbonRelation(
-      carbonTable.getDatabaseName,
-      carbonTable.getFactTableName,
-      CarbonSparkUtil.createSparkMeta(carbonTable),
-      new TableMeta(carbonTable.getCarbonTableIdentifier, paths.head, carbonTable),
-      None
-    )
-  }
+  lazy val carbonTable = carbonRelation.tableMeta.carbonTable
+  lazy val carbonRelation: CarbonRelation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    .lookupRelation(Some(absIdentifier.getCarbonTableIdentifier.getDatabaseName),
+      absIdentifier.getCarbonTableIdentifier.getTableName)(sparkSession)
+    .asInstanceOf[CarbonRelation]
 
   override def sqlContext: SQLContext = sparkSession.sqlContext
 
@@ -72,6 +67,7 @@ case class CarbonDatasourceHadoopRelation(
     new CarbonScanRDD(sqlContext.sparkContext, projection, filterExpression.orNull,
       absIdentifier, carbonTable)
   }
+
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)
 
   override def toString: String = {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index d47ff1a..d897d9a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -46,7 +46,8 @@ case class CarbonDictionaryDecoder(
     relations: Seq[CarbonDecoderRelation],
     profile: CarbonProfile,
     aliasMap: CarbonAliasDecoderRelation,
-    child: SparkPlan)
+    child: SparkPlan,
+    sparkSession: SparkSession)
   extends UnaryExecNode with CodegenSupport {
 
   override val output: Seq[Attribute] =
@@ -62,7 +63,7 @@ case class CarbonDictionaryDecoder(
 
   override def doExecute(): RDD[InternalRow] = {
     attachTree(this, "execute") {
-      val storePath = CarbonEnv.get.carbonMetastore.storePath
+      val storePath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath
       val absoluteTableIdentifiers = relations.map { relation =>
         val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
         (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
@@ -116,7 +117,7 @@ case class CarbonDictionaryDecoder(
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
 
-    val storePath = CarbonEnv.get.carbonMetastore.storePath
+    val storePath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath
     val absoluteTableIdentifiers = relations.map { relation =>
       val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
       (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)
@@ -436,10 +437,11 @@ class CarbonDecoderRDD(
     profile: CarbonProfile,
     aliasMap: CarbonAliasDecoderRelation,
     prev: RDD[InternalRow],
-    output: Seq[Attribute])
+    output: Seq[Attribute],
+    sparkSession: SparkSession)
   extends RDD[InternalRow](prev) {
 
-  private val storepath = CarbonEnv.get.carbonMetastore.storePath
+  private val storepath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath
 
   def canBeDecoded(attr: Attribute): Boolean = {
     profile match {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index d426348..a286e56 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.hive.CarbonMetastore
+import java.util.Map
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.spark.sql.hive.{CarbonMetastore, CarbonSessionCatalog}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -28,14 +31,12 @@ import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 /**
  * Carbon Environment for unified context
  */
-case class CarbonEnv(carbonMetastore: CarbonMetastore)
+class CarbonEnv {
 
-object CarbonEnv {
+  var carbonMetastore: CarbonMetastore = _
 
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
-  @volatile private var carbonEnv: CarbonEnv = _
-
   // set readsupport class global so that the executor can get it.
   SparkReadSupport.readSupportClass = classOf[SparkRowReadSupportImpl]
 
@@ -43,20 +44,35 @@ object CarbonEnv {
 
   def init(sparkSession: SparkSession): Unit = {
     if (!initialized) {
-      val catalog = {
+      carbonMetastore = {
         val storePath =
           CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
         LOGGER.info(s"carbon env initial: $storePath")
         new CarbonMetastore(sparkSession.conf, storePath)
       }
       CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
-      carbonEnv = CarbonEnv(catalog)
       initialized = true
     }
   }
+}
 
-  def get: CarbonEnv = {
-    carbonEnv
+object CarbonEnv {
+
+  val carbonEnvMap: Map[SparkSession, CarbonEnv] =
+    new ConcurrentHashMap[SparkSession, CarbonEnv]
+
+  def getInstance(sparkSession: SparkSession): CarbonEnv = {
+    if (sparkSession.isInstanceOf[CarbonSession]) {
+      sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].carbonEnv
+    } else {
+      var carbonEnv: CarbonEnv = carbonEnvMap.get(sparkSession)
+      if (carbonEnv == null) {
+        carbonEnv = new CarbonEnv
+        carbonEnv.init(sparkSession)
+        carbonEnvMap.put(sparkSession, carbonEnv)
+      }
+      carbonEnv
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 23040b3..1ce170c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -40,8 +40,6 @@ class CarbonSession(@transient val sc: SparkContext,
     this(sc, None)
   }
 
-  CarbonEnv.init(this)
-
   @transient
   override private[sql] lazy val sessionState: SessionState = new CarbonSessionState(this)
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index fce9b4c..536c19f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -45,7 +45,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
   // will be called if hive supported create table command is provided
   override def createRelation(sqlContext: SQLContext,
       parameters: Map[String, String]): BaseRelation = {
-    CarbonEnv.init(sqlContext.sparkSession)
+    CarbonEnv.getInstance(sqlContext.sparkSession)
     // if path is provided we can directly create Hadoop relation. \
     // Otherwise create datasource relation
     parameters.get("tablePath") match {
@@ -61,13 +61,14 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
         CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(tablePath), parameters, None)
     }
   }
+
   // called by any write operation like INSERT INTO DDL or DataFrame.write API
   override def createRelation(
       sqlContext: SQLContext,
       mode: SaveMode,
       parameters: Map[String, String],
       data: DataFrame): BaseRelation = {
-    CarbonEnv.init(sqlContext.sparkSession)
+    CarbonEnv.getInstance(sqlContext.sparkSession)
     // User should not specify path since only one store is supported in carbon currently,
     // after we support multi-store, we can remove this limitation
     require(!parameters.contains("path"), "'path' should not be specified, " +
@@ -109,7 +110,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       sqlContext: SQLContext,
       parameters: Map[String, String],
       dataSchema: StructType): BaseRelation = {
-    CarbonEnv.init(sqlContext.sparkSession)
+    CarbonEnv.getInstance(sqlContext.sparkSession)
     addLateDecodeOptimization(sqlContext.sparkSession)
     val path = createTableIfNotExists(sqlContext.sparkSession, parameters, dataSchema)
     CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(path), parameters,
@@ -138,8 +139,9 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
     }
     val options = new CarbonOption(parameters)
     try {
-      CarbonEnv.get.carbonMetastore.lookupRelation(Option(dbName), tableName)(sparkSession)
-      CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName"
+      CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .lookupRelation(Option(dbName), tableName)(sparkSession)
+      CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
     } catch {
       case ex: NoSuchTableException =>
         val fields = dataSchema.map { col =>
@@ -162,23 +164,23 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
         val map = scala.collection.mutable.Map[String, String]()
         parameters.foreach { parameter => map.put(parameter._1, parameter._2.toLowerCase) }
         val bucketFields = if (options.isBucketingEnabled) {
-            if (options.bucketNumber.toString.contains("-") ||
-                options.bucketNumber.toString.contains("+") ) {
-              throw new MalformedCarbonCommandException("INVALID NUMBER OF BUCKETS SPECIFIED" +
-                                                        options.bucketNumber.toString)
-            }
-            else {
-              Some(BucketFields(options.bucketColumns.toLowerCase.split(",").map(_.trim),
-                options.bucketNumber))
-            }
-          } else {
-            None
+          if (options.bucketNumber.toString.contains("-") ||
+              options.bucketNumber.toString.contains("+")) {
+            throw new MalformedCarbonCommandException("INVALID NUMBER OF BUCKETS SPECIFIED" +
+                                                      options.bucketNumber.toString)
+          }
+          else {
+            Some(BucketFields(options.bucketColumns.toLowerCase.split(",").map(_.trim),
+              options.bucketNumber))
           }
+        } else {
+          None
+        }
 
         val cm = TableCreator.prepareTableModel(ifNotExistPresent = false, Option(dbName),
           tableName, fields, Nil, bucketFields, map)
         CreateTable(cm, false).run(sparkSession)
-        CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName"
+        CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
       case ex: Exception =>
         throw new Exception("do not have dbname and tablename for carbon table", ex)
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 976759a..39fbb09 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -70,7 +70,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
           CarbonDictionaryDecoder(relations,
             profile,
             aliasMap,
-            planLater(child)
+            planLater(child),
+            SparkSession.getActiveSession.get
           ) :: Nil
         }
       case _ => Nil
@@ -95,7 +96,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       newAttr
     }
     new CarbonDecoderRDD(Seq(relation), IncludeProfile(attrs),
-      CarbonAliasDecoderRelation(), rdd, output)
+      CarbonAliasDecoderRelation(), rdd, output, SparkSession.getActiveSession.get)
   }
 
   private[this] def toCatalystRDD(

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
index 8b194da..6097231 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -53,7 +53,9 @@ private[sql] case class AlterTableAddColumns(
     var locks = List.empty[ICarbonLock]
     var lastUpdatedTime = 0L
     var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
-    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
+      .carbonTable
     try {
       locks = AlterTableUtil
         .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
@@ -63,7 +65,7 @@ private[sql] case class AlterTableAddColumns(
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
         carbonTable.getCarbonTableIdentifier)
       val tableMetadataFile = carbonTablePath.getSchemaFilePath
-      val thriftTableInfo: TableInfo = CarbonEnv.get.carbonMetastore
+      val thriftTableInfo: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore
         .readSchemaFile(tableMetadataFile)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter
@@ -139,7 +141,7 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
     LOGGER.audit(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
     LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
     val relation: CarbonRelation =
-      CarbonEnv.get.carbonMetastore
+      CarbonEnv.getInstance(sparkSession).carbonMetastore
         .lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession)
         .asInstanceOf[CarbonRelation]
     if (relation == null) {
@@ -164,8 +166,8 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
         carbonTable.getCarbonTableIdentifier)
       val tableMetadataFile = carbonTablePath.getSchemaFilePath
-      val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
-        .readSchemaFile(tableMetadataFile)
+      val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.getInstance(sparkSession)
+        .carbonMetastore.readSchemaFile(tableMetadataFile)
       val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
       schemaEvolutionEntry.setTableName(newTableName)
       schemaEvolutionEntry.setTime_stamp(System.currentTimeMillis())
@@ -183,11 +185,13 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
       val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
         newTableName,
         carbonTable.getCarbonTableIdentifier.getTableId)
-      val newTablePath = CarbonEnv.get.carbonMetastore.updateTableSchema(newTableIdentifier,
-        tableInfo,
-        schemaEvolutionEntry,
-        carbonTable.getStorePath)(sparkSession)
-      CarbonEnv.get.carbonMetastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
+      val newTablePath = CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .updateTableSchema(newTableIdentifier,
+          tableInfo,
+          schemaEvolutionEntry,
+          carbonTable.getStorePath)(sparkSession)
+      CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .removeTableFromMetadata(oldDatabaseName, oldTableName)
       sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
         .runSqlHive(
           s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName")
@@ -202,7 +206,7 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
       case e: Exception => LOGGER
         .error("Rename table failed: " + e.getMessage)
         AlterTableUtil.revertRenameTableChanges(oldTableIdentifier, newTableName, lastUpdatedTime)(
-            sparkSession)
+          sparkSession)
         renameBadRecords(newTableName, oldTableName, oldDatabaseName)
         sys.error(s"Alter table rename table operation failed: ${e.getMessage}")
     } finally {
@@ -253,7 +257,9 @@ private[sql] case class AlterTableDropColumns(
     var lastUpdatedTime = 0L
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
     // get the latest carbon table and check for column existence
-    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
+      .carbonTable
     try {
       locks = AlterTableUtil
         .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
@@ -294,8 +300,8 @@ private[sql] case class AlterTableDropColumns(
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
         carbonTable.getCarbonTableIdentifier)
       val tableMetadataFile = carbonTablePath.getSchemaFilePath
-      val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
-        .readSchemaFile(tableMetadataFile)
+      val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.getInstance(sparkSession)
+        .carbonMetastore.readSchemaFile(tableMetadataFile)
       // maintain the deleted columns for schema evolution history
       var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
       val columnSchemaList = tableInfo.fact_table.table_columns.asScala
@@ -349,7 +355,9 @@ private[sql] case class AlterTableDataTypeChange(
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
     var locks = List.empty[ICarbonLock]
     // get the latest carbon table and check for column existence
-    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
+      .carbonTable
     var lastUpdatedTime = 0L
     try {
       locks = AlterTableUtil
@@ -375,7 +383,7 @@ private[sql] case class AlterTableDataTypeChange(
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
         carbonTable.getCarbonTableIdentifier)
       val tableMetadataFile = carbonTablePath.getSchemaFilePath
-      val tableInfo: TableInfo = CarbonEnv.get.carbonMetastore
+      val tableInfo: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore
         .readSchemaFile(tableMetadataFile)
       // maintain the added column for schema evolution history
       var addColumnSchema: ColumnSchema = null

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
index 95f6639..2786620 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
@@ -30,13 +30,13 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
     // DropHiveDB command will fail if cascade is false and one or more table exists in database
     val rows = command.run(sparkSession)
     if (command.cascade) {
-      val tablesInDB = CarbonEnv.get.carbonMetastore.getAllTables()
+      val tablesInDB = CarbonEnv.getInstance(sparkSession).carbonMetastore.getAllTables()
         .filterNot(_.database.exists(_.equalsIgnoreCase(dbName)))
       tablesInDB.foreach { tableName =>
         CarbonDropTableCommand(true, Some(dbName), tableName.table).run(sparkSession)
       }
     }
-    CarbonEnv.get.carbonMetastore.dropDatabaseDirectory(dbName)
+    CarbonEnv.getInstance(sparkSession).carbonMetastore.dropDatabaseDirectory(dbName)
     rows
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
index 55148d2..8afadf0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -16,7 +16,8 @@
  */
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, InsertIntoCarbonTable, ShowLoadsCommand, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, InsertIntoCarbonTable,
+ShowLoadsCommand, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -33,11 +34,12 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
   def apply(plan: LogicalPlan): Seq[SparkPlan] = {
     plan match {
       case LoadDataCommand(identifier, path, isLocal, isOverwrite, partition)
-        if CarbonEnv.get.carbonMetastore.tableExists(identifier)(sparkSession) =>
+        if CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .tableExists(identifier)(sparkSession) =>
         ExecutedCommandExec(LoadTable(identifier.database, identifier.table.toLowerCase, path,
           Seq(), Map(), isOverwrite)) :: Nil
       case alter@AlterTableRenameCommand(oldTableIdentifier, newTableIdentifier, _) =>
-        val isCarbonTable = CarbonEnv.get.carbonMetastore
+        val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
           .tableExists(oldTableIdentifier)(
             sparkSession)
         if (isCarbonTable) {
@@ -47,7 +49,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           ExecutedCommandExec(alter) :: Nil
         }
       case DropTableCommand(identifier, ifNotExists, isView, _)
-        if CarbonEnv.get.carbonMetastore
+        if CarbonEnv.getInstance(sparkSession).carbonMetastore
           .isTablePathExists(identifier)(sparkSession) =>
         ExecutedCommandExec(
           CarbonDropTableCommand(ifNotExists, identifier.database,
@@ -55,15 +57,15 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
       case ShowLoadsCommand(databaseName, table, limit) =>
         ExecutedCommandExec(ShowLoads(databaseName, table.toLowerCase, limit, plan.output)) :: Nil
       case InsertIntoCarbonTable(relation: CarbonDatasourceHadoopRelation,
-        _, child: LogicalPlan, _, _) =>
+      _, child: LogicalPlan, _, _) =>
         ExecutedCommandExec(LoadTableByInsert(relation, child)) :: Nil
       case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
-        CarbonEnv.get.carbonMetastore.createDatabaseDirectory(dbName)
+        CarbonEnv.getInstance(sparkSession).carbonMetastore.createDatabaseDirectory(dbName)
         ExecutedCommandExec(createDb) :: Nil
       case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
         ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil
       case alterTable@AlterTableCompaction(altertablemodel) =>
-        val isCarbonTable = CarbonEnv.get.carbonMetastore
+        val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
           .tableExists(TableIdentifier(altertablemodel.tableName,
             altertablemodel.dbName))(sparkSession)
         if (isCarbonTable) {
@@ -79,7 +81,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
             "Operation not allowed : " + altertablemodel.alterSql)
         }
       case dataTypeChange@AlterTableDataTypeChange(alterTableChangeDataTypeModel) =>
-        val isCarbonTable = CarbonEnv.get.carbonMetastore
+        val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
           .tableExists(TableIdentifier(alterTableChangeDataTypeModel.tableName,
             alterTableChangeDataTypeModel.databaseName))(sparkSession)
         if (isCarbonTable) {
@@ -88,7 +90,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
         }
       case addColumn@AlterTableAddColumns(alterTableAddColumnsModel) =>
-        val isCarbonTable = CarbonEnv.get.carbonMetastore
+        val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
           .tableExists(TableIdentifier(alterTableAddColumnsModel.tableName,
             alterTableAddColumnsModel.databaseName))(sparkSession)
         if (isCarbonTable) {
@@ -97,7 +99,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
         }
       case dropColumn@AlterTableDropColumns(alterTableDropColumnModel) =>
-        val isCarbonTable = CarbonEnv.get.carbonMetastore
+        val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
           .tableExists(TableIdentifier(alterTableDropColumnModel.tableName,
             alterTableDropColumnModel.databaseName))(sparkSession)
         if (isCarbonTable) {
@@ -106,7 +108,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
         }
       case desc@DescribeTableCommand(identifier, partitionSpec, isExtended, isFormatted)
-        if CarbonEnv.get.carbonMetastore.tableExists(identifier)(sparkSession) && isFormatted =>
+        if
+        CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .tableExists(identifier)(sparkSession) && isFormatted =>
         val resolvedTable =
           sparkSession.sessionState.executePlan(UnresolvedRelation(identifier, None)).analyzed
         val resultPlan = sparkSession.sessionState.executePlan(resolvedTable).executedPlan

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 37fac95..06ab47d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -57,7 +57,8 @@ import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadM
 import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, CommonUtil, DataTypeConverterUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, CommonUtil,
+DataTypeConverterUtil, GlobalDictionaryUtil}
 
 object Checker {
   def validateTableExists(
@@ -65,7 +66,7 @@ object Checker {
       tableName: String,
       session: SparkSession): Unit = {
     val identifier = TableIdentifier(tableName, dbName)
-    if (!CarbonEnv.get.carbonMetastore.tableExists(identifier)(session)) {
+    if (!CarbonEnv.getInstance(session).carbonMetastore.tableExists(identifier)(session)) {
       val err = s"table $dbName.$tableName not found"
       LogServiceFactory.getLogService(this.getClass.getName).error(err)
       throw new IllegalArgumentException(err)
@@ -85,20 +86,19 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
   def run(sparkSession: SparkSession): Seq[Row] = {
     val tableName = alterTableModel.tableName.toLowerCase
     val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
-    if (null == CarbonMetadata.getInstance.getCarbonTable(databaseName + "_" + tableName)) {
-      LOGGER.error(s"alter table failed. table not found: $databaseName.$tableName")
-      sys.error(s"alter table failed. table not found: $databaseName.$tableName")
-    }
-
     val relation =
-      CarbonEnv.get.carbonMetastore
+      CarbonEnv.getInstance(sparkSession).carbonMetastore
         .lookupRelation(Option(databaseName), tableName)(sparkSession)
         .asInstanceOf[CarbonRelation]
     if (relation == null) {
       sys.error(s"Table $databaseName.$tableName does not exist")
     }
-    val carbonLoadModel = new CarbonLoadModel()
+    if (null == CarbonMetadata.getInstance.getCarbonTable(databaseName + "_" + tableName)) {
+      LOGGER.error(s"alter table failed. table not found: $databaseName.$tableName")
+      sys.error(s"alter table failed. table not found: $databaseName.$tableName")
+    }
 
+    val carbonLoadModel = new CarbonLoadModel()
 
     val table = relation.tableMeta.carbonTable
     carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
@@ -138,7 +138,7 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
 case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends RunnableCommand {
 
   def run(sparkSession: SparkSession): Seq[Row] = {
-    CarbonEnv.init(sparkSession)
+    CarbonEnv.getInstance(sparkSession).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     cm.databaseName = getDB.getDatabaseName(cm.databaseNameOp, sparkSession)
     val tbName = cm.tableName
@@ -161,7 +161,7 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
       }
     } else {
       // Add Database to catalog and persist
-      val catalog = CarbonEnv.get.carbonMetastore
+      val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
       // Need to fill partitioner class when we support partition
       val tablePath = catalog.createTableFromThrift(tableInfo, dbName, tbName)(sparkSession)
       if (createDSTable) {
@@ -171,15 +171,15 @@ case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends Ru
           cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
           sparkSession.sql(
             s"""CREATE TABLE $dbName.$tbName
-               |(${fields.map(f => f.rawSchema).mkString(",")})
-               |USING org.apache.spark.sql.CarbonSource""".stripMargin +
+                |(${ fields.map(f => f.rawSchema).mkString(",") })
+                |USING org.apache.spark.sql.CarbonSource""".stripMargin +
             s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath "$tablePath") """)
         } catch {
           case e: Exception =>
             val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
             // call the drop table to delete the created table.
 
-            CarbonEnv.get.carbonMetastore
+            CarbonEnv.getInstance(sparkSession).carbonMetastore
               .dropTable(catalog.storePath, identifier)(sparkSession)
 
             LOGGER.audit(s"Table creation with Database name [$dbName] " +
@@ -266,7 +266,7 @@ object LoadTable {
     // write TableInfo
     CarbonMetastore.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
 
-    val catalog = CarbonEnv.get.carbonMetastore
+    val catalog = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
 
     // upate the schema modified time
     catalog.updateSchemasUpdatedTime(catalog.touchSchemaFileSystemTime(
@@ -289,6 +289,7 @@ object LoadTable {
 case class LoadTableByInsert(relation: CarbonDatasourceHadoopRelation, child: LogicalPlan)
   extends RunnableCommand {
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
   def run(sparkSession: SparkSession): Seq[Row] = {
     val df = Dataset.ofRows(sparkSession, child)
     val header = relation.tableSchema.get.fields.map(_.name).mkString(",")
@@ -321,10 +322,12 @@ case class LoadTable(
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
-  private def checkDefaultValue(value: String, default: String) = if (StringUtils.isEmpty(value)) {
-    default
-  } else {
-    value
+  private def checkDefaultValue(value: String, default: String) = {
+    if (StringUtils.isEmpty(value)) {
+      default
+    } else {
+      value
+    }
   }
 
   def run(sparkSession: SparkSession): Seq[Row] = {
@@ -340,17 +343,17 @@ case class LoadTable(
     if (isOverwriteExist) {
       sys.error(s"Overwrite is not supported for carbon table with $dbName.$tableName")
     }
+    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
     if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
       LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
       LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
       sys.error(s"Data loading failed. table not found: $dbName.$tableName")
     }
 
-    val relation = CarbonEnv.get.carbonMetastore
-      .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
-    if (relation == null) {
-      sys.error(s"Table $dbName.$tableName does not exist")
-    }
     CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
     val carbonLock = CarbonLockFactory
       .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
@@ -386,8 +389,8 @@ case class LoadTable(
       carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
 
       val partitionLocation = relation.tableMeta.storePath + "/partition/" +
-          relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
-          relation.tableMeta.carbonTableIdentifier.getTableName + "/"
+                              relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
+                              relation.tableMeta.carbonTableIdentifier.getTableName + "/"
 
 
       val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
@@ -439,7 +442,7 @@ case class LoadTable(
             true
           } else {
             LOGGER.error("Can't use single_pass, because SINGLE_PASS and ALL_DICTIONARY_PATH" +
-              "can not be used together")
+                         "can not be used together")
             false
           }
         case "false" =>
@@ -679,7 +682,7 @@ case class CleanFiles(
 
   def run(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
-    val relation = CarbonEnv.get.carbonMetastore
+    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]
     CarbonStore.cleanFiles(
       getDB.getDatabaseName(databaseNameOp, sparkSession),
@@ -718,8 +721,10 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
     val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
     val carbonLock = CarbonLockFactory
       .getCarbonLockObj(carbonTableIdentifier, LockUsage.DROP_TABLE_LOCK)
-    val storePath = CarbonEnv.get.carbonMetastore.storePath
+    val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val storePath = catalog.storePath
     var isLocked = false
+    catalog.checkSchemasModifiedTimeAndReloadTables()
     try {
       isLocked = carbonLock.lockWithRetries()
       if (isLocked) {
@@ -730,13 +735,15 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
         sys.error("Table is locked for deletion. Please try after some time")
       }
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
-      val carbonTable = CarbonEnv.get.carbonMetastore.getTableFromMetadata(dbName, tableName)
+      val carbonTable = catalog
+        .getTableFromMetadata(dbName, tableName)
         .map(_.carbonTable).getOrElse(null)
       if (null != carbonTable) {
         // clear driver B-tree and dictionary cache
         ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
       }
-      CarbonEnv.get.carbonMetastore.dropTable(storePath, identifier)(sparkSession)
+      CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .dropTable(storePath, identifier)(sparkSession)
       LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
     } finally {
       if (carbonLock != null && isLocked) {
@@ -771,7 +778,7 @@ private[sql] case class DescribeCommandFormatted(
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val relation = CarbonEnv.get.carbonMetastore
+    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(tblIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
     val mapper = new ObjectMapper()
     val colProps = StringBuilder.newBuilder

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 38c7f34..687afc4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -16,22 +16,98 @@
  */
 package org.apache.spark.sql.hive
 
-import org.apache.spark.sql.CarbonDatasourceHadoopRelation
-import org.apache.spark.sql.catalyst.analysis.Analyzer
-import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubquery}
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
 import org.apache.spark.sql.execution.command.DDLStrategy
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.SparkOptimizer
-import org.apache.spark.sql.ExperimentalMethods
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.parser.CarbonSparkSqlParser
-import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.processing.merger.TableMeta
+
+/**
+ * This class will have carbon catalog and refresh the relation from cache if the carbontable in
+ * carbon catalog is not same as cached carbon relation's carbon table
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param sparkSession
+ * @param functionResourceLoader
+ * @param functionRegistry
+ * @param conf
+ * @param hadoopConf
+ */
+class CarbonSessionCatalog(
+    externalCatalog: HiveExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
+    sparkSession: SparkSession,
+    functionResourceLoader: FunctionResourceLoader,
+    functionRegistry: FunctionRegistry,
+    conf: SQLConf,
+    hadoopConf: Configuration)
+  extends HiveSessionCatalog(
+    externalCatalog,
+    globalTempViewManager,
+    sparkSession,
+    functionResourceLoader,
+    functionRegistry,
+    conf,
+    hadoopConf) {
+
+  lazy val carbonEnv = {
+    val env = new CarbonEnv
+    env.init(sparkSession)
+    env
+  }
+
+  /**
+   * This method will invalidate carbonrelation from cache if carbon table is updated in
+   * carbon catalog
+   *
+   * @param name
+   * @param alias
+   * @return
+   */
+  override def lookupRelation(name: TableIdentifier,
+      alias: Option[String]): LogicalPlan = {
+    super.lookupRelation(name, alias) match {
+      case SubqueryAlias(_,
+          LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _),
+          _) =>
+        refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
+      case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
+        refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation)
+      case relation => relation
+    }
+  }
+
+  private def refreshRelationFromCache(name: TableIdentifier,
+      alias: Option[String],
+      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): LogicalPlan = {
+    carbonEnv.carbonMetastore.checkSchemasModifiedTimeAndReloadTables
+    carbonEnv.carbonMetastore
+      .getTableFromMetadata(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
+        carbonDatasourceHadoopRelation.carbonTable.getFactTableName) match {
+      case tableMeta: TableMeta =>
+        if (tableMeta.carbonTable.getTableLastUpdatedTime !=
+            carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime) {
+          refreshTable(name)
+        }
+      case _ => refreshTable(name)
+    }
+    super.lookupRelation(name, alias)
+  }
+}
 
 /**
  * Session state implementation to override sql parser and adding strategies
@@ -66,6 +142,20 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
         PreWriteCheck(conf, catalog))
     }
   }
+
+  /**
+   * Internal catalog for managing table and database states.
+   */
+  override lazy val catalog = {
+    new CarbonSessionCatalog(
+      sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
+      sparkSession.sharedState.globalTempViewManager,
+      sparkSession,
+      functionResourceLoader,
+      functionRegistry,
+      conf,
+      newHadoopConf())
+  }
 }
 
 class CarbonOptimizer(

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index f5248f5..052edce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -54,7 +54,7 @@ object AlterTableUtil {
       LOGGER: LogService)
     (sparkSession: SparkSession): List[ICarbonLock] = {
     val relation =
-      CarbonEnv.get.carbonMetastore
+      CarbonEnv.getInstance(sparkSession).carbonMetastore
         .lookupRelation(Option(dbName), tableName)(sparkSession)
         .asInstanceOf[CarbonRelation]
     if (relation == null) {
@@ -158,13 +158,13 @@ object AlterTableUtil {
       thriftTable: TableInfo)(sparkSession: SparkSession, catalog: HiveExternalCatalog): Unit = {
     val dbName = carbonTable.getDatabaseName
     val tableName = carbonTable.getFactTableName
-    CarbonEnv.get.carbonMetastore
+    CarbonEnv.getInstance(sparkSession).carbonMetastore
       .updateTableSchema(carbonTable.getCarbonTableIdentifier,
         thriftTable,
         schemaEvolutionEntry,
         carbonTable.getStorePath)(sparkSession)
     val tableIdentifier = TableIdentifier(tableName, Some(dbName))
-    val schema = CarbonEnv.get.carbonMetastore
+    val schema = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(tableIdentifier)(sparkSession).schema.json
     val schemaParts = prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema)
     catalog.client.runSqlHive(
@@ -209,28 +209,30 @@ object AlterTableUtil {
       lastUpdatedTime: Long)
     (sparkSession: SparkSession): Unit = {
     val database = oldTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-    val carbonTable: CarbonTable = CarbonMetadata.getInstance
-      .getCarbonTable(database + "_" + newTableName)
+    val carbonTable: CarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .lookupRelation(Some(database), newTableName)(sparkSession).asInstanceOf[CarbonRelation]
+      .tableMeta.carbonTable
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
       carbonTable.getCarbonTableIdentifier)
     val tableMetadataFile = carbonTablePath.getSchemaFilePath
     val fileType = FileFactory.getFileType(tableMetadataFile)
-    val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore
-      .readSchemaFile(tableMetadataFile)
+    val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.getInstance(sparkSession)
+      .carbonMetastore.readSchemaFile(tableMetadataFile)
     val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
     if (updatedTime > lastUpdatedTime) {
-      LOGGER.error(s"Reverting changes for $database.${oldTableIdentifier.table}")
+      LOGGER.error(s"Reverting changes for $database.${ oldTableIdentifier.table }")
       FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
         .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
                      oldTableIdentifier.table)
       val tableIdentifier = new CarbonTableIdentifier(database,
         oldTableIdentifier.table,
         carbonTable.getCarbonTableIdentifier.getTableId)
-      CarbonEnv.get.carbonMetastore.revertTableSchema(tableIdentifier,
+      CarbonEnv.getInstance(sparkSession).carbonMetastore.revertTableSchema(tableIdentifier,
         tableInfo,
         carbonTable.getStorePath)(sparkSession)
-      CarbonEnv.get.carbonMetastore.removeTableFromMetadata(database, newTableName)
+      CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .removeTableFromMetadata(database, newTableName)
     }
   }
 
@@ -244,13 +246,14 @@ object AlterTableUtil {
    */
   def revertAddColumnChanges(dbName: String, tableName: String, lastUpdatedTime: Long)
     (sparkSession: SparkSession): Unit = {
-    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
-
+    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
+      .carbonTable
 
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
       carbonTable.getCarbonTableIdentifier)
     val tableMetadataFile = carbonTablePath.getSchemaFilePath
-    val thriftTable: TableInfo = CarbonEnv.get.carbonMetastore
+    val thriftTable: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .readSchemaFile(tableMetadataFile)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
@@ -258,8 +261,9 @@ object AlterTableUtil {
       LOGGER.info(s"Reverting changes for $dbName.$tableName")
       val addedSchemas = evolutionEntryList.get(evolutionEntryList.size() - 1).added
       thriftTable.fact_table.table_columns.removeAll(addedSchemas)
-      CarbonEnv.get.carbonMetastore.revertTableSchema(carbonTable.getCarbonTableIdentifier,
-        thriftTable, carbonTable.getStorePath)(sparkSession)
+      CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .revertTableSchema(carbonTable.getCarbonTableIdentifier,
+          thriftTable, carbonTable.getStorePath)(sparkSession)
     }
   }
 
@@ -273,11 +277,13 @@ object AlterTableUtil {
    */
   def revertDropColumnChanges(dbName: String, tableName: String, lastUpdatedTime: Long)
     (sparkSession: SparkSession): Unit = {
-    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
+      .carbonTable
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
       carbonTable.getCarbonTableIdentifier)
     val tableMetadataFile = carbonTablePath.getSchemaFilePath
-    val thriftTable: TableInfo = CarbonEnv.get.carbonMetastore
+    val thriftTable: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .readSchemaFile(tableMetadataFile)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
@@ -291,8 +297,9 @@ object AlterTableUtil {
           }
         }
       }
-      CarbonEnv.get.carbonMetastore.revertTableSchema(carbonTable.getCarbonTableIdentifier,
-        thriftTable, carbonTable.getStorePath)(sparkSession)
+      CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .revertTableSchema(carbonTable.getCarbonTableIdentifier,
+          thriftTable, carbonTable.getStorePath)(sparkSession)
     }
   }
 
@@ -306,11 +313,13 @@ object AlterTableUtil {
    */
   def revertDataTypeChanges(dbName: String, tableName: String, lastUpdatedTime: Long)
     (sparkSession: SparkSession): Unit = {
-    val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
+      .carbonTable
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
       carbonTable.getCarbonTableIdentifier)
     val tableMetadataFile = carbonTablePath.getSchemaFilePath
-    val thriftTable: TableInfo = CarbonEnv.get.carbonMetastore
+    val thriftTable: TableInfo = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .readSchemaFile(tableMetadataFile)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
@@ -327,8 +336,9 @@ object AlterTableUtil {
           }
         }
       }
-      CarbonEnv.get.carbonMetastore.revertTableSchema(carbonTable.getCarbonTableIdentifier,
-        thriftTable, carbonTable.getStorePath)(sparkSession)
+      CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .revertTableSchema(carbonTable.getCarbonTableIdentifier,
+          thriftTable, carbonTable.getStorePath)(sparkSession)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index e72abd7..b9a6708 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -43,7 +43,7 @@ object CleanFiles {
     val storePath = TableAPIUtil.escape(args(0))
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
-    CarbonEnv.init(spark)
+    CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
     cleanFiles(spark, dbName, tableName, storePath)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index 0d85062..d78fd5f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -56,7 +56,7 @@ object Compaction {
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val compactionType = TableAPIUtil.escape(args(2))
     val spark = TableAPIUtil.spark(storePath, s"Compaction: $dbName.$tableName")
-    CarbonEnv.init(spark)
+    CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
     compaction(spark, dbName, tableName, compactionType)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index 6219f1e..7815417 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -43,7 +43,7 @@ object DeleteSegmentByDate {
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val dateValue = TableAPIUtil.escape(args(2))
     val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentByDate: $dbName.$tableName")
-    CarbonEnv.init(spark)
+    CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
     deleteSegmentByDate(spark, dbName, tableName, dateValue)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index 303a062..65b76b2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -48,7 +48,7 @@ object DeleteSegmentById {
     val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
     val segmentIds = extractSegmentIds(TableAPIUtil.escape(args(2)))
     val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentById: $dbName.$tableName")
-    CarbonEnv.init(spark)
+    CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
     deleteSegmentById(spark, dbName, tableName, segmentIds)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
index c7286ee..d918381 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
@@ -75,7 +75,7 @@ object ShowSegments {
       None
     }
     val spark = TableAPIUtil.spark(storePath, s"ShowSegments: $dbName.$tableName")
-    CarbonEnv.init(spark)
+    CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
     val rows = showSegments(spark, dbName, tableName, limit)
     System.out.println(showString(rows))
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
index cb444d6..a57ab10 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
@@ -62,7 +62,8 @@ object TableAPIUtil {
       spark: SparkSession,
       dbName: String,
       tableName: String): Unit = {
-    if (!CarbonEnv.get.carbonMetastore.tableExists(tableName, Some(dbName))(spark)) {
+    if (!CarbonEnv.getInstance(spark).carbonMetastore
+      .tableExists(tableName, Some(dbName))(spark)) {
       val err = s"table $dbName.$tableName not found"
       LOGGER.error(err)
       throw new MalformedCarbonCommandException(err)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/049b8aac/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index 324d3a2..82d8da2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -80,7 +80,7 @@ object TableLoader {
 
     val spark = TableAPIUtil.spark(storePath, s"TableLoader: $dbName.$tableName")
 
-    CarbonEnv.init(spark)
+    CarbonEnv.getInstance(spark).carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
     loadTable(spark, Option(dbName), tableName, inputPaths, map)
   }
 


[2/2] incubator-carbondata git commit: [CARBONDATA-925] Synchronize schema meta data for concurrent drivers and users.This closes #792

Posted by gv...@apache.org.
[CARBONDATA-925] Synchronize schema meta data for concurrent drivers and users.This closes #792


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/9c575bf7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/9c575bf7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/9c575bf7

Branch: refs/heads/master
Commit: 9c575bf76cefaf7e67f686a66428d8d39b641765
Parents: 804af93 049b8aa
Author: Venkata Ramana G <ra...@huawei.com>
Authored: Thu Apr 20 20:42:52 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Thu Apr 20 20:42:52 2017 +0530

----------------------------------------------------------------------
 .../spark/rdd/CarbonDataRDDFactory.scala        |  26 ++---
 .../spark/sql/CarbonDataFrameWriter.scala       |   2 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |  16 ++-
 .../spark/sql/CarbonDictionaryDecoder.scala     |  12 ++-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  34 +++++--
 .../org/apache/spark/sql/CarbonSession.scala    |   2 -
 .../org/apache/spark/sql/CarbonSource.scala     |  36 +++----
 .../execution/CarbonLateDecodeStrategy.scala    |   5 +-
 .../execution/command/AlterTableCommands.scala  |  40 +++++---
 .../execution/command/CarbonHiveCommands.scala  |   4 +-
 .../sql/execution/command/DDLStrategy.scala     |  26 +++--
 .../execution/command/carbonTableSchema.scala   |  71 +++++++------
 .../spark/sql/hive/CarbonSessionState.scala     | 102 +++++++++++++++++--
 .../org/apache/spark/util/AlterTableUtil.scala  |  56 +++++-----
 .../org/apache/spark/util/CleanFiles.scala      |   2 +-
 .../org/apache/spark/util/Compaction.scala      |   2 +-
 .../apache/spark/util/DeleteSegmentByDate.scala |   2 +-
 .../apache/spark/util/DeleteSegmentById.scala   |   2 +-
 .../org/apache/spark/util/ShowSegments.scala    |   2 +-
 .../org/apache/spark/util/TableAPIUtil.scala    |   3 +-
 .../org/apache/spark/util/TableLoader.scala     |   2 +-
 21 files changed, 292 insertions(+), 155 deletions(-)
----------------------------------------------------------------------