You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/24 13:26:42 UTC
[46/50] [abbrv] carbondata git commit: [CARBONDATA-1777] Added check
to refresh table if catalog relation is present in plan
[CARBONDATA-1777] Added check to refresh table if catalog relation is present in plan
Analysis: In Spark 2.2, the relation lookup had no case to handle CatalogRelation, so the tables were not being refreshed across different sessions.
Solution: Add a case for CatalogRelation so that the table being referred to is refreshed.
This closes #1692
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7997f226
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7997f226
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7997f226
Branch: refs/heads/fgdatamap
Commit: 7997f226bab2bf78d50c31402df2c9cb62101b5f
Parents: f0123e7
Author: kunal642 <ku...@gmail.com>
Authored: Wed Dec 20 14:05:06 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Sat Dec 23 21:29:10 2017 +0800
----------------------------------------------------------------------
.../scala/org/apache/spark/sql/CarbonEnv.scala | 2 +-
.../datamap/CarbonDropDataMapCommand.scala | 2 +-
.../CarbonAlterTableDropPartitionCommand.scala | 3 +-
.../CarbonAlterTableSplitPartitionCommand.scala | 2 +-
.../spark/sql/hive/CarbonFileMetastore.scala | 15 ++++-----
.../spark/sql/hive/CarbonHiveMetaStore.scala | 7 +++--
.../apache/spark/sql/hive/CarbonMetaStore.scala | 2 +-
.../org/apache/spark/util/CleanFiles.scala | 3 +-
.../org/apache/spark/util/Compaction.scala | 3 +-
.../apache/spark/util/DeleteSegmentByDate.scala | 7 +++--
.../apache/spark/util/DeleteSegmentById.scala | 4 +--
.../org/apache/spark/util/TableLoader.scala | 3 +-
.../src/main/spark2.1/CarbonSessionState.scala | 2 +-
.../src/main/spark2.2/CarbonSessionState.scala | 33 ++++++++++----------
14 files changed, 48 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 6317177..d9c50d0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -133,7 +133,7 @@ object CarbonEnv {
val databaseName = getDatabaseName(databaseNameOp)(sparkSession)
val catalog = getInstance(sparkSession).carbonMetastore
// refresh cache
- catalog.checkSchemasModifiedTimeAndReloadTables()
+ catalog.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, databaseNameOp))
// try to get it from catch, otherwise lookup in catalog
catalog.getTableFromMetadataCache(databaseName, tableName)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 9a71523..7f68b05 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -62,7 +62,7 @@ case class CarbonDropDataMapCommand(
val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
val tableIdentifier =
AbsoluteTableIdentifier.from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
- catalog.checkSchemasModifiedTimeAndReloadTables()
+ catalog.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
try {
locksToBeAcquired foreach {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index fb515fa..d85e064 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.{Executors, ExecutorService, Future}
import scala.collection.JavaConverters._
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.util.AlterTableUtil
@@ -60,7 +61,7 @@ case class CarbonAlterTableDropPartitionCommand(
val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
.asInstanceOf[CarbonRelation]
val tablePath = relation.carbonTable.getTablePath
- carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
+ carbonMetaStore.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
if (relation == null) {
sys.error(s"Table $dbName.$tableName does not exist")
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 1a535fd..65a0af3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -67,7 +67,7 @@ case class CarbonAlterTableSplitPartitionCommand(
if (relation == null) {
sys.error(s"Table $dbName.$tableName does not exist")
}
- carbonMetaStore.checkSchemasModifiedTimeAndReloadTables()
+ carbonMetaStore.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
if (null == CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)) {
LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
sys.error(s"Alter table failed. table not found: $dbName.$tableName")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index ba222e2..719f571 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -440,7 +440,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
if (FileFactory.isFileExist(metadataFilePath, fileType)) {
// while drop we should refresh the schema modified time so that if any thing has changed
// in the other beeline need to update.
- checkSchemasModifiedTimeAndReloadTables()
+ checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
removeTableFromMetadata(dbName, tableName)
updateSchemasUpdatedTime(touchSchemaFileSystemTime())
@@ -504,19 +504,20 @@ class CarbonFileMetastore extends CarbonMetaStore {
.getLastModifiedTime
}
- def checkSchemasModifiedTimeAndReloadTables() {
+ def checkSchemasModifiedTimeAndReloadTable(tableIdentifier: TableIdentifier): Boolean = {
val (timestampFile, timestampFileType) = getTimestampFileAndType()
+ var isRefreshed = false
if (FileFactory.isFileExist(timestampFile, timestampFileType)) {
if (!(FileFactory.getCarbonFile(timestampFile, timestampFileType).
getLastModifiedTime ==
tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
- refreshCache()
+ metadata.carbonTables = metadata.carbonTables.filterNot(
+ table => table.getTableName.equalsIgnoreCase(tableIdentifier.table) &&
+ table.getDatabaseName.equalsIgnoreCase(tableIdentifier.database.getOrElse("default")))
+ isRefreshed = true
}
}
- }
-
- private def refreshCache() {
- metadata.carbonTables.clear()
+ isRefreshed
}
override def isReadFromHiveMetaStore: Boolean = false
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 066cb1c..d5ac5ae 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -75,7 +75,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
// clear driver B-tree and dictionary cache
ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
}
- checkSchemasModifiedTimeAndReloadTables()
+ checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
removeTableFromMetadata(dbName, tableName)
CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
// discard cached table info in cachedDataSourceTables
@@ -84,8 +84,9 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
}
- override def checkSchemasModifiedTimeAndReloadTables() {
- // do nothing now
+ override def checkSchemasModifiedTimeAndReloadTable(tableIdentifier: TableIdentifier): Boolean = {
+ // do nothing
+ false
}
override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index cc0e6ab..7dc3d3f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -136,7 +136,7 @@ trait CarbonMetaStore {
def updateAndTouchSchemasUpdatedTime()
- def checkSchemasModifiedTimeAndReloadTables()
+ def checkSchemasModifiedTimeAndReloadTable(tableIdentifier: TableIdentifier): Boolean
def isReadFromHiveMetaStore: Boolean
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index c05c0f1..eba7dcd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -18,6 +18,7 @@
package org.apache.spark.util
import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.carbondata.api.CarbonStore
@@ -58,7 +59,7 @@ object CleanFiles {
}
val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
CarbonEnv.getInstance(spark).carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables()
+ checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
cleanFiles(spark, dbName, tableName, storePath, forceTableClean)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index 200a926..d4ec81e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -17,6 +17,7 @@
package org.apache.spark.util
import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.AlterTableModel
import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
import org.apache.spark.sql.util.CarbonException
@@ -58,7 +59,7 @@ object Compaction {
val compactionType = TableAPIUtil.escape(args(2))
val spark = TableAPIUtil.spark(storePath, s"Compaction: $dbName.$tableName")
CarbonEnv.getInstance(spark).carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables()
+ checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
compaction(spark, dbName, tableName, compactionType)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index d682b21..fcc9f2f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
- package org.apache.spark.util
+package org.apache.spark.util
- import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.carbondata.api.CarbonStore
@@ -45,7 +46,7 @@ object DeleteSegmentByDate {
val dateValue = TableAPIUtil.escape(args(2))
val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentByDate: $dbName.$tableName")
CarbonEnv.getInstance(spark).carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables()
+ checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
deleteSegmentByDate(spark, dbName, tableName, dateValue)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index 5b58c8d..13ef933 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -17,7 +17,7 @@
package org.apache.spark.util
import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.carbondata.api.CarbonStore
@@ -51,7 +51,7 @@ object DeleteSegmentById {
val segmentIds = extractSegmentIds(TableAPIUtil.escape(args(2)))
val spark = TableAPIUtil.spark(storePath, s"DeleteSegmentById: $dbName.$tableName")
CarbonEnv.getInstance(spark).carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables()
+ checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
deleteSegmentById(spark, dbName, tableName, segmentIds)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index d439cb1..80f781e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -24,6 +24,7 @@ import scala.collection.{immutable, mutable}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -81,7 +82,7 @@ object TableLoader {
val spark = TableAPIUtil.spark(storePath, s"TableLoader: $dbName.$tableName")
CarbonEnv.getInstance(spark).carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables()
+ checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
loadTable(spark, Option(dbName), tableName, inputPaths, map)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
index b55a6aa..a6f28c9 100644
--- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
@@ -108,7 +108,7 @@ class CarbonSessionCatalog(
var isRefreshed = false
val storePath = CarbonProperties.getStorePath
carbonEnv.carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables()
+ checkSchemasModifiedTimeAndReloadTable(identifier)
val table = carbonEnv.carbonMetastore.getTableFromMetadataCache(
carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7997f226/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
index c951e5e..a722cbf 100644
--- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.internal.{SQLConf, SessionState}
import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
import org.apache.spark.sql.types.DecimalType
+import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -90,23 +91,15 @@ class CarbonSessionCatalog(
}
- private def refreshRelationFromCache(identifier: TableIdentifier,
- carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
+ private def refreshRelationFromCache(identifier: TableIdentifier): Boolean = {
var isRefreshed = false
- val storePath = CarbonProperties.getStorePath
- carbonEnv.carbonMetastore.checkSchemasModifiedTimeAndReloadTables()
-
- val table = carbonEnv.carbonMetastore.getTableFromMetadataCache(
- carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
- carbonDatasourceHadoopRelation.carbonTable.getTableName)
- if (table.isEmpty || (table.isDefined && table.get.getTableLastUpdatedTime !=
- carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
+ if (carbonEnv.carbonMetastore.checkSchemasModifiedTimeAndReloadTable(identifier)) {
refreshTable(identifier)
DataMapStoreManager.getInstance().
- clearDataMaps(AbsoluteTableIdentifier.from(storePath,
+ clearDataMaps(AbsoluteTableIdentifier.from(CarbonProperties.getStorePath,
identifier.database.getOrElse("default"), identifier.table))
- isRefreshed = true
logInfo(s"Schema changes have been detected for table: $identifier")
+ isRefreshed = true
}
isRefreshed
}
@@ -117,10 +110,18 @@ class CarbonSessionCatalog(
var toRefreshRelation = false
rtnRelation match {
case SubqueryAlias(_,
- LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _)) =>
- toRefreshRelation = refreshRelationFromCache(name, carbonDatasourceHadoopRelation)
- case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) =>
- toRefreshRelation = refreshRelationFromCache(name, carbonDatasourceHadoopRelation)
+ LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)) =>
+ toRefreshRelation = refreshRelationFromCache(name)
+ case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
+ toRefreshRelation = refreshRelationFromCache(name)
+ case SubqueryAlias(_, relation) if
+ relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+ relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
+ relation.getClass.getName.equals(
+ "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation") =>
+ val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta",
+ relation).asInstanceOf[CatalogTable]
+ toRefreshRelation = refreshRelationFromCache(catalogTable.identifier)
case _ =>
}