Posted to commits@spark.apache.org by ka...@apache.org on 2020/11/30 22:00:57 UTC

[spark] branch branch-3.0 updated: [SPARK-33440][CORE] Use current timestamp with warning log in HadoopFSDelegationTokenProvider when the issue date for token is not set up properly

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 242581f  [SPARK-33440][CORE] Use current timestamp with warning log in HadoopFSDelegationTokenProvider when the issue date for token is not set up properly
242581f is described below

commit 242581f4926c994bfc5af388cae31645112b2798
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Tue Dec 1 06:44:15 2020 +0900

    [SPARK-33440][CORE] Use current timestamp with warning log in HadoopFSDelegationTokenProvider when the issue date for token is not set up properly
    
    ### What changes were proposed in this pull request?
    
    This PR proposes using the current timestamp, with a warning log, when the issue date for a token is not set up properly. The next section explains the rationale in detail.
    
    ### Why are the changes needed?
    
    Unfortunately, not every implementation respects the `issue date` in `AbstractDelegationTokenIdentifier`, which Spark relies on when calculating the renewal interval and the next renewal date. The default value of the issue date is 0L, which is far from the actual issue date. Under some circumstances this breaks the logic for calculating the next renewal date, leading to a 0 interval (immediate) when rescheduling token renewal.
    
    In HadoopFSDelegationTokenProvider, Spark calculates the token renewal interval as below:
    
    https://github.com/apache/spark/blob/2c64b731ae6a976b0d75a95901db849b4a0e2393/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala#L123-L134
    
    The interval is calculated as `token.renew() - identifier.getIssueDate`, which gives the correct interval as long as both `token.renew()` and `identifier.getIssueDate` produce correct values. When `identifier.getIssueDate` returns 0L (the default value), however, the interval becomes abnormally large, as in the first line below:
    
    ```
    20/10/13 06:34:19 INFO security.HadoopFSDelegationTokenProvider: Renewal interval is 1603175657000 for token S3ADelegationToken/IDBroker
    20/10/13 06:34:19 INFO security.HadoopFSDelegationTokenProvider: Renewal interval is 86400048 for token HDFS_DELEGATION_TOKEN
    ```
    
    Fortunately, we pick the minimum value as a safety guard (so in this case `86400048` is picked), but the safety guard has an unintended bad effect in this case.
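    
    The interval arithmetic described above can be sketched as follows. This is only an illustration, not the provider code: `RenewalIntervalSketch`, its helper names, and the HDFS issue date are hypothetical, while the two interval values come from the log lines above.
    
    ```scala
    // Sketch only: names and the HDFS issue date are illustrative assumptions.
    object RenewalIntervalSketch {
      // Same arithmetic as `token.renew() - identifier.getIssueDate`.
      def interval(newExpiration: Long, issueDate: Long): Long = newExpiration - issueDate
    
      // Keep the smallest interval, or None when there are no tokens.
      def minInterval(intervals: Seq[Long]): Option[Long] =
        if (intervals.isEmpty) None else Some(intervals.min)
    
      def main(args: Array[String]): Unit = {
        val hdfsIssueDate = 1602570859000L // hypothetical, properly set issue date
        val hdfs = interval(hdfsIssueDate + 86400048L, hdfsIssueDate)
        val s3a = interval(1603175657000L, 0L) // issue date left at the default 0L
        println(s"hdfs=$hdfs s3a=$s3a min=${minInterval(Seq(s3a, hdfs))}")
      }
    }
    ```
    
    With an issue date of 0L the "interval" is simply the expiration timestamp itself, which is why taking the minimum hides the bad value at this step.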
    
    https://github.com/apache/spark/blob/2c64b731ae6a976b0d75a95901db849b4a0e2393/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala#L58-L71
    
    Spark takes the interval calculated above (the "minimum" of the intervals) and blindly adds it to each token's issue date to calculate the next renewal date for that token, then picks the "minimum" value again. In the problematic case, the value would be `86400048` (86400048 + 0), which is far smaller than the current timestamp.
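    
    As a rough sketch of that step (not the actual provider code; `NextRenewalSketch` and the non-zero issue date are hypothetical), adding the minimum interval to each issue date and taking the minimum again looks like this:
    
    ```scala
    // Sketch only: the object name and sample issue dates are assumptions.
    object NextRenewalSketch {
      // Add the shared interval to every token's issue date and keep the earliest result.
      def nextRenewalDate(issueDates: Seq[Long], interval: Long): Option[Long] = {
        val dates = issueDates.map(_ + interval)
        if (dates.isEmpty) None else Some(dates.min)
      }
    
      def main(args: Array[String]): Unit = {
        // One token reports the default issue date 0L, the other a plausible timestamp.
        val issueDates = Seq(0L, 1602570859000L)
        println(nextRenewalDate(issueDates, 86400048L))
      }
    }
    ```
    
    The token with issue date 0L always wins the second minimum, so the next renewal date ends up about one day after the epoch.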
    
    https://github.com/apache/spark/blob/2c64b731ae6a976b0d75a95901db849b4a0e2393/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala#L228-L234
    
    The current timestamp is then subtracted from the next renewal date to get the interval, which is multiplied by the configured ratio to produce the final schedule interval. In the problematic case, this value becomes negative.
    
    https://github.com/apache/spark/blob/2c64b731ae6a976b0d75a95901db849b4a0e2393/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala#L180-L188
    
    There's a safety guard that disallows negative values, but it simply clamps the value to 0, meaning "schedule immediately". The immediate run recalculates the next renewal date and the schedule interval, which leads to the same result, so the delegation token is updated immediately and continuously.
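    
    The delay calculation and the clamp can be sketched like this. The object and method names are hypothetical, and the ratio of 0.75 and the value of `now` are assumed purely for illustration:
    
    ```scala
    // Sketch only: names, the ratio, and the sample timestamps are assumptions.
    object ScheduleDelaySketch {
      // Same shape as `(ratio * (nextRenewal - now)).toLong` in the manager.
      def delay(nextRenewal: Long, now: Long, ratio: Double): Long =
        (ratio * (nextRenewal - now)).toLong
    
      // Same shape as `math.max(0, delay)` in scheduleRenewal.
      def effectiveDelay(delay: Long): Long = math.max(0L, delay)
    
      def main(args: Array[String]): Unit = {
        val now = 1602570859000L // hypothetical current time
        val raw = delay(nextRenewal = 86400048L, now = now, ratio = 0.75)
        println(s"raw=$raw effective=${effectiveDelay(raw)}")
      }
    }
    ```
    
    Because the next renewal date never moves past the current time, every run computes a negative raw delay and reschedules itself with a delay of 0.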
    
    As we fetch the token just before the calculation happens, the actual issue date is likely only slightly earlier, so it's not that dangerous to use the current timestamp as the issue date for a token whose issue date has not been set up properly. Still, it's better not to leave the token implementation as it is, so we log a warning message to let end users consult the token implementer.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. End users won't encounter the tight loop of token renewal scheduling after this PR. From the end users' perspective, there's nothing they need to change.
    
    ### How was this patch tested?
    
    Manually tested in a problematic environment.
    
    Closes #30366 from HeartSaVioR/SPARK-33440.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
    Signed-off-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
    (cherry picked from commit f5d2165c95fe83f24be9841807613950c1d5d6d0)
    Signed-off-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
---
 .../security/HadoopDelegationTokenManager.scala    |  4 +++-
 .../security/HadoopFSDelegationTokenProvider.scala | 27 +++++++++++++++++++---
 2 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 3168c76..6ce195b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -178,7 +178,7 @@ private[spark] class HadoopDelegationTokenManager(
 
   private def scheduleRenewal(delay: Long): Unit = {
     val _delay = math.max(0, delay)
-    logInfo(s"Scheduling renewal in ${UIUtils.formatDuration(delay)}.")
+    logInfo(s"Scheduling renewal in ${UIUtils.formatDuration(_delay)}.")
 
     val renewalTask = new Runnable() {
       override def run(): Unit = {
@@ -230,6 +230,8 @@ private[spark] class HadoopDelegationTokenManager(
         val now = System.currentTimeMillis
         val ratio = sparkConf.get(CREDENTIALS_RENEWAL_INTERVAL_RATIO)
         val delay = (ratio * (nextRenewal - now)).toLong
+        logInfo(s"Calculated delay on renewal is $delay, based on next renewal $nextRenewal " +
+          s"and the ratio $ratio, and current time $now")
         scheduleRenewal(delay)
         creds
       }
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index 4e91e72..cd9516b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -63,7 +63,8 @@ private[deploy] class HadoopFSDelegationTokenProvider
             val identifier = token
               .decodeIdentifier()
               .asInstanceOf[AbstractDelegationTokenIdentifier]
-            identifier.getIssueDate + interval
+            val tokenKind = token.getKind.toString
+            getIssueDate(tokenKind, identifier) + interval
           }
         if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
       }
@@ -126,13 +127,33 @@ private[deploy] class HadoopFSDelegationTokenProvider
       Try {
         val newExpiration = token.renew(hadoopConf)
         val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
-        val interval = newExpiration - identifier.getIssueDate
-        logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
+        val tokenKind = token.getKind.toString
+        val interval = newExpiration - getIssueDate(tokenKind, identifier)
+        logInfo(s"Renewal interval is $interval for token $tokenKind")
         interval
       }.toOption
     }
     if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
   }
+
+  private def getIssueDate(kind: String, identifier: AbstractDelegationTokenIdentifier): Long = {
+    val now = System.currentTimeMillis()
+    val issueDate = identifier.getIssueDate
+    if (issueDate > now) {
+      logWarning(s"Token $kind has set up issue date later than current time. (provided: " +
+        s"$issueDate / current timestamp: $now) Please make sure clocks are in sync between " +
+        "machines. If the issue is not a clock mismatch, consult token implementor to check " +
+        "whether issue date is valid.")
+      issueDate
+    } else if (issueDate > 0L) {
+      issueDate
+    } else {
+      logWarning(s"Token $kind has not set up issue date properly. (provided: $issueDate) " +
+        s"Using current timestamp ($now) as issue date instead. Consult token implementor to fix " +
+        "the behavior.")
+      now
+    }
+  }
 }
 
 private[deploy] object HadoopFSDelegationTokenProvider {

