You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2021/08/07 12:53:00 UTC

[jira] [Commented] (SPARK-36328) HadoopRDD#getPartitions fetches FileSystem Delegation Token for every partition

    [ https://issues.apache.org/jira/browse/SPARK-36328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17395218#comment-17395218 ] 

Apache Spark commented on SPARK-36328:
--------------------------------------

User 'Shockang' has created a pull request for this issue:
https://github.com/apache/spark/pull/33674

> HadoopRDD#getPartitions fetches FileSystem Delegation Token for every partition
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-36328
>                 URL: https://issues.apache.org/jira/browse/SPARK-36328
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.1.2
>            Reporter: Prabhu Joseph
>            Priority: Major
>
> Spark Job creates a separate JobConf for every RDD (every hive table partition) in HadoopRDD#getPartitions.
> {code}
>   override def getPartitions: Array[Partition] = {
>     val jobConf = getJobConf()
>     // add the credentials here as this can be called before SparkContext initialized
>     SparkHadoopUtil.get.addCredentials(jobConf)
> {code}
> Hadoop FileSystem fetches FileSystem Delegation Token and sets into the Credentials which is part of JobConf. On further requests, will reuse the token from the Credentials if already exists.
> {code}
>        if (serviceName != null) { // fs has token, grab it
>       final Text service = new Text(serviceName);
>       Token<?> token = credentials.getToken(service);
>       if (token == null) {
>         token = getDelegationToken(renewer);
>         if (token != null) {
>           tokens.add(token);
>           credentials.addToken(service, token);
>         }
>       }
>     }
> {code}
>  But since Spark Job creates a new JobConf (which will have a new Credentials) for every hive table partition, the token is not reused and gets fetched for every partition. This is slowing down the query as each delegation token has to go through KDC and SSL handshake on Secure Clusters.
> *Improvement:*
> Spark can add the credentials from previous JobConf into the new JobConf to reuse the FileSystem Delegation Token similar to how the User Credentials are added into JobConf after construction.
> {code}
>      val jobConf = getJobConf()
>     // add the credentials here as this can be called before SparkContext initialized
>     SparkHadoopUtil.get.addCredentials(jobConf)
> {code}
> *Repro*
> {code}
> beeline>
> create table parttable (key char(1), value int) partitioned by (p int);
> insert into table parttable partition(p=100) values ('d', 1), ('e', 2), ('f', 3);
> insert into table parttable partition(p=200) values ('d', 1), ('e', 2), ('f', 3);
> insert into table parttable partition(p=300) values ('d', 1), ('e', 2), ('f', 3);
> spark-sql>
> select value, count(*) from parttable group by value
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org