Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/08 18:05:23 UTC
[03/30] carbondata git commit: [CARBONDATA-1992] Remove partitionId in CarbonTablePath
[CARBONDATA-1992] Remove partitionId in CarbonTablePath
In CarbonTablePath, there is a deprecated partition id that is always 0; it should be removed to avoid confusion.
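For illustration only, a minimal sketch (not code from this commit) of how a caller changes; tablePath stands for any CarbonTablePath instance and segmentId for a segment number such as "0":

    // Before this change: a dummy partition id had to be passed explicitly.
    String segmentPath = tablePath.getCarbonDataDirectoryPath("0", segmentId);

    // After this change: the partition id parameter is removed from the API.
    String segmentPath = tablePath.getCarbonDataDirectoryPath(segmentId);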
This closes #1765
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/51421fda
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/51421fda
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/51421fda
Branch: refs/heads/carbonstore-rebase
Commit: 51421fdaaae53f6cfae25de4d26d4fefcea40c12
Parents: 4a2d799
Author: Jacky Li <ja...@qq.com>
Authored: Sat Jan 6 20:28:44 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Feb 9 01:39:54 2018 +0800
----------------------------------------------------------------------
.../core/metadata/PartitionMapFileStore.java | 2 +-
.../core/mutate/CarbonUpdateUtil.java | 8 +-
.../core/statusmanager/LoadMetadataDetails.java | 2 +
.../SegmentUpdateStatusManager.java | 8 +-
.../apache/carbondata/core/util/CarbonUtil.java | 6 +-
.../core/util/path/CarbonTablePath.java | 55 ++++---
.../CarbonFormatDirectoryStructureTest.java | 4 +-
.../hadoop/api/CarbonTableInputFormat.java | 2 +-
.../streaming/CarbonStreamRecordWriter.java | 2 +-
.../hadoop/test/util/StoreCreator.java | 1 -
.../presto/util/CarbonDataStoreCreator.scala | 1 -
.../dataload/TestLoadDataGeneral.scala | 2 +-
.../InsertIntoCarbonTableTestCase.scala | 4 +-
.../dataload/TestBatchSortDataLoad.scala | 3 +-
.../dataload/TestDataLoadWithFileName.scala | 2 +-
.../dataload/TestGlobalSortDataLoad.scala | 4 +-
.../testsuite/datamap/TestDataMapCommand.scala | 34 ++--
.../TestDataLoadingForPartitionTable.scala | 3 +-
.../StandardPartitionTableCleanTestCase.scala | 2 +-
...andardPartitionTableCompactionTestCase.scala | 2 +-
.../StandardPartitionTableLoadingTestCase.scala | 4 +-
.../load/DataLoadProcessBuilderOnSpark.scala | 1 -
.../load/DataLoadProcessorStepOnSpark.scala | 2 +-
.../spark/rdd/AlterTableLoadPartitionRDD.scala | 154 +++++++++++--------
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 11 +-
.../spark/rdd/NewCarbonDataLoadRDD.scala | 25 ++-
.../org/apache/spark/util/PartitionUtils.scala | 5 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 5 +-
.../datasources/CarbonFileFormat.scala | 1 -
.../partition/TestAlterPartitionTable.scala | 2 +-
.../bucketing/TableBucketingTestCase.scala | 2 +
.../loading/CarbonDataLoadConfiguration.java | 10 --
.../loading/DataLoadProcessBuilder.java | 1 -
.../loading/TableProcessingOperations.java | 3 +-
.../loading/model/CarbonLoadModel.java | 72 +--------
.../sort/impl/ParallelReadMergeSorterImpl.java | 4 +-
...arallelReadMergeSorterWithBucketingImpl.java | 15 +-
.../UnsafeBatchParallelReadMergeSorterImpl.java | 7 +-
...arallelReadMergeSorterWithBucketingImpl.java | 21 ++-
.../CarbonRowDataWriterProcessorStepImpl.java | 33 ++--
.../steps/DataWriterBatchProcessorStepImpl.java | 25 +--
.../steps/DataWriterProcessorStepImpl.java | 22 +--
.../processing/merger/CarbonDataMergerUtil.java | 6 +-
.../merger/CompactionResultSortProcessor.java | 4 +-
.../sort/sortdata/SortParameters.java | 16 +-
.../store/CarbonFactDataHandlerModel.java | 7 +-
.../util/CarbonDataProcessorUtil.java | 12 +-
.../processing/util/CarbonLoaderUtil.java | 12 +-
.../processing/util/DeleteLoadFolders.java | 7 +-
.../carbondata/processing/StoreCreator.java | 1 -
.../carbondata/streaming/StreamHandoffRDD.scala | 1 -
.../streaming/StreamSinkFactory.scala | 2 +-
.../streaming/CarbonAppendableStreamSink.scala | 8 +-
53 files changed, 285 insertions(+), 363 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
index 1e9cbc4..43310fe 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
@@ -363,7 +363,7 @@ public class PartitionMapFileStore {
List<String> toBeDeletedIndexFiles = new ArrayList<>();
List<String> toBeDeletedDataFiles = new ArrayList<>();
// take the list of files from this segment.
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName());
+ String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segment.getLoadName());
String partitionFilePath = getPartitionFilePath(segmentPath);
if (partitionFilePath != null) {
PartitionMapper partitionMapper = readPartitionMap(partitionFilePath);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index c5f61c2..e0b208f 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -297,9 +297,7 @@ public class CarbonUpdateUtil {
CarbonTablePath carbonTablePath = CarbonStorePath
.getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
absoluteTableIdentifier.getCarbonTableIdentifier());
- // as of now considering only partition 0.
- String partitionId = "0";
- String partitionDir = carbonTablePath.getPartitionDir(partitionId);
+ String partitionDir = carbonTablePath.getPartitionDir();
CarbonFile file =
FileFactory.getCarbonFile(partitionDir, FileFactory.getFileType(partitionDir));
if (!file.exists()) {
@@ -380,7 +378,7 @@ public class CarbonUpdateUtil {
}
public static long getLatestTaskIdForSegment(String segmentId, CarbonTablePath tablePath) {
- String segmentDirPath = tablePath.getCarbonDataDirectoryPath("0", segmentId);
+ String segmentDirPath = tablePath.getCarbonDataDirectoryPath(segmentId);
// scan all the carbondata files and get the latest task ID.
CarbonFile segment =
@@ -445,7 +443,7 @@ public class CarbonUpdateUtil {
|| segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
// take the list of files from this segment.
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName());
+ String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segment.getLoadName());
CarbonFile segDir =
FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
CarbonFile[] allSegmentFiles = segDir.listFiles();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index 85602bc..73a665d 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -123,10 +123,12 @@ public class LoadMetadataDetails implements Serializable {
*/
private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
+ @Deprecated
public String getPartitionCount() {
return partitionCount;
}
+ @Deprecated
public void setPartitionCount(String partitionCount) {
this.partitionCount = partitionCount;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index e0e7b70..d4ef5c6 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -197,7 +197,7 @@ public class SegmentUpdateStatusManager {
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
String endTimeStamp = "";
String startTimeStamp = "";
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+ String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
CarbonFile segDir =
FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
for (LoadMetadataDetails eachSeg : segmentDetails) {
@@ -292,7 +292,7 @@ public class SegmentUpdateStatusManager {
.getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
absoluteTableIdentifier.getCarbonTableIdentifier());
String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID);
- String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment);
+ String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(segment);
String completeBlockName = CarbonTablePath.addDataPartPrefix(
CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCK_ID)
+ CarbonCommonConstants.FACT_FILE_EXT);
@@ -424,7 +424,7 @@ public class SegmentUpdateStatusManager {
.getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
absoluteTableIdentifier.getCarbonTableIdentifier());
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+ String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
CarbonFile segDir =
FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
@@ -867,7 +867,7 @@ public class SegmentUpdateStatusManager {
// filter out the fact files.
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+ String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
CarbonFile segDir =
FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index c208154..83e7d52 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1103,7 +1103,7 @@ public final class CarbonUtil {
// geting the index file path
//TODO need to pass proper partition number when partiton will be supported
String carbonIndexFilePath = carbonTablePath
- .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId(),
+ .getCarbonIndexFilePath(taskId, tableBlockInfoList.get(0).getSegmentId(),
bucketNumber, CarbonTablePath.DataFileUtil
.getTimeStampFromFileName(tableBlockInfoList.get(0).getFilePath()),
tableBlockInfoList.get(0).getVersion());
@@ -1346,7 +1346,7 @@ public final class CarbonUtil {
// geting the index file path
//TODO need to pass proper partition number when partiton will be supported
String carbonIndexFilePath = carbonTablePath
- .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId(),
+ .getCarbonIndexFilePath(taskId, tableBlockInfoList.get(0).getSegmentId(),
bucketNumber, CarbonTablePath.DataFileUtil
.getTimeStampFromFileName(tableBlockInfoList.get(0).getFilePath()),
tableBlockInfoList.get(0).getVersion());
@@ -2303,7 +2303,7 @@ public final class CarbonUtil {
long carbonDataSize = 0L;
long carbonIndexSize = 0L;
HashMap<String, Long> dataAndIndexSize = new HashMap<String, Long>();
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+ String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
FileFactory.FileType fileType = FileFactory.getFileType(segmentPath);
switch (fileType) {
case HDFS:
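As a hedged usage sketch (variable names assumed, mirroring the call sites above), the index-file lookup drops its partition id argument the same way:

    // Old signature: getCarbonIndexFilePath(taskId, "0", segmentId, bucketNumber, timeStamp, version)
    // New signature, per this diff:
    String indexPath = tablePath.getCarbonIndexFilePath(
        taskId, segmentId, bucketNumber, timeStamp, version);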
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 5a63d2f..e107317 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -278,15 +278,14 @@ public class CarbonTablePath extends Path {
/**
* Gets absolute path of data file
*
- * @param partitionId unique partition identifier
* @param segmentId unique partition identifier
* @param filePartNo data file part number
* @param factUpdateTimeStamp unique identifier to identify an update
* @return absolute path of data file stored in carbon data format
*/
- public String getCarbonDataFilePath(String partitionId, String segmentId, Integer filePartNo,
- Long taskNo, int batchNo, int bucketNumber, String factUpdateTimeStamp) {
- return getSegmentDir(partitionId, segmentId) + File.separator + getCarbonDataFileName(
+ public String getCarbonDataFilePath(String segmentId, Integer filePartNo, Long taskNo,
+ int batchNo, int bucketNumber, String factUpdateTimeStamp) {
+ return getSegmentDir(segmentId) + File.separator + getCarbonDataFileName(
filePartNo, taskNo, bucketNumber, batchNo, factUpdateTimeStamp);
}
@@ -295,13 +294,12 @@ public class CarbonTablePath extends Path {
* based on task id
*
* @param taskId task id of the file
- * @param partitionId partition number
* @param segmentId segment number
* @return full qualified carbon index path
*/
- public String getCarbonIndexFilePath(final String taskId, final String partitionId,
- final String segmentId, final String bucketNumber) {
- String segmentDir = getSegmentDir(partitionId, segmentId);
+ public String getCarbonIndexFilePath(final String taskId, final String segmentId,
+ final String bucketNumber) {
+ String segmentDir = getSegmentDir(segmentId);
CarbonFile carbonFile =
FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir));
@@ -317,9 +315,8 @@ public class CarbonTablePath extends Path {
if (files.length > 0) {
return files[0].getAbsolutePath();
} else {
- throw new RuntimeException("Missing Carbon index file for partition["
- + partitionId + "] Segment[" + segmentId + "], taskId[" + taskId
- + "]");
+ throw new RuntimeException("Missing Carbon index file for Segment[" + segmentId + "], "
+ + "taskId[" + taskId + "]");
}
}
@@ -327,8 +324,6 @@ public class CarbonTablePath extends Path {
* Below method will be used to get the carbon index file path
* @param taskId
* task id
- * @param partitionId
- * partition id
* @param segmentId
* segment id
* @param bucketNumber
@@ -337,28 +332,27 @@ public class CarbonTablePath extends Path {
* timestamp
* @return carbon index file path
*/
- public String getCarbonIndexFilePath(String taskId, String partitionId, String segmentId,
- String bucketNumber, String timeStamp, ColumnarFormatVersion columnarFormatVersion) {
+ public String getCarbonIndexFilePath(String taskId, String segmentId, String bucketNumber,
+ String timeStamp, ColumnarFormatVersion columnarFormatVersion) {
switch (columnarFormatVersion) {
case V1:
case V2:
- return getCarbonIndexFilePath(taskId, partitionId, segmentId, bucketNumber);
+ return getCarbonIndexFilePath(taskId, segmentId, bucketNumber);
default:
- String segmentDir = getSegmentDir(partitionId, segmentId);
+ String segmentDir = getSegmentDir(segmentId);
return segmentDir + File.separator + getCarbonIndexFileName(taskId,
Integer.parseInt(bucketNumber), timeStamp);
}
}
- public String getCarbonIndexFilePath(String taskId, String partitionId, String segmentId,
- int batchNo, String bucketNumber, String timeStamp,
- ColumnarFormatVersion columnarFormatVersion) {
+ public String getCarbonIndexFilePath(String taskId, String segmentId, int batchNo,
+ String bucketNumber, String timeStamp, ColumnarFormatVersion columnarFormatVersion) {
switch (columnarFormatVersion) {
case V1:
case V2:
- return getCarbonIndexFilePath(taskId, partitionId, segmentId, bucketNumber);
+ return getCarbonIndexFilePath(taskId, segmentId, bucketNumber);
default:
- String segmentDir = getSegmentDir(partitionId, segmentId);
+ String segmentDir = getSegmentDir(segmentId);
return segmentDir + File.separator + getCarbonIndexFileName(Long.parseLong(taskId),
Integer.parseInt(bucketNumber), batchNo, timeStamp);
}
@@ -375,12 +369,11 @@ public class CarbonTablePath extends Path {
/**
* Gets absolute path of data file
*
- * @param partitionId unique partition identifier
* @param segmentId unique partition identifier
* @return absolute path of data file stored in carbon data format
*/
- public String getCarbonDataDirectoryPath(String partitionId, String segmentId) {
- return getSegmentDir(partitionId, segmentId);
+ public String getCarbonDataDirectoryPath(String segmentId) {
+ return getSegmentDir(segmentId);
}
/**
@@ -418,12 +411,16 @@ public class CarbonTablePath extends Path {
return segmentDir + File.separator + getCarbonStreamIndexFileName();
}
- public String getSegmentDir(String partitionId, String segmentId) {
- return getPartitionDir(partitionId) + File.separator + SEGMENT_PREFIX + segmentId;
+ public String getSegmentDir(String segmentId) {
+ return getPartitionDir() + File.separator + SEGMENT_PREFIX + segmentId;
}
- public String getPartitionDir(String partitionId) {
- return getFactDir() + File.separator + PARTITION_PREFIX + partitionId;
+ // This partition is not used in any code logic, just keep backward compatibility
+ public static final String DEPRECATED_PATITION_ID = "0";
+
+ public String getPartitionDir() {
+ return getFactDir() + File.separator + PARTITION_PREFIX +
+ CarbonTablePath.DEPRECATED_PATITION_ID;
}
private String getMetaDataDir() {
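Note the on-disk layout is unchanged: getPartitionDir() still emits a fixed "Part0" component built from DEPRECATED_PATITION_ID, so existing stores stay readable. A sketch of the resulting paths, assuming a table at /store/d1/t1 (matching the directory-structure test below):

    tablePath.getPartitionDir()     // /store/d1/t1/Fact/Part0
    tablePath.getSegmentDir("2")    // /store/d1/t1/Fact/Part0/Segment_2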
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
index 5549806..a1ccab3 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
@@ -53,8 +53,8 @@ public class CarbonFormatDirectoryStructureTest {
.equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.dictmeta"));
assertTrue(carbonTablePath.getSortIndexFilePath("t1_c1").replace("\\", "/")
.equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.sortindex"));
- assertTrue(carbonTablePath.getCarbonDataFilePath("1", "2", 3, 4L, 0, 0, "999").replace("\\", "/")
- .equals(CARBON_STORE + "/d1/t1/Fact/Part1/Segment_2/part-3-4_batchno0-0-999.carbondata"));
+ assertTrue(carbonTablePath.getCarbonDataFilePath("2", 3, 4L, 0, 0, "999").replace("\\", "/")
+ .equals(CARBON_STORE + "/d1/t1/Fact/Part0/Segment_2/part-3-4_batchno0-0-999.carbondata"));
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index a1887f0..6f1e123 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -480,7 +480,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
for (String segmentId : streamSegments) {
- String segmentDir = tablePath.getSegmentDir("0", segmentId);
+ String segmentDir = tablePath.getSegmentDir(segmentId);
FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
if (FileFactory.isFileExist(segmentDir, fileType)) {
String indexName = CarbonTablePath.getCarbonStreamIndexFileName();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
index 364a6a6..3ef8afc 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
@@ -129,7 +129,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
CarbonTablePath tablePath =
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
- segmentDir = tablePath.getSegmentDir("0", segmentId);
+ segmentDir = tablePath.getSegmentDir(segmentId);
fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0");
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index fbf33d6..ac17c4e 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -154,7 +154,6 @@ public class StoreCreator {
loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(","));
loadModel.setTaskNo("0");
loadModel.setSegmentId("0");
- loadModel.setPartitionId("0");
loadModel.setFactTimeStamp(System.currentTimeMillis());
loadModel.setMaxColumns("10");
return loadModel;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index 7b5c311..a41e738 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -135,7 +135,6 @@ object CarbonDataStoreCreator {
loadModel.setCsvHeaderColumns(loadModel.getCsvHeader.split(","))
loadModel.setTaskNo("0")
loadModel.setSegmentId("0")
- loadModel.setPartitionId("0")
loadModel.setFactTimeStamp(System.currentTimeMillis())
loadModel.setMaxColumns("15")
executeGraph(loadModel, storePath)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 09ca9e5..c84ae6b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -49,7 +49,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
tableName: String): Boolean = {
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(datbaseName, tableName)
val partitionPath = CarbonStorePath
- .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0")
+ .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir
val fileType: FileFactory.FileType = FileFactory.getFileType(partitionPath)
val carbonFile = FileFactory.getCarbonFile(partitionPath, fileType)
val segments: ArrayBuffer[String] = ArrayBuffer()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index d59f0b5..5cc4156 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -232,7 +232,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
checkAnswer(sql("select count(*) from CarbonOverwrite"), sql("select count(*) from HiveOverwrite"))
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbonoverwrite")
val partitionPath = CarbonStorePath
- .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0")
+ .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir
val folder = new File(partitionPath)
assert(folder.isDirectory)
assert(folder.list().length == 1)
@@ -255,7 +255,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
checkAnswer(sql("select count(*) from TCarbonSourceOverwrite"), sql("select count(*) from HiveOverwrite"))
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "tcarbonsourceoverwrite")
val partitionPath = CarbonStorePath
- .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0")
+ .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir
val folder = new File(partitionPath)
assert(folder.isDirectory)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
index 4af9d54..42ac4df 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
@@ -193,9 +193,8 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
CarbonCommonConstants.DATABASE_DEFAULT_NAME,
tableName
)
- val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
- val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", segmentNo)
+ val segmentDir = carbonTablePath.getCarbonDataDirectoryPath(segmentNo)
new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
index dae0962..db0a62c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
@@ -49,7 +49,7 @@ class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
val indexReader = new CarbonIndexFileReader()
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "test_table_v3")
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
- val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", "0")
+ val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0")
val carbonIndexPaths = new File(segmentDir)
.listFiles(new FilenameFilter {
override def accept(dir: File, name: String): Boolean = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 0d9e0fd..479db50 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -272,7 +272,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE carbon_globalsort")
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort")
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
- val segmentDir = carbonTablePath.getSegmentDir("0", "0")
+ val segmentDir = carbonTablePath.getSegmentDir("0")
assertResult(Math.max(7, defaultParallelism) + 1)(new File(segmentDir).listFiles().length)
}
@@ -379,7 +379,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
- val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", segmentNo)
+ val segmentDir = carbonTablePath.getCarbonDataDirectoryPath(segmentNo)
new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index 146ad62..5170c43 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -217,19 +217,27 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
}
test("test if preaggregate load is successfull for hivemetastore") {
- CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
- sql("DROP TABLE IF EXISTS maintable")
- sql(
- """
- | CREATE TABLE maintable(id int, name string, city string, age int)
- | STORED BY 'org.apache.carbondata.format'
- """.stripMargin)
- sql(
- s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id"""
- .stripMargin)
- sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
- checkAnswer(sql(s"select * from maintable_preagg_sum"),
- Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+ try {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
+ sql("DROP TABLE IF EXISTS maintable")
+ sql(
+ """
+ | CREATE TABLE maintable(id int, name string, city string, age int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(
+ s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id"""
+
+ .stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ checkAnswer(sql(s"select * from maintable_preagg_sum"),
+ Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+ } finally {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+ CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+ }
}
test("test preaggregate load for decimal column for hivemetastore") {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index ed151bd..0a21aed 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -63,7 +63,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
carbonTable.getTablePath)
- val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
+ val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId)
val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
override def accept(file: CarbonFile): Boolean = {
@@ -87,6 +87,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
| projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
| utilization int,salary int)
| PARTITIONED BY (empno int)
+ |
| STORED BY 'org.apache.carbondata.format'
| TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
""".stripMargin)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
index 2b0dd09..5427981 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
@@ -53,7 +53,7 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
carbonTable.getTablePath)
- val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
+ val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId)
val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
override def accept(file: CarbonFile): Boolean = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
index 22ebd80..f4b6e0e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
@@ -53,7 +53,7 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
carbonTable.getTablePath)
- val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
+ val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId)
val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
override def accept(file: CarbonFile): Boolean = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 669d6e7..eb091f3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -70,7 +70,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
carbonTable.getTablePath)
- val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
+ val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId)
val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
override def accept(file: CarbonFile): Boolean = {
@@ -335,7 +335,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_mergeindexpartitionthree")
val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
carbonTable.getTablePath)
- val segmentDir = tablePath.getCarbonDataDirectoryPath("0", "0")
+ val segmentDir = tablePath.getCarbonDataDirectoryPath("0")
val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
val files = carbonFile.listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = CarbonTablePath.isCarbonIndexFile(file.getName)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 8be70a9..a5c1313 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -56,7 +56,6 @@ object DataLoadProcessBuilderOnSpark {
.map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
}
- model.setPartitionId("0")
val sc = sparkSession.sparkContext
val modelBroadcast = sc.broadcast(model)
val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 21de003..834c1a6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -194,7 +194,7 @@ object DataLoadProcessorStepOnSpark {
dataWriter = new DataWriterProcessorStepImpl(conf)
- val dataHandlerModel = dataWriter.getDataHandlerModel(0)
+ val dataHandlerModel = dataWriter.getDataHandlerModel
var dataHandler: CarbonFactHandler = null
var rowsNotExist = true
while (rows.hasNext) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 76c99f2..9de8dc9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -18,19 +18,21 @@
package org.apache.carbondata.spark.rdd
import scala.collection.JavaConverters._
+import scala.util.Random
-import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.{Partition, SparkEnv, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.command.AlterPartitionModel
import org.apache.spark.util.PartitionUtils
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.partition.spliter.RowResultProcessor
import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
import org.apache.carbondata.spark.AlterPartitionResult
-import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.carbondata.spark.util.{CommonUtil, Util}
class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
result: AlterPartitionResult[K, V],
@@ -39,76 +41,96 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
identifier: AbsoluteTableIdentifier,
prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) {
- var storeLocation: String = null
- val carbonLoadModel = alterPartitionModel.carbonLoadModel
- val segmentId = alterPartitionModel.segmentId
- val oldPartitionIds = alterPartitionModel.oldPartitionIds
- val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val databaseName = carbonTable.getDatabaseName
- val factTableName = carbonTable.getTableName
- val partitionInfo = carbonTable.getPartitionInfo(factTableName)
+ var storeLocation: String = null
+ val carbonLoadModel = alterPartitionModel.carbonLoadModel
+ val segmentId = alterPartitionModel.segmentId
+ val oldPartitionIds = alterPartitionModel.oldPartitionIds
+ val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ val databaseName = carbonTable.getDatabaseName
+ val factTableName = carbonTable.getTableName
+ val partitionInfo = carbonTable.getPartitionInfo(factTableName)
- override protected def getPartitions: Array[Partition] = {
- val sc = alterPartitionModel.sqlContext.sparkContext
- sc.setLocalProperty("spark.scheduler.pool", "DDL")
- sc.setLocalProperty("spark.job.interruptOnCancel", "true")
- firstParent[Array[AnyRef]].partitions
- }
-
- override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava
- val iter = new Iterator[(K, V)] {
- val partitionId = partitionInfo.getPartitionId(split.index)
- carbonLoadModel.setTaskNo(String.valueOf(partitionId))
- carbonLoadModel.setSegmentId(segmentId)
- carbonLoadModel.setPartitionId("0")
- CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, false, true)
+ override protected def getPartitions: Array[Partition] = {
+ val sc = alterPartitionModel.sqlContext.sparkContext
+ sc.setLocalProperty("spark.scheduler.pool", "DDL")
+ sc.setLocalProperty("spark.job.interruptOnCancel", "true")
+ firstParent[Array[AnyRef]].partitions
+ }
- val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
- factTableName,
- carbonLoadModel.getTaskNo,
- "0",
- segmentId,
- false,
- true
- )
+ override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava
+ val iter = new Iterator[(K, V)] {
+ val partitionId = partitionInfo.getPartitionId(split.index)
+ carbonLoadModel.setTaskNo(String.valueOf(partitionId))
+ carbonLoadModel.setSegmentId(segmentId)
+ CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, false, true)
+ val tempLocationKey = CarbonDataProcessorUtil
+ .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
+ carbonLoadModel.getTableName,
+ segmentId,
+ carbonLoadModel.getTaskNo,
+ false,
+ true)
+ // this property is used to determine whether temp location for carbon is inside
+ // container temp dir or is yarn application directory.
+ val carbonUseLocalDir = CarbonProperties.getInstance()
+ .getProperty("carbon.use.local.dir", "false")
- val loadStatus = if (rows.isEmpty) {
- LOGGER.info("After repartition this split, NO target rows to write back.")
- true
- } else {
- val segmentProperties = PartitionUtils.getSegmentProperties(identifier,
- segmentId, partitionIds.toList, oldPartitionIds, partitionInfo, carbonTable)
- val processor = new RowResultProcessor(
- carbonTable,
- carbonLoadModel,
- segmentProperties,
- tempStoreLoc,
- bucketId)
- try {
- processor.execute(rows)
- } catch {
- case e: Exception =>
- sys.error(s"Exception when executing Row result processor ${e.getMessage}")
- } finally {
- TableProcessingOperations
- .deleteLocalDataLoadFolderLocation(carbonLoadModel, false, true)
- }
- }
+ if (carbonUseLocalDir.equalsIgnoreCase("true")) {
- val loadResult = segmentId
- var finished = false
+ val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
+ if (null != storeLocations && storeLocations.nonEmpty) {
+ storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+ }
+ if (storeLocation == null) {
+ storeLocation = System.getProperty("java.io.tmpdir")
+ }
+ } else {
+ storeLocation = System.getProperty("java.io.tmpdir")
+ }
+ storeLocation = storeLocation + '/' + System.nanoTime() + '/' + split.index
+ CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+ LOGGER.info(s"Temp storeLocation taken is $storeLocation")
- override def hasNext: Boolean = {
- !finished
- }
+ val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+ databaseName, factTableName, carbonLoadModel.getTaskNo, segmentId, false, true)
- override def next(): (K, V) = {
- finished = true
- result.getKey(loadResult, loadStatus)
- }
+ val loadStatus = if (rows.isEmpty) {
+ LOGGER.info("After repartition this split, NO target rows to write back.")
+ true
+ } else {
+ val segmentProperties = PartitionUtils.getSegmentProperties(identifier,
+ segmentId, partitionIds.toList, oldPartitionIds, partitionInfo, carbonTable)
+ val processor = new RowResultProcessor(
+ carbonTable,
+ carbonLoadModel,
+ segmentProperties,
+ tempStoreLoc,
+ bucketId)
+ try {
+ processor.execute(rows)
+ } catch {
+ case e: Exception =>
+ sys.error(s"Exception when executing Row result processor ${ e.getMessage }")
+ } finally {
+ TableProcessingOperations
+ .deleteLocalDataLoadFolderLocation(carbonLoadModel, false, true)
}
- iter
+ }
+
+ val loadResult = segmentId
+ var finished = false
+
+ override def hasNext: Boolean = {
+ !finished
+ }
+
+ override def next(): (K, V) = {
+ finished = true
+ result.getKey(loadResult, loadStatus)
+ }
}
+ iter
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 0859f2e..a517bf4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -179,16 +179,9 @@ class CarbonMergerRDD[K, V](
}
}
- val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
- factTableName,
- carbonLoadModel.getTaskNo,
- "0",
- mergeNumber,
- true,
- false
- )
+ val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+ databaseName, factTableName, carbonLoadModel.getTaskNo, mergeNumber, true, false)
- carbonLoadModel.setPartitionId("0")
var processor: AbstractResultProcessor = null
if (restructuredBlockExists) {
LOGGER.info("CompactionResultSortProcessor flow is selected")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 72d0484..1fa1689 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses, TableProcessingOperations}
import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator}
import org.apache.carbondata.processing.loading.exception.NoRetryException
@@ -129,7 +130,8 @@ class SparkPartitionLoader(model: CarbonLoadModel,
System.setProperty("carbon.properties.filepath",
System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
}
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId)
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(
+ CarbonTablePath.DEPRECATED_PATITION_ID)
CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true")
CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1")
CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true")
@@ -219,14 +221,13 @@ class NewCarbonDataLoadRDD[K, V](
override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val iter = new Iterator[(K, V)] {
- var partitionID = "0"
val loadMetadataDetails = new LoadMetadataDetails()
val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
var model: CarbonLoadModel = _
val uniqueLoadStatusId =
carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
try {
- loadMetadataDetails.setPartitionCount(partitionID)
+ loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID)
loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
val preFetch = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
@@ -264,7 +265,7 @@ class NewCarbonDataLoadRDD[K, V](
// So print the data load statistics only in case of non failure case
if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
CarbonTimeStatisticsFactory.getLoadStatisticsInstance
- .printStatisticsInfo(model.getPartitionId)
+ .printStatisticsInfo(CarbonTablePath.DEPRECATED_PATITION_ID)
}
}
@@ -287,8 +288,8 @@ class NewCarbonDataLoadRDD[K, V](
val fileList: java.util.List[String] = new java.util.ArrayList[String](
CarbonCommonConstants.CONSTANT_SIZE_TEN)
CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, fileList, ",")
- model = carbonLoadModel.getCopyWithPartition(partitionID, fileList,
- carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+ model = carbonLoadModel.getCopyWithPartition(
+ carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
StandardLogService.setThreadName(StandardLogService
.getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName)
, ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId + "")
@@ -351,7 +352,6 @@ class NewDataFrameLoaderRDD[K, V](
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val iter = new Iterator[(K, V)] {
- val partitionID = "0"
val loadMetadataDetails = new LoadMetadataDetails()
val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
val model: CarbonLoadModel = carbonLoadModel
@@ -359,9 +359,8 @@ class NewDataFrameLoaderRDD[K, V](
carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
try {
- loadMetadataDetails.setPartitionCount(partitionID)
+ loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID)
loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
- carbonLoadModel.setPartitionId(partitionID)
carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
carbonLoadModel.setPreFetch(false)
@@ -406,7 +405,7 @@ class NewDataFrameLoaderRDD[K, V](
// So print the data load statistics only in the non-failure case
if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
CarbonTimeStatisticsFactory.getLoadStatisticsInstance
- .printStatisticsInfo(model.getPartitionId)
+ .printStatisticsInfo(CarbonTablePath.DEPRECATED_PATITION_ID)
}
}
var finished = false
@@ -542,7 +541,6 @@ class PartitionTableDataLoaderRDD[K, V](
override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val iter = new Iterator[(K, V)] {
- val partitionID = "0"
val loadMetadataDetails = new LoadMetadataDetails()
val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
val model: CarbonLoadModel = carbonLoadModel
@@ -552,9 +550,8 @@ class PartitionTableDataLoaderRDD[K, V](
carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
try {
- loadMetadataDetails.setPartitionCount(partitionID)
+ loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID)
loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
- carbonLoadModel.setPartitionId(partitionID)
carbonLoadModel.setTaskNo(String.valueOf(partitionInfo.getPartitionId(theSplit.index)))
carbonLoadModel.setPreFetch(false)
val recordReaders = Array[CarbonIterator[Array[AnyRef]]] {
@@ -590,7 +587,7 @@ class PartitionTableDataLoaderRDD[K, V](
// So print the data load statistics only in the non-failure case
if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
CarbonTimeStatisticsFactory.getLoadStatisticsInstance
- .printStatisticsInfo(model.getPartitionId)
+ .printStatisticsInfo(CarbonTablePath.DEPRECATED_PATITION_ID)
}
}
var finished = false
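
The Scala hunks above replace every per-model partition id with the shared constant
CarbonTablePath.DEPRECATED_PATITION_ID at the statistics call sites; the deleted locals
(var partitionID = "0") show the value was always hard-coded to "0". Below is a minimal,
self-contained Java sketch of that pattern. The LoadStatisticsSketch class and its method
names are hypothetical stand-ins, and the constant value "0" is inferred from the deleted
locals, not read from CarbonTablePath itself.

    // Hypothetical stand-in for the statistics recorder; only the pattern mirrors the diff.
    import java.util.concurrent.ConcurrentHashMap;

    public final class LoadStatisticsSketch {
      // Assumed value: the deleted locals above were hard-coded to "0".
      static final String DEPRECATED_PARTITION_ID = "0";

      private static final ConcurrentHashMap<String, Long> START_TIMES = new ConcurrentHashMap<>();

      static void initPartitionInfo(String partitionId) {
        START_TIMES.put(partitionId, System.currentTimeMillis());
      }

      static void printStatisticsInfo(String partitionId) {
        Long start = START_TIMES.get(partitionId);
        if (start != null) {
          System.out.println("load took " + (System.currentTimeMillis() - start) + " ms");
        }
      }

      public static void main(String[] args) {
        initPartitionInfo(DEPRECATED_PARTITION_ID);   // was: model.getPartitionId()
        printStatisticsInfo(DEPRECATED_PARTITION_ID); // was: model.getPartitionId()
      }
    }

Keying every load under one constant keeps the statistics API unchanged while making the
unused partition dimension explicit at each call site.
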
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 0498b25..3c871db 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -190,8 +190,9 @@ object PartitionUtils {
val batchNo = CarbonTablePath.DataFileUtil.getBatchNoFromTaskNo(taskNo)
val taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskNo)
val bucketNumber = CarbonTablePath.DataFileUtil.getBucketNo(path)
- val indexFilePath = carbonTablePath.getCarbonIndexFilePath(String.valueOf(taskId), "0",
- segmentId, batchNo, String.valueOf(bucketNumber), timestamp, version)
+ val indexFilePath = carbonTablePath.getCarbonIndexFilePath(
+ String.valueOf(taskId), segmentId, batchNo, String.valueOf(bucketNumber),
+ timestamp, version)
// indexFilePath could be duplicated when multiple data files relate to one index file
if (indexFilePath != null && !pathList.contains(indexFilePath)) {
pathList.add(indexFilePath)
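
For context on the signature change above, a hedged sketch of index-file path assembly
once the "0" partition argument is dropped. Both the directory shape
(Fact/Part0/Segment_<id>) and the .carbonindex filename layout below are illustrative
assumptions, not the real CarbonTablePath implementation.

    // Hypothetical path assembly; directory and filename shapes are assumptions.
    public final class IndexPathSketch {
      static String indexFilePath(String tablePath, String taskId, String segmentId,
          String batchNo, String bucketNo, String timestamp) {
        // The partition component no longer varies, so callers stop passing it.
        return tablePath + "/Fact/Part0/Segment_" + segmentId + "/"
            + taskId + "_batchno" + batchNo + "-" + bucketNo + "-" + timestamp + ".carbonindex";
      }

      public static void main(String[] args) {
        System.out.println(indexFilePath("/store/default/t1", "3", "1", "0", "0", "1515151515000"));
      }
    }
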
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 5c43d58..fa83e8d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -615,7 +615,6 @@ object CarbonDataRDDFactory {
override def getPartition(key: Any): Int = {
val segId = key.asInstanceOf[String]
- // partitionId
segmentIdIndex(segId) * parallelism + Random.nextInt(parallelism)
}
}
@@ -649,7 +648,6 @@ object CarbonDataRDDFactory {
val rddResult = new updateResultImpl()
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] {
- val partitionID = "0"
val loadMetadataDetails = new LoadMetadataDetails
val executionErrors = ExecutionErrors(FailureCauses.NONE, "")
var uniqueLoadStatusId = ""
@@ -660,10 +658,9 @@ object CarbonDataRDDFactory {
CarbonCommonConstants.UNDERSCORE +
(index + "_0")
- loadMetadataDetails.setPartitionCount(partitionID)
+ loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID)
loadMetadataDetails.setLoadName(segId)
loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_FAILURE)
- carbonLoadModel.setPartitionId(partitionID)
carbonLoadModel.setSegmentId(segId)
carbonLoadModel.setTaskNo(String.valueOf(index))
carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)
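
The getPartition override shown above (kept intact, minus the stale // partitionId
comment) maps each segment to its own band of parallelism partitions with a random offset
inside the band. A self-contained Java sketch of the same scheme, with hypothetical class
and field names:

    // Hypothetical partitioner mirroring the getPartition logic shown above.
    import java.util.Map;
    import java.util.Random;

    public final class SegmentPartitionerSketch {
      private final Map<String, Integer> segmentIdIndex; // segment id -> dense index
      private final int parallelism;
      private final Random random = new Random();

      SegmentPartitionerSketch(Map<String, Integer> segmentIdIndex, int parallelism) {
        this.segmentIdIndex = segmentIdIndex;
        this.parallelism = parallelism;
      }

      int getPartition(Object key) {
        String segId = (String) key;
        // Each segment owns a contiguous band of `parallelism` partitions; the random
        // offset spreads that segment's rows evenly inside its band.
        return segmentIdIndex.get(segId) * parallelism + random.nextInt(parallelism);
      }

      public static void main(String[] args) {
        SegmentPartitionerSketch p = new SegmentPartitionerSketch(Map.of("0", 0, "1", 1), 4);
        System.out.println(p.getPartition("1")); // prints a value in [4, 8)
      }
    }
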
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 17749c8..0c956e5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -105,7 +105,6 @@ with Serializable {
model,
conf
)
- model.setPartitionId("0")
model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
model.setDictionaryServerHost(options.getOrElse("dicthost", null))
model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
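
This hunk only deletes the hard-coded model.setPartitionId("0") call; the surrounding
option handling is untouched. As a quick illustration of that option-parsing style, a
small sketch (the option keys and defaults mirror the context lines above, the Options
map itself is hypothetical):

    // Hypothetical Options map; the keys and defaults mirror the diff context lines.
    import java.util.Map;

    public final class LoadOptionsSketch {
      public static void main(String[] args) {
        Map<String, String> options = Map.of("onepass", "true", "dictport", "9090");
        boolean useOnePass = Boolean.parseBoolean(options.getOrDefault("onepass", "false"));
        String dictHost = options.getOrDefault("dicthost", null);
        int dictPort = Integer.parseInt(options.getOrDefault("dictport", "-1"));
        System.out.println("onepass=" + useOnePass + " dict=" + dictHost + ":" + dictPort);
      }
    }
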
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index b5325ef..aadee81 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -858,7 +858,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[CarbonFile] = {
val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
carbonTable.getTablePath)
- val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
+ val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId)
val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
override def accept(file: CarbonFile): Boolean = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 102df39..9da7244 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -173,6 +173,7 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
case s: ShuffleExchange => shuffleExists = true
}
assert(!shuffleExists, "shuffle should not exist on bucket tables")
+ sql("DROP TABLE bucketed_parquet_table")
}
test("test create table with bucket join of carbon table and non bucket parquet table") {
@@ -197,6 +198,7 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
case s: ShuffleExchange => shuffleExists = true
}
assert(shuffleExists, "shuffle should exist on non bucket tables")
+ sql("DROP TABLE parquet_table")
}
test("test scalar subquery with equal") {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 7b1ab9d..e291f41 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -40,8 +40,6 @@ public class CarbonDataLoadConfiguration {
private String[] header;
- private String partitionId;
-
private String segmentId;
private String taskNo;
@@ -189,14 +187,6 @@ public class CarbonDataLoadConfiguration {
this.tableIdentifier = tableIdentifier;
}
- public String getPartitionId() {
- return partitionId;
- }
-
- public void setPartitionId(String partitionId) {
- this.partitionId = partitionId;
- }
-
public String getSegmentId() {
return segmentId;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index f7eff81..cf045a4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -152,7 +152,6 @@ public final class DataLoadProcessBuilder {
configuration.setTableIdentifier(identifier);
configuration.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
configuration.setHeader(loadModel.getCsvHeaderColumns());
- configuration.setPartitionId(loadModel.getPartitionId());
configuration.setSegmentId(loadModel.getSegmentId());
configuration.setTaskNo(loadModel.getTaskNo());
configuration.setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index e2be79c..a8db6c9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -63,8 +63,7 @@ public class TableProcessingOperations {
// delete folders whose metadata does not exist in tablestatus
for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
- final String partitionCount = i + "";
- String partitionPath = carbonTablePath.getPartitionDir(partitionCount);
+ String partitionPath = carbonTablePath.getPartitionDir();
FileFactory.FileType fileType = FileFactory.getFileType(partitionPath);
if (FileFactory.isFileExist(partitionPath, fileType)) {
CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
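
With getPartitionDir() now taking no arguments, the cleanup loop above visits a single
fixed partition directory. A hedged sketch of the resulting logic, with a hypothetical
helper and an assumed <tablePath>/Fact/Part0 directory shape:

    // Hypothetical cleanup helper; the Fact/Part0 directory shape is an assumption.
    import java.io.File;
    import java.util.Set;

    public final class StaleFolderCleanupSketch {
      static void deleteStaleSegmentFolders(String tablePath, Set<String> segmentsInTableStatus) {
        File partitionDir = new File(tablePath, "Fact/Part0"); // was Fact/Part<i> per id
        File[] children = partitionDir.listFiles();
        if (children == null) {
          return;
        }
        for (File segmentDir : children) {
          // Delete folders whose metadata does not exist in tablestatus.
          if (!segmentsInTableStatus.contains(segmentDir.getName())) {
            System.out.println("would delete stale folder: " + segmentDir);
          }
        }
      }

      public static void main(String[] args) {
        deleteStaleSegmentFolders("/store/default/t1", Set.of("Segment_0", "Segment_1"));
      }
    }
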
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index d41455f..fef2da6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -39,8 +39,6 @@ public class CarbonLoadModel implements Serializable {
private String colDictFilePath;
- private String partitionId;
-
private CarbonDataLoadSchema carbonDataLoadSchema;
private boolean aggLoadRequest;
@@ -356,55 +354,6 @@ public class CarbonLoadModel implements Serializable {
}
/**
- * get copy with partition
- *
- * @param uniqueId
- * @return
- */
- public CarbonLoadModel getCopyWithPartition(String uniqueId) {
- CarbonLoadModel copy = new CarbonLoadModel();
- copy.tableName = tableName;
- copy.factFilePath = factFilePath + '/' + uniqueId;
- copy.databaseName = databaseName;
- copy.partitionId = uniqueId;
- copy.aggLoadRequest = aggLoadRequest;
- copy.loadMetadataDetails = loadMetadataDetails;
- copy.isRetentionRequest = isRetentionRequest;
- copy.complexDelimiterLevel1 = complexDelimiterLevel1;
- copy.complexDelimiterLevel2 = complexDelimiterLevel2;
- copy.carbonDataLoadSchema = carbonDataLoadSchema;
- copy.blocksID = blocksID;
- copy.taskNo = taskNo;
- copy.factTimeStamp = factTimeStamp;
- copy.segmentId = segmentId;
- copy.serializationNullFormat = serializationNullFormat;
- copy.badRecordsLoggerEnable = badRecordsLoggerEnable;
- copy.badRecordsAction = badRecordsAction;
- copy.escapeChar = escapeChar;
- copy.quoteChar = quoteChar;
- copy.commentChar = commentChar;
- copy.timestampformat = timestampformat;
- copy.dateFormat = dateFormat;
- copy.defaultTimestampFormat = defaultTimestampFormat;
- copy.maxColumns = maxColumns;
- copy.tablePath = tablePath;
- copy.useOnePass = useOnePass;
- copy.dictionaryServerHost = dictionaryServerHost;
- copy.dictionaryServerPort = dictionaryServerPort;
- copy.dictionaryServerSecretKey = dictionaryServerSecretKey;
- copy.dictionaryEncryptServerSecure = dictionaryEncryptServerSecure;
- copy.dictionaryServiceProvider = dictionaryServiceProvider;
- copy.preFetch = preFetch;
- copy.isEmptyDataBadRecord = isEmptyDataBadRecord;
- copy.skipEmptyLine = skipEmptyLine;
- copy.sortScope = sortScope;
- copy.batchSortSizeInMb = batchSortSizeInMb;
- copy.badRecordsLocation = badRecordsLocation;
- copy.isAggLoadRequest = isAggLoadRequest;
- return copy;
- }
-
- /**
* Get copy with taskNo.
* Broadcast value is shared in process, so we need to copy it to make sure the value in each
* task independently.
@@ -416,7 +365,6 @@ public class CarbonLoadModel implements Serializable {
copy.tableName = tableName;
copy.factFilePath = factFilePath;
copy.databaseName = databaseName;
- copy.partitionId = partitionId;
copy.aggLoadRequest = aggLoadRequest;
copy.loadMetadataDetails = loadMetadataDetails;
copy.isRetentionRequest = isRetentionRequest;
@@ -460,19 +408,15 @@ public class CarbonLoadModel implements Serializable {
/**
* get CarbonLoadModel with partition
*
- * @param uniqueId
- * @param filesForPartition
* @param header
* @param delimiter
* @return
*/
- public CarbonLoadModel getCopyWithPartition(String uniqueId, List<String> filesForPartition,
- String header, String delimiter) {
+ public CarbonLoadModel getCopyWithPartition(String header, String delimiter) {
CarbonLoadModel copyObj = new CarbonLoadModel();
copyObj.tableName = tableName;
copyObj.factFilePath = null;
copyObj.databaseName = databaseName;
- copyObj.partitionId = uniqueId;
copyObj.aggLoadRequest = aggLoadRequest;
copyObj.loadMetadataDetails = loadMetadataDetails;
copyObj.isRetentionRequest = isRetentionRequest;
@@ -514,20 +458,6 @@ public class CarbonLoadModel implements Serializable {
}
/**
- * @return the partitionId
- */
- public String getPartitionId() {
- return partitionId;
- }
-
- /**
- * @param partitionId the partitionId to set
- */
- public void setPartitionId(String partitionId) {
- this.partitionId = partitionId;
- }
-
- /**
* @param tablePath The tablePath to set.
*/
public void setTablePath(String tablePath) {
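
After this hunk, getCopyWithPartition keeps only the header and delimiter parameters. A
trimmed, hypothetical sketch of the surviving copy pattern follows; as the diff's own
comment on getCopyWithTaskNo notes, broadcast models are shared within a process, so each
task copies before mutating.

    // Hypothetical trimmed model; only the copy pattern mirrors the diff.
    public final class LoadModelSketch {
      String tableName;
      String databaseName;
      String csvHeader;
      String csvDelimiter;

      LoadModelSketch getCopyWithPartition(String header, String delimiter) {
        LoadModelSketch copy = new LoadModelSketch();
        copy.tableName = tableName;       // shared state is copied over
        copy.databaseName = databaseName;
        copy.csvHeader = header;          // per-task overrides
        copy.csvDelimiter = delimiter;
        return copy;
      }

      public static void main(String[] args) {
        LoadModelSketch shared = new LoadModelSketch();
        shared.tableName = "t1";
        shared.databaseName = "default";
        LoadModelSketch taskLocal = shared.getCopyWithPartition("a,b,c", ",");
        System.out.println(taskLocal.databaseName + "." + taskLocal.tableName
            + " header=" + taskLocal.csvHeader);
      }
    }
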
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
index 6432d38..fcc88b5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
@@ -73,8 +73,8 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
String[] storeLocations =
CarbonDataProcessorUtil.getLocalDataFolderLocation(
sortParameters.getDatabaseName(), sortParameters.getTableName(),
- String.valueOf(sortParameters.getTaskNo()), sortParameters.getPartitionID(),
- sortParameters.getSegmentId() + "", false, false);
+ String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(),
+ false, false);
// Set the data file location
String[] dataFolderLocations = CarbonDataProcessorUtil.arrayAppend(storeLocations,
File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
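
The narrowed getLocalDataFolderLocation above (and in the sorter hunks that follow) drops
the partition/bucket component, leaving database, table, task and segment. A sketch under
an assumed local directory layout; the real helper's layout may differ:

    // Hypothetical layout for local sort-temp folders; the real helper may differ.
    import java.io.File;

    public final class LocalSortDirSketch {
      static String[] localDataFolderLocations(String[] tmpDirs, String databaseName,
          String tableName, String taskNo, String segmentId) {
        String[] locations = new String[tmpDirs.length];
        for (int i = 0; i < tmpDirs.length; i++) {
          // No bucket/partition component any more: db/table/task/segment is enough.
          locations[i] = tmpDirs[i] + File.separator + databaseName + File.separator
              + tableName + File.separator + taskNo + File.separator + segmentId;
        }
        return locations;
      }

      public static void main(String[] args) {
        String[] locs = localDataFolderLocations(
            new String[]{"/tmp/carbon1", "/tmp/carbon2"}, "default", "t1", "7", "2");
        for (String loc : locs) {
          System.out.println(loc + File.separator + "sortTemp");
        }
      }
    }
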
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index c7030dd..b7452a7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -133,10 +133,10 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
}
private SingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
- String[] storeLocation = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
- String.valueOf(sortParameters.getTaskNo()), bucketId,
- sortParameters.getSegmentId() + "", false, false);
+ String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+ sortParameters.getDatabaseName(), sortParameters.getTableName(),
+ String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(),
+ false, false);
// Set the data file location
String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation, File.separator,
CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -181,10 +181,9 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
}
private void setTempLocation(SortParameters parameters) {
- String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(parameters.getDatabaseName(),
- parameters.getTableName(), parameters.getTaskNo(),
- parameters.getPartitionID(), parameters.getSegmentId(), false, false);
+ String[] carbonDataDirectoryPath = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+ parameters.getDatabaseName(), parameters.getTableName(), parameters.getTaskNo(),
+ parameters.getSegmentId(), false, false);
String[] tmpLocs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
parameters.setTempFileLocation(tmpLocs);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index c5579d9..ed3a55d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -219,10 +219,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
}
private void setTempLocation(SortParameters parameters) {
- String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(parameters.getDatabaseName(),
- parameters.getTableName(), parameters.getTaskNo(), batchCount + "",
- parameters.getSegmentId(), false, false);
+ String[] carbonDataDirectoryPath = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+ parameters.getDatabaseName(), parameters.getTableName(), parameters.getTaskNo(),
+ parameters.getSegmentId(), false, false);
String[] tempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
parameters.setTempFileLocation(tempDirs);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/51421fda/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
index 3c48e4d..f605b22 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
@@ -119,18 +119,17 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg
Iterator<CarbonRowBatch>[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()];
for (int i = 0; i < sortDataRows.length; i++) {
- batchIterator[i] =
- new MergedDataIterator(String.valueOf(i), batchSize, intermediateFileMergers[i]);
+ batchIterator[i] = new MergedDataIterator(batchSize, intermediateFileMergers[i]);
}
return batchIterator;
}
- private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
- String[] storeLocation = CarbonDataProcessorUtil
- .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
- String.valueOf(sortParameters.getTaskNo()), bucketId,
- sortParameters.getSegmentId() + "", false, false);
+ private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger() {
+ String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+ sortParameters.getDatabaseName(), sortParameters.getTableName(),
+ String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(),
+ false, false);
// Set the data file location
String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation,
File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -173,7 +172,7 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg
private void setTempLocation(SortParameters parameters) {
String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
.getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(),
- parameters.getTaskNo(), parameters.getPartitionID(), parameters.getSegmentId(),
+ parameters.getTaskNo(), parameters.getSegmentId(),
false, false);
String[] tmpLoc = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -224,7 +223,6 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg
private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
- private String partitionId;
private int batchSize;
@@ -232,9 +230,8 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg
private UnsafeIntermediateMerger intermediateMerger;
- public MergedDataIterator(String partitionId, int batchSize,
+ public MergedDataIterator(int batchSize,
UnsafeIntermediateMerger intermediateMerger) {
- this.partitionId = partitionId;
this.batchSize = batchSize;
this.intermediateMerger = intermediateMerger;
this.firstRow = true;
@@ -245,7 +242,7 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg
@Override public boolean hasNext() {
if (firstRow) {
firstRow = false;
- finalMerger = getFinalMerger(partitionId);
+ finalMerger = getFinalMerger();
List<UnsafeCarbonRowPage> rowPages = intermediateMerger.getRowPages();
finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
intermediateMerger.getMergedPages());
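
MergedDataIterator now takes only the batch size and the intermediate merger, and still
defers the expensive final-merge setup to the first hasNext() call. A self-contained
sketch of that lazy-init iterator shape, with hypothetical row-page types standing in for
the Unsafe classes:

    // Hypothetical row-page types; only the lazy-init iterator shape mirrors the diff.
    import java.util.Iterator;
    import java.util.List;

    public final class MergedDataIteratorSketch implements Iterator<List<Object[]>> {
      private final int batchSize;
      private final List<Object[]> rowPages; // stands in for the intermediate merger output
      private boolean firstRow = true;
      private int cursor = 0;

      MergedDataIteratorSketch(int batchSize, List<Object[]> rowPages) {
        this.batchSize = batchSize;
        this.rowPages = rowPages;
      }

      @Override public boolean hasNext() {
        if (firstRow) {
          firstRow = false;
          // The real code builds the final merger here and starts the final merge;
          // deferring it means no merger is created for an iterator that is never read.
        }
        return cursor < rowPages.size();
      }

      @Override public List<Object[]> next() {
        int end = Math.min(cursor + batchSize, rowPages.size());
        List<Object[]> batch = rowPages.subList(cursor, end);
        cursor = end;
        return batch;
      }

      public static void main(String[] args) {
        List<Object[]> pages = List.of(new Object[]{"r1"}, new Object[]{"r2"}, new Object[]{"r3"});
        MergedDataIteratorSketch it = new MergedDataIteratorSketch(2, pages);
        while (it.hasNext()) {
          System.out.println("batch of " + it.next().size());
        }
      }
    }
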