You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2017/11/17 06:33:39 UTC

carbondata git commit: [CARBONDATA-1717]Remove spark broadcast for getting hadoop configurations

Repository: carbondata
Updated Branches:
  refs/heads/master 733bb516d -> b6777fcc3


[CARBONDATA-1717]Remove spark broadcast for getting hadoop configurations

This closes #1500


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b6777fcc
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b6777fcc
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b6777fcc

Branch: refs/heads/master
Commit: b6777fcc32df3ce3616ea02f5566ab5bf4ca6e30
Parents: 733bb51
Author: akashrn5 <ak...@gmail.com>
Authored: Fri Oct 27 18:11:03 2017 +0530
Committer: QiangCai <qi...@qq.com>
Committed: Fri Nov 17 14:33:10 2017 +0800

----------------------------------------------------------------------
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 26 ++++++++++++++++----
 1 file changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6777fcc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 6f44a0d..9ca21bc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.carbondata.spark.rdd
 
-import java.io.{File, IOException, ObjectInputStream, ObjectOutputStream}
+import java.io._
 import java.nio.ByteBuffer
 import java.text.SimpleDateFormat
 import java.util.{Date, UUID}
@@ -41,7 +41,9 @@ import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.common.logging.impl.StandardLogService
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
 import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses}
 import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator}
@@ -187,9 +189,23 @@ class NewCarbonDataLoadRDD[K, V](
     formatter.format(new Date())
   }
 
-  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
-  private val confBroadcast =
-    sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
+  private val confBytes = {
+    val bao = new ByteArrayOutputStream()
+    val oos = new ObjectOutputStream(bao)
+    sc.hadoopConfiguration.write(oos)
+    oos.close()
+    CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray)
+  }
+
+  private def getConf = {
+    val configuration = new Configuration(false)
+    val bai = new ByteArrayInputStream(CompressorFactory.getInstance().getCompressor
+      .unCompressByte(confBytes))
+    val ois = new ObjectInputStream(bai)
+    configuration.readFields(ois)
+    ois.close()
+    configuration
+  }
 
   override def getPartitions: Array[Partition] = {
     blocksGroupBy.zipWithIndex.map { b =>
@@ -255,7 +271,7 @@ class NewCarbonDataLoadRDD[K, V](
 
       def getInputIterators: Array[CarbonIterator[Array[AnyRef]]] = {
         val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, theSplit.index, 0)
-        var configuration: Configuration = confBroadcast.value.value
+        var configuration: Configuration = getConf
         if (configuration == null) {
           configuration = new Configuration()
         }