Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/11/29 09:15:47 UTC
[1/2] incubator-carbondata git commit: DataLoadCoalescedRDD
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 879bfe742 -> 567fa5131
DataLoadCoalescedRDD
DataLoadPartitionCoalescer
Concurrently read DataFrame partitions during data load
Add test case
Address review comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f8a0c876
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f8a0c876
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f8a0c876
Branch: refs/heads/master
Commit: f8a0c876158be256119219bde4cce0e074acf03a
Parents: 879bfe7
Author: QiangCai <da...@gmail.com>
Authored: Mon Oct 24 10:54:20 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Tue Nov 29 17:06:23 2016 +0800
----------------------------------------------------------------------
.../spark/rdd/CarbonDataLoadRDD.scala | 96 ++---
.../spark/rdd/CarbonDataRDDFactory.scala | 88 +++--
.../spark/rdd/CarbonGlobalDictionaryRDD.scala | 11 +-
.../carbondata/spark/util/CarbonScalaUtil.scala | 51 +++
.../spark/util/GlobalDictionaryUtil.scala | 11 +-
.../apache/spark/rdd/DataLoadCoalescedRDD.scala | 68 ++++
.../spark/rdd/DataLoadPartitionCoalescer.scala | 363 +++++++++++++++++++
.../spark/sql/hive/DistributionUtil.scala | 19 +-
.../org/apache/spark/util/TaskContextUtil.scala | 29 ++
.../TestDataLoadPartitionCoalescer.scala | 170 +++++++++
.../spark/util/AllDictionaryTestCase.scala | 9 +-
.../util/ExternalColumnDictionaryTestCase.scala | 14 +-
...GlobalDictionaryUtilConcurrentTestCase.scala | 23 +-
.../util/GlobalDictionaryUtilTestCase.scala | 10 +-
.../processing/csvreaderstep/CsvInput.java | 73 +++-
.../csvreaderstep/JavaRddIterator.java | 32 ++
.../processing/csvreaderstep/RddInputUtils.java | 11 +-
17 files changed, 921 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 87b5673..e306a89 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -28,9 +28,12 @@ import scala.util.Random
import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.DataLoadCoalescedRDD
+import org.apache.spark.rdd.DataLoadPartitionWrap
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.command.Partitioner
import org.apache.spark.sql.Row
+import org.apache.spark.util.TaskContextUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.common.logging.impl.StandardLogService
@@ -38,6 +41,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
import org.apache.carbondata.processing.constants.DataProcessorConstants
+import org.apache.carbondata.processing.csvreaderstep.JavaRddIterator
import org.apache.carbondata.processing.csvreaderstep.RddInputUtils
import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.graphgenerator.GraphGenerator
@@ -46,6 +50,7 @@ import org.apache.carbondata.spark.DataLoadResult
import org.apache.carbondata.spark.load._
import org.apache.carbondata.spark.splits.TableSplit
import org.apache.carbondata.spark.util.CarbonQueryUtil
+import org.apache.carbondata.spark.util.CarbonScalaUtil
/**
* This partition class use to split by TableSplit
@@ -125,6 +130,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
try {
CarbonLoaderUtil.executeGraph(model, storeLocation, storePath,
kettleHomePath)
+ loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
} catch {
case e: DataLoadingException => if (e.getErrorCode ==
DataProcessorConstants.BAD_REC_FOUND) {
@@ -235,14 +241,11 @@ class DataFileLoaderRDD[K, V](
theSplit.index
try {
loadMetadataDetails.setPartitionCount(partitionID)
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
-
carbonLoadModel.setSegmentId(String.valueOf(loadCount))
setModelAndBlocksInfo()
val loader = new SparkPartitionLoader(model, theSplit.index, storePath,
kettleHomePath, loadCount, loadMetadataDetails)
- loader.initialize()
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+ loader.initialize
if (model.isRetentionRequest) {
recreateAggregationTableForRetention
} else if (model.isAggLoadRequest) {
@@ -495,7 +498,7 @@ class DataFrameLoaderRDD[K, V](
loadCount: Integer,
tableCreationTime: Long,
schemaLastUpdatedTime: Long,
- prev: RDD[Row]) extends RDD[(K, V)](prev) {
+ prev: DataLoadCoalescedRDD[Row]) extends RDD[(K, V)](prev) {
sc.setLocalProperty("spark.scheduler.pool", "DDL")
@@ -509,18 +512,19 @@ class DataFrameLoaderRDD[K, V](
theSplit.index
try {
loadMetadataDetails.setPartitionCount(partitionID)
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
carbonLoadModel.setPartitionId(partitionID)
carbonLoadModel.setSegmentId(String.valueOf(loadCount))
carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
val loader = new SparkPartitionLoader(carbonLoadModel, theSplit.index, storePath,
kettleHomePath, loadCount, loadMetadataDetails)
- loader.initialize()
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+ loader.initialize
val rddIteratorKey = UUID.randomUUID().toString
try {
RddInputUtils.put(rddIteratorKey,
- new RddIterator(firstParent[Row].iterator(theSplit, context), carbonLoadModel))
+ new PartitionIterator(
+ firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, context),
+ carbonLoadModel,
+ context))
carbonLoadModel.setRddIteratorKey(rddIteratorKey)
loader.run()
} finally {
@@ -548,77 +552,53 @@ class DataFrameLoaderRDD[K, V](
override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
}
+class PartitionIterator(partitionIter: Iterator[DataLoadPartitionWrap[Row]],
+ carbonLoadModel: CarbonLoadModel,
+ context: TaskContext) extends JavaRddIterator[JavaRddIterator[Array[String]]] {
+ def hasNext: Boolean = partitionIter.hasNext
+ def next: JavaRddIterator[Array[String]] = {
+ val value = partitionIter.next
+ new RddIterator(value.rdd.iterator(value.partition, context),
+ carbonLoadModel,
+ context)
+ }
+ def initialize: Unit = {
+ TaskContextUtil.setTaskContext(context)
+ }
+}
/**
* This class wrap Scala's Iterator to Java's Iterator.
* It also convert all columns to string data to use csv data loading flow.
*
* @param rddIter
* @param carbonLoadModel
+ * @param context
*/
class RddIterator(rddIter: Iterator[Row],
- carbonLoadModel: CarbonLoadModel) extends java.util.Iterator[Array[String]] {
+ carbonLoadModel: CarbonLoadModel,
+ context: TaskContext) extends JavaRddIterator[Array[String]] {
+
val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
val format = new SimpleDateFormat(formatString)
val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
-
+ val serializationNullFormat =
+ carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
def hasNext: Boolean = rddIter.hasNext
- private def getString(value: Any, level: Int = 1): String = {
- if (value == null) {
- ""
- } else {
- value match {
- case s: String => s
- case i: java.lang.Integer => i.toString
- case d: java.lang.Double => d.toString
- case t: java.sql.Timestamp => format format t
- case d: java.sql.Date => format format d
- case d: java.math.BigDecimal => d.toPlainString
- case b: java.lang.Boolean => b.toString
- case s: java.lang.Short => s.toString
- case f: java.lang.Float => f.toString
- case bs: Array[Byte] => new String(bs)
- case s: scala.collection.Seq[Any] =>
- val delimiter = if (level == 1) {
- delimiterLevel1
- } else {
- delimiterLevel2
- }
- val builder = new StringBuilder()
- s.foreach { x =>
- builder.append(getString(x, level + 1)).append(delimiter)
- }
- builder.substring(0, builder.length - 1)
- case m: scala.collection.Map[Any, Any] =>
- throw new Exception("Unsupported data type: Map")
- case r: org.apache.spark.sql.Row =>
- val delimiter = if (level == 1) {
- delimiterLevel1
- } else {
- delimiterLevel2
- }
- val builder = new StringBuilder()
- for (i <- 0 until r.length) {
- builder.append(getString(r(i), level + 1)).append(delimiter)
- }
- builder.substring(0, builder.length - 1)
- case other => other.toString
- }
- }
- }
-
def next: Array[String] = {
val row = rddIter.next()
val columns = new Array[String](row.length)
- for (i <- 0 until row.length) {
- columns(i) = getString(row(i))
+ for (i <- 0 until columns.length) {
+ columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
+ delimiterLevel1, delimiterLevel2, format)
}
columns
}
- def remove(): Unit = {
+ def initialize: Unit = {
+ TaskContextUtil.setTaskContext(context)
}
}
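
The reworked loader hands the CSV step one iterator per parent partition (PartitionIterator) and converts each row to Array[String] lazily (RddIterator). A minimal, Spark-free sketch of that two-level wrapping; the names below are illustrative, not the ones in the patch:

// Outer iterator yields one inner iterator per partition; each inner iterator
// converts a row (here just a Seq[Any]) into the Array[String] the CSV flow expects.
object IteratorWrapSketch {
  trait SimpleRddIterator[E] {
    def hasNext: Boolean
    def next: E
    def initialize(): Unit = ()   // hook for per-thread setup, e.g. task context
  }

  class RowIterator(rows: Iterator[Seq[Any]], nullFormat: String)
    extends SimpleRddIterator[Array[String]] {
    def hasNext: Boolean = rows.hasNext
    def next: Array[String] =
      rows.next().map(v => if (v == null) nullFormat else v.toString).toArray
  }

  class PartitionsIterator(partitions: Iterator[Iterator[Seq[Any]]], nullFormat: String)
    extends SimpleRddIterator[SimpleRddIterator[Array[String]]] {
    def hasNext: Boolean = partitions.hasNext
    def next: SimpleRddIterator[Array[String]] =
      new RowIterator(partitions.next(), nullFormat)
  }

  def main(args: Array[String]): Unit = {
    val data = Iterator(Iterator(Seq[Any](1, "a", null)), Iterator(Seq[Any](2, "b", 3.5)))
    val outer = new PartitionsIterator(data, "\\N")
    while (outer.hasNext) {
      val inner = outer.next
      while (inner.hasNext) println(inner.next.mkString(","))
    }
  }
}
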
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 6c09607..c30ead7 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -30,7 +30,8 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.spark.{SparkContext, SparkEnv, SparkException}
-import org.apache.spark.sql.{CarbonEnv, DataFrame, SQLContext}
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer}
+import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel, Partitioner}
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.util.{FileUtils, SplitUtils}
@@ -902,21 +903,30 @@ object CarbonDataRDDFactory {
}
def loadDataFrame(): Unit = {
- var rdd = dataFrame.get.rdd
- var numPartitions = DistributionUtil.getNodeList(sqlContext.sparkContext).length
- numPartitions = Math.max(1, Math.min(numPartitions, rdd.partitions.length))
- rdd = rdd.coalesce(numPartitions, shuffle = false)
-
- status = new DataFrameLoaderRDD(sqlContext.sparkContext,
- new DataLoadResultImpl(),
- carbonLoadModel,
- storePath,
- kettleHomePath,
- columinar,
- currentLoadCount,
- tableCreationTime,
- schemaLastUpdatedTime,
- rdd).collect()
+ try {
+ val rdd = dataFrame.get.rdd
+ val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p =>
+ DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
+ }.distinct.size
+ val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
+ sqlContext.sparkContext)
+ val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
+
+ status = new DataFrameLoaderRDD(sqlContext.sparkContext,
+ new DataLoadResultImpl(),
+ carbonLoadModel,
+ storePath,
+ kettleHomePath,
+ columinar,
+ currentLoadCount,
+ tableCreationTime,
+ schemaLastUpdatedTime,
+ newRdd).collect()
+ } catch {
+ case ex: Exception =>
+ LOGGER.error(ex, "load data frame failed")
+ throw ex
+ }
}
CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
@@ -932,28 +942,32 @@ object CarbonDataRDDFactory {
loadDataFile()
}
val newStatusMap = scala.collection.mutable.Map.empty[String, String]
- status.foreach { eachLoadStatus =>
- val state = newStatusMap.get(eachLoadStatus._1)
- state match {
- case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
- newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
- case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
- if eachLoadStatus._2.getLoadStatus ==
- CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
- newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
- case _ =>
- newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
+ if (status.nonEmpty) {
+ status.foreach { eachLoadStatus =>
+ val state = newStatusMap.get(eachLoadStatus._1)
+ state match {
+ case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
+ newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
+ case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+ if eachLoadStatus._2.getLoadStatus ==
+ CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
+ newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
+ case _ =>
+ newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
+ }
}
- }
- newStatusMap.foreach {
- case (key, value) =>
- if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
- loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
- } else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
- !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
- loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
- }
+ newStatusMap.foreach {
+ case (key, value) =>
+ if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
+ loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+ } else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
+ !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
+ loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
+ }
+ }
+ } else {
+ loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
}
if (loadStatus != CarbonCommonConstants.STORE_LOADSTATUS_FAILURE &&
@@ -1116,6 +1130,4 @@ object CarbonDataRDDFactory {
CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
}
}
-
-
}
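
The new loadDataFrame path first counts the distinct hosts that hold the DataFrame's partitions, then requests executors for that many nodes before coalescing. A small sketch of the counting step, with hard-coded host lists standing in for DataLoadPartitionCoalescer.getPreferredLocs:

// One entry per partition: the hosts that store its blocks. Values are illustrative.
object NodeCountSketch {
  def main(args: Array[String]): Unit = {
    val partitionHosts: Seq[Seq[String]] = Seq(
      Seq("host1", "host2"), Seq("host2", "host3"), Seq("host1"))
    val nodeNumOfData = partitionHosts.flatten.distinct.size
    println(s"request executors on $nodeNumOfData nodes")   // 3
  }
}
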
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index ced45b7..c91cec0 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -19,6 +19,7 @@ package org.apache.carbondata.spark.rdd
import java.io.{DataInputStream, InputStreamReader}
import java.nio.charset.Charset
+import java.text.SimpleDateFormat
import java.util.regex.Pattern
import scala.collection.mutable
@@ -42,6 +43,7 @@ import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.load.CarbonLoaderUtil
import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask}
+import org.apache.carbondata.spark.util.CarbonScalaUtil
import org.apache.carbondata.spark.util.GlobalDictionaryUtil
import org.apache.carbondata.spark.util.GlobalDictionaryUtil._
@@ -157,7 +159,8 @@ case class DictionaryLoadModel(table: CarbonTableIdentifier,
isFirstLoad: Boolean,
hdfsTempLocation: String,
lockType: String,
- zooKeeperUrl: String) extends Serializable
+ zooKeeperUrl: String,
+ serializationNullFormat: String) extends Serializable
case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends Serializable
@@ -251,13 +254,17 @@ class CarbonBlockDistinctValuesCombineRDD(
val dimNum = model.dimensions.length
var row: Row = null
val rddIter = firstParent[Row].iterator(split, context)
+ val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+ .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ val format = new SimpleDateFormat(formatString)
// generate block distinct value set
while (rddIter.hasNext) {
row = rddIter.next()
if (row != null) {
rowCount += 1
for (i <- 0 until dimNum) {
- dimensionParsers(i).parseString(row.getString(i))
+ dimensionParsers(i).parseString(CarbonScalaUtil.getString(row.get(i),
+ model.serializationNullFormat, model.delimiters(0), model.delimiters(1), format))
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index afc6cc5..d91a012 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -18,6 +18,7 @@
package org.apache.carbondata.spark.util
import java.io.File
+import java.text.SimpleDateFormat
import scala.collection.JavaConverters._
@@ -165,4 +166,54 @@ object CarbonScalaUtil {
kettleHomePath
}
+ def getString(value: Any,
+ serializationNullFormat: String,
+ delimiterLevel1: String,
+ delimiterLevel2: String,
+ format: SimpleDateFormat,
+ level: Int = 1): String = {
+ if (value == null) {
+ serializationNullFormat
+ } else {
+ value match {
+ case s: String => s
+ case d: java.math.BigDecimal => d.toPlainString
+ case i: java.lang.Integer => i.toString
+ case d: java.lang.Double => d.toString
+ case t: java.sql.Timestamp => format format t
+ case d: java.sql.Date => format format d
+ case b: java.lang.Boolean => b.toString
+ case s: java.lang.Short => s.toString
+ case f: java.lang.Float => f.toString
+ case bs: Array[Byte] => new String(bs)
+ case s: scala.collection.Seq[Any] =>
+ val delimiter = if (level == 1) {
+ delimiterLevel1
+ } else {
+ delimiterLevel2
+ }
+ val builder = new StringBuilder()
+ s.foreach { x =>
+ builder.append(getString(x, serializationNullFormat, delimiterLevel1,
+ delimiterLevel2, format, level + 1)).append(delimiter)
+ }
+ builder.substring(0, builder.length - 1)
+ case m: scala.collection.Map[Any, Any] =>
+ throw new Exception("Unsupported data type: Map")
+ case r: org.apache.spark.sql.Row =>
+ val delimiter = if (level == 1) {
+ delimiterLevel1
+ } else {
+ delimiterLevel2
+ }
+ val builder = new StringBuilder()
+ for (i <- 0 until r.length) {
+ builder.append(getString(r(i), serializationNullFormat, delimiterLevel1,
+ delimiterLevel2, format, level + 1)).append(delimiter)
+ }
+ builder.substring(0, builder.length - 1)
+ case other => other.toString
+ }
+ }
+ }
}
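
getString substitutes the configured serialization null format for nulls, formats timestamps and dates with the given SimpleDateFormat, and joins nested collections with the complex-type delimiters. A standalone sketch of those rules (a simplified stand-in, not the class itself):

import java.text.SimpleDateFormat

// Simplified version of the conversion rules: nulls become the null format,
// timestamps go through the date format, sequences are joined with the delimiter.
object GetStringSketch {
  def toCsvField(value: Any, nullFormat: String, delim: String,
                 format: SimpleDateFormat): String = value match {
    case null                  => nullFormat
    case t: java.sql.Timestamp => format.format(t)
    case s: Seq[_]             => s.map(toCsvField(_, nullFormat, delim, format)).mkString(delim)
    case other                 => other.toString
  }

  def main(args: Array[String]): Unit = {
    val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    println(toCsvField(null, "\\N", "$", fmt))                       // \N
    println(toCsvField(Seq(1, 2, 3), "\\N", "$", fmt))               // 1$2$3
    println(toCsvField(new java.sql.Timestamp(0L), "\\N", "$", fmt)) // epoch in local time
  }
}
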
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index f17c62b..9a4e209 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -323,6 +323,8 @@ object GlobalDictionaryUtil {
CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE,
CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT).toDouble
+ val serializationNullFormat =
+ carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
// get load count
if (null == carbonLoadModel.getLoadMetadataDetails) {
CarbonDataRDDFactory.readLoadMetadataDetails(carbonLoadModel, hdfsLocation)
@@ -343,7 +345,8 @@ object GlobalDictionaryUtil {
carbonLoadModel.getLoadMetadataDetails.size() == 0,
hdfsTempLocation,
lockType,
- zookeeperUrl)
+ zookeeperUrl,
+ serializationNullFormat)
}
/**
@@ -763,11 +766,7 @@ object GlobalDictionaryUtil {
if (StringUtils.isEmpty(allDictionaryPath)) {
LOGGER.info("Generate global dictionary from source data files!")
// load data by using dataSource com.databricks.spark.csv
- var df = if (dataFrame.isDefined) {
- dataFrame.get
- } else {
- loadDataFrame(sqlContext, carbonLoadModel)
- }
+ var df = dataFrame.getOrElse(loadDataFrame(sqlContext, carbonLoadModel))
var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
df.columns
} else {
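
The load model stores this option as "<option name>,<format>", so the utility keeps everything after the first comma. A one-liner sketch with a hypothetical stored value:

// Hypothetical value of carbonLoadModel.getSerializationNullFormat; splitting
// on the first comma keeps the format even if it contains commas itself.
object NullFormatSplitSketch {
  def main(args: Array[String]): Unit = {
    val stored = "serialization_null_format,\\N"
    val serializationNullFormat = stored.split(",", 2)(1)
    println(serializationNullFormat)   // \N
  }
}
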
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
new file mode 100644
index 0000000..3acde94
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.spark._
+
+case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition)
+
+class DataLoadCoalescedRDD[T: ClassTag](
+ @transient var prev: RDD[T],
+ nodeList: Array[String])
+ extends RDD[DataLoadPartitionWrap[T]](prev.context, Nil) with Logging {
+
+ override def getPartitions: Array[Partition] = {
+ new DataLoadPartitionCoalescer(prev, nodeList).run
+ }
+
+ override def compute(split: Partition,
+ context: TaskContext): Iterator[DataLoadPartitionWrap[T]] = {
+
+ new Iterator[DataLoadPartitionWrap[T]] {
+ val iter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator
+ def hasNext = iter.hasNext
+ def next: DataLoadPartitionWrap[T] = {
+ DataLoadPartitionWrap(firstParent[T], iter.next())
+ }
+ }
+ }
+
+ override def getDependencies: Seq[Dependency[_]] = {
+ Seq(new NarrowDependency(prev) {
+ def getParents(id: Int): Seq[Int] =
+ partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices
+ })
+ }
+
+ override def clearDependencies() {
+ super.clearDependencies()
+ prev = null
+ }
+
+ /**
+ * Returns the preferred machine for the partition. If split is of type CoalescedRDDPartition,
+ * then the preferred machine will be one which most parent splits prefer too.
+ * @param partition
+ * @return the machine most preferred by split
+ */
+ override def getPreferredLocations(partition: Partition): Seq[String] = {
+ partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq
+ }
+}
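
DataLoadCoalescedRDD.compute does not read parent rows; it emits one DataLoadPartitionWrap per parent partition, so the consumer decides when, and on which thread, each parent iterator is opened. A plain-Scala sketch of that deferred handoff, with a function standing in for rdd.iterator(partition, context):

// The coalesced "partition" yields wrappers only; nothing is read until the
// consumer calls open(). Names and data are illustrative.
object DeferredHandoffSketch {
  final case class PartitionWrap[T](open: () => Iterator[T])

  def main(args: Array[String]): Unit = {
    val parents = Seq(Seq("a", "b"), Seq("c"), Seq("d", "e"))
    val coalesced: Iterator[PartitionWrap[String]] =
      parents.iterator.map(p => PartitionWrap(() => p.iterator))
    coalesced.foreach { wrap =>
      println(wrap.open().mkString(","))   // the loader controls when each parent is scanned
    }
  }
}
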
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadPartitionCoalescer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadPartitionCoalescer.scala b/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadPartitionCoalescer.scala
new file mode 100644
index 0000000..8e0971c
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadPartitionCoalescer.scala
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.rdd
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+import scala.collection.mutable.LinkedHashSet
+
+import org.apache.spark.Logging
+import org.apache.spark.Partition
+import org.apache.spark.scheduler.TaskLocation
+
+/**
+ * DataLoadPartitionCoalescer
+ * Repartition the partitions of the rdd to fewer partitions, one partition per node.
+ * example:
+ * blk_hst host1 host2 host3 host4 host5
+ * block1 host1 host2 host3
+ * block2 host2 host4 host5
+ * block3 host3 host4 host5
+ * block4 host1 host2 host4
+ * block5 host1 host3 host4
+ * block6 host1 host2 host5
+ * -------------------------------------------------------
+ * 1. sort host by number of blocks
+ * -------------------------------------------------------
+ * host3: block1 block3 block5
+ * host5: block2 block3 block6
+ * host1: block1 block4 block5 block6
+ * host2: block1 block2 block4 block6
+ * host4: block2 block3 block4 block5
+ * -------------------------------------------------------
+ * 2. sort blocks of each host
+ * new partitions are before old partitions
+ * -------------------------------------------------------
+ * host3: block1 block3 block5
+ * host5: block2 block6+block3
+ * host1: block4+block1 block5 block6
+ * host2: block1 block2 block4 block6
+ * host4: block2 block3 block4 block5
+ * -------------------------------------------------------
+ * 3. assign blocks to host
+ * -------------------------------------------------------
+ * step1: host3 choose block1, remove from host1, host2
+ * step2: host5 choose block2, remove from host2, host4
+ * step3: host1 choose block4, .....
+ * -------------------------------------------------------
+ * result:
+ * host3: block1 block5
+ * host5: block2
+ * host1: block4
+ * host2: block6
+ * host4: block3
+ */
+class DataLoadPartitionCoalescer(prev: RDD[_], nodeList: Array[String]) extends Logging {
+
+ val prevPartitions = prev.partitions
+ var numOfParts = Math.max(1, Math.min(nodeList.length, prevPartitions.length))
+ // host => partition id list
+ val hostMapPartitionIds = new HashMap[String, LinkedHashSet[Int]]
+ // partition id => host list
+ val partitionIdMapHosts = new HashMap[Int, ArrayBuffer[String]]
+ val noLocalityPartitions = new ArrayBuffer[Int]
+ var noLocality = true
+ /**
+ * assign a task location for a partition
+ */
+ private def getLocation(index: Int): Option[String] = {
+ if (index < nodeList.length) {
+ Some(nodeList(index))
+ } else {
+ None
+ }
+ }
+
+ /**
+ * collect partitions to each node
+ */
+ private def groupByNode(): Unit = {
+ // initialize hostMapPartitionIds
+ nodeList.foreach { node =>
+ val map = new LinkedHashSet[Int]
+ hostMapPartitionIds.put(node, map)
+ }
+ // collect partitions for each node
+ val tmpNoLocalityPartitions = new ArrayBuffer[Int]
+ prevPartitions.foreach { p =>
+ val locs = DataLoadPartitionCoalescer.getPreferredLocs(prev, p)
+ if (locs.isEmpty) {
+ // if a partition has no location, add to noLocalityPartitions
+ tmpNoLocalityPartitions += p.index
+ } else {
+ // add partition to hostMapPartitionIds and partitionIdMapHosts
+ locs.foreach { loc =>
+ val host = loc.host
+ hostMapPartitionIds.get(host) match {
+ // if the location of the partition is not in node list,
+ // will add this partition to noLocalityPartitions
+ case None => tmpNoLocalityPartitions += p.index
+ case Some(ids) =>
+ noLocality = false
+ ids += p.index
+ partitionIdMapHosts.get(p.index) match {
+ case None =>
+ val hosts = new ArrayBuffer[String]
+ hosts += host
+ partitionIdMapHosts.put(p.index, hosts)
+ case Some(hosts) =>
+ hosts += host
+ }
+ }
+ }
+ }
+ }
+
+ // remove locality partition
+ tmpNoLocalityPartitions.distinct.foreach {index =>
+ partitionIdMapHosts.get(index) match {
+ case None => noLocalityPartitions += index
+ case Some(_) =>
+ }
+ }
+ }
+
+ /**
+ * sort host and partitions
+ */
+ private def sortHostAndPartitions(hostMapPartitionIdsSeq: Seq[(String, LinkedHashSet[Int])]) = {
+ val oldPartitionIdSet = new HashSet[Int]
+ // sort host by number of partitions
+ hostMapPartitionIdsSeq.sortBy(_._2.size).map { loc =>
+ // order: newPartitionIds + oldPartitionIds
+ val sortedPartitionIdSet = new LinkedHashSet[Int]
+ var newPartitionIds = new ArrayBuffer[Int]
+ var oldPartitionIds = new ArrayBuffer[Int]
+ loc._2.foreach { p =>
+ if (oldPartitionIdSet.contains(p)) {
+ oldPartitionIds += p
+ } else {
+ newPartitionIds += p
+ oldPartitionIdSet.add(p)
+ }
+ }
+ // sort and add new partitions
+ newPartitionIds.sortBy(x => x).foreach(sortedPartitionIdSet.add(_))
+ // sort and add old partitions
+ oldPartitionIds.sortBy(x => x).foreach(sortedPartitionIdSet.add(_))
+ // update hostMapPartitionIds
+ hostMapPartitionIds.put(loc._1, sortedPartitionIdSet)
+ (loc._1, sortedPartitionIdSet)
+ }.toArray
+ }
+
+ /**
+ * assign locality partition to each host
+ */
+ private def assignPartitonNodeLocality(
+ noEmptyHosts: Seq[(String, LinkedHashSet[Int])]): Array[ArrayBuffer[Int]] = {
+ val localityResult = new Array[ArrayBuffer[Int]](noEmptyHosts.length)
+ for (i <- 0 until localityResult.length) {
+ localityResult(i) = new ArrayBuffer[Int]
+ }
+ val noEmptyHostSet = new HashSet[String]
+ noEmptyHosts.foreach {loc => noEmptyHostSet.add(loc._1)}
+
+ var hostIndex = 0
+ while (noEmptyHostSet.nonEmpty) {
+ val hostEntry = noEmptyHosts(hostIndex)
+ if (noEmptyHostSet.contains(hostEntry._1)) {
+ if (hostEntry._2.nonEmpty) {
+ var partitionId = hostEntry._2.iterator.next
+ localityResult(hostIndex) += partitionId
+ // remove from sortedParts
+ partitionIdMapHosts.get(partitionId) match {
+ case Some(locs) =>
+ locs.foreach { loc =>
+ hostMapPartitionIds.get(loc) match {
+ case Some(parts) =>
+ parts.remove(partitionId)
+ }
+ }
+ }
+ } else {
+ noEmptyHostSet.remove(hostEntry._1)
+ }
+ }
+
+ hostIndex = hostIndex + 1
+ if (hostIndex == noEmptyHosts.length) {
+ hostIndex = 0
+ }
+ }
+ localityResult
+ }
+
+ /**
+ * assign no locality partitions to each host
+ */
+ private def assignPartitionNoLocality(emptyHosts: mutable.Buffer[String],
+ noEmptyHosts: mutable.Buffer[String],
+ localityResult: mutable.Buffer[ArrayBuffer[Int]]): Array[ArrayBuffer[Int]] = {
+ val noLocalityResult = new Array[ArrayBuffer[Int]](emptyHosts.length)
+ logInfo(s"non empty host: ${noEmptyHosts.length}, empty host: ${emptyHosts.length}")
+ val avgNumber = prevPartitions.length / (noEmptyHosts.length + emptyHosts.length)
+ for (i <- 0 until noLocalityResult.length) {
+ noLocalityResult(i) = new ArrayBuffer[Int]
+ }
+ var noLocalityPartitionIndex = 0
+ if (noLocalityPartitions.nonEmpty) {
+ if (emptyHosts.nonEmpty) {
+ // at first, assign avg number to empty node
+ for (i <- 0 until avgNumber) {
+ noLocalityResult.foreach { partitionIds =>
+ if (noLocalityPartitionIndex < noLocalityPartitions.length) {
+ partitionIds += noLocalityPartitions(noLocalityPartitionIndex)
+ noLocalityPartitionIndex = noLocalityPartitionIndex + 1
+ }
+ }
+ }
+ }
+ // still have no locality partitions
+ // assign to all hosts
+ if (noLocalityPartitionIndex < noLocalityPartitions.length) {
+ var partIndex = 0
+ for (i <- noLocalityPartitionIndex until noLocalityPartitions.length) {
+ if (partIndex < localityResult.length) {
+ localityResult(partIndex) += noLocalityPartitions(i)
+ } else {
+ noLocalityResult(partIndex - localityResult.length) += noLocalityPartitions(i)
+ }
+ partIndex = partIndex + 1
+ if (partIndex == localityResult.length + noLocalityResult.length) {
+ partIndex = 0
+ }
+ }
+ }
+ }
+ noLocalityResult
+ }
+
+ /**
+ * no locality repartition
+ */
+ private def repartitionNoLocality(): Array[Partition] = {
+ // no locality repartition
+ logInfo("no locality partition")
+ val prevPartIndexs = new Array[ArrayBuffer[Int]](numOfParts)
+ for (i <- 0 until numOfParts) {
+ prevPartIndexs(i) = new ArrayBuffer[Int]
+ }
+ for (i <- 0 until prevPartitions.length) {
+ prevPartIndexs(i % numOfParts) += prevPartitions(i).index
+ }
+ prevPartIndexs.filter(_.nonEmpty).zipWithIndex.map { x =>
+ new CoalescedRDDPartition(x._2, prev, x._1.toArray, getLocation(x._2))
+ }
+ }
+
+ private def repartitionLocality(): Array[Partition] = {
+ logInfo("locality partition")
+ val hostMapPartitionIdsSeq = hostMapPartitionIds.toSeq
+ // empty host seq
+ val emptyHosts = hostMapPartitionIdsSeq.filter(_._2.isEmpty).map(_._1).toBuffer
+ // non empty host array
+ var tempNoEmptyHosts = hostMapPartitionIdsSeq.filter(_._2.nonEmpty)
+
+ // 1. do locality repartition
+ // sort host and partitions
+ tempNoEmptyHosts = sortHostAndPartitions(tempNoEmptyHosts)
+ // assign locality partition to non empty hosts
+ val templocalityResult = assignPartitonNodeLocality(tempNoEmptyHosts)
+ // collect non empty hosts and empty hosts
+ val noEmptyHosts = mutable.Buffer[String]()
+ val localityResult = mutable.Buffer[ArrayBuffer[Int]]()
+ for(index <- 0 until templocalityResult.size) {
+ if (templocalityResult(index).isEmpty) {
+ emptyHosts += tempNoEmptyHosts(index)._1
+ } else {
+ noEmptyHosts += tempNoEmptyHosts(index)._1
+ localityResult += templocalityResult(index)
+ }
+ }
+ // 2. do no locality repartition
+ // assign no locality partitions to all hosts
+ val noLocalityResult = assignPartitionNoLocality(emptyHosts, noEmptyHosts, localityResult)
+
+ // 3. generate CoalescedRDDPartition
+ (0 until localityResult.length + noLocalityResult.length).map { index =>
+ val ids = if (index < localityResult.length) {
+ localityResult(index).toArray
+ } else {
+ noLocalityResult(index - localityResult.length).toArray
+ }
+ val loc = if (index < localityResult.length) {
+ Some(noEmptyHosts(index))
+ } else {
+ Some(emptyHosts(index - localityResult.length))
+ }
+ logInfo(s"CoalescedRDDPartition ${index}, ${ids.length}, ${loc} ")
+ new CoalescedRDDPartition(index, prev, ids, loc)
+ }.filter(_.parentsIndices.nonEmpty).toArray
+
+ }
+
+ def run(): Array[Partition] = {
+ // 1. group partitions by node
+ groupByNode()
+ logInfo(s"partition: ${prevPartitions.length}, no locality: ${noLocalityPartitions.length}")
+ val partitions = if (noLocality) {
+ // 2.A no locality partition
+ repartitionNoLocality()
+ } else {
+ // 2.B locality partition
+ repartitionLocality()
+ }
+ DataLoadPartitionCoalescer.checkPartition(prevPartitions, partitions)
+ partitions
+ }
+}
+
+object DataLoadPartitionCoalescer {
+ def getPreferredLocs(prev: RDD[_], p: Partition): Seq[TaskLocation] = {
+ prev.context.getPreferredLocs(prev, p.index)
+ }
+
+ def getParentsIndices(p: Partition): Array[Int] = {
+ p.asInstanceOf[CoalescedRDDPartition].parentsIndices
+ }
+
+ def checkPartition(prevParts: Array[Partition], parts: Array[Partition]): Unit = {
+ val prevPartIds = new ArrayBuffer[Int]
+ parts.foreach{ p =>
+ prevPartIds ++= DataLoadPartitionCoalescer.getParentsIndices(p)
+ }
+ // all partitions must be arranged once.
+ assert(prevPartIds.size == prevParts.size)
+ val prevPartIdsMap = prevPartIds.map{ id =>
+ (id, id)
+ }.toMap
+ prevParts.foreach{ p =>
+ prevPartIdsMap.get(p.index) match {
+ case None => assert(false, "partition " + p.index + " not found")
+ case Some(_) =>
+ }
+ }
+ }
+}
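
The class comment above traces the assignment by hand. The sketch below runs a simplified greedy version of step 3 over the same block/host table: hosts take turns claiming one of their still-unassigned blocks. Steps 1 and 2 are only approximated by sorting, so the exact assignment may differ from the comment's result:

import scala.collection.mutable

// Greedy round-robin assignment: each host, visited in order of how few blocks
// it can serve, claims one still-unassigned block per pass.
object CoalesceSketch {
  def main(args: Array[String]): Unit = {
    val blockHosts = Map(
      "block1" -> Seq("host1", "host2", "host3"),
      "block2" -> Seq("host2", "host4", "host5"),
      "block3" -> Seq("host3", "host4", "host5"),
      "block4" -> Seq("host1", "host2", "host4"),
      "block5" -> Seq("host1", "host3", "host4"),
      "block6" -> Seq("host1", "host2", "host5"))
    // host -> blocks it can serve, hosts ordered by how few blocks they hold
    val hostBlocks = blockHosts.toSeq
      .flatMap { case (b, hs) => hs.map(_ -> b) }
      .groupBy(_._1).map { case (h, ps) => h -> ps.map(_._2).sorted }
      .toSeq.sortBy(_._2.size)
    val assigned = mutable.Map.empty[String, String]   // block -> host
    var remaining = blockHosts.keySet
    while (remaining.nonEmpty) {
      for ((host, blocks) <- hostBlocks) {
        blocks.find(remaining.contains).foreach { b =>
          assigned(b) = host
          remaining -= b
        }
      }
    }
    assigned.toSeq.sorted.foreach { case (b, h) => println(s"$b -> $h") }
  }
}
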
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 02453bd..e5264ca 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -103,8 +103,13 @@ object DistributionUtil {
* @return
*/
def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable],
- sparkContext: SparkContext): Seq[String] = {
- val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava)
+ sparkContext: SparkContext): Seq[String] = {
+ val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.toSeq.asJava)
+ ensureExecutorsByNumberAndGetNodeList(nodeMapping.size(), sparkContext)
+ }
+
+ def ensureExecutorsByNumberAndGetNodeList(nodesOfData: Int,
+ sparkContext: SparkContext): Seq[String] = {
var confExecutorsTemp: String = null
if (sparkContext.getConf.contains("spark.executor.instances")) {
confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances")
@@ -116,15 +121,11 @@ object DistributionUtil {
}
}
- val confExecutors = if (null != confExecutorsTemp) {
- confExecutorsTemp.toInt
- } else {
- 1
- }
- val requiredExecutors = if (nodeMapping.size > confExecutors) {
+ val confExecutors = if (null != confExecutorsTemp) confExecutorsTemp.toInt else 1
+ val requiredExecutors = if (nodesOfData < 1 || nodesOfData > confExecutors) {
confExecutors
} else {
- nodeMapping.size()
+ nodesOfData
}
val startTime = System.currentTimeMillis()
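
ensureExecutorsByNumberAndGetNodeList clamps the request: never more executors than spark.executor.instances allows, and the configured number when the data-node count is not positive. The clamp in isolation, with illustrative values:

// requiredExecutors as computed above, extracted for clarity.
object RequiredExecutorsSketch {
  def requiredExecutors(nodesOfData: Int, confExecutors: Int): Int =
    if (nodesOfData < 1 || nodesOfData > confExecutors) confExecutors else nodesOfData

  def main(args: Array[String]): Unit = {
    println(requiredExecutors(nodesOfData = 3, confExecutors = 10))  // 3
    println(requiredExecutors(nodesOfData = 0, confExecutors = 10))  // 10
    println(requiredExecutors(nodesOfData = 12, confExecutors = 10)) // 10
  }
}
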
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/main/scala/org/apache/spark/util/TaskContextUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/TaskContextUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/TaskContextUtil.scala
new file mode 100644
index 0000000..e73f78c
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/util/TaskContextUtil.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import org.apache.spark.TaskContext
+
+
+object TaskContextUtil {
+ def setTaskContext(context: TaskContext): Unit = {
+ val localThreadContext = TaskContext.get()
+ if (localThreadContext == null) {
+ TaskContext.setTaskContext(context)
+ }
+ }
+}
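
TaskContext is kept in a thread-local, so a pool thread that pulls rows from a partition iterator (as CsvInput now does) would otherwise see no context; TaskContextUtil installs the task's context on that thread first. The same pattern with a plain ThreadLocal standing in for Spark's TaskContext:

// A plain ThreadLocal illustrates why the context must be set explicitly on
// any worker thread before it iterates a partition. Values are illustrative.
object ThreadLocalPropagationSketch {
  private val current = new ThreadLocal[String]

  def setIfMissing(ctx: String): Unit =
    if (current.get() == null) current.set(ctx)

  def main(args: Array[String]): Unit = {
    current.set("task-0")                        // the task's own thread has it
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        setIfMissing("task-0")                   // must be installed here explicitly
        println(s"worker sees: ${current.get()}")
      }
    })
    worker.start()
    worker.join()
  }
}
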
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadPartitionCoalescer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadPartitionCoalescer.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadPartitionCoalescer.scala
new file mode 100644
index 0000000..9dd74c4
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadPartitionCoalescer.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.testsuite.dataload
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.rdd.{DataLoadPartitionCoalescer, RDD}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestDataLoadPartitionCoalescer extends QueryTest with BeforeAndAfterAll {
+ var nodeList: Array[String] = _
+
+ class DummyPartition(val index: Int,
+ rawSplit: FileSplit) extends Partition {
+ val serializableHadoopSplit = new SerializableWritable(rawSplit)
+ }
+
+ class Dummy(sc: SparkContext, partitions: Array[Partition]) extends RDD[Row](sc, Nil) {
+ override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
+ new Iterator[Row] {
+ var isFirst = true;
+ override def hasNext: Boolean = isFirst;
+
+ override def next(): Row = {
+ isFirst = false
+ new GenericRow(Array[Any]())
+ }
+ }
+ }
+
+ override protected def getPartitions: Array[Partition] = partitions
+
+ override protected def getPreferredLocations(split: Partition): Seq[String] = {
+ split.asInstanceOf[DummyPartition].serializableHadoopSplit.value.getLocations.toSeq
+ }
+
+ }
+
+ override def beforeAll: Unit = {
+ nodeList = Array("host1", "host2", "host3")
+
+ }
+
+ def createPartition(index: Int, file: String, hosts: Array[String]) : Partition = {
+ new DummyPartition(index, new FileSplit(new Path(file), 0, 1, hosts))
+ }
+
+ def repartition(parts: Array[Partition]): Array[Partition] = {
+ new DataLoadPartitionCoalescer(new Dummy(sparkContext, parts), nodeList).run
+ }
+
+ def checkPartition(prevParts: Array[Partition], parts: Array[Partition]): Unit = {
+ DataLoadPartitionCoalescer.checkPartition(prevParts, parts)
+ }
+
+ test("test number of partitions is more than nodes's") {
+ val prevParts = Array[Partition](
+ createPartition(0, "1.csv", Array("host1", "host2", "host3")),
+ createPartition(1, "2.csv", Array("host1", "host2", "host3")),
+ createPartition(2, "3.csv", Array("host1", "host2", "host3")),
+ createPartition(3, "4.csv", Array("host1", "host2", "host3")),
+ createPartition(4, "5.csv", Array("host1", "host2", "host3"))
+ )
+ val parts = repartition(prevParts)
+ assert(parts.size == 3)
+ checkPartition(prevParts, parts)
+ }
+
+ test("test number of partitions equals nodes's") {
+ val prevParts = Array[Partition](
+ createPartition(0, "1.csv", Array("host1", "host2", "host3")),
+ createPartition(1, "2.csv", Array("host1", "host2", "host3")),
+ createPartition(2, "3.csv", Array("host1", "host2", "host3"))
+ )
+ val parts = repartition(prevParts)
+ assert(parts.size == 3)
+ checkPartition(prevParts, parts)
+ }
+
+ test("test number of partitions is less than nodes's") {
+ val prevParts = Array[Partition](
+ createPartition(0, "1.csv", Array("host1", "host2", "host3")),
+ createPartition(1, "2.csv", Array("host1", "host2", "host3"))
+ )
+ val parts = repartition(prevParts)
+ assert(parts.size == 2)
+ checkPartition(prevParts, parts)
+ }
+
+ test("all partitions are locality") {
+ val prevParts = Array[Partition](
+ createPartition(0, "1.csv", Array("host1", "host2", "host3")),
+ createPartition(1, "2.csv", Array("host1", "host2", "host3"))
+ )
+ val parts = repartition(prevParts)
+ assert(parts.size == 2)
+ checkPartition(prevParts, parts)
+ }
+
+ test("part of partitions are locality1") {
+ val prevParts = Array[Partition](
+ createPartition(0, "1.csv", Array("host1", "host2", "host3")),
+ createPartition(1, "2.csv", Array("host1", "host2", "host4")),
+ createPartition(2, "3.csv", Array("host4", "host5", "host6"))
+ )
+ val parts = repartition(prevParts)
+ assert(parts.size == 3)
+ checkPartition(prevParts, parts)
+ }
+
+ test("part of partitions are locality2") {
+ val prevParts = Array[Partition](
+ createPartition(0, "1.csv", Array("host1", "host2", "host3")),
+ createPartition(1, "2.csv", Array("host1", "host2", "host4")),
+ createPartition(2, "3.csv", Array("host3", "host5", "host6"))
+ )
+ val parts = repartition(prevParts)
+ assert(parts.size == 3)
+ checkPartition(prevParts, parts)
+ }
+
+ test("part of partitions are locality3") {
+ val prevParts = Array[Partition](
+ createPartition(0, "1.csv", Array("host1", "host2", "host7")),
+ createPartition(1, "2.csv", Array("host1", "host2", "host4")),
+ createPartition(2, "3.csv", Array("host4", "host5", "host6"))
+ )
+ val parts = repartition(prevParts)
+ assert(parts.size == 3)
+ checkPartition(prevParts, parts)
+ }
+
+ test("all partition are not locality") {
+ val prevParts = Array[Partition](
+ createPartition(0, "1.csv", Array()),
+ createPartition(1, "2.csv", Array()),
+ createPartition(2, "3.csv", Array("host4", "host5", "host6"))
+ )
+ val parts = repartition(prevParts)
+ assert(parts.size == 3)
+ checkPartition(prevParts, parts)
+ }
+
+ override def afterAll {
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index ef2adbc..3263bf9 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -23,18 +23,15 @@ import java.io.File
import org.apache.spark.sql.common.util.CarbonHiveContext.sql
import org.apache.spark.sql.common.util.{CarbonHiveContext, QueryTest}
import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
+import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.scalatest.BeforeAndAfterAll
-
+import org.apache.carbondata.processing.constants.TableOptionConstant
import org.apache.carbondata.processing.model.CarbonLoadModel
/**
* Test Case for org.apache.carbondata.integration.spark.util.GlobalDictionaryUtil
- *
- * @date: Apr 10, 2016 10:34:58 PM
- * @See org.apache.carbondata.integration.spark.util.GlobalDictionaryUtil
*/
class AllDictionaryTestCase extends QueryTest with BeforeAndAfterAll {
@@ -64,6 +61,8 @@ class AllDictionaryTestCase extends QueryTest with BeforeAndAfterAll {
carbonLoadModel.setComplexDelimiterLevel1("\\$")
carbonLoadModel.setComplexDelimiterLevel2("\\:")
carbonLoadModel.setAllDictPath(allDictFilePath)
+ carbonLoadModel.setSerializationNullFormat(
+ TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N")
carbonLoadModel
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 1531ade..40341a8 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -20,17 +20,19 @@ package org.apache.carbondata.spark.util
import java.io.File
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.processing.etl.DataLoadingException
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.scalatest.BeforeAndAfterAll
+
import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
import org.apache.spark.sql.common.util.CarbonHiveContext
import org.apache.spark.sql.common.util.CarbonHiveContext.sql
import org.apache.spark.sql.common.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.processing.constants.TableOptionConstant
+import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
/**
* test case for external column dictionary generation
@@ -145,6 +147,8 @@ class ExternalColumnDictionaryTestCase extends QueryTest with BeforeAndAfterAll
carbonLoadModel.setComplexDelimiterLevel2("\\:")
carbonLoadModel.setColDictFilePath(extColFilePath)
carbonLoadModel.setQuoteChar("\"");
+ carbonLoadModel.setSerializationNullFormat(
+ TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N")
carbonLoadModel
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
index 7d6e994..22cf2ea 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
@@ -20,28 +20,23 @@ package org.apache.carbondata.spark.util
import java.io.File
+import java.util.concurrent.Executors
+import java.util.concurrent.Callable
+
+import scala.collection.mutable.ListBuffer
+
import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
import org.apache.spark.sql.common.util.CarbonHiveContext
import org.apache.spark.sql.common.util.CarbonHiveContext.sql
import org.apache.spark.sql.common.util.QueryTest
-
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import scala.collection.mutable.ListBuffer
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Executors
-import java.util.concurrent.Future
-import java.util.concurrent.FutureTask
-import java.util.concurrent.Callable
-import java.util.concurrent.TimeUnit
-
import org.apache.carbondata.common.ext.PathFactory
-import org.apache.carbondata.core.carbon.path.CarbonTablePath
-import org.apache.carbondata.core.carbon.ColumnIdentifier
+import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.processing.constants.TableOptionConstant
import org.apache.carbondata.processing.model.CarbonLoadModel
class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAfterAll {
@@ -70,6 +65,8 @@ class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAft
carbonLoadModel.setComplexDelimiterLevel2("\\:")
carbonLoadModel.setStorePath(relation.tableMeta.storePath)
carbonLoadModel.setQuoteChar("\"")
+ carbonLoadModel.setSerializationNullFormat(
+ TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N")
carbonLoadModel
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
index b78ffdb..d567ccd 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
@@ -24,18 +24,16 @@ import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
import org.apache.spark.sql.common.util.CarbonHiveContext
import org.apache.spark.sql.common.util.CarbonHiveContext.sql
import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.scalatest.BeforeAndAfterAll
-
+import org.apache.carbondata.processing.constants.TableOptionConstant
import org.apache.carbondata.processing.model.CarbonLoadModel
+
/**
* Test Case for org.apache.carbondata.spark.util.GlobalDictionaryUtil
- *
- * @date: Apr 10, 2016 10:34:58 PM
- * @See org.apache.carbondata.spark.util.GlobalDictionaryUtil
*/
class GlobalDictionaryUtilTestCase extends QueryTest with BeforeAndAfterAll {
@@ -71,6 +69,8 @@ class GlobalDictionaryUtilTestCase extends QueryTest with BeforeAndAfterAll {
carbonLoadModel.setComplexDelimiterLevel2("\\:")
carbonLoadModel.setStorePath(relation.tableMeta.storePath)
carbonLoadModel.setQuoteChar("\"")
+ carbonLoadModel.setSerializationNullFormat(
+ TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N")
carbonLoadModel
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
index cf861bc..c64c504 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CsvInput.java
@@ -22,7 +22,6 @@ package org.apache.carbondata.processing.csvreaderstep;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -384,21 +383,75 @@ public class CsvInput extends BaseStep implements StepInterface {
CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordCsvInputStepTime(
meta.getPartitionID(), System.currentTimeMillis());
} else {
- scanRddIterator();
+ scanRddIterator(numberOfNodes);
}
setOutputDone();
return false;
}
- private void scanRddIterator() throws RuntimeException {
- Iterator<String[]> iterator = RddInputUtils.getAndRemove(rddIteratorKey);
- if (iterator != null) {
- try{
- while(iterator.hasNext()){
- putRow(data.outputRowMeta, iterator.next());
+ class RddScanCallable implements Callable<Void> {
+ List<JavaRddIterator<String[]>> iterList;
+
+ RddScanCallable() {
+ this.iterList = new ArrayList<JavaRddIterator<String[]>>(1000);
+ }
+
+ public void addJavaRddIterator(JavaRddIterator<String[]> iter) {
+ this.iterList.add(iter);
+ }
+
+ @Override
+ public Void call() throws Exception {
+ StandardLogService.setThreadName("PROCESS_DataFrame_PARTITIONS",
+ Thread.currentThread().getName());
+ try {
+ String[] values = null;
+ for (JavaRddIterator<String[]> iter: iterList) {
+ iter.initialize();
+ while (iter.hasNext()) {
+ values = iter.next();
+ synchronized (putRowLock) {
+ putRow(data.outputRowMeta, values);
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error(e, "RDD scan during data load terminated due to an error.");
+ throw e;
+ }
+ return null;
+ }
+ }
+
+ private void scanRddIterator(int numberOfNodes) throws RuntimeException {
+ JavaRddIterator<JavaRddIterator<String[]>> iter = RddInputUtils.getAndRemove(rddIteratorKey);
+ if (iter != null) {
+ iter.initialize();
+ exec = Executors.newFixedThreadPool(numberOfNodes);
+ List<Future<Void>> results = new ArrayList<Future<Void>>(numberOfNodes);
+ RddScanCallable[] calls = new RddScanCallable[numberOfNodes];
+ for (int i = 0; i < numberOfNodes; i++) {
+ calls[i] = new RddScanCallable();
+ }
+ int index = 0;
+ while (iter.hasNext()) {
+ calls[index].addJavaRddIterator(iter.next());
+ index = index + 1;
+ if (index == numberOfNodes) {
+ index = 0;
+ }
+ }
+ for (RddScanCallable call: calls) {
+ results.add(exec.submit(call));
+ }
+ try {
+ for (Future<Void> future : results) {
+ future.get();
}
- } catch (KettleException e) {
- throw new RuntimeException(e);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("Thread InterruptedException", e);
+ } finally {
+ exec.shutdownNow();
}
}
}
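
The scanRddIterator change above distributes the partition iterators round-robin over a fixed pool of callables and then waits on all futures. A self-contained, simplified Java sketch of the same pattern is shown below; it is illustrative only, uses plain java.util.Iterator instead of the Carbon classes, and replaces putRow with a synchronized print:

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Simplified sketch of the round-robin scan pattern in scanRddIterator above.
// All names are hypothetical; row handling is reduced to a synchronized print.
public final class RoundRobinScanSketch {
  private static final Object rowLock = new Object();

  static Callable<Void> scanner(final List<Iterator<String[]>> iterators) {
    return () -> {
      for (Iterator<String[]> it : iterators) {
        while (it.hasNext()) {
          String[] row = it.next();
          synchronized (rowLock) {          // mirrors the putRowLock in CsvInput
            System.out.println(String.join(",", row));
          }
        }
      }
      return null;
    };
  }

  public static void main(String[] args) throws Exception {
    int numberOfNodes = 2;
    List<Iterator<String[]>> partitions = new ArrayList<>();
    partitions.add(Collections.singletonList(new String[]{"a", "1"}).iterator());
    partitions.add(Collections.singletonList(new String[]{"b", "2"}).iterator());
    partitions.add(Collections.singletonList(new String[]{"c", "3"}).iterator());

    // Assign partition iterators to workers round-robin, as scanRddIterator does.
    List<List<Iterator<String[]>>> buckets = new ArrayList<>();
    for (int i = 0; i < numberOfNodes; i++) {
      buckets.add(new ArrayList<>());
    }
    int index = 0;
    for (Iterator<String[]> p : partitions) {
      buckets.get(index).add(p);
      index = (index + 1) % numberOfNodes;
    }

    ExecutorService exec = Executors.newFixedThreadPool(numberOfNodes);
    try {
      List<Future<Void>> results = new ArrayList<>();
      for (List<Iterator<String[]>> bucket : buckets) {
        results.add(exec.submit(scanner(bucket)));
      }
      for (Future<Void> future : results) {
        future.get();                        // propagate any worker failure
      }
    } finally {
      exec.shutdownNow();
    }
  }
}
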
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/JavaRddIterator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/JavaRddIterator.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/JavaRddIterator.java
new file mode 100644
index 0000000..9e11816
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/JavaRddIterator.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.csvreaderstep;
+/**
+ * JavaRddIterator wraps a Spark RDD iterator.
+ * It avoids a direct dependency of the processing module on the Spark module.
+ * @param <E> the element type returned by the iterator
+ */
+public interface JavaRddIterator<E> {
+
+ boolean hasNext();
+
+ E next();
+
+ void initialize();
+}
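
A minimal, hypothetical implementation of the interface above, backed by a plain java.util.List, shows how the processing module can consume rows without referencing any Spark types (the class name is illustrative, not a Carbon class):

package org.apache.carbondata.processing.csvreaderstep;

import java.util.Iterator;
import java.util.List;

// Illustrative only: adapts a plain java.util.List to JavaRddIterator so the
// processing module never sees a Spark type. The class name is hypothetical.
public class ListBackedRddIterator<E> implements JavaRddIterator<E> {

  private final List<E> elements;
  private Iterator<E> iterator;

  public ListBackedRddIterator(List<E> elements) {
    this.elements = elements;
  }

  @Override
  public void initialize() {
    // Defer creating the underlying iterator until the scan actually starts.
    this.iterator = elements.iterator();
  }

  @Override
  public boolean hasNext() {
    return iterator != null && iterator.hasNext();
  }

  @Override
  public E next() {
    return iterator.next();
  }
}
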
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f8a0c876/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInputUtils.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInputUtils.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInputUtils.java
index f9a0429..b3dfdab 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInputUtils.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/RddInputUtils.java
@@ -20,19 +20,18 @@
package org.apache.carbondata.processing.csvreaderstep;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
public class RddInputUtils {
- private static Map<String, Iterator<String[]>> iteratorMap = new HashMap<String,
- Iterator<String[]>>();
+ private static Map<String, JavaRddIterator<JavaRddIterator<String[]>>> iteratorMap = new
+ HashMap<String, JavaRddIterator<JavaRddIterator<String[]>>>();
- public static void put(String key, Iterator<String[]> value) {
+ public static void put(String key, JavaRddIterator<JavaRddIterator<String[]>> value) {
iteratorMap.put(key, value);
}
- public static Iterator<String[]> getAndRemove(String key) {
- Iterator<String[]> iter = iteratorMap.get(key);
+ public static JavaRddIterator<JavaRddIterator<String[]>> getAndRemove(String key) {
+ JavaRddIterator<JavaRddIterator<String[]>> iter = iteratorMap.get(key);
remove(key);
return iter;
}
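
With the new signatures, the loading side registers a nested iterator of iterators under a key, and CsvInput later claims it exactly once with getAndRemove. A hedged usage sketch follows; the key is hypothetical and ListBackedRddIterator is the illustrative adapter from the previous sketch, not a Carbon class:

package org.apache.carbondata.processing.csvreaderstep;

import java.util.Collections;

// Illustrative usage only: the key is hypothetical; only put and getAndRemove,
// which appear in the diff above, are assumed to exist on RddInputUtils.
public final class RddInputUtilsUsageSketch {
  public static void main(String[] args) {
    // One inner iterator per dataframe partition; here a single one-row partition.
    JavaRddIterator<String[]> partition = new ListBackedRddIterator<String[]>(
        Collections.singletonList(new String[]{"a", "1"}));

    JavaRddIterator<JavaRddIterator<String[]>> outer =
        new ListBackedRddIterator<JavaRddIterator<String[]>>(
            Collections.singletonList(partition));

    // The loading side registers the nested iterator under a per-load key ...
    RddInputUtils.put("hypothetical_load_key_1", outer);

    // ... and CsvInput later claims it exactly once.
    JavaRddIterator<JavaRddIterator<String[]>> claimed =
        RddInputUtils.getAndRemove("hypothetical_load_key_1");
    System.out.println(claimed != null);
  }
}
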
[2/2] incubator-carbondata git commit: [CARBONDATA-368] Improve
performance of dataframe loading. This closes #278
Posted by ja...@apache.org.
[CARBONDATA-368] Improve performance of dataframe loading. This closes #278
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/567fa513
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/567fa513
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/567fa513
Branch: refs/heads/master
Commit: 567fa5131628b70c8c4829368fda6d48cb013af3
Parents: 879bfe7 f8a0c87
Author: jackylk <ja...@huawei.com>
Authored: Tue Nov 29 17:15:20 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Tue Nov 29 17:15:20 2016 +0800
----------------------------------------------------------------------
.../spark/rdd/CarbonDataLoadRDD.scala | 96 ++---
.../spark/rdd/CarbonDataRDDFactory.scala | 88 +++--
.../spark/rdd/CarbonGlobalDictionaryRDD.scala | 11 +-
.../carbondata/spark/util/CarbonScalaUtil.scala | 51 +++
.../spark/util/GlobalDictionaryUtil.scala | 11 +-
.../apache/spark/rdd/DataLoadCoalescedRDD.scala | 68 ++++
.../spark/rdd/DataLoadPartitionCoalescer.scala | 363 +++++++++++++++++++
.../spark/sql/hive/DistributionUtil.scala | 19 +-
.../org/apache/spark/util/TaskContextUtil.scala | 29 ++
.../TestDataLoadPartitionCoalescer.scala | 170 +++++++++
.../spark/util/AllDictionaryTestCase.scala | 9 +-
.../util/ExternalColumnDictionaryTestCase.scala | 14 +-
...GlobalDictionaryUtilConcurrentTestCase.scala | 23 +-
.../util/GlobalDictionaryUtilTestCase.scala | 10 +-
.../processing/csvreaderstep/CsvInput.java | 73 +++-
.../csvreaderstep/JavaRddIterator.java | 32 ++
.../processing/csvreaderstep/RddInputUtils.java | 11 +-
17 files changed, 921 insertions(+), 157 deletions(-)
----------------------------------------------------------------------