Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/13 22:11:36 UTC

[10/49] carbondata git commit: [CARBONDATA-1667] Remove direct load related code

[CARBONDATA-1667] Remove direct load related code

This closes #1465
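
In short, the commit deletes the isDirectLoad flag from CarbonLoadModel, along with its getter and setter, and collapses every isDirectLoad branch in the Spark and Presto load paths to the former direct-load arm, which is now the only load mode.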


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6f689719
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6f689719
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6f689719

Branch: refs/heads/fgdatamap
Commit: 6f6897191819994da2066584721c462f03184cc6
Parents: f812e41
Author: Jacky Li <ja...@qq.com>
Authored: Sat Nov 4 18:53:46 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Mon Nov 6 12:21:32 2017 +0800

----------------------------------------------------------------------
 .../presto/util/CarbonDataStoreCreator.scala    |  1 -
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 35 ++++----------
 .../carbondata/spark/util/DataLoadingUtil.scala |  1 -
 .../spark/rdd/CarbonDataRDDFactory.scala        | 49 ++++++--------------
 .../loading/model/CarbonLoadModel.java          | 11 -----
 5 files changed, 25 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f689719/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index acee71b..09cddfe 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -118,7 +118,6 @@ object CarbonDataStoreCreator {
       loadModel.setBadRecordsAction(
         TableOptionConstant.BAD_RECORDS_ACTION.getName + "," +
         "force")
-      loadModel.setDirectLoad(true)
       loadModel.setIsEmptyDataBadRecord(
         DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD +
         "," +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f689719/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 49b708c..1d6ad70 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -196,13 +196,7 @@ class NewCarbonDataLoadRDD[K, V](
     if (isTableSplitPartition) {
       // for table split partition
       var splits: Array[TableSplit] = null
-
-      if (carbonLoadModel.isDirectLoad) {
-        splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
-      } else {
-        splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
-          carbonLoadModel.getTableName, null)
-      }
+      splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
 
       splits.zipWithIndex.map { s =>
        // filter by the partition unique id; only one split will match, so take element 0
@@ -289,15 +283,10 @@ class NewCarbonDataLoadRDD[K, V](
           val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
           logInfo("Input split: " + split.serializableHadoopSplit.value)
           carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
-          if (carbonLoadModel.isDirectLoad) {
-            model = carbonLoadModel.getCopyWithPartition(
-                split.serializableHadoopSplit.value.getPartition.getUniqueID,
-                split.serializableHadoopSplit.value.getPartition.getFilesPath,
-                carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
-          } else {
-            model = carbonLoadModel.getCopyWithPartition(
-                split.serializableHadoopSplit.value.getPartition.getUniqueID)
-          }
+          model = carbonLoadModel.getCopyWithPartition(
+              split.serializableHadoopSplit.value.getPartition.getUniqueID,
+              split.serializableHadoopSplit.value.getPartition.getFilesPath,
+              carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
           partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
           StandardLogService.setThreadName(StandardLogService
             .getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName)
@@ -320,15 +309,11 @@ class NewCarbonDataLoadRDD[K, V](
               split.serializableHadoopSplit, split.nodeBlocksDetail.length)
           val blocksID = gernerateBlocksID
           carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
-          if (carbonLoadModel.isDirectLoad) {
-            val filelist: java.util.List[String] = new java.util.ArrayList[String](
-                CarbonCommonConstants.CONSTANT_SIZE_TEN)
-            CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
-            model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
-                carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
-          } else {
-            model = carbonLoadModel.getCopyWithPartition(partitionID)
-          }
+          val filelist: java.util.List[String] = new java.util.ArrayList[String](
+              CarbonCommonConstants.CONSTANT_SIZE_TEN)
+          CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
+          model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
+              carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
           StandardLogService.setThreadName(StandardLogService
             .getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName)
             , ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId + "")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f689719/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index 445fdbb..5a24d7d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -285,7 +285,6 @@ object DataLoadingUtil {
     carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter))
     carbonLoadModel.setCsvHeader(fileHeader)
     carbonLoadModel.setColDictFilePath(column_dict)
-    carbonLoadModel.setDirectLoad(true)
     carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
 
     val validatedMaxColumns = CommonUtil.validateMaxColumns(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f689719/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 628d444..cfd8cff 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -653,40 +653,21 @@ object CarbonDataRDDFactory {
          * 3) output Array[(partitionID,Array[BlockDetails])] to blocksGroupBy
          */
           var splits = Array[TableSplit]()
-          if (carbonLoadModel.isDirectLoad) {
-            // get all table splits: here the input files have been divided across partitions
-            splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
-            // get all partition blocks from file list
-            blocksGroupBy = splits.map {
-              split =>
-                val pathBuilder = new StringBuilder()
-                for (path <- split.getPartition.getFilesPath.asScala) {
-                  pathBuilder.append(path).append(",")
-                }
-                if (pathBuilder.nonEmpty) {
-                  pathBuilder.substring(0, pathBuilder.size - 1)
-                }
-                (split.getPartition.getUniqueID, SparkUtil.getSplits(pathBuilder.toString(),
-                  sqlContext.sparkContext
-                ))
-            }
-          } else {
-            // get all table splits; by this point the data has already been partitioned
-            splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
-              carbonLoadModel.getTableName, null)
-            // get all partition blocks from factFilePath/uniqueID/
-            blocksGroupBy = splits.map {
-              split =>
-                val pathBuilder = new StringBuilder()
-                pathBuilder.append(carbonLoadModel.getFactFilePath)
-                if (!carbonLoadModel.getFactFilePath.endsWith("/")
-                    && !carbonLoadModel.getFactFilePath.endsWith("\\")) {
-                  pathBuilder.append("/")
-                }
-                pathBuilder.append(split.getPartition.getUniqueID).append("/")
-                (split.getPartition.getUniqueID,
-                    SparkUtil.getSplits(pathBuilder.toString, sqlContext.sparkContext))
-            }
+          // get all table splits: here the input files have been divided across partitions
+          splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
+          // get all partition blocks from file list
+          blocksGroupBy = splits.map {
+            split =>
+              val pathBuilder = new StringBuilder()
+              for (path <- split.getPartition.getFilesPath.asScala) {
+                pathBuilder.append(path).append(",")
+              }
+              if (pathBuilder.nonEmpty) {
+                pathBuilder.substring(0, pathBuilder.size - 1)
+              }
+              (split.getPartition.getUniqueID, SparkUtil.getSplits(pathBuilder.toString(),
+                sqlContext.sparkContext
+              ))
           }
         } else {
           /*

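Worth noting in the retained branch: pathBuilder.substring(0, pathBuilder.size - 1) returns the trimmed string but the result is discarded, so the value handed to SparkUtil.getSplits still carries the trailing comma (presumably tolerated downstream). A small sketch of a trim-free alternative, assuming the goal is simply a comma-separated path list (names here are mine, not the project's):

    import scala.collection.JavaConverters._

    object PathJoinSketch {
      // Join a partition's file paths into one comma-separated string with
      // no trailing delimiter, avoiding the StringBuilder-and-trim pattern.
      def joinPaths(filesPath: java.util.List[String]): String =
        filesPath.asScala.mkString(",")

      def main(args: Array[String]): Unit = {
        val files = java.util.Arrays.asList("/data/part-0.csv", "/data/part-1.csv")
        println(joinPaths(files)) // /data/part-0.csv,/data/part-1.csv
      }
    }
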
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f689719/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 39ee270..6a156a6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -53,7 +53,6 @@ public class CarbonLoadModel implements Serializable {
   private String complexDelimiterLevel1;
   private String complexDelimiterLevel2;
 
-  private boolean isDirectLoad;
   private List<LoadMetadataDetails> loadMetadataDetails;
   private transient SegmentUpdateStatusManager segmentUpdateStatusManager;
 
@@ -206,14 +205,6 @@ public class CarbonLoadModel implements Serializable {
     this.complexDelimiterLevel2 = complexDelimiterLevel2;
   }
 
-  public boolean isDirectLoad() {
-    return isDirectLoad;
-  }
-
-  public void setDirectLoad(boolean isDirectLoad) {
-    this.isDirectLoad = isDirectLoad;
-  }
-
   public String getAllDictPath() {
     return allDictPath;
   }
@@ -383,7 +374,6 @@ public class CarbonLoadModel implements Serializable {
     copy.isRetentionRequest = isRetentionRequest;
     copy.csvHeader = csvHeader;
     copy.csvHeaderColumns = csvHeaderColumns;
-    copy.isDirectLoad = isDirectLoad;
     copy.csvDelimiter = csvDelimiter;
     copy.complexDelimiterLevel1 = complexDelimiterLevel1;
     copy.complexDelimiterLevel2 = complexDelimiterLevel2;
@@ -434,7 +424,6 @@ public class CarbonLoadModel implements Serializable {
     copyObj.carbonDataLoadSchema = carbonDataLoadSchema;
     copyObj.csvHeader = header;
     copyObj.csvHeaderColumns = csvHeaderColumns;
-    copyObj.isDirectLoad = true;
     copyObj.csvDelimiter = delimiter;
     copyObj.complexDelimiterLevel1 = complexDelimiterLevel1;
     copyObj.complexDelimiterLevel2 = complexDelimiterLevel2;
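
After the removal, getCopyWithPartition(uniqueId, filesPath, header, delimiter) is the only per-partition copy path the loaders exercise. A heavily reduced sketch of that pattern follows; the class name and field set are mine, and the real CarbonLoadModel carries far more state:

    import java.util.{List => JList}

    // Hypothetical stand-in for CarbonLoadModel: shared settings live on the
    // original, and each task receives a copy carrying its own file list.
    class LoadModelSketch extends Serializable {
      var partitionId: String = _
      var csvHeader: String = _
      var csvDelimiter: String = _
      var factFilesToProcess: JList[String] = _

      def getCopyWithPartition(partitionId: String, filesForPartition: JList[String],
          header: String, delimiter: String): LoadModelSketch = {
        val copy = new LoadModelSketch
        copy.partitionId = partitionId
        copy.csvHeader = header
        copy.csvDelimiter = delimiter
        copy.factFilesToProcess = filesForPartition
        copy
      }
    }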