You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/08 04:15:49 UTC
[15/24] carbondata git commit: [CARBONDATA-1667] Remove direct load related code
[CARBONDATA-1667] Remove direct load related code
This closes #1465
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6f689719
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6f689719
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6f689719
Branch: refs/heads/pre-aggregate
Commit: 6f6897191819994da2066584721c462f03184cc6
Parents: f812e41
Author: Jacky Li <ja...@qq.com>
Authored: Sat Nov 4 18:53:46 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Mon Nov 6 12:21:32 2017 +0800
----------------------------------------------------------------------
.../presto/util/CarbonDataStoreCreator.scala | 1 -
.../spark/rdd/NewCarbonDataLoadRDD.scala | 35 ++++----------
.../carbondata/spark/util/DataLoadingUtil.scala | 1 -
.../spark/rdd/CarbonDataRDDFactory.scala | 49 ++++++--------------
.../loading/model/CarbonLoadModel.java | 11 -----
5 files changed, 25 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f689719/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index acee71b..09cddfe 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -118,7 +118,6 @@ object CarbonDataStoreCreator {
loadModel.setBadRecordsAction(
TableOptionConstant.BAD_RECORDS_ACTION.getName + "," +
"force")
- loadModel.setDirectLoad(true)
loadModel.setIsEmptyDataBadRecord(
DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD +
"," +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f689719/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 49b708c..1d6ad70 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -196,13 +196,7 @@ class NewCarbonDataLoadRDD[K, V](
if (isTableSplitPartition) {
// for table split partition
var splits: Array[TableSplit] = null
-
- if (carbonLoadModel.isDirectLoad) {
- splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
- } else {
- splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName, null)
- }
+ splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
splits.zipWithIndex.map { s =>
// filter the same partition unique id, because only one will match, so get 0 element
@@ -289,15 +283,10 @@ class NewCarbonDataLoadRDD[K, V](
val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
logInfo("Input split: " + split.serializableHadoopSplit.value)
carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
- if (carbonLoadModel.isDirectLoad) {
- model = carbonLoadModel.getCopyWithPartition(
- split.serializableHadoopSplit.value.getPartition.getUniqueID,
- split.serializableHadoopSplit.value.getPartition.getFilesPath,
- carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
- } else {
- model = carbonLoadModel.getCopyWithPartition(
- split.serializableHadoopSplit.value.getPartition.getUniqueID)
- }
+ model = carbonLoadModel.getCopyWithPartition(
+ split.serializableHadoopSplit.value.getPartition.getUniqueID,
+ split.serializableHadoopSplit.value.getPartition.getFilesPath,
+ carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
StandardLogService.setThreadName(StandardLogService
.getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName)
@@ -320,15 +309,11 @@ class NewCarbonDataLoadRDD[K, V](
split.serializableHadoopSplit, split.nodeBlocksDetail.length)
val blocksID = gernerateBlocksID
carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
- if (carbonLoadModel.isDirectLoad) {
- val filelist: java.util.List[String] = new java.util.ArrayList[String](
- CarbonCommonConstants.CONSTANT_SIZE_TEN)
- CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
- model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
- carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
- } else {
- model = carbonLoadModel.getCopyWithPartition(partitionID)
- }
+ val filelist: java.util.List[String] = new java.util.ArrayList[String](
+ CarbonCommonConstants.CONSTANT_SIZE_TEN)
+ CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
+ model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
+ carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
StandardLogService.setThreadName(StandardLogService
.getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName)
, ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId + "")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f689719/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index 445fdbb..5a24d7d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -285,7 +285,6 @@ object DataLoadingUtil {
carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter))
carbonLoadModel.setCsvHeader(fileHeader)
carbonLoadModel.setColDictFilePath(column_dict)
- carbonLoadModel.setDirectLoad(true)
carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
val validatedMaxColumns = CommonUtil.validateMaxColumns(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f689719/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 628d444..cfd8cff 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -653,40 +653,21 @@ object CarbonDataRDDFactory {
* 3) output Array[(partitionID,Array[BlockDetails])] to blocksGroupBy
*/
var splits = Array[TableSplit]()
- if (carbonLoadModel.isDirectLoad) {
- // get all table Splits, this part means files were divide to different partitions
- splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
- // get all partition blocks from file list
- blocksGroupBy = splits.map {
- split =>
- val pathBuilder = new StringBuilder()
- for (path <- split.getPartition.getFilesPath.asScala) {
- pathBuilder.append(path).append(",")
- }
- if (pathBuilder.nonEmpty) {
- pathBuilder.substring(0, pathBuilder.size - 1)
- }
- (split.getPartition.getUniqueID, SparkUtil.getSplits(pathBuilder.toString(),
- sqlContext.sparkContext
- ))
- }
- } else {
- // get all table Splits,when come to this, means data have been partition
- splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName, null)
- // get all partition blocks from factFilePath/uniqueID/
- blocksGroupBy = splits.map {
- split =>
- val pathBuilder = new StringBuilder()
- pathBuilder.append(carbonLoadModel.getFactFilePath)
- if (!carbonLoadModel.getFactFilePath.endsWith("/")
- && !carbonLoadModel.getFactFilePath.endsWith("\\")) {
- pathBuilder.append("/")
- }
- pathBuilder.append(split.getPartition.getUniqueID).append("/")
- (split.getPartition.getUniqueID,
- SparkUtil.getSplits(pathBuilder.toString, sqlContext.sparkContext))
- }
+ // get all table Splits, this part means files were divide to different partitions
+ splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
+ // get all partition blocks from file list
+ blocksGroupBy = splits.map {
+ split =>
+ val pathBuilder = new StringBuilder()
+ for (path <- split.getPartition.getFilesPath.asScala) {
+ pathBuilder.append(path).append(",")
+ }
+ if (pathBuilder.nonEmpty) {
+ pathBuilder.substring(0, pathBuilder.size - 1)
+ }
+ (split.getPartition.getUniqueID, SparkUtil.getSplits(pathBuilder.toString(),
+ sqlContext.sparkContext
+ ))
}
} else {
/*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f689719/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 39ee270..6a156a6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -53,7 +53,6 @@ public class CarbonLoadModel implements Serializable {
private String complexDelimiterLevel1;
private String complexDelimiterLevel2;
- private boolean isDirectLoad;
private List<LoadMetadataDetails> loadMetadataDetails;
private transient SegmentUpdateStatusManager segmentUpdateStatusManager;
@@ -206,14 +205,6 @@ public class CarbonLoadModel implements Serializable {
this.complexDelimiterLevel2 = complexDelimiterLevel2;
}
- public boolean isDirectLoad() {
- return isDirectLoad;
- }
-
- public void setDirectLoad(boolean isDirectLoad) {
- this.isDirectLoad = isDirectLoad;
- }
-
public String getAllDictPath() {
return allDictPath;
}
@@ -383,7 +374,6 @@ public class CarbonLoadModel implements Serializable {
copy.isRetentionRequest = isRetentionRequest;
copy.csvHeader = csvHeader;
copy.csvHeaderColumns = csvHeaderColumns;
- copy.isDirectLoad = isDirectLoad;
copy.csvDelimiter = csvDelimiter;
copy.complexDelimiterLevel1 = complexDelimiterLevel1;
copy.complexDelimiterLevel2 = complexDelimiterLevel2;
@@ -434,7 +424,6 @@ public class CarbonLoadModel implements Serializable {
copyObj.carbonDataLoadSchema = carbonDataLoadSchema;
copyObj.csvHeader = header;
copyObj.csvHeaderColumns = csvHeaderColumns;
- copyObj.isDirectLoad = true;
copyObj.csvDelimiter = delimiter;
copyObj.complexDelimiterLevel1 = complexDelimiterLevel1;
copyObj.complexDelimiterLevel2 = complexDelimiterLevel2;