You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/03/21 01:17:58 UTC

git commit: TEZ-949. Handle Session Tokens for Recovery. (hitesh)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 2775c8ad0 -> 9bca4c96b


TEZ-949. Handle Session Tokens for Recovery. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/9bca4c96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/9bca4c96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/9bca4c96

Branch: refs/heads/master
Commit: 9bca4c96b2e35c9a2dcb7dde927b22976f6b6c26
Parents: 2775c8a
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Mar 20 17:17:27 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Mar 20 17:17:27 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/tez/client/TezClient.java   |  13 ++
 .../org/apache/tez/client/TezClientUtils.java   |  19 +++
 .../java/org/apache/tez/client/TezSession.java  |   7 +
 .../common/security/JobTokenSecretManager.java  | 137 ++++++++++++++++++
 .../apache/tez/common/security/TokenCache.java  |   6 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  26 ++--
 .../dag/app/TaskAttemptListenerImpTezDag.java   |   2 +-
 .../common/security/JobTokenSecretManager.java  | 138 -------------------
 .../common/security/SecureShuffleUtils.java     |   1 +
 .../library/shuffle/common/ShuffleUtils.java    |   2 +-
 10 files changed, 194 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9bca4c96/tez-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
index 95dc798..61de7e0 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -21,11 +21,14 @@ package org.apache.tez.client;
 import java.io.IOException;
 import java.text.NumberFormat;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -33,6 +36,9 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
