Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/08 04:15:50 UTC

[16/24] carbondata git commit: [CARBONDATA-1668] Remove isTableSplitPartition in data loading

[CARBONDATA-1668] Remove isTableSplitPartition in data loading

This closes #1466


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d6967bff
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d6967bff
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d6967bff

Branch: refs/heads/pre-aggregate
Commit: d6967bffcd5bec622eff22154edec3edf7a64dec
Parents: 6f68971
Author: Jacky Li <ja...@qq.com>
Authored: Sun Nov 5 20:35:09 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Tue Nov 7 11:11:29 2017 +0800

----------------------------------------------------------------------
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 122 ++++---------
 .../spark/rdd/CarbonDataRDDFactory.scala        | 172 ++++++++-----------
 2 files changed, 105 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
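
The commit removes the dead isTableSplitPartition branch (the flag was already hard-coded to false in CarbonDataRDDFactory), so NewCarbonDataLoadRDD is now built from the node/block grouping alone. A minimal sketch of the simplified call site, following the diff below (sqlContext, carbonLoadModel, status and blocksGroupBy are assumed to be in scope as in loadDataFile()):

    // Sketch only: the boolean flag is gone; only the node/block grouping
    // (Array[(String, Array[BlockDetails])]) is passed to the load RDD.
    status = new NewCarbonDataLoadRDD(
      sqlContext.sparkContext,
      new DataLoadResultImpl(),
      carbonLoadModel,
      blocksGroupBy
    ).collect()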


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d6967bff/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 1d6ad70..74f7528 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -177,8 +177,7 @@ class NewCarbonDataLoadRDD[K, V](
     sc: SparkContext,
     result: DataLoadResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    blocksGroupBy: Array[(String, Array[BlockDetails])],
-    isTableSplitPartition: Boolean)
+    blocksGroupBy: Array[(String, Array[BlockDetails])])
   extends CarbonRDD[(K, V)](sc, Nil) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
@@ -193,22 +192,8 @@ class NewCarbonDataLoadRDD[K, V](
     sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
 
   override def getPartitions: Array[Partition] = {
-    if (isTableSplitPartition) {
-      // for table split partition
-      var splits: Array[TableSplit] = null
-      splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
-
-      splits.zipWithIndex.map { s =>
-        // filter the same partition unique id, because only one will match, so get 0 element
-        val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter(p =>
-          p._1 == s._1.getPartition.getUniqueID)(0)._2
-        new CarbonTableSplitPartition(id, s._2, s._1, blocksDetails)
-      }
-    } else {
-      // for node partition
-      blocksGroupBy.zipWithIndex.map { b =>
-        new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
-      }
+    blocksGroupBy.zipWithIndex.map { b =>
+      new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
     }
   }
 
@@ -278,50 +263,25 @@ class NewCarbonDataLoadRDD[K, V](
         CommonUtil.configureCSVInputFormat(configuration, carbonLoadModel)
         val hadoopAttemptContext = new TaskAttemptContextImpl(configuration, attemptId)
         val format = new CSVInputFormat
-        if (isTableSplitPartition) {
-          // for table split partition
-          val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
-          logInfo("Input split: " + split.serializableHadoopSplit.value)
-          carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
-          model = carbonLoadModel.getCopyWithPartition(
-              split.serializableHadoopSplit.value.getPartition.getUniqueID,
-              split.serializableHadoopSplit.value.getPartition.getFilesPath,
-              carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
-          partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
-          StandardLogService.setThreadName(StandardLogService
-            .getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName)
-            , ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId + "")
-          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap(
-              partitionID, split.partitionBlocksDetail.length)
-          val readers =
-          split.partitionBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
-          readers.zipWithIndex.map { case (reader, index) =>
-            new CSVRecordReaderIterator(reader,
-              split.partitionBlocksDetail(index),
-              hadoopAttemptContext)
-          }
-        } else {
-          // for node partition
-          val split = theSplit.asInstanceOf[CarbonNodePartition]
-          logInfo("Input split: " + split.serializableHadoopSplit)
-          logInfo("The Block Count in this node :" + split.nodeBlocksDetail.length)
-          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
-              split.serializableHadoopSplit, split.nodeBlocksDetail.length)
-          val blocksID = gernerateBlocksID
-          carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
-          val filelist: java.util.List[String] = new java.util.ArrayList[String](
-              CarbonCommonConstants.CONSTANT_SIZE_TEN)
-          CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
-          model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
-              carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
-          StandardLogService.setThreadName(StandardLogService
-            .getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName)
-            , ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId + "")
-          val readers =
-            split.nodeBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
-          readers.zipWithIndex.map { case (reader, index) =>
-            new CSVRecordReaderIterator(reader, split.nodeBlocksDetail(index), hadoopAttemptContext)
-          }
+
+        val split = theSplit.asInstanceOf[CarbonNodePartition]
+        logInfo("Input split: " + split.serializableHadoopSplit)
+        logInfo("The Block Count in this node :" + split.nodeBlocksDetail.length)
+        CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
+            split.serializableHadoopSplit, split.nodeBlocksDetail.length)
+        carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+        val fileList: java.util.List[String] = new java.util.ArrayList[String](
+            CarbonCommonConstants.CONSTANT_SIZE_TEN)
+        CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, fileList, ",")
+        model = carbonLoadModel.getCopyWithPartition(partitionID, fileList,
+            carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+        StandardLogService.setThreadName(StandardLogService
+          .getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName)
+          , ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId + "")
+        val readers =
+          split.nodeBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
+        readers.zipWithIndex.map { case (reader, index) =>
+          new CSVRecordReaderIterator(reader, split.nodeBlocksDetail(index), hadoopAttemptContext)
         }
       }
       /**
@@ -330,14 +290,8 @@ class NewCarbonDataLoadRDD[K, V](
        * @return
        */
       def gernerateBlocksID: String = {
-        if (isTableSplitPartition) {
-          carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
-          theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
-            .getPartition.getUniqueID + "_" + UUID.randomUUID()
-        } else {
-          carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
-          UUID.randomUUID()
-        }
+        carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+        UUID.randomUUID()
       }
 
       var finished = false
@@ -355,23 +309,17 @@ class NewCarbonDataLoadRDD[K, V](
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
-    if (isTableSplitPartition) {
-      val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
-      val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
-      location
-    } else {
-      val theSplit = split.asInstanceOf[CarbonNodePartition]
-      val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
-      logInfo("Preferred Location for split : " + firstOptionLocation.mkString(","))
-      /**
-       * At original logic, we were adding the next preferred location so that in case of the
-       * failure the Spark should know where to schedule the failed task.
-       * Remove the next preferred location is because some time Spark will pick the same node
-       * for 2 tasks, so one node is getting over loaded with the task and one have no task to
-       * do. And impacting the performance despite of any failure.
-       */
-      firstOptionLocation
-    }
+    val theSplit = split.asInstanceOf[CarbonNodePartition]
+    val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
+    logInfo("Preferred Location for split : " + firstOptionLocation.mkString(","))
+    /**
+     * The original logic also added the next preferred location so that, on failure,
+     * Spark would know where to reschedule the failed task.
+     * The next preferred location was removed because Spark sometimes picked the same node
+     * for two tasks, overloading one node while another had no task to run, which hurt
+     * performance even when there was no failure.
+     */
+    firstOptionLocation
   }
 }
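
The simplified RDD above creates exactly one partition per (node, blocks) entry of blocksGroupBy. The toy sketch below (hypothetical Block type and groupByHost helper, not CarbonData classes) illustrates the (host, blocks) shape that CarbonLoaderUtil.nodeBlockMapping produces in the CarbonDataRDDFactory diff that follows, and that getPartitions wraps into CarbonNodePartition instances:

    // Hypothetical toy types: a sketch of grouping blocks by their preferred host,
    // yielding the Array[(String, Array[Block])] shape consumed by the load RDD.
    case class Block(path: String, offset: Long, length: Long, hosts: Seq[String])

    def groupByHost(blocks: Seq[Block]): Array[(String, Array[Block])] =
      blocks.groupBy(_.hosts.headOption.getOrElse("localhost"))
        .map { case (host, bs) => (host, bs.toArray) }
        .toArray

    val grouped = groupByHost(Seq(
      Block("/data/part-0.csv", 0L, 128L, Seq("node1")),
      Block("/data/part-1.csv", 0L, 256L, Seq("node2")),
      Block("/data/part-2.csv", 0L, 512L, Seq("node1"))
    ))
    // grouped has one entry per host, e.g. ("node1", blocks on node1); each entry is
    // zipped with its index and turned into a CarbonNodePartition in getPartitions.

Note that the real CarbonLoaderUtil.nodeBlockMapping also balances blocks across the active nodes; the sketch only shows the resulting data shape.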
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d6967bff/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index cfd8cff..1ad25c3 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -64,8 +64,7 @@ import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.loading.sort.SortScopeOptions
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.processing.splits.TableSplit
-import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil, CarbonQueryUtil}
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory, _}
 import org.apache.carbondata.spark.load._
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
@@ -635,117 +634,86 @@ object CarbonDataRDDFactory {
       // Check if any load need to be deleted before loading new data
       DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
         carbonLoadModel.getTableName, storePath, false, carbonTable)
-      // get partition way from configuration
-      // val isTableSplitPartition = CarbonProperties.getInstance().getProperty(
-      // CarbonCommonConstants.TABLE_SPLIT_PARTITION,
-      // CarbonCommonConstants.TABLE_SPLIT_PARTITION_DEFAULT_VALUE).toBoolean
-      val isTableSplitPartition = false
       var blocksGroupBy: Array[(String, Array[BlockDetails])] = null
       var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
       var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null
 
       def loadDataFile(): Unit = {
-        if (isTableSplitPartition) {
-          /*
-         * when data handle by table split partition
-         * 1) get partition files, direct load or not will get the different files path
-         * 2) get files blocks by using SplitUtils
-         * 3) output Array[(partitionID,Array[BlockDetails])] to blocksGroupBy
+        /*
+         * When data loading is handled by node partition:
+         * 1) clone the hadoop configuration and set the file path on it
+         * 2) use org.apache.hadoop.mapreduce.lib.input.TextInputFormat to get splits and size info
+         * 3) use CarbonLoaderUtil.nodeBlockMapping to get the node-to-block mapping,
+         *    so carbondata files (one file per block) are written locally on each node
+         * 4) use NewCarbonDataLoadRDD to load the data and write the carbondata files
          */
-          var splits = Array[TableSplit]()
-          // get all table Splits, this part means files were divide to different partitions
-          splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
-          // get all partition blocks from file list
-          blocksGroupBy = splits.map {
-            split =>
-              val pathBuilder = new StringBuilder()
-              for (path <- split.getPartition.getFilesPath.asScala) {
-                pathBuilder.append(path).append(",")
-              }
-              if (pathBuilder.nonEmpty) {
-                pathBuilder.substring(0, pathBuilder.size - 1)
-              }
-              (split.getPartition.getUniqueID, SparkUtil.getSplits(pathBuilder.toString(),
-                sqlContext.sparkContext
-              ))
-          }
-        } else {
-          /*
-           * when data load handle by node partition
-           * 1)clone the hadoop configuration,and set the file path to the configuration
-           * 2)use org.apache.hadoop.mapreduce.lib.input.TextInputFormat to get splits,size info
-           * 3)use CarbonLoaderUtil.nodeBlockMapping to get mapping info of node and block,
-           *   for locally writing carbondata files(one file one block) in nodes
-           * 4)use NewCarbonDataLoadRDD to load data and write to carbondata files
-           */
-          val hadoopConfiguration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
-          // FileUtils will skip file which is no csv, and return all file path which split by ','
-          val filePaths = carbonLoadModel.getFactFilePath
-          hadoopConfiguration.set(FileInputFormat.INPUT_DIR, filePaths)
-          hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true")
-          hadoopConfiguration.set("io.compression.codecs",
-            """org.apache.hadoop.io.compress.GzipCodec,
-               org.apache.hadoop.io.compress.DefaultCodec,
-               org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
-
-          CommonUtil.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration)
-
-          val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-          val jobContext = new Job(hadoopConfiguration)
-          val rawSplits = inputFormat.getSplits(jobContext).toArray
-          val blockList = rawSplits.map { inputSplit =>
-            val fileSplit = inputSplit.asInstanceOf[FileSplit]
-            new TableBlockInfo(fileSplit.getPath.toString,
-              fileSplit.getStart, "1",
-              fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1, null
-            ).asInstanceOf[Distributable]
-          }
-          // group blocks to nodes, tasks
-          val startTime = System.currentTimeMillis
-          val activeNodes = DistributionUtil
-              .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
-          val nodeBlockMapping =
-            CarbonLoaderUtil
-                .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
-                .toSeq
-          val timeElapsed: Long = System.currentTimeMillis - startTime
-          LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
-          LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +
-              s"No.of Nodes: ${nodeBlockMapping.size}")
-          var str = ""
-          nodeBlockMapping.foreach(entry => {
-            val tableBlock = entry._2
-            str = str + "#Node: " + entry._1 + " no.of.blocks: " + tableBlock.size()
-            tableBlock.asScala.foreach(tableBlockInfo =>
-              if (!tableBlockInfo.getLocations.exists(hostentry =>
-                hostentry.equalsIgnoreCase(entry._1)
-              )) {
-                str = str + " , mismatch locations: " + tableBlockInfo.getLocations
-                    .foldLeft("")((a, b) => a + "," + b)
-              }
-            )
-            str = str + "\n"
-          }
+        val hadoopConfiguration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+        // FileUtils will skip files that are not csv and return all file paths, separated by ','
+        val filePaths = carbonLoadModel.getFactFilePath
+        hadoopConfiguration.set(FileInputFormat.INPUT_DIR, filePaths)
+        hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true")
+        hadoopConfiguration.set("io.compression.codecs",
+          """org.apache.hadoop.io.compress.GzipCodec,
+             org.apache.hadoop.io.compress.DefaultCodec,
+             org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
+
+        CommonUtil.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration)
+
+        val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+        val jobContext = new Job(hadoopConfiguration)
+        val rawSplits = inputFormat.getSplits(jobContext).toArray
+        val blockList = rawSplits.map { inputSplit =>
+          val fileSplit = inputSplit.asInstanceOf[FileSplit]
+          new TableBlockInfo(fileSplit.getPath.toString,
+            fileSplit.getStart, "1",
+            fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1, null
+          ).asInstanceOf[Distributable]
+        }
+        // group blocks to nodes, tasks
+        val startTime = System.currentTimeMillis
+        val activeNodes = DistributionUtil
+            .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
+        val nodeBlockMapping =
+          CarbonLoaderUtil
+              .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
+              .toSeq
+        val timeElapsed: Long = System.currentTimeMillis - startTime
+        LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
+        LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +
+            s"No.of Nodes: ${nodeBlockMapping.size}")
+        var str = ""
+        nodeBlockMapping.foreach(entry => {
+          val tableBlock = entry._2
+          str = str + "#Node: " + entry._1 + " no.of.blocks: " + tableBlock.size()
+          tableBlock.asScala.foreach(tableBlockInfo =>
+            if (!tableBlockInfo.getLocations.exists(hostentry =>
+              hostentry.equalsIgnoreCase(entry._1)
+            )) {
+              str = str + " , mismatch locations: " + tableBlockInfo.getLocations
+                  .foldLeft("")((a, b) => a + "," + b)
+            }
           )
-          LOGGER.info(str)
-          blocksGroupBy = nodeBlockMapping.map(entry => {
-            val blockDetailsList =
-              entry._2.asScala.map(distributable => {
-                val tableBlock = distributable.asInstanceOf[TableBlockInfo]
-                new BlockDetails(new Path(tableBlock.getFilePath),
-                  tableBlock.getBlockOffset, tableBlock.getBlockLength, tableBlock.getLocations
-                )
-              }).toArray
-            (entry._1, blockDetailsList)
-          }
-          ).toArray
+          str = str + "\n"
         }
+        )
+        LOGGER.info(str)
+        blocksGroupBy = nodeBlockMapping.map { entry =>
+          val blockDetailsList =
+            entry._2.asScala.map { distributable =>
+              val tableBlock = distributable.asInstanceOf[TableBlockInfo]
+              new BlockDetails(new Path(tableBlock.getFilePath),
+                tableBlock.getBlockOffset, tableBlock.getBlockLength, tableBlock.getLocations
+              )
+            }.toArray
+          (entry._1, blockDetailsList)
+        }.toArray
 
-        status = new NewCarbonDataLoadRDD(sqlContext.sparkContext,
+        status = new NewCarbonDataLoadRDD(
+          sqlContext.sparkContext,
           new DataLoadResultImpl(),
           carbonLoadModel,
-          blocksGroupBy,
-          isTableSplitPartition).collect()
+          blocksGroupBy
+        ).collect()
       }
 
       def loadDataFrame(): Unit = {