You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/09/18 10:02:09 UTC
[50/51] [abbrv] carbondata git commit: [CARBONDATA-1316] Support drop partition function
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index 0c59bd9..3646fad 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -260,8 +260,8 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
checkAnswer(result_after, result_origin)
val result_after1 = sql(s"select id, vin, logdate, phonenumber, country, area, salary from list_table_area where area < 'OutSpace' ")
- val rssult_origin1 = sql(s"select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area < 'OutSpace' ")
- checkAnswer(result_after1, rssult_origin1)
+ val result_origin1 = sql(s"select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area < 'OutSpace' ")
+ checkAnswer(result_after1, result_origin1)
val result_after2 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area where area <= 'OutSpace' ")
val result_origin2 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area <= 'OutSpace' ")
@@ -279,28 +279,24 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
val result_origin5 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area >= 'OutSpace' ")
checkAnswer(result_after5, result_origin5)
- sql("""ALTER TABLE list_table_area ADD PARTITION ('One', '(Two, Three)', 'Four')""".stripMargin)
- val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
- val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
- val partitionIds1 = partitionInfo1.getPartitionIds
- val new_list_info = partitionInfo1.getListInfo
- assert(partitionIds1 == List(0, 1, 2, 3, 4, 5, 6, 7, 8).map(Integer.valueOf(_)).asJava)
- assert(partitionInfo1.getMAX_PARTITION == 8)
- assert(partitionInfo1.getNumPartitions == 9)
- assert(new_list_info.get(0).get(0) == "Asia")
- assert(new_list_info.get(1).get(0) == "America")
- assert(new_list_info.get(2).get(0) == "Europe")
- assert(new_list_info.get(3).get(0) == "OutSpace")
- assert(new_list_info.get(4).get(0) == "Hi")
- assert(new_list_info.get(5).get(0) == "One")
- assert(new_list_info.get(6).get(0) == "Two")
- assert(new_list_info.get(6).get(1) == "Three")
- assert(new_list_info.get(7).get(0) == "Four")
- validateDataFiles("default_list_table_area", "0", Seq(0, 1, 2, 4))
-
- val result_after6 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area")
- val result_origin6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin""")
- checkAnswer(result_after6, result_origin6)
+ intercept[Exception] { sql("""ALTER TABLE DROP PARTITION(0)""")}
+ intercept[Exception] { sql("""ALTER TABLE DROP PARTITION(0) WITH DATA""")}
+
+ sql("""ALTER TABLE list_table_area DROP PARTITION(2) WITH DATA""")
+ val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
+ val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionIds2 = partitionInfo2.getPartitionIds
+ val list_info2 = partitionInfo2.getListInfo
+ assert(partitionIds2 == List(0, 1, 3, 4, 5).map(Integer.valueOf(_)).asJava)
+ assert(partitionInfo2.getMAX_PARTITION == 5)
+ assert(partitionInfo2.getNumPartitions == 5)
+ assert(list_info2.get(0).get(0) == "Asia")
+ assert(list_info2.get(1).get(0) == "Europe")
+ assert(list_info2.get(2).get(0) == "OutSpace")
+ assert(list_info2.get(3).get(0) == "Hi")
+ validateDataFiles("default_list_table_area", "0", Seq(0, 1, 4))
+ checkAnswer(sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area"),
+ sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area <> 'America' "))
}
test("Alter table add partition: Range Partition") {
@@ -309,9 +305,9 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
val partitionIds = partitionInfo.getPartitionIds
val range_info = partitionInfo.getRangeInfo
- assert(partitionIds.size() == 6)
assert(partitionIds == List(0, 1, 2, 3, 4, 5).map(Integer.valueOf(_)).asJava)
assert(partitionInfo.getMAX_PARTITION == 5)
+ assert(partitionInfo.getNumPartitions == 6)
assert(range_info.get(0) == "2014/01/01")
assert(range_info.get(1) == "2015/01/01")
assert(range_info.get(2) == "2016/01/01")
@@ -341,6 +337,24 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_origin where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
checkAnswer(result_after5, result_origin5)
+
+ sql("""ALTER TABLE range_table_logdate DROP PARTITION(3) WITH DATA;""")
+ val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate")
+ val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionIds1 = partitionInfo1.getPartitionIds
+ val range_info1 = partitionInfo1.getRangeInfo
+ assert(partitionIds1 == List(0, 1, 2, 4, 5).map(Integer.valueOf(_)).asJava)
+ assert(partitionInfo1.getMAX_PARTITION == 5)
+ assert(partitionInfo1.getNumPartitions == 5)
+ assert(range_info1.get(0) == "2014/01/01")
+ assert(range_info1.get(1) == "2015/01/01")
+ assert(range_info1.get(2) == "2017/01/01")
+ assert(range_info1.get(3) == "2018/01/01")
+ assert(range_info1.size() == 4)
+ validateDataFiles("default_range_table_logdate", "0", Seq(1, 2, 4, 5))
+ val result_after6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate""")
+ val result_origin6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_origin where logdate < '2015/01/01 00:00:00' or logdate >= '2016/01/01 00:00:00' """)
+ checkAnswer(result_after6, result_origin6)
}
test("test exception if invalid partition id is provided in alter command") {
@@ -396,6 +410,26 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country where country > 'NotGood' """)
val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin where country > 'NotGood' """)
checkAnswer(result_after5, result_origin5)
+
+ sql("""ALTER TABLE list_table_country DROP PARTITION(8)""")
+ val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country")
+ val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionIds1 = partitionInfo1.getPartitionIds
+ val list_info1 = partitionInfo1.getListInfo
+ assert(partitionIds1 == List(0, 1, 2, 3, 6, 7, 5).map(Integer.valueOf(_)).asJava)
+ assert(partitionInfo1.getMAX_PARTITION == 8)
+ assert(partitionInfo1.getNumPartitions == 7)
+ assert(list_info1.get(0).get(0) == "China")
+ assert(list_info1.get(0).get(1) == "US")
+ assert(list_info1.get(1).get(0) == "UK")
+ assert(list_info1.get(2).get(0) == "Japan")
+ assert(list_info1.get(3).get(0) == "Canada")
+ assert(list_info1.get(4).get(0) == "Russia")
+ assert(list_info1.get(5).get(0) == "Korea")
+ validateDataFiles("default_list_table_country", "0", Seq(0, 1, 2, 3))
+ val result_origin6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country""")
+ val result_after6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin""")
+ checkAnswer(result_origin6, result_after6)
}
test("Alter table split partition with different List Sequence: List Partition") {
@@ -405,23 +439,21 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
val partitionIds = partitionInfo.getPartitionIds
val list_info = partitionInfo.getListInfo
- assert(partitionIds == List(0, 1, 2, 3, 6, 7, 8, 5, 10, 11, 12).map(Integer.valueOf(_)).asJava)
+ assert(partitionIds == List(0, 1, 2, 3, 6, 7, 5, 10, 11, 12).map(Integer.valueOf(_)).asJava)
assert(partitionInfo.getMAX_PARTITION == 12)
- assert(partitionInfo.getNumPartitions == 11)
+ assert(partitionInfo.getNumPartitions == 10)
assert(list_info.get(0).get(0) == "China")
assert(list_info.get(0).get(1) == "US")
assert(list_info.get(1).get(0) == "UK")
assert(list_info.get(2).get(0) == "Japan")
assert(list_info.get(3).get(0) == "Canada")
assert(list_info.get(4).get(0) == "Russia")
- assert(list_info.get(5).get(0) == "Good")
- assert(list_info.get(5).get(1) == "NotGood")
- assert(list_info.get(6).get(0) == "Korea")
- assert(list_info.get(7).get(0) == "Part4")
- assert(list_info.get(8).get(0) == "Part2")
- assert(list_info.get(9).get(0) == "Part1")
- assert(list_info.get(9).get(1) == "Part3")
- validateDataFiles("default_list_table_country", "0", Seq(0, 1, 2, 3, 8))
+ assert(list_info.get(5).get(0) == "Korea")
+ assert(list_info.get(6).get(0) == "Part4")
+ assert(list_info.get(7).get(0) == "Part2")
+ assert(list_info.get(8).get(0) == "Part1")
+ assert(list_info.get(8).get(1) == "Part3")
+ validateDataFiles("default_list_table_country", "0", Seq(0, 1, 2, 3))
val result_after = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country""")
val result_origin = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin""")
checkAnswer(result_after, result_origin)
@@ -528,6 +560,24 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split_origin where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
checkAnswer(result_after5, result_origin5)
+
+ sql("""ALTER TABLE range_table_logdate_split DROP PARTITION(6)""")
+ val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate_split")
+ val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionIds1 = partitionInfo1.getPartitionIds
+ val rangeInfo1 = partitionInfo1.getRangeInfo
+ assert(partitionIds1 == List(0, 1, 2, 3, 5).map(Integer.valueOf(_)).asJava)
+ assert(partitionInfo1.getMAX_PARTITION == 6)
+ assert(partitionInfo1.getNumPartitions == 5)
+ assert(rangeInfo1.get(0) == "2014/01/01")
+ assert(rangeInfo1.get(1) == "2015/01/01")
+ assert(rangeInfo1.get(2) == "2016/01/01")
+ assert(rangeInfo1.get(3) == "2017/01/01")
+ assert(rangeInfo1.size() == 4)
+ validateDataFiles("default_range_table_logdate_split", "0", Seq(0, 1, 2, 3, 5))
+ val result_after6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split""")
+ val result_origin6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split_origin""")
+ checkAnswer(result_after6, result_origin6)
}
test("Alter table split partition: Range Partition + Bucket") {
@@ -568,6 +618,57 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
checkAnswer(result_after5, result_origin5)
+
+ sql("""ALTER TABLE range_table_bucket DROP PARTITION(6) WITH DATA""")
+ val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
+ val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionIds1 = partitionInfo1.getPartitionIds
+ val rangeInfo1 = partitionInfo1.getRangeInfo
+ assert(partitionIds1 == List(0, 1, 2, 3, 5).map(Integer.valueOf(_)).asJava)
+ assert(partitionInfo1.getMAX_PARTITION == 6)
+ assert(partitionInfo1.getNumPartitions == 5)
+ assert(rangeInfo1.get(0) == "2014/01/01")
+ assert(rangeInfo1.get(1) == "2015/01/01")
+ assert(rangeInfo1.get(2) == "2016/01/01")
+ assert(rangeInfo1.get(3) == "2017/01/01")
+ assert(rangeInfo1.size() == 4)
+ validateDataFiles("default_range_table_bucket", "0", Seq(1, 2, 3, 5))
+ val result_after6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket""")
+ val result_origin6= sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate < '2017/01/01 00:00:00' or logdate >= '2018/01/01 00:00:00'""")
+ checkAnswer(result_after6, result_origin6)
+
+ sql("""ALTER TABLE range_table_bucket DROP PARTITION(3)""")
+ val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
+ val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionIds2 = partitionInfo2.getPartitionIds
+ val rangeInfo2 = partitionInfo2.getRangeInfo
+ assert(partitionIds2 == List(0, 1, 2, 5).map(Integer.valueOf(_)).asJava)
+ assert(partitionInfo2.getMAX_PARTITION == 6)
+ assert(partitionInfo2.getNumPartitions == 4)
+ assert(rangeInfo2.get(0) == "2014/01/01")
+ assert(rangeInfo2.get(1) == "2015/01/01")
+ assert(rangeInfo2.get(2) == "2017/01/01")
+ assert(rangeInfo2.size() == 3)
+ validateDataFiles("default_range_table_bucket", "0", Seq(1, 2, 5))
+ val result_origin7 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket""")
+ val result_after7 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate < '2017/01/01 00:00:00' or logdate >= '2018/01/01 00:00:00'""")
+ checkAnswer(result_origin7, result_after7)
+
+ sql("""ALTER TABLE range_table_bucket DROP PARTITION(5)""")
+ val carbonTable3 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
+ val partitionInfo3 = carbonTable3.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionIds3 = partitionInfo3.getPartitionIds
+ val rangeInfo3 = partitionInfo3.getRangeInfo
+ assert(partitionIds3 == List(0, 1, 2).map(Integer.valueOf(_)).asJava)
+ assert(partitionInfo3.getMAX_PARTITION == 6)
+ assert(partitionInfo3.getNumPartitions == 3)
+ assert(rangeInfo3.get(0) == "2014/01/01")
+ assert(rangeInfo3.get(1) == "2015/01/01")
+ assert(rangeInfo3.size() == 2)
+ validateDataFiles("default_range_table_bucket", "0", Seq(0, 1, 2))
+ val result_after8 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket""")
+ val result_origin8 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate < '2017/01/01 00:00:00' or logdate >= '2018/01/01 00:00:00'""")
+ checkAnswer(result_after8, result_origin8)
}
test("test exception when alter partition and the values"
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 43d456f..838e5be 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -393,15 +393,14 @@ public final class CarbonDataMergerUtil {
/**
* To identify which all segments can be merged.
*
- * @param storeLocation
* @param carbonLoadModel
* @param compactionSize
* @return
*/
- public static List<LoadMetadataDetails> identifySegmentsToBeMerged(String storeLocation,
+ public static List<LoadMetadataDetails> identifySegmentsToBeMerged(
CarbonLoadModel carbonLoadModel, long compactionSize,
List<LoadMetadataDetails> segments, CompactionType compactionType) {
-
+ String storeLocation = carbonLoadModel.getStorePath();
List<LoadMetadataDetails> sortedSegments = new ArrayList<LoadMetadataDetails>(segments);
sortSegments(sortedSegments);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java
new file mode 100644
index 0000000..9316c9f
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.spliter;
+
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.spliter.exception.AlterPartitionSliceException;
+import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+import org.apache.carbondata.processing.store.CarbonFactHandler;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+public class RowResultProcessor {
+
+ private CarbonFactHandler dataHandler;
+ private SegmentProperties segmentProperties;
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(RowResultProcessor.class.getName());
+
+
+ public RowResultProcessor(CarbonTable carbonTable, CarbonLoadModel loadModel,
+ SegmentProperties segProp, String[] tempStoreLocation, Integer bucketId) {
+ CarbonDataProcessorUtil.createLocations(tempStoreLocation);
+ this.segmentProperties = segProp;
+ String tableName = carbonTable.getFactTableName();
+ CarbonFactDataHandlerModel carbonFactDataHandlerModel =
+ CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
+ segProp, tableName, tempStoreLocation);
+ CarbonDataFileAttributes carbonDataFileAttributes =
+ new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
+ loadModel.getFactTimeStamp());
+ carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
+ carbonFactDataHandlerModel.setBucketId(bucketId);
+ //Note: set compaction flow just to convert decimal type
+ carbonFactDataHandlerModel.setCompactionFlow(true);
+ dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
+ }
+
+ public boolean execute(List<Object[]> resultList) {
+ boolean processStatus;
+ boolean isDataPresent = false;
+
+ try {
+ if (!isDataPresent) {
+ dataHandler.initialise();
+ isDataPresent = true;
+ }
+ for (Object[] row: resultList) {
+ addRow(row);
+ }
+ if (isDataPresent)
+ {
+ this.dataHandler.finish();
+ }
+ processStatus = true;
+ } catch (AlterPartitionSliceException e) {
+ LOGGER.error(e, e.getMessage());
+ LOGGER.error("Exception in executing RowResultProcessor" + e.getMessage());
+ processStatus = false;
+ } finally {
+ try {
+ if (isDataPresent) {
+ this.dataHandler.closeHandler();
+ }
+ } catch (Exception e) {
+ LOGGER.error("Exception while closing the handler in RowResultProcessor" + e.getMessage());
+ processStatus = false;
+ }
+ }
+ return processStatus;
+ }
+
+ private void addRow(Object[] carbonTuple) throws AlterPartitionSliceException {
+ CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties);
+ try {
+ this.dataHandler.addDataToStore(row);
+ } catch (CarbonDataWriterException e) {
+ throw new AlterPartitionSliceException("Exception in adding rows in RowResultProcessor", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java
deleted file mode 100644
index ea38a53..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.spliter;
-
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.spliter.exception.SliceSpliterException;
-import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
-import org.apache.carbondata.processing.store.CarbonFactHandler;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-public class RowResultSpliterProcessor {
-
- private CarbonFactHandler dataHandler;
- private SegmentProperties segmentProperties;
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(RowResultSpliterProcessor.class.getName());
-
-
- public RowResultSpliterProcessor(CarbonTable carbonTable, CarbonLoadModel loadModel,
- SegmentProperties segProp, String[] tempStoreLocation, Integer bucketId) {
- CarbonDataProcessorUtil.createLocations(tempStoreLocation);
- this.segmentProperties = segProp;
- String tableName = carbonTable.getFactTableName();
- CarbonFactDataHandlerModel carbonFactDataHandlerModel =
- CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
- segProp, tableName, tempStoreLocation);
- CarbonDataFileAttributes carbonDataFileAttributes =
- new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
- loadModel.getFactTimeStamp());
- carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
- carbonFactDataHandlerModel.setBucketId(bucketId);
- //Note: set compaction flow just to convert decimal type
- carbonFactDataHandlerModel.setCompactionFlow(true);
- dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
- }
-
- public boolean execute(List<Object[]> resultList) {
- boolean splitStatus;
- boolean isDataPresent = false;
-
- try {
- if (!isDataPresent) {
- dataHandler.initialise();
- isDataPresent = true;
- }
- for (Object[] row: resultList) {
- addRow(row);
- }
- if (isDataPresent)
- {
- this.dataHandler.finish();
- }
- splitStatus = true;
- } catch (SliceSpliterException e) {
- LOGGER.error(e, e.getMessage());
- LOGGER.error("Exception in split partition" + e.getMessage());
- splitStatus = false;
- } finally {
- try {
- if (isDataPresent) {
- this.dataHandler.closeHandler();
- }
- } catch (Exception e) {
- LOGGER.error("Exception while closing the handler in partition spliter" + e.getMessage());
- splitStatus = false;
- }
- }
- return splitStatus;
- }
-
- private void addRow(Object[] carbonTuple) throws SliceSpliterException {
- CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties);
- try {
- this.dataHandler.addDataToStore(row);
- } catch (CarbonDataWriterException e) {
- throw new SliceSpliterException("Problem in writing rows when add/split the partition", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java
new file mode 100644
index 0000000..0e53a1f
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.spliter.exception;
+
+import java.util.Locale;
+
+public class AlterPartitionSliceException extends Exception {
+
+ /**
+ * default serial version ID.
+ */
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * The Error message.
+ */
+ private String msg = "";
+
+ /**
+ * Constructor
+ *
+ * @param msg The error message for this exception.
+ */
+ public AlterPartitionSliceException(String msg) {
+ super(msg);
+ this.msg = msg;
+ }
+
+ /**
+ * Constructor
+ *
+ * @param msg The error message for this exception.
+ */
+ public AlterPartitionSliceException(String msg, Throwable t) {
+ super(msg, t);
+ this.msg = msg;
+ }
+
+ /**
+ * This method is used to get the localized message.
+ *
+ * @param locale - A Locale object represents a specific geographical,
+ * political, or cultural region.
+ * @return - Localized error message.
+ */
+ public String getLocalizedMessage(Locale locale) {
+ return "";
+ }
+
+ /**
+ * getLocalizedMessage
+ */
+ @Override public String getLocalizedMessage() {
+ return super.getLocalizedMessage();
+ }
+
+ /**
+ * getMessage
+ */
+ public String getMessage() {
+ return this.msg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java
deleted file mode 100644
index 17e679a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.spliter.exception;
-
-import java.util.Locale;
-
-public class SliceSpliterException extends Exception {
-
- /**
- * default serial version ID.
- */
- private static final long serialVersionUID = 1L;
-
- /**
- * The Error message.
- */
- private String msg = "";
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public SliceSpliterException(String msg) {
- super(msg);
- this.msg = msg;
- }
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public SliceSpliterException(String msg, Throwable t) {
- super(msg, t);
- this.msg = msg;
- }
-
- /**
- * This method is used to get the localized message.
- *
- * @param locale - A Locale object represents a specific geographical,
- * political, or cultural region.
- * @return - Localized error message.
- */
- public String getLocalizedMessage(Locale locale) {
- return "";
- }
-
- /**
- * getLocalizedMessage
- */
- @Override public String getLocalizedMessage() {
- return super.getLocalizedMessage();
- }
-
- /**
- * getMessage
- */
- public String getMessage() {
- return this.msg;
- }
-}