You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/04/11 08:41:56 UTC
[1/2] incubator-carbondata git commit: lazy rdd iterator
Repository: incubator-carbondata
Updated Branches:
refs/heads/master d51387b81 -> 00535f4a9
lazy rdd iterator
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/e52e6413
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/e52e6413
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/e52e6413
Branch: refs/heads/master
Commit: e52e641372e298511ba0135054c97855153356dc
Parents: d51387b
Author: QiangCai <qi...@qq.com>
Authored: Fri Apr 7 19:38:19 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Tue Apr 11 14:08:53 2017 +0530
----------------------------------------------------------------------
.../spark/rdd/NewCarbonDataLoadRDD.scala | 100 +++++++++++++++----
1 file changed, 81 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e52e6413/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 72ee90f..a6d231d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap, RDD}
+import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.sql.Row
import org.apache.spark.util.SparkUtil
@@ -408,19 +409,14 @@ class NewDataFrameLoaderRDD[K, V](
val recordReaders = mutable.Buffer[CarbonIterator[Array[AnyRef]]]()
val partitionIterator = firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, context)
val serializer = SparkEnv.get.closureSerializer.newInstance()
- var serializeBuffer: ByteBuffer = null
+ var serializeBytes: Array[Byte] = null
while(partitionIterator.hasNext) {
val value = partitionIterator.next()
- val newInstance = {
- if (serializeBuffer == null) {
- serializeBuffer = serializer.serialize[RDD[Row]](value.rdd)
- }
- serializeBuffer.rewind()
- serializer.deserialize[RDD[Row]](serializeBuffer)
+ if (serializeBytes == null) {
+ serializeBytes = serializer.serialize[RDD[Row]](value.rdd).array()
}
- recordReaders += new NewRddIterator(newInstance.iterator(value.partition, context),
- carbonLoadModel,
- context)
+ recordReaders += new LazyRddIterator(serializer, serializeBytes, value.partition,
+ carbonLoadModel, context)
}
val loader = new SparkPartitionLoader(model,
@@ -477,15 +473,16 @@ class NewRddIterator(rddIter: Iterator[Row],
carbonLoadModel: CarbonLoadModel,
context: TaskContext) extends CarbonIterator[Array[AnyRef]] {
- val timeStampformatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
- .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
- val timeStampFormat = new SimpleDateFormat(timeStampformatString)
- val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+ private val timeStampformatString = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ private val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+ private val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
- val dateFormat = new SimpleDateFormat(dateFormatString)
- val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
- val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
- val serializationNullFormat =
+ private val dateFormat = new SimpleDateFormat(dateFormatString)
+ private val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+ private val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+ private val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
def hasNext: Boolean = rddIter.hasNext
@@ -499,8 +496,73 @@ class NewRddIterator(rddIter: Iterator[Row],
columns
}
- override def initialize: Unit = {
+ override def initialize(): Unit = {
SparkUtil.setTaskContext(context)
}
}
+
+/**
+ * LazyRddIterator defers calling rdd.iterator until hasNext is first invoked.
+ * @param serializer serializer instance used to deserialize the RDD bytes
+ * @param serializeBytes serialized form of the RDD to iterate over
+ * @param partition the partition of the RDD to read
+ * @param carbonLoadModel load model supplying delimiter and format settings
+ * @param context the Spark task context of the running task
+ */
+class LazyRddIterator(serializer: SerializerInstance,
+ serializeBytes: Array[Byte],
+ partition: Partition,
+ carbonLoadModel: CarbonLoadModel,
+ context: TaskContext) extends CarbonIterator[Array[AnyRef]] {
+
+ private val timeStampformatString = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ private val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+ private val dateFormatString = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+ CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+ private val dateFormat = new SimpleDateFormat(dateFormatString)
+ private val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+ private val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+ private val serializationNullFormat =
+ carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+
+ private var rddIter: Iterator[Row] = null
+ private var uninitialized = true
+ private var closed = false
+
+ def hasNext: Boolean = {
+ if (uninitialized) {
+ uninitialized = false
+ rddIter = serializer.deserialize[RDD[Row]](ByteBuffer.wrap(serializeBytes))
+ .iterator(partition, context)
+ }
+ if (closed) {
+ false
+ } else {
+ rddIter.hasNext
+ }
+ }
+
+ def next: Array[AnyRef] = {
+ val row = rddIter.next()
+ val columns = new Array[AnyRef](row.length)
+ for (i <- 0 until columns.length) {
+ columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
+ delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat)
+ }
+ columns
+ }
+
+ override def initialize(): Unit = {
+ SparkUtil.setTaskContext(context)
+ }
+
+ override def close(): Unit = {
+ closed = true
+ rddIter = null
+ }
+
+}
[2/2] incubator-carbondata git commit: [CARBONDATA-887] lazy rdd
iterator for InsertInto. This closes #765
Posted by gv...@apache.org.
[CARBONDATA-887] lazy rdd iterator for InsertInto. This closes #765
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/00535f4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/00535f4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/00535f4a
Branch: refs/heads/master
Commit: 00535f4a9473d2879f4852d52fbcd5e07b695f52
Parents: d51387b e52e641
Author: Venkata Ramana G <ra...@huawei.com>
Authored: Tue Apr 11 14:11:31 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Tue Apr 11 14:11:31 2017 +0530
----------------------------------------------------------------------
.../spark/rdd/NewCarbonDataLoadRDD.scala | 100 +++++++++++++++----
1 file changed, 81 insertions(+), 19 deletions(-)
----------------------------------------------------------------------