You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2020/02/06 04:14:52 UTC

[carbondata] branch master updated: [CARBONDATA-3668] Fix compile issue of CarbonSessionCatalog for spark 2.4

This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new eff4aa3  [CARBONDATA-3668] Fix compile issue of CarbonSessionCatalog for spark 2.4
eff4aa3 is described below

commit eff4aa39c1b0c5e929fa55471afd48d6cab9555b
Author: QiangCai <qi...@qq.com>
AuthorDate: Thu Feb 6 09:18:02 2020 +0800

    [CARBONDATA-3668] Fix compile issue of CarbonSessionCatalog for spark 2.4
    
    Why is this PR needed?
    There is a compile issue of CarbonSessionCatalog for spark 2.4
    
    What changes were proposed in this PR?
    CarbonHiveSessionCatalog and CarbonSessionStateBuilder adapte spark 2.3 and 2.4
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3605
---
 .../spark/sql/hive/CarbonSessionCatalogUtil.scala  | 190 +--------------------
 .../sql/hive/CarbonSessionStateBuilder.scala}      | 177 ++-----------------
 .../sql/hive/CarbonSessionStateBuilder.scala}      | 171 ++-----------------
 3 files changed, 29 insertions(+), 509 deletions(-)

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
index dd2d751..3b4d580 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
@@ -17,20 +17,16 @@
 
 package org.apache.spark.sql.hive
 
-import java.util.concurrent.Callable
-
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, SparkSession}
-import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.{CarbonToSparkAdapter, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.Analyzer
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule}
 import org.apache.spark.sql.parser.{CarbonSparkSqlParser, CarbonSparkSqlParserUtil}
 
@@ -183,181 +179,3 @@ object CarbonSessionCatalogUtil {
     storage.copy(locationUri = Some(path.toUri))
   }
 }
