You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/18 08:25:10 UTC

carbondata git commit: [CARBONDATA-1805][Dictionary] Optimize pruning for dictionary loading

Repository: carbondata
Updated Branches:
  refs/heads/master 4daf0634e -> 6e224dce6


[CARBONDATA-1805][Dictionary] Optimize pruning for dictionary loading

Refactor the loadDataFrame method in GlobalDictionaryUtil: instead of converting all columns to Spark InternalRow, convert only the dictionary columns.

This closes #1559


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6e224dce
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6e224dce
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6e224dce

Branch: refs/heads/master
Commit: 6e224dce62c57c2b6c7a8df047663e64c6d77c4b
Parents: 4daf063
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Tue Nov 28 22:42:29 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Mon Dec 18 16:24:49 2017 +0800

----------------------------------------------------------------------
 .../spark/util/GlobalDictionaryUtil.scala       | 95 ++++++++++++--------
 1 file changed, 60 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e224dce/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index ee2c843..330582f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -37,7 +37,6 @@ import org.apache.spark.{Accumulator, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{NewHadoopRDD, RDD}
 import org.apache.spark.sql._
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.util.FileUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -346,36 +345,57 @@ object GlobalDictionaryUtil {
   }
 
   /**
-   * load CSV files to DataFrame by using datasource "com.databricks.spark.csv"
+   * load and prune dictionary Rdd from csv file or input dataframe
    *
-   * @param sqlContext      SQLContext
-   * @param carbonLoadModel carbon data load model
+   * @param sqlContext sqlContext
+   * @param carbonLoadModel carbonLoadModel
+   * @param inputDF input dataframe
+   * @param requiredCols names of dictionary column
+   * @param hadoopConf hadoop configuration
+   * @return rdd that contains only dictionary columns
    */
-  def loadDataFrame(sqlContext: SQLContext,
+  private def loadInputDataAsDictRdd(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
-      hadoopConf: Configuration): DataFrame = {
-    CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
-    hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
-    val columnNames = carbonLoadModel.getCsvHeaderColumns
-    val schema = StructType(columnNames.map[StructField, Array[StructField]] { column =>
-      StructField(column, StringType)
-    })
-    val values = new Array[String](columnNames.length)
-    val row = new StringArrayRow(values)
-    val jobConf = new JobConf(hadoopConf)
-    SparkHadoopUtil.get.addCredentials(jobConf)
-    TokenCache.obtainTokensForNamenodes(jobConf.getCredentials,
-      Array[Path](new Path(carbonLoadModel.getFactFilePath)),
-      jobConf)
-    val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
-      sqlContext.sparkContext,
-      classOf[CSVInputFormat],
-      classOf[NullWritable],
-      classOf[StringArrayWritable],
-      jobConf).setName("global dictionary").map[Row] { currentRow =>
-      row.setValues(currentRow._2.get())
+      inputDF: Option[DataFrame],
+      requiredCols: Array[String],
+      hadoopConf: Configuration): RDD[Row] = {
+    if (inputDF.isDefined) {
+      inputDF.get.select(requiredCols.head, requiredCols.tail : _*).rdd
+    } else {
+      CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
+      hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
+      val headerCols = carbonLoadModel.getCsvHeaderColumns.map(_.toLowerCase)
+      val header2Idx = headerCols.zipWithIndex.toMap
+      // index of dictionary columns in header
+      val dictColIdx = requiredCols.map(c => header2Idx(c.toLowerCase))
+
+      val jobConf = new JobConf(hadoopConf)
+      SparkHadoopUtil.get.addCredentials(jobConf)
+      TokenCache.obtainTokensForNamenodes(jobConf.getCredentials,
+        Array[Path](new Path(carbonLoadModel.getFactFilePath)),
+        jobConf)
+      val dictRdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
+        sqlContext.sparkContext,
+        classOf[CSVInputFormat],
+        classOf[NullWritable],
+        classOf[StringArrayWritable],
+        jobConf)
+        .setName("global dictionary")
+        .map[Row] { currentRow =>
+        val rawRow = currentRow._2.get()
+        val destRow = new Array[String](dictColIdx.length)
+        for (i <- dictColIdx.indices) {
+          // dictionary index in this row
+          val idx = dictColIdx(i)
+          // copy specific dictionary value from source to dest
+          if (idx < rawRow.length) {
+            System.arraycopy(rawRow, idx, destRow, i, 1)
+          }
+        }
+        Row.fromSeq(destRow)
+      }
+      dictRdd
     }
-    sqlContext.createDataFrame(rdd, schema)
   }
 
   // Hack for spark2 integration
@@ -694,21 +714,25 @@ object GlobalDictionaryUtil {
         carbonTable.getTableName).asScala.toArray
       // generate global dict from pre defined column dict file
       carbonLoadModel.initPredefDictMap()
-
       val allDictionaryPath = carbonLoadModel.getAllDictPath
       if (StringUtils.isEmpty(allDictionaryPath)) {
         LOGGER.info("Generate global dictionary from source data files!")
         // load data by using dataSource com.databricks.spark.csv
-        var df = dataFrame.getOrElse(loadDataFrame(sqlContext, carbonLoadModel, hadoopConf))
-        var headers = carbonLoadModel.getCsvHeaderColumns
-        headers = headers.map(headerName => headerName.trim)
+        val headers = carbonLoadModel.getCsvHeaderColumns.map(_.trim)
         val colDictFilePath = carbonLoadModel.getColDictFilePath
         if (colDictFilePath != null) {
           // generate predefined dictionary
           generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
             dimensions, carbonLoadModel, sqlContext, dictfolderPath)
         }
-        if (headers.length > df.columns.length) {
+
+        val headerOfInputData: Array[String] = if (dataFrame.isDefined) {
+          dataFrame.get.columns
+        } else {
+          headers
+        }
+
+        if (headers.length > headerOfInputData.length) {
           val msg = "The number of columns in the file header do not match the " +
                     "number of columns in the data file; Either delimiter " +
                     "or fileheader provided is not correct"
@@ -717,14 +741,15 @@ object GlobalDictionaryUtil {
         }
         // use fact file to generate global dict
         val (requireDimension, requireColumnNames) = pruneDimensions(dimensions,
-          headers, df.columns)
+          headers, headerOfInputData)
         if (requireDimension.nonEmpty) {
           // select column to push down pruning
-          df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
+          val dictRdd = loadInputDataAsDictRdd(sqlContext, carbonLoadModel, dataFrame,
+            requireColumnNames, hadoopConf)
           val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
             requireDimension, dictfolderPath, false)
           // combine distinct value in a block and partition by column
-          val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
+          val inputRDD = new CarbonBlockDistinctValuesCombineRDD(dictRdd, model)
             .partitionBy(new ColumnPartitioner(model.primDimensions.length))
           // generate global dictionary files
           val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()