Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/08 04:15:50 UTC
[16/24] carbondata git commit: [CARBONDATA-1668] Remove isTableSplitPartition in data loading
[CARBONDATA-1668] Remove isTableSplitPartition in data loading
This closes #1466
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d6967bff
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d6967bff
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d6967bff
Branch: refs/heads/pre-aggregate
Commit: d6967bffcd5bec622eff22154edec3edf7a64dec
Parents: 6f68971
Author: Jacky Li <ja...@qq.com>
Authored: Sun Nov 5 20:35:09 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Tue Nov 7 11:11:29 2017 +0800
----------------------------------------------------------------------
.../spark/rdd/NewCarbonDataLoadRDD.scala | 122 ++++---------
.../spark/rdd/CarbonDataRDDFactory.scala | 172 ++++++++-----------
2 files changed, 105 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
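Note for readers skimming the archive: the net effect of this patch is that NewCarbonDataLoadRDD no longer branches on a table-split flag and always builds its partitions from the node-to-blocks grouping passed in as blocksGroupBy. Below is a minimal, self-contained sketch of that shape; BlockDetails and NodePartition here are simplified stand-ins invented for illustration, not the real CarbonData classes.

object NodePartitionSketch {
  // Simplified stand-ins for the real BlockDetails / CarbonNodePartition types.
  final case class BlockDetails(filePath: String, offset: Long, length: Long)
  final case class NodePartition(rddId: Int, index: Int, host: String,
      blocks: Array[BlockDetails])

  // After the patch, partitions are always derived from the (host, blocks)
  // grouping; the old isTableSplitPartition branch and the
  // CarbonTableSplitPartition path are gone.
  def getPartitions(rddId: Int,
      blocksGroupBy: Array[(String, Array[BlockDetails])]): Array[NodePartition] =
    blocksGroupBy.zipWithIndex.map { case ((host, blocks), idx) =>
      NodePartition(rddId, idx, host, blocks)
    }

  def main(args: Array[String]): Unit = {
    val parts = getPartitions(0, Array(
      ("node-1", Array(BlockDetails("/fact/part-0.csv", 0L, 128L))),
      ("node-2", Array(BlockDetails("/fact/part-1.csv", 0L, 256L)))))
    parts.foreach(p => println(s"${p.host} -> ${p.blocks.length} block(s)"))
  }
}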
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d6967bff/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 1d6ad70..74f7528 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -177,8 +177,7 @@ class NewCarbonDataLoadRDD[K, V](
sc: SparkContext,
result: DataLoadResult[K, V],
carbonLoadModel: CarbonLoadModel,
- blocksGroupBy: Array[(String, Array[BlockDetails])],
- isTableSplitPartition: Boolean)
+ blocksGroupBy: Array[(String, Array[BlockDetails])])
extends CarbonRDD[(K, V)](sc, Nil) {
sc.setLocalProperty("spark.scheduler.pool", "DDL")
@@ -193,22 +192,8 @@ class NewCarbonDataLoadRDD[K, V](
sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
override def getPartitions: Array[Partition] = {
- if (isTableSplitPartition) {
- // for table split partition
- var splits: Array[TableSplit] = null
- splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
-
- splits.zipWithIndex.map { s =>
- // filter the same partition unique id, because only one will match, so get 0 element
- val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter(p =>
- p._1 == s._1.getPartition.getUniqueID)(0)._2
- new CarbonTableSplitPartition(id, s._2, s._1, blocksDetails)
- }
- } else {
- // for node partition
- blocksGroupBy.zipWithIndex.map { b =>
- new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
- }
+ blocksGroupBy.zipWithIndex.map { b =>
+ new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
}
}
@@ -278,50 +263,25 @@ class NewCarbonDataLoadRDD[K, V](
CommonUtil.configureCSVInputFormat(configuration, carbonLoadModel)
val hadoopAttemptContext = new TaskAttemptContextImpl(configuration, attemptId)
val format = new CSVInputFormat
- if (isTableSplitPartition) {
- // for table split partition
- val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
- logInfo("Input split: " + split.serializableHadoopSplit.value)
- carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
- model = carbonLoadModel.getCopyWithPartition(
- split.serializableHadoopSplit.value.getPartition.getUniqueID,
- split.serializableHadoopSplit.value.getPartition.getFilesPath,
- carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
- partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
- StandardLogService.setThreadName(StandardLogService
- .getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName)
- , ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId + "")
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap(
- partitionID, split.partitionBlocksDetail.length)
- val readers =
- split.partitionBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
- readers.zipWithIndex.map { case (reader, index) =>
- new CSVRecordReaderIterator(reader,
- split.partitionBlocksDetail(index),
- hadoopAttemptContext)
- }
- } else {
- // for node partition
- val split = theSplit.asInstanceOf[CarbonNodePartition]
- logInfo("Input split: " + split.serializableHadoopSplit)
- logInfo("The Block Count in this node :" + split.nodeBlocksDetail.length)
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
- split.serializableHadoopSplit, split.nodeBlocksDetail.length)
- val blocksID = gernerateBlocksID
- carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
- val filelist: java.util.List[String] = new java.util.ArrayList[String](
- CarbonCommonConstants.CONSTANT_SIZE_TEN)
- CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
- model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
- carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
- StandardLogService.setThreadName(StandardLogService
- .getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName)
- , ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId + "")
- val readers =
- split.nodeBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
- readers.zipWithIndex.map { case (reader, index) =>
- new CSVRecordReaderIterator(reader, split.nodeBlocksDetail(index), hadoopAttemptContext)
- }
+
+ val split = theSplit.asInstanceOf[CarbonNodePartition]
+ logInfo("Input split: " + split.serializableHadoopSplit)
+ logInfo("The Block Count in this node :" + split.nodeBlocksDetail.length)
+ CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
+ split.serializableHadoopSplit, split.nodeBlocksDetail.length)
+ carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+ val fileList: java.util.List[String] = new java.util.ArrayList[String](
+ CarbonCommonConstants.CONSTANT_SIZE_TEN)
+ CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, fileList, ",")
+ model = carbonLoadModel.getCopyWithPartition(partitionID, fileList,
+ carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+ StandardLogService.setThreadName(StandardLogService
+ .getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName)
+ , ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId + "")
+ val readers =
+ split.nodeBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
+ readers.zipWithIndex.map { case (reader, index) =>
+ new CSVRecordReaderIterator(reader, split.nodeBlocksDetail(index), hadoopAttemptContext)
}
}
/**
@@ -330,14 +290,8 @@ class NewCarbonDataLoadRDD[K, V](
* @return
*/
def gernerateBlocksID: String = {
- if (isTableSplitPartition) {
- carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
- theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
- .getPartition.getUniqueID + "_" + UUID.randomUUID()
- } else {
- carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
- UUID.randomUUID()
- }
+ carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+ UUID.randomUUID()
}
var finished = false
@@ -355,23 +309,17 @@ class NewCarbonDataLoadRDD[K, V](
}
override def getPreferredLocations(split: Partition): Seq[String] = {
- if (isTableSplitPartition) {
- val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
- val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
- location
- } else {
- val theSplit = split.asInstanceOf[CarbonNodePartition]
- val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
- logInfo("Preferred Location for split : " + firstOptionLocation.mkString(","))
- /**
- * At original logic, we were adding the next preferred location so that in case of the
- * failure the Spark should know where to schedule the failed task.
- * Remove the next preferred location is because some time Spark will pick the same node
- * for 2 tasks, so one node is getting over loaded with the task and one have no task to
- * do. And impacting the performance despite of any failure.
- */
- firstOptionLocation
- }
+ val theSplit = split.asInstanceOf[CarbonNodePartition]
+ val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
+ logInfo("Preferred Location for split : " + firstOptionLocation.mkString(","))
+ /**
+ * At original logic, we were adding the next preferred location so that in case of the
+ * failure the Spark should know where to schedule the failed task.
+ * Remove the next preferred location is because some time Spark will pick the same node
+ * for 2 tasks, so one node is getting over loaded with the task and one have no task to
+ * do. And impacting the performance despite of any failure.
+ */
+ firstOptionLocation
}
}
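A side note on the getPreferredLocations comment retained in the hunk above: only the split's own host is reported, with no secondary fallback location, so Spark does not funnel two load tasks onto the same backup node. A rough illustration follows; HostPartition is an invented stand-in for CarbonNodePartition, not the real class.

import org.apache.spark.Partition

object PreferredLocationSketch {
  // Invented stand-in for CarbonNodePartition: it only records the host that
  // holds the blocks this task should read.
  class HostPartition(val index: Int, val host: String) extends Partition

  // Mirrors the patched getPreferredLocations: report a single preferred host,
  // so a failed or delayed task is rescheduled wherever Spark sees fit rather
  // than onto one shared fallback node.
  def preferredLocations(split: Partition): Seq[String] = split match {
    case p: HostPartition => Seq(p.host)
    case _                => Nil
  }
}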
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d6967bff/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index cfd8cff..1ad25c3 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -64,8 +64,7 @@ import org.apache.carbondata.processing.loading.exception.NoRetryException
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.loading.sort.SortScopeOptions
import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.processing.splits.TableSplit
-import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil, CarbonQueryUtil}
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
import org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory, _}
import org.apache.carbondata.spark.load._
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
@@ -635,117 +634,86 @@ object CarbonDataRDDFactory {
// Check if any load need to be deleted before loading new data
DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
carbonLoadModel.getTableName, storePath, false, carbonTable)
- // get partition way from configuration
- // val isTableSplitPartition = CarbonProperties.getInstance().getProperty(
- // CarbonCommonConstants.TABLE_SPLIT_PARTITION,
- // CarbonCommonConstants.TABLE_SPLIT_PARTITION_DEFAULT_VALUE).toBoolean
- val isTableSplitPartition = false
var blocksGroupBy: Array[(String, Array[BlockDetails])] = null
var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null
def loadDataFile(): Unit = {
- if (isTableSplitPartition) {
- /*
- * when data handle by table split partition
- * 1) get partition files, direct load or not will get the different files path
- * 2) get files blocks by using SplitUtils
- * 3) output Array[(partitionID,Array[BlockDetails])] to blocksGroupBy
+ /*
+ * when data load handle by node partition
+ * 1)clone the hadoop configuration,and set the file path to the configuration
+ * 2)use org.apache.hadoop.mapreduce.lib.input.TextInputFormat to get splits,size info
+ * 3)use CarbonLoaderUtil.nodeBlockMapping to get mapping info of node and block,
+ * for locally writing carbondata files(one file one block) in nodes
+ * 4)use NewCarbonDataLoadRDD to load data and write to carbondata files
*/
- var splits = Array[TableSplit]()
- // get all table Splits, this part means files were divide to different partitions
- splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
- // get all partition blocks from file list
- blocksGroupBy = splits.map {
- split =>
- val pathBuilder = new StringBuilder()
- for (path <- split.getPartition.getFilesPath.asScala) {
- pathBuilder.append(path).append(",")
- }
- if (pathBuilder.nonEmpty) {
- pathBuilder.substring(0, pathBuilder.size - 1)
- }
- (split.getPartition.getUniqueID, SparkUtil.getSplits(pathBuilder.toString(),
- sqlContext.sparkContext
- ))
- }
- } else {
- /*
- * when data load handle by node partition
- * 1)clone the hadoop configuration,and set the file path to the configuration
- * 2)use org.apache.hadoop.mapreduce.lib.input.TextInputFormat to get splits,size info
- * 3)use CarbonLoaderUtil.nodeBlockMapping to get mapping info of node and block,
- * for locally writing carbondata files(one file one block) in nodes
- * 4)use NewCarbonDataLoadRDD to load data and write to carbondata files
- */
- val hadoopConfiguration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
- // FileUtils will skip file which is no csv, and return all file path which split by ','
- val filePaths = carbonLoadModel.getFactFilePath
- hadoopConfiguration.set(FileInputFormat.INPUT_DIR, filePaths)
- hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true")
- hadoopConfiguration.set("io.compression.codecs",
- """org.apache.hadoop.io.compress.GzipCodec,
- org.apache.hadoop.io.compress.DefaultCodec,
- org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
-
- CommonUtil.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration)
-
- val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat
- val jobContext = new Job(hadoopConfiguration)
- val rawSplits = inputFormat.getSplits(jobContext).toArray
- val blockList = rawSplits.map { inputSplit =>
- val fileSplit = inputSplit.asInstanceOf[FileSplit]
- new TableBlockInfo(fileSplit.getPath.toString,
- fileSplit.getStart, "1",
- fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1, null
- ).asInstanceOf[Distributable]
- }
- // group blocks to nodes, tasks
- val startTime = System.currentTimeMillis
- val activeNodes = DistributionUtil
- .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
- val nodeBlockMapping =
- CarbonLoaderUtil
- .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
- .toSeq
- val timeElapsed: Long = System.currentTimeMillis - startTime
- LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
- LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +
- s"No.of Nodes: ${nodeBlockMapping.size}")
- var str = ""
- nodeBlockMapping.foreach(entry => {
- val tableBlock = entry._2
- str = str + "#Node: " + entry._1 + " no.of.blocks: " + tableBlock.size()
- tableBlock.asScala.foreach(tableBlockInfo =>
- if (!tableBlockInfo.getLocations.exists(hostentry =>
- hostentry.equalsIgnoreCase(entry._1)
- )) {
- str = str + " , mismatch locations: " + tableBlockInfo.getLocations
- .foldLeft("")((a, b) => a + "," + b)
- }
- )
- str = str + "\n"
- }
+ val hadoopConfiguration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+ // FileUtils will skip file which is no csv, and return all file path which split by ','
+ val filePaths = carbonLoadModel.getFactFilePath
+ hadoopConfiguration.set(FileInputFormat.INPUT_DIR, filePaths)
+ hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true")
+ hadoopConfiguration.set("io.compression.codecs",
+ """org.apache.hadoop.io.compress.GzipCodec,
+ org.apache.hadoop.io.compress.DefaultCodec,
+ org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
+
+ CommonUtil.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration)
+
+ val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+ val jobContext = new Job(hadoopConfiguration)
+ val rawSplits = inputFormat.getSplits(jobContext).toArray
+ val blockList = rawSplits.map { inputSplit =>
+ val fileSplit = inputSplit.asInstanceOf[FileSplit]
+ new TableBlockInfo(fileSplit.getPath.toString,
+ fileSplit.getStart, "1",
+ fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1, null
+ ).asInstanceOf[Distributable]
+ }
+ // group blocks to nodes, tasks
+ val startTime = System.currentTimeMillis
+ val activeNodes = DistributionUtil
+ .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
+ val nodeBlockMapping =
+ CarbonLoaderUtil
+ .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
+ .toSeq
+ val timeElapsed: Long = System.currentTimeMillis - startTime
+ LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
+ LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +
+ s"No.of Nodes: ${nodeBlockMapping.size}")
+ var str = ""
+ nodeBlockMapping.foreach(entry => {
+ val tableBlock = entry._2
+ str = str + "#Node: " + entry._1 + " no.of.blocks: " + tableBlock.size()
+ tableBlock.asScala.foreach(tableBlockInfo =>
+ if (!tableBlockInfo.getLocations.exists(hostentry =>
+ hostentry.equalsIgnoreCase(entry._1)
+ )) {
+ str = str + " , mismatch locations: " + tableBlockInfo.getLocations
+ .foldLeft("")((a, b) => a + "," + b)
+ }
)
- LOGGER.info(str)
- blocksGroupBy = nodeBlockMapping.map(entry => {
- val blockDetailsList =
- entry._2.asScala.map(distributable => {
- val tableBlock = distributable.asInstanceOf[TableBlockInfo]
- new BlockDetails(new Path(tableBlock.getFilePath),
- tableBlock.getBlockOffset, tableBlock.getBlockLength, tableBlock.getLocations
- )
- }).toArray
- (entry._1, blockDetailsList)
- }
- ).toArray
+ str = str + "\n"
}
+ )
+ LOGGER.info(str)
+ blocksGroupBy = nodeBlockMapping.map { entry =>
+ val blockDetailsList =
+ entry._2.asScala.map { distributable =>
+ val tableBlock = distributable.asInstanceOf[TableBlockInfo]
+ new BlockDetails(new Path(tableBlock.getFilePath),
+ tableBlock.getBlockOffset, tableBlock.getBlockLength, tableBlock.getLocations
+ )
+ }.toArray
+ (entry._1, blockDetailsList)
+ }.toArray
- status = new NewCarbonDataLoadRDD(sqlContext.sparkContext,
+ status = new NewCarbonDataLoadRDD(
+ sqlContext.sparkContext,
new DataLoadResultImpl(),
carbonLoadModel,
- blocksGroupBy,
- isTableSplitPartition).collect()
+ blocksGroupBy
+ ).collect()
}
def loadDataFrame(): Unit = {
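The loadDataFile comment in the hunk above describes the surviving node-partition path: clone the Hadoop configuration, derive splits via TextInputFormat, map blocks to nodes with CarbonLoaderUtil.nodeBlockMapping, and hand the result to NewCarbonDataLoadRDD. A very rough, standalone sketch of the output shape of that mapping step follows; Block is an invented stand-in for TableBlockInfo, and the real nodeBlockMapping additionally balances block counts across active nodes, which this sketch omits.

object NodeBlockGroupingSketch {
  // Simplified stand-in for TableBlockInfo: a block plus the hosts holding it.
  final case class Block(path: String, length: Long, locations: Seq[String])

  // Naive grouping by a block's first reported location. The result has the
  // same shape as the blocksGroupBy value later passed to NewCarbonDataLoadRDD:
  // Array[(host, Array[Block])].
  def groupByFirstHost(blocks: Seq[Block]): Array[(String, Array[Block])] =
    blocks.groupBy(_.locations.headOption.getOrElse("localhost"))
      .map { case (host, bs) => (host, bs.toArray) }
      .toArray

  def main(args: Array[String]): Unit = {
    val blocks = Seq(
      Block("/fact/part-0.csv", 128L, Seq("node-1")),
      Block("/fact/part-1.csv", 256L, Seq("node-2")),
      Block("/fact/part-2.csv", 64L, Seq("node-1")))
    groupByFirstHost(blocks).foreach { case (host, bs) =>
      println(s"$host -> ${bs.length} block(s)")
    }
  }
}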