You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/06/17 01:10:24 UTC

svn commit: r1136736 - in /hadoop/common/branches/branch-0.20-security-204: CHANGES.txt src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java

Author: omalley
Date: Thu Jun 16 23:10:24 2011
New Revision: 1136736

URL: http://svn.apache.org/viewvc?rev=1136736&view=rev
Log:
MAPREDUCE-2452. Moves the cancellation of delegation tokens to a separate
thread. (ddas)

Modified:
    hadoop/common/branches/branch-0.20-security-204/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java

Modified: hadoop/common/branches/branch-0.20-security-204/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/CHANGES.txt?rev=1136736&r1=1136735&r2=1136736&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security-204/CHANGES.txt Thu Jun 16 23:10:24 2011
@@ -14,6 +14,9 @@ Release 0.20.204.0 - unreleased
     MAPREDUCE-2495. exit() the TaskTracker when the distributed cache cleanup
     thread dies. (Robert Joseph Evans via cdouglas)
 
+    MAPREDUCE-2452. Moves the cancellation of delegation tokens to a separate
+    thread. (ddas)
+
     MAPREDUCE-2555. Avoid sprious logging from completedtasks. (Thomas Graves
     via cdouglas)
 

Propchange: hadoop/common/branches/branch-0.20-security-204/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jun 16 23:10:24 2011
@@ -1,5 +1,5 @@
 /hadoop/common/branches/branch-0.20/CHANGES.txt:826138,826568,829987,831184,833001,880632,898713,909245,909723,960946,1044225
-/hadoop/common/branches/branch-0.20-security/CHANGES.txt:1097202,1098837,1100336,1101315,1101629,1101729,1102378,1102869,1103940,1125139,1125170,1125587,1125589,1127362,1131277,1131286,1131290,1131299,1131737,1134140
+/hadoop/common/branches/branch-0.20-security/CHANGES.txt:1097202,1098837,1100336,1101315,1101629,1101729,1102378,1102869,1103940,1125139,1125170,1125587,1125589,1127362,1130409,1131277,1131286,1131290,1131299,1131737,1134140
 /hadoop/common/branches/branch-0.20-security-203/CHANGES.txt:1096071,1097012-1099333,1102071,1128115
 /hadoop/common/branches/branch-0.20-security-205/CHANGES.txt:1132788,1133133,1133274,1133282
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226

Modified: hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java?rev=1136736&r1=1136735&r2=1136736&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java Thu Jun 16 23:10:24 2011
@@ -33,6 +33,7 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,6 +49,7 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.util.StringUtils;
 
 
 //@InterfaceAudience.Private
@@ -100,10 +102,75 @@ public class DelegationTokenRenewal {
   // global single timer (daemon)
   private static Timer renewalTimer = new Timer(true);
   
+  //delegation token canceler thread
+  private static DelegationTokenCancelThread dtCancelThread =
+    new DelegationTokenCancelThread();
+  static {
+    dtCancelThread.start();
+  }
+
+  
   //managing the list of tokens using Map
   // jobId=>List<tokens>
   private static Set<DelegationTokenToRenew> delegationTokens = 
     Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
+  
+  private static class DelegationTokenCancelThread extends Thread {
+    private static class TokenWithConf {
+      Token<DelegationTokenIdentifier> token;
+      Configuration conf;
+      TokenWithConf(Token<DelegationTokenIdentifier> token,  
+          Configuration conf) {
+        this.token = token;
+        this.conf = conf;
+      }
+    }
+    private LinkedBlockingQueue<TokenWithConf> queue =  
+      new LinkedBlockingQueue<TokenWithConf>();
+     
+    public DelegationTokenCancelThread() {
+      super("Delegation Token Canceler");
+      setDaemon(true);
+    }
+    public void cancelToken(Token<DelegationTokenIdentifier> token,  
+        Configuration conf) {
+      TokenWithConf tokenWithConf = new TokenWithConf(token, conf);
+      while (!queue.offer(tokenWithConf)) {
+        LOG.warn("Unable to add token " + token + " for cancellation. " +
+        		 "Will retry..");
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    public void run() {
+      while (true) {
+        TokenWithConf tokenWithConf = null;
+        try {
+          tokenWithConf = queue.take();
+          DistributedFileSystem dfs = getDFSForToken(tokenWithConf.token,  
+              tokenWithConf.conf);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Canceling token " + tokenWithConf.token.getService() +  
+                " for dfs=" + dfs);
+          }
+          dfs.cancelDelegationToken(tokenWithConf.token);
+        } catch (IOException e) {
+          LOG.warn("Failed to cancel token " + tokenWithConf.token + " " +  
+              StringUtils.stringifyException(e));
+        } catch (InterruptedException ie) {
+          return;
+        } catch (Throwable t) {
+          LOG.warn("Got exception " + StringUtils.stringifyException(t) + 
+                   ". Exiting..");
+          System.exit(-1);
+        }
+      }
+    }
+  }
   //adding token
   private static void addTokenToList(DelegationTokenToRenew t) {
     delegationTokens.add(t);
@@ -309,15 +376,7 @@ public class DelegationTokenRenewal {
     Configuration conf = t.conf;
     
     if(token.getKind().equals(kindHdfs)) {
-      try {
-        DistributedFileSystem dfs = getDFSForToken(token, conf);
-        if (LOG.isDebugEnabled())
-          LOG.debug("canceling token " + token.getService() + " for dfs=" +
-              dfs);
-        dfs.cancelDelegationToken(token);
-      } catch (Exception e) {
-        LOG.warn("Failed to cancel " + token, e);
-      }
+      dtCancelThread.cancelToken(token, conf);
     }
   }