Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/03/02 15:50:53 UTC
carbondata git commit: [CARBONDATA-2209] Fixed rename of partitioned tables not
working, and BATCH_SORT and NO_SORT load issues on partitioned tables
Repository: carbondata
Updated Branches:
refs/heads/master 7bfe4afe4 -> 74f5d67c0
[CARBONDATA-2209] Fixed rename of partitioned tables not working, and BATCH_SORT and NO_SORT load issues on partitioned tables
This closes #2006
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/74f5d67c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/74f5d67c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/74f5d67c
Branch: refs/heads/master
Commit: 74f5d67c0f79ba2a45ed97339179333a7cd37279
Parents: 7bfe4af
Author: ravipesala <ra...@gmail.com>
Authored: Tue Feb 27 16:38:09 2018 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Fri Mar 2 21:19:57 2018 +0530
----------------------------------------------------------------------
.../apache/carbondata/core/datamap/Segment.java | 21 ++
.../blockletindex/SegmentIndexFileStore.java | 13 +-
.../core/metadata/SegmentFileStore.java | 33 +++-
.../core/util/path/CarbonTablePath.java | 7 +
.../core/writer/CarbonIndexFileMergeWriter.java | 191 ++++++++++++++-----
.../CarbonIndexFileMergeTestCase.scala | 15 +-
.../StandardPartitionGlobalSortTestCase.scala | 86 +++++++++
.../StandardPartitionTableQueryTestCase.scala | 21 ++
.../schema/CarbonAlterTableRenameCommand.scala | 58 +++++-
.../sql/execution/strategy/DDLStrategy.scala | 8 +
.../spark/sql/hive/CarbonSessionState.scala | 12 +-
.../spark/sql/hive/CarbonSessionState.scala | 10 +
.../loading/DataLoadProcessBuilder.java | 8 +-
.../processing/util/CarbonLoaderUtil.java | 87 +++++++++
14 files changed, 509 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index c47f16c..a2a2a41 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -21,6 +21,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
/**
* Represents one load of carbondata
*/
@@ -76,6 +80,23 @@ public class Segment implements Serializable {
return new Segment(segmentId, null);
}
+ /**
+ * Reads the table status and returns the segment corresponding to segmentNo.
+ * @param segmentNo segment number (load name) to look up
+ * @param tablePath table path used to locate the table status file
+ * @return the matching Segment, or null if no load with that name exists
+ */
+ public static Segment getSegment(String segmentNo, String tablePath) {
+ LoadMetadataDetails[] loadMetadataDetails =
+ SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(tablePath));
+ for (LoadMetadataDetails details: loadMetadataDetails) {
+ if (details.getLoadName().equals(segmentNo)) {
+ return new Segment(details.getLoadName(), details.getSegmentFile());
+ }
+ }
+ return null;
+ }
+
@Override public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
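The new lookup lets callers resolve a segment number to its Segment, including the
segment file name recorded for partitioned tables, without building paths by hand.
A minimal usage sketch, with an illustrative table path:

    // Hypothetical caller; the table path is an example value.
    Segment segment = Segment.getSegment("0", "/store/default/sample_table");
    if (segment != null && segment.getSegmentFileName() != null) {
      // Partitioned-table segment: index files are tracked through the segment file.
    }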
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 4883d94..9364a7a 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -96,6 +96,7 @@ public class SegmentIndexFileStore {
public void readAllIIndexOfSegment(SegmentFileStore.SegmentFile segmentFile, String tablePath,
SegmentStatus status, boolean ignoreStatus) throws IOException {
List<CarbonFile> carbonIndexFiles = new ArrayList<>();
+ Set<String> indexFiles = new HashSet<>();
if (segmentFile == null) {
return;
}
@@ -107,11 +108,21 @@ public class SegmentIndexFileStore {
if (locations.getValue().isRelative()) {
location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
}
+ String mergeFileName = locations.getValue().getMergeFileName();
+ if (mergeFileName != null) {
+ CarbonFile mergeFile = FileFactory
+ .getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + mergeFileName);
+ if (mergeFile.exists() && !indexFiles.contains(mergeFile.getAbsolutePath())) {
+ carbonIndexFiles.add(mergeFile);
+ indexFiles.add(mergeFile.getAbsolutePath());
+ }
+ }
for (String indexFile : locations.getValue().getFiles()) {
CarbonFile carbonFile = FileFactory
.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile);
- if (carbonFile.exists()) {
+ if (carbonFile.exists() && !indexFiles.contains(carbonFile.getAbsolutePath())) {
carbonIndexFiles.add(carbonFile);
+ indexFiles.add(carbonFile.getAbsolutePath());
}
}
}
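The HashSet guards against reading the same index twice: with this commit the segment
file can record both the merge file name and the .carbonindex names it absorbed (see
the SegmentFileStore change below), so one index can be reachable by two entries.
Sketched as comments:

    // For each location listed in the segment file:
    //   add the merge file (if it exists and was not seen) and remember its absolute path
    //   add each plain index file only if its absolute path was not already added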
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index f2548b5..2d31b4e 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -195,7 +195,7 @@ public class SegmentFileStore {
* @param partitionSpecs
*/
public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath,
- List<PartitionSpec> partitionSpecs) {
+ List<PartitionSpec> partitionSpecs) throws IOException {
SegmentFile segmentFile = null;
for (PartitionSpec spec : partitionSpecs) {
String location = spec.getLocation().toString();
@@ -220,6 +220,9 @@ public class SegmentFileStore {
folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
for (CarbonFile file : listFiles) {
if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+ List<String> indexFiles =
+ new SegmentIndexFileStore().getIndexFilesFromMergeFile(file.getAbsolutePath());
+ folderDetails.getFiles().addAll(indexFiles);
folderDetails.setMergeFileName(file.getName());
} else {
folderDetails.getFiles().add(file.getName());
@@ -302,6 +305,10 @@ public class SegmentFileStore {
readIndexFiles(SegmentStatus.SUCCESS, false);
}
+ public SegmentFile getSegmentFile() {
+ return segmentFile;
+ }
+
/**
* Reads all index files as per the status of the file. If @ignoreStatus is true, it just
* reads all index files
@@ -377,6 +384,30 @@ public class SegmentFileStore {
}
/**
+ * Gets all carbon index files from this segment, including any merge files.
+ * @return list of index files that exist on disk
+ */
+ public List<CarbonFile> getIndexCarbonFiles() {
+ Map<String, String> indexFiles = getIndexFiles();
+ Set<String> files = new HashSet<>();
+ for (Map.Entry<String, String> entry: indexFiles.entrySet()) {
+ Path path = new Path(entry.getKey());
+ files.add(entry.getKey());
+ if (entry.getValue() != null) {
+ files.add(new Path(path.getParent(), entry.getValue()).toString());
+ }
+ }
+ List<CarbonFile> carbonFiles = new ArrayList<>();
+ for (String indexFile : files) {
+ CarbonFile carbonFile = FileFactory.getCarbonFile(indexFile);
+ if (carbonFile.exists()) {
+ carbonFiles.add(carbonFile);
+ }
+ }
+ return carbonFiles;
+ }
+
+ /**
* Drops the partition related files from the segment file of the segment and writes
* to a new file. It first iterates over the segment file and checks which paths need to be
* dropped, then updates their status to delete when found.
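getIndexCarbonFiles resolves each recorded index file, plus any merge file name stored
alongside it, into CarbonFile handles, skipping entries that no longer exist on disk.
A sketch of the read path, assuming a segment file name as recorded in table status:

    // Hypothetical usage: "0_1551539997000.segment" is an illustrative file name.
    SegmentFileStore sfs = new SegmentFileStore(tablePath, "0_1551539997000.segment");
    for (CarbonFile indexFile : sfs.getIndexCarbonFiles()) {
      System.out.println(indexFile.getAbsolutePath());
    }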
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index fa742d8..b5fe5ea 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -170,6 +170,13 @@ public class CarbonTablePath extends Path {
}
/**
+ * Return the table status file path based on `tablePath`
+ */
+ public static String getTableStatusPath(String tablePath) {
+ return getMetadataPath(tablePath) + File.separator + TABLE_STATUS_FILE;
+ }
+
+ /**
* @param columnId unique column identifier
* @return absolute path of dictionary meta file
*/
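A quick sketch of the new helper (the exact layout comes from getMetadataPath and
TABLE_STATUS_FILE; the path below is illustrative):

    // Typically resolves to <tablePath>/Metadata/tablestatus.
    String statusPath = CarbonTablePath.getTableStatusPath("/store/default/sample_table");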
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index 01f96ba..bc150e5 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -17,18 +17,26 @@
package org.apache.carbondata.core.writer;
import java.io.IOException;
+import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.MergedBlockIndex;
import org.apache.carbondata.format.MergedBlockIndexHeader;
+import org.apache.hadoop.fs.Path;
+
public class CarbonIndexFileMergeWriter {
/**
@@ -38,7 +46,7 @@ public class CarbonIndexFileMergeWriter {
/**
* Merge all the carbonindex files of a segment into a single merged file
- * @param segmentPath
+ * @param tablePath
* @param indexFileNamesTobeAdded while merging it considers only these files.
* If null then all index files are considered
* @param readFileFooterFromCarbonDataFile flag to read file footer information from carbondata
@@ -46,77 +54,152 @@ public class CarbonIndexFileMergeWriter {
* which do not store the blocklet info to current version
* @throws IOException
*/
- private void mergeCarbonIndexFilesOfSegment(String segmentPath,
- List<String> indexFileNamesTobeAdded, boolean readFileFooterFromCarbonDataFile)
- throws IOException {
- CarbonFile[] indexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
+ private SegmentIndexFileMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId,
+ String tablePath, List<String> indexFileNamesTobeAdded,
+ boolean readFileFooterFromCarbonDataFile) throws IOException {
+ Segment segment = Segment.getSegment(segmentId, tablePath);
+ String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
+ CarbonFile[] indexFiles;
+ SegmentFileStore sfs = null;
+ if (segment != null && segment.getSegmentFileName() != null) {
+ sfs = new SegmentFileStore(tablePath, segment.getSegmentFileName());
+ List<CarbonFile> indexCarbonFiles = sfs.getIndexCarbonFiles();
+ indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
+ } else {
+ indexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
+ }
if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) {
- SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
- if (readFileFooterFromCarbonDataFile) {
- // this case will be used in case of upgrade where old store will not have the blocklet
- // info in the index file and therefore blocklet info need to be read from the file footer
- // in the carbondata file
- fileStore.readAllIndexAndFillBolckletInfo(segmentPath);
+ if (sfs == null) {
+ return mergeNormalSegment(indexFileNamesTobeAdded, readFileFooterFromCarbonDataFile,
+ segmentPath, indexFiles);
} else {
- fileStore.readAllIIndexOfSegment(segmentPath);
+ return mergePartitionSegment(indexFileNamesTobeAdded, sfs, indexFiles);
}
- Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap();
- MergedBlockIndexHeader indexHeader = new MergedBlockIndexHeader();
- MergedBlockIndex mergedBlockIndex = new MergedBlockIndex();
- List<String> fileNames = new ArrayList<>(indexMap.size());
- List<ByteBuffer> data = new ArrayList<>(indexMap.size());
- for (Map.Entry<String, byte[]> entry : indexMap.entrySet()) {
- if (indexFileNamesTobeAdded == null ||
- indexFileNamesTobeAdded.contains(entry.getKey())) {
- fileNames.add(entry.getKey());
- data.add(ByteBuffer.wrap(entry.getValue()));
- }
+ }
+ return null;
+ }
+
+ private SegmentIndexFileMergeStatus mergeNormalSegment(List<String> indexFileNamesTobeAdded,
+ boolean readFileFooterFromCarbonDataFile, String segmentPath, CarbonFile[] indexFiles)
+ throws IOException {
+ SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
+ if (readFileFooterFromCarbonDataFile) {
+ // this case is used during upgrade, where the old store does not have the blocklet
+ // info in the index file and therefore the blocklet info needs to be read from the file footer
+ // in the carbondata file
+ fileStore.readAllIndexAndFillBolckletInfo(segmentPath);
+ } else {
+ fileStore.readAllIIndexOfSegment(segmentPath);
+ }
+ Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap();
+ writeMergeIndexFile(indexFileNamesTobeAdded, segmentPath, indexMap);
+ for (CarbonFile indexFile : indexFiles) {
+ indexFile.delete();
+ }
+ return null;
+ }
+
+ private SegmentIndexFileMergeStatus mergePartitionSegment(List<String> indexFileNamesTobeAdded,
+ SegmentFileStore sfs, CarbonFile[] indexFiles) throws IOException {
+ SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
+ fileStore
+ .readAllIIndexOfSegment(sfs.getSegmentFile(), sfs.getTablePath(), SegmentStatus.SUCCESS,
+ true);
+ Map<String, byte[]> indexMap = fileStore.getCarbonIndexMapWithFullPath();
+ Map<String, Map<String, byte[]>> indexLocationMap = new HashMap<>();
+ for (Map.Entry<String, byte[]> entry: indexMap.entrySet()) {
+ Path path = new Path(entry.getKey());
+ Map<String, byte[]> map = indexLocationMap.get(path.getParent().toString());
+ if (map == null) {
+ map = new HashMap<>();
+ indexLocationMap.put(path.getParent().toString(), map);
}
- if (fileNames.size() > 0) {
- openThriftWriter(
- segmentPath + "/" + System.currentTimeMillis() + CarbonTablePath.MERGE_INDEX_FILE_EXT);
- indexHeader.setFile_names(fileNames);
- mergedBlockIndex.setFileData(data);
- writeMergedBlockIndexHeader(indexHeader);
- writeMergedBlockIndex(mergedBlockIndex);
- close();
+ map.put(path.getName(), entry.getValue());
+ }
+ for (Map.Entry<String, Map<String, byte[]>> entry : indexLocationMap.entrySet()) {
+ String mergeIndexFile =
+ writeMergeIndexFile(indexFileNamesTobeAdded, entry.getKey(), entry.getValue());
+ for (Map.Entry<String, SegmentFileStore.FolderDetails> segentry : sfs.getLocationMap()
+ .entrySet()) {
+ String location = segentry.getKey();
+ if (segentry.getValue().isRelative()) {
+ location = sfs.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + location;
+ }
+ if (new Path(entry.getKey()).equals(new Path(location))) {
+ segentry.getValue().setMergeFileName(mergeIndexFile);
+ break;
+ }
}
- for (CarbonFile indexFile : indexFiles) {
- indexFile.delete();
+ }
+
+ List<String> filesTobeDeleted = new ArrayList<>();
+ for (CarbonFile file : indexFiles) {
+ filesTobeDeleted.add(file.getAbsolutePath());
+ }
+ return new SegmentIndexFileMergeStatus(sfs.getSegmentFile(), filesTobeDeleted);
+ }
+
+ private String writeMergeIndexFile(List<String> indexFileNamesTobeAdded, String segmentPath,
+ Map<String, byte[]> indexMap) throws IOException {
+ MergedBlockIndexHeader indexHeader = new MergedBlockIndexHeader();
+ MergedBlockIndex mergedBlockIndex = new MergedBlockIndex();
+ List<String> fileNames = new ArrayList<>(indexMap.size());
+ List<ByteBuffer> data = new ArrayList<>(indexMap.size());
+ for (Map.Entry<String, byte[]> entry : indexMap.entrySet()) {
+ if (indexFileNamesTobeAdded == null ||
+ indexFileNamesTobeAdded.contains(entry.getKey())) {
+ fileNames.add(entry.getKey());
+ data.add(ByteBuffer.wrap(entry.getValue()));
}
}
+ if (fileNames.size() > 0) {
+ String mergeIndexName = System.currentTimeMillis() + CarbonTablePath.MERGE_INDEX_FILE_EXT;
+ openThriftWriter(segmentPath + "/" + mergeIndexName);
+ indexHeader.setFile_names(fileNames);
+ mergedBlockIndex.setFileData(data);
+ writeMergedBlockIndexHeader(indexHeader);
+ writeMergedBlockIndex(mergedBlockIndex);
+ close();
+ return mergeIndexName;
+ }
+ return null;
}
/**
* Merge all the carbonindex files of a segment into a single merged file
*
- * @param segmentPath
+ * @param segmentId
* @param indexFileNamesTobeAdded
* @throws IOException
*/
- public void mergeCarbonIndexFilesOfSegment(String segmentPath,
- List<String> indexFileNamesTobeAdded) throws IOException {
- mergeCarbonIndexFilesOfSegment(segmentPath, indexFileNamesTobeAdded, false);
+ public SegmentIndexFileMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId,
+ String tablePath, List<String> indexFileNamesTobeAdded) throws IOException {
+ return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, indexFileNamesTobeAdded, false);
}
/**
* Merge all the carbonindex files of a segment into a single merged file
- * @param segmentPath
+ *
+ * @param segmentId
* @throws IOException
*/
- public void mergeCarbonIndexFilesOfSegment(String segmentPath) throws IOException {
- mergeCarbonIndexFilesOfSegment(segmentPath, null, false);
+ public SegmentIndexFileMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId,
+ String tablePath) throws IOException {
+ return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null, false);
}
/**
* Merge all the carbonindex files of a segment into a single merged file
- * @param segmentPath
+ *
+ * @param segmentId
* @param readFileFooterFromCarbonDataFile
* @throws IOException
*/
- public void mergeCarbonIndexFilesOfSegment(String segmentPath,
- boolean readFileFooterFromCarbonDataFile) throws IOException {
- mergeCarbonIndexFilesOfSegment(segmentPath, null, readFileFooterFromCarbonDataFile);
+ public SegmentIndexFileMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId,
+ String tablePath, boolean readFileFooterFromCarbonDataFile) throws IOException {
+ return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null,
+ readFileFooterFromCarbonDataFile);
}
private boolean isCarbonIndexFilePresent(CarbonFile[] indexFiles) {
@@ -166,4 +249,24 @@ public class CarbonIndexFileMergeWriter {
thriftWriter.close();
}
+ public static class SegmentIndexFileMergeStatus implements Serializable {
+
+ private SegmentFileStore.SegmentFile segmentFile;
+
+ private List<String> filesTobeDeleted;
+
+ public SegmentIndexFileMergeStatus(SegmentFileStore.SegmentFile segmentFile,
+ List<String> filesTobeDeleted) {
+ this.segmentFile = segmentFile;
+ this.filesTobeDeleted = filesTobeDeleted;
+ }
+
+ public SegmentFileStore.SegmentFile getSegmentFile() {
+ return segmentFile;
+ }
+
+ public List<String> getFilesTobeDeleted() {
+ return filesTobeDeleted;
+ }
+ }
}
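With the signature change, callers pass a segment id and table path instead of a
segment directory; for partitioned tables the writer returns a status object rather
than deleting index files itself. A usage sketch:

    // For a non-partitioned segment the method merges in place and returns null;
    // for a partitioned segment the caller persists the new segment file and then
    // deletes the old index files (as CarbonLoaderUtil does below).
    CarbonIndexFileMergeWriter.SegmentIndexFileMergeStatus status =
        new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment("0", tablePath);
    if (status != null) {
      // status.getSegmentFile() and status.getFilesTobeDeleted() drive the follow-up work.
    }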
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index 895b0b5..7608318 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -62,9 +62,8 @@ class CarbonIndexFileMergeTestCase
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='100')")
val table = CarbonMetadata.getInstance().getCarbonTable("default","indexmerge")
- val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
new CarbonIndexFileMergeWriter()
- .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false)
+ .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
assert(getIndexFileCount("default_indexmerge", "0") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""),
sql("""Select count(*) from indexmerge"""))
@@ -88,9 +87,9 @@ class CarbonIndexFileMergeTestCase
val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
new CarbonIndexFileMergeWriter()
- .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false)
+ .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
new CarbonIndexFileMergeWriter()
- .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","1"), false)
+ .mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false)
assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
@@ -114,9 +113,9 @@ class CarbonIndexFileMergeTestCase
val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
new CarbonIndexFileMergeWriter()
- .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false)
+ .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
new CarbonIndexFileMergeWriter()
- .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","1"), false)
+ .mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false)
assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
@@ -144,7 +143,7 @@ class CarbonIndexFileMergeTestCase
val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
new CarbonIndexFileMergeWriter()
- .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0.1"), false)
+ .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
}
@@ -174,7 +173,7 @@ class CarbonIndexFileMergeTestCase
val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
new CarbonIndexFileMergeWriter()
- .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0.1"), false)
+ .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
index ff062cd..b511ee8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
@@ -927,6 +927,92 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
assert(exMessage.getMessage.contains("day is not a valid partition column in table default.partitionnocolumn"))
}
+ test("data loading with default partition in static partition table with batchsort") {
+ sql("DROP TABLE IF EXISTS partitiondefaultbatchsort")
+ sql(
+ """
+ | CREATE TABLE partitiondefaultbatchsort (empno int, designation String,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectenddate Timestamp,attendance int,
+ | utilization int, doj Timestamp, empname String)
+ | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
+ | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='BATCH_SORT')
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondefaultbatchsort OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ checkAnswer(sql("select count(*) from partitiondefaultbatchsort"), Seq(Row(10)))
+ }
+
+ test("data loading with default partition in static partition table with nosort") {
+ sql("DROP TABLE IF EXISTS partitiondefaultnosort")
+ sql(
+ """
+ | CREATE TABLE partitiondefaultnosort (empno int, designation String,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectenddate Timestamp,attendance int,
+ | utilization int, doj Timestamp, empname String)
+ | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
+ | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='NO_SORT')
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondefaultnosort OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ checkAnswer(sql("select count(*) from partitiondefaultnosort"), Seq(Row(10)))
+ }
+
+ test("data loading with default partition in static partition table with rename") {
+ sql("DROP TABLE IF EXISTS partitiondefaultrename")
+ sql("DROP TABLE IF EXISTS partitiondefaultrename_new")
+ sql(
+ """
+ | CREATE TABLE partitiondefaultrename (empno int, designation String,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectenddate Timestamp,attendance int,
+ | utilization int, doj Timestamp, empname String)
+ | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
+ | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondefaultrename OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ checkAnswer(sql("select count(*) from partitiondefaultrename"), Seq(Row(10)))
+ sql(s"alter table partitiondefaultrename rename to partitiondefaultrename_new")
+ checkAnswer(sql("select count(*) from partitiondefaultrename_new"), Seq(Row(10)))
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondefaultrename_new OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ checkAnswer(sql("select count(*) from partitiondefaultrename_new"), Seq(Row(20)))
+ }
+
+ test("data loading with default partition in static partition table with rename first") {
+ sql("DROP TABLE IF EXISTS partitiondefaultrenamefirst")
+ sql("DROP TABLE IF EXISTS partitiondefaultrenamefirst_new")
+ sql(
+ """
+ | CREATE TABLE partitiondefaultrenamefirst (empno int, designation String,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectenddate Timestamp,attendance int,
+ | utilization int, doj Timestamp, empname String)
+ | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
+ | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+ """.stripMargin)
+ sql(s"alter table partitiondefaultrenamefirst rename to partitiondefaultrenamefirst_new")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondefaultrenamefirst_new OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ checkAnswer(sql("select count(*) from partitiondefaultrenamefirst_new"), Seq(Row(10)))
+ }
+
+ test("data loading for global partition table for two partition column with no columns in csv") {
+ sql("DROP TABLE IF EXISTS partitiontwonocolumns")
+ sql(
+ """
+ | CREATE TABLE partitiontwonocolumns (empno int, designation String,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int,doj Timestamp, empname String)
+ | PARTITIONED BY (newcol1 date, newcol2 int)
+ | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiontwonocolumns partition(newcol1='2016-08-09', newcol2='20') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitiontwonocolumns order by empno"),
+ sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+ checkAnswer(sql("select distinct cast(newcol1 as string) from partitiontwonocolumns"), Seq(Row("2016-08-09")))
+ }
+
override def afterAll = {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index 58eb9f9..163e662 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -300,6 +300,27 @@ test("Creation of partition table should fail if the colname in table schema and
FileFactory.deleteAllCarbonFilesOfDir(file)
}
+ test("set partition location with static column partition with load command") {
+ sql("drop table if exists staticpartitionsetloc")
+ sql(
+ """
+ | CREATE TABLE staticpartitionsetloc (empno int, designation String,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
+ | projectjoindate Timestamp,attendance int,
+ | deptname String,projectcode int,
+ | utilization int,salary int,projectenddate Date,doj Timestamp)
+ | PARTITIONED BY (empname String)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ val location = metastoredb + "/" + "ravi1"
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionsetloc partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ intercept[Exception] {
+ sql(s"""alter table staticpartitionsetloc partition (empname='ravi') set location '$location'""")
+ }
+ val file = FileFactory.getCarbonFile(location)
+ FileFactory.deleteAllCarbonFilesOfDir(file)
+ }
+
test("add external partition with static column partition with load command with diffrent schema") {
sql(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index c8f64e1..40b5cfc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -17,8 +17,10 @@
package org.apache.spark.sql.execution.command.schema
+import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand}
import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
import org.apache.spark.util.AlterTableUtil
@@ -119,6 +121,12 @@ private[sql] case class CarbonAlterTableRenameCommand(
metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
.getClient()
+ var partitions: Seq[CatalogTablePartition] = Seq.empty
+ if (carbonTable.isHivePartitionTable) {
+ partitions =
+ sparkSession.sessionState.catalog.listPartitions(
+ TableIdentifier(oldTableName, Some(oldDatabaseName)))
+ }
sparkSession.catalog.refreshTable(TableIdentifier(oldTableName,
Some(oldDatabaseName)).quotedString)
hiveClient.runSqlHive(
@@ -127,6 +135,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
s"('tableName'='$newTableName', " +
s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
+
// changed the rename order to deal with situation when carbon table and hive table
// will point to the same tablePath
if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
@@ -138,6 +147,27 @@ private[sql] case class CarbonAlterTableRenameCommand(
sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
}
}
+ val updatedParts = updatePartitionLocations(
+ partitions,
+ oldTablePath.getPath,
+ newTablePath,
+ sparkSession)
+
+ val newIdentifier = TableIdentifier(newTableName, Some(oldDatabaseName))
+ val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(newIdentifier)
+ // Update the storage location with new path
+ sparkSession.sessionState.catalog.alterTable(
+ catalogTable.copy(storage = sparkSession.sessionState.catalog.
+ asInstanceOf[CarbonSessionCatalog].updateStorageLocation(
+ new Path(newTablePath),
+ catalogTable.storage)))
+ if (updatedParts.nonEmpty) {
+ // Update the new updated partitions specs with new location.
+ sparkSession.sessionState.catalog.alterPartitions(
+ newIdentifier,
+ updatedParts)
+ }
+
newTablePath = metastore.updateTableSchemaForAlter(newTableIdentifier,
carbonTable.getCarbonTableIdentifier,
tableInfo,
@@ -151,8 +181,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
sparkSession)
OperationListenerBus.getInstance().fireEvent(alterTableRenamePostEvent, operationContext)
- sparkSession.catalog.refreshTable(TableIdentifier(newTableName,
- Some(oldDatabaseName)).quotedString)
+ sparkSession.catalog.refreshTable(newIdentifier.quotedString)
carbonTableLockFilePath = newTablePath
LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
@@ -187,6 +216,31 @@ private[sql] case class CarbonAlterTableRenameCommand(
Seq.empty
}
+ /**
+ * Update partitions with the new table location
+ *
+ */
+ private def updatePartitionLocations(
+ partitions: Seq[CatalogTablePartition],
+ oldTablePath: String,
+ newTablePath: String,
+ sparkSession: SparkSession): Seq[CatalogTablePartition] = {
+ partitions.map{ part =>
+ if (part.storage.locationUri.isDefined) {
+ val path = new Path(part.location)
+ if (path.toString.contains(oldTablePath)) {
+ val newPath = new Path(path.toString.replace(oldTablePath, newTablePath))
+ part.copy(storage = sparkSession.sessionState.catalog.
+ asInstanceOf[CarbonSessionCatalog].updateStorageLocation(newPath, part.storage))
+ } else {
+ part
+ }
+ } else {
+ part
+ }
+ }
+ }
+
private def renameBadRecords(
oldTableName: String,
newTableName: String,
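Taken together, the rename fix sequences the metadata updates so partition locations
survive the folder move; the flow added above, sketched as comments:

    // 1. List the old partitions before the Hive rename invalidates them.
    // 2. Run the Hive ALTER TABLE ... RENAME and SET SERDEPROPERTIES.
    // 3. Rename the table folder on disk.
    // 4. Rewrite each partition location, replacing oldTablePath with newTablePath.
    // 5. alterTable with the updated storage location, then alterPartitions.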
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index f69ccc1..dcbce84 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -265,6 +265,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
RefreshCarbonTableCommand(tableIdentifier.database,
tableIdentifier.table).run(sparkSession)
ExecutedCommandExec(RefreshTable(tableIdentifier)) :: Nil
+ case alterSetLoc@AlterTableSetLocationCommand(tableName, _, _) =>
+ val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .tableExists(tableName)(sparkSession)
+ if (isCarbonTable) {
+ throw new UnsupportedOperationException("Set partition location is not supported")
+ } else {
+ ExecutedCommandExec(alterSetLoc) :: Nil
+ }
case _ => Nil
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index 1b7f0cb..ba2fe947 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -17,8 +17,9 @@
package org.apache.spark.sql.hive
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery}
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
@@ -182,6 +183,15 @@ class CarbonSessionCatalog(
allPartitions
}
}
+
+ /**
+ * Update the storage format with the new location (Spark 2.1: locationUri is an Option[String])
+ */
+ def updateStorageLocation(
+ path: Path,
+ storage: CatalogStorageFormat): CatalogStorageFormat = {
+ storage.copy(locationUri = Some(path.toString))
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index a119bda..e82b485 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
import scala.collection.generic.SeqFactory
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
@@ -171,6 +172,15 @@ class CarbonSessionCatalog(
partitionFilters,
sparkSession.sessionState.conf.sessionLocalTimeZone)
}
+
+ /**
+ * Update the storage format with the new location (Spark 2.2: locationUri is an Option[URI])
+ */
+ def updateStorageLocation(
+ path: Path,
+ storage: CatalogStorageFormat): CatalogStorageFormat = {
+ storage.copy(locationUri = Some(path.toUri))
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index f5b29e7..82f4c9b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -57,15 +57,15 @@ public final class DataLoadProcessBuilder {
CarbonIterator[] inputIterators) throws Exception {
CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation);
SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
- if ((!configuration.isSortTable() || sortScope.equals(SortScopeOptions.SortScope.NO_SORT))
- && !loadModel.isPartitionLoad()) {
+ if (loadModel.isPartitionLoad()) {
+ return buildInternalForPartitionLoad(inputIterators, configuration, sortScope);
+ } else if (!configuration.isSortTable() ||
+ sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
return buildInternalForNoSort(inputIterators, configuration);
} else if (configuration.getBucketingInfo() != null) {
return buildInternalForBucketing(inputIterators, configuration);
} else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
return buildInternalForBatchSort(inputIterators, configuration);
- } else if (loadModel.isPartitionLoad()) {
- return buildInternalForPartitionLoad(inputIterators, configuration, sortScope);
} else {
return buildInternal(inputIterators, configuration);
}
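The reordering is the core of the BATCH_SORT/NO_SORT fix: previously a partitioned
load on a NO_SORT (or non-sort) table matched the first branch and bypassed the
partition writer path entirely. The new branch precedence, sketched as comments:

    // 1. partition load -> buildInternalForPartitionLoad(...) (sort scope handled there)
    // 2. non-sort table or NO_SORT -> buildInternalForNoSort(...)
    // 3. bucketing info present -> buildInternalForBucketing(...)
    // 4. BATCH_SORT -> buildInternalForBatchSort(...)
    // 5. otherwise -> buildInternal(...)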
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74f5d67c/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 32c72da..23f9aa8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -48,6 +48,7 @@ import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.ColumnIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
@@ -57,6 +58,7 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.merger.NodeBlockRelation;
import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
@@ -328,6 +330,60 @@ public final class CarbonLoaderUtil {
return status;
}
+ /**
+ * Updates the segment file entry of the passed segment in the table status.
+ *
+ * @return true if the table status update succeeded, false otherwise.
+ * @throws IOException
+ */
+ public static boolean updateSegmentFile(String tablePath, String segmentId, String segmentFile)
+ throws IOException {
+ boolean status = false;
+ String tableStatusPath = CarbonTablePath.getTableStatusPath(tablePath);
+ String metadataPath = CarbonTablePath.getMetadataPath(tablePath);
+ AbsoluteTableIdentifier absoluteTableIdentifier =
+ AbsoluteTableIdentifier.from(tablePath, null, null);
+ SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+ ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+ int retryCount = CarbonLockUtil
+ .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+ CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+ int maxTimeout = CarbonLockUtil
+ .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+ CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+ try {
+ if (carbonLock.lockWithRetries(retryCount, maxTimeout)) {
+ LOGGER.info("Acquired lock for tablepath" + tablePath + " for table status updation");
+ LoadMetadataDetails[] listOfLoadFolderDetailsArray =
+ SegmentStatusManager.readLoadMetadata(metadataPath);
+
+ for (LoadMetadataDetails detail : listOfLoadFolderDetailsArray) {
+ // find the entry matching the given segment id and update its segment file name.
+ if (segmentId.equals(detail.getLoadName())) {
+ detail.setSegmentFile(segmentFile);
+ break;
+ }
+ }
+
+ SegmentStatusManager
+ .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
+ status = true;
+ } else {
+ LOGGER.error(
+ "Not able to acquire the lock for Table status updation for table path " + tablePath);
+ }
+ ;
+ } finally {
+ if (carbonLock.unlock()) {
+ LOGGER.info("Table unlocked successfully after table status updation" + tablePath);
+ } else {
+ LOGGER.error(
+ "Unable to unlock Table lock for table" + tablePath + " during table status updation");
+ }
+ }
+ return status;
+ }
+
private static void addToStaleFolders(CarbonTablePath carbonTablePath,
List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws IOException {
String path = carbonTablePath.getCarbonDataDirectoryPath("0", entry.getLoadName());
@@ -950,4 +1006,35 @@ public final class CarbonLoaderUtil {
loadMetadataDetails.setIndexSize(String.valueOf(indexSize));
return dataSize + indexSize;
}
+
+ /**
+ * Merge index files within the segment of a partitioned table
+ * @param segmentId segment whose index files are to be merged
+ * @param tablePath path of the table
+ * @param uniqueId unique id for the new segment file; regenerated when a merge happens
+ * @return the unique id of the newly written segment file, or the passed-in value otherwise
+ * @throws IOException
+ */
+ public static String mergeIndexFilesinPartitionedSegment(String segmentId, String tablePath,
+ String uniqueId) throws IOException {
+ CarbonIndexFileMergeWriter.SegmentIndexFileMergeStatus segmentIndexFileMergeStatus =
+ new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentId, tablePath);
+ if (segmentIndexFileMergeStatus != null) {
+ uniqueId = System.currentTimeMillis() + "";
+ String newSegmentFileName = segmentId + "_" + uniqueId + CarbonTablePath.SEGMENT_EXT;
+ String path =
+ CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
+ + newSegmentFileName;
+ SegmentFileStore.writeSegmentFile(segmentIndexFileMergeStatus.getSegmentFile(), path);
+ updateSegmentFile(tablePath, segmentId, newSegmentFileName);
+ deleteFiles(segmentIndexFileMergeStatus.getFilesTobeDeleted());
+ }
+ return uniqueId;
+ }
+
+ private static void deleteFiles(List<String> filesToBeDeleted) throws IOException {
+ for (String filePath : filesToBeDeleted) {
+ FileFactory.deleteFile(filePath, FileFactory.getFileType(filePath));
+ }
+ }
}
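End to end, merging the index files of a partitioned segment now runs through
CarbonLoaderUtil, which writes a new segment file, points the table status at it, and
only then deletes the superseded index files. A usage sketch (passing null for
uniqueId, which the method regenerates when a merge actually happens):

    // Hypothetical caller; the table path is an example value.
    String uniqueId = CarbonLoaderUtil.mergeIndexFilesinPartitionedSegment(
        "0", "/store/default/sample_table", null);
    // A merge writes "<segmentId>_<uniqueId>" + CarbonTablePath.SEGMENT_EXT under the
    // segment-files location and updates the table status to reference it.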