Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/12/11 03:59:28 UTC
[1/2] incubator-carbondata git commit: merge from master
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 98564f04d -> 26e3e0023
merge from master
reset maven-source-plugin
remove comments on createTableFromThrift and raise jira later
spark streaming dataframe support
reset maven-source-plugin
add comments for NewDataFrameLoaderRDD and NewRddIterator
format comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/6c9194d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/6c9194d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/6c9194d9
Branch: refs/heads/master
Commit: 6c9194d97c54351434866f423ef44907b887ae5a
Parents: 98564f0
Author: WilliamZhu <al...@gmail.com>
Authored: Tue Dec 6 12:39:22 2016 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Sun Dec 11 09:27:51 2016 +0530
----------------------------------------------------------------------
.../apache/carbondata/spark/CarbonOption.scala | 2 +
.../spark/rdd/NewCarbonDataLoadRDD.scala | 167 +++++++++++++++++++
.../spark/CarbonDataFrameWriter.scala | 2 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 52 ++++--
.../execution/command/carbonTableSchema.scala | 5 +-
.../apache/spark/sql/hive/CarbonMetastore.scala | 6 +-
.../apache/carbondata/spark/CarbonOption.scala | 2 +
.../sort/impl/ParallelReadMergeSorterImpl.java | 3 +
.../util/CarbonDataProcessorUtil.java | 31 ++++
9 files changed, 247 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c9194d9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index a0503c7..213712e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -45,4 +45,6 @@ class CarbonOption(options: Map[String, String]) {
def compress: Boolean = options.getOrElse("compress", "false").toBoolean
def useKettle: Boolean = options.getOrElse("useKettle", "true").toBoolean
+
+ def toMap: Map[String, String] = options
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c9194d9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 32770f7..96bb5ed 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -30,6 +30,8 @@ import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.command.Partitioner
import org.apache.carbondata.common.CarbonIterator
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -308,3 +310,168 @@ class NewCarbonDataLoadRDD[K, V](
}
}
}
+
+/**
+ * Loads data into CarbonData from a Spark DataFrame using
+ * @see org.apache.carbondata.processing.newflow.DataLoadExecutor,
+ * without requiring Kettle.
+ */
+class NewDataFrameLoaderRDD[K, V](
+ sc: SparkContext,
+ result: DataLoadResult[K, V],
+ carbonLoadModel: CarbonLoadModel,
+ loadCount: Integer,
+ tableCreationTime: Long,
+ schemaLastUpdatedTime: Long,
+ prev: RDD[Row]) extends RDD[(K, V)](prev) {
+
+
+ override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ val iter = new Iterator[(K, V)] {
+ var partitionID = "0"
+ val loadMetadataDetails = new LoadMetadataDetails()
+ var model: CarbonLoadModel = carbonLoadModel
+ var uniqueLoadStatusId =
+ carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
+ try {
+
+ loadMetadataDetails.setPartitionCount(partitionID)
+ loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+ carbonLoadModel.setPartitionId(partitionID)
+ carbonLoadModel.setSegmentId(String.valueOf(loadCount))
+ carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+
+ val iterator = new NewRddIterator(
+ firstParent[Row].iterator(theSplit, context),
+ carbonLoadModel)
+
+ class CarbonIteratorImpl(iterator: util.Iterator[Array[AnyRef]])
+ extends CarbonIterator[Array[AnyRef]] {
+ override def initialize(): Unit = {}
+
+ override def close(): Unit = {}
+
+ override def next(): Array[AnyRef] = {
+ iterator.next
+ }
+
+ override def hasNext: Boolean = {
+ iterator.hasNext
+ }
+ }
+
+
+ val recordReaders: Array[CarbonIterator[Array[AnyRef]]] =
+ Array(new CarbonIteratorImpl(iterator))
+
+ val loader = new SparkPartitionLoader(model,
+ theSplit.index,
+ null,
+ null,
+ loadCount,
+ loadMetadataDetails)
+ // Initialize to set carbon properties
+ loader.initialize()
+
+ loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+ new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders)
+
+ } catch {
+ case e: BadRecordFoundException =>
+ loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+ logInfo("Bad Record Found")
+ case e: Exception =>
+ logInfo("DataLoad failure", e)
+ LOGGER.error(e)
+ throw e
+ }
+ var finished = false
+
+ override def hasNext: Boolean = !finished
+
+ override def next(): (K, V) = {
+ finished = true
+ result.getKey(uniqueLoadStatusId, loadMetadataDetails)
+ }
+ }
+ iter
+ }
+ override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
+}
+
+/**
+ * This class wraps a Scala Iterator as a Java Iterator.
+ * It also converts all columns to string data, since CarbonData will recognize the right
+ * type according to the schema of the Spark DataFrame.
+ * @see org.apache.carbondata.spark.rdd.RddIterator
+ * @param rddIter
+ * @param carbonLoadModel
+ */
+class NewRddIterator(rddIter: Iterator[Row],
+ carbonLoadModel: CarbonLoadModel) extends java.util.Iterator[Array[AnyRef]] {
+ val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+ .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ val format = new SimpleDateFormat(formatString)
+ val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+ val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+
+ def hasNext: Boolean = rddIter.hasNext
+
+ private def getString(value: Any, level: Int = 1): String = {
+ if (value == null) {
+ ""
+ } else {
+ value match {
+ case s: String => s
+ case i: java.lang.Integer => i.toString
+ case d: java.lang.Double => d.toString
+ case t: java.sql.Timestamp => format format t
+ case d: java.sql.Date => format format d
+ case d: java.math.BigDecimal => d.toPlainString
+ case b: java.lang.Boolean => b.toString
+ case s: java.lang.Short => s.toString
+ case f: java.lang.Float => f.toString
+ case bs: Array[Byte] => new String(bs)
+ case s: scala.collection.Seq[Any] =>
+ val delimiter = if (level == 1) {
+ delimiterLevel1
+ } else {
+ delimiterLevel2
+ }
+ val builder = new StringBuilder()
+ s.foreach { x =>
+ builder.append(getString(x, level + 1)).append(delimiter)
+ }
+ builder.substring(0, builder.length - 1)
+ case m: scala.collection.Map[Any, Any] =>
+ throw new Exception("Unsupported data type: Map")
+ case r: org.apache.spark.sql.Row =>
+ val delimiter = if (level == 1) {
+ delimiterLevel1
+ } else {
+ delimiterLevel2
+ }
+ val builder = new StringBuilder()
+ for (i <- 0 until r.length) {
+ builder.append(getString(r(i), level + 1)).append(delimiter)
+ }
+ builder.substring(0, builder.length - 1)
+ case other => other.toString
+ }
+ }
+ }
+
+ def next: Array[AnyRef] = {
+ val row = rddIter.next()
+ val columns = new Array[Object](row.length)
+ for (i <- 0 until row.length) {
+ columns(i) = getString(row(i))
+ }
+ columns
+ }
+
+ def remove(): Unit = {
+ }
+
+}
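
The getString conversion above is easiest to see in isolation. Below is a minimal, self-contained Scala sketch of the Seq-flattening rule (not part of the patch); the '$' and ':' delimiters are illustrative stand-ins for the values a CarbonLoadModel would supply:

object FlattenSketch {
  val delimiterLevel1 = "$"  // assumed illustrative value, not read from a CarbonLoadModel
  val delimiterLevel2 = ":"

  def getString(value: Any, level: Int = 1): String = value match {
    case null => ""
    case s: Seq[_] =>
      // level 1 nesting uses the first delimiter, deeper levels the second
      val delimiter = if (level == 1) delimiterLevel1 else delimiterLevel2
      s.map(getString(_, level + 1)).mkString(delimiter)
    case other => other.toString
  }

  def main(args: Array[String]): Unit = {
    // An ARRAY<ARRAY<INT>> column flattens to "1:2$3:4"
    println(getString(Seq(Seq(1, 2), Seq(3, 4))))
  }
}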
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c9194d9/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
index c464538..41595d5 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
@@ -128,7 +128,7 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) {
options.tableName,
null,
Seq(),
- Map("fileheader" -> header),
+ Map("fileheader" -> header) ++ options.toMap,
isOverwriteExist = false,
null,
Some(dataFrame)).run(cc)
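
With options.toMap now merged into the LoadTable options, writer-level options reach the load path. A hedged usage sketch (the format name, table name, and SaveMode are assumptions for illustration, not taken from the patch):

import org.apache.spark.sql.{DataFrame, SaveMode}

object WriteSketch {
  def writeWithoutKettle(df: DataFrame): Unit = {
    df.write
      .format("carbondata")              // assumed data source name
      .option("tableName", "sales_fact") // hypothetical table
      .option("useKettle", "false")      // reaches LoadTable via options.toMap
      .mode(SaveMode.Overwrite)
      .save()
  }
}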
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c9194d9/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8463477..53a5f67 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -634,23 +634,41 @@ object CarbonDataRDDFactory {
def loadDataFrame(): Unit = {
try {
val rdd = dataFrame.get.rdd
- val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p =>
- DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
- }.distinct.size
- val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
- sqlContext.sparkContext)
- val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
-
- status = new DataFrameLoaderRDD(sqlContext.sparkContext,
- new DataLoadResultImpl(),
- carbonLoadModel,
- storePath,
- kettleHomePath,
- columnar,
- currentLoadCount,
- tableCreationTime,
- schemaLastUpdatedTime,
- newRdd).collect()
+
+ if (useKettle) {
+
+ val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p =>
+ DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
+ }.distinct.size
+ val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
+ sqlContext.sparkContext)
+ val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
+
+ status = new DataFrameLoaderRDD(sqlContext.sparkContext,
+ new DataLoadResultImpl(),
+ carbonLoadModel,
+ storePath,
+ kettleHomePath,
+ columnar,
+ currentLoadCount,
+ tableCreationTime,
+ schemaLastUpdatedTime,
+ newRdd).collect()
+ } else {
+
+ var numPartitions = DistributionUtil.getNodeList(sqlContext.sparkContext).length
+ numPartitions = Math.max(1, Math.min(numPartitions, rdd.partitions.length))
+ val coalesceRdd = rdd.coalesce(numPartitions, shuffle = false)
+
+ status = new NewDataFrameLoaderRDD(sqlContext.sparkContext,
+ new DataLoadResultImpl(),
+ carbonLoadModel,
+ currentLoadCount,
+ tableCreationTime,
+ schemaLastUpdatedTime,
+ coalesceRdd).collect()
+ }
+
} catch {
case ex: Exception =>
LOGGER.error(ex, "load data frame failed")
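
The no-kettle branch sizes partitions to at most one per executor node, clamped to the input RDD's partition count (coalesce without shuffle cannot increase partitions) and to a minimum of one. A small sketch of that rule, with illustrative values:

object PartitionSketch {
  def targetPartitions(nodeCount: Int, rddPartitions: Int): Int =
    math.max(1, math.min(nodeCount, rddPartitions))

  def main(args: Array[String]): Unit = {
    assert(targetPartitions(nodeCount = 4, rddPartitions = 10) == 4) // coalesce down to nodes
    assert(targetPartitions(nodeCount = 8, rddPartitions = 3) == 3)  // cannot grow without shuffle
    assert(targetPartitions(nodeCount = 0, rddPartitions = 3) == 1)  // never below one
  }
}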
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c9194d9/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 39d0841..779bb22 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -434,10 +434,9 @@ case class LoadTable(
val columinar = sqlContext.getConf("carbon.is.columnar.storage", "true").toBoolean
- val kettleHomePath = CarbonScalaUtil.getKettleHome(sqlContext)
// TODO It will be removed after kettle is removed.
- val useKettle = options.get("use_kettle") match {
+ val useKettle = options.get("useKettle") match {
case Some(value) => value.toBoolean
case _ =>
val useKettleLocal = System.getProperty("use.kettle")
@@ -448,6 +447,8 @@ case class LoadTable(
}
}
+ val kettleHomePath = if (useKettle) CarbonScalaUtil.getKettleHome(sqlContext) else ""
+
val delimiter = options.getOrElse("delimiter", ",")
val quoteChar = options.getOrElse("quotechar", "\"")
val fileHeader = options.getOrElse("fileheader", "")
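
The renamed option gives a layered resolution order: the DDL option "useKettle" wins, then the JVM property "use.kettle", then a fallback that is elided from this hunk. A sketch of that chain, assuming the elided fallback defaults to true:

object UseKettleSketch {
  def resolveUseKettle(options: Map[String, String]): Boolean =
    options.get("useKettle") match {
      case Some(value) => value.toBoolean
      case None =>
        Option(System.getProperty("use.kettle"))
          .map(_.toBoolean)
          .getOrElse(true) // assumed default; the real fallback is elided from the hunk
    }
}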
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c9194d9/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index bee891c..b065850 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -18,14 +18,13 @@
package org.apache.spark.sql.hive
import java.io._
-import java.util.{GregorianCalendar, UUID}
+import java.util.UUID
import scala.Array.canBuildFrom
import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions
import scala.util.parsing.combinator.RegexParsers
-import org.apache.spark
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
@@ -42,7 +41,6 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants}
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile
import org.apache.carbondata.core.datastorage.store.impl.FileFactory
import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType
import org.apache.carbondata.core.reader.ThriftReader
@@ -293,10 +291,12 @@ class CarbonMetastore(hiveContext: HiveContext, val storePath: String,
if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
FileFactory.mkdirs(schemaMetadataPath, fileType)
}
+
val thriftWriter = new ThriftWriter(schemaFilePath, false)
thriftWriter.open()
thriftWriter.write(thriftTableInfo)
thriftWriter.close()
+
metadata.tablesMeta += tableMeta
logInfo(s"Table $tableName for Database $dbName created successfully.")
LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c9194d9/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index 5f0c7e3..b02d467 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -45,4 +45,6 @@ class CarbonOption(options: Map[String, String]) {
def compress: Boolean = options.getOrElse("compress", "false").toBoolean
def useKettle: Boolean = options.getOrElse("useKettle", "true").toBoolean
+
+ def toMap: Map[String, String] = options
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c9194d9/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index e2e995c..5d12ec4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -54,6 +54,8 @@ public class ParallelReadMergeSorterImpl implements Sorter {
private static final LogService LOGGER =
LogServiceFactory.getLogService(ParallelReadMergeSorterImpl.class.getName());
+ private static final Object taskContext = CarbonDataProcessorUtil.fetchTaskContext();
+
private SortParameters sortParameters;
private SortIntermediateFileMerger intermediateFileMerger;
@@ -200,6 +202,7 @@ public class ParallelReadMergeSorterImpl implements Sorter {
@Override
public Void call() throws CarbonDataLoadingException {
try {
+ CarbonDataProcessorUtil.configureTaskContext(taskContext);
while (iterator.hasNext()) {
CarbonRowBatch batch = iterator.next();
Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
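
The captured taskContext matters because Spark stores TaskContext in a thread-local: the sorter's pool threads would otherwise see null. A standalone Scala sketch of the capture-and-reinstall pattern, using a plain ThreadLocal as a stand-in for Spark's TaskContext:

import java.util.concurrent.Executors

object ThreadLocalSketch {
  // Stands in for Spark's TaskContext, which is held in a thread-local.
  val context = new ThreadLocal[String]

  def main(args: Array[String]): Unit = {
    context.set("task-0")
    val captured = context.get() // analogue of fetchTaskContext()
    val pool = Executors.newFixedThreadPool(1)
    pool.submit(new Runnable {
      def run(): Unit = {
        context.set(captured)    // analogue of configureTaskContext(taskContext)
        println(context.get())   // prints "task-0" instead of null
      }
    })
    pool.shutdown()
  }
}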
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6c9194d9/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index eff59e5..3cb984d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -25,6 +25,8 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
@@ -604,4 +606,33 @@ public final class CarbonDataProcessorUtil {
}
return dateformatsHashMap;
}
+
+ /**
+ * Maybe we can extract interfaces later to support task contexts in Hive and Spark.
+ */
+ public static Object fetchTaskContext() {
+ try {
+ return Class.forName("org.apache.spark.TaskContext").getMethod("get").invoke(null);
+ } catch (Exception e) {
+ // just ignore
+ LOGGER.info("org.apache.spark.TaskContext not found");
+ return null;
+ }
+ }
+
+ public static void configureTaskContext(Object context) {
+ try {
+ Class clazz = Class.forName("org.apache.spark.TaskContext$");
+ for (Method method : clazz.getDeclaredMethods()) {
+ if (method.getName().equals("setTaskContext")) {
+ Field field = clazz.getField("MODULE$");
+ Object instance = field.get(null);
+ method.invoke(instance, new Object[]{context});
+ }
+ }
+ } catch (Exception e) {
+ // just ignore
+ LOGGER.info("org.apache.spark.TaskContext$ not found");
+ }
+ }
}
\ No newline at end of file
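
For reference, the same reflection pattern as a compact Scala sketch (mirroring the two Java methods above; a sketch, not part of the patch):

object TaskContextSketch {
  // Reads org.apache.spark.TaskContext.get() reflectively, so the processing
  // module needs no compile-time Spark dependency; returns null off-Spark.
  def fetchTaskContext(): AnyRef =
    try {
      Class.forName("org.apache.spark.TaskContext").getMethod("get").invoke(null)
    } catch { case _: Exception => null }

  // Re-installs the captured context on the current (worker) thread via the
  // companion object's setTaskContext, again purely through reflection.
  def configureTaskContext(context: AnyRef): Unit =
    try {
      val clazz = Class.forName("org.apache.spark.TaskContext$")
      val instance = clazz.getField("MODULE$").get(null)
      clazz.getDeclaredMethods
        .filter(_.getName == "setTaskContext")
        .foreach(_.invoke(instance, context))
    } catch { case _: Exception => () }
}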
[2/2] incubator-carbondata git commit: [CARBONDATA-465] Spark streaming dataframe support This closes #368
Posted by ra...@apache.org.
[CARBONDATA-465] Spark streaming dataframe support This closes #368
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/26e3e002
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/26e3e002
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/26e3e002
Branch: refs/heads/master
Commit: 26e3e00232e771459f18a5ae417ee76ecd07a93e
Parents: 98564f0 6c9194d
Author: ravipesala <ra...@gmail.com>
Authored: Sun Dec 11 09:29:01 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sun Dec 11 09:29:01 2016 +0530
----------------------------------------------------------------------
.../apache/carbondata/spark/CarbonOption.scala | 2 +
.../spark/rdd/NewCarbonDataLoadRDD.scala | 167 +++++++++++++++++++
.../spark/CarbonDataFrameWriter.scala | 2 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 52 ++++--
.../execution/command/carbonTableSchema.scala | 5 +-
.../apache/spark/sql/hive/CarbonMetastore.scala | 6 +-
.../apache/carbondata/spark/CarbonOption.scala | 2 +
.../sort/impl/ParallelReadMergeSorterImpl.java | 3 +
.../util/CarbonDataProcessorUtil.java | 31 ++++
9 files changed, 247 insertions(+), 23 deletions(-)
----------------------------------------------------------------------