@@ -46,6 +52,8 @@ public class TezClient {
   private final YarnConfiguration yarnConf;
   private YarnClient yarnClient;
   Map<String, LocalResource> tezJarResources = null;
+  private JobTokenSecretManager jobTokenSecretManager =
+      new JobTokenSecretManager();
 
   /**
    * <p>
@@ -88,6 +96,11 @@ public class TezClient {
       if (credentials == null) {
         credentials = new Credentials();
       }
+
+      // Add session token for shuffle
+      TezClientUtils.createSessionToken(appId.toString(),
+          jobTokenSecretManager, credentials);
+
       // Add credentials for tez-local resources.
       Map<String, LocalResource> tezJarResources = getTezJarResources(credentials);
       ApplicationSubmissionContext appContext = TezClientUtils.createApplicationSubmissionContext(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9bca4c96/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index d47ff80..ca37313 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.Vector;
 import java.util.Map.Entry;
 
@@ -49,10 +50,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -75,6 +78,8 @@ import org.apache.hadoop.yarn.util.Records;
 import org.apache.log4j.Level;
 import org.apache.tez.common.TezYARNUtils;
 import org.apache.tez.common.impl.LogUtils;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DagTypeConverters;
@@ -752,4 +757,18 @@ public class TezClientUtils {
     }
     return proxy;
   }
+
+  @Private
+  public static void createSessionToken(String tokenIdentifier,
+      JobTokenSecretManager jobTokenSecretManager,
+      Credentials credentials) {
+    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
+        tokenIdentifier));
+    Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier,
+        jobTokenSecretManager);
+    sessionToken.setService(identifier.getJobId());
+    TokenCache.setSessionToken(sessionToken, credentials);
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9bca4c96/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index f16e509..055218e 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.common.TezYARNUtils;
+import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DAGSubmissionTimedOut;
 import org.apache.tez.dag.api.DagTypeConverters;
@@ -68,6 +69,8 @@ public class TezSession {
   /** Tokens which will be required for all DAGs submitted to this session. */
   private Credentials sessionCredentials = new Credentials();
   private long clientTimeout;
+  private JobTokenSecretManager jobTokenSecretManager =
+      new JobTokenSecretManager();
 
   public TezSession(String sessionName,
       ApplicationId applicationId,
@@ -111,6 +114,10 @@ public class TezSession {
             getNewApplicationResponse().getApplicationId();
       }
 
+      // Add session token for shuffle
+      TezClientUtils.createSessionToken(applicationId.toString(),
+          jobTokenSecretManager, sessionCredentials);
+
       ApplicationSubmissionContext appContext =
           TezClientUtils.createApplicationSubmissionContext(
               sessionConfig.getTezConfiguration(), applicationId,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9bca4c96/tez-api/src/main/java/org/apache/tez/common/security/JobTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/JobTokenSecretManager.java b/tez-api/src/main/java/org/apache/tez/common/security/JobTokenSecretManager.java
new file mode 100644
index 0000000..d793b82
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/security/JobTokenSecretManager.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.security;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * SecretManager for job token. It can be used to cache generated job tokens.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JobTokenSecretManager extends SecretManager<JobTokenIdentifier> {
+  private final SecretKey masterKey;
+  private final Map<String, SecretKey> currentJobTokens;
+
+  /**
+   * Convert the byte[] to a secret key
+   * @param key the byte[] to create the secret key from
+   * @return the secret key
+   */
+  public static SecretKey createSecretKey(byte[] key) {
+    return SecretManager.createSecretKey(key);
+  }
+  
+  /**
+   * Compute the HMAC hash of the message using the key
+   * @param msg the message to hash
+   * @param key the key to use
+   * @return the computed hash
+   */
+  public static byte[] computeHash(byte[] msg, SecretKey key) {
+    return createPassword(msg, key);
+  }
+  
+  /**
+   * Default constructor
+   */
+  public JobTokenSecretManager() {
+    this.masterKey = generateSecret();
+    this.currentJobTokens = new TreeMap<String, SecretKey>();
+  }
+  
+  /**
+   * Create a new password/secret for the given job token identifier.
+   * @param identifier the job token identifier
+   * @return token password/secret
+   */
+  @Override
+  public byte[] createPassword(JobTokenIdentifier identifier) {
+    byte[] result = createPassword(identifier.getBytes(), masterKey);
+    return result;
+  }
+
+  /**
+   * Add the job token of a job to cache
+   * @param jobId the job that owns the token
+   * @param token the job token
+   */
+  public void addTokenForJob(String jobId, Token<JobTokenIdentifier> token) {
+    SecretKey tokenSecret = createSecretKey(token.getPassword());
+    synchronized (currentJobTokens) {
+      currentJobTokens.put(jobId, tokenSecret);
+    }
+  }
+
+  /**
+   * Remove the cached job token of a job from cache
+   * @param jobId the job whose token is to be removed
+   */
+  public void removeTokenForJob(String jobId) {
+    synchronized (currentJobTokens) {
+      currentJobTokens.remove(jobId);
+    }
+  }
+  
+  /**
+   * Look up the token password/secret for the given jobId.
+   * @param jobId the jobId to look up
+   * @return token password/secret as SecretKey
+   * @throws InvalidToken
+   */
+  public SecretKey retrieveTokenSecret(String jobId) throws InvalidToken {
+    SecretKey tokenSecret = null;
+    synchronized (currentJobTokens) {
+      tokenSecret = currentJobTokens.get(jobId);
+    }
+    if (tokenSecret == null) {
+      throw new InvalidToken("Can't find job token for job " + jobId + " !!");
+    }
+    return tokenSecret;
+  }
+  
+  /**
+   * Look up the token password/secret for the given job token identifier.
+   * @param identifier the job token identifier to look up
+   * @return token password/secret as byte[]
+   * @throws InvalidToken
+   */
+  @Override
+  public byte[] retrievePassword(JobTokenIdentifier identifier)
+      throws InvalidToken {
+    return retrieveTokenSecret(identifier.getJobId().toString()).getEncoded();
+  }
+
+  /**
+   * Create an empty job token identifier
+   * @return a newly created empty job token identifier
+   */
+  @Override
+  public JobTokenIdentifier createIdentifier() {
+    return new JobTokenIdentifier();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9bca4c96/tez-api/src/main/java/org/apache/tez/common/security/TokenCache.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/TokenCache.java b/tez-api/src/main/java/org/apache/tez/common/security/TokenCache.java
index 59472a9..6c0b1fc 100644
--- a/tez-api/src/main/java/org/apache/tez/common/security/TokenCache.java
+++ b/tez-api/src/main/java/org/apache/tez/common/security/TokenCache.java
@@ -131,6 +131,10 @@ public class TokenCache {
   @SuppressWarnings("unchecked")
   @InterfaceAudience.Private
   public static Token<JobTokenIdentifier> getSessionToken(Credentials credentials) {
-    return (Token<JobTokenIdentifier>) credentials.getToken(SESSION_TOKEN);
+    Token<?> token = credentials.getToken(SESSION_TOKEN);
+    if (token == null) {
+      return null;
+    }
+    return (Token<JobTokenIdentifier>) token;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9bca4c96/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 6db1647..cb9f2fc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -37,7 +37,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
@@ -51,7 +50,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -138,7 +136,7 @@ import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.utils.Graph;
-import org.apache.tez.runtime.library.common.security.JobTokenSecretManager;
+import org.apache.tez.common.security.JobTokenSecretManager;
 import org.codehaus.jettison.json.JSONException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -222,7 +220,6 @@ public class DAGAppMaster extends AbstractService {
   private Path recoveryDataDir;
   private Path currentRecoveryDataDir;
   private FileSystem recoveryFS;
-  private int recoveryBufferSize;
 
   protected boolean isLastAMRetry = false;
 
@@ -295,17 +292,17 @@ public class DAGAppMaster extends AbstractService {
     containerHeartbeatHandler = createContainerHeartbeatHandler(context, conf);
     addIfService(containerHeartbeatHandler, true);
 
-    String sessionTokenUUID = UUID.randomUUID().toString();
-    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
-        sessionTokenUUID));
-    sessionToken = new Token<JobTokenIdentifier>(identifier,
-        jobTokenSecretManager);
-    sessionToken.setService(identifier.getJobId());
-    TokenCache.setSessionToken(sessionToken, amTokens);
+    sessionToken =
+        TokenCache.getSessionToken(amTokens);
+    if (sessionToken == null) {
+      throw new RuntimeException("Could not find session token in AM Credentials");
+    }
+
     // Prepare the TaskAttemptListener server for authentication of Containers
     // TaskAttemptListener gets the information via jobTokenSecretManager.
-    jobTokenSecretManager.addTokenForJob(sessionTokenUUID, sessionToken);
-    LOG.info("Adding session token to jobTokenSecretManager for sessionTokenUUID: " + sessionTokenUUID);
+    LOG.info("Adding session token to jobTokenSecretManager for application");
+    jobTokenSecretManager.addTokenForJob(
+        appAttemptID.getApplicationId().toString(), sessionToken);
 
     //service to handle requests to TaskUmbilicalProtocol
     taskAttemptListener = createTaskAttemptListener(context,
@@ -363,9 +360,6 @@ public class DAGAppMaster extends AbstractService {
     currentRecoveryDataDir = new Path(recoveryDataDir,
         Integer.toString(this.appAttemptID.getAttemptId()));
     recoveryFS = FileSystem.get(recoveryDataDir.toUri(), conf);
-    recoveryBufferSize = conf.getInt(
-        TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
-        TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT);
 
     if (isSession) {
       FileInputStream sessionResourcesStream = null;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9bca4c96/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 73133a0..2e5ca02 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -59,7 +59,7 @@ import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
-import org.apache.tez.runtime.library.common.security.JobTokenSecretManager;
+import org.apache.tez.common.security.JobTokenSecretManager;
 
 import com.google.common.collect.Maps;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9bca4c96/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenSecretManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenSecretManager.java
deleted file mode 100644
index 66153af..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/JobTokenSecretManager.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.common.security;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-import javax.crypto.SecretKey;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.Token;
-import org.apache.tez.common.security.JobTokenIdentifier;
-
-/**
- * SecretManager for job token. It can be used to cache generated job tokens.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class JobTokenSecretManager extends SecretManager<JobTokenIdentifier> {
-  private final SecretKey masterKey;
-  private final Map<String, SecretKey> currentJobTokens;
-
-  /**
-   * Convert the byte[] to a secret key
-   * @param key the byte[] to create the secret key from
-   * @return the secret key
-   */
-  public static SecretKey createSecretKey(byte[] key) {
-    return SecretManager.createSecretKey(key);
-  }
-  
-  /**
-   * Compute the HMAC hash of the message using the key
-   * @param msg the message to hash
-   * @param key the key to use
-   * @return the computed hash
-   */
-  public static byte[] computeHash(byte[] msg, SecretKey key) {
-    return createPassword(msg, key);
-  }
-  
-  /**
-   * Default constructor
-   */
-  public JobTokenSecretManager() {
-    this.masterKey = generateSecret();
-    this.currentJobTokens = new TreeMap<String, SecretKey>();
-  }
-  
-  /**
-   * Create a new password/secret for the given job token identifier.
-   * @param identifier the job token identifier
-   * @return token password/secret
-   */
-  @Override
-  public byte[] createPassword(JobTokenIdentifier identifier) {
-    byte[] result = createPassword(identifier.getBytes(), masterKey);
-    return result;
-  }
-
-  /**
-   * Add the job token of a job to cache
-   * @param jobId the job that owns the token
-   * @param token the job token
-   */
-  public void addTokenForJob(String jobId, Token<JobTokenIdentifier> token) {
-    SecretKey tokenSecret = createSecretKey(token.getPassword());
-    synchronized (currentJobTokens) {
-      currentJobTokens.put(jobId, tokenSecret);
-    }
-  }
-
-  /**
-   * Remove the cached job token of a job from cache
-   * @param jobId the job whose token is to be removed
-   */
-  public void removeTokenForJob(String jobId) {
-    synchronized (currentJobTokens) {
-      currentJobTokens.remove(jobId);
-    }
-  }
-  
-  /**
-   * Look up the token password/secret for the given jobId.
-   * @param jobId the jobId to look up
-   * @return token password/secret as SecretKey
-   * @throws InvalidToken
-   */
-  public SecretKey retrieveTokenSecret(String jobId) throws InvalidToken {
-    SecretKey tokenSecret = null;
-    synchronized (currentJobTokens) {
-      tokenSecret = currentJobTokens.get(jobId);
-    }
-    if (tokenSecret == null) {
-      throw new InvalidToken("Can't find job token for job " + jobId + " !!");
-    }
-    return tokenSecret;
-  }
-  
-  /**
-   * Look up the token password/secret for the given job token identifier.
-   * @param identifier the job token identifier to look up
-   * @return token password/secret as byte[]
-   * @throws InvalidToken
-   */
-  @Override
-  public byte[] retrievePassword(JobTokenIdentifier identifier)
-      throws InvalidToken {
-    return retrieveTokenSecret(identifier.getJobId().toString()).getEncoded();
-  }
-
-  /**
-   * Create an empty job token identifier
-   * @return a newly created empty job token identifier
-   */
-  @Override
-  public JobTokenIdentifier createIdentifier() {
-    return new JobTokenIdentifier();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9bca4c96/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
index 299305a..a8e4ceb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
@@ -30,6 +30,7 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.WritableComparator;
+import org.apache.tez.common.security.JobTokenSecretManager;
 
 /**
  * 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9bca4c96/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
index 13db6f5..0c7cb2e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.security.JobTokenIdentifier;
-import org.apache.tez.runtime.library.common.security.JobTokenSecretManager;
+import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;
 
 public class ShuffleUtils {