Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/05 15:02:20 UTC

[02/50] [abbrv] carbondata git commit: [CARBONDATA-1796] While submitting a new job, pass credentials in the jobConf object

[CARBONDATA-1796] While submitting a new job, pass credentials in the jobConf object

This closes #1855


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b421c246
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b421c246
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b421c246

Branch: refs/heads/fgdatamap
Commit: b421c24689b18429f13fd150ab4dd422c61ca622
Parents: 2081fba
Author: akashrn5 <ak...@gmail.com>
Authored: Wed Jan 24 14:26:28 2018 +0530
Committer: QiangCai <qi...@qq.com>
Committed: Tue Jan 30 18:52:25 2018 +0800

----------------------------------------------------------------------
 .../spark/rdd/CarbonIUDMergerRDD.scala          |  3 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  2 ++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  7 +++-
 .../scala/org/apache/spark/util/SparkUtil.scala | 35 ++------------------
 .../spark/rdd/CarbonDataRDDFactory.scala        |  5 +--
 5 files changed, 15 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
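Every hunk below applies the same fix: instead of building the mapreduce Job straight from a bare Hadoop Configuration, the Configuration is first wrapped in a JobConf and the current user's Hadoop credentials (delegation tokens) are merged into it via SparkHadoopUtil.get.addCredentials before the Job is created. A minimal sketch of that pattern, assuming Spark 2.x where SparkHadoopUtil is accessible from user code; the CredentialAwareJob helper name is illustrative and not part of the commit:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.mapred.JobConf
  import org.apache.hadoop.mapreduce.Job
  import org.apache.spark.deploy.SparkHadoopUtil

  object CredentialAwareJob {
    // Wrap the configuration in a JobConf, copy the caller's Hadoop
    // credentials into it, and only then create the Job that will be
    // used to compute input splits.
    def newJob(conf: Configuration = new Configuration()): Job = {
      val jobConf = new JobConf(conf)
      SparkHadoopUtil.get.addCredentials(jobConf)
      Job.getInstance(jobConf)
    }
  }

Callers such as getPartitions in CarbonScanRDD would then build their Job this way instead of calling Job.getInstance(new Configuration()) directly.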


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b421c246/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index e8180cd..4378c15 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -25,9 +25,9 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.{Partition, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.execution.command.CarbonMergerMapping
 
-import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
@@ -57,6 +57,7 @@ class CarbonIUDMergerRDD[K, V](
       tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
     )
     val jobConf: JobConf = new JobConf(new Configuration)
+    SparkHadoopUtil.get.addCredentials(jobConf)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
     val defaultParallelism = sparkContext.defaultParallelism

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b421c246/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 8d7b044..c482a92 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo}
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.util.CarbonException
@@ -276,6 +277,7 @@ class CarbonMergerRDD[K, V](
     val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
       absoluteTableIdentifier)
     val jobConf: JobConf = new JobConf(new Configuration)
+    SparkHadoopUtil.get.addCredentials(jobConf)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
     CarbonTableInputFormat.setPartitionsToPrune(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b421c246/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index f2c3060..49c0225 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -26,9 +26,11 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.SparkSession
@@ -81,7 +83,10 @@ class CarbonScanRDD(
   @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   override def getPartitions: Array[Partition] = {
-    val job = Job.getInstance(new Configuration())
+    val conf = new Configuration()
+    val jobConf = new JobConf(conf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    val job = Job.getInstance(jobConf)
     val format = prepareInputFormatForDriver(job.getConfiguration)
 
     // initialise query_id for job

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b421c246/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
index 9c37640..4635fc7 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
@@ -19,8 +19,10 @@ package org.apache.spark.util
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD}
 
 import org.apache.carbondata.processing.loading.csvinput.BlockDetails
@@ -37,37 +39,4 @@ object SparkUtil {
     }
   }
 
-  /**
-   * get file splits,return Array[BlockDetails], if file path is empty,then return empty Array
-   *
-   */
-  def getSplits(path: String, sc: SparkContext): Array[BlockDetails] = {
-    val filePath = FileUtils.getPaths(path)
-    if (filePath == null || filePath.isEmpty) {
-      // return a empty block details
-      Array[BlockDetails]()
-    } else {
-      // clone the hadoop configuration
-      val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
-      // set folder or file
-      hadoopConfiguration.set(FileInputFormat.INPUT_DIR, filePath)
-      hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true")
-      val newHadoopRDD = new NewHadoopRDD[LongWritable, Text](
-        sc,
-        classOf[org.apache.hadoop.mapreduce.lib.input.TextInputFormat],
-        classOf[LongWritable],
-        classOf[Text],
-        hadoopConfiguration)
-      val splits: Array[FileSplit] = newHadoopRDD.getPartitions.map { part =>
-        part.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value.asInstanceOf[FileSplit]
-      }
-      splits.map { block =>
-        new BlockDetails(block.getPath,
-          block.getStart,
-          block.getLength,
-          block.getLocations
-        )
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b421c246/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 809c8ff..8212e85 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -1030,9 +1030,10 @@ object CarbonDataRDDFactory {
              org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
 
     CommonUtil.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConf)
-
+    val jobConf = new JobConf(hadoopConf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
     val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-    val jobContext = new Job(hadoopConf)
+    val jobContext = new Job(jobConf)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val blockList = rawSplits.map { inputSplit =>
       val fileSplit = inputSplit.asInstanceOf[FileSplit]
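
The CarbonDataRDDFactory change follows the same idea while computing CSV block splits for a data load: the existing hadoopConf is wrapped in a credential-carrying JobConf before the Job handed to TextInputFormat.getSplits is created. A rough, self-contained sketch of that step; the CsvSplitLister object, the listCsvSplits name, and the explicit input path argument are illustrative and not taken from the commit:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.mapred.JobConf
  import org.apache.hadoop.mapreduce.Job
  import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit, TextInputFormat}
  import org.apache.spark.deploy.SparkHadoopUtil

  object CsvSplitLister {
    def listCsvSplits(hadoopConf: Configuration, inputDir: String): Array[FileSplit] = {
      val jobConf = new JobConf(hadoopConf)
      // Merge the current user's Hadoop credentials before the job context is built.
      SparkHadoopUtil.get.addCredentials(jobConf)
      val jobContext = Job.getInstance(jobConf)
      FileInputFormat.addInputPath(jobContext, new Path(inputDir))
      val inputFormat = new TextInputFormat
      // Same cast as in the patched factory code: every split of a text input is a FileSplit.
      inputFormat.getSplits(jobContext).toArray.map(_.asInstanceOf[FileSplit])
    }
  }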