Posted to issues@carbondata.apache.org by GitBox <gi...@apache.org> on 2021/01/21 08:17:18 UTC

[GitHub] [carbondata] akashrn5 commented on a change in pull request #4076: [CARBONDATA-4107] Added mvExists property for MV fact table and added lock while touchMDTFile

akashrn5 commented on a change in pull request #4076:
URL: https://github.com/apache/carbondata/pull/4076#discussion_r561671488



##########
File path: core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
##########
@@ -569,14 +568,31 @@ private synchronized void touchMDTFile() throws IOException {
         FileFactory.createDirectoryAndSetPermission(this.systemDirectory,
             new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
       }
-      CarbonFile schemaIndexFile = FileFactory.getCarbonFile(this.schemaIndexFilePath);
-      if (schemaIndexFile.exists()) {
-        schemaIndexFile.delete();
+      // two or more JVM process can access this method to update last modified time at same
+      // time causing exception. So take a system level lock on system folder and update
+      // last modified time of schema index file
+      ICarbonLock systemDirLock = CarbonLockFactory
+          .getSystemLevelCarbonLockObj(this.systemDirectory,
+              LockUsage.MATERIALIZED_VIEW_STATUS_LOCK);
+      boolean locked = false;
+      try {
+        locked = systemDirLock.lockWithRetries();
+        if (locked) {
+          CarbonFile schemaIndexFile = FileFactory.getCarbonFile(this.schemaIndexFilePath);
+          if (schemaIndexFile.exists()) {
+            schemaIndexFile.delete();
+          }
+          schemaIndexFile.createNewFile(new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+          this.lastModifiedTime = schemaIndexFile.getLastModifiedTime();
+        } else {
+          LOG.warn("Unable to get Lock to refresh schema index last modified time");

Review comment:
       If lock acquisition fails, this only logs a warning. Don't we need to fail here? Continuing without the lock may lead to other problems.
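
       To illustrate the suggestion, here is a minimal, self-contained sketch of failing fast when the lock cannot be acquired. `SketchLock` and `SchemaIndexToucher` are hypothetical stand-ins written for this example, not the real CarbonData classes, and the file handling from the patch is replaced by a timestamp update. Whether an `IOException` is the right failure type for `touchMDTFile` is an assumption.

```java
import java.io.IOException;

// Hypothetical stand-in for the lock used in the patch; NOT the real CarbonData interface.
interface SketchLock {
  boolean lockWithRetries();

  boolean unlock();
}

// Hypothetical holder for the "touch" logic; the real method lives in MVProvider.
class SchemaIndexToucher {
  private final SketchLock lock;
  long lastModifiedTime;

  SchemaIndexToucher(SketchLock lock) {
    this.lock = lock;
  }

  void touch() throws IOException {
    boolean locked = false;
    try {
      locked = lock.lockWithRetries();
      if (!locked) {
        // Fail instead of only warning, so the caller knows the refresh did not happen.
        throw new IOException(
            "Unable to get lock to refresh schema index last modified time");
      }
      // Placeholder for the delete/create of the schema index file in the patch.
      lastModifiedTime = System.currentTimeMillis();
    } finally {
      // Release the lock only if this call actually acquired it.
      if (locked) {
        lock.unlock();
      }
    }
  }
}
```

       With this shape the caller sees the failure instead of continuing with a stale last modified time; the `finally` block still guarantees the lock is released on the success path.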

##########
File path: core/src/main/java/org/apache/carbondata/core/view/MVManager.java
##########
@@ -132,6 +108,31 @@ public boolean isMVInSyncWithParentTables(MVSchema mvSchema) throws IOException
     return schemas;
   }
 
+  /**
+   * It gives all mv schemas from given databases in the store
+   */
+  public List<MVSchema> getSchemas(Map<String, List<String>> mvTablesMap) throws IOException {
+    List<MVSchema> schemas = new ArrayList<>();
+    for (Map.Entry<String, List<String>> databaseEntry : mvTablesMap.entrySet()) {
+      String database = databaseEntry.getKey();
+      List<String> mvTables = databaseEntry.getValue();
+      for (String mvTable : mvTables) {
+        try {
+          schemas.add(this.getSchema(database, mvTable));
+        } catch (IOException ex) {
+          throw ex;

Review comment:
       Please add an error log here before throwing the exception.
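
       Something along these lines, as a rough sketch only. `SchemaCollector` and `SchemaLoader` are hypothetical names used to keep the example self-contained, schemas are represented as plain strings, and `java.util.logging` stands in for whatever logger the class already uses.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical class for illustration; the real method is MVManager.getSchemas.
class SchemaCollector {
  private static final Logger LOG = Logger.getLogger(SchemaCollector.class.getName());

  // Hypothetical stand-in for the per-table schema lookup in the patch.
  interface SchemaLoader {
    String load(String database, String mvTable) throws IOException;
  }

  static List<String> getSchemas(Map<String, List<String>> mvTablesMap, SchemaLoader loader)
      throws IOException {
    List<String> schemas = new ArrayList<>();
    for (Map.Entry<String, List<String>> databaseEntry : mvTablesMap.entrySet()) {
      String database = databaseEntry.getKey();
      for (String mvTable : databaseEntry.getValue()) {
        try {
          schemas.add(loader.load(database, mvTable));
        } catch (IOException ex) {
          // Log which MV failed before rethrowing, so the failure is traceable.
          LOG.log(Level.SEVERE,
              "Error while retrieving MV schema for " + database + "." + mvTable, ex);
          throw ex;
        }
      }
    }
    return schemas;
  }
}
```

       The log message names the database and MV table, which is the information that is otherwise lost when the exception surfaces further up the call stack.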

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala
##########
@@ -90,6 +95,37 @@ case class CarbonDropMVCommand(
           }
         }
 
+        // Update the mvExists and related databases property to mv fact tables
+        schema.getRelatedTables.asScala.foreach { table =>
+          val dbName = table.getDatabaseName
+          val tableName = table.getTableName
+          try {
+            val carbonTable =
+              CarbonEnv.getCarbonTable(Some(dbName), tableName)(session)
+            val relatedMVTablesMap = carbonTable.getMVTablesMap
+            // check if database has materialized views or not
+            val anotherMVExistsInDb = viewManager.getSchemas(databaseName).asScala.exists {
+              mvSchema =>
+                mvSchema.getRelatedTables.asScala.exists(_.getTableName.equalsIgnoreCase(tableName))
+            }
+            if (!anotherMVExistsInDb) {
+              //  If database dont have any MV, then remove the database from related tables
+              //  property and update table property of fact table
+              relatedMVTablesMap.get(databaseName).remove(name)
+              if (relatedMVTablesMap.get(databaseName).isEmpty) {
+                relatedMVTablesMap.remove(databaseName)
+              }
+              CarbonIndexUtil.addOrModifyTableProperty(carbonTable,
+                Map("relatedMVTablesMap".toLowerCase -> new Gson().toJson(relatedMVTablesMap)),
+                needLock = false)(session)
+              CarbonHiveIndexMetadataUtil.refreshTable(dbName, tableName, session)
+            }
+          } catch {
+            case _: Exception =>
+            // ignore as table is a non-carbon table

Review comment:
       Is this comment wrong? This flow also runs for carbon tables, right?
   

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
##########
@@ -122,6 +130,30 @@ case class CarbonCreateMVCommand(
 
     this.viewSchema = schema
 
+    // Update the mvExists and related databases property to mv fact tables
+    relatedTableList.asScala.foreach { parentTable =>
+      val dbName = parentTable.getDatabaseName
+      val tableName = parentTable.getTableName
+      try {
+        val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(session)
+        val relatedMVTablesMap = carbonTable.getMVTablesMap
+        if (!relatedMVTablesMap.containsKey(databaseName)) {
+          val mvTables = new util.ArrayList[String]()
+          mvTables.add(name)
+          relatedMVTablesMap.put(databaseName, mvTables)
+        } else if (!relatedMVTablesMap.get(databaseName).contains(name)) {
+          relatedMVTablesMap.get(databaseName).add(name)
+        }
+        CarbonIndexUtil.addOrModifyTableProperty(carbonTable,
+          Map("relatedMVTablesMap".toLowerCase -> new Gson().toJson(relatedMVTablesMap)),

Review comment:
       `relatedMVTablesMap` is used in multiple places; please extract it into a constant.
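
       For example, roughly like the sketch below. The class name `MVPropertyKeys`, the constant name, and the helper are hypothetical; the constant would more likely belong in an existing constants class, and the value is simply the lower-cased key already used in the patch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder class; name and location are assumptions for this sketch.
final class MVPropertyKeys {
  // Lower-cased form of "relatedMVTablesMap", matching the key used in the patch.
  static final String RELATED_MV_TABLES_MAP = "relatedmvtablesmap";

  private MVPropertyKeys() {
  }
}

// Hypothetical helper showing the constant in use when building table properties.
class TablePropertyExample {
  static Map<String, String> withRelatedMVTables(String relatedMVTablesJson) {
    Map<String, String> properties = new HashMap<>();
    properties.put(MVPropertyKeys.RELATED_MV_TABLES_MAP, relatedMVTablesJson);
    return properties;
  }
}
```

       Both the create and drop commands could then reference the same key, which also removes the repeated `.toLowerCase` calls.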



