You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/12/07 15:51:35 UTC

[1/2] carbondata git commit: [CARBONDATA-1526] [PreAgg] Added support to compact segments in pre-agg table

Repository: carbondata
Updated Branches:
  refs/heads/master 6dcf4eb95 -> 2304303ca


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index c602b0a..851b851 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -38,7 +38,7 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, TableSchema}
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.format.TableInfo
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -541,4 +541,21 @@ object PreAggregateUtil {
     }
   }
 
+  def createChildSelectQuery(tableSchema: TableSchema, databaseName: String): String = {
+    val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String]
+    val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String]
+    tableSchema.getListOfColumns.asScala.foreach {
+      a => if (a.getAggFunction.nonEmpty) {
+        aggregateColumns += s"${a.getAggFunction match {
+          case "count" => "sum"
+          case _ => a.getAggFunction}}(${a.getColumnName})"
+      } else {
+        groupingExpressions += a.getColumnName
+      }
+    }
+    s"select ${ groupingExpressions.mkString(",") },${ aggregateColumns.mkString(",")
+    } from $databaseName.${ tableSchema.getTableName } group by ${
+      groupingExpressions.mkString(",") }"
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 3922e76..d7e9867 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -274,9 +274,16 @@ public final class CarbonLoaderUtil {
         if (loadStartEntry) {
           String segmentId =
               String.valueOf(SegmentStatusManager.createNewSegmentId(listOfLoadFolderDetailsArray));
-          newMetaEntry.setLoadName(segmentId);
           loadModel.setLoadMetadataDetails(listOfLoadFolderDetails);
-          loadModel.setSegmentId(segmentId);
+          // Segment id would be provided in case this is compaction flow for aggregate data map.
+          // If that is true then used the segment id as the load name.
+          if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() && !loadModel
+              .getSegmentId().isEmpty()) {
+            newMetaEntry.setLoadName(loadModel.getSegmentId());
+          } else {
+            newMetaEntry.setLoadName(segmentId);
+            loadModel.setSegmentId(segmentId);
+          }
           // Exception should be thrown if:
           // 1. If insert overwrite is in progress and any other load or insert operation
           // is triggered


[2/2] carbondata git commit: [CARBONDATA-1526] [PreAgg] Added support to compact segments in pre-agg table

Posted by ra...@apache.org.
[CARBONDATA-1526] [PreAgg] Added support to compact segments in pre-agg table

This PR adds support for compacting pre-aggregate tables.

A pre-aggregate table can be compacted using the alter command, i.e. alter table table_name compact 'minor' (or 'major').
If a table that has pre-aggregate tables is compacted, all of its pre-aggregate tables are compacted along with the parent table.

This closes #1605


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2304303c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2304303c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2304303c

Branch: refs/heads/master
Commit: 2304303ca4917b087159ae9888c8bddbb761b048
Parents: 6dcf4eb
Author: kunal642 <ku...@gmail.com>
Authored: Wed Nov 22 19:33:37 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Dec 7 21:20:16 2017 +0530

----------------------------------------------------------------------
 .../TestPreAggregateCompaction.scala            | 181 ++++++++++++++
 .../spark/compaction/CompactionCallable.java    |  44 ----
 .../org/apache/carbondata/api/CarbonStore.scala |   6 +-
 .../carbondata/events/AlterTableEvents.scala    |  64 ++---
 .../org/apache/carbondata/events/Events.scala   |   4 +-
 .../apache/carbondata/events/LoadEvents.scala   |   8 +
 .../apache/carbondata/spark/rdd/Compactor.scala | 167 -------------
 .../spark/rdd/DataManagementFunc.scala          | 225 ------------------
 .../carbondata/spark/util/DataLoadingUtil.scala |  75 +++++-
 .../spark/rdd/AggregateDataMapCompactor.scala   | 118 +++++++++
 .../spark/rdd/CarbonDataRDDFactory.scala        |  34 +--
 .../spark/rdd/CarbonTableCompactor.scala        | 238 +++++++++++++++++++
 .../spark/rdd/CompactionFactory.scala           |  53 +++++
 .../apache/carbondata/spark/rdd/Compactor.scala |  63 +++++
 .../org/apache/spark/sql/CarbonSession.scala    |   4 +-
 .../management/CarbonLoadDataCommand.scala      |   6 +-
 .../preaaggregate/PreAggregateListeners.scala   |  38 ++-
 .../preaaggregate/PreAggregateUtil.scala        |  19 +-
 .../processing/util/CarbonLoaderUtil.java       |  11 +-
 19 files changed, 864 insertions(+), 494 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala
new file mode 100644
index 0000000..89cf8eb
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.Matchers._
+
+class TestPreAggregateCompaction extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+
+  val testData = s"$resourcesPath/sample.csv"
+
+  override def beforeEach(): Unit = {
+    sql("drop database if exists compaction cascade")
+    sql("create database if not exists compaction")
+    sql("use compaction")
+    sql("create table testtable (id int, name string, city string, age int) STORED BY 'org.apache.carbondata.format'")
+    sql(
+      """
+        | CREATE TABLE maintable(id int, name string, city string, age int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(
+      s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id"""
+        .stripMargin)
+    sql(
+      s"""create datamap preagg_avg on table maintable using 'preaggregate' as select id,avg(age) from maintable group by id"""
+        .stripMargin)
+  }
+
+  test("test if pre-agg table is compacted with parent table minor compaction") {
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql("insert into testtable select * from maintable")
+    val sumResult = sql("select id, sum(age) from testtable group by id").collect()
+    val avgResult = sql("select id, sum(age), count(age) from testtable group by id").collect()
+    sql("alter table maintable compact 'minor'")
+    val segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+    segmentNamesSum should equal (Array("3", "2", "1", "0.1", "0"))
+    checkAnswer(sql("select * from maintable_preagg_sum"), sumResult)
+    val segmentNamesAvg = sql("show segments for table maintable_preagg_avg").collect().map(_.get(0).toString)
+    segmentNamesAvg should equal (Array("3", "2", "1", "0.1", "0"))
+    checkAnswer(sql("select * from maintable_preagg_avg"), avgResult)
+  }
+
+  test("test if pre-agg table is compacted with parent table major compaction") {
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql("alter table maintable compact 'major'")
+    sql("insert into testtable select * from maintable")
+    val sumResult = sql("select id, sum(age) from testtable group by id").collect()
+    val avgResult = sql("select id, sum(age), count(age) from testtable group by id").collect()
+    sql("alter table maintable compact 'minor'")
+    val segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+    segmentNamesSum should equal (Array("3", "2", "1", "0.1", "0"))
+    checkAnswer(sql("select * from maintable_preagg_sum"), sumResult)
+    val segmentNamesAvg = sql("show segments for table maintable_preagg_avg").collect().map(_.get(0).toString)
+    segmentNamesAvg should equal (Array("3", "2", "1", "0.1", "0"))
+    checkAnswer(sql("select * from maintable_preagg_avg"), avgResult)
+  }
+
+  test("test if 2nd level minor compaction is successful for pre-agg table") {
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql("alter table maintable compact 'minor'")
+    var segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+    segmentNamesSum should equal (Array("3", "2", "1", "0.1", "0"))
+    sql("insert into testtable select * from maintable")
+    var sumResult = sql("select id, sum(age) from testtable group by id").collect()
+    var avgResult = sql("select id, sum(age), count(age) from testtable group by id").collect()
+    checkAnswer(sql("select * from maintable_preagg_sum"), sumResult)
+    var segmentNamesAvg = sql("show segments for table maintable_preagg_avg").collect().map(_.get(0).toString)
+    segmentNamesAvg should equal (Array("3", "2", "1", "0.1", "0"))
+    checkAnswer(sql("select * from maintable_preagg_avg"), avgResult)
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql("alter table maintable compact 'minor'")
+    sql("insert overwrite table testtable select * from maintable")
+    sumResult = sql("select id, sum(age) from testtable group by id").collect()
+    avgResult = sql("select id, sum(age), count(age) from testtable group by id").collect()
+    segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+    segmentNamesSum.sorted should equal (Array("0", "0.1", "1", "2", "3", "4", "4.1", "5", "6", "7"))
+    checkAnswer(sql("select maintable_id, sum(maintable_age_sum) from maintable_preagg_sum group by maintable_id"), sumResult)
+    segmentNamesAvg = sql("show segments for table maintable_preagg_avg").collect().map(_.get(0).toString)
+    segmentNamesAvg.sorted should equal (Array("0", "0.1", "1", "2", "3", "4", "4.1", "5", "6", "7"))
+    checkAnswer(sql("select maintable_id, sum(maintable_age_sum), sum(maintable_age_count) from maintable_preagg_avg group by maintable_id"), avgResult)
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql("alter table maintable compact 'minor'")
+    sql("insert overwrite table testtable select * from maintable")
+    sumResult = sql("select id, sum(age) from testtable group by id").collect()
+    avgResult = sql("select id, sum(age), count(age) from testtable group by id").collect()
+    segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+    segmentNamesSum should equal (Array("11", "10", "9", "8.1", "8", "7", "6", "5", "4.1", "4", "3", "2", "1", "0.2", "0.1", "0"))
+    checkAnswer(sql("select maintable_id, sum(maintable_age_sum) from maintable_preagg_sum group by maintable_id"), sumResult)
+    segmentNamesAvg = sql("show segments for table maintable_preagg_avg").collect().map(_.get(0).toString)
+    segmentNamesAvg should equal (Array("11", "10", "9", "8.1", "8", "7", "6", "5", "4.1", "4", "3", "2", "1", "0.2", "0.1", "0"))
+    checkAnswer(sql("select maintable_id, sum(maintable_age_sum), sum(maintable_age_count) from maintable_preagg_avg group by maintable_id"), avgResult)
+  }
+
+  test("test direct minor compaction on pre-agg tables") {
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql("alter table maintable_preagg_sum compact 'minor'")
+    var segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+    segmentNamesSum should equal (Array("3", "2", "1", "0.1", "0"))
+    sql("insert into testtable select * from maintable")
+    var sumResult = sql("select id, sum(age) from testtable group by id").collect()
+    checkAnswer(sql("select * from maintable_preagg_sum"), sumResult)
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql("alter table maintable_preagg_sum compact 'minor'")
+    sql("insert overwrite table testtable select * from maintable")
+    sumResult = sql("select id, sum(age) from testtable group by id").collect()
+    segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+    segmentNamesSum.sorted should equal (Array("0", "0.1", "1", "2", "3", "4", "4.1", "5", "6", "7"))
+    checkAnswer(sql("select maintable_id, sum(maintable_age_sum) from maintable_preagg_sum group by maintable_id"), sumResult)
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql("alter table maintable_preagg_sum compact 'minor'")
+    sql("insert overwrite table testtable select * from maintable")
+    sumResult = sql("select id, sum(age) from testtable group by id").collect()
+    segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+    segmentNamesSum should equal (Array("11", "10", "9", "8.1", "8", "7", "6", "5", "4.1", "4", "3", "2", "1", "0.2", "0.1", "0"))
+    checkAnswer(sql("select maintable_id, sum(maintable_age_sum) from maintable_preagg_sum group by maintable_id"), sumResult)
+  }
+
+  test("test if minor/major compaction is successful for pre-agg table") {
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql("alter table maintable_preagg_sum compact 'minor'")
+    var segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+    segmentNamesSum should equal (Array("3","2","1","0.1", "0"))
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql("alter table maintable_preagg_sum compact 'major'")
+    segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+    segmentNamesSum.sorted should equal (Array("0", "0.1", "0.2", "1", "2", "3", "4", "5", "6", "7"))
+  }
+
+  override def afterAll(): Unit = {
+    sql("drop database if exists compaction cascade")
+    sql("use default")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/java/org/apache/carbondata/spark/compaction/CompactionCallable.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/compaction/CompactionCallable.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/compaction/CompactionCallable.java
deleted file mode 100644
index 2773eef..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/compaction/CompactionCallable.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.compaction;
-
-import java.util.concurrent.Callable;
-
-import org.apache.carbondata.spark.rdd.Compactor;
-
-import org.apache.spark.sql.execution.command.CompactionCallableModel;
-
-/**
- * Callable class which is used to trigger the compaction in a separate callable.
- */
-public class CompactionCallable implements Callable<Void> {
-
-  private final CompactionCallableModel compactionCallableModel;
-
-  public CompactionCallable(CompactionCallableModel compactionCallableModel) {
-
-    this.compactionCallableModel = compactionCallableModel;
-  }
-
-  @Override public Void call() throws Exception {
-
-    Compactor.triggerCompaction(compactionCallableModel);
-    return null;
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index f4f569b..2b127e4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -22,9 +22,7 @@ import java.lang.Long
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.types.TimestampType
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -37,7 +35,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.DataManagementFunc
+import org.apache.carbondata.spark.util.DataLoadingUtil
 
 object CarbonStore {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -127,7 +125,7 @@ object CarbonStore {
           FileFactory.getCarbonFile(absIdent.getTablePath,
             FileFactory.getFileType(absIdent.getTablePath)))
       } else {
-        DataManagementFunc.deleteLoadsAndUpdateMetadata(
+        DataLoadingUtil.deleteLoadsAndUpdateMetadata(
           isForceDeletion = true, carbonTable)
         CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index 1a0c305..7caad43 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -18,7 +18,7 @@ package org.apache.carbondata.events
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel}
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel, CarbonMergerMapping}
 
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -29,7 +29,8 @@ import org.apache.carbondata.processing.loading.model.CarbonLoadModel
  * @param alterTableDropColumnModel
  * @param sparkSession
  */
-case class AlterTableDropColumnPreEvent(carbonTable: CarbonTable,
+case class AlterTableDropColumnPreEvent(
+    carbonTable: CarbonTable,
     alterTableDropColumnModel: AlterTableDropColumnModel,
     sparkSession: SparkSession) extends Event with AlterTableDropColumnEventInfo
 
@@ -40,7 +41,9 @@ case class AlterTableDropColumnPreEvent(carbonTable: CarbonTable,
  * @param carbonTable
  * @param alterTableDataTypeChangeModel
  */
-case class AlterTableDataTypeChangePreEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
+case class AlterTableDataTypeChangePreEvent(
+    sparkSession: SparkSession,
+    carbonTable: CarbonTable,
         alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
   extends Event with AlterTableDataTypeChangeEventInfo
 
@@ -50,7 +53,9 @@ case class AlterTableDataTypeChangePreEvent(sparkSession: SparkSession, carbonTa
  * @param carbonTable
  * @param alterTableDataTypeChangeModel
  */
-case class AlterTableDataTypeChangePostEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
+case class AlterTableDataTypeChangePostEvent(
+    sparkSession: SparkSession,
+    carbonTable: CarbonTable,
     alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
   extends Event with AlterTableDataTypeChangeEventInfo
 
@@ -60,7 +65,8 @@ case class AlterTableDataTypeChangePostEvent(sparkSession: SparkSession, carbonT
  * @param alterTableDropColumnModel
  * @param sparkSession
  */
-case class AlterTableDropColumnPostEvent(carbonTable: CarbonTable,
+case class AlterTableDropColumnPostEvent(
+    carbonTable: CarbonTable,
     alterTableDropColumnModel: AlterTableDropColumnModel,
     sparkSession: SparkSession) extends Event with AlterTableDropColumnEventInfo
 
@@ -71,7 +77,8 @@ case class AlterTableDropColumnPostEvent(carbonTable: CarbonTable,
  * @param alterTableDropColumnModel
  * @param sparkSession
  */
-case class AlterTableDropColumnAbortEvent(carbonTable: CarbonTable,
+case class AlterTableDropColumnAbortEvent(
+    carbonTable: CarbonTable,
     alterTableDropColumnModel: AlterTableDropColumnModel,
     sparkSession: SparkSession) extends Event with AlterTableDropColumnEventInfo
 
@@ -83,7 +90,8 @@ case class AlterTableDropColumnAbortEvent(carbonTable: CarbonTable,
  * @param newTablePath
  * @param sparkSession
  */
-case class AlterTableRenamePreEvent(carbonTable: CarbonTable,
+case class AlterTableRenamePreEvent(
+    carbonTable: CarbonTable,
     alterTableRenameModel: AlterTableRenameModel, newTablePath: String,
     sparkSession: SparkSession) extends Event with AlterTableRenameEventInfo
 
@@ -92,7 +100,9 @@ case class AlterTableRenamePreEvent(carbonTable: CarbonTable,
  * @param carbonTable
  * @param alterTableAddColumnsModel
  */
-case class AlterTableAddColumnPreEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
+case class AlterTableAddColumnPreEvent(
+    sparkSession: SparkSession,
+    carbonTable: CarbonTable,
     alterTableAddColumnsModel: AlterTableAddColumnsModel)
   extends Event with AlterTableAddColumnEventInfo
 
@@ -101,7 +111,9 @@ case class AlterTableAddColumnPreEvent(sparkSession: SparkSession, carbonTable:
  * @param carbonTable
  * @param alterTableAddColumnsModel
  */
-case class AlterTableAddColumnPostEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
+case class AlterTableAddColumnPostEvent(
+    sparkSession: SparkSession,
+    carbonTable: CarbonTable,
     alterTableAddColumnsModel: AlterTableAddColumnsModel)
   extends Event with AlterTableAddColumnEventInfo
 
@@ -113,7 +125,8 @@ case class AlterTableAddColumnPostEvent(sparkSession: SparkSession, carbonTable:
  * @param newTablePath
  * @param sparkSession
  */
-case class AlterTableRenamePostEvent(carbonTable: CarbonTable,
+case class AlterTableRenamePostEvent(
+    carbonTable: CarbonTable,
     alterTableRenameModel: AlterTableRenameModel, newTablePath: String,
     sparkSession: SparkSession) extends Event with AlterTableRenameEventInfo
 
@@ -125,33 +138,29 @@ case class AlterTableRenamePostEvent(carbonTable: CarbonTable,
  * @param newTablePath
  * @param sparkSession
  */
-case class AlterTableRenameAbortEvent(carbonTable: CarbonTable,
+case class AlterTableRenameAbortEvent(
+    carbonTable: CarbonTable,
     alterTableRenameModel: AlterTableRenameModel, newTablePath: String,
     sparkSession: SparkSession) extends Event with AlterTableRenameEventInfo
 
 
-/**
- *
- * @param carbonTable
- * @param carbonLoadModel
- * @param mergedLoadName
- * @param sQLContext
- */
-case class AlterTableCompactionPreEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
-    carbonLoadModel: CarbonLoadModel,
+case class AlterTableCompactionPreEvent(
+    carbonTable: CarbonTable,
+    carbonMergerMapping: CarbonMergerMapping,
     mergedLoadName: String,
-    sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo
+    sqlContext: SQLContext) extends Event with AlterTableCompactionEventInfo
 
 
 /**
  *
  * @param carbonTable
- * @param carbonLoadModel
+ * @param carbonMergerMapping
  * @param mergedLoadName
  * @param sQLContext
  */
-case class AlterTableCompactionPostEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
-    carbonLoadModel: CarbonLoadModel,
+case class AlterTableCompactionPostEvent(
+    carbonTable: CarbonTable,
+    carbonMergerMapping: CarbonMergerMapping,
     mergedLoadName: String,
     sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo
 
@@ -160,11 +169,12 @@ case class AlterTableCompactionPostEvent(sparkSession: SparkSession, carbonTable
  * Class for handling clean up in case of any failure and abort the operation
  *
  * @param carbonTable
- * @param carbonLoadModel
+ * @param carbonMergerMapping
  * @param mergedLoadName
  * @param sQLContext
  */
-case class AlterTableCompactionAbortEvent(carbonTable: CarbonTable,
-    carbonLoadModel: CarbonLoadModel,
+case class AlterTableCompactionAbortEvent(
+    carbonTable: CarbonTable,
+    carbonMergerMapping: CarbonMergerMapping,
     mergedLoadName: String,
     sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index 6279fca..4af337b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -18,7 +18,7 @@
 package org.apache.carbondata.events
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel}
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel, CarbonMergerMapping}
 
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -95,7 +95,7 @@ trait AlterTableAddColumnEventInfo {
  */
 trait AlterTableCompactionEventInfo {
   val carbonTable: CarbonTable
-  val carbonLoadModel: CarbonLoadModel
+  val carbonMergerMapping: CarbonMergerMapping
   val mergedLoadName: String
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
index 12f2922..84dde84 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
@@ -45,6 +45,14 @@ case class LoadTablePostExecutionEvent(sparkSession: SparkSession,
     carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo
 
 /**
+ * Event for handling operations after data load completion and before the final commit of the
+ * load operation. Example usage: loading pre-aggregate tables.
+ *
+ * @param sparkSession spark session handed over by the code that fires the event
+ * @param carbonTableIdentifier identifier of the table the event refers to
+ * @param carbonLoadModel load model of the load operation the event refers to
+ */
+case class LoadTablePreStatusUpdateEvent(sparkSession: SparkSession,
+    carbonTableIdentifier: CarbonTableIdentifier,
+    carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo
+
+/**
  * Class for handling clean up in case of any failure and abort the operation.
  */
 case class LoadTableAbortExecutionEvent(sparkSession: SparkSession,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
deleted file mode 100644
index e41211a..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.events._
-import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.spark.MergeResultImpl
-import org.apache.carbondata.spark.util.CommonUtil
-
-/**
- * Compactor class which handled the compaction cases.
- */
-object Compactor {
-
-  val logger = LogServiceFactory.getLogService(Compactor.getClass.getName)
-
-  def triggerCompaction(compactionCallableModel: CompactionCallableModel): Unit = {
-
-    val carbonTable = compactionCallableModel.carbonTable
-    val loadsToMerge = compactionCallableModel.loadsToMerge
-    val sc = compactionCallableModel.sqlContext
-    val carbonLoadModel = compactionCallableModel.carbonLoadModel
-    val compactionType = compactionCallableModel.compactionType
-    val storePath = carbonLoadModel.getTablePath
-    val startTime = System.nanoTime()
-    val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
-    var finalMergeStatus = false
-    val databaseName: String = carbonLoadModel.getDatabaseName
-    val factTableName = carbonLoadModel.getTableName
-    val validSegments: Array[String] = CarbonDataMergerUtil
-      .getValidSegments(loadsToMerge).split(',')
-    val mergeLoadStartTime = CarbonUpdateUtil.readCurrentTime()
-    val carbonMergerMapping = CarbonMergerMapping(storePath,
-      carbonTable.getMetaDataFilepath,
-      mergedLoadName,
-      databaseName,
-      factTableName,
-      validSegments,
-      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
-      compactionType,
-      maxSegmentColCardinality = null,
-      maxSegmentColumnSchemaList = null
-    )
-    carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation)
-    carbonLoadModel.setLoadMetadataDetails(
-      SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
-    // trigger event for compaction
-    val operationContext = new OperationContext
-    val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
-      AlterTableCompactionPreEvent(compactionCallableModel.sqlContext.sparkSession,
-        carbonTable,
-        carbonLoadModel,
-        mergedLoadName,
-        sc)
-    OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
-
-    var execInstance = "1"
-    // in case of non dynamic executor allocation, number of executors are fixed.
-    if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
-      execInstance = sc.sparkContext.getConf.get("spark.executor.instances")
-      logger.info(s"spark.executor.instances property is set to = $execInstance")
-    } // in case of dynamic executor allocation, taking the max executors of the dynamic allocation.
-    else if (sc.sparkContext.getConf.contains("spark.dynamicAllocation.enabled")) {
-      if (sc.sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
-        .equalsIgnoreCase("true")) {
-        execInstance = sc.sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
-        logger.info(s"spark.dynamicAllocation.maxExecutors property is set to = $execInstance")
-      }
-    }
-
-    val mergeStatus =
-    if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
-      new CarbonIUDMergerRDD(
-        sc.sparkContext,
-        new MergeResultImpl(),
-        carbonLoadModel,
-        carbonMergerMapping,
-        execInstance
-      ).collect
-    } else {
-      new CarbonMergerRDD(
-        sc.sparkContext,
-        new MergeResultImpl(),
-        carbonLoadModel,
-        carbonMergerMapping,
-        execInstance
-      ).collect
-    }
-
-    if (mergeStatus.length == 0) {
-      finalMergeStatus = false
-    } else {
-      finalMergeStatus = mergeStatus.forall(_._2)
-    }
-
-    if (finalMergeStatus) {
-      val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
-      CommonUtil.mergeIndexFiles(
-        sc.sparkContext, Seq(mergedLoadNumber), storePath, carbonTable, false)
-
-      // trigger event for compaction
-      val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =
-        AlterTableCompactionPostEvent(compactionCallableModel.sqlContext.sparkSession,
-          carbonTable,
-          carbonLoadModel,
-          mergedLoadName,
-          sc)
-      OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext)
-
-      val endTime = System.nanoTime()
-      logger.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
-      val statusFileUpdation =
-        ((compactionType == CompactionType.IUD_UPDDEL_DELTA) &&
-          CarbonDataMergerUtil
-            .updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge,
-              carbonTable.getMetaDataFilepath,
-              carbonLoadModel)) ||
-         CarbonDataMergerUtil
-           .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath,
-             mergedLoadNumber, carbonLoadModel, mergeLoadStartTime, compactionType)
-
-      if (!statusFileUpdation) {
-        logger.audit(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
-                     s"${ carbonLoadModel.getTableName }")
-        logger.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
-                     s"${ carbonLoadModel.getTableName }")
-        throw new Exception(s"Compaction failed to update metadata for table" +
-                            s" ${ carbonLoadModel.getDatabaseName }." +
-                            s"${ carbonLoadModel.getTableName }")
-      } else {
-        logger.audit(s"Compaction request completed for table " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-        logger.info(s"Compaction request completed for table " +
-                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-      }
-    } else {
-      logger.audit(s"Compaction request failed for table " +
-                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }"
-      )
-      logger.error(s"Compaction request failed for table " +
-                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-      throw new Exception("Compaction Failure in Merger Rdd.")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
deleted file mode 100644
index 26a66f6..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import java.util
-import java.util.concurrent._
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders}
-import org.apache.carbondata.spark.compaction.CompactionCallable
-import org.apache.carbondata.spark.util.CommonUtil
-
-/**
- * Common functions for data life cycle management
- */
-object DataManagementFunc {
-
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  def executeCompaction(carbonLoadModel: CarbonLoadModel,
-      compactionModel: CompactionModel,
-      executor: ExecutorService,
-      sqlContext: SQLContext,
-      storeLocation: String): Unit = {
-    val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
-      carbonLoadModel.getLoadMetadataDetails
-    )
-    CarbonDataMergerUtil.sortSegments(sortedSegments)
-
-    var segList = carbonLoadModel.getLoadMetadataDetails
-    var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
-      carbonLoadModel,
-      compactionModel.compactionSize,
-      segList,
-      compactionModel.compactionType
-    )
-    while (loadsToMerge.size() > 1 ||
-           (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType &&
-            loadsToMerge.size() > 0)) {
-      val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
-      deletePartialLoadsInCompaction(carbonLoadModel)
-      val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
-        CarbonCommonConstants
-            .DEFAULT_COLLECTION_SIZE
-      )
-
-      scanSegmentsAndSubmitJob(futureList,
-        loadsToMerge,
-        executor,
-        sqlContext,
-        compactionModel,
-        carbonLoadModel
-      )
-
-      try {
-
-        futureList.asScala.foreach(future => {
-          future.get
-        }
-        )
-      } catch {
-        case e: Exception =>
-          LOGGER.error(e, s"Exception in compaction thread ${ e.getMessage }")
-          throw e
-      }
-
-      // scan again and determine if anything is there to merge again.
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
-      segList = carbonLoadModel.getLoadMetadataDetails
-      // in case of major compaction we will scan only once and come out as it will keep
-      // on doing major for the new loads also.
-      // excluding the newly added segments.
-      if (CompactionType.MAJOR == compactionModel.compactionType) {
-
-        segList = CarbonDataMergerUtil
-          .filterOutNewlyAddedSegments(carbonLoadModel.getLoadMetadataDetails, lastSegment)
-      }
-
-      if (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType) {
-        loadsToMerge.clear()
-      } else if (segList.size > 0) {
-        loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
-          carbonLoadModel,
-          compactionModel.compactionSize,
-          segList,
-          compactionModel.compactionType
-        )
-      }
-      else {
-        loadsToMerge.clear()
-      }
-    }
-  }
-
-  /**
-   * This will submit the loads to be merged into the executor.
-   */
-  private def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]],
-      loadsToMerge: util.List[LoadMetadataDetails],
-      executor: ExecutorService,
-      sqlContext: SQLContext,
-      compactionModel: CompactionModel,
-      carbonLoadModel: CarbonLoadModel
-  ): Unit = {
-    loadsToMerge.asScala.foreach { seg =>
-      LOGGER.info("loads identified for merge is " + seg.getLoadName)
-    }
-
-    val compactionCallableModel = CompactionCallableModel(
-      carbonLoadModel,
-      compactionModel.carbonTable,
-      loadsToMerge,
-      sqlContext,
-      compactionModel.compactionType
-    )
-
-    val future: Future[Void] = executor.submit(new CompactionCallable(compactionCallableModel))
-    futureList.add(future)
-  }
-
-  def deletePartialLoadsInCompaction(carbonLoadModel: CarbonLoadModel): Unit = {
-    // Deleting the any partially loaded data if present.
-    // in some case the segment folder which is present in store will not have entry in
-    // status.
-    // so deleting those folders.
-    try {
-      CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
-    } catch {
-      case e: Exception =>
-        LOGGER.error(s"Exception in compaction thread while clean up of stale segments" +
-            s" ${ e.getMessage }")
-    }
-  }
-
-  private def isLoadDeletionRequired(metaDataLocation: String): Boolean = {
-    val details = SegmentStatusManager.readLoadMetadata(metaDataLocation)
-    if (details != null && details.nonEmpty) for (oneRow <- details) {
-      if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus ||
-           SegmentStatus.COMPACTED == oneRow.getSegmentStatus) &&
-          oneRow.getVisibility.equalsIgnoreCase("true")) {
-        return true
-      }
-    }
-    false
-  }
-
-  def deleteLoadsAndUpdateMetadata(
-      isForceDeletion: Boolean,
-      carbonTable: CarbonTable): Unit = {
-    if (isLoadDeletionRequired(carbonTable.getMetaDataFilepath)) {
-      val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath)
-      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-      val carbonTableStatusLock =
-        CarbonLockFactory.getCarbonLockObj(
-          absoluteTableIdentifier,
-          LockUsage.TABLE_STATUS_LOCK
-        )
-
-      // Delete marked loads
-      val isUpdationRequired =
-        DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
-          absoluteTableIdentifier,
-          isForceDeletion,
-          details
-        )
-
-      if (isUpdationRequired) {
-        try {
-          // Update load metadate file after cleaning deleted nodes
-          if (carbonTableStatusLock.lockWithRetries()) {
-            LOGGER.info("Table status lock has been successfully acquired.")
-
-            // read latest table status again.
-            val latestMetadata = SegmentStatusManager
-              .readLoadMetadata(carbonTable.getMetaDataFilepath)
-
-            // update the metadata details from old to new status.
-            val latestStatus = CarbonLoaderUtil
-                .updateLoadMetadataFromOldToNew(details, latestMetadata)
-
-            CarbonLoaderUtil.writeLoadMetadata(absoluteTableIdentifier, latestStatus)
-          } else {
-            val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
-            val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
-            val errorMsg = "Clean files request is failed for " +
-                s"$dbName.$tableName" +
-                ". Not able to acquire the table status lock due to other operation " +
-                "running in the background."
-            LOGGER.audit(errorMsg)
-            LOGGER.error(errorMsg)
-            throw new Exception(errorMsg + " Please try after some time.")
-          }
-        } finally {
-          CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index 74ed6a6..69c9fe4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -26,13 +26,15 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.constants.LoggerAction
-import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.TableOptionConstant
+import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders, TableOptionConstant}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.load.ValidateUtil
 
@@ -41,6 +43,8 @@ import org.apache.carbondata.spark.load.ValidateUtil
  */
 object DataLoadingUtil {
 
+  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
   /**
    * get data loading options and initialise default value
    */
@@ -54,7 +58,6 @@ object DataLoadingUtil {
     optionsFinal.put("escapechar", options.getOrElse("escapechar", "\\"))
     optionsFinal.put("commentchar", options.getOrElse("commentchar", "#"))
     optionsFinal.put("columndict", options.getOrElse("columndict", null))
-
     optionsFinal.put(
       "serialization_null_format",
       options.getOrElse("serialization_null_format", "\\N"))
@@ -321,4 +324,70 @@ object DataLoadingUtil {
       CommonUtil.readLoadMetadataDetails(carbonLoadModel)
     }
   }
+
+  /**
+   * Checks whether the load details read from `metaDataLocation` contain at least one segment
+   * that is MARKED_FOR_DELETE or COMPACTED and whose visibility flag is still "true".
+   *
+   * @param metaDataLocation location handed to SegmentStatusManager.readLoadMetadata
+   * @return true when at least one such segment entry exists; false otherwise, including the
+   *         case where no load details were returned
+   */
+  private def isLoadDeletionRequired(metaDataLocation: String): Boolean = {
+    val details = SegmentStatusManager.readLoadMetadata(metaDataLocation)
+    // `exists` stops at the first match and yields false for an empty array, so neither an
+    // explicit emptiness check nor a non-local `return` out of a loop body is needed.
+    details != null && details.exists { oneRow =>
+      val status = oneRow.getSegmentStatus
+      (SegmentStatus.MARKED_FOR_DELETE == status || SegmentStatus.COMPACTED == status) &&
+      oneRow.getVisibility.equalsIgnoreCase("true")
+    }
+  }
+
+  /**
+   * Deletes the folders of deletable segments and then refreshes the table status file.
+   *
+   * Nothing is done unless isLoadDeletionRequired reports a deletable segment for the table.
+   * The table status file is rewritten, under the table status lock, only when
+   * DeleteLoadFolders.deleteLoadFoldersFromFileSystem reports that an update is required.
+   *
+   * @param isForceDeletion forwarded unchanged to DeleteLoadFolders.deleteLoadFoldersFromFileSystem
+   * @param carbonTable table whose metadata file path and absolute identifier are used
+   * @throws Exception when the table status lock cannot be acquired
+   */
+  def deleteLoadsAndUpdateMetadata(
+      isForceDeletion: Boolean,
+      carbonTable: CarbonTable): Unit = {
+    if (isLoadDeletionRequired(carbonTable.getMetaDataFilepath)) {
+      val detailsBeforeDelete =
+        SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath)
+      val tableIdentifier = carbonTable.getAbsoluteTableIdentifier
+      val statusLock =
+        CarbonLockFactory.getCarbonLockObj(tableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+
+      // Remove the segment folders first; the returned flag decides whether the status file
+      // has to be rewritten afterwards.
+      val statusRewriteNeeded = DeleteLoadFolders
+        .deleteLoadFoldersFromFileSystem(tableIdentifier, isForceDeletion, detailsBeforeDelete)
+
+      if (statusRewriteNeeded) {
+        try {
+          if (!statusLock.lockWithRetries()) {
+            val carbonTableIdentifier = tableIdentifier.getCarbonTableIdentifier
+            val dbName = carbonTableIdentifier.getDatabaseName
+            val tableName = carbonTableIdentifier.getTableName
+            val errorMsg = s"Clean files request is failed for $dbName.$tableName" +
+                           ". Not able to acquire the table status lock due to other operation " +
+                           "running in the background."
+            LOGGER.audit(errorMsg)
+            LOGGER.error(errorMsg)
+            throw new Exception(errorMsg + " Please try after some time.")
+          }
+          LOGGER.info("Table status lock has been successfully acquired.")
+
+          // Read the table status again now that the lock is held, and carry the changes made
+          // to the earlier snapshot over to it before writing it back.
+          val currentDetails = SegmentStatusManager
+            .readLoadMetadata(carbonTable.getMetaDataFilepath)
+          val mergedDetails = CarbonLoaderUtil
+            .updateLoadMetadataFromOldToNew(detailsBeforeDelete, currentDetails)
+          CarbonLoaderUtil.writeLoadMetadata(tableIdentifier, mergedDetails)
+        } finally {
+          // Runs whether or not the lock was obtained.
+          CarbonLockUtil.fileUnlock(statusLock, LockUsage.TABLE_STATUS_LOCK)
+        }
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
new file mode 100644
index 0000000..636d731
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.rdd
+
+import java.util.concurrent.ExecutorService
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonSession, SQLContext}
+import org.apache.spark.sql.execution.command.CompactionModel
+import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
+
+/**
+ * Used to perform compaction on Aggregate data map.
+ *
+ * The segments selected for merging are read back through a select query on the aggregate table
+ * itself and loaded again with CarbonLoadDataCommand; afterwards the selected segments are
+ * marked as COMPACTED in the table status file.
+ */
+class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
+    compactionModel: CompactionModel,
+    executor: ExecutorService,
+    sqlContext: SQLContext,
+    storeLocation: String)
+  extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) {
+
+  /**
+   * Compacts the segments returned by identifySegmentsToBeMerged() and, for every compaction
+   * type other than MAJOR, calls itself again to look for further segments to merge.
+   *
+   * The follow-up round is started only after the current round has finished successfully.
+   * A failed round therefore propagates its exception instead of being retried on the same
+   * segments while the exception is still pending. The thread-local segment properties set
+   * for the round are removed again on every exit path.
+   */
+  override def executeCompaction(): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val loadMetaDataDetails = identifySegmentsToBeMerged()
+    val segments = loadMetaDataDetails.asScala.map(_.getLoadName)
+    if (segments.nonEmpty) {
+      val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadMetaDataDetails).split("_")(1)
+      val qualifiedTableName = carbonLoadModel.getDatabaseName + "." + carbonLoadModel.getTableName
+      val inputSegmentsKey = CarbonCommonConstants.CARBON_INPUT_SEGMENTS + qualifiedTableName
+      val validateSegmentsKey =
+        CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + qualifiedTableName
+      CarbonSession.threadSet(inputSegmentsKey, segments.mkString(","))
+      CarbonSession.threadSet(validateSegmentsKey, "false")
+      try {
+        val headers = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
+          .map(_.getColumnName).mkString(",")
+        // Creating a new query string to insert data into pre-aggregate table from that same
+        // table. For example: to compact preaggtable1 we can fire a query like
+        // insert into preaggtable1 select * from preaggtable1
+        // The following code generates the select query with a load UDF that will be used to
+        // apply DataLoadingRules.
+        val childSelectQuery = PreAggregateUtil.createChildSelectQuery(
+          carbonTable.getTableInfo.getFactTable, carbonTable.getDatabaseName)
+        val childDataFrame = sqlContext.sparkSession
+          .sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(childSelectQuery))
+          .drop("preAggLoad")
+        CarbonLoadDataCommand(
+          Some(carbonTable.getDatabaseName),
+          carbonTable.getTableName,
+          null,
+          Nil,
+          Map("fileheader" -> headers),
+          isOverwriteTable = false,
+          dataFrame = Some(childDataFrame),
+          internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true",
+            "mergedSegmentName" -> mergedLoadName)).run(sqlContext.sparkSession)
+        // Every entry is kept; only the entries that were selected for merging are changed.
+        val updatedLoadMetaDataDetails = SegmentStatusManager
+          .readLoadMetadata(carbonTable.getMetaDataFilepath)
+          .map { load =>
+            if (loadMetaDataDetails.contains(load)) {
+              load.setMergedLoadName(mergedLoadName)
+              load.setSegmentStatus(SegmentStatus.COMPACTED)
+              load.setModificationOrdeletionTimesStamp(System.currentTimeMillis())
+            }
+            load
+          }
+        val carbonTablePath = CarbonStorePath
+          .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+        SegmentStatusManager
+          .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath,
+            updatedLoadMetaDataDetails)
+        carbonLoadModel.setLoadMetadataDetails(updatedLoadMetaDataDetails.toList.asJava)
+        // Check if any other segments need compaction in case of MINOR compaction.
+        // For example: after 8.1 creation 0.1, 4.1, 8.1 have to be merged to 0.2 if the
+        // threshold allows it.
+        if (CompactionType.MAJOR != compactionModel.compactionType) {
+          executeCompaction()
+        }
+      } finally {
+        CarbonSession.threadUnset(inputSegmentsKey)
+        CarbonSession.threadUnset(validateSegmentsKey)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 6393289..1d2934f 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -55,7 +55,7 @@ import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.events.{LoadTablePostExecutionEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreStatusUpdateEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable}
@@ -66,7 +66,7 @@ import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonData
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory, _}
 import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, Util}
 
 /**
  * This is the factory class which can create different RDD depends on user needs.
@@ -161,18 +161,18 @@ object CarbonDataRDDFactory {
 
     val compactionThread = new Thread {
       override def run(): Unit = {
+        val compactor = CompactionFactory.getCompactor(
+          carbonLoadModel,
+          compactionModel,
+          executor,
+          sqlContext,
+          storeLocation)
         try {
           // compaction status of the table which is triggered by the user.
           var triggeredCompactionStatus = false
           var exception: Exception = null
           try {
-            DataManagementFunc.executeCompaction(
-              carbonLoadModel,
-              compactionModel,
-              executor,
-              sqlContext,
-              storeLocation
-            )
+            compactor.executeCompaction()
             triggeredCompactionStatus = true
           } catch {
             case e: Exception =>
@@ -211,10 +211,12 @@ object CarbonDataRDDFactory {
               )
               // proceed for compaction
               try {
-                DataManagementFunc.executeCompaction(newCarbonLoadModel,
+                CompactionFactory.getCompactor(
+                  newCarbonLoadModel,
                   newcompactionModel,
-                  executor, sqlContext, storeLocation
-                )
+                  executor,
+                  sqlContext,
+                  storeLocation).executeCompaction()
               } catch {
                 case e: Exception =>
                   LOGGER.error("Exception in compaction thread for table " +
@@ -248,7 +250,7 @@ object CarbonDataRDDFactory {
           }
         } finally {
           executor.shutdownNow()
-          DataManagementFunc.deletePartialLoadsInCompaction(carbonLoadModel)
+          compactor.deletePartialLoadsInCompaction()
           compactionLock.unlock()
         }
       }
@@ -290,7 +292,7 @@ object CarbonDataRDDFactory {
                  s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     // Check if any load need to be deleted before loading new data
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    DataManagementFunc.deleteLoadsAndUpdateMetadata(isForceDeletion = false, carbonTable)
+    DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, carbonTable)
     var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
     var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null
 
@@ -492,6 +494,10 @@ object CarbonDataRDDFactory {
         throw new Exception("No Data to load")
       }
       writeDictionary(carbonLoadModel, result, writeAll = false)
+      val loadTablePreStatusUpdateEvent = LoadTablePreStatusUpdateEvent(sqlContext.sparkSession,
+        carbonTable.getCarbonTableIdentifier,
+        carbonLoadModel)
+      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent)
       val done = updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable)
       if (!done) {
         CommonUtil.updateTableStatusForFailure(carbonLoadModel)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
new file mode 100644
index 0000000..3ebc957
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.util
+import java.util.concurrent.ExecutorService
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
+
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
+import org.apache.carbondata.spark.MergeResultImpl
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * Performs compaction on a carbon table: repeatedly identifies the segments that can be
+ * merged and merges them through CarbonMergerRDD or CarbonIUDMergerRDD.
+ */
+class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
+    compactionModel: CompactionModel,
+    executor: ExecutorService,
+    sqlContext: SQLContext,
+    storeLocation: String)
+  extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) {
+
+  /**
+   * Merges segments in a loop until no further merge candidates are identified. MAJOR
+   * compaction ignores segments added after it started; IUD delta compaction runs once.
+   */
+  override def executeCompaction(): Unit = {
+    val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
+      carbonLoadModel.getLoadMetadataDetails
+    )
+    CarbonDataMergerUtil.sortSegments(sortedSegments)
+
+    var segList = carbonLoadModel.getLoadMetadataDetails
+    var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
+      carbonLoadModel,
+      compactionModel.compactionSize,
+      segList,
+      compactionModel.compactionType
+    )
+    while (loadsToMerge.size() > 1 ||
+           (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType &&
+            loadsToMerge.size() > 0)) {
+      val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
+      deletePartialLoadsInCompaction()
+
+      try {
+        scanSegmentsAndSubmitJob(loadsToMerge)
+      } catch {
+        case e: Exception =>
+          LOGGER.error(e, s"Exception in compaction thread ${ e.getMessage }")
+          throw e
+      }
+
+      // re-read the load metadata to determine whether anything is left to merge.
+      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+      segList = carbonLoadModel.getLoadMetadataDetails
+      // MAJOR compaction must not pick up segments loaded after this compaction started,
+      // otherwise it would keep compacting the new loads too; newly added ones are excluded.
+      if (CompactionType.MAJOR == compactionModel.compactionType) {
+        segList = CarbonDataMergerUtil
+          .filterOutNewlyAddedSegments(carbonLoadModel.getLoadMetadataDetails, lastSegment)
+      }
+
+      if (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType) {
+        loadsToMerge.clear()
+      } else if (segList.size > 0) {
+        loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
+          carbonLoadModel,
+          compactionModel.compactionSize,
+          segList,
+          compactionModel.compactionType
+        )
+      } else {
+        loadsToMerge.clear()
+      }
+    }
+  }
+
+  /**
+   * Logs the segments selected for merging and runs the compaction for them synchronously.
+   */
+  def scanSegmentsAndSubmitJob(loadsToMerge: util.List[LoadMetadataDetails]): Unit = {
+    loadsToMerge.asScala.foreach { seg =>
+      LOGGER.info("loads identified for merge is " + seg.getLoadName)
+    }
+    val compactionCallableModel = CompactionCallableModel(
+      carbonLoadModel,
+      compactionModel.carbonTable,
+      loadsToMerge,
+      sqlContext,
+      compactionModel.compactionType)
+    triggerCompaction(compactionCallableModel)
+  }
+
+  /**
+   * Merges the given segments into one load: fires the pre-compaction event, runs the merger
+   * RDD, merges index files, fires the post-compaction event and updates the table status.
+   * @throws Exception when the merger RDD reports a failure or the status file update fails
+   */
+  private def triggerCompaction(compactionCallableModel: CompactionCallableModel): Unit = {
+    val carbonTable = compactionCallableModel.carbonTable
+    val loadsToMerge = compactionCallableModel.loadsToMerge
+    val sc = compactionCallableModel.sqlContext
+    val carbonLoadModel = compactionCallableModel.carbonLoadModel
+    val compactionType = compactionCallableModel.compactionType
+    val tablePath = carbonLoadModel.getTablePath
+    val startTime = System.nanoTime()
+    val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
+    val databaseName: String = carbonLoadModel.getDatabaseName
+    val factTableName = carbonLoadModel.getTableName
+    val validSegments: Array[String] = CarbonDataMergerUtil
+      .getValidSegments(loadsToMerge).split(',')
+    val mergeLoadStartTime = CarbonUpdateUtil.readCurrentTime()
+    val carbonMergerMapping = CarbonMergerMapping(tablePath,
+      carbonTable.getMetaDataFilepath,
+      mergedLoadName,
+      databaseName,
+      factTableName,
+      validSegments,
+      carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+      compactionType,
+      maxSegmentColCardinality = null,
+      maxSegmentColumnSchemaList = null
+    )
+    carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation)
+    carbonLoadModel.setLoadMetadataDetails(
+      SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
+    // trigger event for compaction
+    val operationContext = new OperationContext
+    val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
+      AlterTableCompactionPreEvent(carbonTable, carbonMergerMapping, mergedLoadName, sc)
+    OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
+
+    val sparkConf = sc.sparkContext.getConf
+    var execInstance = "1"
+    // in case of non dynamic executor allocation, number of executors are fixed.
+    if (sparkConf.contains("spark.executor.instances")) {
+      execInstance = sparkConf.get("spark.executor.instances")
+      LOGGER.info(s"spark.executor.instances property is set to = $execInstance")
+    } else if (sparkConf.contains("spark.dynamicAllocation.enabled")) {
+      // in case of dynamic executor allocation, taking the max executors of the dynamic allocation.
+      if (sparkConf.get("spark.dynamicAllocation.enabled").trim.equalsIgnoreCase("true")) {
+        // maxExecutors is optional; a plain get() throws NoSuchElementException when it is
+        // unset, so the default of "1" is kept in that case.
+        sparkConf.getOption("spark.dynamicAllocation.maxExecutors").foreach { maxExecutors =>
+          execInstance = maxExecutors
+          LOGGER.info(s"spark.dynamicAllocation.maxExecutors property is set to = $execInstance")
+        }
+      }
+    }
+
+    val mergeStatus =
+      if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
+        new CarbonIUDMergerRDD(
+          sc.sparkContext,
+          new MergeResultImpl(),
+          carbonLoadModel,
+          carbonMergerMapping,
+          execInstance
+        ).collect
+      } else {
+        new CarbonMergerRDD(
+          sc.sparkContext,
+          new MergeResultImpl(),
+          carbonLoadModel,
+          carbonMergerMapping,
+          execInstance
+        ).collect
+      }
+
+    // forall is vacuously true on an empty result, so an empty merge status counts as failure.
+    val finalMergeStatus = mergeStatus.nonEmpty && mergeStatus.forall(_._2)
+    val qualifiedTableName = s"$databaseName.$factTableName"
+
+    if (finalMergeStatus) {
+      val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
+      CommonUtil.mergeIndexFiles(
+        sc.sparkContext, Seq(mergedLoadNumber), tablePath, carbonTable, false)
+
+      // trigger event for compaction
+      val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =
+        AlterTableCompactionPostEvent(carbonTable, carbonMergerMapping, mergedLoadName, sc)
+      OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext)
+
+      val endTime = System.nanoTime()
+      LOGGER.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
+      // IUD delta compaction first tries the IUD-specific status update; every other type,
+      // and a failed IUD update, goes through the regular merge status update.
+      val statusFileUpdation =
+        ((compactionType == CompactionType.IUD_UPDDEL_DELTA) &&
+         CarbonDataMergerUtil
+           .updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge,
+             carbonTable.getMetaDataFilepath,
+             carbonLoadModel)) ||
+        CarbonDataMergerUtil
+          .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath,
+            mergedLoadNumber, carbonLoadModel, mergeLoadStartTime, compactionType)
+
+      if (!statusFileUpdation) {
+        LOGGER.audit(s"Compaction request failed for table $qualifiedTableName")
+        LOGGER.error(s"Compaction request failed for table $qualifiedTableName")
+        throw new Exception(s"Compaction failed to update metadata for table $qualifiedTableName")
+      } else {
+        LOGGER.audit(s"Compaction request completed for table $qualifiedTableName")
+        LOGGER.info(s"Compaction request completed for table $qualifiedTableName")
+      }
+    } else {
+      LOGGER.audit(s"Compaction request failed for table $qualifiedTableName")
+      LOGGER.error(s"Compaction request failed for table $qualifiedTableName")
+      throw new Exception("Compaction Failure in Merger Rdd.")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
new file mode 100644
index 0000000..6060f06
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.util.concurrent.ExecutorService
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.command.CompactionModel
+
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CompactionFactory {
+
+  /**
+   * Builds the compactor suited to the table referenced by the load model.
+   *
+   * A table flagged as a child datamap is handled by AggregateDataMapCompactor; any other
+   * table is handled by CarbonTableCompactor. All arguments are forwarded unchanged.
+   *
+   * @return the Compactor implementation selected for the table
+   */
+  def getCompactor(carbonLoadModel: CarbonLoadModel,
+      compactionModel: CompactionModel,
+      executor: ExecutorService,
+      sqlContext: SQLContext,
+      storeLocation: String): Compactor = {
+    val targetTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    if (!targetTable.isChildDataMap) {
+      // not a child datamap: use the regular table compactor
+      new CarbonTableCompactor(
+        carbonLoadModel, compactionModel, executor, sqlContext, storeLocation)
+    } else {
+      // child datamap tables get the aggregate-specific compactor
+      new AggregateDataMapCompactor(
+        carbonLoadModel, compactionModel, executor, sqlContext, storeLocation)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
new file mode 100644
index 0000000..6fafc95
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.util.concurrent.ExecutorService
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.command.CompactionModel
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+
+abstract class Compactor(carbonLoadModel: CarbonLoadModel,
+    compactionModel: CompactionModel,
+    executor: ExecutorService,
+    sqlContext: SQLContext,
+    storeLocation: String) {
+
+  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /** Runs the compaction; the strategy is supplied by each concrete compactor. */
+  def executeCompaction(): Unit
+
+  /** Asks CarbonDataMergerUtil which segments of the load model should be merged. */
+  def identifySegmentsToBeMerged(): java.util.List[LoadMetadataDetails] = {
+    val loads = carbonLoadModel.getLoadMetadataDetails
+    CarbonDataMergerUtil.identifySegmentsToBeMerged(
+      carbonLoadModel, compactionModel.compactionSize, loads, compactionModel.compactionType)
+  }
+
+  /**
+   * Removes partially loaded segment data, i.e. segment folders present in the store that
+   * have no entry in the table status. Failures are logged and not propagated.
+   */
+  def deletePartialLoadsInCompaction(): Unit = {
+    try {
+      CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
+    } catch {
+      case cleanupFailure: Exception =>
+        LOGGER.error(s"Exception in compaction thread while clean up of stale segments" +
+                     s" ${ cleanupFailure.getMessage }")
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 0cb6ca6..a9b5455 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -259,7 +259,7 @@ object CarbonSession {
   def initListeners(): Unit = {
     OperationListenerBus.getInstance()
       .addListener(classOf[DropTablePostEvent], DataMapDropTablePostListener)
-      .addListener(classOf[LoadTablePostExecutionEvent], LoadPostAggregateListener)
+      .addListener(classOf[LoadTablePreStatusUpdateEvent], LoadPostAggregateListener)
       .addListener(classOf[DeleteSegmentByIdPreEvent], PreAggregateDeleteSegmentByIdPreListener)
       .addListener(classOf[DeleteSegmentByDatePreEvent], PreAggregateDeleteSegmentByDatePreListener)
       .addListener(classOf[UpdateTablePreEvent], UpdatePreAggregatePreListener)
@@ -271,5 +271,7 @@ object CarbonSession {
       .addListener(classOf[AlterTableAddColumnPreEvent], PreAggregateAddColumnsPreListener)
       .addListener(classOf[DropDataMapPostEvent], DropDataMapPostListener)
       .addListener(classOf[LoadTablePreExecutionEvent], LoadPreAggregateTablePreListener)
+      .addListener(classOf[AlterTableCompactionPostEvent],
+        AlterPreAggregateTableCompactionPostListener)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index e761bea..f642785 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -45,6 +45,7 @@ import org.apache.carbondata.format
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.merger.CompactionType
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
 import org.apache.carbondata.spark.util.{CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
@@ -128,8 +129,9 @@ case class CarbonLoadDataCommand(
           CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser), hadoopConf)
       }
       carbonLoadModel.setFactFilePath(factPath)
-      carbonLoadModel.setAggLoadRequest(internalOptions
-          .getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean)
+      carbonLoadModel.setAggLoadRequest(
+        internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean)
+      carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", ""))
       DataLoadingUtil.buildCarbonLoadModel(
         table,
         carbonProperty,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 90b728d..9168247 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -19,14 +19,18 @@ package org.apache.spark.sql.execution.command.preaaggregate
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
+import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonLoadDataCommand}
+import org.apache.spark.sql.execution.command.AlterTableModel
 import org.apache.spark.sql.CarbonSession
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events._
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 
 object LoadPostAggregateListener extends OperationEventListener {
   /**
@@ -35,7 +39,7 @@ object LoadPostAggregateListener extends OperationEventListener {
    * @param event
    */
   override def onEvent(event: Event, operationContext: OperationContext): Unit = {
-    val loadEvent = event.asInstanceOf[LoadTablePostExecutionEvent]
+    val loadEvent = event.asInstanceOf[LoadTablePreStatusUpdateEvent]
     val sparkSession = loadEvent.sparkSession
     val carbonLoadModel = loadEvent.carbonLoadModel
     val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
@@ -55,6 +59,36 @@ object LoadPostAggregateListener extends OperationEventListener {
   }
 }
 
+/**
+ * Listener that propagates a finished compaction of a table to the tables of its datamaps.
+ */
+object AlterPreAggregateTableCompactionPostListener extends OperationEventListener {
+  /**
+   * Runs a compaction of the same type on every datamap table of the compacted table.
+   *
+   * @param event expected to be an AlterTableCompactionPostEvent
+   * @param operationContext not read by this listener
+   */
+  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    val postEvent = event.asInstanceOf[AlterTableCompactionPostEvent]
+    val parentTable = postEvent.carbonTable
+    val mergeType = postEvent.carbonMergerMapping.campactionType
+    val session = postEvent.sQLContext.sparkSession
+    if (parentTable.hasDataMapSchema) {
+      for (schema <- parentTable.getTableInfo.getDataMapSchemaList.asScala) {
+        val childTable = schema.getRelationIdentifier
+        val childCompaction = AlterTableModel(Some(childTable.getDatabaseName),
+          childTable.getTableName,
+          None,
+          mergeType.toString,
+          Some(System.currentTimeMillis()),
+          "")
+        CarbonAlterTableCompactionCommand(childCompaction).run(session)
+      }
+    }
+  }
+}
+
 object LoadPreAggregateTablePreListener extends OperationEventListener {
   /**
    * Called on a specified event occurrence