You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Rajeev Kumar (Jira)" <ji...@apache.org> on 2020/05/04 04:43:00 UTC

[jira] [Commented] (SPARK-26385) YARN - Spark Stateful Structured streaming HDFS_DELEGATION_TOKEN not found in cache

    [ https://issues.apache.org/jira/browse/SPARK-26385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17098651#comment-17098651 ] 

Rajeev Kumar commented on SPARK-26385:
--------------------------------------

Sorry for coming late on this thread. I was doing some testing. I found one issue in HadoopFSDelegationTokenProvider. I might be wrong also. Please validate below.

Spark does not renew the token rather it creates the token at the scheduled interval.

Spark needs two HDFS_DELEGATION_TOKEN. One for resource manager and second for application user.

 
{code:java}
val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fsToGetTokens, creds) 
// Get the token renewal interval if it is not set. It will only be called once. 
if (tokenRenewalInterval == null) { 
     tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf, fsToGetTokens) 
}
{code}
At the first call to the obtainDelegationTokens it creates TWO tokens correctly.

Token for resource manager is getting created by method fetchDelegationTokens.

Token for application user is getting created inside getTokenRenewalInterval method.
{code:java}
// Code snippet
private var tokenRenewalInterval: Option[Long] = null
{code}
{code:java}
sparkConf.get(PRINCIPAL).flatMap { renewer => 
  val creds = new Credentials() 
  fetchDelegationTokens(renewer, filesystems, creds) 
{code}
But after 18 hours or whatever the renewal period when scheduled thread of AMCredentialRenewer tries to create HDFS_DELEFATION_TOKEN, it creates only one token (for resource manager as result of call to fetchDelegationTokens method ).

But it does not create HDFS_DELEFATION_TOKEN for application user because tokenRenewalInterval is NOT NULL this time. Hence after expiration of HDFS_DELEFATION_TOKEN (typically 24 hrs) spark fails to update the spark checkpointing directory and job dies.

As part of my testing, I just called getTokenRenewalInterval in else block and job is running fine. It did not die after 24 hrs.
{code:java}
if (tokenRenewalInterval == null) {
// I put this custom log
  logInfo("Token Renewal interval is null. Calling getTokenRenewalInterval "
    + getTokenRenewer(hadoopConf))
  tokenRenewalInterval =
    getTokenRenewalInterval(hadoopConf, sparkConf, fsToGetTokens)
} else {
// I put this custom log
  logInfo("Token Renewal interval is NOT null. Calling getTokenRenewalInterval "
    + getTokenRenewer(hadoopConf))
  getTokenRenewalInterval(hadoopConf, sparkConf, fsToGetTokens)
}
{code}
 Logs are -
{code:java}
20/05/01 14:36:19 INFO HadoopFSDelegationTokenProvider: Token Renewal interval is null. Calling getTokenRenewalInterval rm/host:port
20/05/02 08:36:42 INFO HadoopFSDelegationTokenProvider: Token Renewal interval is NOT null. Calling getTokenRenewalInterval rm/host:port
20/05/03 02:37:00 INFO HadoopFSDelegationTokenProvider: Token Renewal interval is NOT null. Calling getTokenRenewalInterval rm/host:port
20/05/03 20:37:18 INFO HadoopFSDelegationTokenProvider: Token Renewal interval is NOT null. Calling getTokenRenewalInterval rm/host:port
{code}
 

 

> YARN - Spark Stateful Structured streaming HDFS_DELEGATION_TOKEN not found in cache
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-26385
>                 URL: https://issues.apache.org/jira/browse/SPARK-26385
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>         Environment: Hadoop 2.6.0, Spark 2.4.0
>            Reporter: T M
>            Priority: Major
>
>  
> Hello,
>  
> I have Spark Structured Streaming job which is runnning on YARN(Hadoop 2.6.0, Spark 2.4.0). After 25-26 hours, my job stops working with following error:
> {code:java}
> 2018-12-16 22:35:17 ERROR org.apache.spark.internal.Logging$class.logError(Logging.scala:91): Query TestQuery[id = a61ce197-1d1b-4e82-a7af-60162953488b, runId = a56878cf-dfc7-4f6a-ad48-02cf738ccc2f] terminated with error org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (token for REMOVED: HDFS_DELEGATION_TOKEN owner=REMOVED, renewer=yarn, realUser=, issueDate=1544903057122, maxDate=1545507857122, sequenceNumber=10314, masterKeyId=344) can't be found in cache at org.apache.hadoop.ipc.Client.call(Client.java:1470) at org.apache.hadoop.ipc.Client.call(Client.java:1401) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:752) at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source) at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1977) at org.apache.hadoop.fs.Hdfs.getFileStatus(Hdfs.java:133) at org.apache.hadoop.fs.FileContext$14.next(FileContext.java:1120) at org.apache.hadoop.fs.FileContext$14.next(FileContext.java:1116) at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) at org.apache.hadoop.fs.FileContext.getFileStatus(FileContext.java:1116) at org.apache.hadoop.fs.FileContext$Util.exists(FileContext.java:1581) at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.exists(CheckpointFileManager.scala:326) at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:142) at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply$mcV$sp(MicroBatchExecution.scala:544) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:542) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:542) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:542) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189){code}
>  
> ^It is important to notice that I tried usual fix for this kind of problems:^
>  
> {code:java}
> --conf "spark.hadoop.fs.hdfs.impl.disable.cache=true"
>  
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org