You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/11/17 06:33:39 UTC
carbondata git commit: [CARBONDATA-1717] Remove spark broadcast for
getting hadoop configurations
Repository: carbondata
Updated Branches:
refs/heads/master 733bb516d -> b6777fcc3
[CARBONDATA-1717] Remove spark broadcast for getting hadoop configurations
This closes #1500
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b6777fcc
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b6777fcc
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b6777fcc
Branch: refs/heads/master
Commit: b6777fcc32df3ce3616ea02f5566ab5bf4ca6e30
Parents: 733bb51
Author: akashrn5 <ak...@gmail.com>
Authored: Fri Oct 27 18:11:03 2017 +0530
Committer: QiangCai <qi...@qq.com>
Committed: Fri Nov 17 14:33:10 2017 +0800
----------------------------------------------------------------------
.../spark/rdd/NewCarbonDataLoadRDD.scala | 26 ++++++++++++++++----
1 file changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6777fcc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 6f44a0d..9ca21bc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -17,7 +17,7 @@
package org.apache.carbondata.spark.rdd
-import java.io.{File, IOException, ObjectInputStream, ObjectOutputStream}
+import java.io._
import java.nio.ByteBuffer
import java.text.SimpleDateFormat
import java.util.{Date, UUID}
@@ -41,7 +41,9 @@ import org.apache.carbondata.common.CarbonIterator
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.common.logging.impl.StandardLogService
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses}
import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator}
@@ -187,9 +189,23 @@ class NewCarbonDataLoadRDD[K, V](
formatter.format(new Date())
}
- // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
- private val confBroadcast =
- sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
+ /**
+ * Driver-side Hadoop configuration, serialized with `Configuration.write` into an
+ * in-memory buffer and then compressed. It is held as a plain field of this RDD
+ * rather than as a Spark broadcast; compression keeps that field smaller.
+ *
+ * NOTE(review): `getConf` decompresses with whatever
+ * `CompressorFactory.getInstance().getCompressor` returns when it runs; this assumes
+ * that resolves to the same compressor used here — confirm.
+ */
+ private val confBytes = {
+ val byteSink = new ByteArrayOutputStream()
+ val objectOut = new ObjectOutputStream(byteSink)
+ sc.hadoopConfiguration.write(objectOut)
+ // close (which also flushes) before reading the sink so every written byte is captured
+ objectOut.close()
+ val compressor = CompressorFactory.getInstance().getCompressor
+ compressor.compressByte(byteSink.toByteArray)
+ }
+
+ /**
+ * Rebuilds a Hadoop `Configuration` from `confBytes`. It decompresses the bytes and
+ * reads them back with `Configuration.readFields` into a configuration constructed
+ * with `new Configuration(false)`, i.e. without loading default resources.
+ * A fresh instance is built on every call.
+ */
+ private def getConf = {
+ val restored = new Configuration(false)
+ val compressor = CompressorFactory.getInstance().getCompressor
+ val plainBytes = compressor.unCompressByte(confBytes)
+ val objectIn = new ObjectInputStream(new ByteArrayInputStream(plainBytes))
+ restored.readFields(objectIn)
+ objectIn.close()
+ restored
+ }
override def getPartitions: Array[Partition] = {
blocksGroupBy.zipWithIndex.map { b =>
@@ -255,7 +271,7 @@ class NewCarbonDataLoadRDD[K, V](
def getInputIterators: Array[CarbonIterator[Array[AnyRef]]] = {
val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, theSplit.index, 0)
- var configuration: Configuration = confBroadcast.value.value
+ var configuration: Configuration = getConf
if (configuration == null) {
configuration = new Configuration()
}