You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/04/18 12:56:29 UTC
[1/2] incubator-carbondata git commit: add SORT_COLUMNS option in
dataframe
Repository: incubator-carbondata
Updated Branches:
refs/heads/12-dev 8843aecfe -> ebf70bca8
add SORT_COLUMNS option in dataframe
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/be5904f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/be5904f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/be5904f8
Branch: refs/heads/12-dev
Commit: be5904f8ee9bab3c226cedba89d309c23f0aefae
Parents: 8843aec
Author: jackylk <ja...@huawei.com>
Authored: Tue Apr 18 07:26:43 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Tue Apr 18 07:26:43 2017 +0800
----------------------------------------------------------------------
.../core/datastore/SegmentTaskIndexStore.java | 17 ---
.../core/mutate/CarbonUpdateUtil.java | 122 +++++++++----------
.../statusmanager/SegmentStatusManager.java | 17 +--
.../carbondata/examples/CompareTest.scala | 2 +
.../carbondata/hadoop/CarbonInputSplit.java | 10 +-
.../testsuite/dataload/TestLoadDataFrame.scala | 28 +++++
.../apache/carbondata/spark/CarbonOption.scala | 2 +
.../spark/sql/CarbonDataFrameWriter.scala | 12 +-
.../processing/merger/CarbonDataMergerUtil.java | 82 ++++++-------
9 files changed, 148 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
index 862455e..334efb4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
@@ -40,7 +40,6 @@ import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil;
/**
@@ -140,22 +139,6 @@ public class SegmentTaskIndexStore
}
/**
- * returns block timestamp value from the given task
- * @param taskKey
- * @param listOfUpdatedFactFiles
- * @return
- */
- private String getTimeStampValueFromBlock(String taskKey, List<String> listOfUpdatedFactFiles) {
- for (String blockName : listOfUpdatedFactFiles) {
- if (taskKey.equals(CarbonTablePath.DataFileUtil.getTaskNo(blockName))) {
- blockName = blockName.substring(blockName.lastIndexOf('-') + 1, blockName.lastIndexOf('.'));
- return blockName;
- }
- }
- return null;
- }
-
- /**
* Below method will be used to load the segment of segments
* One segment may have multiple tasks, so the table segment will be loaded
* based on task id and will return the map of taskId to table segment
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index af26035..72c750f 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -54,7 +54,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
public class CarbonUpdateUtil {
private static final LogService LOGGER =
- LogServiceFactory.getLogService(CarbonUpdateUtil.class.getName());
+ LogServiceFactory.getLogService(CarbonUpdateUtil.class.getName());
/**
* returns required filed from tuple id
@@ -86,11 +86,11 @@ public class CarbonUpdateUtil {
*/
public static String getTableBlockPath(String tid, String factPath) {
String part =
- CarbonTablePath.addPartPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.PART_ID));
+ CarbonTablePath.addPartPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.PART_ID));
String segment =
- CarbonTablePath.addSegmentPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.SEGMENT_ID));
+ CarbonTablePath.addSegmentPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.SEGMENT_ID));
return factPath + CarbonCommonConstants.FILE_SEPARATOR + part
- + CarbonCommonConstants.FILE_SEPARATOR + segment;
+ + CarbonCommonConstants.FILE_SEPARATOR + segment;
}
@@ -103,7 +103,7 @@ public class CarbonUpdateUtil {
* @return
*/
public static String getDeleteDeltaFilePath(String blockPath, String blockName,
- String timestamp) {
+ String timestamp) {
return blockPath + CarbonCommonConstants.FILE_SEPARATOR + blockName
+ CarbonCommonConstants.HYPHEN + timestamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;
@@ -119,7 +119,7 @@ public class CarbonUpdateUtil {
CarbonTable table, String updateStatusFileIdentifier, boolean isCompaction) {
boolean status = false;
SegmentUpdateStatusManager segmentUpdateStatusManager =
- new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier());
+ new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier());
ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock();
boolean lockStatus = false;
@@ -130,12 +130,12 @@ public class CarbonUpdateUtil {
AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
+ .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+ absoluteTableIdentifier.getCarbonTableIdentifier());
// read the existing file if present and update the same.
SegmentUpdateDetails[] oldDetails = segmentUpdateStatusManager
- .getUpdateStatusDetails();
+ .getUpdateStatusDetails();
List<SegmentUpdateDetails> oldList = new ArrayList(Arrays.asList(oldDetails));
@@ -187,9 +187,9 @@ public class CarbonUpdateUtil {
* @return
*/
public static boolean updateTableMetadataStatus(Set<String> updatedSegmentsList,
- CarbonTable table, String updatedTimeStamp,
- boolean isTimestampUpdationRequired,
- List<String> segmentsToBeDeleted) {
+ CarbonTable table, String updatedTimeStamp,
+ boolean isTimestampUpdationRequired,
+ List<String> segmentsToBeDeleted) {
boolean status = false;
@@ -198,8 +198,8 @@ public class CarbonUpdateUtil {
AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
+ .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+ absoluteTableIdentifier.getCarbonTableIdentifier());
String tableStatusPath = carbonTablePath.getTableStatusFilePath();
@@ -211,11 +211,11 @@ public class CarbonUpdateUtil {
lockStatus = carbonLock.lockWithRetries();
if (lockStatus) {
LOGGER.info(
- "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
- + " for table status updation");
+ "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+ + " for table status updation");
LoadMetadataDetails[] listOfLoadFolderDetailsArray =
- segmentStatusManager.readLoadMetadata(metaDataFilepath);
+ segmentStatusManager.readLoadMetadata(metaDataFilepath);
for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
@@ -223,7 +223,7 @@ public class CarbonUpdateUtil {
// we are storing the link between the 2 status files in the segment 0 only.
if (loadMetadata.getLoadName().equalsIgnoreCase("0")) {
loadMetadata.setUpdateStatusFileName(
- CarbonUpdateUtil.getUpdateStatusFileName(updatedTimeStamp));
+ CarbonUpdateUtil.getUpdateStatusFileName(updatedTimeStamp));
}
// if the segments is in the list of marked for delete then update the status.
@@ -252,7 +252,7 @@ public class CarbonUpdateUtil {
try {
segmentStatusManager
- .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
+ .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
} catch (IOException e) {
return false;
}
@@ -260,18 +260,18 @@ public class CarbonUpdateUtil {
status = true;
} else {
LOGGER.error("Not able to acquire the lock for Table status updation for table " + table
- .getDatabaseName() + "." + table.getFactTableName());
+ .getDatabaseName() + "." + table.getFactTableName());
}
} finally {
if (lockStatus) {
if (carbonLock.unlock()) {
LOGGER.info(
- "Table unlocked successfully after table status updation" + table.getDatabaseName()
- + "." + table.getFactTableName());
+ "Table unlocked successfully after table status updation" + table.getDatabaseName()
+ + "." + table.getFactTableName());
} else {
LOGGER.error(
- "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
- .getFactTableName() + " during table status updation");
+ "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
+ .getFactTableName() + " during table status updation");
}
}
}
@@ -287,7 +287,7 @@ public class CarbonUpdateUtil {
*/
public static String getUpdateStatusFileName(String updatedTimeStamp) {
return CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME + CarbonCommonConstants.HYPHEN
- + updatedTimeStamp;
+ + updatedTimeStamp;
}
/**
@@ -301,13 +301,13 @@ public class CarbonUpdateUtil {
AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
+ .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+ absoluteTableIdentifier.getCarbonTableIdentifier());
// as of now considering only partition 0.
String partitionId = "0";
String partitionDir = carbonTablePath.getPartitionDir(partitionId);
CarbonFile file =
- FileFactory.getCarbonFile(partitionDir, FileFactory.getFileType(partitionDir));
+ FileFactory.getCarbonFile(partitionDir, FileFactory.getFileType(partitionDir));
if (!file.exists()) {
return;
}
@@ -317,8 +317,8 @@ public class CarbonUpdateUtil {
@Override public boolean accept(CarbonFile file) {
String fileName = file.getName();
return (fileName.endsWith(timeStamp + CarbonCommonConstants.UPDATE_DELTA_FILE_EXT)
- || fileName.endsWith(timeStamp + CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
- || fileName.endsWith(timeStamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT));
+ || fileName.endsWith(timeStamp + CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+ || fileName.endsWith(timeStamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT));
}
});
// deleting the files of a segment.
@@ -390,7 +390,7 @@ public class CarbonUpdateUtil {
// scan all the carbondata files and get the latest task ID.
CarbonFile segment =
- FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath));
+ FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath));
CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
@Override public boolean accept(CarbonFile file) {
@@ -420,7 +420,7 @@ public class CarbonUpdateUtil {
// scan all the carbondata files and get the latest task ID.
CarbonFile segment =
- FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath));
+ FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath));
CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
@Override public boolean accept(CarbonFile file) {
@@ -495,8 +495,8 @@ public class CarbonUpdateUtil {
SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier());
CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(table.getAbsoluteTableIdentifier().getStorePath(),
- table.getAbsoluteTableIdentifier().getCarbonTableIdentifier());
+ .getCarbonTablePath(table.getAbsoluteTableIdentifier().getStorePath(),
+ table.getAbsoluteTableIdentifier().getCarbonTableIdentifier());
LoadMetadataDetails[] details = ssm.readLoadMetadata(table.getMetaDataFilepath());
@@ -513,23 +513,23 @@ public class CarbonUpdateUtil {
// if the segment is mark for delete or compacted then any way it will get deleted.
if (segment.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
- || segment.getLoadStatus()
- .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) {
+ || segment.getLoadStatus()
+ .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) {
// take the list of files from this segment.
String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName());
CarbonFile segDir =
- FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
+ FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
CarbonFile[] allSegmentFiles = segDir.listFiles();
// scan through the segment and find the carbondatafiles and index files.
SegmentUpdateStatusManager updateStatusManager =
- new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier());
+ new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier());
// get Invalid update delta files.
CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager
- .getUpdateDeltaFilesList(segment.getLoadName(), false,
- CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles);
+ .getUpdateDeltaFilesList(segment.getLoadName(), false,
+ CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles);
// now for each invalid delta file need to check the query execution time out
// and then delete.
@@ -541,8 +541,8 @@ public class CarbonUpdateUtil {
// do the same for the index files.
CarbonFile[] invalidIndexFiles = updateStatusManager
- .getUpdateDeltaFilesList(segment.getLoadName(), false,
- CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles);
+ .getUpdateDeltaFilesList(segment.getLoadName(), false,
+ CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles);
// now for each invalid index file need to check the query execution time out
// and then delete.
@@ -571,16 +571,16 @@ public class CarbonUpdateUtil {
// case 1
if (CarbonUpdateUtil.isBlockInvalid(block.getStatus())) {
completeListOfDeleteDeltaFiles = updateStatusManager
- .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, true,
- allSegmentFiles);
+ .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, true,
+ allSegmentFiles);
for (CarbonFile invalidFile : completeListOfDeleteDeltaFiles) {
compareTimestampsAndDelete(invalidFile, forceDelete, false);
}
CarbonFile[] blockRelatedFiles = updateStatusManager
- .getAllBlockRelatedFiles(block.getBlockName(), allSegmentFiles,
- block.getActualBlockName());
+ .getAllBlockRelatedFiles(block.getBlockName(), allSegmentFiles,
+ block.getActualBlockName());
// now for each invalid index file need to check the query execution time out
// and then delete.
@@ -593,8 +593,8 @@ public class CarbonUpdateUtil {
} else {
invalidDeleteDeltaFiles = updateStatusManager
- .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, false,
- allSegmentFiles);
+ .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, false,
+ allSegmentFiles);
for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
compareTimestampsAndDelete(invalidFile, forceDelete, false);
@@ -608,10 +608,10 @@ public class CarbonUpdateUtil {
if (null != validUpdateStatusFile && !validUpdateStatusFile.isEmpty()) {
final String updateStatusTimestamp = validUpdateStatusFile
- .substring(validUpdateStatusFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1);
+ .substring(validUpdateStatusFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1);
CarbonFile metaFolder = FileFactory.getCarbonFile(carbonTablePath.getMetadataDirectoryPath(),
- FileFactory.getFileType(carbonTablePath.getMetadataDirectoryPath()));
+ FileFactory.getFileType(carbonTablePath.getMetadataDirectoryPath()));
CarbonFile[] invalidUpdateStatusFiles = metaFolder.listFiles(new CarbonFileFilter() {
@Override public boolean accept(CarbonFile file) {
@@ -645,7 +645,7 @@ public class CarbonUpdateUtil {
int maxTime;
try {
maxTime = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME));
+ .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME));
} catch (NumberFormatException e) {
maxTime = CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME;
}
@@ -665,15 +665,15 @@ public class CarbonUpdateUtil {
* @param isUpdateStatusFile if true then the parsing of file name logic changes.
*/
private static void compareTimestampsAndDelete(CarbonFile invalidFile,
- boolean forceDelete, boolean isUpdateStatusFile) {
+ boolean forceDelete, boolean isUpdateStatusFile) {
long fileTimestamp = 0L;
if (isUpdateStatusFile) {
fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(invalidFile.getName()
- .substring(invalidFile.getName().lastIndexOf(CarbonCommonConstants.HYPHEN) + 1));
+ .substring(invalidFile.getName().lastIndexOf(CarbonCommonConstants.HYPHEN) + 1));
} else {
fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(
- CarbonTablePath.DataFileUtil.getTimeStampFromFileName(invalidFile.getName()));
+ CarbonTablePath.DataFileUtil.getTimeStampFromFileName(invalidFile.getName()));
}
// if the timestamp of the file is more than the current time by query execution timeout.
@@ -698,7 +698,7 @@ public class CarbonUpdateUtil {
*/
public static boolean isBlockInvalid(String blockStatus) {
if (blockStatus.equalsIgnoreCase(CarbonCommonConstants.COMPACTED) || blockStatus
- .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) {
+ .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) {
return true;
}
return false;
@@ -718,7 +718,7 @@ public class CarbonUpdateUtil {
* @param segmentBlockCount
*/
public static void decrementDeletedBlockCount(SegmentUpdateDetails details,
- Map<String, Long> segmentBlockCount) {
+ Map<String, Long> segmentBlockCount) {
String segId = details.getSegmentName();
@@ -751,12 +751,12 @@ public class CarbonUpdateUtil {
* @param segmentUpdateStatusManager
*/
public static void createBlockDetailsMap(BlockMappingVO blockMappingVO,
- SegmentUpdateStatusManager segmentUpdateStatusManager) {
+ SegmentUpdateStatusManager segmentUpdateStatusManager) {
Map<String, Long> blockRowCountMap = blockMappingVO.getBlockRowCountMapping();
Map<String, RowCountDetailsVO> outputMap =
- new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
for (Map.Entry<String, Long> blockRowEntry : blockRowCountMap.entrySet()) {
String key = blockRowEntry.getKey();
@@ -771,7 +771,7 @@ public class CarbonUpdateUtil {
}
RowCountDetailsVO rowCountDetailsVO =
- new RowCountDetailsVO(blockRowEntry.getValue(), alreadyDeletedCount);
+ new RowCountDetailsVO(blockRowEntry.getValue(), alreadyDeletedCount);
outputMap.put(key, rowCountDetailsVO);
}
@@ -789,8 +789,8 @@ public class CarbonUpdateUtil {
public static String getSegmentBlockNameKey(String segID, String blockName) {
String blockNameWithOutPart = blockName
- .substring(blockName.indexOf(CarbonCommonConstants.HYPHEN) + 1,
- blockName.lastIndexOf(CarbonTablePath.getCarbonDataExtension()));
+ .substring(blockName.indexOf(CarbonCommonConstants.HYPHEN) + 1,
+ blockName.lastIndexOf(CarbonTablePath.getCarbonDataExtension()));
return segID + CarbonCommonConstants.FILE_SEPARATOR + blockNameWithOutPart;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index c2c41e5..658ff8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -562,6 +562,7 @@ public class SegmentStatusManager {
}
}
+
/**
* This API will return the update status file name.
* @param segmentList
@@ -582,22 +583,6 @@ public class SegmentStatusManager {
return "";
}
- /**
- * getting the task numbers present in the segment.
- * @param segmentId
- * @return
- */
- public List<String> getUpdatedTasksDetailsForSegment(String segmentId, SegmentUpdateStatusManager
- updateStatusManager) {
- List<String> taskList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- List<String> list = updateStatusManager.getUpdateDeltaFiles(segmentId);
- for (String eachFileName : list) {
- taskList.add(CarbonTablePath.DataFileUtil.getTaskNo(eachFileName));
- }
- return taskList;
- }
-
-
public static class ValidAndInvalidSegmentsInfo {
private final List<String> listOfValidSegments;
private final List<String> listOfValidUpdatedSegments;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
index 82bd02a..f577fcf 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
@@ -266,6 +266,7 @@ object CompareTest {
.option("tempCSV", "false")
.option("single_pass", "true")
.option("dictionary_exclude", "id") // id is high cardinality column
+ .option("sort_columns", "")
.mode(SaveMode.Overwrite)
.save()
}
@@ -278,6 +279,7 @@ object CompareTest {
val loadParquetTime = loadParquetTable(spark, df)
val loadCarbonV3Time = loadCarbonTable(spark, df, version = "3")
println(s"load completed, time: $loadParquetTime, $loadCarbonV3Time")
+ df.unpersist()
spark.read.parquet(parquetTableName).registerTempTable(parquetTableName)
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 0dcaba2..e89da75 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -231,14 +231,14 @@ public class CarbonInputSplit extends FileSplit
}
// Comparing the time task id of the file to other
- // if both the task id of the file is same then we need to compare the
- // offset of
- // the file
+ // if both the task id of the file is same then we need to compare the offset of the file
String filePath1 = this.getPath().getName();
String filePath2 = other.getPath().getName();
if (CarbonTablePath.isCarbonDataFile(filePath1)) {
- int firstTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath1));
- int otherTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath2));
+ int firstTaskId =
+ Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath1).split("_")[0]);
+ int otherTaskId =
+ Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath2).split("_")[0]);
if (firstTaskId != otherTaskId) {
return firstTaskId - otherTaskId;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index b790131..d8c70b5 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -111,6 +111,34 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
)
}
+ test("test load dataframe without sort") {
+ df.write
+ .format("carbondata")
+ .option("tableName", "carbon3")
+ .option("sort_columns", "")
+ .mode(SaveMode.Overwrite)
+ .save()
+ sql("select count(*) from carbon3 where c3 > 400").show
+ df.registerTempTable("temp")
+ sql("select count(*) from temp where c3 > 400").show
+ //sql("select * from carbon3 where c3 > 500").show
+ checkAnswer(
+ sql("select count(*) from carbon3 where c3 > 500"), Row(500)
+ )
+ }
+
+ test("test load dataframe using sort_columns") {
+ df.write
+ .format("carbondata")
+ .option("tableName", "carbon3")
+ .option("sort_columns", "c2, c3")
+ .mode(SaveMode.Overwrite)
+ .save()
+ checkAnswer(
+ sql("select count(*) from carbon3 where c3 > 500"), Row(500)
+ )
+ }
+
test("test decimal values for dataframe load"){
dataFrame.write
.format("carbondata")
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index c29c1a2..93c2d18 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -57,5 +57,7 @@ class CarbonOption(options: Map[String, String]) {
def isBucketingEnabled: Boolean = options.contains("bucketcolumns") &&
options.contains("bucketnumber")
+ def sortColumns: Option[String] = options.get("sort_columns")
+
def toMap: Map[String, String] = options
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 576da58..b1e9ada 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql
+import scala.collection.mutable
+
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.execution.command.LoadTable
@@ -33,8 +35,8 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = {
// create a new table using dataframe's schema and write its content into the table
- sqlContext.sparkSession.sql(
- makeCreateTableString(dataFrame.schema, new CarbonOption(parameters)))
+ val sqlString = makeCreateTableString(dataFrame.schema, new CarbonOption(parameters))
+ sqlContext.sparkSession.sql(sqlString)
writeToCarbonFile(parameters)
}
@@ -84,7 +86,8 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
LOGGER.info(s"temporary CSV file size: ${countSize / 1024 / 1024} MB")
try {
- sqlContext.sql(makeLoadString(tempCSVFolder, options))
+ val sqlString = makeLoadString(tempCSVFolder, options)
+ sqlContext.sql(sqlString)
} finally {
fs.delete(tempCSVPath, true)
}
@@ -164,7 +167,8 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
}
val property = Map(
"DICTIONARY_INCLUDE" -> options.dictionaryInclude,
- "DICTIONARY_EXCLUDE" -> options.dictionaryExclude
+ "DICTIONARY_EXCLUDE" -> options.dictionaryExclude,
+ "SORT_COLUMNS" -> options.sortColumns
).filter(_._2.isDefined).map(p => s"'${p._1}' = '${p._2.get}'").mkString(",")
s"""
| CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 2b81eec..a6a9b88 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -853,12 +853,12 @@ public final class CarbonDataMergerUtil {
* @return
*/
public static List<String> getValidSegmentList(AbsoluteTableIdentifier absoluteTableIdentifier)
- throws IOException {
+ throws IOException {
SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments = null;
try {
validAndInvalidSegments =
- new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments();
+ new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments();
} catch (IOException e) {
LOGGER.error("Error while getting valid segment list for a table identifier");
throw new IOException();
@@ -913,9 +913,9 @@ public final class CarbonDataMergerUtil {
private static boolean isSegmentValid(LoadMetadataDetails seg) {
return seg.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
- || seg.getLoadStatus()
- .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) || seg
- .getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_UPDATE);
+ || seg.getLoadStatus()
+ .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) || seg
+ .getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_UPDATE);
}
/**
@@ -1200,16 +1200,16 @@ public final class CarbonDataMergerUtil {
CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader();
try {
deleteDeltaBlockDetails =
- dataReader.getCompactedDeleteDeltaFileFromBlock(deleteDeltaFiles, blockName);
+ dataReader.getCompactedDeleteDeltaFileFromBlock(deleteDeltaFiles, blockName);
} catch (Exception e) {
String blockFilePath = fullBlockFilePath
- .substring(0, fullBlockFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));
+ .substring(0, fullBlockFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));
LOGGER.error("Error while getting the delete delta blocks in path " + blockFilePath);
throw new IOException();
}
CarbonDeleteDeltaWriterImpl carbonDeleteWriter =
- new CarbonDeleteDeltaWriterImpl(fullBlockFilePath,
- FileFactory.getFileType(fullBlockFilePath));
+ new CarbonDeleteDeltaWriterImpl(fullBlockFilePath,
+ FileFactory.getFileType(fullBlockFilePath));
try {
carbonDeleteWriter.write(deleteDeltaBlockDetails);
} catch (IOException e) {
@@ -1220,11 +1220,11 @@ public final class CarbonDataMergerUtil {
}
public static Boolean updateStatusFile(
- List<CarbonDataMergerUtilResult> updateDataMergerDetailsList, CarbonTable table,
- String timestamp, SegmentUpdateStatusManager segmentUpdateStatusManager) {
+ List<CarbonDataMergerUtilResult> updateDataMergerDetailsList, CarbonTable table,
+ String timestamp, SegmentUpdateStatusManager segmentUpdateStatusManager) {
List<SegmentUpdateDetails> segmentUpdateDetails =
- new ArrayList<SegmentUpdateDetails>(updateDataMergerDetailsList.size());
+ new ArrayList<SegmentUpdateDetails>(updateDataMergerDetailsList.size());
// Check the list output.
@@ -1235,10 +1235,10 @@ public final class CarbonDataMergerUtil {
tempSegmentUpdateDetails.setBlockName(carbonDataMergerUtilResult.getBlockName());
for (SegmentUpdateDetails origDetails : segmentUpdateStatusManager
- .getUpdateStatusDetails()) {
+ .getUpdateStatusDetails()) {
if (origDetails.getBlockName().equalsIgnoreCase(carbonDataMergerUtilResult.getBlockName())
- && origDetails.getSegmentName()
- .equalsIgnoreCase(carbonDataMergerUtilResult.getSegmentName())) {
+ && origDetails.getSegmentName()
+ .equalsIgnoreCase(carbonDataMergerUtilResult.getSegmentName())) {
tempSegmentUpdateDetails.setDeletedRowsInBlock(origDetails.getDeletedRowsInBlock());
tempSegmentUpdateDetails.setStatus(origDetails.getStatus());
@@ -1247,9 +1247,9 @@ public final class CarbonDataMergerUtil {
}
tempSegmentUpdateDetails.setDeleteDeltaStartTimestamp(
- carbonDataMergerUtilResult.getDeleteDeltaStartTimestamp());
+ carbonDataMergerUtilResult.getDeleteDeltaStartTimestamp());
tempSegmentUpdateDetails
- .setDeleteDeltaEndTimestamp(carbonDataMergerUtilResult.getDeleteDeltaEndTimestamp());
+ .setDeleteDeltaEndTimestamp(carbonDataMergerUtilResult.getDeleteDeltaEndTimestamp());
segmentUpdateDetails.add(tempSegmentUpdateDetails);
} else return false;
@@ -1262,8 +1262,8 @@ public final class CarbonDataMergerUtil {
AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
+ .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+ absoluteTableIdentifier.getCarbonTableIdentifier());
String tableStatusPath = carbonTablePath.getTableStatusFilePath();
@@ -1277,38 +1277,38 @@ public final class CarbonDataMergerUtil {
lockStatus = carbonLock.lockWithRetries();
if (lockStatus) {
LOGGER.info(
- "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
- + " for table status updation");
+ "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+ + " for table status updation");
LoadMetadataDetails[] listOfLoadFolderDetailsArray =
- segmentStatusManager.readLoadMetadata(metaDataFilepath);
+ segmentStatusManager.readLoadMetadata(metaDataFilepath);
for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
if (loadMetadata.getLoadName().equalsIgnoreCase("0")) {
loadMetadata.setUpdateStatusFileName(
- CarbonUpdateUtil.getUpdateStatusFileName(timestamp));
+ CarbonUpdateUtil.getUpdateStatusFileName(timestamp));
}
}
try {
segmentStatusManager
- .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
+ .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
} catch (IOException e) {
return false;
}
} else {
LOGGER.error("Not able to acquire the lock for Table status updation for table " + table
- .getDatabaseName() + "." + table.getFactTableName());
+ .getDatabaseName() + "." + table.getFactTableName());
}
} finally {
if (lockStatus) {
if (carbonLock.unlock()) {
LOGGER.info(
- "Table unlocked successfully after table status updation" + table.getDatabaseName()
- + "." + table.getFactTableName());
+ "Table unlocked successfully after table status updation" + table.getDatabaseName()
+ + "." + table.getFactTableName());
} else {
LOGGER.error(
- "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
- .getFactTableName() + " during table status updation");
+ "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
+ .getFactTableName() + " during table status updation");
}
}
}
@@ -1326,7 +1326,7 @@ public final class CarbonDataMergerUtil {
String metadataPath = model.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath();
AbsoluteTableIdentifier absoluteTableIdentifier =
- model.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
+ model.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
LoadMetadataDetails[] details = segmentStatusManager.readLoadMetadata(metadataPath);
List<LoadMetadataDetails> originalList = Arrays.asList(details);
@@ -1340,24 +1340,24 @@ public final class CarbonDataMergerUtil {
ICarbonLock carbonTableStatusLock = CarbonLockFactory.getCarbonLockObj(
- model.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier(),
- LockUsage.TABLE_STATUS_LOCK);
+ model.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier(),
+ LockUsage.TABLE_STATUS_LOCK);
try {
if (carbonTableStatusLock.lockWithRetries()) {
LOGGER.info(
"Acquired lock for the table " + model.getDatabaseName() + "." + model.getTableName()
- + " for table status updation ");
+ + " for table status updation ");
CarbonTablePath carbonTablePath = CarbonStorePath
- .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
- absoluteTableIdentifier.getCarbonTableIdentifier());
+ .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+ absoluteTableIdentifier.getCarbonTableIdentifier());
segmentStatusManager.writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(),
- originalList.toArray(new LoadMetadataDetails[originalList.size()]));
+ originalList.toArray(new LoadMetadataDetails[originalList.size()]));
} else {
LOGGER.error(
- "Could not able to obtain lock for table" + model.getDatabaseName() + "." + model
- .getTableName() + "for table status updation");
+ "Could not able to obtain lock for table" + model.getDatabaseName() + "." + model
+ .getTableName() + "for table status updation");
throw new Exception("Failed to update the MajorCompactionStatus.");
}
} catch (IOException e) {
@@ -1366,11 +1366,11 @@ public final class CarbonDataMergerUtil {
} finally {
if (carbonTableStatusLock.unlock()) {
LOGGER.info(
- "Table unlocked successfully after table status updation" + model.getDatabaseName()
- + "." + model.getTableName());
+ "Table unlocked successfully after table status updation" + model.getDatabaseName()
+ + "." + model.getTableName());
} else {
LOGGER.error("Unable to unlock Table lock for table" + model.getDatabaseName() + "." + model
- .getTableName() + " during table status updation");
+ .getTableName() + " during table status updation");
}
}
[2/2] incubator-carbondata git commit: [CARBONDATA-882] Add
SORT_COLUMNS support in dataframe writer This closes #737
Posted by ja...@apache.org.
[CARBONDATA-882] Add SORT_COLUMNS support in dataframe writer This closes #737
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/ebf70bca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/ebf70bca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/ebf70bca
Branch: refs/heads/12-dev
Commit: ebf70bca834ab9145191fe43618cbb1ab6875941
Parents: 8843aec be5904f
Author: jackylk <ja...@huawei.com>
Authored: Tue Apr 18 20:56:01 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Tue Apr 18 20:56:01 2017 +0800
----------------------------------------------------------------------
.../core/datastore/SegmentTaskIndexStore.java | 17 ---
.../core/mutate/CarbonUpdateUtil.java | 122 +++++++++----------
.../statusmanager/SegmentStatusManager.java | 17 +--
.../carbondata/examples/CompareTest.scala | 2 +
.../carbondata/hadoop/CarbonInputSplit.java | 10 +-
.../testsuite/dataload/TestLoadDataFrame.scala | 28 +++++
.../apache/carbondata/spark/CarbonOption.scala | 2 +
.../spark/sql/CarbonDataFrameWriter.scala | 12 +-
.../processing/merger/CarbonDataMergerUtil.java | 82 ++++++-------
9 files changed, 148 insertions(+), 144 deletions(-)
----------------------------------------------------------------------