Posted to commits@spark.apache.org by sr...@apache.org on 2019/03/30 07:37:00 UTC

[spark] branch branch-2.3 updated: [SPARK-27301][DSTREAM] Shorten the FileSystem cached life cycle to the cleanup method inner scope

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new b57fef9  [SPARK-27301][DSTREAM] Shorten the FileSystem cached life cycle to the cleanup method inner scope
b57fef9 is described below

commit b57fef9f0cb29fb4bc4bb905a84fecb3a1ed5ddd
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Sat Mar 30 02:35:49 2019 -0500

    [SPARK-27301][DSTREAM] Shorten the FileSystem cached life cycle to the cleanup method inner scope
    
    ## What changes were proposed in this pull request?
    
    The cached FileSystem's delegation token expires if no new tokens are explicitly added to it, which produces errors like the following:
    
    ```
    19/03/28 13:40:16 INFO storage.BlockManager: Removing RDD 83189
    19/03/28 13:40:16 INFO rdd.MapPartitionsRDD: Removing RDD 82860 from persistence list
    19/03/28 13:40:16 INFO spark.ContextCleaner: Cleaned shuffle 6005
    19/03/28 13:40:16 INFO storage.BlockManager: Removing RDD 82860
    19/03/28 13:40:16 INFO scheduler.ReceivedBlockTracker: Deleting batches:
    19/03/28 13:40:16 INFO scheduler.InputInfoTracker: remove old batch metadata: 1553750250000 ms
    19/03/28 13:40:17 WARN security.UserGroupInformation: PriviledgedActionException as:ursHADOOP.HZ.NETEASE.COM (auth:KERBEROS) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 53240500 for urs) is expired, current time: 2019-03-28 13:40:17,010+0800 expected renewal time: 2019-03-28 13:39:48,523+0800
    19/03/28 13:40:17 WARN ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 53240500 for urs) is expired, current time: 2019-03-28 13:40:17,010+0800 expected renewal time: 2019-03-28 13:39:48,523+0800
    19/03/28 13:40:17 WARN security.UserGroupInformation: PriviledgedActionException as:ursHADOOP.HZ.NETEASE.COM (auth:KERBEROS) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 53240500 for urs) is expired, current time: 2019-03-28 13:40:17,010+0800 expected renewal time: 2019-03-28 13:39:48,523+0800
    19/03/28 13:40:17 WARN hdfs.LeaseRenewer: Failed to renew lease for [DFSClient_NONMAPREDUCE_-1396157959_1] for 53 seconds. Will retry shortly ...
    org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 53240500 for urs) is expired, current time: 2019-03-28 13:40:17,010+0800 expected renewal time: 2019-03-28 13:39:48,523+0800
    at org.apache.hadoop.ipc.Client.call(Client.java:1468)
    at org.apache.hadoop.ipc.Client.call(Client.java:1399)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
    at com.sun.proxy.$Proxy11.renewLease(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.renewLease(ClientNamenodeProtocolTranslatorPB.java:571)
    at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy12.renewLease(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.renewLease(DFSClient.java:878)
    at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:417)
    at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:442)
    at org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:71)
    at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:298)
    at java.lang.Thread.run(Thread.java:748)
    ```
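    The following simplified model in Scala illustrates the failure mode. It is not Hadoop code: `Token`, `Handle`, `TokenSource` and `ExpiryModel` are hypothetical names. The model assumes that a handle keeps the token it was created with and never refreshes it. Under that assumption, a handle stored in a long-lived field works only until its token's expiry time passes. A handle obtained inside each call carries a freshly issued token.

    ```scala
    // Simplified model, not Hadoop code: a credential with an absolute expiry time.
    final case class Token(expiresAt: Long)

    // Hypothetical client handle that captures its token at creation and never refreshes it.
    final class Handle(token: Token) {
      def delete(path: String, now: Long): Boolean = {
        if (now > token.expiresAt)
          throw new IllegalStateException(s"token expired at ${token.expiresAt}, now=$now")
        true
      }
    }

    // Hypothetical issuer: each new handle gets a token valid for `ttl` time units from `now`.
    final class TokenSource(ttl: Long) {
      def handleAt(now: Long): Handle = new Handle(Token(now + ttl))
    }

    object ExpiryModel {
      val source = new TokenSource(ttl = 10L)

      // Long-lived field, analogous to caching the handle for the object's whole lifetime.
      val cached: Handle = source.handleAt(0L)

      def cleanupWithCached(now: Long): Boolean = cached.delete("old-checkpoint", now)

      // Handle obtained inside the call, so its token is issued at `now`.
      def cleanupWithScoped(now: Long): Boolean =
        source.handleAt(now).delete("old-checkpoint", now)
    }
    ```

    Under these assumptions, `cleanupWithCached(5L)` succeeds, `cleanupWithCached(20L)` throws, and `cleanupWithScoped(20L)` succeeds.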
    
    This PR shortens the lifetime of the cached FileSystem to the inner scope of the cleanup method, so that a FileSystem holding an expired token is not reused across cleanups.
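    Here is a minimal sketch of the scoping pattern the diff applies. It uses a hypothetical `FsHandle` trait and `openFs()` factory in place of Hadoop's `FileSystem`, standing in for whatever the real cleanup code calls to obtain one. As in the diff, the handle is a local `var` declared just before `filesToDelete.foreach`. The lazy initialization on first use inside the loop is an assumption of this sketch. The handle is shared by all files deleted in one cleanup call and dropped when the call returns, so the next call obtains a new one.

    ```scala
    // Hypothetical stand-in for a file system client; not Hadoop's FileSystem API.
    trait FsHandle {
      def delete(path: String): Boolean
    }

    object CleanupSketch {
      // Counts opened handles, to show one handle per cleanup call.
      var opened = 0

      // Hypothetical factory standing in for obtaining a real FileSystem.
      def openFs(): FsHandle = {
        opened += 1
        new FsHandle {
          def delete(path: String): Boolean = true
        }
      }

      def cleanup(filesToDelete: Seq[String]): Int = {
        // Scoped to this call: a later cleanup() opens a fresh handle
        // instead of reusing one created earlier with older credentials.
        var fs: FsHandle = null
        var deleted = 0
        filesToDelete.foreach { file =>
          if (fs == null) fs = openFs() // opened lazily, only if there is work to do
          if (fs.delete(file)) deleted += 1
        }
        deleted
      }
    }
    ```

    Two calls with files to delete open two handles (one each), while a call with an empty list opens none.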
    
    ## How was this patch tested?
    
    Existing unit tests.
    
    Closes #24235 from yaooqinn/SPARK-27301.
    
    Authored-by: Kent Yao <ya...@hotmail.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
    (cherry picked from commit f4c73b7c685b901dd69950e4929c65e3b8dd3a55)
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 .../org/apache/spark/streaming/dstream/DStreamCheckpointData.scala      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index e73837e..ebfaa83 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -39,7 +39,6 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
   // in that batch's checkpoint data
   @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]
 
-  @transient private var fileSystem: FileSystem = null
   protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]
 
   /**
@@ -80,6 +79,7 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
         // even after master fails, as the checkpoint data of `time` does not refer to those files
         val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime)
         logDebug("Files to delete:\n" + filesToDelete.mkString(","))
+        var fileSystem: FileSystem = null
         filesToDelete.foreach {
           case (time, file) =>
             try {

