You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/23 10:32:02 UTC
carbondata git commit: [CARBONDATA-1797] Segment_Index compaction
should take compaction lock to support concurrent scenarios better
Repository: carbondata
Updated Branches:
refs/heads/master 8432860c1 -> a9347b86c
[CARBONDATA-1797] Segment_Index compaction should take compaction lock to support concurrent scenarios better
SEGMENT_INDEX compaction is not taking compaction lock. While concurrent operation, compaction may be successful but the output may not be as expected.
This closes #1553
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a9347b86
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a9347b86
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a9347b86
Branch: refs/heads/master
Commit: a9347b86cfd7e6f3e68daa6f46182d75e3f8943e
Parents: 8432860
Author: dhatchayani <dh...@gmail.com>
Authored: Wed Nov 22 17:11:42 2017 +0530
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Thu Nov 23 16:01:19 2017 +0530
----------------------------------------------------------------------
.../spark/rdd/CarbonDataRDDFactory.scala | 10 ++++++++++
.../management/AlterTableCompactionCommand.scala | 18 ++++++++++--------
2 files changed, 20 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9347b86/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0619851..284587d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -95,6 +95,16 @@ object CarbonDataRDDFactory {
LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
s".${ carbonLoadModel.getTableName }")
try {
+ if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
+ // Just launch job to merge index and return
+ CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
+ CarbonDataMergerUtil.getValidSegmentList(
+ carbonTable.getAbsoluteTableIdentifier).asScala,
+ carbonLoadModel.getTablePath,
+ carbonTable, true)
+ lock.unlock()
+ return
+ }
startCompactionThreads(sqlContext,
carbonLoadModel,
storeLocation,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a9347b86/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
index 6e11fe4..2cd771c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
@@ -139,14 +139,6 @@ case class AlterTableCompactionCommand(
if (null == carbonLoadModel.getLoadMetadataDetails) {
CommonUtil.readLoadMetadataDetails(carbonLoadModel)
}
- if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
- // Just launch job to merge index and return
- CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
- CarbonDataMergerUtil.getValidSegmentList(carbonTable.getAbsoluteTableIdentifier).asScala,
- carbonLoadModel.getTablePath,
- carbonTable, true)
- return
- }
// reading the start time of data load.
val loadStartTime : Long =
if (alterTableModel.factTimeStamp.isEmpty) {
@@ -192,6 +184,16 @@ case class AlterTableCompactionCommand(
LOGGER.info("Acquired the compaction lock for table" +
s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
try {
+ if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
+ // Just launch job to merge index and return
+ CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
+ CarbonDataMergerUtil.getValidSegmentList(
+ carbonTable.getAbsoluteTableIdentifier).asScala,
+ carbonLoadModel.getTablePath,
+ carbonTable, true)
+ lock.unlock()
+ return
+ }
CarbonDataRDDFactory.startCompactionThreads(sqlContext,
carbonLoadModel,
storeLocation,