Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/23 09:55:05 UTC

carbondata git commit: [CARBONDATA-1796] While submitting new job to HadoopRdd, token should be generated for accessing paths

Repository: carbondata
Updated Branches:
  refs/heads/master 5a3efdbbe -> 8432860c1


[CARBONDATA-1796] While submitting new job to HadoopRdd, token should be generated for accessing paths

In a Hadoop secure-mode cluster, when a job is submitted to a HadoopRdd, a delegation token must be obtained for the path set in the JobConf;
otherwise a Delegation Token exception is thrown during data load.

This closes #1552
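
For reference, below is a minimal sketch of the credential-handling pattern the patch applies in all three files: wrap the Hadoop Configuration in a JobConf, copy the current user's credentials into it, and obtain namenode delegation tokens for the input path before constructing the NewHadoopRDD. The sketch assumes Spark 2.x (where SparkHadoopUtil and NewHadoopRDD are accessible developer APIs) and uses Hadoop's TextInputFormat/LongWritable/Text in place of CarbonData's CSVInputFormat/StringArrayWritable; the object and method names are illustrative only, not part of the patch.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
    import org.apache.hadoop.mapreduce.security.TokenCache
    import org.apache.spark.SparkContext
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.rdd.{NewHadoopRDD, RDD}

    object SecureHadoopRddSketch {

      // Build a NewHadoopRDD over `inputPath` with delegation tokens attached,
      // mirroring the pattern this patch applies with CarbonData's CSVInputFormat.
      def secureTextRdd(
          sc: SparkContext,
          hadoopConf: Configuration,
          inputPath: String): RDD[(LongWritable, Text)] = {
        hadoopConf.set(FileInputFormat.INPUT_DIR, inputPath)

        // Wrap the Configuration in a JobConf so credentials travel with the job.
        val jobConf = new JobConf(hadoopConf)

        // Copy the current user's existing credentials (delegation tokens) into the JobConf.
        SparkHadoopUtil.get.addCredentials(jobConf)

        // Obtain delegation tokens from the namenode(s) backing the input path;
        // without this step the load fails with a delegation-token error on a secure cluster.
        TokenCache.obtainTokensForNamenodes(
          jobConf.getCredentials, Array[Path](new Path(inputPath)), jobConf)

        new NewHadoopRDD[LongWritable, Text](
          sc,
          classOf[TextInputFormat],
          classOf[LongWritable],
          classOf[Text],
          jobConf)
      }
    }

In DataLoadProcessBuilderOnSpark and CarbonDataRDDFactory only the JobConf wrapping and addCredentials steps are needed, since the fact-file tokens are already available; GlobalDictionaryUtil additionally calls TokenCache.obtainTokensForNamenodes for the fact-file path, as shown in the diffs below.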


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8432860c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8432860c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8432860c

Branch: refs/heads/master
Commit: 8432860c187a6f3b6524a4e10901164b84009662
Parents: 5a3efdb
Author: dhatchayani <dh...@gmail.com>
Authored: Wed Nov 22 17:01:08 2017 +0530
Committer: Ravindra Pesala <ra...@gmail.com>
Committed: Thu Nov 23 15:24:27 2017 +0530

----------------------------------------------------------------------
 .../spark/load/DataLoadProcessBuilderOnSpark.scala           | 6 +++++-
 .../apache/carbondata/spark/util/GlobalDictionaryUtil.scala  | 8 ++++++++
 .../apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala   | 6 +++++-
 3 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8432860c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 776973b..c14e0a7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -21,8 +21,10 @@ import java.util.Comparator
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.NewHadoopRDD
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.command.ExecutionErrors
@@ -58,12 +60,14 @@ object DataLoadProcessBuilderOnSpark {
       CommonUtil.configureCSVInputFormat(hadoopConf, model)
       hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
       val columnCount = model.getCsvHeaderColumns.length
+      val jobConf = new JobConf(hadoopConf)
+      SparkHadoopUtil.get.addCredentials(jobConf)
       new NewHadoopRDD[NullWritable, StringArrayWritable](
         sc,
         classOf[CSVInputFormat],
         classOf[NullWritable],
         classOf[StringArrayWritable],
-        hadoopConf)
+        jobConf)
         .map(x => DataLoadProcessorStepOnSpark.toStringArrayRow(x._2, columnCount))
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8432860c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index c931c44..1b3b3f9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -30,8 +30,11 @@ import org.apache.commons.lang3.{ArrayUtils, StringUtils}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.security.TokenCache
 import org.apache.spark.{Accumulator, SparkException}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{NewHadoopRDD, RDD}
 import org.apache.spark.sql._
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -362,6 +365,11 @@ object GlobalDictionaryUtil {
     })
     val values = new Array[String](columnNames.length)
     val row = new StringArrayRow(values)
+    val jobConf = new JobConf(hadoopConf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    TokenCache.obtainTokensForNamenodes(jobConf.getCredentials,
+      Array[Path](new Path(carbonLoadModel.getFactFilePath)),
+      jobConf)
     val rdd = new NewHadoopRDD[NullWritable, StringArrayWritable](
       sqlContext.sparkContext,
       classOf[CSVInputFormat],

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8432860c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 95ccc92..0619851 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -29,9 +29,11 @@ import scala.util.control.Breaks._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD}
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
 import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel}
@@ -845,12 +847,14 @@ object CarbonDataRDDFactory {
       CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
       hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
       val columnCount = columns.length
+      val jobConf = new JobConf(hadoopConf)
+      SparkHadoopUtil.get.addCredentials(jobConf)
       new NewHadoopRDD[NullWritable, StringArrayWritable](
         sqlContext.sparkContext,
         classOf[CSVInputFormat],
         classOf[NullWritable],
         classOf[StringArrayWritable],
-        hadoopConf
+        jobConf
       ).map { currentRow =>
         if (null == currentRow || null == currentRow._2) {
           val row = new StringArrayRow(new Array[String](columnCount))