You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/30 09:22:04 UTC

[14/35] carbondata git commit: [CARBONDATA-1484] Fixed driver cache issue

[CARBONDATA-1484] Fixed driver cache issue

The driver cache was not cleared when a table was dropped through one driver and then queried through another driver.
This PR checks the schema modified time and refreshes the cache when it has changed.

This closes #1415


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/311a5b7e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/311a5b7e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/311a5b7e

Branch: refs/heads/streaming_ingest
Commit: 311a5b7e31e98777cc32f27ecdcde86859eadaa5
Parents: 956833e
Author: ravipesala <ra...@gmail.com>
Authored: Fri Oct 13 22:08:49 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Mon Oct 16 16:51:01 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/hive/CarbonFileMetastore.scala     | 10 ++++++----
 .../spark/sql/hive/CarbonSessionState.scala      | 19 +++++++++++++------
 2 files changed, 19 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/311a5b7e/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 75ad4ae..16724fc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -442,7 +442,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
    * @param timeStamp
    */
   private def updateSchemasUpdatedTime(timeStamp: Long) {
-    tableModifiedTimeStore.put("default", timeStamp)
+    tableModifiedTimeStore.put(CarbonCommonConstants.DATABASE_DEFAULT_NAME, timeStamp)
   }
 
   def updateAndTouchSchemasUpdatedTime(basePath: String) {
@@ -461,10 +461,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
       LOGGER.audit(s"Creating timestamp file for $basePath")
       FileFactory.createNewFile(timestampFile, timestampFileType)
     }
-    val systemTime = System.currentTimeMillis()
     FileFactory.getCarbonFile(timestampFile, timestampFileType)
-      .setLastModifiedTime(systemTime)
-    systemTime
+      .setLastModifiedTime(System.currentTimeMillis())
+    // Re-read the modified time from the file rather than returning the value that was set:
+    // the file system may not store exactly the value passed to setLastModifiedTime.
+    FileFactory.getCarbonFile(timestampFile, timestampFileType)
+      .getLastModifiedTime
   }
 
   def checkSchemasModifiedTimeAndReloadTables(storePath: String) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/311a5b7e/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 478b178..6892dad 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -32,6 +32,9 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.parser.CarbonSparkSqlParser
 
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+
 /**
  * This class will have carbon catalog and refresh the relation from cache if the carbontable in
  * carbon catalog is not same as cached carbon relation's carbon table
@@ -96,22 +99,26 @@ class CarbonSessionCatalog(
     }
   }
 
-  private def refreshRelationFromCache(name: TableIdentifier,
+  private def refreshRelationFromCache(identifier: TableIdentifier,
       alias: Option[String],
       carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = {
     var isRefreshed = false
+    val storePath = CarbonEnv.getInstance(sparkSession).storePath
     carbonEnv.carbonMetastore.
-      checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(sparkSession).storePath)
+      checkSchemasModifiedTimeAndReloadTables(storePath)
 
     val tableMeta = carbonEnv.carbonMetastore
       .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName,
         carbonDatasourceHadoopRelation.carbonTable.getFactTableName)
-    if (tableMeta.isDefined &&
+    if (tableMeta.isEmpty || (tableMeta.isDefined &&
         tableMeta.get.carbonTable.getTableLastUpdatedTime !=
-          carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime) {
-      refreshTable(name)
+          carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) {
+      refreshTable(identifier)
+      DataMapStoreManager.getInstance().
+        clearDataMap(AbsoluteTableIdentifier.from(storePath,
+          identifier.database.getOrElse("default"), identifier.table))
       isRefreshed = true
-      logInfo(s"Schema changes have been detected for table: $name")
+      logInfo(s"Schema changes have been detected for table: $identifier")
     }
     isRefreshed
   }