You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/09/18 10:02:09 UTC

[50/51] [abbrv] carbondata git commit: [CARBONDATA-1316] Support drop partition function

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index 0c59bd9..3646fad 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -260,8 +260,8 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     checkAnswer(result_after, result_origin)
 
     val result_after1 = sql(s"select id, vin, logdate, phonenumber, country, area, salary from list_table_area where area < 'OutSpace' ")
-    val rssult_origin1 = sql(s"select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area < 'OutSpace' ")
-    checkAnswer(result_after1, rssult_origin1)
+    val result_origin1 = sql(s"select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area < 'OutSpace' ")
+    checkAnswer(result_after1, result_origin1)
 
     val result_after2 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area where area <= 'OutSpace' ")
     val result_origin2 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area <= 'OutSpace' ")
@@ -279,28 +279,24 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     val result_origin5 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area >= 'OutSpace' ")
     checkAnswer(result_after5, result_origin5)
 
-    sql("""ALTER TABLE list_table_area ADD PARTITION ('One', '(Two, Three)', 'Four')""".stripMargin)
-    val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
-    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
-    val partitionIds1 = partitionInfo1.getPartitionIds
-    val new_list_info = partitionInfo1.getListInfo
-    assert(partitionIds1 == List(0, 1, 2, 3, 4, 5, 6, 7, 8).map(Integer.valueOf(_)).asJava)
-    assert(partitionInfo1.getMAX_PARTITION == 8)
-    assert(partitionInfo1.getNumPartitions == 9)
-    assert(new_list_info.get(0).get(0) == "Asia")
-    assert(new_list_info.get(1).get(0) == "America")
-    assert(new_list_info.get(2).get(0) == "Europe")
-    assert(new_list_info.get(3).get(0) == "OutSpace")
-    assert(new_list_info.get(4).get(0) == "Hi")
-    assert(new_list_info.get(5).get(0) == "One")
-    assert(new_list_info.get(6).get(0) == "Two")
-    assert(new_list_info.get(6).get(1) == "Three")
-    assert(new_list_info.get(7).get(0) == "Four")
-    validateDataFiles("default_list_table_area", "0", Seq(0, 1, 2, 4))
-
-    val result_after6 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area")
-    val result_origin6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin""")
-    checkAnswer(result_after6, result_origin6)
+    intercept[Exception]  { sql("""ALTER TABLE DROP PARTITION(0)""")}
+    intercept[Exception]  { sql("""ALTER TABLE DROP PARTITION(0) WITH DATA""")}
+    
+    sql("""ALTER TABLE list_table_area DROP PARTITION(2) WITH DATA""")
+    val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
+    val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionIds2 = partitionInfo2.getPartitionIds
+    val list_info2 = partitionInfo2.getListInfo
+    assert(partitionIds2 == List(0, 1, 3, 4, 5).map(Integer.valueOf(_)).asJava)
+    assert(partitionInfo2.getMAX_PARTITION == 5)
+    assert(partitionInfo2.getNumPartitions == 5)
+    assert(list_info2.get(0).get(0) == "Asia")
+    assert(list_info2.get(1).get(0) == "Europe")
+    assert(list_info2.get(2).get(0) == "OutSpace")
+    assert(list_info2.get(3).get(0) == "Hi")
+    validateDataFiles("default_list_table_area", "0", Seq(0, 1, 4))
+    checkAnswer(sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area"),
+      sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area <> 'America' "))
   }
 
   test("Alter table add partition: Range Partition") {
@@ -309,9 +305,9 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val range_info = partitionInfo.getRangeInfo
-    assert(partitionIds.size() == 6)
     assert(partitionIds == List(0, 1, 2, 3, 4, 5).map(Integer.valueOf(_)).asJava)
     assert(partitionInfo.getMAX_PARTITION == 5)
+    assert(partitionInfo.getNumPartitions == 6)
     assert(range_info.get(0) == "2014/01/01")
     assert(range_info.get(1) == "2015/01/01")
     assert(range_info.get(2) == "2016/01/01")
@@ -341,6 +337,24 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
     val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_origin where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
     checkAnswer(result_after5, result_origin5)
+
+    sql("""ALTER TABLE range_table_logdate DROP PARTITION(3) WITH DATA;""")
+    val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate")
+    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionIds1 = partitionInfo1.getPartitionIds
+    val range_info1 = partitionInfo1.getRangeInfo
+    assert(partitionIds1 == List(0, 1, 2, 4, 5).map(Integer.valueOf(_)).asJava)
+    assert(partitionInfo1.getMAX_PARTITION == 5)
+    assert(partitionInfo1.getNumPartitions == 5)
+    assert(range_info1.get(0) == "2014/01/01")
+    assert(range_info1.get(1) == "2015/01/01")
+    assert(range_info1.get(2) == "2017/01/01")
+    assert(range_info1.get(3) == "2018/01/01")
+    assert(range_info1.size() == 4)
+    validateDataFiles("default_range_table_logdate", "0", Seq(1, 2, 4, 5))
+    val result_after6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate""")
+    val result_origin6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_origin where logdate < '2015/01/01 00:00:00' or logdate >= '2016/01/01 00:00:00' """)
+    checkAnswer(result_after6, result_origin6)
   }
 
   test("test exception if invalid partition id is provided in alter command") {
@@ -396,6 +410,26 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country where country > 'NotGood' """)
     val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin where country > 'NotGood' """)
     checkAnswer(result_after5, result_origin5)
+
+    sql("""ALTER TABLE list_table_country DROP PARTITION(8)""")
+    val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country")
+    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionIds1 = partitionInfo1.getPartitionIds
+    val list_info1 = partitionInfo1.getListInfo
+    assert(partitionIds1 == List(0, 1, 2, 3, 6, 7, 5).map(Integer.valueOf(_)).asJava)
+    assert(partitionInfo1.getMAX_PARTITION == 8)
+    assert(partitionInfo1.getNumPartitions == 7)
+    assert(list_info1.get(0).get(0) == "China")
+    assert(list_info1.get(0).get(1) == "US")
+    assert(list_info1.get(1).get(0) == "UK")
+    assert(list_info1.get(2).get(0) == "Japan")
+    assert(list_info1.get(3).get(0) == "Canada")
+    assert(list_info1.get(4).get(0) == "Russia")
+    assert(list_info1.get(5).get(0) == "Korea")
+    validateDataFiles("default_list_table_country", "0", Seq(0, 1, 2, 3))
+    val result_origin6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country""")
+    val result_after6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin""")
+    checkAnswer(result_origin6, result_after6)
   }
 
   test("Alter table split partition with different List Sequence: List Partition") {
@@ -405,23 +439,21 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
     val partitionIds = partitionInfo.getPartitionIds
     val list_info = partitionInfo.getListInfo
-    assert(partitionIds == List(0, 1, 2, 3, 6, 7, 8, 5, 10, 11, 12).map(Integer.valueOf(_)).asJava)
+    assert(partitionIds == List(0, 1, 2, 3, 6, 7, 5, 10, 11, 12).map(Integer.valueOf(_)).asJava)
     assert(partitionInfo.getMAX_PARTITION == 12)
-    assert(partitionInfo.getNumPartitions == 11)
+    assert(partitionInfo.getNumPartitions == 10)
     assert(list_info.get(0).get(0) == "China")
     assert(list_info.get(0).get(1) == "US")
     assert(list_info.get(1).get(0) == "UK")
     assert(list_info.get(2).get(0) == "Japan")
     assert(list_info.get(3).get(0) == "Canada")
     assert(list_info.get(4).get(0) == "Russia")
-    assert(list_info.get(5).get(0) == "Good")
-    assert(list_info.get(5).get(1) == "NotGood")
-    assert(list_info.get(6).get(0) == "Korea")
-    assert(list_info.get(7).get(0) == "Part4")
-    assert(list_info.get(8).get(0) == "Part2")
-    assert(list_info.get(9).get(0) == "Part1")
-    assert(list_info.get(9).get(1) == "Part3")
-    validateDataFiles("default_list_table_country", "0", Seq(0, 1, 2, 3, 8))
+    assert(list_info.get(5).get(0) == "Korea")
+    assert(list_info.get(6).get(0) == "Part4")
+    assert(list_info.get(7).get(0) == "Part2")
+    assert(list_info.get(8).get(0) == "Part1")
+    assert(list_info.get(8).get(1) == "Part3")
+    validateDataFiles("default_list_table_country", "0", Seq(0, 1, 2, 3))
     val result_after = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country""")
     val result_origin = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin""")
     checkAnswer(result_after, result_origin)
@@ -528,6 +560,24 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
     val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split_origin where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
     checkAnswer(result_after5, result_origin5)
+
+    sql("""ALTER TABLE range_table_logdate_split DROP PARTITION(6)""")
+    val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate_split")
+    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionIds1 = partitionInfo1.getPartitionIds
+    val rangeInfo1 = partitionInfo1.getRangeInfo
+    assert(partitionIds1 == List(0, 1, 2, 3, 5).map(Integer.valueOf(_)).asJava)
+    assert(partitionInfo1.getMAX_PARTITION == 6)
+    assert(partitionInfo1.getNumPartitions == 5)
+    assert(rangeInfo1.get(0) == "2014/01/01")
+    assert(rangeInfo1.get(1) == "2015/01/01")
+    assert(rangeInfo1.get(2) == "2016/01/01")
+    assert(rangeInfo1.get(3) == "2017/01/01")
+    assert(rangeInfo1.size() == 4)
+    validateDataFiles("default_range_table_logdate_split", "0", Seq(0, 1, 2, 3, 5))
+    val result_after6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split""")
+    val result_origin6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split_origin""")
+    checkAnswer(result_after6, result_origin6)
   }
 
   test("Alter table split partition: Range Partition + Bucket") {
@@ -568,6 +618,57 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
     val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
     val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
     checkAnswer(result_after5, result_origin5)
+
+    sql("""ALTER TABLE range_table_bucket DROP PARTITION(6) WITH DATA""")
+    val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
+    val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionIds1 = partitionInfo1.getPartitionIds
+    val rangeInfo1 = partitionInfo1.getRangeInfo
+    assert(partitionIds1 == List(0, 1, 2, 3, 5).map(Integer.valueOf(_)).asJava)
+    assert(partitionInfo1.getMAX_PARTITION == 6)
+    assert(partitionInfo1.getNumPartitions == 5)
+    assert(rangeInfo1.get(0) == "2014/01/01")
+    assert(rangeInfo1.get(1) == "2015/01/01")
+    assert(rangeInfo1.get(2) == "2016/01/01")
+    assert(rangeInfo1.get(3) == "2017/01/01")
+    assert(rangeInfo1.size() == 4)
+    validateDataFiles("default_range_table_bucket", "0", Seq(1, 2, 3, 5))
+    val result_after6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket""")
+    val result_origin6= sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate < '2017/01/01 00:00:00' or logdate >= '2018/01/01 00:00:00'""")
+    checkAnswer(result_after6, result_origin6)
+
+    sql("""ALTER TABLE range_table_bucket DROP PARTITION(3)""")
+    val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
+    val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionIds2 = partitionInfo2.getPartitionIds
+    val rangeInfo2 = partitionInfo2.getRangeInfo
+    assert(partitionIds2 == List(0, 1, 2, 5).map(Integer.valueOf(_)).asJava)
+    assert(partitionInfo2.getMAX_PARTITION == 6)
+    assert(partitionInfo2.getNumPartitions == 4)
+    assert(rangeInfo2.get(0) == "2014/01/01")
+    assert(rangeInfo2.get(1) == "2015/01/01")
+    assert(rangeInfo2.get(2) == "2017/01/01")
+    assert(rangeInfo2.size() == 3)
+    validateDataFiles("default_range_table_bucket", "0", Seq(1, 2, 5))
+    val result_origin7 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket""")
+    val result_after7 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate < '2017/01/01 00:00:00' or logdate >= '2018/01/01 00:00:00'""")
+    checkAnswer(result_origin7, result_after7)
+
+    sql("""ALTER TABLE range_table_bucket DROP PARTITION(5)""")
+    val carbonTable3 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
+    val partitionInfo3 = carbonTable3.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionIds3 = partitionInfo3.getPartitionIds
+    val rangeInfo3 = partitionInfo3.getRangeInfo
+    assert(partitionIds3 == List(0, 1, 2).map(Integer.valueOf(_)).asJava)
+    assert(partitionInfo3.getMAX_PARTITION == 6)
+    assert(partitionInfo3.getNumPartitions == 3)
+    assert(rangeInfo3.get(0) == "2014/01/01")
+    assert(rangeInfo3.get(1) == "2015/01/01")
+    assert(rangeInfo3.size() == 2)
+    validateDataFiles("default_range_table_bucket", "0", Seq(0, 1, 2))
+    val result_after8 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket""")
+    val result_origin8 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate < '2017/01/01 00:00:00' or logdate >= '2018/01/01 00:00:00'""")
+    checkAnswer(result_after8, result_origin8)
   }
 
    test("test exception when alter partition and the values"

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 43d456f..838e5be 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -393,15 +393,14 @@ public final class CarbonDataMergerUtil {
   /**
    * To identify which all segments can be merged.
    *
-   * @param storeLocation
    * @param carbonLoadModel
    * @param compactionSize
    * @return
    */
-  public static List<LoadMetadataDetails> identifySegmentsToBeMerged(String storeLocation,
+  public static List<LoadMetadataDetails> identifySegmentsToBeMerged(
       CarbonLoadModel carbonLoadModel, long compactionSize,
       List<LoadMetadataDetails> segments, CompactionType compactionType) {
-
+    String storeLocation = carbonLoadModel.getStorePath();
     List<LoadMetadataDetails> sortedSegments = new ArrayList<LoadMetadataDetails>(segments);
 
     sortSegments(sortedSegments);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java
new file mode 100644
index 0000000..9316c9f
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.spliter;
+
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.spliter.exception.AlterPartitionSliceException;
+import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+import org.apache.carbondata.processing.store.CarbonFactHandler;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+public class RowResultProcessor {
+
+  private CarbonFactHandler dataHandler;
+  private SegmentProperties segmentProperties;
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(RowResultProcessor.class.getName());
+
+
+  public RowResultProcessor(CarbonTable carbonTable, CarbonLoadModel loadModel,
+      SegmentProperties segProp, String[] tempStoreLocation, Integer bucketId) {
+    CarbonDataProcessorUtil.createLocations(tempStoreLocation);
+    this.segmentProperties = segProp;
+    String tableName = carbonTable.getFactTableName();
+    CarbonFactDataHandlerModel carbonFactDataHandlerModel =
+        CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
+            segProp, tableName, tempStoreLocation);
+    CarbonDataFileAttributes carbonDataFileAttributes =
+        new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
+            loadModel.getFactTimeStamp());
+    carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
+    carbonFactDataHandlerModel.setBucketId(bucketId);
+    //Note: set compaction flow just to convert decimal type
+    carbonFactDataHandlerModel.setCompactionFlow(true);
+    dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
+  }
+
+  public boolean execute(List<Object[]> resultList) {
+    boolean processStatus;
+    boolean isDataPresent = false;
+
+    try {
+      if (!isDataPresent) {
+        dataHandler.initialise();
+        isDataPresent = true;
+      }
+      for (Object[] row: resultList) {
+        addRow(row);
+      }
+      if (isDataPresent)
+      {
+        this.dataHandler.finish();
+      }
+      processStatus = true;
+    } catch (AlterPartitionSliceException e) {
+      LOGGER.error(e, e.getMessage());
+      LOGGER.error("Exception in executing RowResultProcessor" + e.getMessage());
+      processStatus = false;
+    } finally {
+      try {
+        if (isDataPresent) {
+          this.dataHandler.closeHandler();
+        }
+      } catch (Exception e) {
+        LOGGER.error("Exception while closing the handler in RowResultProcessor" + e.getMessage());
+        processStatus = false;
+      }
+    }
+    return processStatus;
+  }
+
+  private void addRow(Object[] carbonTuple) throws AlterPartitionSliceException {
+    CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties);
+    try {
+      this.dataHandler.addDataToStore(row);
+    } catch (CarbonDataWriterException e) {
+      throw new AlterPartitionSliceException("Exception in adding rows in RowResultProcessor", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java
deleted file mode 100644
index ea38a53..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.spliter;
-
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.spliter.exception.SliceSpliterException;
-import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
-import org.apache.carbondata.processing.store.CarbonFactHandler;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-public class RowResultSpliterProcessor {
-
-  private CarbonFactHandler dataHandler;
-  private SegmentProperties segmentProperties;
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(RowResultSpliterProcessor.class.getName());
-
-
-  public RowResultSpliterProcessor(CarbonTable carbonTable, CarbonLoadModel loadModel,
-      SegmentProperties segProp, String[] tempStoreLocation, Integer bucketId) {
-    CarbonDataProcessorUtil.createLocations(tempStoreLocation);
-    this.segmentProperties = segProp;
-    String tableName = carbonTable.getFactTableName();
-    CarbonFactDataHandlerModel carbonFactDataHandlerModel =
-        CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
-            segProp, tableName, tempStoreLocation);
-    CarbonDataFileAttributes carbonDataFileAttributes =
-        new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
-            loadModel.getFactTimeStamp());
-    carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
-    carbonFactDataHandlerModel.setBucketId(bucketId);
-    //Note: set compaction flow just to convert decimal type
-    carbonFactDataHandlerModel.setCompactionFlow(true);
-    dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
-  }
-
-  public boolean execute(List<Object[]> resultList) {
-    boolean splitStatus;
-    boolean isDataPresent = false;
-
-    try {
-      if (!isDataPresent) {
-        dataHandler.initialise();
-        isDataPresent = true;
-      }
-      for (Object[] row: resultList) {
-        addRow(row);
-      }
-      if (isDataPresent)
-      {
-        this.dataHandler.finish();
-      }
-      splitStatus = true;
-    } catch (SliceSpliterException e) {
-      LOGGER.error(e, e.getMessage());
-      LOGGER.error("Exception in split partition" + e.getMessage());
-      splitStatus = false;
-    } finally {
-      try {
-        if (isDataPresent) {
-          this.dataHandler.closeHandler();
-        }
-      } catch (Exception e) {
-        LOGGER.error("Exception while closing the handler in partition spliter" + e.getMessage());
-        splitStatus = false;
-      }
-    }
-    return splitStatus;
-  }
-
-  private void addRow(Object[] carbonTuple) throws SliceSpliterException {
-    CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties);
-    try {
-      this.dataHandler.addDataToStore(row);
-    } catch (CarbonDataWriterException e) {
-      throw new SliceSpliterException("Problem in writing rows when add/split the partition", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java
new file mode 100644
index 0000000..0e53a1f
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.spliter.exception;
+
+import java.util.Locale;
+
+public class AlterPartitionSliceException extends Exception {
+
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The Error message.
+   */
+  private String msg = "";
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public AlterPartitionSliceException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public AlterPartitionSliceException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * This method is used to get the localized message.
+   *
+   * @param locale - A Locale object represents a specific geographical,
+   *               political, or cultural region.
+   * @return - Localized error message.
+   */
+  public String getLocalizedMessage(Locale locale) {
+    return "";
+  }
+
+  /**
+   * getLocalizedMessage
+   */
+  @Override public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /**
+   * getMessage
+   */
+  public String getMessage() {
+    return this.msg;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java
deleted file mode 100644
index 17e679a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.spliter.exception;
-
-import java.util.Locale;
-
-public class SliceSpliterException extends Exception {
-
-  /**
-   * default serial version ID.
-   */
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * The Error message.
-   */
-  private String msg = "";
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public SliceSpliterException(String msg) {
-    super(msg);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public SliceSpliterException(String msg, Throwable t) {
-    super(msg, t);
-    this.msg = msg;
-  }
-
-  /**
-   * This method is used to get the localized message.
-   *
-   * @param locale - A Locale object represents a specific geographical,
-   *               political, or cultural region.
-   * @return - Localized error message.
-   */
-  public String getLocalizedMessage(Locale locale) {
-    return "";
-  }
-
-  /**
-   * getLocalizedMessage
-   */
-  @Override public String getLocalizedMessage() {
-    return super.getLocalizedMessage();
-  }
-
-  /**
-   * getMessage
-   */
-  public String getMessage() {
-    return this.msg;
-  }
-}