You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/23 09:55:05 UTC
carbondata git commit: [CARBONDATA-1796] While submitting new job to
HadoopRdd, token should be generated for accessing paths
Repository: carbondata
Updated Branches:
refs/heads/master 5a3efdbbe -> 8432860c1
[CARBONDATA-1796] While submitting new job to HadoopRdd, token should be generated for accessing paths
In a Hadoop secure-mode cluster,
when submitting a job to HadoopRDD, a delegation token should be obtained for the path set in the JobConf; otherwise a Delegation Token exception will be thrown during data load.
This closes #1552
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8432860c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8432860c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8432860c
Branch: refs/heads/master
Commit: 8432860c187a6f3b6524a4e10901164b84009662
Parents: 5a3efdb
Author: dhatchayani <dh...@gmail.com>
Authored: Wed Nov 22 17:01:08 2017 +0530
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Thu Nov 23 15:24:27 2017 +0530
----------------------------------------------------------------------
.../spark/load/DataLoadProcessBuilderOnSpark.scala | 6 +++++-
.../apache/carbondata/spark/util/GlobalDictionaryUtil.scala | 8 ++++++++
.../apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala | 6 +++++-
3 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8432860c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 776973b..c14e0a7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -21,8 +21,10 @@ import java.util.Comparator
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.command.ExecutionErrors
@@ -58,12 +60,14 @@ object DataLoadProcessBuilderOnSpark {
CommonUtil.configureCSVInputFormat(hadoopConf, model)
hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
val columnCount = model.getCsvHeaderColumns.length
+ val jobConf = new JobConf(hadoopConf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
new NewHadoopRDD[NullWritable, StringArrayWritable](
sc,
classOf[CSVInputFormat],
classOf[NullWritable],
classOf[StringArrayWritable],
- hadoopConf)
+ jobConf)
.map(x => DataLoadProcessorStepOnSpark.toStringArrayRow(x._2, columnCount))
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8432860c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index c931c44..1b3b3f9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -30,8 +30,11 @@ import org.apache.commons.lang3.{ArrayUtils, StringUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.security.TokenCache
import org.apache.spark.{Accumulator, SparkException}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{NewHadoopRDD, RDD}
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -362,6 +365,11 @@ object GlobalDictionaryUtil {
})
val values = new Array[String](columnNames.length)
val row = new StringArrayRow(values)
+ val jobConf = new JobConf(hadoopConf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
+ TokenCache.obtainTokensForNamenodes(jobConf.getCredentials,
+ Array[Path](new Path(carbonLoadModel.getFactFilePath)),
+ jobConf)
val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
sqlContext.sparkContext,
classOf[CSVInputFormat],
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8432860c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 95ccc92..0619851 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -29,9 +29,11 @@ import scala.util.control.Breaks._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD}
import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel}
@@ -845,12 +847,14 @@ object CarbonDataRDDFactory {
CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
val columnCount = columns.length
+ val jobConf = new JobConf(hadoopConf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
new NewHadoopRDD[NullWritable, StringArrayWritable](
sqlContext.sparkContext,
classOf[CSVInputFormat],
classOf[NullWritable],
classOf[StringArrayWritable],
- hadoopConf
+ jobConf
).map { currentRow =>
if (null == currentRow || null == currentRow._2) {
val row = new StringArrayRow(new Array[String](columnCount))