Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/11/29 09:15:47 UTC

[1/2] incubator-carbondata git commit: DataLoadCoalescedRDD

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 879bfe742 -> 567fa5131


DataLoadCoalescedRDD

DataLoadPartitionCoalescer

concurrently read dataframe

add test case

fix comments

fix comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f8a0c876
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f8a0c876
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f8a0c876

Branch: refs/heads/master
Commit: f8a0c876158be256119219bde4cce0e074acf03a
Parents: 879bfe7
Author: QiangCai <da...@gmail.com>
Authored: Mon Oct 24 10:54:20 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Tue Nov 29 17:06:23 2016 +0800

----------------------------------------------------------------------
 .../spark/rdd/CarbonDataLoadRDD.scala           |  96 ++---
 .../spark/rdd/CarbonDataRDDFactory.scala        |  88 +++--
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |  11 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala |  51 +++
 .../spark/util/GlobalDictionaryUtil.scala       |  11 +-
 .../apache/spark/rdd/DataLoadCoalescedRDD.scala |  68 ++++
 .../spark/rdd/DataLoadPartitionCoalescer.scala  | 363 +++++++++++++++++++
 .../spark/sql/hive/DistributionUtil.scala       |  19 +-
 .../org/apache/spark/util/TaskContextUtil.scala |  29 ++
 .../TestDataLoadPartitionCoalescer.scala        | 170 +++++++++
 .../spark/util/AllDictionaryTestCase.scala      |   9 +-
 .../util/ExternalColumnDictionaryTestCase.scala |  14 +-
 ...GlobalDictionaryUtilConcurrentTestCase.scala |  23 +-
 .../util/GlobalDictionaryUtilTestCase.scala     |  10 +-
 .../processing/csvreaderstep/CsvInput.java      |  73 +++-
 .../csvreaderstep/JavaRddIterator.java          |  32 ++
 .../processing/csvreaderstep/RddInputUtils.java |  11 +-
 17 files changed, 921 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 87b5673..e306a89 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -28,9 +28,12 @@ import scala.util.Random
 
 import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.DataLoadCoalescedRDD
+import org.apache.spark.rdd.DataLoadPartitionWrap
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
 import org.apache.spark.sql.Row
+import org.apache.spark.util.TaskContextUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.common.logging.impl.StandardLogService
@@ -38,6 +41,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
 import org.apache.carbondata.processing.constants.DataProcessorConstants
+import org.apache.carbondata.processing.csvreaderstep.JavaRddIterator
 import org.apache.carbondata.processing.csvreaderstep.RddInputUtils
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.graphgenerator.GraphGenerator
@@ -46,6 +50,7 @@ import org.apache.carbondata.spark.DataLoadResult
 import org.apache.carbondata.spark.load._
 import org.apache.carbondata.spark.splits.TableSplit
 import org.apache.carbondata.spark.util.CarbonQueryUtil
+import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 /**
  * This partition class is used to split by TableSplit
@@ -125,6 +130,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
     try {
       CarbonLoaderUtil.executeGraph(model, storeLocation, storePath,
         kettleHomePath)
+      loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
     } catch {
       case e: DataLoadingException => if (e.getErrorCode ==
                                           DataProcessorConstants.BAD_REC_FOUND) {
@@ -235,14 +241,11 @@ class DataFileLoaderRDD[K, V](
                                theSplit.index
       try {
         loadMetadataDetails.setPartitionCount(partitionID)
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
-
         carbonLoadModel.setSegmentId(String.valueOf(loadCount))
         setModelAndBlocksInfo()
         val loader = new SparkPartitionLoader(model, theSplit.index, storePath,
           kettleHomePath, loadCount, loadMetadataDetails)
-        loader.initialize()
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+        loader.initialize
         if (model.isRetentionRequest) {
           recreateAggregationTableForRetention
         } else if (model.isAggLoadRequest) {
@@ -495,7 +498,7 @@ class DataFrameLoaderRDD[K, V](
     loadCount: Integer,
     tableCreationTime: Long,
     schemaLastUpdatedTime: Long,
-    prev: RDD[Row]) extends RDD[(K, V)](prev) {
+    prev: DataLoadCoalescedRDD[Row]) extends RDD[(K, V)](prev) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
@@ -509,18 +512,19 @@ class DataFrameLoaderRDD[K, V](
                                theSplit.index
       try {
         loadMetadataDetails.setPartitionCount(partitionID)
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
         carbonLoadModel.setPartitionId(partitionID)
         carbonLoadModel.setSegmentId(String.valueOf(loadCount))
         carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
         val loader = new SparkPartitionLoader(carbonLoadModel, theSplit.index, storePath,
           kettleHomePath, loadCount, loadMetadataDetails)
-        loader.initialize()
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+        loader.initialize
         val rddIteratorKey = UUID.randomUUID().toString
         try {
           RddInputUtils.put(rddIteratorKey,
-            new RddIterator(firstParent[Row].iterator(theSplit, context), carbonLoadModel))
+              new PartitionIterator(
+                  firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, context),
+                  carbonLoadModel,
+                  context))
           carbonLoadModel.setRddIteratorKey(rddIteratorKey)
           loader.run()
         } finally {
@@ -548,77 +552,53 @@ class DataFrameLoaderRDD[K, V](
   override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
 }
 
+class PartitionIterator(partitionIter: Iterator[DataLoadPartitionWrap[Row]],
+    carbonLoadModel: CarbonLoadModel,
+    context: TaskContext) extends JavaRddIterator[JavaRddIterator[Array[String]]] {
+  def hasNext: Boolean = partitionIter.hasNext
+  def next: JavaRddIterator[Array[String]] = {
+    val value = partitionIter.next
+    new RddIterator(value.rdd.iterator(value.partition, context),
+        carbonLoadModel,
+        context)
+  }
+  def initialize: Unit = {
+    TaskContextUtil.setTaskContext(context)
+  }
+}
 /**
  * This class wraps Scala's Iterator as a Java-style Iterator.
  * It also converts all columns to string data to use the csv data loading flow.
  *
  * @param rddIter
  * @param carbonLoadModel
+ * @param context
  */
 class RddIterator(rddIter: Iterator[Row],
-    carbonLoadModel: CarbonLoadModel) extends java.util.Iterator[Array[String]] {
+                  carbonLoadModel: CarbonLoadModel,
+                  context: TaskContext) extends JavaRddIterator[Array[String]] {
+
   val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
     .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
   val format = new SimpleDateFormat(formatString)
   val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
   val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
-
+  val serializationNullFormat =
+      carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
   def hasNext: Boolean = rddIter.hasNext
 
-  private def getString(value: Any, level: Int = 1): String = {
-    if (value == null) {
-      ""
-    } else {
-      value match {
-        case s: String => s
-        case i: java.lang.Integer => i.toString
-        case d: java.lang.Double => d.toString
-        case t: java.sql.Timestamp => format format t
-        case d: java.sql.Date => format format d
-        case d: java.math.BigDecimal => d.toPlainString
-        case b: java.lang.Boolean => b.toString
-        case s: java.lang.Short => s.toString
-        case f: java.lang.Float => f.toString
-        case bs: Array[Byte] => new String(bs)
-        case s: scala.collection.Seq[Any] =>
-          val delimiter = if (level == 1) {
-            delimiterLevel1
-          } else {
-            delimiterLevel2
-          }
-          val builder = new StringBuilder()
-          s.foreach { x =>
-            builder.append(getString(x, level + 1)).append(delimiter)
-          }
-          builder.substring(0, builder.length - 1)
-        case m: scala.collection.Map[Any, Any] =>
-          throw new Exception("Unsupported data type: Map")
-        case r: org.apache.spark.sql.Row =>
-          val delimiter = if (level == 1) {
-            delimiterLevel1
-          } else {
-            delimiterLevel2
-          }
-          val builder = new StringBuilder()
-          for (i <- 0 until r.length) {
-            builder.append(getString(r(i), level + 1)).append(delimiter)
-          }
-          builder.substring(0, builder.length - 1)
-        case other => other.toString
-      }
-    }
-  }
-
   def next: Array[String] = {
     val row = rddIter.next()
     val columns = new Array[String](row.length)
-    for (i <- 0 until row.length) {
-      columns(i) = getString(row(i))
+    for (i <- 0 until columns.length) {
+      columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
+          delimiterLevel1, delimiterLevel2, format)
     }
     columns
   }
 
-  def remove(): Unit = {
+  def initialize: Unit = {
+    TaskContextUtil.setTaskContext(context)
   }
 
 }
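
For reference, a minimal sketch (not part of this commit) of the per-row conversion the new RddIterator performs: each Spark Row becomes an Array[String] for the csv loading flow via CarbonScalaUtil.getString. The class name and the use of a plain Scala Iterator are illustrative; the committed code implements JavaRddIterator and also propagates the TaskContext.

  import java.text.SimpleDateFormat
  import org.apache.spark.sql.Row
  import org.apache.carbondata.spark.util.CarbonScalaUtil

  // Hypothetical standalone wrapper mirroring RddIterator.next above.
  class RowToCsvIterator(rows: Iterator[Row],
      nullFormat: String,
      delimiterLevel1: String,
      delimiterLevel2: String,
      format: SimpleDateFormat) extends Iterator[Array[String]] {
    def hasNext: Boolean = rows.hasNext
    def next(): Array[String] = {
      val row = rows.next()
      // convert every column to its csv string form; nulls become the null format token
      Array.tabulate(row.length) { i =>
        CarbonScalaUtil.getString(row.get(i), nullFormat, delimiterLevel1, delimiterLevel2, format)
      }
    }
  }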

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 6c09607..c30ead7 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -30,7 +30,8 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.spark.{SparkContext, SparkEnv, SparkException}
-import org.apache.spark.sql.{CarbonEnv, DataFrame, SQLContext}
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer}
+import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
 import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel, Partitioner}
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.util.{FileUtils, SplitUtils}
@@ -902,21 +903,30 @@ object CarbonDataRDDFactory {
       }
 
       def loadDataFrame(): Unit = {
-        var rdd = dataFrame.get.rdd
-        var numPartitions = DistributionUtil.getNodeList(sqlContext.sparkContext).length
-        numPartitions = Math.max(1, Math.min(numPartitions, rdd.partitions.length))
-        rdd = rdd.coalesce(numPartitions, shuffle = false)
-
-        status = new DataFrameLoaderRDD(sqlContext.sparkContext,
-          new DataLoadResultImpl(),
-          carbonLoadModel,
-          storePath,
-          kettleHomePath,
-          columinar,
-          currentLoadCount,
-          tableCreationTime,
-          schemaLastUpdatedTime,
-          rdd).collect()
+        try {
+          val rdd = dataFrame.get.rdd
+          val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p =>
+            DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
+          }.distinct.size
+          val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
+            sqlContext.sparkContext)
+          val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
+
+          status = new DataFrameLoaderRDD(sqlContext.sparkContext,
+            new DataLoadResultImpl(),
+            carbonLoadModel,
+            storePath,
+            kettleHomePath,
+            columinar,
+            currentLoadCount,
+            tableCreationTime,
+            schemaLastUpdatedTime,
+            newRdd).collect()
+        } catch {
+          case ex: Exception =>
+            LOGGER.error(ex, "load data frame failed")
+            throw ex
+        }
       }
 
       CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
@@ -932,28 +942,32 @@ object CarbonDataRDDFactory {
           loadDataFile()
         }
         val newStatusMap = scala.collection.mutable.Map.empty[String, String]
-        status.foreach { eachLoadStatus =>
-          val state = newStatusMap.get(eachLoadStatus._1)
-          state match {
-            case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
-              newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
-            case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
-              if eachLoadStatus._2.getLoadStatus ==
-                 CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
-              newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
-            case _ =>
-              newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
+        if (status.nonEmpty) {
+          status.foreach { eachLoadStatus =>
+            val state = newStatusMap.get(eachLoadStatus._1)
+            state match {
+              case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
+                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
+              case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+                if eachLoadStatus._2.getLoadStatus ==
+                    CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
+                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
+              case _ =>
+                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
+            }
           }
-        }
 
-        newStatusMap.foreach {
-          case (key, value) =>
-            if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
-              loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-            } else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
-                       !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
-              loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
-            }
+          newStatusMap.foreach {
+            case (key, value) =>
+              if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
+                loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+              } else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
+                  !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
+                loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
+              }
+          }
+        } else {
+          loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
         }
 
         if (loadStatus != CarbonCommonConstants.STORE_LOADSTATUS_FAILURE &&
@@ -1116,6 +1130,4 @@ object CarbonDataRDDFactory {
       CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
     }
   }
-
-
 }
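
The status aggregation above reduces to a simple merge rule: any FAILURE makes the overall load FAILURE, otherwise any PARTIAL_SUCCESS makes it PARTIAL_SUCCESS, and an empty status array is treated as FAILURE. A sketch of that rule, using plain strings in place of the CarbonCommonConstants values:

  object LoadStatusMergeSketch {
    // stand-ins for CarbonCommonConstants.STORE_LOADSTATUS_*
    val Success = "SUCCESS"; val Partial = "PARTIAL_SUCCESS"; val Failure = "FAILURE"

    def merge(statuses: Seq[String]): String =
      if (statuses.isEmpty) Failure
      else statuses.foldLeft(Success) { (acc, s) =>
        if (acc == Failure || s == Failure) Failure
        else if (s == Partial) Partial
        else acc
      }

    def main(args: Array[String]): Unit = {
      println(merge(Seq(Success, Partial, Success)))  // PARTIAL_SUCCESS
      println(merge(Seq.empty))                       // FAILURE
    }
  }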

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index ced45b7..c91cec0 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -19,6 +19,7 @@ package org.apache.carbondata.spark.rdd
 
 import java.io.{DataInputStream, InputStreamReader}
 import java.nio.charset.Charset
+import java.text.SimpleDateFormat
 import java.util.regex.Pattern
 
 import scala.collection.mutable
@@ -42,6 +43,7 @@ import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
 import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask}
+import org.apache.carbondata.spark.util.CarbonScalaUtil
 import org.apache.carbondata.spark.util.GlobalDictionaryUtil
 import org.apache.carbondata.spark.util.GlobalDictionaryUtil._
 
@@ -157,7 +159,8 @@ case class DictionaryLoadModel(table: CarbonTableIdentifier,
     isFirstLoad: Boolean,
     hdfsTempLocation: String,
     lockType: String,
-    zooKeeperUrl: String) extends Serializable
+    zooKeeperUrl: String,
+    serializationNullFormat: String) extends Serializable
 
 case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends Serializable
 
@@ -251,13 +254,17 @@ class CarbonBlockDistinctValuesCombineRDD(
       val dimNum = model.dimensions.length
       var row: Row = null
       val rddIter = firstParent[Row].iterator(split, context)
+      val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+          .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+      val format = new SimpleDateFormat(formatString)
       // generate block distinct value set
       while (rddIter.hasNext) {
         row = rddIter.next()
         if (row != null) {
           rowCount += 1
           for (i <- 0 until dimNum) {
-            dimensionParsers(i).parseString(row.getString(i))
+            dimensionParsers(i).parseString(CarbonScalaUtil.getString(row.get(i),
+                model.serializationNullFormat, model.delimiters(0), model.delimiters(1), format))
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index afc6cc5..d91a012 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -18,6 +18,7 @@
 package org.apache.carbondata.spark.util
 
 import java.io.File
+import java.text.SimpleDateFormat
 
 import scala.collection.JavaConverters._
 
@@ -165,4 +166,54 @@ object CarbonScalaUtil {
     kettleHomePath
   }
 
+  def getString(value: Any,
+      serializationNullFormat: String,
+      delimiterLevel1: String,
+      delimiterLevel2: String,
+      format: SimpleDateFormat,
+      level: Int = 1): String = {
+    if (value == null) {
+      serializationNullFormat
+    } else {
+      value match {
+        case s: String => s
+        case d: java.math.BigDecimal => d.toPlainString
+        case i: java.lang.Integer => i.toString
+        case d: java.lang.Double => d.toString
+        case t: java.sql.Timestamp => format format t
+        case d: java.sql.Date => format format d
+        case b: java.lang.Boolean => b.toString
+        case s: java.lang.Short => s.toString
+        case f: java.lang.Float => f.toString
+        case bs: Array[Byte] => new String(bs)
+        case s: scala.collection.Seq[Any] =>
+          val delimiter = if (level == 1) {
+            delimiterLevel1
+          } else {
+            delimiterLevel2
+          }
+          val builder = new StringBuilder()
+          s.foreach { x =>
+            builder.append(getString(x, serializationNullFormat, delimiterLevel1,
+                delimiterLevel2, format, level + 1)).append(delimiter)
+          }
+          builder.substring(0, builder.length - 1)
+        case m: scala.collection.Map[Any, Any] =>
+          throw new Exception("Unsupported data type: Map")
+        case r: org.apache.spark.sql.Row =>
+          val delimiter = if (level == 1) {
+            delimiterLevel1
+          } else {
+            delimiterLevel2
+          }
+          val builder = new StringBuilder()
+          for (i <- 0 until r.length) {
+            builder.append(getString(r(i), serializationNullFormat, delimiterLevel1,
+                delimiterLevel2, format, level + 1)).append(delimiter)
+          }
+          builder.substring(0, builder.length - 1)
+        case other => other.toString
+      }
+    }
+  }
 }
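
A few illustrative calls to the new CarbonScalaUtil.getString (the delimiters, null format token and timestamp pattern below are example values chosen for the sketch, not taken from this commit):

  import java.text.SimpleDateFormat
  import org.apache.carbondata.spark.util.CarbonScalaUtil

  object GetStringSketch {
    def main(args: Array[String]): Unit = {
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      // nulls are emitted as the serialization null format token
      println(CarbonScalaUtil.getString(null, "\\N", "$", ":", format))          // \N
      // a Seq is joined with the level-1 complex delimiter
      println(CarbonScalaUtil.getString(Seq(1, 2, 3), "\\N", "$", ":", format))  // 1$2$3
      // BigDecimal uses toPlainString
      println(CarbonScalaUtil.getString(new java.math.BigDecimal("1.50"),
        "\\N", "$", ":", format))                                                // 1.50
    }
  }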

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index f17c62b..9a4e209 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -323,6 +323,8 @@ object GlobalDictionaryUtil {
       CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE,
       CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT).toDouble
 
+    val serializationNullFormat =
+      carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
     // get load count
     if (null == carbonLoadModel.getLoadMetadataDetails) {
       CarbonDataRDDFactory.readLoadMetadataDetails(carbonLoadModel, hdfsLocation)
@@ -343,7 +345,8 @@ object GlobalDictionaryUtil {
       carbonLoadModel.getLoadMetadataDetails.size() == 0,
       hdfsTempLocation,
       lockType,
-      zookeeperUrl)
+      zookeeperUrl,
+      serializationNullFormat)
   }
 
   /**
@@ -763,11 +766,7 @@ object GlobalDictionaryUtil {
       if (StringUtils.isEmpty(allDictionaryPath)) {
         LOGGER.info("Generate global dictionary from source data files!")
         // load data by using dataSource com.databricks.spark.csv
-        var df = if (dataFrame.isDefined) {
-          dataFrame.get
-        } else {
-          loadDataFrame(sqlContext, carbonLoadModel)
-        }
+        var df = dataFrame.getOrElse(loadDataFrame(sqlContext, carbonLoadModel))
         var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
           df.columns
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
new file mode 100644
index 0000000..3acde94
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+
+case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition)
+
+class DataLoadCoalescedRDD[T: ClassTag](
+  @transient var prev: RDD[T],
+  nodeList: Array[String])
+    extends RDD[DataLoadPartitionWrap[T]](prev.context, Nil) with Logging {
+
+  override def getPartitions: Array[Partition] = {
+    new DataLoadPartitionCoalescer(prev, nodeList).run
+  }
+
+  override def compute(split: Partition,
+      context: TaskContext): Iterator[DataLoadPartitionWrap[T]] = {
+
+    new Iterator[DataLoadPartitionWrap[T]] {
+      val iter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator
+      def hasNext = iter.hasNext
+      def next: DataLoadPartitionWrap[T] = {
+        DataLoadPartitionWrap(firstParent[T], iter.next())
+      }
+    }
+  }
+
+  override def getDependencies: Seq[Dependency[_]] = {
+    Seq(new NarrowDependency(prev) {
+      def getParents(id: Int): Seq[Int] =
+        partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices
+    })
+  }
+
+  override def clearDependencies() {
+    super.clearDependencies()
+    prev = null
+  }
+
+  /**
+   * Returns the preferred machine for the partition. If the partition is a CoalescedRDDPartition,
+   * then the preferred machine will be the one that most parent splits prefer too.
+   * @param partition
+   * @return the machine most preferred by the partition
+   */
+  override def getPreferredLocations(partition: Partition): Seq[String] = {
+    partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq
+  }
+}
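
A minimal sketch of consuming the new RDD, mirroring what PartitionIterator does above: each element of a DataLoadCoalescedRDD partition wraps one parent partition, which the task then reads with its own TaskContext. The object name is hypothetical and a local SparkContext is assumed, so the node list has no locality effect here.

  import org.apache.spark.{SparkConf, SparkContext, TaskContext}
  import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap}
  import org.apache.spark.sql.Row

  object CoalescedLoadSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))
      val rows = sc.parallelize(Seq(Row(1, "a"), Row(2, "b"), Row(3, "c")), 3)
      val coalesced = new DataLoadCoalescedRDD[Row](rows, Array("localhost"))
      val rowCounts = coalesced.mapPartitions { wraps =>
        val context = TaskContext.get()
        // read each wrapped parent partition inside this single task
        wraps.map(w => w.rdd.iterator(w.partition, context).size)
      }.collect()
      println(rowCounts.mkString(","))  // e.g. 1,1,1 for the three one-row parent partitions
      sc.stop()
    }
  }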

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadPartitionCoalescer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadPartitionCoalescer.scala b/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadPartitionCoalescer.scala
new file mode 100644
index 0000000..8e0971c
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadPartitionCoalescer.scala
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.rdd
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+import scala.collection.mutable.LinkedHashSet
+
+import org.apache.spark.Logging
+import org.apache.spark.Partition
+import org.apache.spark.scheduler.TaskLocation
+
+/**
+ * DataLoadPartitionCoalescer
+ * Repartitions the partitions of the parent RDD into fewer partitions, one partition per node.
+ * example:
+ * blk_hst  host1 host2 host3 host4 host5
+ * block1   host1 host2 host3
+ * block2         host2       host4 host5
+ * block3               host3 host4 host5
+ * block4   host1 host2       host4
+ * block5   host1       host3 host4
+ * block6   host1 host2             host5
+ * -------------------------------------------------------
+ * 1. sort host by number of blocks
+ * -------------------------------------------------------
+ * host3: block1 block3 block5
+ * host5: block2 block3 block6
+ * host1: block1 block4 block5 block6
+ * host2: block1 block2 block4 block6
+ * host4: block2 block3 block4 block5
+ * -------------------------------------------------------
+ * 2. sort the blocks of each host
+ * new partitions (not yet listed under an earlier host) come before old partitions
+ * -------------------------------------------------------
+ * host3:                      block1 block3 block5
+ * host5:        block2 block6+block3
+ * host1: block4+block1 block5 block6
+ * host2: block1 block2 block4 block6
+ * host4: block2 block3 block4 block5
+ * -------------------------------------------------------
+ * 3. assign blocks to host
+ * -------------------------------------------------------
+ * step1: host3 choose block1, remove from host1, host2
+ * step2: host5 choose block2, remove from host2, host4
+ * step3: host1 choose block4, .....
+ * -------------------------------------------------------
+ * result:
+ * host3:                      block1       block5
+ * host5:        block2
+ * host1: block4
+ * host2:                      block6
+ * host4:        block3
+ */
+class DataLoadPartitionCoalescer(prev: RDD[_], nodeList: Array[String]) extends Logging {
+
+  val prevPartitions = prev.partitions
+  var numOfParts = Math.max(1, Math.min(nodeList.length, prevPartitions.length))
+  // host => partition id list
+  val hostMapPartitionIds = new HashMap[String, LinkedHashSet[Int]]
+  // partition id => host list
+  val partitionIdMapHosts = new HashMap[Int, ArrayBuffer[String]]
+  val noLocalityPartitions = new ArrayBuffer[Int]
+  var noLocality = true
+  /**
+   * assign a task location for a partition
+   */
+  private def getLocation(index: Int): Option[String] = {
+    if (index < nodeList.length) {
+      Some(nodeList(index))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * group partitions by node
+   */
+  private def groupByNode(): Unit = {
+    // initialize hostMapPartitionIds
+    nodeList.foreach { node =>
+      val map = new LinkedHashSet[Int]
+      hostMapPartitionIds.put(node, map)
+    }
+    // collect partitions for each node
+    val tmpNoLocalityPartitions = new ArrayBuffer[Int]
+    prevPartitions.foreach { p =>
+      val locs = DataLoadPartitionCoalescer.getPreferredLocs(prev, p)
+      if (locs.isEmpty) {
+        // if a partition has no location, add to noLocalityPartitions
+        tmpNoLocalityPartitions += p.index
+      } else {
+        // add partition to hostMapPartitionIds and partitionIdMapHosts
+        locs.foreach { loc =>
+          val host = loc.host
+          hostMapPartitionIds.get(host) match {
+            // if the location of the partition is not in the node list,
+            // add this partition to noLocalityPartitions
+            case None => tmpNoLocalityPartitions += p.index
+            case Some(ids) =>
+              noLocality = false
+              ids += p.index
+              partitionIdMapHosts.get(p.index) match {
+                case None =>
+                  val hosts = new ArrayBuffer[String]
+                  hosts += host
+                  partitionIdMapHosts.put(p.index, hosts)
+                case Some(hosts) =>
+                  hosts += host
+              }
+          }
+        }
+      }
+    }
+
+    // keep only the partitions that have no location in the node list
+    tmpNoLocalityPartitions.distinct.foreach {index =>
+      partitionIdMapHosts.get(index) match {
+        case None => noLocalityPartitions += index
+        case Some(_) =>
+      }
+    }
+  }
+
+  /**
+   * sort host and partitions
+   */
+  private def sortHostAndPartitions(hostMapPartitionIdsSeq: Seq[(String, LinkedHashSet[Int])]) = {
+    val oldPartitionIdSet = new HashSet[Int]
+    // sort host by number of partitions
+    hostMapPartitionIdsSeq.sortBy(_._2.size).map { loc =>
+      // order: newPartitionIds + oldPartitionIds
+      val sortedPartitionIdSet = new LinkedHashSet[Int]
+      var newPartitionIds = new ArrayBuffer[Int]
+      var oldPartitionIds = new ArrayBuffer[Int]
+      loc._2.foreach { p =>
+        if (oldPartitionIdSet.contains(p)) {
+          oldPartitionIds += p
+        } else {
+          newPartitionIds += p
+          oldPartitionIdSet.add(p)
+        }
+      }
+      // sort and add new partitions
+      newPartitionIds.sortBy(x => x).foreach(sortedPartitionIdSet.add(_))
+      // sort and add old partitions
+      oldPartitionIds.sortBy(x => x).foreach(sortedPartitionIdSet.add(_))
+      // update hostMapPartitionIds
+      hostMapPartitionIds.put(loc._1, sortedPartitionIdSet)
+      (loc._1, sortedPartitionIdSet)
+    }.toArray
+  }
+
+  /**
+   * assign locality partitions to each host
+   */
+  private def assignPartitonNodeLocality(
+      noEmptyHosts: Seq[(String, LinkedHashSet[Int])]): Array[ArrayBuffer[Int]] = {
+    val localityResult = new Array[ArrayBuffer[Int]](noEmptyHosts.length)
+    for (i <- 0 until localityResult.length) {
+      localityResult(i) = new ArrayBuffer[Int]
+    }
+    val noEmptyHostSet = new HashSet[String]
+    noEmptyHosts.foreach {loc => noEmptyHostSet.add(loc._1)}
+
+    var hostIndex = 0
+    while (noEmptyHostSet.nonEmpty) {
+      val hostEntry = noEmptyHosts(hostIndex)
+      if (noEmptyHostSet.contains(hostEntry._1)) {
+        if (hostEntry._2.nonEmpty) {
+          var partitionId = hostEntry._2.iterator.next
+          localityResult(hostIndex) += partitionId
+          // remove this partition from every host that also lists it
+          partitionIdMapHosts.get(partitionId) match {
+            case Some(locs) =>
+              locs.foreach { loc =>
+                hostMapPartitionIds.get(loc) match {
+                  case Some(parts) =>
+                    parts.remove(partitionId)
+                }
+              }
+          }
+        } else {
+          noEmptyHostSet.remove(hostEntry._1)
+        }
+      }
+
+      hostIndex = hostIndex + 1
+      if (hostIndex == noEmptyHosts.length) {
+        hostIndex = 0
+      }
+    }
+    localityResult
+  }
+
+  /**
+   * assign no locality partitions to each host
+   */
+  private def assignPartitionNoLocality(emptyHosts: mutable.Buffer[String],
+      noEmptyHosts: mutable.Buffer[String],
+      localityResult: mutable.Buffer[ArrayBuffer[Int]]): Array[ArrayBuffer[Int]] = {
+    val noLocalityResult = new Array[ArrayBuffer[Int]](emptyHosts.length)
+    logInfo(s"non empty host: ${noEmptyHosts.length}, empty host: ${emptyHosts.length}")
+    val avgNumber = prevPartitions.length / (noEmptyHosts.length + emptyHosts.length)
+    for (i <- 0 until noLocalityResult.length) {
+      noLocalityResult(i) = new ArrayBuffer[Int]
+    }
+    var noLocalityPartitionIndex = 0
+    if (noLocalityPartitions.nonEmpty) {
+      if (emptyHosts.nonEmpty) {
+        // first, assign the average number of partitions to each empty node
+        for (i <- 0 until avgNumber) {
+          noLocalityResult.foreach { partitionIds =>
+            if (noLocalityPartitionIndex < noLocalityPartitions.length) {
+              partitionIds += noLocalityPartitions(noLocalityPartitionIndex)
+              noLocalityPartitionIndex = noLocalityPartitionIndex + 1
+            }
+          }
+        }
+      }
+      // if no-locality partitions remain,
+      // assign them to all hosts in round-robin order
+      if (noLocalityPartitionIndex < noLocalityPartitions.length) {
+        var partIndex = 0
+        for (i <- noLocalityPartitionIndex until noLocalityPartitions.length) {
+          if (partIndex < localityResult.length) {
+            localityResult(partIndex) += noLocalityPartitions(i)
+          } else {
+            noLocalityResult(partIndex - localityResult.length) += noLocalityPartitions(i)
+          }
+          partIndex = partIndex + 1
+          if (partIndex == localityResult.length + noLocalityResult.length) {
+            partIndex = 0
+          }
+        }
+      }
+    }
+    noLocalityResult
+  }
+
+  /**
+   * no locality repartition
+   */
+  private def repartitionNoLocality(): Array[Partition] = {
+    // no locality repartition
+    logInfo("no locality partition")
+    val prevPartIndexs = new Array[ArrayBuffer[Int]](numOfParts)
+    for (i <- 0 until numOfParts) {
+      prevPartIndexs(i) = new ArrayBuffer[Int]
+    }
+    for (i <- 0 until prevPartitions.length) {
+      prevPartIndexs(i % numOfParts) += prevPartitions(i).index
+    }
+    prevPartIndexs.filter(_.nonEmpty).zipWithIndex.map { x =>
+      new CoalescedRDDPartition(x._2, prev, x._1.toArray, getLocation(x._2))
+    }
+  }
+
+  private def repartitionLocality(): Array[Partition] = {
+    logInfo("locality partition")
+    val hostMapPartitionIdsSeq = hostMapPartitionIds.toSeq
+    // empty host seq
+    val emptyHosts = hostMapPartitionIdsSeq.filter(_._2.isEmpty).map(_._1).toBuffer
+    // non empty host array
+    var tempNoEmptyHosts = hostMapPartitionIdsSeq.filter(_._2.nonEmpty)
+
+    // 1. do locality repartition
+    // sort host and partitions
+    tempNoEmptyHosts = sortHostAndPartitions(tempNoEmptyHosts)
+    // assign locality partition to non empty hosts
+    val templocalityResult = assignPartitonNodeLocality(tempNoEmptyHosts)
+    // collect non empty hosts and empty hosts
+    val noEmptyHosts = mutable.Buffer[String]()
+    val localityResult = mutable.Buffer[ArrayBuffer[Int]]()
+    for(index <- 0 until templocalityResult.size) {
+      if (templocalityResult(index).isEmpty) {
+        emptyHosts += tempNoEmptyHosts(index)._1
+      } else {
+        noEmptyHosts += tempNoEmptyHosts(index)._1
+        localityResult += templocalityResult(index)
+      }
+    }
+    // 2. do no locality repartition
+    // assign no locality partitions to all hosts
+    val noLocalityResult = assignPartitionNoLocality(emptyHosts, noEmptyHosts, localityResult)
+
+    // 3. generate CoalescedRDDPartition
+    (0 until localityResult.length + noLocalityResult.length).map { index =>
+      val ids = if (index < localityResult.length) {
+        localityResult(index).toArray
+      } else {
+        noLocalityResult(index - localityResult.length).toArray
+      }
+      val loc = if (index < localityResult.length) {
+        Some(noEmptyHosts(index))
+      } else {
+        Some(emptyHosts(index - localityResult.length))
+      }
+      logInfo(s"CoalescedRDDPartition ${index}, ${ids.length}, ${loc} ")
+      new CoalescedRDDPartition(index, prev, ids, loc)
+    }.filter(_.parentsIndices.nonEmpty).toArray
+
+  }
+
+  def run(): Array[Partition] = {
+    // 1. group partitions by node
+    groupByNode()
+    logInfo(s"partition: ${prevPartitions.length}, no locality: ${noLocalityPartitions.length}")
+    val partitions = if (noLocality) {
+      // 2.A no locality partition
+      repartitionNoLocality()
+    } else {
+      // 2.B locality partition
+      repartitionLocality()
+    }
+    DataLoadPartitionCoalescer.checkPartition(prevPartitions, partitions)
+    partitions
+  }
+}
+
+object DataLoadPartitionCoalescer {
+  def getPreferredLocs(prev: RDD[_], p: Partition): Seq[TaskLocation] = {
+    prev.context.getPreferredLocs(prev, p.index)
+  }
+
+  def getParentsIndices(p: Partition): Array[Int] = {
+    p.asInstanceOf[CoalescedRDDPartition].parentsIndices
+  }
+
+  def checkPartition(prevParts: Array[Partition], parts: Array[Partition]): Unit = {
+    val prevPartIds = new ArrayBuffer[Int]
+    parts.foreach{ p =>
+      prevPartIds ++= DataLoadPartitionCoalescer.getParentsIndices(p)
+    }
+    // every parent partition must be assigned exactly once.
+    assert(prevPartIds.size == prevParts.size)
+    val prevPartIdsMap = prevPartIds.map{ id =>
+      (id, id)
+    }.toMap
+    prevParts.foreach{ p =>
+      prevPartIdsMap.get(p.index) match {
+        case None => assert(false, "partition " + p.index + " not found")
+        case Some(_) =>
+      }
+    }
+  }
+}
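
The invariant enforced by checkPartition above, restated as a small standalone predicate (a sketch, not the committed code): every parent partition index must be covered by exactly one coalesced partition.

  object CoalesceInvariantSketch {
    // assignment(i) = parent partition indices handled by coalesced partition i
    def coversEachParentOnce(parentCount: Int, assignment: Seq[Seq[Int]]): Boolean = {
      val flat = assignment.flatten
      flat.size == parentCount && flat.toSet == (0 until parentCount).toSet
    }

    def main(args: Array[String]): Unit = {
      println(coversEachParentOnce(5, Seq(Seq(0, 3), Seq(1), Seq(2, 4))))  // true
      println(coversEachParentOnce(5, Seq(Seq(0, 3), Seq(1), Seq(2))))     // false, 4 is missing
    }
  }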

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 02453bd..e5264ca 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -103,8 +103,13 @@ object DistributionUtil {
    * @return
    */
   def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
-    sparkContext: SparkContext): Seq[String] = {
-    val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
+      sparkContext: SparkContext): Seq[String] = {
+    val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.toSeq.asJava)
+    ensureExecutorsByNumberAndGetNodeList(nodeMapping.size(), sparkContext)
+  }
+
+  def ensureExecutorsByNumberAndGetNodeList(nodesOfData: Int,
+      sparkContext: SparkContext): Seq[String] = {
     var confExecutorsTemp: String = null
     if (sparkContext.getConf.contains("spark.executor.instances")) {
       confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
@@ -116,15 +121,11 @@ object DistributionUtil {
       }
     }
 
-    val confExecutors = if (null != confExecutorsTemp) {
-      confExecutorsTemp.toInt
-    } else {
-      1
-    }
-    val requiredExecutors = if (nodeMapping.size > confExecutors) {
+    val confExecutors = if (null != confExecutorsTemp) confExecutorsTemp.toInt else 1
+    val requiredExecutors = if (nodesOfData < 1 || nodesOfData > confExecutors) {
       confExecutors
     } else {
-      nodeMapping.size()
+      nodesOfData
     }
 
     val startTime = System.currentTimeMillis()
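
The executor-count decision above can be summarized as: use the number of nodes that hold data when it is within range, and fall back to the configured executor count otherwise. A sketch of that rule in hypothetical standalone form:

  object RequiredExecutorsSketch {
    def requiredExecutors(nodesOfData: Int, confExecutors: Int): Int =
      if (nodesOfData < 1 || nodesOfData > confExecutors) confExecutors else nodesOfData

    def main(args: Array[String]): Unit = {
      println(requiredExecutors(3, 10))   // 3: fewer data nodes than configured executors
      println(requiredExecutors(12, 10))  // 10: capped at the configured executor count
      println(requiredExecutors(0, 10))   // 10: no locality information, use configured value
    }
  }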

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/main/scala/org/apache/spark/util/TaskContextUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/TaskContextUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/TaskContextUtil.scala
new file mode 100644
index 0000000..e73f78c
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/util/TaskContextUtil.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import org.apache.spark.TaskContext
+
+
+object TaskContextUtil {
+  def setTaskContext(context: TaskContext): Unit = {
+    val localThreadContext = TaskContext.get()
+    if (localThreadContext == null) {
+      TaskContext.setTaskContext(context)
+    }
+  }
+}
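
The new initialize hooks above call TaskContextUtil.setTaskContext so that whichever thread ends up consuming the iterators has the task's TaskContext attached before pulling rows. A minimal sketch of that pattern; the object, method and thread body are hypothetical and only illustrate handing the context to another thread.

  import org.apache.spark.TaskContext
  import org.apache.spark.util.TaskContextUtil

  object ThreadedReadSketch {
    // Inside an RDD task: hand the current TaskContext to a worker thread
    // before it starts reading rows that belong to this task.
    def readOnWorkerThread(context: TaskContext, rows: Iterator[Array[String]]): Thread = {
      val t = new Thread(new Runnable {
        override def run(): Unit = {
          TaskContextUtil.setTaskContext(context)  // no-op if this thread already has one
          rows.foreach(r => println(r.mkString(",")))
        }
      })
      t.start()
      t
    }
  }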

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadPartitionCoalescer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadPartitionCoalescer.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadPartitionCoalescer.scala
new file mode 100644
index 0000000..9dd74c4
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadPartitionCoalescer.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.testsuite.dataload
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.rdd.{DataLoadPartitionCoalescer, RDD}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestDataLoadPartitionCoalescer extends QueryTest with BeforeAndAfterAll {
+  var nodeList: Array[String] = _
+
+  class DummyPartition(val index: Int,
+                       rawSplit: FileSplit) extends Partition {
+    val serializableHadoopSplit = new SerializableWritable(rawSplit)
+  }
+
+  class Dummy(sc: SparkContext, partitions: Array[Partition]) extends RDD[Row](sc, Nil) {
+    override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
+      new Iterator[Row] {
+        var isFirst = true;
+        override def hasNext: Boolean = isFirst;
+
+        override def next(): Row = {
+          isFirst = false
+          new GenericRow(Array[Any]())
+        }
+      }
+    }
+
+    override protected def getPartitions: Array[Partition] = partitions
+
+    override protected def getPreferredLocations(split: Partition): Seq[String] = {
+      split.asInstanceOf[DummyPartition].serializableHadoopSplit.value.getLocations.toSeq
+    }
+
+  }
+
+  override def beforeAll: Unit = {
+    nodeList = Array("host1", "host2", "host3")
+
+  }
+
+  def createPartition(index: Int, file: String, hosts: Array[String]) : Partition = {
+    new DummyPartition(index, new FileSplit(new Path(file), 0, 1, hosts))
+  }
+
+  def repartition(parts: Array[Partition]): Array[Partition] = {
+    new DataLoadPartitionCoalescer(new Dummy(sparkContext, parts), nodeList).run
+  }
+
+  def checkPartition(prevParts: Array[Partition], parts: Array[Partition]): Unit = {
+    DataLoadPartitionCoalescer.checkPartition(prevParts, parts)
+  }
+
+  test("test number of partitions is more than nodes's") {
+    val prevParts = Array[Partition](
+      createPartition(0, "1.csv", Array("host1", "host2", "host3")),
+      createPartition(1, "2.csv", Array("host1", "host2", "host3")),
+      createPartition(2, "3.csv", Array("host1", "host2", "host3")),
+      createPartition(3, "4.csv", Array("host1", "host2", "host3")),
+      createPartition(4, "5.csv", Array("host1", "host2", "host3"))
+    )
+    val parts = repartition(prevParts)
+    assert(parts.size == 3)
+    checkPartition(prevParts, parts)
+  }
+
+  test("test number of partitions equals nodes's") {
+    val prevParts = Array[Partition](
+      createPartition(0, "1.csv", Array("host1", "host2", "host3")),
+      createPartition(1, "2.csv", Array("host1", "host2", "host3")),
+      createPartition(2, "3.csv", Array("host1", "host2", "host3"))
+    )
+    val parts = repartition(prevParts)
+    assert(parts.size == 3)
+    checkPartition(prevParts, parts)
+  }
+
+  test("test number of partitions is less than nodes's") {
+    val prevParts = Array[Partition](
+      createPartition(0, "1.csv", Array("host1", "host2", "host3")),
+      createPartition(1, "2.csv", Array("host1", "host2", "host3"))
+    )
+    val parts = repartition(prevParts)
+    assert(parts.size == 2)
+    checkPartition(prevParts, parts)
+  }
+
+  test("all partitions are locality") {
+    val prevParts = Array[Partition](
+      createPartition(0, "1.csv", Array("host1", "host2", "host3")),
+      createPartition(1, "2.csv", Array("host1", "host2", "host3"))
+    )
+    val parts = repartition(prevParts)
+    assert(parts.size == 2)
+    checkPartition(prevParts, parts)
+  }
+
+  test("part of partitions are locality1") {
+    val prevParts = Array[Partition](
+      createPartition(0, "1.csv", Array("host1", "host2", "host3")),
+      createPartition(1, "2.csv", Array("host1", "host2", "host4")),
+      createPartition(2, "3.csv", Array("host4", "host5", "host6"))
+    )
+    val parts = repartition(prevParts)
+    assert(parts.size == 3)
+    checkPartition(prevParts, parts)
+  }
+
+  test("part of partitions are locality2") {
+    val prevParts = Array[Partition](
+      createPartition(0, "1.csv", Array("host1", "host2", "host3")),
+      createPartition(1, "2.csv", Array("host1", "host2", "host4")),
+      createPartition(2, "3.csv", Array("host3", "host5", "host6"))
+    )
+    val parts = repartition(prevParts)
+    assert(parts.size == 3)
+    checkPartition(prevParts, parts)
+  }
+
+  test("part of partitions are locality3") {
+    val prevParts = Array[Partition](
+      createPartition(0, "1.csv", Array("host1", "host2", "host7")),
+      createPartition(1, "2.csv", Array("host1", "host2", "host4")),
+      createPartition(2, "3.csv", Array("host4", "host5", "host6"))
+    )
+    val parts = repartition(prevParts)
+    assert(parts.size == 3)
+    checkPartition(prevParts, parts)
+  }
+
+  test("all partition are not locality") {
+    val prevParts = Array[Partition](
+      createPartition(0, "1.csv", Array()),
+      createPartition(1, "2.csv", Array()),
+      createPartition(2, "3.csv", Array("host4", "host5", "host6"))
+    )
+    val parts = repartition(prevParts)
+    assert(parts.size == 3)
+    checkPartition(prevParts, parts)
+  }
+
+  override def afterAll {
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index ef2adbc..3263bf9 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -23,18 +23,15 @@ import java.io.File
 import org.apache.spark.sql.common.util.CarbonHiveContext.sql
 import org.apache.spark.sql.common.util.{CarbonHiveContext, QueryTest}
 import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
+import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.scalatest.BeforeAndAfterAll
-
+import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.model.CarbonLoadModel
 
 /**
   * Test Case for org.apache.carbondata.integration.spark.util.GlobalDictionaryUtil
-  *
-  * @date: Apr 10, 2016 10:34:58 PM
-  * @See org.apache.carbondata.integration.spark.util.GlobalDictionaryUtil
   */
 class AllDictionaryTestCase extends QueryTest with BeforeAndAfterAll {
 
@@ -64,6 +61,8 @@ class AllDictionaryTestCase extends QueryTest with BeforeAndAfterAll {
     carbonLoadModel.setComplexDelimiterLevel1("\\$")
     carbonLoadModel.setComplexDelimiterLevel2("\\:")
     carbonLoadModel.setAllDictPath(allDictFilePath)
+    carbonLoadModel.setSerializationNullFormat(
+          TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N")
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 1531ade..40341a8 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -20,17 +20,19 @@ package org.apache.carbondata.spark.util
 
 import java.io.File
 
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.processing.etl.DataLoadingException
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.scalatest.BeforeAndAfterAll
+
 import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
 import org.apache.spark.sql.common.util.CarbonHiveContext
 import org.apache.spark.sql.common.util.CarbonHiveContext.sql
 import org.apache.spark.sql.common.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.processing.constants.TableOptionConstant
+import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
   /**
  * test case for external column dictionary generation
@@ -145,6 +147,8 @@ class ExternalColumnDictionaryTestCase extends QueryTest with BeforeAndAfterAll
     carbonLoadModel.setComplexDelimiterLevel2("\\:")
     carbonLoadModel.setColDictFilePath(extColFilePath)
     carbonLoadModel.setQuoteChar("\"");
+    carbonLoadModel.setSerializationNullFormat(
+      TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N")
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
index 7d6e994..22cf2ea 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
@@ -20,28 +20,23 @@ package org.apache.carbondata.spark.util
 
 import java.io.File
 
+import java.util.concurrent.Executors
+import java.util.concurrent.Callable
+
+import scala.collection.mutable.ListBuffer
+
 import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
 import org.apache.spark.sql.common.util.CarbonHiveContext
 import org.apache.spark.sql.common.util.CarbonHiveContext.sql
 import org.apache.spark.sql.common.util.QueryTest
-
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import scala.collection.mutable.ListBuffer
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Executors
-import java.util.concurrent.Future
-import java.util.concurrent.FutureTask
-import java.util.concurrent.Callable
-import java.util.concurrent.TimeUnit
-
 import org.apache.carbondata.common.ext.PathFactory
-import org.apache.carbondata.core.carbon.path.CarbonTablePath
-import org.apache.carbondata.core.carbon.ColumnIdentifier
+import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.model.CarbonLoadModel
 
 class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAfterAll {
@@ -70,6 +65,8 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft
     carbonLoadModel.setComplexDelimiterLevel2("\\:")
     carbonLoadModel.setStorePath(relation.tableMeta.storePath)
     carbonLoadModel.setQuoteChar("\"")
+    carbonLoadModel.setSerializationNullFormat(
+      TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N")
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
index b78ffdb..d567ccd 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
@@ -24,18 +24,16 @@ import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
 import org.apache.spark.sql.common.util.CarbonHiveContext
 import org.apache.spark.sql.common.util.CarbonHiveContext.sql
 import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.scalatest.BeforeAndAfterAll
-
+import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.model.CarbonLoadModel
 
+
 /**
   * Test Case for org.apache.carbondata.spark.util.GlobalDictionaryUtil
-  *
-  * @date: Apr 10, 2016 10:34:58 PM
-  * @See org.apache.carbondata.spark.util.GlobalDictionaryUtil
   */
 class GlobalDictionaryUtilTestCase extends QueryTest with BeforeAndAfterAll {
 
@@ -71,6 +69,8 @@ class GlobalDictionaryUtilTestCase extends QueryTest with BeforeAndAfterAll {
     carbonLoadModel.setComplexDelimiterLevel2("\\:")
     carbonLoadModel.setStorePath(relation.tableMeta.storePath)
     carbonLoadModel.setQuoteChar("\"")
+    carbonLoadModel.setSerializationNullFormat(
+      TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N")
     carbonLoadModel
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
index cf861bc..c64c504 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
@@ -22,7 +22,6 @@ package org.apache.carbondata.processing.csvreaderstep;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -384,21 +383,75 @@ public class CsvInput extends BaseStep implements StepInterface {
       CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordCsvInputStepTime(
               meta.getPartitionID(), System.currentTimeMillis());
     } else {
-      scanRddIterator();
+      scanRddIterator(numberOfNodes);
     }
     setOutputDone();
     return false;
   }
 
-  private void scanRddIterator() throws RuntimeException {
-    Iterator<String[]> iterator = RddInputUtils.getAndRemove(rddIteratorKey);
-    if (iterator != null) {
-      try{
-        while(iterator.hasNext()){
-          putRow(data.outputRowMeta, iterator.next());
+  class RddScanCallable implements Callable<Void> {
+    List<JavaRddIterator<String[]>> iterList;
+
+    RddScanCallable() {
+      this.iterList = new ArrayList<JavaRddIterator<String[]>>(1000);
+    }
+
+    public void addJavaRddIterator(JavaRddIterator<String[]> iter) {
+      this.iterList.add(iter);
+    }
+
+    @Override
+    public Void call() throws Exception {
+      StandardLogService.setThreadName("PROCESS_DataFrame_PARTITIONS",
+          Thread.currentThread().getName());
+      try {
+        String[] values = null;
+        for (JavaRddIterator<String[]> iter : iterList) {
+          iter.initialize();
+          while (iter.hasNext()) {
+            values = iter.next();
+            synchronized (putRowLock) {
+              putRow(data.outputRowMeta, values);
+            }
+          }
+        }
+      } catch (Exception e) {
+        LOGGER.error(e, "RDD scan during data load terminated due to an error.");
+        throw e;
+      }
+      return null;
+    }
+  }
+
+  private void scanRddIterator(int numberOfNodes) throws RuntimeException {
+    JavaRddIterator<JavaRddIterator<String[]>> iter = RddInputUtils.getAndRemove(rddIteratorKey);
+    if (iter != null) {
+      iter.initialize();
+      exec = Executors.newFixedThreadPool(numberOfNodes);
+      List<Future<Void>> results = new ArrayList<Future<Void>>(numberOfNodes);
+      RddScanCallable[] calls = new RddScanCallable[numberOfNodes];
+      for (int i = 0; i < numberOfNodes; i++) {
+        calls[i] = new RddScanCallable();
+      }
+      int index = 0;
+      while (iter.hasNext()) {
+        calls[index].addJavaRddIterator(iter.next());
+        index = index + 1;
+        if (index == numberOfNodes) {
+          index = 0;
+        }
+      }
+      for (RddScanCallable call : calls) {
+        results.add(exec.submit(call));
+      }
+      try {
+        for (Future<Void> future : results) {
+          future.get();
         }
-      } catch (KettleException e) {
-        throw new RuntimeException(e);
+      } catch (InterruptedException | ExecutionException e) {
+        throw new RuntimeException("RDD scan thread was interrupted or failed", e);
+      } finally {
+        exec.shutdownNow();
       }
     }
   }
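
The rewritten scanRddIterator distributes the dataframe partition iterators round-robin over a fixed pool of RddScanCallable workers, each draining its share and pushing rows through the shared putRow lock. A standalone sketch of that fan-out pattern, using plain java.util.Iterator in place of JavaRddIterator and System.out in place of putRow; the thread count and sample data are illustrative, not the committed implementation:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public final class RoundRobinScanSketch {
      private static final Object PUT_ROW_LOCK = new Object();

      public static void main(String[] args) throws Exception {
        // Stand-ins for the per-partition iterators delivered by the coalesced RDD.
        List<Iterator<String[]>> partitions = Arrays.asList(
            Arrays.asList(new String[]{"1", "a"}, new String[]{"2", "b"}).iterator(),
            Arrays.asList(new String[]{"3", "c"}, new String[]{"4", "d"}).iterator(),
            Arrays.asList(new String[]{"5", "e"}, new String[]{"6", "f"}).iterator());

        int scanThreads = 2;  // plays the role of numberOfNodes
        List<List<Iterator<String[]>>> buckets = new ArrayList<>();
        for (int i = 0; i < scanThreads; i++) {
          buckets.add(new ArrayList<>());
        }
        // Round-robin assignment, mirroring the wrap-around index in scanRddIterator.
        int index = 0;
        for (Iterator<String[]> partition : partitions) {
          buckets.get(index).add(partition);
          index = (index + 1) % scanThreads;
        }

        ExecutorService exec = Executors.newFixedThreadPool(scanThreads);
        List<Future<Void>> results = new ArrayList<>();
        for (List<Iterator<String[]>> bucket : buckets) {
          results.add(exec.submit((Callable<Void>) () -> {
            for (Iterator<String[]> iter : bucket) {
              while (iter.hasNext()) {
                String[] row = iter.next();
                // The real step funnels rows into putRow, which is not thread-safe,
                // hence the shared lock.
                synchronized (PUT_ROW_LOCK) {
                  System.out.println(String.join(",", row));
                }
              }
            }
            return null;
          }));
        }
        try {
          for (Future<Void> result : results) {
            result.get();  // surface any failure from the scan threads
          }
        } finally {
          exec.shutdownNow();
        }
      }
    }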

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/JavaRddIterator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/JavaRddIterator.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/JavaRddIterator.java
new file mode 100644
index 0000000..9e11816
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/JavaRddIterator.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.csvreaderstep;
+/**
+ * JavaRddIterator wraps a Spark RDD iterator,
+ * so that this module does not need a compile-time dependency on the Spark module.
+ * @param <E>
+ */
+public interface JavaRddIterator<E> {
+
+  boolean hasNext();
+
+  E next();
+
+  void initialize();
+}
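
JavaRddIterator exists so the processing module never needs a compile-time Spark dependency; the Spark side of the load supplies concrete adapters (initialize() is presumably where TaskContext wiring happens, given the TaskContextUtil added in this change). A minimal, hypothetical adapter over a plain java.util.Iterator; the class name is invented for this sketch:

    package org.apache.carbondata.processing.csvreaderstep;

    import java.util.Iterator;

    /** Hypothetical adapter exposing any java.util.Iterator through JavaRddIterator. */
    public class PlainJavaRddIterator<E> implements JavaRddIterator<E> {
      private final Iterator<E> delegate;

      public PlainJavaRddIterator(Iterator<E> delegate) {
        this.delegate = delegate;
      }

      @Override public boolean hasNext() {
        return delegate.hasNext();
      }

      @Override public E next() {
        return delegate.next();
      }

      @Override public void initialize() {
        // No-op here; a Spark-backed adapter could attach the TaskContext to the
        // scan thread before the first next() call.
      }
    }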

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInputUtils.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInputUtils.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInputUtils.java
index f9a0429..b3dfdab 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInputUtils.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInputUtils.java
@@ -20,19 +20,18 @@
 package org.apache.carbondata.processing.csvreaderstep;
 
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 
 public class RddInputUtils {
-  private static Map<String, Iterator<String[]>> iteratorMap = new HashMap<String,
-      Iterator<String[]>>();
+  private static Map<String, JavaRddIterator<JavaRddIterator<String[]>>> iteratorMap = new
+      HashMap<String, JavaRddIterator<JavaRddIterator<String[]>>>();
 
-  public static void put(String key, Iterator<String[]> value) {
+  public static void put(String key, JavaRddIterator<JavaRddIterator<String[]>> value) {
     iteratorMap.put(key, value);
   }
 
-  public static Iterator<String[]> getAndRemove(String key) {
-    Iterator<String[]> iter = iteratorMap.get(key);
+  public static JavaRddIterator<JavaRddIterator<String[]>> getAndRemove(String key) {
+    JavaRddIterator<JavaRddIterator<String[]>> iter = iteratorMap.get(key);
     remove(key);
     return iter;
   }
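
With the map now keyed to nested JavaRddIterators, the Spark side of the load registers one outer iterator (one inner iterator per coalesced partition) under the load's rddIteratorKey, and CsvInput claims it once by the same key. A hedged sketch of that hand-off, reusing the hypothetical PlainJavaRddIterator adapter from the previous sketch; the key value and sample rows are invented:

    package org.apache.carbondata.processing.csvreaderstep;

    import java.util.Arrays;

    // Hypothetical hand-off: register the nested iterator, then claim it by key
    // the way CsvInput.scanRddIterator does.
    public final class RddHandOffSketch {
      public static void main(String[] args) {
        String rddIteratorKey = "example_load_key";  // assumed format, not the real key

        JavaRddIterator<String[]> partition1 = new PlainJavaRddIterator<>(
            Arrays.asList(new String[]{"1", "a"}, new String[]{"2", "b"}).iterator());
        JavaRddIterator<String[]> partition2 = new PlainJavaRddIterator<>(
            Arrays.asList(new String[]{"3", "c"}, new String[]{"4", "d"}).iterator());
        JavaRddIterator<JavaRddIterator<String[]>> outer = new PlainJavaRddIterator<>(
            Arrays.asList(partition1, partition2).iterator());

        RddInputUtils.put(rddIteratorKey, outer);                        // producer side
        JavaRddIterator<JavaRddIterator<String[]>> claimed =
            RddInputUtils.getAndRemove(rddIteratorKey);                  // consumer side
        System.out.println(claimed != null ? "iterator registered and claimed" : "missing");
      }
    }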


[2/2] incubator-carbondata git commit: [CARBONDATA-368] Improve performance of dataframe loading. This closes #278

Posted by ja...@apache.org.
[CARBONDATA-368] Improve performance of dataframe loading. This closes #278


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/567fa513
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/567fa513
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/567fa513

Branch: refs/heads/master
Commit: 567fa5131628b70c8c4829368fda6d48cb013af3
Parents: 879bfe7 f8a0c87
Author: jackylk <ja...@huawei.com>
Authored: Tue Nov 29 17:15:20 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Tue Nov 29 17:15:20 2016 +0800

----------------------------------------------------------------------
 .../spark/rdd/CarbonDataLoadRDD.scala           |  96 ++---
 .../spark/rdd/CarbonDataRDDFactory.scala        |  88 +++--
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |  11 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala |  51 +++
 .../spark/util/GlobalDictionaryUtil.scala       |  11 +-
 .../apache/spark/rdd/DataLoadCoalescedRDD.scala |  68 ++++
 .../spark/rdd/DataLoadPartitionCoalescer.scala  | 363 +++++++++++++++++++
 .../spark/sql/hive/DistributionUtil.scala       |  19 +-
 .../org/apache/spark/util/TaskContextUtil.scala |  29 ++
 .../TestDataLoadPartitionCoalescer.scala        | 170 +++++++++
 .../spark/util/AllDictionaryTestCase.scala      |   9 +-
 .../util/ExternalColumnDictionaryTestCase.scala |  14 +-
 ...GlobalDictionaryUtilConcurrentTestCase.scala |  23 +-
 .../util/GlobalDictionaryUtilTestCase.scala     |  10 +-
 .../processing/csvreaderstep/CsvInput.java      |  73 +++-
 .../csvreaderstep/JavaRddIterator.java          |  32 ++
 .../processing/csvreaderstep/RddInputUtils.java |  11 +-
 17 files changed, 921 insertions(+), 157 deletions(-)
----------------------------------------------------------------------