Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/03 19:43:24 UTC

[17/50] [abbrv] carbondata git commit: [CARBONDATA-2064] Add compaction listener

[CARBONDATA-2064] Add compaction listener

Removes the internal carbon.merge.index.in.segment property and the SEGMENT_INDEX compaction type. In their place, CarbonOutputCommitter fires LoadEvents.LoadTableMergePartitionEvent after a load commit, and CarbonAlterTableCompactionCommand fires AlterTableCompactionExceptionEvent when the requested compaction type cannot be parsed, so that registered listeners can perform the index merge and decide whether such a request is supported.

This closes #1847
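
The new events are consumed through CarbonData's OperationListenerBus, which the diffs below show firing them. As orientation, a minimal registration sketch in Scala; it assumes the addListener(Class, OperationEventListener) API that CarbonEnv uses for its built-in listeners, and the listener object here is a hypothetical placeholder:

    import org.apache.carbondata.events.{Event, OperationContext, OperationEventListener, OperationListenerBus}
    import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTableMergePartitionEvent

    // Hypothetical placeholder; a real listener would merge the segment's
    // index files here (see the fuller sketch after the CarbonOutputCommitter
    // diff below).
    object NoOpMergeListener extends OperationEventListener {
      override def onEvent(event: Event, operationContext: OperationContext): Unit = ()
    }

    object CompactionListenerSetup {
      // Register once, e.g. during session initialization; commitJob then
      // delivers LoadTableMergePartitionEvent to every registered listener.
      def init(): Unit = {
        OperationListenerBus.getInstance()
          .addListener(classOf[LoadTableMergePartitionEvent], NoOpMergeListener)
      }
    }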


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/54a381c2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/54a381c2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/54a381c2

Branch: refs/heads/branch-1.3
Commit: 54a381c27024ece07d400a4a1d36917bd3ca09f9
Parents: 1202e20
Author: dhatchayani <dh...@gmail.com>
Authored: Tue Jan 23 15:26:26 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Feb 1 22:20:33 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   7 -
 .../hadoop/api/CarbonOutputCommitter.java       |  32 ++--
 .../sdv/generated/MergeIndexTestCase.scala      |  30 ++--
 .../CarbonIndexFileMergeTestCase.scala          |  48 +++---
 .../dataload/TestGlobalSortDataLoad.scala       |   2 +-
 .../StandardPartitionTableLoadingTestCase.scala |   5 -
 .../carbondata/events/AlterTableEvents.scala    |  14 +-
 .../spark/rdd/CarbonMergeFilesRDD.scala         |  84 ----------
 .../carbondata/spark/util/CommonUtil.scala      |  51 ------
 .../spark/rdd/CarbonDataRDDFactory.scala        |  14 --
 .../spark/rdd/CarbonTableCompactor.scala        |   2 -
 .../CarbonAlterTableCompactionCommand.scala     | 165 +++++++++----------
 .../sql/execution/strategy/DDLStrategy.scala    |  17 --
 .../CarbonGetTableDetailComandTestCase.scala    |   6 +-
 .../processing/loading/events/LoadEvents.java   |  12 ++
 .../processing/merger/CompactionType.java       |   1 -
 16 files changed, 155 insertions(+), 335 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 77e8db8..7ae3034 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1478,13 +1478,6 @@ public final class CarbonCommonConstants {
 
   public static final String BITSET_PIPE_LINE_DEFAULT = "true";
 
-  /**
-   * It is an internal configuration used only for testing.
-   * It merges the carbon index files within a segment into a single file.
-   */
-  public static final String CARBON_MERGE_INDEX_IN_SEGMENT = "carbon.merge.index.in.segment";
-
-  public static final String CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT = "true";
 
   public static final String AGGREGATIONDATAMAPSCHEMA = "AggregateDataMapHandler";
   /*

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 9cca1bb..555ddd2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -25,18 +25,15 @@ import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.PartitionMapFileStore;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonSessionInfo;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
 import org.apache.carbondata.events.OperationContext;
 import org.apache.carbondata.events.OperationListenerBus;
 import org.apache.carbondata.processing.loading.events.LoadEvents;
@@ -126,7 +123,16 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
         }
       }
       CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet);
-      mergeCarbonIndexFiles(segmentPath);
+      if (operationContext != null) {
+        LoadEvents.LoadTableMergePartitionEvent loadTableMergePartitionEvent =
+            new LoadEvents.LoadTableMergePartitionEvent(segmentPath);
+        try {
+          OperationListenerBus.getInstance()
+              .fireEvent(loadTableMergePartitionEvent, (OperationContext) operationContext);
+        } catch (Exception e) {
+          throw new IOException(e);
+        }
+      }
       String updateTime =
           context.getConfiguration().get(CarbonTableOutputFormat.UPADTE_TIMESTAMP, null);
       String segmentsToBeDeleted =
@@ -158,24 +164,6 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
   }
 
   /**
-   * Merge index files to a new single file.
-   */
-  private void mergeCarbonIndexFiles(String segmentPath) throws IOException {
-    boolean mergeIndex = false;
-    try {
-      mergeIndex = Boolean.parseBoolean(CarbonProperties.getInstance().getProperty(
-          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
-          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT));
-    } catch (Exception e) {
-      mergeIndex = Boolean.parseBoolean(
-          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT);
-    }
-    if (mergeIndex) {
-      new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath);
-    }
-  }
-
-  /**
   * Update the tablestatus as failed if any failure happens.
    *
    * @param context

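With the inline merge gone from commitJob, the equivalent work is expected to run in a listener reacting to LoadTableMergePartitionEvent. A sketch under that assumption (the listener name is hypothetical), reusing the single-argument mergeCarbonIndexFilesOfSegment call from the deleted method:

    import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
    import org.apache.carbondata.events.{Event, OperationContext, OperationEventListener}
    import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTableMergePartitionEvent

    object MergeIndexEventListener extends OperationEventListener {
      override def onEvent(event: Event, operationContext: OperationContext): Unit = {
        event match {
          case e: LoadTableMergePartitionEvent =>
            // Same work commitJob previously did inline: merge every
            // .carbonindex file of the committed segment into one merged file.
            new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(e.getSegmentPath)
          case _ => // ignore unrelated events
        }
      }
    }
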
http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
index cb0d02c..8e71257 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
 
 /**
 * Test class for MergeIndexTestCase to verify all scenarios
@@ -40,34 +41,30 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
   override protected def afterAll(): Unit = {
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql("DROP TABLE IF EXISTS indexmerge")
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
   }
 
   test("Verify correctness of index merge sdv") {
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
     sql(s"""drop table if exists carbon_automation_nonmerge""").collect
 
    sql(s"""create table carbon_automation_nonmerge (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect
 
    sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_nonmerge OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 2)
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("DROP TABLE IF EXISTS carbon_automation_merge")
    sql(s"""create table carbon_automation_merge (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect
 
    sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_merge OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
 
+    val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_merge")
+    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
+    new CarbonIndexFileMergeWriter()
+      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false)
     assert(getIndexFileCount("default", "carbon_automation_merge", "0") == 0)
     checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""),
       sql("""Select count(*) from carbon_automation_merge"""))
   }
 
  test("Verify command of index merge sdv") {
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
     sql(s"""drop table if exists carbon_automation_nonmerge""").collect
 
    sql(s"""create table carbon_automation_nonmerge (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect
@@ -77,17 +74,18 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
     val rows = sql("""Select count(*) from carbon_automation_nonmerge""").collect()
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 2)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 2)
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
-    sql("ALTER TABLE carbon_automation_nonmerge COMPACT 'SEGMENT_INDEX'").collect()
+    val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge")
+    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
+    new CarbonIndexFileMergeWriter()
+      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false)
+    new CarbonIndexFileMergeWriter()
+      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","1"), false)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 0)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 0)
     checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), rows)
   }
 
  test("Verify index merge with compaction sdv") {
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
     sql(s"""drop table if exists carbon_automation_nonmerge""").collect
 
    sql(s"""create table carbon_automation_nonmerge (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect
@@ -99,9 +97,11 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 2)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 2)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 2)
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("ALTER TABLE carbon_automation_nonmerge COMPACT 'minor'").collect()
+    val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge")
+    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
+    new CarbonIndexFileMergeWriter()
+      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0.1"), false)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0.1") == 0)
     checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), rows)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index c66107f..895b0b5 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -20,12 +20,11 @@ package org.apache.carbondata.spark.testsuite.datacompaction
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
 
 class CarbonIndexFileMergeTestCase
   extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
@@ -40,13 +39,9 @@ class CarbonIndexFileMergeTestCase
     CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql("DROP TABLE IF EXISTS indexmerge")
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
   }
 
   test("Verify correctness of index merge") {
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql(
       """
@@ -57,8 +52,6 @@ class CarbonIndexFileMergeTestCase
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='100')")
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("DROP TABLE IF EXISTS indexmerge")
     sql(
       """
@@ -68,14 +61,16 @@ class CarbonIndexFileMergeTestCase
       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='100')")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default","indexmerge")
+    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
+    new CarbonIndexFileMergeWriter()
+      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false)
     assert(getIndexFileCount("default_indexmerge", "0") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""),
       sql("""Select count(*) from indexmerge"""))
   }
 
   test("Verify command of index merge") {
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql(
       """
@@ -90,17 +85,18 @@ class CarbonIndexFileMergeTestCase
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
-    sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect()
+    val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
+    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
+    new CarbonIndexFileMergeWriter()
+      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false)
+    new CarbonIndexFileMergeWriter()
+      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","1"), false)
     assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
   }
 
   test("Verify command of index merge without enabling property") {
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql(
       """
@@ -115,15 +111,18 @@ class CarbonIndexFileMergeTestCase
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
-    sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect()
+    val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
+    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
+    new CarbonIndexFileMergeWriter()
+      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false)
+    new CarbonIndexFileMergeWriter()
+      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","1"), false)
     assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
   }
 
  test("Verify index merge with compaction") {
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql(
       """
@@ -141,16 +140,16 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
+    val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
+    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
+    new CarbonIndexFileMergeWriter()
+      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0.1"), false)
     assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
   }
 
  test("Verify index merge for compacted segments") {
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql(
       """
@@ -172,7 +171,10 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
-    sql("ALTER TABLE nonindexmerge COMPACT 'segment_index'").collect()
+    val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
+    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
+    new CarbonIndexFileMergeWriter()
+      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0.1"), false)
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "2") == 100)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 50a38f1..0d9e0fd 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -273,7 +273,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort")
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
     val segmentDir = carbonTablePath.getSegmentDir("0", "0")
-    assertResult(Math.max(4, defaultParallelism) + 1)(new File(segmentDir).listFiles().length)
+    assertResult(Math.max(7, defaultParallelism) + 1)(new File(segmentDir).listFiles().length)
   }
 
   test("Query with small files") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 31d2598..16f252b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -319,8 +319,6 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
   }
 
   test("merge carbon index disable data loading for partition table for three partition column") {
-    CarbonProperties.getInstance.addProperty(
-      CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
     sql(
       """
         | CREATE TABLE mergeindexpartitionthree (empno int, doj Timestamp,
@@ -340,9 +338,6 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     val files = carbonFile.listFiles(new CarbonFileFilter {
       override def accept(file: CarbonFile): Boolean = CarbonTablePath.isCarbonIndexFile(file.getName)
     })
-    CarbonProperties.getInstance.addProperty(
-      CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
-      CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)
     assert(files.length == 10)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index ca1948a..671e132 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -17,7 +17,7 @@
 package org.apache.carbondata.events
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel, CarbonMergerMapping}
+import org.apache.spark.sql.execution.command._
 
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -203,3 +203,15 @@ case class AlterTableCompactionAbortEvent(sparkSession: SparkSession,
     carbonTable: CarbonTable,
     carbonMergerMapping: CarbonMergerMapping,
     mergedLoadName: String) extends Event with AlterTableCompactionEventInfo
+
+
+/**
+ * Compaction event fired when parsing the compaction type fails, so that listeners can handle the exception
+ *
+ * @param sparkSession
+ * @param carbonTable
+ * @param alterTableModel
+ */
+case class AlterTableCompactionExceptionEvent(sparkSession: SparkSession,
+    carbonTable: CarbonTable,
+    alterTableModel: AlterTableModel) extends Event with AlterTableCompactionEventInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
deleted file mode 100644
index 1087ea7..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
-
-case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentPath: String)
-  extends Partition {
-
-  override val index: Int = idx
-
-  override def hashCode(): Int = 41 * (41 + rddId) + idx
-}
-
-/**
- * RDD to merge all carbonindex files of each segment into a single merged carbonindex file within the same segment.
- * @param sc
- * @param tablePath
- * @param segments segments to be merged
- */
-class CarbonMergeFilesRDD(
-    sc: SparkContext,
-    tablePath: String,
-    segments: Seq[String],
-    readFileFooterFromCarbonDataFile: Boolean)
-  extends CarbonRDD[String](sc, Nil) {
-
-  override def getPartitions: Array[Partition] = {
-    segments.zipWithIndex.map {s =>
-      CarbonMergeFilePartition(id, s._2, CarbonTablePath.getSegmentPath(tablePath, s._1))
-    }.toArray
-  }
-
-  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = {
-    val iter = new Iterator[String] {
-      val split = theSplit.asInstanceOf[CarbonMergeFilePartition]
-      logInfo("Merging carbon index files of segment : " + split.segmentPath)
-
-      new CarbonIndexFileMergeWriter()
-        .mergeCarbonIndexFilesOfSegment(split.segmentPath, readFileFooterFromCarbonDataFile)
-
-      var havePair = false
-      var finished = false
-
-      override def hasNext: Boolean = {
-        if (!finished && !havePair) {
-          finished = true
-          havePair = !finished
-        }
-        !finished
-      }
-
-      override def next(): String = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
-        }
-        havePair = false
-        ""
-      }
-
-    }
-    iter
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index d96a051..b44a0fb 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -55,7 +55,6 @@ import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingExcep
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.CarbonMergeFilesRDD
 
 object CommonUtil {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -891,54 +890,4 @@ object CommonUtil {
     (Integer.parseInt(scaleAndPrecision(0).trim), Integer.parseInt(scaleAndPrecision(1).trim))
   }
 
-  /**
-   * Merge the carbonindex files within the segment into a carbonindexmerge file inside the same segment
-   *
-   * @param sparkContext
-   * @param segmentIds
-   * @param tablePath
-   * @param carbonTable
-   * @param mergeIndexProperty
-   * @param readFileFooterFromCarbonDataFile flag to read the file footer information from the
-   *                                         carbondata file. This will be used when upgrading from
-   *                                         a version that does not store the blocklet info
-   */
-  def mergeIndexFiles(sparkContext: SparkContext,
-      segmentIds: Seq[String],
-      tablePath: String,
-      carbonTable: CarbonTable,
-      mergeIndexProperty: Boolean,
-      readFileFooterFromCarbonDataFile: Boolean = false): Unit = {
-    if (mergeIndexProperty) {
-      new CarbonMergeFilesRDD(
-        sparkContext,
-        carbonTable.getTablePath,
-        segmentIds,
-        readFileFooterFromCarbonDataFile).collect()
-    } else {
-      try {
-        CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT).toBoolean
-        if (CarbonProperties.getInstance().getProperty(
-          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
-          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
-          new CarbonMergeFilesRDD(
-            sparkContext,
-            carbonTable.getTablePath,
-            segmentIds,
-            readFileFooterFromCarbonDataFile).collect()
-        }
-      } catch {
-        case _: Exception =>
-          if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) {
-            new CarbonMergeFilesRDD(
-              sparkContext,
-              carbonTable.getTablePath,
-              segmentIds,
-              readFileFooterFromCarbonDataFile).collect()
-          }
-      }
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 3de0e70..5c43d58 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -103,18 +103,6 @@ object CarbonDataRDDFactory {
       LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
           s".${ carbonLoadModel.getTableName }")
       try {
-        if (compactionType == CompactionType.SEGMENT_INDEX) {
-          // Just launch job to merge index and return
-          CommonUtil.mergeIndexFiles(
-            sqlContext.sparkContext,
-            CarbonDataMergerUtil.getValidSegmentList(
-              carbonTable.getAbsoluteTableIdentifier).asScala,
-            carbonLoadModel.getTablePath,
-            carbonTable,
-            true)
-          lock.unlock()
-          return
-        }
         startCompactionThreads(
           sqlContext,
           carbonLoadModel,
@@ -359,8 +347,6 @@ object CarbonDataRDDFactory {
           } else {
             loadDataFile(sqlContext, carbonLoadModel, hadoopConf)
           }
-          CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
-            Seq(carbonLoadModel.getSegmentId), storePath, carbonTable, false)
           val newStatusMap = scala.collection.mutable.Map.empty[String, SegmentStatus]
           if (status.nonEmpty) {
             status.foreach { eachLoadStatus =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 8406d8d..bfe4e41 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -221,8 +221,6 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
 
     if (finalMergeStatus) {
       val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
-      CommonUtil.mergeIndexFiles(
-        sc.sparkContext, Seq(mergedLoadNumber), tablePath, carbonTable, false)
       new PartitionMapFileStore().mergePartitionMapFiles(
         CarbonTablePath.getSegmentPath(tablePath, mergedLoadNumber),
         carbonLoadModel.getFactTimeStamp + "")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index fb0f9fe..2a77826 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -34,16 +34,17 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, AlterTableCompactionPreStatusUpdateEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
+import org.apache.carbondata.spark.exception.{ConcurrentOperationException, MalformedCarbonCommandException}
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
 import org.apache.carbondata.spark.util.CommonUtil
 import org.apache.carbondata.streaming.StreamHandoffRDD
@@ -90,52 +91,74 @@ case class CarbonAlterTableCompactionCommand(
       LogServiceFactory.getLogService(this.getClass.getName)
     val tableName = alterTableModel.tableName.toLowerCase
     val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
-    val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(table)
-    if (isLoadInProgress) {
-      val message = "Cannot run data loading and compaction on same table concurrently. " +
-                    "Please wait for load to finish"
-      LOGGER.error(message)
-      throw new ConcurrentOperationException(message)
-    }
-    val carbonLoadModel = new CarbonLoadModel()
-    carbonLoadModel.setTableName(table.getTableName)
-    val dataLoadSchema = new CarbonDataLoadSchema(table)
-    // Need to fill dimension relation
-    carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-    carbonLoadModel.setTableName(table.getTableName)
-    carbonLoadModel.setDatabaseName(table.getDatabaseName)
-    carbonLoadModel.setTablePath(table.getTablePath)
-
-    var storeLocation = CarbonProperties.getInstance.getProperty(
-      CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
-      System.getProperty("java.io.tmpdir"))
-    storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
-    // trigger event for compaction
-    val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
-      AlterTableCompactionPreEvent(sparkSession, table, null, null)
-    OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
+    operationContext.setProperty("compactionException", "true")
+    var compactionType: CompactionType = null
+    var compactionException = "true"
     try {
-      alterTableForCompaction(
-        sparkSession.sqlContext,
-        alterTableModel,
-        carbonLoadModel,
-        storeLocation,
-        operationContext)
+      compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
     } catch {
-      case e: Exception =>
-        if (null != e.getMessage) {
-          CarbonException.analysisException(
-            s"Compaction failed. Please check logs for more info. ${ e.getMessage }")
-        } else {
-          CarbonException.analysisException(
-            "Exception in compaction. Please check logs for more info.")
-        }
+      case _: Exception =>
+        val alterTableCompactionExceptionEvent: AlterTableCompactionExceptionEvent =
+          AlterTableCompactionExceptionEvent(sparkSession, table, alterTableModel)
+        OperationListenerBus.getInstance
+          .fireEvent(alterTableCompactionExceptionEvent, operationContext)
+        compactionException = operationContext.getProperty("compactionException").toString
+    }
+
+    if (compactionException.equalsIgnoreCase("true") && null == compactionType) {
+      throw new MalformedCarbonCommandException(
+        "Unsupported alter operation on carbon table")
+    } else if (compactionException.equalsIgnoreCase("false")) {
+      Seq.empty
+    } else {
+      val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(table)
+      if (isLoadInProgress) {
+        val message = "Cannot run data loading and compaction on same table concurrently. " +
+                      "Please wait for load to finish"
+        LOGGER.error(message)
+        throw new ConcurrentOperationException(message)
+      }
+
+      val carbonLoadModel = new CarbonLoadModel()
+      carbonLoadModel.setTableName(table.getTableName)
+      val dataLoadSchema = new CarbonDataLoadSchema(table)
+      // Need to fill dimension relation
+      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+      carbonLoadModel.setTableName(table.getTableName)
+      carbonLoadModel.setDatabaseName(table.getDatabaseName)
+      carbonLoadModel.setTablePath(table.getTablePath)
+
+      var storeLocation = CarbonProperties.getInstance.getProperty(
+        CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
+        System.getProperty("java.io.tmpdir"))
+      storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
+      // trigger event for compaction
+      val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
+        AlterTableCompactionPreEvent(sparkSession, table, null, null)
+      OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
+      try {
+        alterTableForCompaction(
+          sparkSession.sqlContext,
+          alterTableModel,
+          carbonLoadModel,
+          storeLocation,
+          operationContext)
+      } catch {
+        case e: Exception =>
+          if (null != e.getMessage) {
+            CarbonException.analysisException(
+              s"Compaction failed. Please check logs for more info. ${ e.getMessage }")
+          } else {
+            CarbonException.analysisException(
+              "Exception in compaction. Please check logs for more info.")
+          }
+      }
+      // trigger event for compaction
+      val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =
+        AlterTableCompactionPostEvent(sparkSession, table, null, null)
+      OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext)
+      Seq.empty
     }
-    // trigger event for compaction
-    val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =
-      AlterTableCompactionPostEvent(sparkSession, table, null, null)
-    OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext)
-    Seq.empty
   }
 
   private def alterTableForCompaction(sqlContext: SQLContext,
@@ -225,50 +248,14 @@ case class CarbonAlterTableCompactionCommand(
         LOGGER.info("Acquired the compaction lock for table" +
                     s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         try {
-          if (compactionType == CompactionType.SEGMENT_INDEX) {
-            // Just launch job to merge index and return
-            CommonUtil.mergeIndexFiles(
-              sqlContext.sparkContext,
-              CarbonDataMergerUtil.getValidSegmentList(
-                carbonTable.getAbsoluteTableIdentifier).asScala,
-              carbonLoadModel.getTablePath,
-              carbonTable,
-              mergeIndexProperty = true,
-              readFileFooterFromCarbonDataFile = true)
-
-            val carbonMergerMapping = CarbonMergerMapping(carbonTable.getTablePath,
-              carbonTable.getMetaDataFilepath,
-              "",
-              carbonTable.getDatabaseName,
-              carbonTable.getTableName,
-              Array(),
-              carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
-              compactionType,
-              maxSegmentColCardinality = null,
-              maxSegmentColumnSchemaList = null,
-              compactionModel.currentPartitions,
-              null)
-
-            // trigger event for compaction
-            val alterTableCompactionPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent =
-              AlterTableCompactionPreStatusUpdateEvent(sqlContext.sparkSession,
-                carbonTable,
-                carbonMergerMapping,
-                carbonLoadModel,
-                "")
-            OperationListenerBus.getInstance
-              .fireEvent(alterTableCompactionPreStatusUpdateEvent, operationContext)
-          lock.unlock()
-          } else {
-            CarbonDataRDDFactory.startCompactionThreads(
-              sqlContext,
-              carbonLoadModel,
-              storeLocation,
-              compactionModel,
-              lock,
-              operationContext
-            )
-          }
+          CarbonDataRDDFactory.startCompactionThreads(
+            sqlContext,
+            carbonLoadModel,
+            storeLocation,
+            compactionModel,
+            lock,
+            operationContext
+          )
         } catch {
           case e: Exception =>
             LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")

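Taken together, the rewritten command seeds the OperationContext with compactionException=true, fires AlterTableCompactionExceptionEvent when CompactionType.valueOf fails, and then either throws MalformedCarbonCommandException or returns early depending on what listeners leave in that property. A hedged sketch of a listener that claims an unrecognized type, here the removed SEGMENT_INDEX, and suppresses the exception (the listener itself is hypothetical; the property handshake is taken from the diff above):

    import org.apache.carbondata.events.{AlterTableCompactionExceptionEvent, Event, OperationContext, OperationEventListener}

    object CompactionExceptionListener extends OperationEventListener {
      override def onEvent(event: Event, operationContext: OperationContext): Unit = {
        event match {
          case e: AlterTableCompactionExceptionEvent
            if e.alterTableModel.compactionType.equalsIgnoreCase("SEGMENT_INDEX") =>
            // A real listener would perform the index merge for e.carbonTable
            // here, e.g. with CarbonIndexFileMergeWriter over each valid segment.
            // Setting the flag to "false" tells the command to return early
            // instead of throwing MalformedCarbonCommandException.
            operationContext.setProperty("compactionException", "false")
          case _ => // leave the flag "true" for genuinely unsupported types
        }
      }
    }
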
http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index b174b94..83831e3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -100,24 +100,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           .tableExists(TableIdentifier(altertablemodel.tableName,
             altertablemodel.dbName))(sparkSession)
         if (isCarbonTable) {
-          var compactionType: CompactionType = null
-          try {
-            compactionType = CompactionType.valueOf(altertablemodel.compactionType.toUpperCase)
-          } catch {
-            case _: Exception =>
-              throw new MalformedCarbonCommandException(
-                "Unsupported alter operation on carbon table")
-          }
-          if (CompactionType.MINOR == compactionType ||
-              CompactionType.MAJOR == compactionType ||
-              CompactionType.SEGMENT_INDEX == compactionType ||
-              CompactionType.STREAMING == compactionType ||
-              CompactionType.CLOSE_STREAMING == compactionType) {
             ExecutedCommandExec(alterTable) :: Nil
-          } else {
-            throw new MalformedCarbonCommandException(
-              "Unsupported alter operation on carbon table")
-          }
         } else {
           throw new MalformedCarbonCommandException(
             "Operation not allowed : " + altertablemodel.alterSql)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
index 6265d0d..48733dc 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
@@ -42,10 +42,10 @@ class CarbonGetTableDetailCommandTestCase extends QueryTest with BeforeAndAfterA
 
     assertResult(2)(result.length)
     assertResult("table_info1")(result(0).getString(0))
-    // 2143 is the size of carbon table
-    assertResult(2143)(result(0).getLong(1))
+    // 2096 is the size of carbon table
+    assertResult(2096)(result(0).getLong(1))
     assertResult("table_info2")(result(1).getString(0))
-    assertResult(2143)(result(1).getLong(1))
+    assertResult(2096)(result(1).getLong(1))
   }
 
   override def afterAll: Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
index 190c72c..a3fa292 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
@@ -181,4 +181,16 @@ public class LoadEvents {
       return carbonLoadModel;
     }
   }
+
+  public static class LoadTableMergePartitionEvent extends Event {
+    private String segmentPath;
+
+    public LoadTableMergePartitionEvent(String segmentPath) {
+      this.segmentPath = segmentPath;
+    }
+
+    public String getSegmentPath() {
+      return segmentPath;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
index 39f56a2..9ed87fc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
@@ -27,7 +27,6 @@ public enum CompactionType {
     MAJOR,
     IUD_UPDDEL_DELTA,
     IUD_DELETE_DELTA,
-    SEGMENT_INDEX,
     STREAMING,
     CLOSE_STREAMING,
     NONE