Posted to issues@spark.apache.org by "Jim Huang (Jira)" <ji...@apache.org> on 2020/12/24 22:10:00 UTC

[jira] [Commented] (SPARK-31685) Spark structured streaming with Kafka fails with HDFS_DELEGATION_TOKEN expiration issue

    [ https://issues.apache.org/jira/browse/SPARK-31685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17254670#comment-17254670 ] 

Jim Huang commented on SPARK-31685:
-----------------------------------

Thanks Rajeev for all the work you have done on this ticket so far.  

I am running into the same issue with the following stack and Spark version:
 * Hadoop 2.7.3
 * spark-2.4.5-bin-without-hadoop
 * yarn-client mode

The code path that triggers this bug is elusive and difficult to pinpoint: I have observed only a few occurrences, and each time the total wall-clock runtime of the Spark Structured Streaming job before the failure seemed random (hundreds of hours).

Currently, I *believe* it may be related to another runtime event.  My current hypothesis: when the original YARN node running one of the Spark executors fails for any reason (e.g. YARN healthcheck script, YARN node being decommissioned, YARN container preemption), a new YARN container is assigned and started on another YARN node by the YARN AM (ApplicationMaster).  The symptom, as reported by the YARN AM, is that it tried to restart that particular Spark executor within the new container 3 times, failed each time with the exact error message reported here, and this caused the entire Spark job to fail.  I believe this external event is another code path that eventually hits the code you are testing.
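
To help pinpoint occurrences, the driver or executor logs can be scanned for an "is expired" error that names a token older than the newest one created.  The following is only a minimal sketch: the regular expressions are assumptions based on the message formats quoted in the issue description, `find_stale_token_use` is a hypothetical helper (not a Spark or Hadoop API), and it assumes token ids increase over time, as they do in the quoted logs.

```python
import re

# Assumed log formats, taken from the messages quoted in the issue description.
CREATED = re.compile(r"Created HDFS_DELEGATION_TOKEN token (\d+)")
EXPIRED = re.compile(r"HDFS_DELEGATION_TOKEN token (\d+) for \S+\) is expired")


def find_stale_token_use(lines):
    """Return (expired_id, newest_id) pairs for every 'is expired' error that
    names a token older than the newest token created so far."""
    newest = None
    stale = []
    for line in lines:
        # Track the highest token id seen in "Created ..." messages.
        for match in CREATED.finditer(line):
            token_id = int(match.group(1))
            newest = token_id if newest is None else max(newest, token_id)
        # Flag expiry errors that refer to an older token.
        for match in EXPIRED.finditer(line):
            expired_id = int(match.group(1))
            if newest is not None and expired_id < newest:
                stale.append((expired_id, newest))
    return stale


# Sample lines shortened from the driver logs quoted in the issue description.
sample = [
    "20/03/17 13:24:09 INFO DFSClient: Created HDFS_DELEGATION_TOKEN token 6972072 for <user> on ha-hdfs:<name>",
    "20/03/18 07:24:16 INFO DFSClient: Created HDFS_DELEGATION_TOKEN token 7041621 for <user> on ha-hdfs:<name>",
    "20/03/18 13:24:28 WARN Client: token (HDFS_DELEGATION_TOKEN token 6972072 for <user>) is expired",
]

print(find_stale_token_use(sample))
```

A non-empty result means some client was still presenting an old token after a newer one had been issued, which matches the symptom described in this ticket.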

 

> Spark structured streaming with Kafka fails with HDFS_DELEGATION_TOKEN expiration issue
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-31685
>                 URL: https://issues.apache.org/jira/browse/SPARK-31685
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.4
>         Environment: spark-2.4.4-bin-hadoop2.7
>            Reporter: Rajeev Kumar
>            Priority: Major
>
> I am facing an issue with spark-2.4.4-bin-hadoop2.7. I am using Spark Structured Streaming with Kafka, reading the stream from Kafka and saving it to HBase.
> I get this error on the driver after 24 hours.
>  
> {code:java}
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 6972072 for <user>) is expired
>         at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>         at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>         at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>         at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source)
>         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
>         at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>         at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source)
>         at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
>         at org.apache.hadoop.fs.Hdfs.getFileStatus(Hdfs.java:130)
>         at org.apache.hadoop.fs.FileContext$15.next(FileContext.java:1169)
>         at org.apache.hadoop.fs.FileContext$15.next(FileContext.java:1165)
>         at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
>         at org.apache.hadoop.fs.FileContext.getFileStatus(FileContext.java:1171)
>         at org.apache.hadoop.fs.FileContext$Util.exists(FileContext.java:1630)
>         at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.exists(CheckpointFileManager.scala:326)
>         at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:142)
>         at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:382)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381)
>         at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:381)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
>         at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
>         ... 1 more
>          ApplicationMaster host: <host>
>          ApplicationMaster RPC port: <port>
>          queue: default
>          start time: <start time>
>          final status: FAILED
>          tracking URL: <tracking url>
>          user: <user>
> {code}
>  
> Below are the logs from my application (with IP addresses and usernames removed).
> When the application starts, it prints this log. We can see it creating the HDFS_DELEGATION_TOKEN (token id = 6972072).
> Driver log -
> {code:java}
> 20/03/17 13:24:09 INFO DFSClient: Created HDFS_DELEGATION_TOKEN token 6972072 for <user> on ha-hdfs:<name>
> 20/03/17 13:24:09 INFO HadoopFSDelegationTokenProvider: getting token for: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_631280530_1, ugi=<user>@<abc.com> (auth:KERBEROS)]]
> 20/03/17 13:24:09 INFO DFSClient: Created HDFS_DELEGATION_TOKEN token 6972073 for <user> on ha-hdfs:<name>
> 20/03/17 13:24:09 INFO HadoopFSDelegationTokenProvider: Renewal interval is 86400039 for token HDFS_DELEGATION_TOKEN
> 20/03/17 13:24:10 DEBUG HadoopDelegationTokenManager: Service hive does not require a token. Check your configuration to see if security is disabled or not.
> 20/03/17 13:24:11 DEBUG HBaseDelegationTokenProvider: Attempting to fetch HBase security token.
> 20/03/17 13:24:12 INFO HBaseDelegationTokenProvider: Get token from HBase: Kind: HBASE_AUTH_TOKEN, Service: 812777c29d67, Ident: (org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier@d1)
> 20/03/17 13:24:12 INFO AMCredentialRenewer: Scheduling login from keytab in 18.0 h.
> {code}
> After 18 hours, as mentioned in the log, it created new tokens. The token number increased (7041621).
> Driver log -
> {code:java}
> 20/03/18 07:24:10 INFO AMCredentialRenewer: Attempting to login to KDC using principal: <user>
> 20/03/18 07:24:10 INFO AMCredentialRenewer: Successfully logged into KDC.
> 20/03/18 07:24:16 DEBUG HadoopFSDelegationTokenProvider: Delegation token renewer is: rm/<host_name>@<abc.com>
> 20/03/18 07:24:16 INFO HadoopFSDelegationTokenProvider: getting token for: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-2072296893_22, ugi=<user>@<abc.com>  (auth:KERBEROS)]]
> 20/03/18 07:24:16 INFO DFSClient: Created HDFS_DELEGATION_TOKEN token 7041621 for <user> on ha-hdfs:<name>
> 20/03/18 07:24:16 DEBUG HadoopDelegationTokenManager: Service hive does not require a token. Check your configuration to see if security is disabled or not.
> 20/03/18 07:24:16 DEBUG HBaseDelegationTokenProvider: Attempting to fetch HBase security token.
> 20/03/18 07:24:16 INFO HBaseDelegationTokenProvider: Get token from HBase: Kind: HBASE_AUTH_TOKEN, Service: 812777c29d67, Ident: (org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier@102)
> 20/03/18 07:24:16 INFO AMCredentialRenewer: Scheduling login from keytab in 18.0 h.
> 20/03/18 07:24:16 INFO AMCredentialRenewer: Updating delegation tokens.
> 20/03/18 07:24:17 INFO SparkHadoopUtil: Updating delegation tokens for current user.
> 20/03/18 07:24:17 DEBUG SparkHadoopUtil: Adding/updating delegation tokens List(Kind: HBASE_AUTH_TOKEN, Service: 812777c29d67, Ident: (org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier@102); org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier@102, Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:<name>, Ident: (HDFS_DELEGATION_TOKEN token 7041621 for <user>); HDFS_DELEGATION_TOKEN token 7041621 for <user>; Renewer: yarn; Issued: 3/18/20 7:24 AM; Max Date: 3/25/20 7:24 AM)
> 20/03/18 07:24:17 INFO SparkHadoopUtil: Updating delegation tokens for current user.
> 20/03/18 07:24:17 DEBUG SparkHadoopUtil: Adding/updating delegation tokens List(Kind: HBASE_AUTH_TOKEN, Service: 812777c29d67, Ident: (org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier@102); org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier@102, Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:<name>, Ident: (HDFS_DELEGATION_TOKEN token 7041621 for <user>); HDFS_DELEGATION_TOKEN token 7041621 for <user>; Renewer: yarn; Issued: 3/18/20 7:24 AM; Max Date: 3/25/20 7:24 AM)
> {code}
> Everything works fine for the first 24 hours. After that I see a LeaseRenewer exception, and it is still using the older token number (6972072). The behaviour is the same even if I use "--conf spark.hadoop.fs.hdfs.impl.disable.cache=true".
> Driver log -
>  
>  
> {code:java}
> 20/03/18 13:24:28 WARN Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 6972072 for <user>) is expired
> 20/03/18 13:24:28 WARN LeaseRenewer: Failed to renew lease for [DFSClient_NONMAPREDUCE_631280530_1] for 30 seconds.  Will retry shortly ...
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 6972072 for <user>) is expired
>         at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>         at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>         at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>         at com.sun.proxy.$Proxy10.renewLease(Unknown Source)
>         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.renewLease(ClientNamenodeProtocolTranslatorPB.java:590)
>         at sun.reflect.GeneratedMethodAccessor59.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>         at com.sun.proxy.$Proxy11.renewLease(Unknown Source)
>         at org.apache.hadoop.hdfs.DFSClient.renewLease(DFSClient.java:892)
>         at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:423)
>         at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:448)
>         at org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:71)
>         at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:304)
>         at java.lang.Thread.run(Thread.java:745)
> 20/03/18 13:24:29 WARN Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 6972072 for <user>) is expired
> {code}
>  
> Now, when the application receives new stream data, it fails with this error:
> Driver log -
>  
> {code:java}
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 6972072 for <user>) is expired
> {code}
>  
> Below is the executor log. I see nothing abnormal in it.
> {code:java}
> INFO CoarseGrainedExecutorBackend: Received tokens of 330 bytes
> INFO SparkHadoopUtil: Updating delegation tokens for current user.
> DEBUG SparkHadoopUtil: Adding/updating delegation tokens List(Kind: HBASE_AUTH_TOKEN, Service: <some id>, Ident: <some number>; null, Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:<host>, Ident: (HDFS_DELEGATION_TOKEN token <number> for <user>); HDFS_DELEGATION_TOKEN token <number> for <user>; Renewer: yarn; Issued: 3/10/20 3:58 PM; Max Date: 3/17/20 3:58 PM)
> {code}
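
The timestamps in the quoted driver logs can be checked against each other with a little arithmetic.  This is a sketch under stated assumptions: the 0.75 ratio is inferred from the logged "18.0 h" and the logged renewal interval of 86400039 ms (it is not read from any configuration here), and the expiry is assumed to be the creation time plus one renewal interval.

```python
from datetime import datetime, timedelta

FMT = "%y/%m/%d %H:%M:%S"  # timestamp layout used in the quoted driver logs

# Values copied from the quoted driver logs.
created = datetime.strptime("20/03/17 13:24:09", FMT)            # token 6972072 created
new_token_created = datetime.strptime("20/03/18 07:24:16", FMT)  # token 7041621 created
first_failure = datetime.strptime("20/03/18 13:24:28", FMT)      # first "is expired" warning
renewal_interval = timedelta(milliseconds=86400039)

# Assumption: the next keytab login is scheduled at 75% of the renewal interval.
renewal_ratio = 0.75
login_delay_hours = (renewal_interval * renewal_ratio).total_seconds() / 3600

# Assumption: an unrenewed token expires one renewal interval after creation.
expiry = created + renewal_interval
seconds_after_expiry = (first_failure - expiry).total_seconds()

print(f"login scheduled after {login_delay_hours:.1f} h")
print(f"old token expires at {expiry}")
print(f"new token existed before expiry: {new_token_created < expiry}")
print(f"first failure {seconds_after_expiry:.0f} s after expiry")
```

Under these assumptions the first failure falls about 19 seconds after the old token's expiry, roughly six hours after the replacement token was created, which is consistent with some client still holding token 6972072.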


