You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/23 10:32:02 UTC

carbondata git commit: [CARBONDATA-1797] Segment_Index compaction should take compaction lock to support concurrent scenarios better

Repository: carbondata
Updated Branches:
  refs/heads/master 8432860c1 -> a9347b86c


[CARBONDATA-1797] Segment_Index compaction should take compaction lock to support concurrent scenarios better

SEGMENT_INDEX compaction does not take the compaction lock. During concurrent operations, the compaction may report success even though its output is not as expected.

This closes #1553


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a9347b86
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a9347b86
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a9347b86

Branch: refs/heads/master
Commit: a9347b86cfd7e6f3e68daa6f46182d75e3f8943e
Parents: 8432860
Author: dhatchayani <dh...@gmail.com>
Authored: Wed Nov 22 17:11:42 2017 +0530
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Thu Nov 23 16:01:19 2017 +0530

----------------------------------------------------------------------
 .../spark/rdd/CarbonDataRDDFactory.scala          | 10 ++++++++++
 .../management/AlterTableCompactionCommand.scala  | 18 ++++++++++--------
 2 files changed, 20 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9347b86/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0619851..284587d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -95,6 +95,16 @@ object CarbonDataRDDFactory {
       LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
           s".${ carbonLoadModel.getTableName }")
       try {
+        if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
+          // Just launch job to merge index and return
+          CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
+            CarbonDataMergerUtil.getValidSegmentList(
+              carbonTable.getAbsoluteTableIdentifier).asScala,
+            carbonLoadModel.getTablePath,
+            carbonTable, true)
+          lock.unlock()
+          return
+        }
         startCompactionThreads(sqlContext,
           carbonLoadModel,
           storeLocation,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9347b86/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
index 6e11fe4..2cd771c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
@@ -139,14 +139,6 @@ case class AlterTableCompactionCommand(
     if (null == carbonLoadModel.getLoadMetadataDetails) {
       CommonUtil.readLoadMetadataDetails(carbonLoadModel)
     }
-    if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
-      // Just launch job to merge index and return
-      CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
-        CarbonDataMergerUtil.getValidSegmentList(carbonTable.getAbsoluteTableIdentifier).asScala,
-        carbonLoadModel.getTablePath,
-        carbonTable, true)
-      return
-    }
     // reading the start time of data load.
     val loadStartTime : Long =
       if (alterTableModel.factTimeStamp.isEmpty) {
@@ -192,6 +184,16 @@ case class AlterTableCompactionCommand(
         LOGGER.info("Acquired the compaction lock for table" +
                     s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         try {
+          if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
+            // Just launch job to merge index and return
+            CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
+              CarbonDataMergerUtil.getValidSegmentList(
+                carbonTable.getAbsoluteTableIdentifier).asScala,
+              carbonLoadModel.getTablePath,
+              carbonTable, true)
+            lock.unlock()
+            return
+          }
           CarbonDataRDDFactory.startCompactionThreads(sqlContext,
             carbonLoadModel,
             storeLocation,