Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/09/18 09:20:04 UTC
[1/2] carbondata git commit: [CARBONDATA-1316] Support drop partition function
Repository: carbondata
Updated Branches:
refs/heads/master fe36e3bc9 -> cb51b8621
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index 0c59bd9..3646fad 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -260,8 +260,8 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
checkAnswer(result_after, result_origin)
val result_after1 = sql(s"select id, vin, logdate, phonenumber, country, area, salary from list_table_area where area < 'OutSpace' ")
- val rssult_origin1 = sql(s"select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area < 'OutSpace' ")
- checkAnswer(result_after1, rssult_origin1)
+ val result_origin1 = sql(s"select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area < 'OutSpace' ")
+ checkAnswer(result_after1, result_origin1)
val result_after2 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area where area <= 'OutSpace' ")
val result_origin2 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area <= 'OutSpace' ")
@@ -279,28 +279,24 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
val result_origin5 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area >= 'OutSpace' ")
checkAnswer(result_after5, result_origin5)
- sql("""ALTER TABLE list_table_area ADD PARTITION ('One', '(Two, Three)', 'Four')""".stripMargin)
- val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
- val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
- val partitionIds1 = partitionInfo1.getPartitionIds
- val new_list_info = partitionInfo1.getListInfo
- assert(partitionIds1 == List(0, 1, 2, 3, 4, 5, 6, 7, 8).map(Integer.valueOf(_)).asJava)
- assert(partitionInfo1.getMAX_PARTITION == 8)
- assert(partitionInfo1.getNumPartitions == 9)
- assert(new_list_info.get(0).get(0) == "Asia")
- assert(new_list_info.get(1).get(0) == "America")
- assert(new_list_info.get(2).get(0) == "Europe")
- assert(new_list_info.get(3).get(0) == "OutSpace")
- assert(new_list_info.get(4).get(0) == "Hi")
- assert(new_list_info.get(5).get(0) == "One")
- assert(new_list_info.get(6).get(0) == "Two")
- assert(new_list_info.get(6).get(1) == "Three")
- assert(new_list_info.get(7).get(0) == "Four")
- validateDataFiles("default_list_table_area", "0", Seq(0, 1, 2, 4))
-
- val result_after6 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area")
- val result_origin6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin""")
- checkAnswer(result_after6, result_origin6)
+ intercept[Exception] { sql("""ALTER TABLE DROP PARTITION(0)""") }
+ intercept[Exception] { sql("""ALTER TABLE DROP PARTITION(0) WITH DATA""") }
+
+ sql("""ALTER TABLE list_table_area DROP PARTITION(2) WITH DATA""")
+ val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area")
+ val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionIds2 = partitionInfo2.getPartitionIds
+ val list_info2 = partitionInfo2.getListInfo
+ assert(partitionIds2 == List(0, 1, 3, 4, 5).map(Integer.valueOf(_)).asJava)
+ assert(partitionInfo2.getMAX_PARTITION == 5)
+ assert(partitionInfo2.getNumPartitions == 5)
+ assert(list_info2.get(0).get(0) == "Asia")
+ assert(list_info2.get(1).get(0) == "Europe")
+ assert(list_info2.get(2).get(0) == "OutSpace")
+ assert(list_info2.get(3).get(0) == "Hi")
+ validateDataFiles("default_list_table_area", "0", Seq(0, 1, 4))
+ checkAnswer(sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area"),
+ sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area <> 'America' "))
}
test("Alter table add partition: Range Partition") {
@@ -309,9 +305,9 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
val partitionIds = partitionInfo.getPartitionIds
val range_info = partitionInfo.getRangeInfo
- assert(partitionIds.size() == 6)
assert(partitionIds == List(0, 1, 2, 3, 4, 5).map(Integer.valueOf(_)).asJava)
assert(partitionInfo.getMAX_PARTITION == 5)
+ assert(partitionInfo.getNumPartitions == 6)
assert(range_info.get(0) == "2014/01/01")
assert(range_info.get(1) == "2015/01/01")
assert(range_info.get(2) == "2016/01/01")
@@ -341,6 +337,24 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_origin where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
checkAnswer(result_after5, result_origin5)
+
+ sql("""ALTER TABLE range_table_logdate DROP PARTITION(3) WITH DATA;""")
+ val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate")
+ val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionIds1 = partitionInfo1.getPartitionIds
+ val range_info1 = partitionInfo1.getRangeInfo
+ assert(partitionIds1 == List(0, 1, 2, 4, 5).map(Integer.valueOf(_)).asJava)
+ assert(partitionInfo1.getMAX_PARTITION == 5)
+ assert(partitionInfo1.getNumPartitions == 5)
+ assert(range_info1.get(0) == "2014/01/01")
+ assert(range_info1.get(1) == "2015/01/01")
+ assert(range_info1.get(2) == "2017/01/01")
+ assert(range_info1.get(3) == "2018/01/01")
+ assert(range_info1.size() == 4)
+ validateDataFiles("default_range_table_logdate", "0", Seq(1, 2, 4, 5))
+ val result_after6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate""")
+ val result_origin6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_origin where logdate < '2015/01/01 00:00:00' or logdate >= '2016/01/01 00:00:00' """)
+ checkAnswer(result_after6, result_origin6)
}
test("test exception if invalid partition id is provided in alter command") {
@@ -396,6 +410,26 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country where country > 'NotGood' """)
val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin where country > 'NotGood' """)
checkAnswer(result_after5, result_origin5)
+
+ sql("""ALTER TABLE list_table_country DROP PARTITION(8)""")
+ val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country")
+ val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionIds1 = partitionInfo1.getPartitionIds
+ val list_info1 = partitionInfo1.getListInfo
+ assert(partitionIds1 == List(0, 1, 2, 3, 6, 7, 5).map(Integer.valueOf(_)).asJava)
+ assert(partitionInfo1.getMAX_PARTITION == 8)
+ assert(partitionInfo1.getNumPartitions == 7)
+ assert(list_info1.get(0).get(0) == "China")
+ assert(list_info1.get(0).get(1) == "US")
+ assert(list_info1.get(1).get(0) == "UK")
+ assert(list_info1.get(2).get(0) == "Japan")
+ assert(list_info1.get(3).get(0) == "Canada")
+ assert(list_info1.get(4).get(0) == "Russia")
+ assert(list_info1.get(5).get(0) == "Korea")
+ validateDataFiles("default_list_table_country", "0", Seq(0, 1, 2, 3))
+ val result_after6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country""")
+ val result_origin6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin""")
+ checkAnswer(result_after6, result_origin6)
}
test("Alter table split partition with different List Sequence: List Partition") {
@@ -405,23 +439,21 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
val partitionIds = partitionInfo.getPartitionIds
val list_info = partitionInfo.getListInfo
- assert(partitionIds == List(0, 1, 2, 3, 6, 7, 8, 5, 10, 11, 12).map(Integer.valueOf(_)).asJava)
+ assert(partitionIds == List(0, 1, 2, 3, 6, 7, 5, 10, 11, 12).map(Integer.valueOf(_)).asJava)
assert(partitionInfo.getMAX_PARTITION == 12)
- assert(partitionInfo.getNumPartitions == 11)
+ assert(partitionInfo.getNumPartitions == 10)
assert(list_info.get(0).get(0) == "China")
assert(list_info.get(0).get(1) == "US")
assert(list_info.get(1).get(0) == "UK")
assert(list_info.get(2).get(0) == "Japan")
assert(list_info.get(3).get(0) == "Canada")
assert(list_info.get(4).get(0) == "Russia")
- assert(list_info.get(5).get(0) == "Good")
- assert(list_info.get(5).get(1) == "NotGood")
- assert(list_info.get(6).get(0) == "Korea")
- assert(list_info.get(7).get(0) == "Part4")
- assert(list_info.get(8).get(0) == "Part2")
- assert(list_info.get(9).get(0) == "Part1")
- assert(list_info.get(9).get(1) == "Part3")
- validateDataFiles("default_list_table_country", "0", Seq(0, 1, 2, 3, 8))
+ assert(list_info.get(5).get(0) == "Korea")
+ assert(list_info.get(6).get(0) == "Part4")
+ assert(list_info.get(7).get(0) == "Part2")
+ assert(list_info.get(8).get(0) == "Part1")
+ assert(list_info.get(8).get(1) == "Part3")
+ validateDataFiles("default_list_table_country", "0", Seq(0, 1, 2, 3))
val result_after = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country""")
val result_origin = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin""")
checkAnswer(result_after, result_origin)
@@ -528,6 +560,24 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split_origin where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
checkAnswer(result_after5, result_origin5)
+
+ sql("""ALTER TABLE range_table_logdate_split DROP PARTITION(6)""")
+ val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate_split")
+ val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionIds1 = partitionInfo1.getPartitionIds
+ val rangeInfo1 = partitionInfo1.getRangeInfo
+ assert(partitionIds1 == List(0, 1, 2, 3, 5).map(Integer.valueOf(_)).asJava)
+ assert(partitionInfo1.getMAX_PARTITION == 6)
+ assert(partitionInfo1.getNumPartitions == 5)
+ assert(rangeInfo1.get(0) == "2014/01/01")
+ assert(rangeInfo1.get(1) == "2015/01/01")
+ assert(rangeInfo1.get(2) == "2016/01/01")
+ assert(rangeInfo1.get(3) == "2017/01/01")
+ assert(rangeInfo1.size() == 4)
+ validateDataFiles("default_range_table_logdate_split", "0", Seq(0, 1, 2, 3, 5))
+ val result_after6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split""")
+ val result_origin6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split_origin""")
+ checkAnswer(result_after6, result_origin6)
}
test("Alter table split partition: Range Partition + Bucket") {
@@ -568,6 +618,57 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate > cast('2017/01/12 00:00:00' as timestamp) """)
checkAnswer(result_after5, result_origin5)
+
+ sql("""ALTER TABLE range_table_bucket DROP PARTITION(6) WITH DATA""")
+ val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
+ val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionIds1 = partitionInfo1.getPartitionIds
+ val rangeInfo1 = partitionInfo1.getRangeInfo
+ assert(partitionIds1 == List(0, 1, 2, 3, 5).map(Integer.valueOf(_)).asJava)
+ assert(partitionInfo1.getMAX_PARTITION == 6)
+ assert(partitionInfo1.getNumPartitions == 5)
+ assert(rangeInfo1.get(0) == "2014/01/01")
+ assert(rangeInfo1.get(1) == "2015/01/01")
+ assert(rangeInfo1.get(2) == "2016/01/01")
+ assert(rangeInfo1.get(3) == "2017/01/01")
+ assert(rangeInfo1.size() == 4)
+ validateDataFiles("default_range_table_bucket", "0", Seq(1, 2, 3, 5))
+ val result_after6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket""")
+ val result_origin6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate < '2017/01/01 00:00:00' or logdate >= '2018/01/01 00:00:00'""")
+ checkAnswer(result_after6, result_origin6)
+
+ sql("""ALTER TABLE range_table_bucket DROP PARTITION(3)""")
+ val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
+ val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionIds2 = partitionInfo2.getPartitionIds
+ val rangeInfo2 = partitionInfo2.getRangeInfo
+ assert(partitionIds2 == List(0, 1, 2, 5).map(Integer.valueOf(_)).asJava)
+ assert(partitionInfo2.getMAX_PARTITION == 6)
+ assert(partitionInfo2.getNumPartitions == 4)
+ assert(rangeInfo2.get(0) == "2014/01/01")
+ assert(rangeInfo2.get(1) == "2015/01/01")
+ assert(rangeInfo2.get(2) == "2017/01/01")
+ assert(rangeInfo2.size() == 3)
+ validateDataFiles("default_range_table_bucket", "0", Seq(1, 2, 5))
+ val result_after7 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket""")
+ val result_origin7 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate < '2017/01/01 00:00:00' or logdate >= '2018/01/01 00:00:00'""")
+ checkAnswer(result_after7, result_origin7)
+
+ sql("""ALTER TABLE range_table_bucket DROP PARTITION(5)""")
+ val carbonTable3 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket")
+ val partitionInfo3 = carbonTable3.getPartitionInfo(carbonTable.getFactTableName)
+ val partitionIds3 = partitionInfo3.getPartitionIds
+ val rangeInfo3 = partitionInfo3.getRangeInfo
+ assert(partitionIds3 == List(0, 1, 2).map(Integer.valueOf(_)).asJava)
+ assert(partitionInfo3.getMAX_PARTITION == 6)
+ assert(partitionInfo3.getNumPartitions == 3)
+ assert(rangeInfo3.get(0) == "2014/01/01")
+ assert(rangeInfo3.get(1) == "2015/01/01")
+ assert(rangeInfo3.size() == 2)
+ validateDataFiles("default_range_table_bucket", "0", Seq(0, 1, 2))
+ val result_after8 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket""")
+ val result_origin8 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate < '2017/01/01 00:00:00' or logdate >= '2018/01/01 00:00:00'""")
+ checkAnswer(result_after8, result_origin8)
}
test("test exception when alter partition and the values"
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 43d456f..838e5be 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -393,15 +393,14 @@ public final class CarbonDataMergerUtil {
/**
* To identify which all segments can be merged.
*
- * @param storeLocation
* @param carbonLoadModel
* @param compactionSize
* @return
*/
- public static List<LoadMetadataDetails> identifySegmentsToBeMerged(String storeLocation,
+ public static List<LoadMetadataDetails> identifySegmentsToBeMerged(
CarbonLoadModel carbonLoadModel, long compactionSize,
List<LoadMetadataDetails> segments, CompactionType compactionType) {
-
+ String storeLocation = carbonLoadModel.getStorePath();
List<LoadMetadataDetails> sortedSegments = new ArrayList<LoadMetadataDetails>(segments);
sortSegments(sortedSegments);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java
new file mode 100644
index 0000000..9316c9f
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.spliter;
+
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.spliter.exception.AlterPartitionSliceException;
+import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+import org.apache.carbondata.processing.store.CarbonFactHandler;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+public class RowResultProcessor {
+
+ private CarbonFactHandler dataHandler;
+ private SegmentProperties segmentProperties;
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(RowResultProcessor.class.getName());
+
+
+ public RowResultProcessor(CarbonTable carbonTable, CarbonLoadModel loadModel,
+ SegmentProperties segProp, String[] tempStoreLocation, Integer bucketId) {
+ CarbonDataProcessorUtil.createLocations(tempStoreLocation);
+ this.segmentProperties = segProp;
+ String tableName = carbonTable.getFactTableName();
+ CarbonFactDataHandlerModel carbonFactDataHandlerModel =
+ CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
+ segProp, tableName, tempStoreLocation);
+ CarbonDataFileAttributes carbonDataFileAttributes =
+ new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
+ loadModel.getFactTimeStamp());
+ carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
+ carbonFactDataHandlerModel.setBucketId(bucketId);
+ //Note: set compaction flow just to convert decimal type
+ carbonFactDataHandlerModel.setCompactionFlow(true);
+ dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
+ }
+
+ public boolean execute(List<Object[]> resultList) {
+ boolean processStatus;
+ boolean isDataPresent = false;
+
+ try {
+ if (!isDataPresent) {
+ dataHandler.initialise();
+ isDataPresent = true;
+ }
+ for (Object[] row: resultList) {
+ addRow(row);
+ }
+ if (isDataPresent) {
+ this.dataHandler.finish();
+ }
+ processStatus = true;
+ } catch (AlterPartitionSliceException e) {
+ LOGGER.error(e, e.getMessage());
+ LOGGER.error("Exception in executing RowResultProcessor" + e.getMessage());
+ processStatus = false;
+ } finally {
+ try {
+ if (isDataPresent) {
+ this.dataHandler.closeHandler();
+ }
+ } catch (Exception e) {
+ LOGGER.error("Exception while closing the handler in RowResultProcessor" + e.getMessage());
+ processStatus = false;
+ }
+ }
+ return processStatus;
+ }
+
+ private void addRow(Object[] carbonTuple) throws AlterPartitionSliceException {
+ CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties);
+ try {
+ this.dataHandler.addDataToStore(row);
+ } catch (CarbonDataWriterException e) {
+ throw new AlterPartitionSliceException("Exception in adding rows in RowResultProcessor", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java
deleted file mode 100644
index ea38a53..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.spliter;
-
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.spliter.exception.SliceSpliterException;
-import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
-import org.apache.carbondata.processing.store.CarbonFactHandler;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-public class RowResultSpliterProcessor {
-
- private CarbonFactHandler dataHandler;
- private SegmentProperties segmentProperties;
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(RowResultSpliterProcessor.class.getName());
-
-
- public RowResultSpliterProcessor(CarbonTable carbonTable, CarbonLoadModel loadModel,
- SegmentProperties segProp, String[] tempStoreLocation, Integer bucketId) {
- CarbonDataProcessorUtil.createLocations(tempStoreLocation);
- this.segmentProperties = segProp;
- String tableName = carbonTable.getFactTableName();
- CarbonFactDataHandlerModel carbonFactDataHandlerModel =
- CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
- segProp, tableName, tempStoreLocation);
- CarbonDataFileAttributes carbonDataFileAttributes =
- new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
- loadModel.getFactTimeStamp());
- carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
- carbonFactDataHandlerModel.setBucketId(bucketId);
- //Note: set compaction flow just to convert decimal type
- carbonFactDataHandlerModel.setCompactionFlow(true);
- dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
- }
-
- public boolean execute(List<Object[]> resultList) {
- boolean splitStatus;
- boolean isDataPresent = false;
-
- try {
- if (!isDataPresent) {
- dataHandler.initialise();
- isDataPresent = true;
- }
- for (Object[] row: resultList) {
- addRow(row);
- }
- if (isDataPresent)
- {
- this.dataHandler.finish();
- }
- splitStatus = true;
- } catch (SliceSpliterException e) {
- LOGGER.error(e, e.getMessage());
- LOGGER.error("Exception in split partition" + e.getMessage());
- splitStatus = false;
- } finally {
- try {
- if (isDataPresent) {
- this.dataHandler.closeHandler();
- }
- } catch (Exception e) {
- LOGGER.error("Exception while closing the handler in partition spliter" + e.getMessage());
- splitStatus = false;
- }
- }
- return splitStatus;
- }
-
- private void addRow(Object[] carbonTuple) throws SliceSpliterException {
- CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties);
- try {
- this.dataHandler.addDataToStore(row);
- } catch (CarbonDataWriterException e) {
- throw new SliceSpliterException("Problem in writing rows when add/split the partition", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java
new file mode 100644
index 0000000..0e53a1f
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.spliter.exception;
+
+import java.util.Locale;
+
+public class AlterPartitionSliceException extends Exception {
+
+ /**
+ * default serial version ID.
+ */
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * The Error message.
+ */
+ private String msg = "";
+
+ /**
+ * Constructor
+ *
+ * @param msg The error message for this exception.
+ */
+ public AlterPartitionSliceException(String msg) {
+ super(msg);
+ this.msg = msg;
+ }
+
+ /**
+ * Constructor
+ *
+ * @param msg The error message for this exception.
+ */
+ public AlterPartitionSliceException(String msg, Throwable t) {
+ super(msg, t);
+ this.msg = msg;
+ }
+
+ /**
+ * This method is used to get the localized message.
+ *
+ * @param locale - A Locale object represents a specific geographical,
+ * political, or cultural region.
+ * @return - Localized error message.
+ */
+ public String getLocalizedMessage(Locale locale) {
+ return "";
+ }
+
+ /**
+ * getLocalizedMessage
+ */
+ @Override public String getLocalizedMessage() {
+ return super.getLocalizedMessage();
+ }
+
+ /**
+ * getMessage
+ */
+ public String getMessage() {
+ return this.msg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java
deleted file mode 100644
index 17e679a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.spliter.exception;
-
-import java.util.Locale;
-
-public class SliceSpliterException extends Exception {
-
- /**
- * default serial version ID.
- */
- private static final long serialVersionUID = 1L;
-
- /**
- * The Error message.
- */
- private String msg = "";
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public SliceSpliterException(String msg) {
- super(msg);
- this.msg = msg;
- }
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public SliceSpliterException(String msg, Throwable t) {
- super(msg, t);
- this.msg = msg;
- }
-
- /**
- * This method is used to get the localized message.
- *
- * @param locale - A Locale object represents a specific geographical,
- * political, or cultural region.
- * @return - Localized error message.
- */
- public String getLocalizedMessage(Locale locale) {
- return "";
- }
-
- /**
- * getLocalizedMessage
- */
- @Override public String getLocalizedMessage() {
- return super.getLocalizedMessage();
- }
-
- /**
- * getMessage
- */
- public String getMessage() {
- return this.msg;
- }
-}
[2/2] carbondata git commit: [CARBONDATA-1316] Support drop partition function
Posted by qi...@apache.org.
[CARBONDATA-1316] Support drop partition function
This closes #1317
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cb51b862
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cb51b862
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cb51b862
Branch: refs/heads/master
Commit: cb51b86218cd815167f7c702b643ed0852c7f3dc
Parents: fe36e3b
Author: lionelcao <wh...@gmail.com>
Authored: Mon Sep 4 15:38:44 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Mon Sep 18 17:19:22 2017 +0800
----------------------------------------------------------------------
.../core/metadata/schema/PartitionInfo.java | 5 +
.../hadoop/api/CarbonTableInputFormat.java | 23 ++-
.../spark/partition/DropPartitionCallable.java | 39 +++++
.../org/apache/carbondata/spark/KeyVal.scala | 4 +-
.../spark/rdd/AlterTableLoadPartitionRDD.scala | 141 +++++++++++++++
.../spark/rdd/AlterTableSplitPartitionRDD.scala | 146 ----------------
.../spark/rdd/CarbonScanPartitionRDD.scala | 29 ++--
.../apache/carbondata/spark/rdd/Compactor.scala | 3 +-
.../spark/rdd/DataManagementFunc.scala | 50 +++---
.../carbondata/spark/rdd/PartitionDropper.scala | 122 +++++++++++++
.../spark/rdd/PartitionSplitter.scala | 36 ++--
.../carbondata/spark/util/CommonUtil.scala | 2 +-
.../spark/util/GlobalDictionaryUtil.scala | 3 +-
.../command/carbonTableSchemaCommon.scala | 25 ++-
.../org/apache/spark/util/PartitionUtils.scala | 15 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 8 +-
.../execution/command/carbonTableSchema.scala | 2 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 103 ++++++++---
.../execution/command/carbonTableSchema.scala | 145 +++++++++++++++-
.../sql/parser/CarbonSpark2SqlParser.scala | 16 +-
.../partition/TestAlterPartitionTable.scala | 171 +++++++++++++++----
.../processing/merger/CarbonDataMergerUtil.java | 5 +-
.../processing/spliter/RowResultProcessor.java | 105 ++++++++++++
.../spliter/RowResultSpliterProcessor.java | 105 ------------
.../exception/AlterPartitionSliceException.java | 78 +++++++++
.../exception/SliceSpliterException.java | 78 ---------
26 files changed, 978 insertions(+), 481 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
index 4b0bc3e..d0c4447 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
@@ -92,6 +92,11 @@ public class PartitionInfo implements Serializable {
numPartitions = numPartitions - 1 + newPartitionNumbers;
}
+ public void dropPartition(int index) {
+ partitionIds.remove(index);
+ numPartitions--;
+ }
+
public List<ColumnSchema> getColumnSchemaList() {
return columnSchemaList;
}
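For reference, the new dropPartition removes an entry by list index rather than by partition id, and only mutates the in-memory metadata. A minimal sketch of the expected behavior, mirroring the list-partition test above (the variable names here are illustrative):

  // partitionIds before: [0, 1, 2, 3, 4, 5], numPartitions == 6
  val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
  val index = partitionInfo.getPartitionIds.indexOf(Integer.valueOf(2))
  partitionInfo.dropPartition(index)  // drops id 2 via its current index
  // partitionIds after: [0, 1, 3, 4, 5], numPartitions == 5,
  // matching the assertions in TestAlterPartitionTable above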
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index dcc75bd..9076233 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -306,7 +306,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
// prune partitions for filter query on partition table
BitSet matchedPartitions = null;
if (partitionInfo != null) {
- matchedPartitions = setMatchedPartitions(null, filter, partitionInfo);
+ matchedPartitions = setMatchedPartitions(null, filter, partitionInfo, null);
if (matchedPartitions != null) {
if (matchedPartitions.cardinality() == 0) {
return new ArrayList<InputSplit>();
@@ -366,9 +366,11 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
TableProvider tableProvider = new SingleTableProvider(carbonTable);
// prune partitions for filter query on partition table
String partitionIds = job.getConfiguration().get(ALTER_PARTITION_ID);
+ // matchedPartitions records partitionIndex, not partitionId
BitSet matchedPartitions = null;
if (partitionInfo != null) {
- matchedPartitions = setMatchedPartitions(partitionIds, filter, partitionInfo);
+ matchedPartitions =
+ setMatchedPartitions(partitionIds, filter, partitionInfo, oldPartitionIdList);
if (matchedPartitions != null) {
if (matchedPartitions.cardinality() == 0) {
return new ArrayList<InputSplit>();
@@ -396,15 +398,24 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
}
}
+ /**
+ * Set the matched partition indices into a BitSet.
+ * @param partitionIds partition ids from the alter table command; null for a normal query
+ * @param filter filter expression from the query
+ * @param partitionInfo partition metadata of the table
+ * @param oldPartitionIdList partition id list before alter; only used in the alter table flow
+ * @return BitSet of matched partition indices (null when no pruning applies)
+ */
private BitSet setMatchedPartitions(String partitionIds, Expression filter,
- PartitionInfo partitionInfo) {
+ PartitionInfo partitionInfo, List<Integer> oldPartitionIdList) {
BitSet matchedPartitions = null;
if (null != partitionIds) {
String[] partList = partitionIds.replace("[", "").replace("]", "").split(",");
- // only one partitionId in current alter table statement
- matchedPartitions = new BitSet(Integer.parseInt(partList[0]));
+ // partList[0] -> use the first element to initialize the BitSet; it will expand automatically later
+ matchedPartitions = new BitSet(Integer.parseInt(partList[0].trim()));
for (String partitionId : partList) {
- matchedPartitions.set(Integer.parseInt(partitionId));
+ Integer index = oldPartitionIdList.indexOf(Integer.parseInt(partitionId.trim()));
+ matchedPartitions.set(index);
}
} else {
if (null != filter) {
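The id-to-index translation above matters because partition ids stop being contiguous once partitions are dropped or split. A hedged sketch of the mapping in isolation (toMatchedBitSet is our illustrative name, not a CarbonData API):

  import java.util.BitSet

  def toMatchedBitSet(partitionIds: String,
      oldPartitionIdList: java.util.List[Integer]): BitSet = {
    val partList = partitionIds.replace("[", "").replace("]", "").split(",")
    // initial size only; a BitSet expands automatically beyond it
    val matched = new BitSet(partList(0).trim.toInt)
    for (id <- partList) {
      // assumes every id passed by the alter command exists in the old id list
      matched.set(oldPartitionIdList.indexOf(Integer.valueOf(id.trim.toInt)))
    }
    matched
  }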
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/DropPartitionCallable.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/DropPartitionCallable.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/DropPartitionCallable.java
new file mode 100644
index 0000000..ce66aac
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/DropPartitionCallable.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.partition;
+
+import java.util.concurrent.Callable;
+
+import org.apache.carbondata.spark.rdd.PartitionDropper;
+
+import org.apache.spark.sql.execution.command.DropPartitionCallableModel;
+
+public class DropPartitionCallable implements Callable<Void> {
+
+ private DropPartitionCallableModel dropPartitionCallableModel;
+
+ public DropPartitionCallable(DropPartitionCallableModel dropPartitionCallableModel) {
+ this.dropPartitionCallableModel = dropPartitionCallableModel;
+ }
+
+ @Override public Void call() {
+ PartitionDropper.triggerPartitionDrop(dropPartitionCallableModel);
+ return null;
+ }
+}
+
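The callable is intended for the executor service that DataManagementFunc already uses for split partition (see the import change below). A minimal usage sketch, assuming a DropPartitionCallableModel has been built (its fields are not shown in this mail):

  import java.util.concurrent.Executors

  val executor = Executors.newFixedThreadPool(1)
  val dropModel: DropPartitionCallableModel = ???  // construction elided; see part 2 of this commit
  val future = executor.submit(new DropPartitionCallable(dropModel))
  future.get()  // blocks until PartitionDropper.triggerPartitionDrop finishes
  executor.shutdown()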
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
index 181f6e4..7cf8c88 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
@@ -107,11 +107,11 @@ class MergeResultImpl extends MergeResult[String, Boolean] {
override def getKey(key: String, value: Boolean): (String, Boolean) = (key, value)
}
-trait SplitResult[K, V] extends Serializable {
+trait AlterPartitionResult[K, V] extends Serializable {
def getKey(key: String, value: Boolean): (K, V)
}
-class SplitResultImpl extends SplitResult[String, Boolean] {
+class AlterPartitionResultImpl extends AlterPartitionResult[String, Boolean] {
override def getKey(key: String, value: Boolean): (String, Boolean) = (key, value)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
new file mode 100644
index 0000000..6cf8a7a
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.AlterPartitionModel
+import org.apache.spark.util.PartitionUtils
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.processing.spliter.RowResultProcessor
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.spark.AlterPartitionResult
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
+
+class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
+ result: AlterPartitionResult[K, V],
+ partitionIds: Seq[String],
+ bucketId: Int,
+ identifier: AbsoluteTableIdentifier,
+ prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) {
+
+ var storeLocation: String = null
+ val carbonLoadModel = alterPartitionModel.carbonLoadModel
+ val segmentId = alterPartitionModel.segmentId
+ val oldPartitionIds = alterPartitionModel.oldPartitionIds
+ val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ val databaseName = carbonTable.getDatabaseName
+ val factTableName = carbonTable.getFactTableName
+ val partitionInfo = carbonTable.getPartitionInfo(factTableName)
+
+ override protected def getPartitions: Array[Partition] = {
+ val sc = alterPartitionModel.sqlContext.sparkContext
+ sc.setLocalProperty("spark.scheduler.pool", "DDL")
+ sc.setLocalProperty("spark.job.interruptOnCancel", "true")
+ firstParent[Array[AnyRef]].partitions
+ }
+
+ override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava
+ val iter = new Iterator[(K, V)] {
+ val partitionId = partitionInfo.getPartitionId(split.index)
+ carbonLoadModel.setTaskNo(String.valueOf(partitionId))
+ carbonLoadModel.setSegmentId(segmentId)
+ carbonLoadModel.setPartitionId("0")
+ val tempLocationKey = CarbonDataProcessorUtil
+ .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
+ carbonLoadModel.getTableName,
+ segmentId,
+ carbonLoadModel.getTaskNo,
+ false,
+ true)
+ // this property is used to determine whether temp location for carbon is inside
+ // container temp dir or is yarn application directory.
+ val carbonUseLocalDir = CarbonProperties.getInstance()
+ .getProperty("carbon.use.local.dir", "false")
+
+ if (carbonUseLocalDir.equalsIgnoreCase("true")) {
+
+ val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+ if (null != storeLocations && storeLocations.nonEmpty) {
+ storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+ }
+ if (storeLocation == null) {
+ storeLocation = System.getProperty("java.io.tmpdir")
+ }
+ } else {
+ storeLocation = System.getProperty("java.io.tmpdir")
+ }
+ storeLocation = storeLocation + '/' + System.nanoTime() + '/' + split.index
+ CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+ LOGGER.info(s"Temp storeLocation taken is $storeLocation")
+
+ val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
+ factTableName,
+ carbonLoadModel.getTaskNo,
+ "0",
+ segmentId,
+ false,
+ true
+ )
+
+ val loadStatus = if (rows.isEmpty) {
+ LOGGER.info("After repartitioning this split, no target rows to write back.")
+ true
+ } else {
+ val segmentProperties = PartitionUtils.getSegmentProperties(identifier,
+ segmentId, partitionIds.toList, oldPartitionIds, partitionInfo)
+ val processor = new RowResultProcessor(
+ carbonTable,
+ carbonLoadModel,
+ segmentProperties,
+ tempStoreLoc,
+ bucketId)
+ try {
+ processor.execute(rows)
+ } catch {
+ case e: Exception =>
+ sys.error(s"Exception when executing Row result processor ${e.getMessage}")
+ } finally {
+ CarbonLoaderUtil
+ .deleteLocalDataLoadFolderLocation(carbonLoadModel, false, true)
+ }
+ }
+
+ val loadResult = segmentId
+ var finished = false
+
+ override def hasNext: Boolean = {
+ !finished
+ }
+
+ override def next(): (K, V) = {
+ finished = true
+ result.getKey(loadResult, loadStatus)
+ }
+ }
+ iter
+ }
+}
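The wiring of this RDD lives in PartitionDropper and PartitionSplitter (listed in the diffstat of part 2) and is not shown in this mail; based on the constructor above, a hedged sketch of the intended flow:

  // scanRdd: a CarbonScanPartitionRDD over the target partitions; its values
  // are the raw rows to be written back under the new partitionInfo
  val rowsRdd: RDD[Array[AnyRef]] = scanRdd.map(_._2)
  val status = new AlterTableLoadPartitionRDD(alterPartitionModel,
    new AlterPartitionResultImpl(),
    partitionIds,  // target partition ids, as strings
    0,             // bucketId
    identifier,    // AbsoluteTableIdentifier of the table
    rowsRdd).collect()
  // each element is (segmentId, writeSucceeded)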
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableSplitPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableSplitPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableSplitPartitionRDD.scala
deleted file mode 100644
index e481fc4..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableSplitPartitionRDD.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-import scala.util.Random
-
-import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.util.PartitionUtils
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.PartitionInfo
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.spliter.RowResultSpliterProcessor
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.spark.SplitResult
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-
-class AlterTableSplitPartitionRDD[K, V](
- sc: SparkContext,
- result: SplitResult[K, V],
- partitionIds: Seq[String],
- segmentId: String,
- bucketId: Int,
- carbonLoadModel: CarbonLoadModel,
- identifier: AbsoluteTableIdentifier,
- storePath: String,
- oldPartitionIdList: List[Int],
- prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) {
-
- sc.setLocalProperty("spark.scheduler.pool", "DDL")
- sc.setLocalProperty("spark.job.interruptOnCancel", "true")
-
- var storeLocation: String = null
- var splitResult: String = null
- val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val databaseName = carbonTable.getDatabaseName
- val factTableName = carbonTable.getFactTableName
- val partitionInfo = carbonTable.getPartitionInfo(factTableName)
-
- override protected def getPartitions: Array[Partition] = firstParent[Array[AnyRef]].partitions
-
- override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava
- val iter = new Iterator[(K, V)] {
- val partitionId = partitionInfo.getPartitionId(split.index)
- carbonLoadModel.setTaskNo(String.valueOf(partitionId))
- carbonLoadModel.setSegmentId(segmentId)
- carbonLoadModel.setPartitionId("0")
- val tempLocationKey = CarbonDataProcessorUtil
- .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName,
- segmentId,
- carbonLoadModel.getTaskNo,
- false,
- true)
- // this property is used to determine whether temp location for carbon is inside
- // container temp dir or is yarn application directory.
- val carbonUseLocalDir = CarbonProperties.getInstance()
- .getProperty("carbon.use.local.dir", "false")
-
- if (carbonUseLocalDir.equalsIgnoreCase("true")) {
-
- val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
- if (null != storeLocations && storeLocations.nonEmpty) {
- storeLocation = storeLocations(Random.nextInt(storeLocations.length))
- }
- if (storeLocation == null) {
- storeLocation = System.getProperty("java.io.tmpdir")
- }
- } else {
- storeLocation = System.getProperty("java.io.tmpdir")
- }
- storeLocation = storeLocation + '/' + System.nanoTime() + '/' + split.index
- CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
- LOGGER.info(s"Temp storeLocation taken is $storeLocation")
-
- val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
- factTableName,
- carbonLoadModel.getTaskNo,
- "0",
- segmentId,
- false,
- true
- )
-
- val splitStatus = if (rows.isEmpty) {
- LOGGER.info("After repartition this split, NO target rows to write back.")
- true
- } else {
- try {
- val segmentProperties = PartitionUtils.getSegmentProperties(identifier,
- segmentId, partitionIds.toList, oldPartitionIdList, partitionInfo)
- val processor = new RowResultSpliterProcessor(
- carbonTable,
- carbonLoadModel,
- segmentProperties,
- tempStoreLoc,
- bucketId
- )
- processor.execute(rows)
- } catch {
- case e: Exception =>
- sys.error(s"Exception when executing Row result processor ${e.getMessage}")
- } finally {
- CarbonLoaderUtil
- .deleteLocalDataLoadFolderLocation(carbonLoadModel, false, true)
- }
-
- }
-
- val splitResult = segmentId
- var finished = false
-
- override def hasNext: Boolean = {
- !finished
- }
-
- override def next(): (K, V) = {
- finished = true
- result.getKey(splitResult, splitStatus)
- }
- }
- iter
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
index 2a39db5..86bc79f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.AlterPartitionModel
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.PartitionUtils
@@ -53,27 +54,23 @@ import org.apache.carbondata.spark.load.CarbonLoaderUtil
/**
* This RDD is used in alter table partition statement to get data of target partitions,
* then repartition data according to new partitionInfo
- * @param sc
+ * @param alterPartitionModel
+ * @param carbonTableIdentifier
* @param partitionIds the ids of target partition to be scanned
- * @param storePath
- * @param segmentId
* @param bucketId
- * @param oldPartitionIdList the taskId in partition order before partitionInfo is modified
- * @param carbonTableIdentifier
- * @param carbonLoadModel
*/
-class CarbonScanPartitionRDD(
- sc: SparkContext,
- partitionIds: Seq[String],
- storePath: String,
- segmentId: String,
- bucketId: Int,
- oldPartitionIdList: List[Int],
+class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
carbonTableIdentifier: CarbonTableIdentifier,
- carbonLoadModel: CarbonLoadModel)
- extends RDD[(AnyRef, Array[AnyRef])](sc, Nil) {
+ partitionIds: Seq[String],
+ bucketId: Int)
+ extends RDD[(AnyRef, Array[AnyRef])](alterPartitionModel.sqlContext.sparkContext, Nil) {
- private val queryId = sc.getConf.get("queryId", System.nanoTime() + "")
+ private val queryId = alterPartitionModel.sqlContext.sparkContext.getConf
+ .get("queryId", System.nanoTime() + "")
+ val segmentId = alterPartitionModel.segmentId
+ val carbonLoadModel = alterPartitionModel.carbonLoadModel
+ val oldPartitionIdList = alterPartitionModel.oldPartitionIds
+ val storePath = carbonLoadModel.getStorePath
val identifier = new AbsoluteTableIdentifier(storePath, carbonTableIdentifier)
var storeLocation: String = null
var splitStatus: Boolean = false
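With this refactor, callers build the scan RDD from the shared AlterPartitionModel instead of passing six loose parameters. A sketch, assuming an AlterPartitionModel already populated with the sqlContext, carbonLoadModel, segmentId and oldPartitionIds fields used above:

  val scanRdd = new CarbonScanPartitionRDD(alterPartitionModel,
    carbonTableIdentifier,
    partitionIds = Seq("2"),
    bucketId = 0)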
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index c13a942..fb610c1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -36,14 +36,13 @@ object Compactor {
def triggerCompaction(compactionCallableModel: CompactionCallableModel): Unit = {
- val storePath = compactionCallableModel.storePath
val storeLocation = compactionCallableModel.storeLocation
val carbonTable = compactionCallableModel.carbonTable
val loadsToMerge = compactionCallableModel.loadsToMerge
val sc = compactionCallableModel.sqlContext
val carbonLoadModel = compactionCallableModel.carbonLoadModel
val compactionType = compactionCallableModel.compactionType
-
+ val storePath = carbonLoadModel.getStorePath
val startTime = System.nanoTime()
val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
var finalMergeStatus = false
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index bca119e..c2b7b74 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel, SplitPartitionCallableModel}
+import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel, DropPartitionCallableModel, SplitPartitionCallableModel}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -39,7 +39,7 @@ import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadM
import org.apache.carbondata.spark._
import org.apache.carbondata.spark.compaction.CompactionCallable
import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.partition.SplitPartitionCallable
+import org.apache.carbondata.spark.partition.{DropPartitionCallable, SplitPartitionCallable}
import org.apache.carbondata.spark.util.{CommonUtil, LoadMetadataUtil}
/**
@@ -149,7 +149,6 @@ object DataManagementFunc {
}
def executeCompaction(carbonLoadModel: CarbonLoadModel,
- storePath: String,
compactionModel: CompactionModel,
executor: ExecutorService,
sqlContext: SQLContext,
@@ -161,7 +160,6 @@ object DataManagementFunc {
var segList = carbonLoadModel.getLoadMetadataDetails
var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
- storePath,
carbonLoadModel,
compactionModel.compactionSize,
segList,
@@ -180,7 +178,6 @@ object DataManagementFunc {
scanSegmentsAndSubmitJob(futureList,
loadsToMerge,
executor,
- storePath,
sqlContext,
compactionModel,
carbonLoadModel,
@@ -200,7 +197,7 @@ object DataManagementFunc {
}
// scan again and determine if anything is there to merge again.
- CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
+ CommonUtil.readLoadMetadataDetails(carbonLoadModel)
segList = carbonLoadModel.getLoadMetadataDetails
// in case of major compaction we will scan only once and come out as it will keep
// on doing major for the new loads also.
@@ -215,7 +212,6 @@ object DataManagementFunc {
loadsToMerge.clear()
} else if (segList.size > 0) {
loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
- storePath,
carbonLoadModel,
compactionModel.compactionSize,
segList,
@@ -234,10 +230,8 @@ object DataManagementFunc {
* @param futureList
*/
private def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]],
- loadsToMerge: util
- .List[LoadMetadataDetails],
+ loadsToMerge: util.List[LoadMetadataDetails],
executor: ExecutorService,
- storePath: String,
sqlContext: SQLContext,
compactionModel: CompactionModel,
carbonLoadModel: CarbonLoadModel,
@@ -248,8 +242,7 @@ object DataManagementFunc {
}
)
- val compactionCallableModel = CompactionCallableModel(storePath,
- carbonLoadModel,
+ val compactionCallableModel = CompactionCallableModel(carbonLoadModel,
storeLocation,
compactionModel.carbonTable,
loadsToMerge,
@@ -264,14 +257,13 @@ object DataManagementFunc {
def executePartitionSplit( sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
executor: ExecutorService,
- storePath: String,
segment: String,
partitionId: String,
oldPartitionIdList: List[Int]): Unit = {
val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
CarbonCommonConstants.DEFAULT_COLLECTION_SIZE
)
- scanSegmentsForSplitPartition(futureList, executor, storePath, segment, partitionId,
+ scanSegmentsForSplitPartition(futureList, executor, segment, partitionId,
sqlContext, carbonLoadModel, oldPartitionIdList)
try {
futureList.asScala.foreach(future => {
@@ -287,15 +279,13 @@ object DataManagementFunc {
private def scanSegmentsForSplitPartition(futureList: util.List[Future[Void]],
executor: ExecutorService,
- storePath: String,
segmentId: String,
partitionId: String,
sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
oldPartitionIdList: List[Int]): Unit = {
- val splitModel = SplitPartitionCallableModel(storePath,
- carbonLoadModel,
+ val splitModel = SplitPartitionCallableModel(carbonLoadModel,
segmentId,
partitionId,
oldPartitionIdList,
@@ -305,9 +295,27 @@ object DataManagementFunc {
futureList.add(future)
}
- def prepareCarbonLoadModel(storePath: String,
- table: CarbonTable,
- newCarbonLoadModel: CarbonLoadModel): Unit = {
+ def executeDroppingPartition(sqlContext: SQLContext,
+ carbonLoadModel: CarbonLoadModel,
+ executor: ExecutorService,
+ segmentId: String,
+ partitionId: String,
+ dropWithData: Boolean,
+ oldPartitionIds: List[Int]): Unit = {
+ val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ val model = new DropPartitionCallableModel(carbonLoadModel,
+ segmentId, partitionId, oldPartitionIds, dropWithData, carbonTable, sqlContext)
+ val future: Future[Void] = executor.submit(new DropPartitionCallable(model))
+ try {
+ future.get
+ } catch {
+ case e: Exception =>
+ LOGGER.error(e, s"Exception in partition drop thread ${ e.getMessage }")
+ throw e
+ }
+ }
+
+ def prepareCarbonLoadModel(table: CarbonTable, newCarbonLoadModel: CarbonLoadModel): Unit = {
newCarbonLoadModel.setTableName(table.getFactTableName)
val dataLoadSchema = new CarbonDataLoadSchema(table)
// Need to fill dimension relation
@@ -315,7 +323,7 @@ object DataManagementFunc {
newCarbonLoadModel.setTableName(table.getCarbonTableIdentifier.getTableName)
newCarbonLoadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
newCarbonLoadModel.setStorePath(table.getStorePath)
- CommonUtil.readLoadMetadataDetails(newCarbonLoadModel, storePath)
+ CommonUtil.readLoadMetadataDetails(newCarbonLoadModel)
val loadStartTime = CarbonUpdateUtil.readCurrentTime();
newCarbonLoadModel.setFactTimeStamp(loadStartTime)
}
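
A quick sketch of invoking the new executeDroppingPartition helper above (all values
assumed in scope; the segment and partition ids are invented for illustration):

    // Sketch: submits one DropPartitionCallable and blocks on its future,
    // exactly the shape of the method body above.
    val executor = java.util.concurrent.Executors.newFixedThreadPool(1)
    DataManagementFunc.executeDroppingPartition(sqlContext, carbonLoadModel, executor,
      segmentId = "0", partitionId = "2", dropWithData = false, oldPartitionIds)
    executor.shutdown()
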
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
new file mode 100644
index 0000000..0a41f44
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.io.IOException
+
+import org.apache.spark.sql.execution.command.{AlterPartitionModel, DropPartitionCallableModel}
+import org.apache.spark.util.PartitionUtils
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.spark.{AlterPartitionResultImpl, PartitionFactory}
+
+object PartitionDropper {
+
+ val logger = LogServiceFactory.getLogService(PartitionDropper.getClass.getName)
+
+ def triggerPartitionDrop(dropPartitionCallableModel: DropPartitionCallableModel): Unit = {
+ val alterPartitionModel = new AlterPartitionModel(dropPartitionCallableModel.carbonLoadModel,
+ dropPartitionCallableModel.segmentId,
+ dropPartitionCallableModel.oldPartitionIds,
+ dropPartitionCallableModel.sqlContext
+ )
+ val partitionId = dropPartitionCallableModel.partitionId
+ val oldPartitionIds = dropPartitionCallableModel.oldPartitionIds
+ val dropWithData = dropPartitionCallableModel.dropWithData
+ val carbonTable = dropPartitionCallableModel.carbonTable
+ val dbName = carbonTable.getDatabaseName
+ val tableName = carbonTable.getFactTableName
+ val identifier = carbonTable.getAbsoluteTableIdentifier
+ val carbonTableIdentifier = identifier.getCarbonTableIdentifier
+ val partitionInfo = carbonTable.getPartitionInfo(tableName)
+ val partitioner = PartitionFactory.getPartitioner(partitionInfo)
+
+ var finalDropStatus = false
+ val bucketInfo = carbonTable.getBucketingInfo(tableName)
+ val bucketNumber = bucketInfo match {
+ case null => 1
+ case _ => bucketInfo.getNumberOfBuckets
+ }
+ val partitionIndex = oldPartitionIds.indexOf(Integer.valueOf(partitionId))
+ val targetPartitionId = partitionInfo.getPartitionType match {
+ case PartitionType.RANGE => if (partitionIndex == oldPartitionIds.length - 1) {
+ "0"
+ } else {
+ String.valueOf(oldPartitionIds(partitionIndex + 1))
+ }
+ case PartitionType.LIST => "0"
+ }
+
+ if (!dropWithData) {
+ try {
+ for (i <- 0 until bucketNumber) {
+ val bucketId = i
+ val rdd = new CarbonScanPartitionRDD(alterPartitionModel,
+ carbonTableIdentifier,
+ Seq(partitionId, targetPartitionId),
+ bucketId
+ ).partitionBy(partitioner).map(_._2)
+
+ val dropStatus = new AlterTableLoadPartitionRDD(alterPartitionModel,
+ new AlterPartitionResultImpl(),
+ Seq(partitionId),
+ bucketId,
+ identifier,
+ rdd).collect()
+
+ if (dropStatus.length == 0) {
+ finalDropStatus = false
+ } else {
+ finalDropStatus = dropStatus.forall(_._2)
+ }
+ if (!finalDropStatus) {
+ logger.audit(s"Drop Partition request failed for table " +
+ s"${ dbName }.${ tableName }")
+ logger.error(s"Drop Partition request failed for table " +
+ s"${ dbName }.${ tableName }")
+ }
+ }
+
+ if (finalDropStatus) {
+ try {
+ PartitionUtils.deleteOriginalCarbonFile(alterPartitionModel, identifier,
+ Seq(partitionId, targetPartitionId).toList, dbName,
+ tableName, partitionInfo)
+ } catch {
+          case e: IOException => sys.error(s"Exception while deleting original carbon files " +
+ e.getMessage)
+ }
+ logger.audit(s"Drop Partition request completed for table " +
+ s"${ dbName }.${ tableName }")
+ logger.info(s"Drop Partition request completed for table " +
+ s"${ dbName }.${ tableName }")
+ }
+ } catch {
+ case e: Exception => sys.error(s"Exception in dropping partition action: ${ e.getMessage }")
+ }
+ } else {
+ PartitionUtils.deleteOriginalCarbonFile(alterPartitionModel, identifier,
+ Seq(partitionId).toList, dbName, tableName, partitionInfo)
+ logger.audit(s"Drop Partition request completed for table " +
+ s"${ dbName }.${ tableName }")
+ logger.info(s"Drop Partition request completed for table " +
+ s"${ dbName }.${ tableName }")
+ }
+ }
+}
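
The targetPartitionId match above decides where rows of a dropped partition are
re-homed. A self-contained sketch with invented ids illustrates the rule:

    // Illustrative only: mirrors PartitionDropper's target selection.
    object TargetPartitionExample extends App {
      val oldPartitionIds = List(0, 1, 2, 3) // id 0 is the default partition

      // RANGE re-homes rows into the next range; the last range and every
      // LIST partition re-home into the default partition "0".
      def target(partitionId: String, isRange: Boolean): String = {
        val idx = oldPartitionIds.indexOf(Integer.valueOf(partitionId))
        if (isRange && idx != oldPartitionIds.length - 1) {
          String.valueOf(oldPartitionIds(idx + 1))
        } else {
          "0"
        }
      }

      println(target("2", isRange = true))  // 3: merged into the next range
      println(target("3", isRange = true))  // 0: last range falls back to default
      println(target("2", isRange = false)) // 0: LIST always targets default
    }
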
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
index 48e1bee..fca7542 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
@@ -19,22 +19,24 @@ package org.apache.carbondata.spark.rdd
import java.io.IOException
-import org.apache.spark.sql.execution.command.SplitPartitionCallableModel
+import org.apache.spark.sql.execution.command.{AlterPartitionModel, SplitPartitionCallableModel}
import org.apache.spark.util.PartitionUtils
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.spark.{PartitionFactory, SplitResultImpl}
+import org.apache.carbondata.spark.{AlterPartitionResultImpl, PartitionFactory}
object PartitionSplitter {
val logger = LogServiceFactory.getLogService(PartitionSplitter.getClass.getName)
def triggerPartitionSplit(splitPartitionCallableModel: SplitPartitionCallableModel): Unit = {
- val sc = splitPartitionCallableModel.sqlContext.sparkContext
+
+ val alterPartitionModel = new AlterPartitionModel(splitPartitionCallableModel.carbonLoadModel,
+ splitPartitionCallableModel.segmentId,
+ splitPartitionCallableModel.oldPartitionIds,
+ splitPartitionCallableModel.sqlContext
+ )
val partitionId = splitPartitionCallableModel.partitionId
- val storePath = splitPartitionCallableModel.storePath
- val segmentId = splitPartitionCallableModel.segmentId
- val oldPartitionIdList = splitPartitionCallableModel.oldPartitionIdList
val carbonLoadModel = splitPartitionCallableModel.carbonLoadModel
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val identifier = carbonTable.getAbsoluteTableIdentifier
@@ -53,25 +55,17 @@ object PartitionSplitter {
for (i <- 0 until bucketNumber) {
val bucketId = i
val rdd = new CarbonScanPartitionRDD(
- sc,
- Seq(partitionId),
- storePath,
- segmentId,
- bucketId,
- oldPartitionIdList,
+ alterPartitionModel,
carbonTableIdentifier,
- carbonLoadModel
+ Seq(partitionId),
+ bucketId
).partitionBy(partitioner).map(_._2)
- val splitStatus = new AlterTableSplitPartitionRDD(sc,
- new SplitResultImpl(),
+ val splitStatus = new AlterTableLoadPartitionRDD(alterPartitionModel,
+ new AlterPartitionResultImpl(),
Seq(partitionId),
- segmentId,
bucketId,
- carbonLoadModel,
identifier,
- storePath,
- oldPartitionIdList,
rdd).collect()
if (splitStatus.length == 0) {
@@ -89,8 +83,8 @@ object PartitionSplitter {
if (finalSplitStatus) {
try {
PartitionUtils.
- deleteOriginalCarbonFile(identifier, segmentId, Seq(partitionId).toList,
- oldPartitionIdList, storePath, databaseName, tableName, partitionInfo, carbonLoadModel)
+          deleteOriginalCarbonFile(alterPartitionModel, identifier, Seq(partitionId).toList,
+            databaseName, tableName, partitionInfo)
} catch {
         case e: IOException => sys.error(s"Exception while deleting original carbon files " +
e.getMessage)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index fd265a8..f123624 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -537,7 +537,7 @@ object CommonUtil {
}
}
- def readLoadMetadataDetails(model: CarbonLoadModel, storePath: String): Unit = {
+ def readLoadMetadataDetails(model: CarbonLoadModel): Unit = {
val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath
val details = SegmentStatusManager.readLoadMetadata(metadataPath)
model.setLoadMetadataDetails(details.toList.asJava)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 47eaece..601c0c7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -314,7 +314,6 @@ object GlobalDictionaryUtil {
isComplexes += dimensions(i).isComplex
}
}
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(hdfsLocation, table)
val primDimensions = primDimensionsBuffer.map { x => x }.toArray
val dictDetail = CarbonSparkFactory.getDictionaryDetailService.
getDictionaryDetail(dictfolderPath, primDimensions, table, hdfsLocation)
@@ -330,7 +329,7 @@ object GlobalDictionaryUtil {
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
// get load count
if (null == carbonLoadModel.getLoadMetadataDetails) {
- CommonUtil.readLoadMetadataDetails(carbonLoadModel, hdfsLocation)
+ CommonUtil.readLoadMetadataDetails(carbonLoadModel)
}
DictionaryLoadModel(table,
dimensions,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index cc2cc82..f5d69ef 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -118,19 +118,31 @@ case class CompactionModel(compactionSize: Long,
carbonTable: CarbonTable,
isDDLTrigger: Boolean)
-case class CompactionCallableModel(storePath: String,
- carbonLoadModel: CarbonLoadModel,
+case class CompactionCallableModel(carbonLoadModel: CarbonLoadModel,
storeLocation: String,
carbonTable: CarbonTable,
loadsToMerge: util.List[LoadMetadataDetails],
sqlContext: SQLContext,
compactionType: CompactionType)
-case class SplitPartitionCallableModel(storePath: String,
- carbonLoadModel: CarbonLoadModel,
+case class AlterPartitionModel(carbonLoadModel: CarbonLoadModel,
+ segmentId: String,
+ oldPartitionIds: List[Int],
+ sqlContext: SQLContext
+)
+
+case class SplitPartitionCallableModel(carbonLoadModel: CarbonLoadModel,
segmentId: String,
partitionId: String,
- oldPartitionIdList: List[Int],
+ oldPartitionIds: List[Int],
+ sqlContext: SQLContext)
+
+case class DropPartitionCallableModel(carbonLoadModel: CarbonLoadModel,
+ segmentId: String,
+ partitionId: String,
+ oldPartitionIds: List[Int],
+ dropWithData: Boolean,
+ carbonTable: CarbonTable,
sqlContext: SQLContext)
case class DataTypeInfo(dataType: String, precision: Int = 0, scale: Int = 0)
@@ -160,7 +172,8 @@ case class AlterTableDropColumnModel(databaseName: Option[String],
case class AlterTableDropPartitionModel(databaseName: Option[String],
tableName: String,
- partitionId: String)
+ partitionId: String,
+ dropWithData: Boolean)
case class AlterTableSplitPartitionModel(databaseName: Option[String],
tableName: String,
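
A hedged sketch of how the new models nest (every value assumed in scope), mirroring
what PartitionDropper and PartitionSplitter do with them:

    // Sketch: the DDL layer fills the callable model; the execution layer then
    // projects it down to the shared AlterPartitionModel before running RDDs.
    val dropModel = DropPartitionCallableModel(carbonLoadModel, segmentId, partitionId,
      oldPartitionIds, dropWithData, carbonTable, sqlContext)
    val alterModel = AlterPartitionModel(dropModel.carbonLoadModel, dropModel.segmentId,
      dropModel.oldPartitionIds, dropModel.sqlContext)
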
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 3982f7b..002ed27 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.ListBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.sql.execution.command.AlterPartitionModel
import org.apache.carbondata.core.datastore.block.{SegmentProperties, TableBlockInfo}
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -151,13 +152,17 @@ object PartitionUtils {
}
@throws(classOf[IOException])
- def deleteOriginalCarbonFile(identifier: AbsoluteTableIdentifier, segmentId: String,
- partitionIds: List[String], oldPartitionIdList: List[Int], storePath: String,
- dbName: String, tableName: String, partitionInfo: PartitionInfo,
- carbonLoadModel: CarbonLoadModel): Unit = {
+ def deleteOriginalCarbonFile(alterPartitionModel: AlterPartitionModel,
+ identifier: AbsoluteTableIdentifier,
+ partitionIds: List[String], dbName: String, tableName: String,
+ partitionInfo: PartitionInfo): Unit = {
+ val carbonLoadModel = alterPartitionModel.carbonLoadModel
+ val segmentId = alterPartitionModel.segmentId
+ val oldPartitionIds = alterPartitionModel.oldPartitionIds
val newTime = carbonLoadModel.getFactTimeStamp
+ val storePath = carbonLoadModel.getStorePath
val tableBlockInfoList =
- getPartitionBlockList(identifier, segmentId, partitionIds, oldPartitionIdList,
+ getPartitionBlockList(identifier, segmentId, partitionIds, oldPartitionIds,
partitionInfo).asScala
val pathList: util.List[String] = new util.ArrayList[String]()
val carbonTablePath = new CarbonTablePath(storePath, dbName, tableName)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index ef2a917..596cebf 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -104,7 +104,7 @@ object CarbonDataRDDFactory {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
if (null == carbonLoadModel.getLoadMetadataDetails) {
- CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
+ CommonUtil.readLoadMetadataDetails(carbonLoadModel)
}
// reading the start time of data load.
val loadStartTime = CarbonUpdateUtil.readCurrentTime();
@@ -228,7 +228,7 @@ object CarbonDataRDDFactory {
compactionLock: ICarbonLock): Unit = {
val executor: ExecutorService = Executors.newFixedThreadPool(1)
// update the updated table status.
- CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
+ CommonUtil.readLoadMetadataDetails(carbonLoadModel)
val compactionThread = new Thread {
override def run(): Unit = {
@@ -238,7 +238,6 @@ object CarbonDataRDDFactory {
var exception: Exception = null
try {
DataManagementFunc.executeCompaction(carbonLoadModel: CarbonLoadModel,
- storePath: String,
compactionModel: CompactionModel,
executor, sqlContext, storeLocation
)
@@ -269,7 +268,7 @@ object CarbonDataRDDFactory {
val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
val newCarbonLoadModel = new CarbonLoadModel()
- DataManagementFunc.prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
+ DataManagementFunc.prepareCarbonLoadModel(table, newCarbonLoadModel)
val compactionSize = CarbonDataMergerUtil
.getCompactionSize(CompactionType.MAJOR_COMPACTION)
@@ -282,7 +281,6 @@ object CarbonDataRDDFactory {
// proceed for compaction
try {
DataManagementFunc.executeCompaction(newCarbonLoadModel,
- newCarbonLoadModel.getStorePath,
newcompactionModel,
executor, sqlContext, storeLocation
)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 8a39b0a..130f305 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -593,7 +593,7 @@ case class LoadTable(
LOGGER.info(s"Overwrite is in progress for carbon table with $dbName.$tableName")
}
if (null == carbonLoadModel.getLoadMetadataDetails) {
- CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
+ CommonUtil.readLoadMetadataDetails(carbonLoadModel)
}
if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass &&
StringUtils.isEmpty(columnDict) && StringUtils.isEmpty(allDictionaryPath)) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index c7b72d5..0edfccf 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -76,7 +76,6 @@ object CarbonDataRDDFactory {
def alterTableForCompaction(sqlContext: SQLContext,
alterTableModel: AlterTableModel,
carbonLoadModel: CarbonLoadModel,
- storePath: String,
storeLocation: String): Unit = {
var compactionSize: Long = 0
var compactionType: CompactionType = CompactionType.MINOR_COMPACTION
@@ -104,7 +103,7 @@ object CarbonDataRDDFactory {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
if (null == carbonLoadModel.getLoadMetadataDetails) {
- CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
+ CommonUtil.readLoadMetadataDetails(carbonLoadModel)
}
// reading the start time of data load.
val loadStartTime : Long =
@@ -135,7 +134,6 @@ object CarbonDataRDDFactory {
LOGGER.info("System level compaction lock is enabled.")
handleCompactionForSystemLocking(sqlContext,
carbonLoadModel,
- storePath,
storeLocation,
compactionType,
carbonTable,
@@ -154,7 +152,6 @@ object CarbonDataRDDFactory {
try {
startCompactionThreads(sqlContext,
carbonLoadModel,
- storePath,
storeLocation,
compactionModel,
lock
@@ -178,14 +175,12 @@ object CarbonDataRDDFactory {
def alterTableSplitPartition(sqlContext: SQLContext,
partitionId: String,
carbonLoadModel: CarbonLoadModel,
- storePath: String,
oldPartitionIdList: List[Int]): Unit = {
LOGGER.audit(s"Add partition request received for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
try {
startSplitThreads(sqlContext,
carbonLoadModel,
- storePath,
partitionId,
oldPartitionIdList)
} catch {
@@ -195,9 +190,28 @@ object CarbonDataRDDFactory {
}
}
+ def alterTableDropPartition(sqlContext: SQLContext,
+ partitionId: String,
+ carbonLoadModel: CarbonLoadModel,
+ dropWithData: Boolean,
+ oldPartitionIds: List[Int]): Unit = {
+ LOGGER.audit(s"Drop partition request received for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ try {
+ startDropThreads(sqlContext,
+ carbonLoadModel,
+ partitionId,
+ dropWithData,
+ oldPartitionIds)
+ } catch {
+ case e: Exception =>
+ LOGGER.error(s"Exception in start dropping partition thread. ${ e.getMessage }")
+ throw e
+ }
+ }
+
def handleCompactionForSystemLocking(sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
- storePath: String,
storeLocation: String,
compactionType: CompactionType,
carbonTable: CarbonTable,
@@ -212,7 +226,6 @@ object CarbonDataRDDFactory {
try {
startCompactionThreads(sqlContext,
carbonLoadModel,
- storePath,
storeLocation,
compactionModel,
lock
@@ -248,7 +261,6 @@ object CarbonDataRDDFactory {
def startCompactionThreads(sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
- storePath: String,
storeLocation: String,
compactionModel: CompactionModel,
compactionLock: ICarbonLock): Unit = {
@@ -257,7 +269,7 @@ object CarbonDataRDDFactory {
if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
// update the updated table status. For the case of Update Delta Compaction the Metadata
// is filled in LoadModel, no need to refresh.
- CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
+ CommonUtil.readLoadMetadataDetails(carbonLoadModel)
}
val compactionThread = new Thread {
@@ -269,7 +281,6 @@ object CarbonDataRDDFactory {
var exception: Exception = null
try {
DataManagementFunc.executeCompaction(carbonLoadModel: CarbonLoadModel,
- storePath: String,
compactionModel: CompactionModel,
executor, sqlContext, storeLocation
)
@@ -301,7 +312,7 @@ object CarbonDataRDDFactory {
val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
val newCarbonLoadModel = new CarbonLoadModel()
- DataManagementFunc.prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
+ DataManagementFunc.prepareCarbonLoadModel(table, newCarbonLoadModel)
val compactionSize = CarbonDataMergerUtil
.getCompactionSize(CompactionType.MAJOR_COMPACTION)
@@ -314,7 +325,6 @@ object CarbonDataRDDFactory {
// proceed for compaction
try {
DataManagementFunc.executeCompaction(newCarbonLoadModel,
- newCarbonLoadModel.getStorePath,
newcompactionModel,
executor, sqlContext, storeLocation
)
@@ -365,7 +375,6 @@ object CarbonDataRDDFactory {
case class SplitThread(sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
executor: ExecutorService,
- storePath: String,
segmentId: String,
partitionId: String,
oldPartitionIdList: List[Int]) extends Thread {
@@ -374,8 +383,7 @@ object CarbonDataRDDFactory {
var exception: Exception = null
try {
DataManagementFunc.executePartitionSplit(sqlContext,
- carbonLoadModel, executor, storePath, segmentId, partitionId,
- oldPartitionIdList)
+ carbonLoadModel, executor, segmentId, partitionId, oldPartitionIdList)
triggeredSplitPartitionStatus = true
} catch {
case e: Exception =>
@@ -388,9 +396,26 @@ object CarbonDataRDDFactory {
}
}
+  case class DropPartitionThread(sqlContext: SQLContext,
+ carbonLoadModel: CarbonLoadModel,
+ executor: ExecutorService,
+ segmentId: String,
+ partitionId: String,
+ dropWithData: Boolean,
+ oldPartitionIds: List[Int]) extends Thread {
+ override def run(): Unit = {
+ try {
+ DataManagementFunc.executeDroppingPartition(sqlContext, carbonLoadModel, executor,
+ segmentId, partitionId, dropWithData, oldPartitionIds)
+ } catch {
+ case e: Exception =>
+          LOGGER.error(s"Exception in dropping partition thread: ${ e.getMessage }")
+ }
+ }
+ }
+
def startSplitThreads(sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
- storePath: String,
partitionId: String,
oldPartitionIdList: List[Int]): Unit = {
val numberOfCores = CarbonProperties.getInstance()
@@ -405,7 +430,7 @@ object CarbonDataRDDFactory {
val threadArray: Array[SplitThread] = new Array[SplitThread](validSegments.size)
var i = 0
validSegments.foreach { segmentId =>
- threadArray(i) = SplitThread(sqlContext, carbonLoadModel, executor, storePath,
+ threadArray(i) = SplitThread(sqlContext, carbonLoadModel, executor,
segmentId, partitionId, oldPartitionIdList)
threadArray(i).start()
i += 1
@@ -429,6 +454,46 @@ object CarbonDataRDDFactory {
}
}
+ def startDropThreads(sqlContext: SQLContext,
+ carbonLoadModel: CarbonLoadModel,
+ partitionId: String,
+ dropWithData: Boolean,
+ oldPartitionIds: List[Int]): Unit = {
+ val numberOfCores = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
+ CarbonCommonConstants.DEFAULT_NUMBER_CORES)
+ val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt)
+ try {
+ val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+ val segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier)
+ val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
+ val threadArray: Array[Thread] = new Array[Thread](validSegments.size)
+ var i = 0
+ for (segmentId: String <- validSegments) {
+        threadArray(i) = DropPartitionThread(sqlContext, carbonLoadModel, executor,
+ segmentId, partitionId, dropWithData, oldPartitionIds)
+ threadArray(i).start()
+ i += 1
+ }
+ for (thread <- threadArray) {
+ thread.join()
+ }
+ } catch {
+ case e: Exception =>
+ LOGGER.error(s"Exception when dropping partition: ${ e.getMessage }")
+ } finally {
+ executor.shutdown()
+ try {
+ CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
+ } catch {
+ case e: Exception =>
+ LOGGER.error(s"Exception in dropping partition thread while deleting partial load file" +
+ s" ${ e.getMessage }")
+ }
+ }
+ }
+
def loadCarbonData(sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
storePath: String,
@@ -473,7 +538,6 @@ object CarbonDataRDDFactory {
handleCompactionForSystemLocking(sqlContext,
carbonLoadModel,
- storePath,
storeLocation,
CompactionType.MINOR_COMPACTION,
carbonTable,
@@ -490,7 +554,6 @@ object CarbonDataRDDFactory {
try {
startCompactionThreads(sqlContext,
carbonLoadModel,
- storePath,
storeLocation,
compactionModel,
lock
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 3f0153e..7ed280e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -21,7 +21,7 @@ import java.text.SimpleDateFormat
import java.util
import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.language.implicitConversions
import org.apache.commons.lang3.StringUtils
@@ -52,6 +52,7 @@ import org.apache.carbondata.core.metadata.schema.PartitionInfo
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
@@ -177,7 +178,6 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
.alterTableForCompaction(sparkSession.sqlContext,
alterTableModel,
carbonLoadModel,
- relation.tableMeta.storePath,
storeLocation
)
} catch {
@@ -301,7 +301,6 @@ case class AlterTableSplitPartitionCommand(splitPartitionModel: AlterTableSplitP
CarbonDataRDDFactory.alterTableSplitPartition(sparkSession.sqlContext,
partitionId.toString,
carbonLoadModel,
- relation.tableMeta.storePath,
oldPartitionIds.asScala.toList
)
success = true
@@ -313,6 +312,7 @@ case class AlterTableSplitPartitionCommand(splitPartitionModel: AlterTableSplitP
AlterTableUtil.releaseLocks(locks)
CacheProvider.getInstance().dropAllCache()
LOGGER.info("Locks released after alter table add/split partition action.")
+ LOGGER.audit("Locks released after alter table add/split partition action.")
if (success) {
LOGGER.info(s"Alter table add/split partition is successful for table $dbName.$tableName")
LOGGER.audit(s"Alter table add/split partition is successful for table $dbName.$tableName")
@@ -322,7 +322,142 @@ case class AlterTableSplitPartitionCommand(splitPartitionModel: AlterTableSplitP
}
}
-case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends RunnableCommand
+case class AlterTableDropPartition(alterTableDropPartitionModel: AlterTableDropPartitionModel)
+ extends RunnableCommand with DataProcessCommand with SchemaProcessCommand {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ val tableName = alterTableDropPartitionModel.tableName
+ var dbName: String = null
+ val partitionId = alterTableDropPartitionModel.partitionId
+ val dropWithData = alterTableDropPartitionModel.dropWithData
+  if (partitionId == "0") {
+    sys.error("Cannot drop default partition! Please use delete statement!")
+  }
+ var partitionInfo: PartitionInfo = null
+ var carbonMetaStore: CarbonMetaStore = null
+ var relation: CarbonRelation = null
+ var storePath: String = null
+ var table: CarbonTable = null
+ var carbonTableIdentifier: CarbonTableIdentifier = null
+ val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
+ val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+ LockUsage.COMPACTION_LOCK,
+ LockUsage.DELETE_SEGMENT_LOCK,
+ LockUsage.DROP_TABLE_LOCK,
+ LockUsage.CLEAN_FILES_LOCK,
+ LockUsage.ALTER_PARTITION_LOCK)
+
+ def run(sparkSession: SparkSession): Seq[Row] = {
+ processSchema(sparkSession)
+ processData(sparkSession)
+ Seq.empty
+ }
+
+ override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+ dbName = alterTableDropPartitionModel.databaseName
+ .getOrElse(sparkSession.catalog.currentDatabase)
+ carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
+ .asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+    carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
+    storePath = relation.tableMeta.storePath
+    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables(storePath)
+ if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
+ LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
+ sys.error(s"Alter table failed. table not found: $dbName.$tableName")
+ }
+ table = relation.tableMeta.carbonTable
+ partitionInfo = table.getPartitionInfo(tableName)
+ if (partitionInfo == null) {
+ sys.error(s"Table $tableName is not a partition table.")
+ }
+ val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
+ // keep a copy of partitionIdList before update partitionInfo.
+ // will be used in partition data scan
+ oldPartitionIds.addAll(partitionIds.asJava)
+ val partitionIndex = partitionIds.indexOf(Integer.valueOf(partitionId))
+ partitionInfo.getPartitionType match {
+ case PartitionType.HASH => sys.error(s"Hash partition cannot be dropped!")
+ case PartitionType.RANGE =>
+ val rangeInfo = new util.ArrayList(partitionInfo.getRangeInfo)
+ val rangeToRemove = partitionInfo.getRangeInfo.get(partitionIndex - 1)
+ rangeInfo.remove(rangeToRemove)
+ partitionInfo.setRangeInfo(rangeInfo)
+ case PartitionType.LIST =>
+ val listInfo = new util.ArrayList(partitionInfo.getListInfo)
+ val listToRemove = partitionInfo.getListInfo.get(partitionIndex - 1)
+ listInfo.remove(listToRemove)
+ partitionInfo.setListInfo(listInfo)
+ case PartitionType.RANGE_INTERVAL =>
+        sys.error("Dropping range interval partition isn't supported yet!")
+ }
+ partitionInfo.dropPartition(partitionIndex)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+ val schemaFilePath = carbonTablePath.getSchemaFilePath
+ // read TableInfo
+ val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
+
+ val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+ val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
+ dbName, tableName, storePath)
+ val tableSchema = wrapperTableInfo.getFactTable
+ tableSchema.setPartitionInfo(partitionInfo)
+ wrapperTableInfo.setFactTable(tableSchema)
+ wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
+ val thriftTable =
+ schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+ thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+ .setTime_stamp(System.currentTimeMillis)
+ carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
+ dbName, tableName, storePath)
+ CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
+ // update the schema modified time
+ carbonMetaStore.updateAndTouchSchemasUpdatedTime(storePath)
+ // sparkSession.catalog.refreshTable(tableName)
+ Seq.empty
+ }
+
+ override def processData(sparkSession: SparkSession): Seq[Row] = {
+ var locks = List.empty[ICarbonLock]
+ var success = false
+ try {
+ locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
+ locksToBeAcquired)(sparkSession)
+ val carbonLoadModel = new CarbonLoadModel()
+ val dataLoadSchema = new CarbonDataLoadSchema(table)
+ // Need to fill dimension relation
+ carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+ carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
+ carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
+ carbonLoadModel.setStorePath(storePath)
+ val loadStartTime = CarbonUpdateUtil.readCurrentTime
+ carbonLoadModel.setFactTimeStamp(loadStartTime)
+ CarbonDataRDDFactory.alterTableDropPartition(sparkSession.sqlContext,
+ partitionId,
+ carbonLoadModel,
+ dropWithData,
+ oldPartitionIds.asScala.toList
+ )
+ success = true
+ } catch {
+ case e: Exception =>
+        sys.error(s"Drop Partition failed. Please check logs for more info. ${ e.getMessage }")
+ success = false
+ } finally {
+ CacheProvider.getInstance().dropAllCache()
+ AlterTableUtil.releaseLocks(locks)
+ LOGGER.info("Locks released after alter table drop partition action.")
+ LOGGER.audit("Locks released after alter table drop partition action.")
+ }
+ LOGGER.info(s"Alter table drop partition is successful for table $dbName.$tableName")
+ LOGGER.audit(s"Alter table drop partition is successful for table $dbName.$tableName")
+ Seq.empty
+ }
+}
+
+case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends RunnableCommand
with SchemaProcessCommand {
def run(sparkSession: SparkSession): Seq[Row] = {
@@ -796,7 +931,7 @@ case class LoadTable(
LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
}
if (null == carbonLoadModel.getLoadMetadataDetails) {
- CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
+ CommonUtil.readLoadMetadataDetails(carbonLoadModel)
}
if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass &&
StringUtils.isEmpty(column_dict) && StringUtils.isEmpty(all_dictionary_path)) {
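
End to end, the AlterTableDropPartition command above is reached from SQL. A
hypothetical session (table name and partition id are invented; partition "0",
the default, is rejected by the command):

    // Re-home the partition's rows (RANGE: next range; LIST: default), then drop it:
    sql("ALTER TABLE logdata DROP PARTITION (2)")
    // Drop the partition together with its rows, skipping redistribution:
    sql("ALTER TABLE logdata DROP PARTITION (2) WITH DATA")
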
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 1d74bee..24b2981 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -72,7 +72,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
alterTableModifyDataType | alterTableDropColumn | alterTableAddColumns
protected lazy val alterPartition: Parser[LogicalPlan] =
- alterAddPartition | alterSplitPartition
+ alterAddPartition | alterSplitPartition | alterDropPartition
protected lazy val alterAddPartition: Parser[LogicalPlan] =
ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> PARTITION ~>
@@ -95,6 +95,20 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
AlterTableSplitPartitionCommand(alterTableSplitPartitionModel)
}
+ protected lazy val alterDropPartition: Parser[LogicalPlan] =
+ ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (DROP ~> PARTITION ~>
+ "(" ~> numericLit <~ ")") ~ (WITH ~> DATA).? <~ opt(";") ^^ {
+ case dbName ~ table ~ partitionId ~ withData =>
+ val dropWithData = withData.getOrElse("NO") match {
+ case "NO" => false
+ case _ => true
+ }
+ val alterTableDropPartitionModel =
+ AlterTableDropPartitionModel(dbName, table, partitionId, dropWithData)
+ AlterTableDropPartition(alterTableDropPartitionModel)
+ }
+
protected lazy val alterTable: Parser[LogicalPlan] =
ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (COMPACT ~ stringLit) <~ opt(";") ^^ {
case dbName ~ table ~ (compact ~ compactType) =>
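
To make the optional WITH DATA clause concrete, a standalone sketch (invented names,
scala-parser-combinators) of how the alterDropPartition production above maps it to
a boolean, using the same getOrElse("NO") trick:

    import scala.util.parsing.combinator.RegexParsers

    object DropClauseExample extends RegexParsers {
      val withDataClause: Parser[Boolean] =
        ("WITH" ~> "DATA").? ^^ (_.getOrElse("NO") != "NO")

      def main(args: Array[String]): Unit = {
        println(parseAll(withDataClause, "WITH DATA").get) // true  -> drop rows too
        println(parseAll(withDataClause, "").get)          // false -> re-home rows first
      }
    }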