Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/12/21 16:21:02 UTC
carbondata git commit: [CARBONDATA-1863][PARTITION] Supported clean files for partition table.
Repository: carbondata
Updated Branches:
refs/heads/master 577a8b0d5 -> 9659edccb
[CARBONDATA-1863][PARTITION] Supported clean files for partition table.
This closes #1706
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9659edcc
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9659edcc
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9659edcc
Branch: refs/heads/master
Commit: 9659edccbd05e9f5499911e6d049d9b9e1cf8c3a
Parents: 577a8b0
Author: ravipesala <ra...@gmail.com>
Authored: Thu Dec 21 10:23:27 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Thu Dec 21 21:50:42 2017 +0530
----------------------------------------------------------------------
.../blockletindex/SegmentIndexFileStore.java | 2 +-
.../core/metadata/PartitionMapFileStore.java | 147 ++++++++++++++--
.../core/writer/CarbonIndexFileMergeWriter.java | 41 +++--
.../StandardPartitionTableCleanTestCase.scala | 167 +++++++++++++++++++
.../StandardPartitionTableDropTestCase.scala | 2 +-
.../org/apache/carbondata/api/CarbonStore.scala | 10 +-
.../carbondata/spark/util/CarbonScalaUtil.scala | 29 ++--
.../management/CarbonCleanFilesCommand.scala | 15 +-
.../management/CarbonLoadDataCommand.scala | 37 ++--
.../datasources/CarbonFileFormat.scala | 9 +-
.../loading/model/CarbonLoadModel.java | 6 +-
.../processing/util/CarbonLoaderUtil.java | 3 +
12 files changed, 414 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 244e8bb..01cb1d7 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -42,7 +42,7 @@ public class SegmentIndexFileStore {
*/
private Map<String, byte[]> carbonIndexMap;
- public SegmentIndexFileStore() throws IOException {
+ public SegmentIndexFileStore() {
carbonIndexMap = new HashMap<>();
}
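
The constructor change above removes a throws IOException clause that was never needed: the constructor only allocates the backing map, so callers no longer have to handle a checked exception just to construct the store. A minimal caller sketch (segmentPath is an assumed variable, not part of this commit):

    import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore

    // Construction is now exception-free; only the read call can throw IOException.
    val indexFileStore = new SegmentIndexFileStore()
    indexFileStore.readAllIIndexOfSegment(segmentPath)
    val indexFileNames = indexFileStore.getCarbonIndexMap.keySet()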
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
index b44f99b..d29dfbb 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
@@ -38,8 +39,18 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
import com.google.gson.Gson;
@@ -134,6 +145,31 @@ public class PartitionMapFileStore {
}
}
+ private String getPartitionFilePath(String segmentPath) {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+ if (carbonFile.exists()) {
+ CarbonFile[] partitionFiles = carbonFile.listFiles(new CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile file) {
+ return file.getName().endsWith(CarbonTablePath.PARTITION_MAP_EXT);
+ }
+ });
+ if (partitionFiles != null && partitionFiles.length > 0) {
+ partionedSegment = true;
+ int i = 0;
+ // Get the latest partition map file based on the timestamp of that file.
+ long[] partitionTimestamps = new long[partitionFiles.length];
+ for (CarbonFile file : partitionFiles) {
+ partitionTimestamps[i++] = Long.parseLong(file.getName()
+ .substring(0, file.getName().length() - CarbonTablePath.PARTITION_MAP_EXT.length()));
+ }
+ Arrays.sort(partitionTimestamps);
+ return segmentPath + "/" + partitionTimestamps[partitionTimestamps.length - 1]
+ + CarbonTablePath.PARTITION_MAP_EXT;
+ }
+ }
+ return null;
+ }
+
private CarbonFile[] getPartitionFiles(String segmentPath) {
CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
if (carbonFile.exists()) {
@@ -179,22 +215,15 @@ public class PartitionMapFileStore {
return partitionMapper;
}
+ /**
+ * Reads all partitions that exist inside the passed segment path
+ * @param segmentPath
+ */
public void readAllPartitionsOfSegment(String segmentPath) {
- CarbonFile[] partitionFiles = getPartitionFiles(segmentPath);
- if (partitionFiles != null && partitionFiles.length > 0) {
+ String partitionFilePath = getPartitionFilePath(segmentPath);
+ if (partitionFilePath != null) {
partionedSegment = true;
- int i = 0;
- // Get the latest partition map file based on the timestamp of that file.
- long [] partitionTimestamps = new long[partitionFiles.length];
- for (CarbonFile file : partitionFiles) {
- partitionTimestamps[i++] =
- Long.parseLong(file.getName().substring(
- 0, file.getName().length() - CarbonTablePath.PARTITION_MAP_EXT.length()));
- }
- Arrays.sort(partitionTimestamps);
- PartitionMapper partitionMapper = readPartitionMap(
- segmentPath + "/" + partitionTimestamps[partitionTimestamps.length - 1]
- + CarbonTablePath.PARTITION_MAP_EXT);
+ PartitionMapper partitionMapper = readPartitionMap(partitionFilePath);
partitionMap.putAll(partitionMapper.getPartitionMap());
}
}
@@ -253,6 +282,96 @@ public class PartitionMapFileStore {
}
}
+ /**
+ * Cleans up invalid data after a partition drop, across all segments of the table.
+ * @param table
+ * @param currentPartitions Current partitions of the table
+ * @param forceDelete Whether to delete files immediately or only after the max query
+ * timeout has elapsed since file creation.
+ * @throws IOException
+ */
+ public void cleanSegments(
+ CarbonTable table,
+ List<String> currentPartitions,
+ boolean forceDelete) throws IOException {
+ SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier());
+
+ CarbonTablePath carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(table.getAbsoluteTableIdentifier().getTablePath(),
+ table.getAbsoluteTableIdentifier().getCarbonTableIdentifier());
+
+ LoadMetadataDetails[] details = ssm.readLoadMetadata(table.getMetaDataFilepath());
+ // scan through each segment.
+
+ for (LoadMetadataDetails segment : details) {
+
+ // Only a valid segment needs its dropped-partition files deleted here; if the
+ // segment is marked for delete or compacted it will be deleted anyway.
+
+ if (segment.getSegmentStatus() == SegmentStatus.SUCCESS
+ || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
+ List<String> toBeDeletedIndexFiles = new ArrayList<>();
+ List<String> toBeDeletedDataFiles = new ArrayList<>();
+ // take the list of files from this segment.
+ String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName());
+ String partitionFilePath = getPartitionFilePath(segmentPath);
+ if (partitionFilePath != null) {
+ PartitionMapper partitionMapper = readPartitionMap(partitionFilePath);
+ DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+ SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+ indexFileStore.readAllIIndexOfSegment(segmentPath);
+ Set<String> indexFilesFromSegment = indexFileStore.getCarbonIndexMap().keySet();
+ for (String indexFile : indexFilesFromSegment) {
+ // Check the partition information in the partition mapper
+ List<String> indexPartitions = partitionMapper.partitionMap.get(indexFile);
+ if (indexPartitions == null || !currentPartitions.containsAll(indexPartitions)) {
+ Long fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(indexFile
+ .substring(indexFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
+ indexFile.length() - CarbonTablePath.INDEX_FILE_EXT.length()));
+ if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) {
+ toBeDeletedIndexFiles.add(indexFile);
+ // Add the corresponding carbondata files to the delete list.
+ byte[] fileData = indexFileStore.getFileData(indexFile);
+ List<DataFileFooter> indexInfo =
+ fileFooterConverter.getIndexInfo(segmentPath + "/" + indexFile, fileData);
+ for (DataFileFooter footer : indexInfo) {
+ toBeDeletedDataFiles.add(footer.getBlockInfo().getTableBlockInfo().getFilePath());
+ }
+ }
+ }
+ }
+
+ if (toBeDeletedIndexFiles.size() > 0) {
+ indexFilesFromSegment.removeAll(toBeDeletedIndexFiles);
+ new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath,
+ new ArrayList<String>(indexFilesFromSegment));
+ for (String dataFile : toBeDeletedDataFiles) {
+ FileFactory.deleteFile(dataFile, FileFactory.getFileType(dataFile));
+ }
+ }
+ CarbonFile[] partitionFiles = getPartitionFiles(segmentPath);
+ CarbonFile currentPartitionFile = FileFactory.getCarbonFile(partitionFilePath);
+ if (partitionFiles != null) {
+ // Delete all old partition files
+ for (CarbonFile partitionFile : partitionFiles) {
+ if (!currentPartitionFile.getName().equalsIgnoreCase(partitionFile.getName())) {
+ partitionFile.delete();
+ }
+ }
+ }
+ partitionMapper = readPartitionMap(partitionFilePath);
+ if (partitionMapper != null) {
+ // delete the partition map file if no partitions remain in it
+ if (partitionMapper.partitionMap.size() == 0) {
+ currentPartitionFile.delete();
+ }
+ }
+ }
+ }
+ }
+ }
+
public List<String> getPartitions(String indexFileName) {
return partitionMap.get(indexFileName);
}
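
In sum, cleanSegments walks every SUCCESS or LOAD_PARTIAL_SUCCESS segment, reads its latest partition map, collects the index files whose partitions are no longer current, deletes them together with the carbondata files listed in their footers (immediately when forced, otherwise only after the max query timeout), rewrites the merged index from the survivors, and drops stale partition map files. A hedged invocation sketch, assuming carbonTable and the current partition list are already resolved:

    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.metadata.PartitionMapFileStore

    // currentPartitions holds the specs still valid for the table, e.g. "empno=11".
    // forceDelete = false defers physical deletion until the max query timeout passes.
    new PartitionMapFileStore()
      .cleanSegments(carbonTable, currentPartitions.asJava, false)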
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index 85f08cc..067d024 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -39,36 +39,53 @@ public class CarbonIndexFileMergeWriter {
/**
* Merge all the carbonindex files of segment to a merged file
* @param segmentPath
+ * @param indexFileNamesTobeAdded while merging, only these files are considered;
+ * if null, all files are considered
* @throws IOException
*/
- public void mergeCarbonIndexFilesOfSegment(String segmentPath) throws IOException {
+ public void mergeCarbonIndexFilesOfSegment(
+ String segmentPath,
+ List<String> indexFileNamesTobeAdded) throws IOException {
CarbonFile[] indexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
- if (isCarbonIndexFilePresent(indexFiles)) {
+ if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) {
SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
fileStore.readAllIIndexOfSegment(segmentPath);
- openThriftWriter(
- segmentPath + "/" +
- System.currentTimeMillis() + CarbonTablePath.MERGE_INDEX_FILE_EXT);
Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap();
MergedBlockIndexHeader indexHeader = new MergedBlockIndexHeader();
MergedBlockIndex mergedBlockIndex = new MergedBlockIndex();
List<String> fileNames = new ArrayList<>(indexMap.size());
List<ByteBuffer> data = new ArrayList<>(indexMap.size());
for (Map.Entry<String, byte[]> entry : indexMap.entrySet()) {
- fileNames.add(entry.getKey());
- data.add(ByteBuffer.wrap(entry.getValue()));
+ if (indexFileNamesTobeAdded == null ||
+ indexFileNamesTobeAdded.contains(entry.getKey())) {
+ fileNames.add(entry.getKey());
+ data.add(ByteBuffer.wrap(entry.getValue()));
+ }
+ }
+ if (fileNames.size() > 0) {
+ openThriftWriter(
+ segmentPath + "/" + System.currentTimeMillis() + CarbonTablePath.MERGE_INDEX_FILE_EXT);
+ indexHeader.setFile_names(fileNames);
+ mergedBlockIndex.setFileData(data);
+ writeMergedBlockIndexHeader(indexHeader);
+ writeMergedBlockIndex(mergedBlockIndex);
+ close();
}
- indexHeader.setFile_names(fileNames);
- mergedBlockIndex.setFileData(data);
- writeMergedBlockIndexHeader(indexHeader);
- writeMergedBlockIndex(mergedBlockIndex);
- close();
for (CarbonFile indexFile : indexFiles) {
indexFile.delete();
}
}
}
+ /**
+ * Merge all the carbonindex files of a segment into one merged file
+ * @param segmentPath
+ * @throws IOException
+ */
+ public void mergeCarbonIndexFilesOfSegment(String segmentPath) throws IOException {
+ mergeCarbonIndexFilesOfSegment(segmentPath, null);
+ }
+
private boolean isCarbonIndexFilePresent(CarbonFile[] indexFiles) {
for (CarbonFile file : indexFiles) {
if (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
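
The merge writer now exposes two entry points: the original single-argument form keeps its old behaviour by delegating with null, while the new overload restricts the merged file to a given list of index files, which cleanSegments uses to drop entries of deleted partitions. A short sketch (segmentPath and survivingIndexFiles are assumptions; the latter is a java.util.List[String]):

    import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter

    val writer = new CarbonIndexFileMergeWriter()
    // Original behaviour: merge every .carbonindex file of the segment.
    writer.mergeCarbonIndexFilesOfSegment(segmentPath)
    // Restricted merge: keep only the index files that survived a partition drop.
    writer.mergeCarbonIndexFilesOfSegment(segmentPath, survivingIndexFiles)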
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
new file mode 100644
index 0000000..2b0dd09
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.standardpartition
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll {
+ dropTable
+
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")
+ sql(
+ """
+ | CREATE TABLE originTable (empno int, empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+ | utilization int,salary int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ }
+
+ def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Int, partitionMapFiles: Int): Unit = {
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+ val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
+ carbonTable.getTablePath)
+ val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
+ val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
+ val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
+ override def accept(file: CarbonFile): Boolean = {
+ return file.getName.endsWith(".carbondata")
+ }
+ })
+ assert(dataFiles.length == partitions)
+ val partitionFile = carbonFile.listFiles(new CarbonFileFilter() {
+ override def accept(file: CarbonFile): Boolean = {
+ return file.getName.endsWith(".partitionmap")
+ }
+ })
+ assert(partitionFile.length == partitionMapFiles)
+ }
+
+ test("clean up partition table for int partition column") {
+ sql(
+ """
+ | CREATE TABLE partitionone (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ checkAnswer(
+ sql(s"""select count (*) from partitionone"""),
+ sql(s"""select count (*) from originTable"""))
+
+ checkAnswer(
+ sql(s"""select count (*) from partitionone where empno=11"""),
+ sql(s"""select count (*) from originTable where empno=11"""))
+
+ sql(s"""ALTER TABLE partitionone DROP PARTITION(empno='11')""")
+ validateDataFiles("default_partitionone", "0", 10, 2)
+ sql(s"CLEAN FILES FOR TABLE partitionone").show()
+
+ checkExistence(sql(s"""SHOW PARTITIONS partitionone"""), false, "empno=11")
+ validateDataFiles("default_partitionone", "0", 9, 1)
+ checkAnswer(
+ sql(s"""select count (*) from partitionone where empno=11"""),
+ Seq(Row(0)))
+
+ }
+
+ test("clean up partition on table for more partition columns") {
+ sql(
+ """
+ | CREATE TABLE partitionmany (empno int, empname String, designation String,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
+ | projectjoindate Timestamp, projectenddate Date,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (deptname String,doj Timestamp,projectcode int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmany OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmany OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ sql(s"""ALTER TABLE partitionmany DROP PARTITION(deptname='Learning')""")
+ validateDataFiles("default_partitionmany", "0", 10, 2)
+ validateDataFiles("default_partitionmany", "1", 10, 2)
+ sql(s"CLEAN FILES FOR TABLE partitionmany").show()
+ validateDataFiles("default_partitionmany", "0", 8, 1)
+ validateDataFiles("default_partitionmany", "1", 8, 1)
+ checkExistence(sql(s"""SHOW PARTITIONS partitionmany"""), false, "deptname=Learning", "projectcode=928479")
+ checkAnswer(
+ sql(s"""select count (*) from partitionmany where deptname='Learning'"""),
+ Seq(Row(0)))
+ }
+
+ test("clean up after dropping all partition on table") {
+ sql(
+ """
+ | CREATE TABLE partitionall (empno int, empname String, designation String,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
+ | projectjoindate Timestamp, projectenddate Date,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (deptname String,doj Timestamp,projectcode int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionall OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionall OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='Learning')""")
+ sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='configManagement')""")
+ sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='network')""")
+ sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='protocol')""")
+ sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='security')""")
+ assert(sql(s"""SHOW PARTITIONS partitionall""").collect().length == 0)
+ validateDataFiles("default_partitionall", "0", 10, 6)
+ sql(s"CLEAN FILES FOR TABLE partitionall").show()
+ validateDataFiles("default_partitionall", "0", 0, 0)
+ checkAnswer(
+ sql(s"""select count (*) from partitionall"""),
+ Seq(Row(0)))
+ }
+
+ override def afterAll = {
+ dropTable
+ }
+
+ def dropTable = {
+ sql("drop table if exists originTable")
+ sql("drop table if exists originMultiLoads")
+ sql("drop table if exists partitionone")
+ sql("drop table if exists partitionall")
+ sql("drop table if exists partitionmany")
+ sql("drop table if exists partitionshow")
+ sql("drop table if exists staticpartition")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
index 2a25255..9a9940b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
@@ -156,7 +156,7 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAl
}
override def afterAll = {
-// dropTable
+ dropTable
}
def dropTable = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 2b127e4..d514f77 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -30,7 +30,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, PartitionMapFileStore}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
@@ -103,7 +103,8 @@ object CarbonStore {
tableName: String,
storePath: String,
carbonTable: CarbonTable,
- forceTableClean: Boolean): Unit = {
+ forceTableClean: Boolean,
+ currentTablePartitions: Option[Seq[String]] = None): Unit = {
LOGGER.audit(s"The clean files request has been received for $dbName.$tableName")
var carbonCleanFilesLock: ICarbonLock = null
var absoluteTableIdentifier: AbsoluteTableIdentifier = null
@@ -128,6 +129,11 @@ object CarbonStore {
DataLoadingUtil.deleteLoadsAndUpdateMetadata(
isForceDeletion = true, carbonTable)
CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+ currentTablePartitions match {
+ case Some(partitions) =>
+ new PartitionMapFileStore().cleanSegments(carbonTable, partitions.asJava, true)
+ case _ =>
+ }
}
} finally {
if (carbonCleanFilesLock != null) {
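
Since currentTablePartitions defaults to None, existing callers of cleanFiles compile unchanged; only the partitioned-table path passes the extra argument. A hedged sketch of the extended call, with illustrative values (argument order follows the caller shown in CarbonCleanFilesCommand below):

    import org.apache.carbondata.api.CarbonStore
    import org.apache.carbondata.core.util.CarbonProperties

    CarbonStore.cleanFiles(
      "default",                      // database name
      "partitionone",                 // table name
      CarbonProperties.getStorePath,
      carbonTable,                    // assumed resolved via CarbonEnv.getCarbonTable
      false,                          // forceTableClean
      Some(Seq("empno=11")))          // current partitions of the table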
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index f78412b..4c405a4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -157,16 +157,25 @@ object CarbonScalaUtil {
def convertToUTF8String(value: String,
dataType: DataType,
timeStampFormat: SimpleDateFormat,
- dateFormat: SimpleDateFormat): UTF8String = {
- dataType match {
- case TimestampType =>
- UTF8String.fromString(
- DateTimeUtils.timestampToString(timeStampFormat.parse(value).getTime * 1000))
- case DateType =>
- UTF8String.fromString(
- DateTimeUtils.dateToString(
- (dateFormat.parse(value).getTime / DateTimeUtils.MILLIS_PER_DAY).toInt))
- case _ => UTF8String.fromString(value)
+ dateFormat: SimpleDateFormat,
+ serializationNullFormat: String): UTF8String = {
+ if (value == null || serializationNullFormat.equals(value)) {
+ return UTF8String.fromString(value)
+ }
+ try {
+ dataType match {
+ case TimestampType =>
+ UTF8String.fromString(
+ DateTimeUtils.timestampToString(timeStampFormat.parse(value).getTime * 1000))
+ case DateType =>
+ UTF8String.fromString(
+ DateTimeUtils.dateToString(
+ (dateFormat.parse(value).getTime / DateTimeUtils.MILLIS_PER_DAY).toInt))
+ case _ => UTF8String.fromString(value)
+ }
+ } catch {
+ case e: Exception =>
+ UTF8String.fromString(value)
}
}
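
The reworked conversion short-circuits when the value matches the serialization null format and, on any parse failure, now returns the raw string instead of failing the whole load. A rough behavioural sketch with an assumed format:

    import java.text.SimpleDateFormat
    import org.apache.spark.sql.types.TimestampType
    import org.apache.carbondata.spark.util.CarbonScalaUtil

    val fmt = new SimpleDateFormat("dd-MM-yyyy")
    // A value equal to the null format is returned untouched, no parse attempted.
    CarbonScalaUtil.convertToUTF8String("\\N", TimestampType, fmt, fmt, "\\N")
    // An unparseable timestamp falls back to the raw string instead of throwing.
    CarbonScalaUtil.convertToUTF8String("not-a-date", TimestampType, fmt, fmt, "\\N")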
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index e0530f6..342acd4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -18,7 +18,10 @@
package org.apache.spark.sql.execution.command.management
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.command.{Checker, DataCommand}
+import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.core.util.CarbonProperties
@@ -76,13 +79,21 @@ case class CarbonCleanFilesCommand(
private def cleanGarbageData(sparkSession: SparkSession,
databaseNameOp: Option[String], tableName: String): Unit = {
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
-
+ val partitions: Option[Seq[String]] = if (carbonTable.isHivePartitionTable) {
+ Some(CarbonFilters.getPartitions(
+ Seq.empty[Expression],
+ sparkSession,
+ TableIdentifier(tableName, databaseNameOp)))
+ } else {
+ None
+ }
CarbonStore.cleanFiles(
CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
tableName,
CarbonProperties.getStorePath,
carbonTable,
- forceTableClean)
+ forceTableClean,
+ partitions)
}
private def cleanGarbageDataInAllTables(sparkSession: SparkSession): Unit = {
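
End to end, the command is driven from SQL; for a Hive-partitioned table the surviving partitions are resolved first so only dropped ones get cleaned. The trigger mirrors the new test case earlier in this commit:

    // After a partition drop, CLEAN FILES physically removes the dropped
    // partition's index and data files.
    sparkSession.sql("ALTER TABLE partitionone DROP PARTITION (empno='11')")
    sparkSession.sql("CLEAN FILES FOR TABLE partitionone")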
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index f96c0a7..7285d9d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -34,14 +34,15 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, UpdateTableModel}
import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
@@ -52,6 +53,7 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer}
import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider
+import org.apache.carbondata.core.metadata.PartitionMapFileStore
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
@@ -453,16 +455,28 @@ case class CarbonLoadDataCommand(
hadoopConf: Configuration,
dataFrame: Option[DataFrame]) = {
val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName))
val logicalPlan =
sparkSession.sessionState.catalog.lookupRelation(
- TableIdentifier(table.getTableName, Some(table.getDatabaseName)))
+ identifier)
val relation = logicalPlan.collect {
case l: LogicalRelation => l
- case c: CatalogRelation => c
+ case c // To stay compatible with Spark 2.1 and 2.2 we compare class names
+ if (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+ c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
+ c.getClass.getName.equals(
+ "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) => c
}.head
-
+ // Clean up the old invalid segment data.
+ DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table)
+ val currentPartitions =
+ CarbonFilters.getPartitions(Seq.empty[Expression], sparkSession, identifier)
+ // Clean up data of already dropped partitions
+ new PartitionMapFileStore().cleanSegments(table, currentPartitions.asJava, false)
// Converts the data to a format Carbon understands. Timestamp/date values need to be
// converted to Hive's standard format so Spark can partition the data.
+ val serializationNullFormat =
+ carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
val query: LogicalPlan = if (dataFrame.isDefined) {
var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
val timeStampFormat = new SimpleDateFormat(timeStampformatString)
@@ -545,13 +559,15 @@ case class CarbonLoadDataCommand(
jobConf).map{ case (key, value) =>
val data = new Array[Any](len)
var i = 0
- while (i < len) {
+ val input = value.get()
+ while (i < input.length) {
// TODO find a way to avoid double conversion of date and time.
data(i) = CarbonScalaUtil.convertToUTF8String(
- value.get()(i),
+ input(i),
rowDataTypes(i),
timeStampFormat,
- dateFormat)
+ dateFormat,
+ serializationNullFormat)
i = i + 1
}
InternalRow.fromSeq(data)
@@ -576,7 +592,7 @@ case class CarbonLoadDataCommand(
isOverwriteTable,
carbonLoadModel,
sparkSession)
- case c: CatalogRelation =>
+ case others =>
val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable(
"tableMeta",
relation).asInstanceOf[CatalogTable]
@@ -587,7 +603,7 @@ case class CarbonLoadDataCommand(
val catalog = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
convertToLogicalRelation(
catalogTable,
- c.output,
+ others.output,
sizeInBytes,
isOverwriteTable,
carbonLoadModel,
@@ -642,6 +658,7 @@ case class CarbonLoadDataCommand(
options += (("onepass", loadModel.getUseOnePass.toString))
options += (("dicthost", loadModel.getDictionaryServerHost))
options += (("dictport", loadModel.getDictionaryServerPort.toString))
+ options ++= this.options
if (updateModel.isDefined) {
options += (("updatetimestamp", updateModel.get.updatedTimeStamp.toString))
}
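
The relation match above compares class names as strings because CatalogRelation was renamed between Spark 2.1 and 2.2; matching on a compile-time type would tie the build to a single Spark version. The check in isolation (logicalPlan is assumed; class names are taken from the diff above):

    import org.apache.spark.sql.execution.datasources.LogicalRelation

    val catalogRelationClasses = Set(
      "org.apache.spark.sql.catalyst.catalog.CatalogRelation",
      "org.apache.spark.sql.catalyst.catalog.HiveTableRelation",
      "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")
    val relation = logicalPlan.collect {
      case l: LogicalRelation => l
      // Version-neutral: matches whichever catalog relation class is on the classpath.
      case c if catalogRelationClasses.contains(c.getClass.getName) => c
    }.head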
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index a95693c..b9df7a1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -189,7 +189,14 @@ private class CarbonOutputWriter(path: String,
extends OutputWriter with AbstractCarbonOutputWriter {
val partitions = getPartitionsFromPath(path, context).map(ExternalCatalogUtils.unescapePathName)
val partitionData = if (partitions.nonEmpty) {
- partitions.map(_.split("=")(1))
+ partitions.map{ p =>
+ val splitData = p.split("=")
+ if (splitData.length > 1) {
+ splitData(1)
+ } else {
+ ""
+ }
+ }
} else {
Array.empty
}
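
The guarded split covers a path segment such as "doj=" where the partition value is empty: split("=") then yields a single element, so the old splitData(1) threw ArrayIndexOutOfBoundsException. The fix as a standalone function (name is illustrative):

    // "doj=2016-01-01" -> "2016-01-01"; "doj=" -> "" instead of an
    // ArrayIndexOutOfBoundsException from splitData(1).
    def partitionValue(segment: String): String = {
      val splitData = segment.split("=")
      if (splitData.length > 1) splitData(1) else ""
    }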
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 3031b8e..8a295d9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -564,7 +564,11 @@ public class CarbonLoadModel implements Serializable {
* @return
*/
public LoadMetadataDetails getCurrentLoadMetadataDetail() {
- return loadMetadataDetails.get(loadMetadataDetails.size() - 1);
+ if (loadMetadataDetails != null && loadMetadataDetails.size() > 0) {
+ return loadMetadataDetails.get(loadMetadataDetails.size() - 1);
+ } else {
+ return null;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index fef6930..49ca254 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -393,6 +393,9 @@ public final class CarbonLoaderUtil {
SegmentStatus loadStatus = SegmentStatus.MARKED_FOR_DELETE;
// always the last entry in the load metadata details will be the current load entry
LoadMetadataDetails loadMetaEntry = model.getCurrentLoadMetadataDetail();
+ if (loadMetaEntry == null) {
+ return;
+ }
CarbonLoaderUtil
.populateNewLoadMetaEntry(loadMetaEntry, loadStatus, model.getFactTimeStamp(), true);
boolean entryAdded =