-
-/**
- * This class will have carbon catalog and refresh the relation from cache if the carbontable in
- * carbon catalog is not same as cached carbon relation's carbon table
- *
- * @param externalCatalog
- * @param globalTempViewManager
- * @param sparkSession
- * @param functionResourceLoader
- * @param functionRegistry
- * @param conf
- * @param hadoopConf
- */
-class CarbonHiveSessionCatalog(
-    externalCatalog: HiveExternalCatalog,
-    globalTempViewManager: GlobalTempViewManager,
-    functionRegistry: FunctionRegistry,
-    sparkSession: SparkSession,
-    conf: SQLConf,
-    hadoopConf: Configuration,
-    parser: ParserInterface,
-    functionResourceLoader: FunctionResourceLoader)
-  extends HiveSessionCatalog (
-    externalCatalog,
-    globalTempViewManager,
-    new HiveMetastoreCatalog(sparkSession),
-    functionRegistry,
-    conf,
-    hadoopConf,
-    parser,
-    functionResourceLoader
-  ) with CarbonSessionCatalog {
-
-  private lazy val carbonEnv = {
-    val env = new CarbonEnv
-    env.init(sparkSession)
-    env
-  }
-
-  /**
-   * return's the carbonEnv instance
-   * @return
-   */
-  override def getCarbonEnv() : CarbonEnv = {
-    carbonEnv
-  }
-
-  // Initialize all listeners to the Operation bus.
-  CarbonEnv.init
-
-  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
-    var rtnRelation = super.lookupRelation(name)
-    val isRelationRefreshed =
-      CarbonSessionUtil.refreshRelationAndSetStats(rtnRelation, name)(sparkSession)
-    if (isRelationRefreshed) {
-      rtnRelation = super.lookupRelation(name)
-      // Reset the stats after lookup.
-      CarbonSessionUtil.refreshRelationAndSetStats(rtnRelation, name)(sparkSession)
-    }
-    rtnRelation
-  }
-
-  override def getCachedPlan(t: QualifiedTableName,
-      c: Callable[LogicalPlan]): LogicalPlan = {
-    val plan = super.getCachedPlan(t, c)
-    CarbonSessionUtil.updateCachedPlan(plan)
-  }
-
-  /**
-   * returns hive client from HiveExternalCatalog
-   *
-   * @return
-   */
-  override def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
-    CarbonSessionCatalogUtil.getClient(sparkSession)
-  }
-
-  override def alterAddColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit = {
-    CarbonSessionCatalogUtil.alterAddColumns(tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
-  override def alterDropColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit = {
-    CarbonSessionCatalogUtil.alterDropColumns(tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
-  override def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]]): Unit = {
-    CarbonSessionCatalogUtil.alterColumnChangeDataTypeOrRename(
-      tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
-  /**
-   * This is alternate way of getting partition information. It first fetches all partitions from
-   * hive and then apply filter instead of querying hive along with filters.
-   * @param partitionFilters
-   * @param sparkSession
-   * @param identifier
-   * @return
-   */
-  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
-      sparkSession: SparkSession, identifier: TableIdentifier): Seq[CatalogTablePartition] = {
-    CarbonSessionCatalogUtil.getPartitionsAlternate(partitionFilters, sparkSession, identifier)
-  }
-
-  /**
-   * Update the storageformat with new location information
-   */
-  override def updateStorageLocation(
-      path: Path,
-      storage: CatalogStorageFormat,
-      newTableName: String,
-      dbName: String): CatalogStorageFormat = {
-    CarbonSessionCatalogUtil.updateStorageLocation(path, storage, newTableName, dbName)
-  }
-}
-
-/**
- * Session state implementation to override sql parser and adding strategies
- *
- * @param sparkSession
- */
-class CarbonSessionStateBuilder(sparkSession: SparkSession,
-    parentState: Option[SessionState] = None)
-  extends HiveSessionStateBuilder(sparkSession, parentState) {
-
-  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
-
-  experimentalMethods.extraStrategies =
-    Seq(new StreamingTableStrategy(sparkSession),
-      new CarbonLateDecodeStrategy,
-      new DDLStrategy(sparkSession)
-    )
-  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
-    new CarbonUDFTransformRule)
-
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  /**
-   * Create a [[CarbonSessionStateBuilder]].
-   */
-  override protected lazy val catalog: CarbonHiveSessionCatalog = {
-    val catalog = new CarbonHiveSessionCatalog(
-      externalCatalog,
-      session.sharedState.globalTempViewManager,
-      functionRegistry,
-      sparkSession,
-      conf,
-      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
-      sqlParser,
-      resourceLoader)
-    parentState.foreach(_.catalog.copyStateTo(catalog))
-    catalog
-  }
-
-  private def externalCatalog: HiveExternalCatalog =
-    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
-
-  /**
-   * Create a Hive aware resource loader.
-   */
-  override protected lazy val resourceLoader: HiveSessionResourceLoader = {
-    val client: HiveClient = externalCatalog.client.newSession()
-    new HiveSessionResourceLoader(session, client)
-  }
-
-  override protected def analyzer: Analyzer = {
-    new CarbonAnalyzer(catalog,
-      conf,
-      sparkSession,
-      super.analyzer)
-  }
-}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
similarity index 52%
copy from integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
copy to integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
index dd2d751..9ff36bc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
+++ b/integration/spark2/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
@@ -21,169 +21,21 @@ import java.util.concurrent.Callable
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, SparkSession}
-import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager}
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule}
-import org.apache.spark.sql.parser.{CarbonSparkSqlParser, CarbonSparkSqlParserUtil}
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
 
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 
-object CarbonSessionCatalogUtil {
-
-  /**
-   * Method used to update the table name
-   * @param oldTableIdentifier old table identifier
-   * @param newTableIdentifier new table identifier
-   * @param newTablePath new table path
-   */
-  def alterTableRename(
-      oldTableIdentifier: TableIdentifier,
-      newTableIdentifier: TableIdentifier,
-      newTablePath: String,
-      sparkSession: SparkSession,
-      isExternal: Boolean
-  ): Unit = {
-    if (!isExternal) {
-      getClient(sparkSession).runSqlHive(
-        s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
-        s"SET TBLPROPERTIES('EXTERNAL'='TRUE')")
-    }
-    getClient(sparkSession).runSqlHive(
-      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
-      s"RENAME TO ${ newTableIdentifier.database.get }.${ newTableIdentifier.table }")
-    if (!isExternal) {
-      getClient(sparkSession).runSqlHive(
-        s"ALTER TABLE ${ newTableIdentifier.database.get }.${ newTableIdentifier.table } " +
-        s"SET TBLPROPERTIES('EXTERNAL'='FALSE')")
-    }
-    getClient(sparkSession).runSqlHive(
-      s"ALTER TABLE ${ newTableIdentifier.database.get }.${ newTableIdentifier.table } " +
-      s"SET SERDEPROPERTIES" +
-      s"('tableName'='${ newTableIdentifier.table }', " +
-      s"'dbName'='${ newTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
-  }
-
-  /**
-   * Below method will be used to update serd properties
-   * @param tableIdentifier table identifier
-   * @param schemaParts schema parts
-   * @param cols cols
-   */
-  def alterTable(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]],
-      sparkSession: SparkSession): Unit = {
-    getClient(sparkSession)
-      .runSqlHive(s"ALTER TABLE ${ tableIdentifier.database.get }.${ tableIdentifier.table } " +
-                  s"SET TBLPROPERTIES(${ schemaParts })")
-  }
-
-  def alterTableProperties(
-      sparkSession: SparkSession,
-      tableIdentifier: TableIdentifier,
-      properties: Map[String, String],
-      propKeys: Seq[String]
-  ): Unit = {
-    val catalog = sparkSession.sessionState.catalog
-    val table = catalog.getTableMetadata(tableIdentifier)
-    var newProperties = table.storage.properties
-    if (!propKeys.isEmpty) {
-      val updatedPropKeys = propKeys.map(_.toLowerCase)
-      newProperties = newProperties.filter { case (k, _) => !updatedPropKeys.contains(k) }
-    }
-    if (!properties.isEmpty) {
-      newProperties = newProperties ++ CarbonSparkSqlParserUtil.normalizeProperties(properties)
-    }
-    val newTable = table.copy(
-      storage = table.storage.copy(properties = newProperties)
-    )
-    catalog.alterTable(newTable)
-  }
-
-  /**
-   * returns hive client from HiveExternalCatalog
-   *
-   * @return
-   */
-  def getClient(sparkSession: SparkSession): org.apache.spark.sql.hive.client.HiveClient = {
-    //    For Spark2.2 we need to use unified Spark thrift server instead of carbon thrift
-    //    server. CarbonSession is not available anymore so HiveClient is created directly
-    //    using sparkSession.sharedState which internally contains all required carbon rules,
-    //    optimizers pluged-in through SessionStateBuilder in spark-defaults.conf.
-    //    spark.sql.session.state.builder=org.apache.spark.sql.hive.CarbonSessionStateBuilder
-    CarbonToSparkAdapter.getHiveExternalCatalog(sparkSession).client
-  }
-
-  def alterAddColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]], sparkSession: SparkSession): Unit = {
-    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
-  def alterDropColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]], sparkSession: SparkSession): Unit = {
-    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
-  def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]], sparkSession: SparkSession): Unit = {
-    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
-  /**
-   * This method alters table to set serde properties and updates the catalog table with new updated
-   * schema for all the alter operations like add column, drop column, change datatype or rename
-   * column
-   * @param tableIdentifier
-   * @param schemaParts
-   * @param cols
-   */
-  private def updateCatalogTableForAlter(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]],
-      sparkSession: SparkSession): Unit = {
-    alterTable(tableIdentifier, schemaParts, cols, sparkSession)
-    CarbonSessionUtil.alterExternalCatalogForTableWithUpdatedSchema(
-      tableIdentifier, cols, schemaParts, sparkSession)
-  }
-
-  /**
-   * This is alternate way of getting partition information. It first fetches all partitions from
-   * hive and then apply filter instead of querying hive along with filters.
-   * @param partitionFilters
-   * @param sparkSession
-   * @param identifier
-   * @return
-   */
-  def getPartitionsAlternate(
-      partitionFilters: Seq[Expression],
-      sparkSession: SparkSession,
-      identifier: TableIdentifier): Seq[CatalogTablePartition] = {
-    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
-  }
-
-  /**
-   * Update the storageformat with new location information
-   */
-  def updateStorageLocation(
-      path: Path,
-      storage: CatalogStorageFormat,
-      newTableName: String,
-      dbName: String): CatalogStorageFormat = {
-    storage.copy(locationUri = Some(path.toUri))
-  }
-}
-
 /**
  * This class will have carbon catalog and refresh the relation from cache if the carbontable in
  * carbon catalog is not same as cached carbon relation's carbon table
@@ -315,13 +167,9 @@ class CarbonSessionStateBuilder(sparkSession: SparkSession,
 
   override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
 
-  experimentalMethods.extraStrategies =
-    Seq(new StreamingTableStrategy(sparkSession),
-      new CarbonLateDecodeStrategy,
-      new DDLStrategy(sparkSession)
-    )
-  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
-    new CarbonUDFTransformRule)
+  experimentalMethods.extraStrategies =Seq(new StreamingTableStrategy(sparkSession),
+      new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))
+  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule, new CarbonUDFTransformRule)
 
   /**
    * Internal catalog for managing table and database states.
@@ -354,10 +202,7 @@ class CarbonSessionStateBuilder(sparkSession: SparkSession,
     new HiveSessionResourceLoader(session, client)
   }
 
-  override protected def analyzer: Analyzer = {
-    new CarbonAnalyzer(catalog,
-      conf,
-      sparkSession,
-      super.analyzer)
-  }
+  override protected def analyzer: Analyzer =
+    new CarbonAnalyzer(catalog, conf, sparkSession, super.analyzer)
 }
+
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
similarity index 53%
copy from integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
copy to integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
index dd2d751..33438e4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
+++ b/integration/spark2/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
@@ -21,169 +21,21 @@ import java.util.concurrent.Callable
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, SparkSession}
-import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition, ExternalCatalogWithListener, FunctionResourceLoader, GlobalTempViewManager}
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule}
-import org.apache.spark.sql.parser.{CarbonSparkSqlParser, CarbonSparkSqlParserUtil}
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
 
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 
-object CarbonSessionCatalogUtil {
-
-  /**
-   * Method used to update the table name
-   * @param oldTableIdentifier old table identifier
-   * @param newTableIdentifier new table identifier
-   * @param newTablePath new table path
-   */
-  def alterTableRename(
-      oldTableIdentifier: TableIdentifier,
-      newTableIdentifier: TableIdentifier,
-      newTablePath: String,
-      sparkSession: SparkSession,
-      isExternal: Boolean
-  ): Unit = {
-    if (!isExternal) {
-      getClient(sparkSession).runSqlHive(
-        s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
-        s"SET TBLPROPERTIES('EXTERNAL'='TRUE')")
-    }
-    getClient(sparkSession).runSqlHive(
-      s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " +
-      s"RENAME TO ${ newTableIdentifier.database.get }.${ newTableIdentifier.table }")
-    if (!isExternal) {
-      getClient(sparkSession).runSqlHive(
-        s"ALTER TABLE ${ newTableIdentifier.database.get }.${ newTableIdentifier.table } " +
-        s"SET TBLPROPERTIES('EXTERNAL'='FALSE')")
-    }
-    getClient(sparkSession).runSqlHive(
-      s"ALTER TABLE ${ newTableIdentifier.database.get }.${ newTableIdentifier.table } " +
-      s"SET SERDEPROPERTIES" +
-      s"('tableName'='${ newTableIdentifier.table }', " +
-      s"'dbName'='${ newTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')")
-  }
-
-  /**
-   * Below method will be used to update serd properties
-   * @param tableIdentifier table identifier
-   * @param schemaParts schema parts
-   * @param cols cols
-   */
-  def alterTable(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]],
-      sparkSession: SparkSession): Unit = {
-    getClient(sparkSession)
-      .runSqlHive(s"ALTER TABLE ${ tableIdentifier.database.get }.${ tableIdentifier.table } " +
-                  s"SET TBLPROPERTIES(${ schemaParts })")
-  }
-
-  def alterTableProperties(
-      sparkSession: SparkSession,
-      tableIdentifier: TableIdentifier,
-      properties: Map[String, String],
-      propKeys: Seq[String]
-  ): Unit = {
-    val catalog = sparkSession.sessionState.catalog
-    val table = catalog.getTableMetadata(tableIdentifier)
-    var newProperties = table.storage.properties
-    if (!propKeys.isEmpty) {
-      val updatedPropKeys = propKeys.map(_.toLowerCase)
-      newProperties = newProperties.filter { case (k, _) => !updatedPropKeys.contains(k) }
-    }
-    if (!properties.isEmpty) {
-      newProperties = newProperties ++ CarbonSparkSqlParserUtil.normalizeProperties(properties)
-    }
-    val newTable = table.copy(
-      storage = table.storage.copy(properties = newProperties)
-    )
-    catalog.alterTable(newTable)
-  }
-
-  /**
-   * returns hive client from HiveExternalCatalog
-   *
-   * @return
-   */
-  def getClient(sparkSession: SparkSession): org.apache.spark.sql.hive.client.HiveClient = {
-    //    For Spark2.2 we need to use unified Spark thrift server instead of carbon thrift
-    //    server. CarbonSession is not available anymore so HiveClient is created directly
-    //    using sparkSession.sharedState which internally contains all required carbon rules,
-    //    optimizers pluged-in through SessionStateBuilder in spark-defaults.conf.
-    //    spark.sql.session.state.builder=org.apache.spark.sql.hive.CarbonSessionStateBuilder
-    CarbonToSparkAdapter.getHiveExternalCatalog(sparkSession).client
-  }
-
-  def alterAddColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]], sparkSession: SparkSession): Unit = {
-    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
-  def alterDropColumns(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]], sparkSession: SparkSession): Unit = {
-    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
-  def alterColumnChangeDataTypeOrRename(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]], sparkSession: SparkSession): Unit = {
-    updateCatalogTableForAlter(tableIdentifier, schemaParts, cols, sparkSession)
-  }
-
-  /**
-   * This method alters table to set serde properties and updates the catalog table with new updated
-   * schema for all the alter operations like add column, drop column, change datatype or rename
-   * column
-   * @param tableIdentifier
-   * @param schemaParts
-   * @param cols
-   */
-  private def updateCatalogTableForAlter(tableIdentifier: TableIdentifier,
-      schemaParts: String,
-      cols: Option[Seq[ColumnSchema]],
-      sparkSession: SparkSession): Unit = {
-    alterTable(tableIdentifier, schemaParts, cols, sparkSession)
-    CarbonSessionUtil.alterExternalCatalogForTableWithUpdatedSchema(
-      tableIdentifier, cols, schemaParts, sparkSession)
-  }
-
-  /**
-   * This is alternate way of getting partition information. It first fetches all partitions from
-   * hive and then apply filter instead of querying hive along with filters.
-   * @param partitionFilters
-   * @param sparkSession
-   * @param identifier
-   * @return
-   */
-  def getPartitionsAlternate(
-      partitionFilters: Seq[Expression],
-      sparkSession: SparkSession,
-      identifier: TableIdentifier): Seq[CatalogTablePartition] = {
-    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
-  }
-
-  /**
-   * Update the storageformat with new location information
-   */
-  def updateStorageLocation(
-      path: Path,
-      storage: CatalogStorageFormat,
-      newTableName: String,
-      dbName: String): CatalogStorageFormat = {
-    storage.copy(locationUri = Some(path.toUri))
-  }
-}
-
 /**
  * This class will have carbon catalog and refresh the relation from cache if the carbontable in
  * carbon catalog is not same as cached carbon relation's carbon table
@@ -206,8 +58,8 @@ class CarbonHiveSessionCatalog(
     parser: ParserInterface,
     functionResourceLoader: FunctionResourceLoader)
   extends HiveSessionCatalog (
-    externalCatalog,
-    globalTempViewManager,
+    () => externalCatalog,
+    () => globalTempViewManager,
     new HiveMetastoreCatalog(sparkSession),
     functionRegistry,
     conf,
@@ -344,14 +196,19 @@ class CarbonSessionStateBuilder(sparkSession: SparkSession,
   }
 
   private def externalCatalog: HiveExternalCatalog =
-    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
+    session
+      .sharedState
+      .externalCatalog
+      .asInstanceOf[ExternalCatalogWithListener]
+      .unwrapped
+      .asInstanceOf[HiveExternalCatalog]
 
   /**
    * Create a Hive aware resource loader.
    */
   override protected lazy val resourceLoader: HiveSessionResourceLoader = {
     val client: HiveClient = externalCatalog.client.newSession()
-    new HiveSessionResourceLoader(session, client)
+    new HiveSessionResourceLoader(session, () => client)
   }
 
   override protected def analyzer: Analyzer = {