You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/03/30 07:37:00 UTC
[spark] branch branch-2.3 updated: [SPARK-27301][DSTREAM] Shorten
the FileSystem cached life cycle to the cleanup method inner scope
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
new b57fef9 [SPARK-27301][DSTREAM] Shorten the FileSystem cached life cycle to the cleanup method inner scope
b57fef9 is described below
commit b57fef9f0cb29fb4bc4bb905a84fecb3a1ed5ddd
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Sat Mar 30 02:35:49 2019 -0500
[SPARK-27301][DSTREAM] Shorten the FileSystem cached life cycle to the cleanup method inner scope
## What changes were proposed in this pull request?
The cached FileSystem's token will expire if no tokens explicitly are add into it.
```scala
19/03/28 13:40:16 INFO storage.BlockManager: Removing RDD 83189
19/03/28 13:40:16 INFO rdd.MapPartitionsRDD: Removing RDD 82860 from persistence list
19/03/28 13:40:16 INFO spark.ContextCleaner: Cleaned shuffle 6005
19/03/28 13:40:16 INFO storage.BlockManager: Removing RDD 82860
19/03/28 13:40:16 INFO scheduler.ReceivedBlockTracker: Deleting batches:
19/03/28 13:40:16 INFO scheduler.InputInfoTracker: remove old batch metadata: 1553750250000 ms
19/03/28 13:40:17 WARN security.UserGroupInformation: PriviledgedActionException as:ursHADOOP.HZ.NETEASE.COM (auth:KERBEROS) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 53240500 for urs) is expired, current time: 2019-03-28 13:40:17,010+0800 expected renewal time: 2019-03-28 13:39:48,523+0800
19/03/28 13:40:17 WARN ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 53240500 for urs) is expired, current time: 2019-03-28 13:40:17,010+0800 expected renewal time: 2019-03-28 13:39:48,523+0800
19/03/28 13:40:17 WARN security.UserGroupInformation: PriviledgedActionException as:ursHADOOP.HZ.NETEASE.COM (auth:KERBEROS) cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 53240500 for urs) is expired, current time: 2019-03-28 13:40:17,010+0800 expected renewal time: 2019-03-28 13:39:48,523+0800
19/03/28 13:40:17 WARN hdfs.LeaseRenewer: Failed to renew lease for [DFSClient_NONMAPREDUCE_-1396157959_1] for 53 seconds. Will retry shortly ...
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 53240500 for urs) is expired, current time: 2019-03-28 13:40:17,010+0800 expected renewal time: 2019-03-28 13:39:48,523+0800
at org.apache.hadoop.ipc.Client.call(Client.java:1468)
at org.apache.hadoop.ipc.Client.call(Client.java:1399)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
at com.sun.proxy.$Proxy11.renewLease(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.renewLease(ClientNamenodeProtocolTranslatorPB.java:571)
at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy12.renewLease(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.renewLease(DFSClient.java:878)
at org.apache.hadoop.hdfs.LeaseRenewer.renew(LeaseRenewer.java:417)
at org.apache.hadoop.hdfs.LeaseRenewer.run(LeaseRenewer.java:442)
at org.apache.hadoop.hdfs.LeaseRenewer.access$700(LeaseRenewer.java:71)
at org.apache.hadoop.hdfs.LeaseRenewer$1.run(LeaseRenewer.java:298)
at java.lang.Thread.run(Thread.java:748)
```
This PR shorten the FileSystem cached life cycle to the cleanup method inner scope in case of token expiry.
## How was this patch tested?
existing ut
Closes #24235 from yaooqinn/SPARK-27301.
Authored-by: Kent Yao <ya...@hotmail.com>
Signed-off-by: Sean Owen <se...@databricks.com>
(cherry picked from commit f4c73b7c685b901dd69950e4929c65e3b8dd3a55)
Signed-off-by: Sean Owen <se...@databricks.com>
---
.../org/apache/spark/streaming/dstream/DStreamCheckpointData.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index e73837e..ebfaa83 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -39,7 +39,6 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
// in that batch's checkpoint data
@transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]
- @transient private var fileSystem: FileSystem = null
protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]
/**
@@ -80,6 +79,7 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
// even after master fails, as the checkpoint data of `time` does not refer to those files
val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime)
logDebug("Files to delete:\n" + filesToDelete.mkString(","))
+ var fileSystem: FileSystem = null
filesToDelete.foreach {
case (time, file) =>
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org