Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/18 08:25:10 UTC
carbondata git commit: [CARBONDATA-1805][Dictionary] Optimize pruning for dictionary loading
Repository: carbondata
Updated Branches:
refs/heads/master 4daf0634e -> 6e224dce6
[CARBONDATA-1805][Dictionary] Optimize pruning for dictionary loading
Refactor the loadDataFrame method in GlobalDictionaryUtil so that, instead of converting all columns to Spark InternalRow, it converts only the dictionary columns.
This closes #1559
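
The gist of the change can be shown in a few lines of plain Scala: map each header column to its position once, then copy only the fields that belong to dictionary columns. The object name and sample data below are made up for illustration; this is a sketch of the idea, not Carbon's code.

// Self-contained sketch of the pruning logic; header, dictCols and
// rawRow are hypothetical sample data.
object PruneSketch extends App {
  val header = Array("id", "name", "age", "city")
  val dictCols = Array("name", "city") // only these need dictionaries

  // Map each lower-cased header column to its position, as the patch does.
  val header2Idx = header.map(_.toLowerCase).zipWithIndex.toMap
  val dictColIdx = dictCols.map(c => header2Idx(c.toLowerCase))

  val rawRow = Array("1", "alice", "30", "beijing")
  // Copy only the dictionary fields; guard against short (malformed) rows.
  val destRow = dictColIdx.collect { case idx if idx < rawRow.length => rawRow(idx) }
  println(destRow.mkString(",")) // prints: alice,beijing
}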
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6e224dce
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6e224dce
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6e224dce
Branch: refs/heads/master
Commit: 6e224dce62c57c2b6c7a8df047663e64c6d77c4b
Parents: 4daf063
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Tue Nov 28 22:42:29 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Mon Dec 18 16:24:49 2017 +0800
----------------------------------------------------------------------
.../spark/util/GlobalDictionaryUtil.scala | 95 ++++++++++++--------
1 file changed, 60 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e224dce/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index ee2c843..330582f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -37,7 +37,6 @@ import org.apache.spark.{Accumulator, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{NewHadoopRDD, RDD}
import org.apache.spark.sql._
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.util.FileUtils
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -346,36 +345,57 @@ object GlobalDictionaryUtil {
}
/**
- * load CSV files to DataFrame by using datasource "com.databricks.spark.csv"
+ * load and prune the dictionary RDD from CSV files or the input DataFrame
*
- * @param sqlContext SQLContext
- * @param carbonLoadModel carbon data load model
+ * @param sqlContext SQLContext
+ * @param carbonLoadModel carbon data load model
+ * @param inputDF input DataFrame, if any
+ * @param requiredCols names of the dictionary columns
+ * @param hadoopConf hadoop configuration
+ * @return RDD that contains only the dictionary columns
*/
- def loadDataFrame(sqlContext: SQLContext,
+ private def loadInputDataAsDictRdd(sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
- hadoopConf: Configuration): DataFrame = {
- CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
- hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
- val columnNames = carbonLoadModel.getCsvHeaderColumns
- val schema = StructType(columnNames.map[StructField, Array[StructField]] { column =>
- StructField(column, StringType)
- })
- val values = new Array[String](columnNames.length)
- val row = new StringArrayRow(values)
- val jobConf = new JobConf(hadoopConf)
- SparkHadoopUtil.get.addCredentials(jobConf)
- TokenCache.obtainTokensForNamenodes(jobConf.getCredentials,
- Array[Path](new Path(carbonLoadModel.getFactFilePath)),
- jobConf)
- val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
- sqlContext.sparkContext,
- classOf[CSVInputFormat],
- classOf[NullWritable],
- classOf[StringArrayWritable],
- jobConf).setName("global dictionary").map[Row] { currentRow =>
- row.setValues(currentRow._2.get())
+ inputDF: Option[DataFrame],
+ requiredCols: Array[String],
+ hadoopConf: Configuration): RDD[Row] = {
+ if (inputDF.isDefined) {
+ inputDF.get.select(requiredCols.head, requiredCols.tail : _*).rdd
+ } else {
+ CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
+ hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
+ val headerCols = carbonLoadModel.getCsvHeaderColumns.map(_.toLowerCase)
+ val header2Idx = headerCols.zipWithIndex.toMap
+ // index of dictionary columns in header
+ val dictColIdx = requiredCols.map(c => header2Idx(c.toLowerCase))
+
+ val jobConf = new JobConf(hadoopConf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
+ TokenCache.obtainTokensForNamenodes(jobConf.getCredentials,
+ Array[Path](new Path(carbonLoadModel.getFactFilePath)),
+ jobConf)
+ val dictRdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
+ sqlContext.sparkContext,
+ classOf[CSVInputFormat],
+ classOf[NullWritable],
+ classOf[StringArrayWritable],
+ jobConf)
+ .setName("global dictionary")
+ .map[Row] { currentRow =>
+ val rawRow = currentRow._2.get()
+ val destRow = new Array[String](dictColIdx.length)
+ for (i <- dictColIdx.indices) {
+ // position of this dictionary column in the raw input row
+ val idx = dictColIdx(i)
+ // copy specific dictionary value from source to dest
+ if (idx < rawRow.length) {
+ System.arraycopy(rawRow, idx, destRow, i, 1)
+ }
+ }
+ Row.fromSeq(destRow)
+ }
+ dictRdd
}
- sqlContext.createDataFrame(rdd, schema)
}
// Hack for spark2 integration
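
For the DataFrame branch added above, pruning is a plain select using the Scala varargs idiom. A minimal sketch against a local SparkSession follows; the data, column names and object name are hypothetical, not taken from Carbon.

import org.apache.spark.sql.SparkSession

object SelectPruneSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("prune").getOrCreate()
  import spark.implicits._

  val df = Seq((1, "alice", "beijing"), (2, "bob", "shanghai")).toDF("id", "name", "city")
  val requiredCols = Array("name", "city")

  // The head/tail split matches the signature select(col: String, cols: String*),
  // so only the dictionary columns survive into the RDD.
  val dictRdd = df.select(requiredCols.head, requiredCols.tail: _*).rdd
  dictRdd.collect().foreach(println) // [alice,beijing] / [bob,shanghai]
  spark.stop()
}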
@@ -694,21 +714,25 @@ object GlobalDictionaryUtil {
carbonTable.getTableName).asScala.toArray
// generate global dict from pre defined column dict file
carbonLoadModel.initPredefDictMap()
-
val allDictionaryPath = carbonLoadModel.getAllDictPath
if (StringUtils.isEmpty(allDictionaryPath)) {
LOGGER.info("Generate global dictionary from source data files!")
// load data by using dataSource com.databricks.spark.csv
- var df = dataFrame.getOrElse(loadDataFrame(sqlContext, carbonLoadModel, hadoopConf))
- var headers = carbonLoadModel.getCsvHeaderColumns
- headers = headers.map(headerName => headerName.trim)
+ val headers = carbonLoadModel.getCsvHeaderColumns.map(_.trim)
val colDictFilePath = carbonLoadModel.getColDictFilePath
if (colDictFilePath != null) {
// generate predefined dictionary
generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
dimensions, carbonLoadModel, sqlContext, dictfolderPath)
}
- if (headers.length > df.columns.length) {
+
+ val headerOfInputData: Array[String] = if (dataFrame.isDefined) {
+ dataFrame.get.columns
+ } else {
+ headers
+ }
+
+ if (headers.length > headerOfInputData.length) {
val msg = "The number of columns in the file header do not match the " +
"number of columns in the data file; Either delimiter " +
"or fileheader provided is not correct"
@@ -717,14 +741,15 @@ object GlobalDictionaryUtil {
}
// use fact file to generate global dict
val (requireDimension, requireColumnNames) = pruneDimensions(dimensions,
- headers, df.columns)
+ headers, headerOfInputData)
if (requireDimension.nonEmpty) {
// select column to push down pruning
- df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
+ val dictRdd = loadInputDataAsDictRdd(sqlContext, carbonLoadModel, dataFrame,
+ requireColumnNames, hadoopConf)
val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
requireDimension, dictfolderPath, false)
// combine distinct value in a block and partition by column
- val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
+ val inputRDD = new CarbonBlockDistinctValuesCombineRDD(dictRdd, model)
.partitionBy(new ColumnPartitioner(model.primDimensions.length))
// generate global dictionary files
val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
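
Downstream, the pruned RDD feeds CarbonBlockDistinctValuesCombineRDD, partitioned by column before dictionary generation. A rough Spark analogy of that stage, under the assumption that it boils down to collecting distinct values grouped by column position; Carbon's real RDDs do considerably more.

import org.apache.spark.sql.SparkSession

object DistinctPerColumnSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("distinct").getOrCreate()
  val sc = spark.sparkContext

  // Pruned rows, one array slot per dictionary column (like dictRdd above).
  val dictRdd = sc.parallelize(Seq(Array("alice", "beijing"), Array("bob", "beijing")))

  // Key every value by its column position, drop duplicates, then group so
  // each column's distinct values land together, mimicking partition-by-column.
  val distinctPerCol = dictRdd
    .flatMap(_.zipWithIndex.map { case (v, i) => (i, v) })
    .distinct()
    .groupByKey(2) // 2 = number of dictionary columns

  distinctPerCol.collect().foreach { case (col, vals) =>
    println(s"column $col -> ${vals.mkString(",")}")
  }
  spark.stop()
}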