You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/05/18 06:00:10 UTC

carbondata git commit: [CARBONDATA-2470] Refactor AlterTableCompactionPostStatusUpdateEvent usage in compaction flow

Repository: carbondata
Updated Branches:
  refs/heads/master 15ab6b066 -> b88c09707


[CARBONDATA-2470] Refactor AlterTableCompactionPostStatusUpdateEvent usage in compaction flow

AlterTableCompactionPostStatusUpdateEvent is a generic event fired after compaction,
but whether it was fired was controlled only by the pre-aggregate listener: once
CommitPreAggregateListener set the commitComplete property to true, the event was no
longer fired in subsequent compaction iterations. The event must be fired for every
compacted segment, so it is now always fired. The commitComplete check has moved into
CommitPreAggregateListener itself.

This closes #2295


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b88c0970
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b88c0970
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b88c0970

Branch: refs/heads/master
Commit: b88c097075b2ef859d25db27da08435d66a814a2
Parents: 15ab6b0
Author: dhatchayani <dh...@gmail.com>
Authored: Thu May 10 19:07:53 2018 +0530
Committer: kunal642 <ku...@gmail.com>
Committed: Fri May 18 11:28:37 2018 +0530

----------------------------------------------------------------------
 .../spark/rdd/CarbonTableCompactor.scala        | 26 ++----
 .../preaaggregate/PreAggregateListeners.scala   | 88 ++++++++++++--------
 2 files changed, 59 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b88c0970/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 199b7a3..155bdd1 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -270,26 +270,14 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
         carbonMergerMapping,
         carbonLoadModel,
         mergedLoadName)
-      val commitComplete = try {
-        // Once main table compaction is done and 0.1, 4.1, 8.1 is created commit will happen for
-        // all the tables. The commit listener will compact the child tables until no more segments
-        // are left. But 2nd level compaction is yet to happen on the main table therefore again the
-        // compaction flow will try to commit the child tables which is wrong. This check tell the
-        // 2nd level compaction flow that the commit for datamaps is already done.
-        val isCommitDone = operationContext.getProperty("commitComplete")
-        if (isCommitDone != null) {
-          isCommitDone.toString.toBoolean
-        } else {
-          OperationListenerBus.getInstance()
-            .fireEvent(compactionLoadStatusPostEvent, operationContext)
-          true
-        }
-      } catch {
-        case ex: Exception =>
-          LOGGER.error(ex, "Problem while committing data maps")
-          false
+      OperationListenerBus.getInstance()
+        .fireEvent(compactionLoadStatusPostEvent, operationContext)
+      val commitDone = operationContext.getProperty("commitComplete")
+      val commitComplete = if (null != commitDone) {
+        commitDone.toString.toBoolean
+      } else {
+        true
       }
-      operationContext.setProperty("commitComplete", commitComplete)
       // here either of the conditions can be true, when delete segment is fired after compaction
       // has started, statusFileUpdation will be false , but at the same time commitComplete can be
       // true because compaction for all datamaps will be finished at a time to the maximum level

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b88c0970/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 5e11884..a41f78c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -512,6 +512,14 @@ object CommitPreAggregateListener extends OperationEventListener with CommitHelp
       case loadEvent: LoadTablePostStatusUpdateEvent =>
         loadEvent.getCarbonLoadModel
       case compactionEvent: AlterTableCompactionPostStatusUpdateEvent =>
+        // Once main table compaction is done and 0.1, 4.1, 8.1 is created commit will happen for
+        // all the tables. The commit listener will compact the child tables until no more segments
+        // are left. But 2nd level compaction is yet to happen on the main table therefore again the
+        // compaction flow will try to commit the child tables which is wrong. This check tell the
+        // 2nd level compaction flow that the commit for datamaps is already done.
+        if (null != operationContext.getProperty("commitComplete")) {
+          return
+        }
         compactionEvent.carbonLoadModel
     }
     val isCompactionFlow = Option(
@@ -533,45 +541,53 @@ object CommitPreAggregateListener extends OperationEventListener with CommitHelp
             .asInstanceOf[CarbonLoadDataCommand]
         }
       }
-     if (dataMapSchemas.nonEmpty) {
-       val uuid = operationContext.getProperty("uuid").toString
-      // keep committing until one fails
-      val renamedDataMaps = childLoadCommands.takeWhile { childLoadCommand =>
-        val childCarbonTable = childLoadCommand.table
-        // Generate table status file name with UUID, forExample: tablestatus_1
-        val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
-          childCarbonTable.getTablePath, uuid)
-        // Generate table status file name without UUID, forExample: tablestatus
-        val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
-          childCarbonTable.getTablePath)
-        renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
-      }
-      // if true then the commit for one of the child tables has failed
-      val commitFailed = renamedDataMaps.lengthCompare(dataMapSchemas.length) != 0
-      if (commitFailed) {
-        LOGGER.warn("Reverting table status file to original state")
-        renamedDataMaps.foreach {
-          loadCommand =>
-            val carbonTable = loadCommand.table
-            // rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus
-            val backupTableSchemaPath =
-              CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid
-            val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
-            markInProgressSegmentAsDeleted(backupTableSchemaPath, operationContext, carbonTable)
-            renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
+    var commitFailed = false
+    try {
+      if (dataMapSchemas.nonEmpty) {
+        val uuid = operationContext.getProperty("uuid").toString
+        // keep committing until one fails
+        val renamedDataMaps = childLoadCommands.takeWhile { childLoadCommand =>
+          val childCarbonTable = childLoadCommand.table
+          // Generate table status file name with UUID, forExample: tablestatus_1
+          val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
+            childCarbonTable.getTablePath, uuid)
+          // Generate table status file name without UUID, forExample: tablestatus
+          val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
+            childCarbonTable.getTablePath)
+          renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
+        }
+        // if true then the commit for one of the child tables has failed
+        commitFailed = renamedDataMaps.lengthCompare(dataMapSchemas.length) != 0
+        if (commitFailed) {
+          LOGGER.warn("Reverting table status file to original state")
+          renamedDataMaps.foreach {
+            loadCommand =>
+              val carbonTable = loadCommand.table
+              // rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus
+              val backupTableSchemaPath =
+                CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" +
+                uuid
+              val tableSchemaPath = CarbonTablePath
+                .getTableStatusFilePath(carbonTable.getTablePath)
+              markInProgressSegmentAsDeleted(backupTableSchemaPath, operationContext, carbonTable)
+              renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
+          }
+        }
+        // after success/failure of commit delete all tablestatus files with UUID in their names.
+        // if commit failed then remove the segment directory
+        cleanUpStaleTableStatusFiles(childLoadCommands.map(_.table),
+          operationContext,
+          uuid)
+        operationContext.setProperty("commitComplete", !commitFailed)
+        if (commitFailed) {
+          sys.error("Failed to update table status for pre-aggregate table")
         }
       }
-      // after success/failure of commit delete all tablestatus files with UUID in their names.
-      // if commit failed then remove the segment directory
-      cleanUpStaleTableStatusFiles(childLoadCommands.map(_.table),
-        operationContext,
-        uuid)
-      if (commitFailed) {
-        sys.error("Failed to update table status for pre-aggregate table")
-      }
+    } catch {
+      case e: Exception =>
+        operationContext.setProperty("commitComplete", false)
+        LOGGER.error(e, "Problem while committing data maps")
     }
-
-
   }
 }