Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/23 13:29:20 UTC

carbondata git commit: [CARBONDATA-1777] Added check to refresh table if catalog relation is present in plan

Repository: carbondata
Updated Branches:
  refs/heads/master f0123e795 -> 7997f226b


[CARBONDATA-1777] Added check to refresh table if catalog relation is present in plan

Analysis: In Spark 2.2, the relation lookup had no case to handle CatalogRelation, so tables referenced through a catalog relation were not getting refreshed in other sessions.

Solution: Add a case for CatalogRelation so that the referenced table is refreshed (sketched below).

This closes #1692
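
For reference, here is a condensed, self-contained sketch of the added handling,
mirroring the spark2.2 CarbonSessionCatalog change at the end of the diff below.
The relation class is matched by name rather than by type because the concrete
class varies across Spark 2.2.x releases, and its tableMeta field is read
reflectively via CarbonReflectionUtils.getFieldOfCatalogTable. The wrapper
object, the helper name refreshIfCatalogRelation, and its function parameter are
illustrative, not part of the patch:

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog.CatalogTable
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
    import org.apache.spark.util.CarbonReflectionUtils

    object CatalogRelationRefresh {
      // If the looked-up plan wraps a plain catalog relation, recover its
      // CatalogTable via reflection and refresh that one table's metadata.
      def refreshIfCatalogRelation(
          rtnRelation: LogicalPlan,
          refreshRelationFromCache: TableIdentifier => Boolean): Boolean = {
        rtnRelation match {
          case SubqueryAlias(_, relation)
            if relation.getClass.getName.equals(
                 "org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
               relation.getClass.getName.equals(
                 "org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
               relation.getClass.getName.equals(
                 "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation") =>
            // The concrete relation class differs across Spark releases, so it
            // is matched by class name and its tableMeta field read reflectively.
            val catalogTable = CarbonReflectionUtils
              .getFieldOfCatalogTable("tableMeta", relation)
              .asInstanceOf[CatalogTable]
            refreshRelationFromCache(catalogTable.identifier)
          case _ =>
            false // Carbon's own relations are handled by the existing cases
        }
      }
    }

In the actual patch this logic lives inline in CarbonSessionCatalog.lookupRelation,
next to the existing CarbonDatasourceHadoopRelation cases.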


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7997f226
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7997f226
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7997f226

Branch: refs/heads/master
Commit: 7997f226bab2bf78d50c31402df2c9cb62101b5f
Parents: f0123e7
Author: kunal642 <ku...@gmail.com>
Authored: Wed Dec 20 14:05:06 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Sat Dec 23 21:29:10 2017 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  2 +-
 .../datamap/CarbonDropDataMapCommand.scala      |  2 +-
 .../CarbonAlterTableDropPartitionCommand.scala  |  3 +-
 .../CarbonAlterTableSplitPartitionCommand.scala |  2 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    | 15 ++++-----
 .../spark/sql/hive/CarbonHiveMetaStore.scala    |  7 +++--
 .../apache/spark/sql/hive/CarbonMetaStore.scala |  2 +-
 .../org/apache/spark/util/CleanFiles.scala      |  3 +-
 .../org/apache/spark/util/Compaction.scala      |  3 +-
 .../apache/spark/util/DeleteSegmentByDate.scala |  7 +++--
 .../apache/spark/util/DeleteSegmentById.scala   |  4 +--
 .../org/apache/spark/util/TableLoader.scala     |  3 +-
 .../src/main/spark2.1/CarbonSessionState.scala  |  2 +-
 .../src/main/spark2.2/CarbonSessionState.scala  | 33 ++++++++++----------
 14 files changed, 48 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
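
Most of the diff follows from one signature change: the whole-cache
checkSchemasModifiedTimeAndReloadTables() becomes the per-table
checkSchemasModifiedTimeAndReloadTable(tableIdentifier): Boolean, which evicts
only the named table from the metadata cache and reports whether an eviction
happened. A minimal sketch of that contract (CachedTable and TableCache are
illustrative stand-ins, not Carbon classes; the real cache is
metadata.carbonTables in CarbonFileMetastore, and the Boolean flag below stands
in for the schema timestamp-file comparison):

    import org.apache.spark.sql.catalyst.TableIdentifier

    // Illustrative stand-in for an entry in Carbon's table metadata cache.
    case class CachedTable(databaseName: String, tableName: String)

    class TableCache {
      var carbonTables: List[CachedTable] = Nil
      // Stands in for comparing the schema timestamp file's last modified
      // time against the value recorded in tableModifiedTimeStore.
      var schemaTimestampChanged: Boolean = false

      // Evict only the named table when the schema timestamp has moved,
      // returning true if the cache was refreshed (the new contract).
      def checkSchemasModifiedTimeAndReloadTable(id: TableIdentifier): Boolean = {
        var isRefreshed = false
        if (schemaTimestampChanged) {
          carbonTables = carbonTables.filterNot { t =>
            t.tableName.equalsIgnoreCase(id.table) &&
              t.databaseName.equalsIgnoreCase(id.database.getOrElse("default"))
          }
          isRefreshed = true
        }
        isRefreshed
      }
    }

Callers that only need the old fire-and-forget behaviour, such as the util entry
points below, simply discard the Boolean; the spark2.2 CarbonSessionCatalog uses
it to decide whether to call refreshTable and clear the table's datamaps.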


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 6317177..d9c50d0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -133,7 +133,7 @@ object CarbonEnv {
     val databaseName = getDatabaseName(databaseNameOp)(sparkSession)
     val catalog = getInstance(sparkSession).carbonMetastore
     // refresh cache
-    catalog.checkSchemasModifiedTimeAndReloadTables()
+    catalog.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, databaseNameOp))
 
     // try to get it from cache, otherwise lookup in catalog
     catalog.getTableFromMetadataCache(databaseName, tableName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 9a71523..7f68b05 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -62,7 +62,7 @@ case class CarbonDropDataMapCommand(
     val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
     val tableIdentifier =
       AbsoluteTableIdentifier.from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
-    catalog.checkSchemasModifiedTimeAndReloadTables()
+    catalog.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
     val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
     try {
       locksToBeAcquired foreach {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index fb515fa..d85e064 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.{Executors, ExecutorService, Future}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.AlterTableUtil
@@ -60,7 +61,7 @@ case class CarbonAlterTableDropPartitionCommand(
     val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
       .asInstanceOf[CarbonRelation]
     val tablePath = relation.carbonTable.getTablePath
-    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
+    carbonMetaStore.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
     if (relation == null) {
       sys.error(s"Table $dbName.$tableName does not exist")
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 1a535fd..65a0af3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -67,7 +67,7 @@ case class CarbonAlterTableSplitPartitionCommand(
     if (relation == null) {
       sys.error(s"Table $dbName.$tableName does not exist")
     }
-    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
+    carbonMetaStore.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
     if (null == CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)) {
       LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
       sys.error(s"Alter table failed. table not found: $dbName.$tableName")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index ba222e2..719f571 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -440,7 +440,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
     if (FileFactory.isFileExist(metadataFilePath, fileType)) {
      // while dropping we should refresh the schema modified time so that if anything has
      // changed in another beeline session it gets updated.
-      checkSchemasModifiedTimeAndReloadTables()
+      checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
 
       removeTableFromMetadata(dbName, tableName)
       updateSchemasUpdatedTime(touchSchemaFileSystemTime())
@@ -504,19 +504,20 @@ class CarbonFileMetastore extends CarbonMetaStore {
       .getLastModifiedTime
   }
 
-  def checkSchemasModifiedTimeAndReloadTables() {
+  def checkSchemasModifiedTimeAndReloadTable(tableIdentifier: TableIdentifier): Boolean = {
     val (timestampFile, timestampFileType) = getTimestampFileAndType()
+    var isRefreshed = false
     if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
       if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
         getLastModifiedTime ==
             tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
-        refreshCache()
+        metadata.carbonTables = metadata.carbonTables.filterNot(
+          table => table.getTableName.equalsIgnoreCase(tableIdentifier.table) &&
+        table.getDatabaseName.equalsIgnoreCase(tableIdentifier.database.getOrElse("default")))
+        isRefreshed = true
       }
     }
-  }
-
-  private def refreshCache() {
-    metadata.carbonTables.clear()
+    isRefreshed
   }
 
   override def isReadFromHiveMetaStore: Boolean = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 066cb1c..d5ac5ae 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -75,7 +75,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       // clear driver B-tree and dictionary cache
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
     }
-    checkSchemasModifiedTimeAndReloadTables()
+    checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
     removeTableFromMetadata(dbName, tableName)
     CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
     // discard cached table info in cachedDataSourceTables
@@ -84,8 +84,9 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
   }
 
-  override def checkSchemasModifiedTimeAndReloadTables() {
-    // do nothing now
+  override def checkSchemasModifiedTimeAndReloadTable(tableIdentifier: TableIdentifier): Boolean = {
+    // do nothing
+    false
   }
 
   override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index cc0e6ab..7dc3d3f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -136,7 +136,7 @@ trait CarbonMetaStore {
 
   def updateAndTouchSchemasUpdatedTime()
 
-  def checkSchemasModifiedTimeAndReloadTables()
+  def checkSchemasModifiedTimeAndReloadTable(tableIdentifier: TableIdentifier): Boolean
 
   def isReadFromHiveMetaStore: Boolean
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index c05c0f1..eba7dcd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.util
 
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 
 import org.apache.carbondata.api.CarbonStore
 
@@ -58,7 +59,7 @@ object CleanFiles {
     }
     val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
     CarbonEnv.getInstance(spark).carbonMetastore.
-      checkSchemasModifiedTimeAndReloadTables()
+      checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
     cleanFiles(spark, dbName, tableName, storePath, forceTableClean)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index 200a926..d4ec81e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.util
 
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.AlterTableModel
 import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
 import org.apache.spark.sql.util.CarbonException
@@ -58,7 +59,7 @@ object Compaction {
     val compactionType = TableAPIUtil.escape(args(2))
     val spark = TableAPIUtil.spark(storePath, s"Compaction: $dbName.$tableName")
     CarbonEnv.getInstance(spark).carbonMetastore.
-      checkSchemasModifiedTimeAndReloadTables()
+      checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
     compaction(spark, dbName, tableName, compactionType)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index d682b21..fcc9f2f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -14,9 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- package org.apache.spark.util
+package org.apache.spark.util
 
- import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 
 import org.apache.carbondata.api.CarbonStore
 
@@ -45,7 +46,7 @@ object DeleteSegmentByDate {
     val dateValue = TableAPIUtil.escape(args(2))
     val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentByDate: $dbName.$tableName")
     CarbonEnv.getInstance(spark).carbonMetastore.
-      checkSchemasModifiedTimeAndReloadTables()
+      checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
     deleteSegmentByDate(spark, dbName, tableName, dateValue)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index 5b58c8d..13ef933 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.util
 
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.catalyst.TableIdentifier
 
 import org.apache.carbondata.api.CarbonStore
 
@@ -51,7 +51,7 @@ object DeleteSegmentById {
     val segmentIds = extractSegmentIds(TableAPIUtil.escape(args(2)))
     val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentById: $dbName.$tableName")
     CarbonEnv.getInstance(spark).carbonMetastore.
-      checkSchemasModifiedTimeAndReloadTables()
+      checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
     deleteSegmentById(spark, dbName, tableName, segmentIds)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index d439cb1..80f781e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -24,6 +24,7 @@ import scala.collection.{immutable, mutable}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -81,7 +82,7 @@ object TableLoader {
     val spark = TableAPIUtil.spark(storePath, s"TableLoader: $dbName.$tableName")
 
     CarbonEnv.getInstance(spark).carbonMetastore.
-      checkSchemasModifiedTimeAndReloadTables()
+      checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
     loadTable(spark, Option(dbName), tableName, inputPaths, map)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
index b55a6aa..a6f28c9 100644
--- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
@@ -108,7 +108,7 @@ class CarbonSessionCatalog(
     var isRefreshed = false
     val storePath = CarbonProperties.getStorePath
     carbonEnv.carbonMetastore.
-      checkSchemasModifiedTimeAndReloadTables()
+      checkSchemasModifiedTimeAndReloadTable(identifier)
 
     val table = carbonEnv.carbonMetastore.getTableFromMetadataCache(
       carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
index c951e5e..a722cbf 100644
--- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.internal.{SQLConf, SessionState}
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
 import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
 import org.apache.spark.sql.types.DecimalType
+import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -90,23 +91,15 @@ class CarbonSessionCatalog(
   }
 
 
-  private def refreshRelationFromCache(identifier: TableIdentifier,
-      carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
+  private def refreshRelationFromCache(identifier: TableIdentifier): Boolean = {
     var isRefreshed = false
-    val storePath = CarbonProperties.getStorePath
-    carbonEnv.carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
-
-    val table = carbonEnv.carbonMetastore.getTableFromMetadataCache(
-      carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
-      carbonDatasourceHadoopRelation.carbonTable.getTableName)
-    if (table.isEmpty || (table.isDefined && table.get.getTableLastUpdatedTime !=
-       carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
+    if (carbonEnv.carbonMetastore.checkSchemasModifiedTimeAndReloadTable(identifier)) {
       refreshTable(identifier)
       DataMapStoreManager.getInstance().
-        clearDataMaps(AbsoluteTableIdentifier.from(storePath,
+        clearDataMaps(AbsoluteTableIdentifier.from(CarbonProperties.getStorePath,
           identifier.database.getOrElse("default"), identifier.table))
-      isRefreshed = true
       logInfo(s"Schema changes have been detected for table: $identifier")
+      isRefreshed = true
     }
     isRefreshed
   }
@@ -117,10 +110,18 @@ class CarbonSessionCatalog(
     var toRefreshRelation = false
     rtnRelation match {
       case SubqueryAlias(_,
-      LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
-        toRefreshRelation = refreshRelationFromCache(name, carbonDatasourceHadoopRelation)
-      case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
-        toRefreshRelation = refreshRelationFromCache(name, carbonDatasourceHadoopRelation)
+      LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)) =>
+        toRefreshRelation = refreshRelationFromCache(name)
+      case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
+        toRefreshRelation = refreshRelationFromCache(name)
+      case SubqueryAlias(_, relation) if
+      relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+      relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
+      relation.getClass.getName.equals(
+        "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation") =>
+        val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta",
+          relation).asInstanceOf[CatalogTable]
+        toRefreshRelation = refreshRelationFromCache(catalogTable.identifier)
       case _ =>
     }