You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/05 15:02:20 UTC
[02/50] [abbrv] carbondata git commit: [CARBONDATA-1796] While
submitting new job, pass credentials in jobConf object
[CARBONDATA-1796] While submitting new job, pass credentials in jobConf object
This closes #1855
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b421c246
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b421c246
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b421c246
Branch: refs/heads/fgdatamap
Commit: b421c24689b18429f13fd150ab4dd422c61ca622
Parents: 2081fba
Author: akashrn5 <ak...@gmail.com>
Authored: Wed Jan 24 14:26:28 2018 +0530
Committer: QiangCai <qi...@qq.com>
Committed: Tue Jan 30 18:52:25 2018 +0800
----------------------------------------------------------------------
.../spark/rdd/CarbonIUDMergerRDD.scala | 3 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 2 ++
.../carbondata/spark/rdd/CarbonScanRDD.scala | 7 +++-
.../scala/org/apache/spark/util/SparkUtil.scala | 35 ++------------------
.../spark/rdd/CarbonDataRDDFactory.scala | 5 +--
5 files changed, 15 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b421c246/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index e8180cd..4378c15 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -25,9 +25,9 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{Partition, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.execution.command.CarbonMergerMapping
-import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
@@ -57,6 +57,7 @@ class CarbonIUDMergerRDD[K, V](
tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
)
val jobConf: JobConf = new JobConf(new Configuration)
+ SparkHadoopUtil.get.addCredentials(jobConf)
val job: Job = new Job(jobConf)
val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
val defaultParallelism = sparkContext.defaultParallelism
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b421c246/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 8d7b044..c482a92 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo}
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.sql.util.CarbonException
@@ -276,6 +277,7 @@ class CarbonMergerRDD[K, V](
val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
absoluteTableIdentifier)
val jobConf: JobConf = new JobConf(new Configuration)
+ SparkHadoopUtil.get.addCredentials(jobConf)
val job: Job = new Job(jobConf)
val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
CarbonTableInputFormat.setPartitionsToPrune(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b421c246/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index f2c3060..49c0225 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -26,9 +26,11 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.sql.SparkSession
@@ -81,7 +83,10 @@ class CarbonScanRDD(
@transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
override def getPartitions: Array[Partition] = {
- val job = Job.getInstance(new Configuration())
+ val conf = new Configuration()
+ val jobConf = new JobConf(conf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
+ val job = Job.getInstance(jobConf)
val format = prepareInputFormatForDriver(job.getConfiguration)
// initialise query_id for job
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b421c246/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
index 9c37640..4635fc7 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
@@ -19,8 +19,10 @@ package org.apache.spark.util
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD}
import org.apache.carbondata.processing.loading.csvinput.BlockDetails
@@ -37,37 +39,4 @@ object SparkUtil {
}
}
- /**
- * get file splits,return Array[BlockDetails], if file path is empty,then return empty Array
- *
- */
- def getSplits(path: String, sc: SparkContext): Array[BlockDetails] = {
- val filePath = FileUtils.getPaths(path)
- if (filePath == null || filePath.isEmpty) {
- // return a empty block details
- Array[BlockDetails]()
- } else {
- // clone the hadoop configuration
- val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
- // set folder or file
- hadoopConfiguration.set(FileInputFormat.INPUT_DIR, filePath)
- hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true")
- val newHadoopRDD = new NewHadoopRDD[LongWritable, Text](
- sc,
- classOf[org.apache.hadoop.mapreduce.lib.input.TextInputFormat],
- classOf[LongWritable],
- classOf[Text],
- hadoopConfiguration)
- val splits: Array[FileSplit] = newHadoopRDD.getPartitions.map { part =>
- part.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value.asInstanceOf[FileSplit]
- }
- splits.map { block =>
- new BlockDetails(block.getPath,
- block.getStart,
- block.getLength,
- block.getLocations
- )
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b421c246/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 809c8ff..8212e85 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -1030,9 +1030,10 @@ object CarbonDataRDDFactory {
org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
CommonUtil.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConf)
-
+ val jobConf = new JobConf(hadoopConf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat
- val jobContext = new Job(hadoopConf)
+ val jobContext = new Job(jobConf)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val blockList = rawSplits.map { inputSplit =>
val fileSplit = inputSplit.asInstanceOf[FileSplit]