You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2010/02/03 04:06:56 UTC
svn commit: r905875 - in /hadoop/mapreduce/trunk: ./ .eclipse.templates/
src/java/org/apache/hadoop/mapred/
src/java/org/apache/hadoop/mapreduce/protocol/
src/java/org/apache/hadoop/mapreduce/security/
src/java/org/apache/hadoop/mapreduce/security/token/
src/test/mapred/org/apache/hadoop/mapreduce/security/
Author: ddas
Date: Wed Feb 3 03:06:52 2010
New Revision: 905875
URL: http://svn.apache.org/viewvc?rev=905875&view=rev
Log:
MAPREDUCE-1335. Adds SASL Kerberos/Digest authentication in MapReduce. Contributed by Kan Zhang.
Added:
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSelector.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java
Modified:
hadoop/mapreduce/trunk/.eclipse.templates/.classpath
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/AdminOperationsProtocol.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java
Modified: hadoop/mapreduce/trunk/.eclipse.templates/.classpath
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/.eclipse.templates/.classpath?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/.eclipse.templates/.classpath (original)
+++ hadoop/mapreduce/trunk/.eclipse.templates/.classpath Wed Feb 3 03:06:52 2010
@@ -57,6 +57,7 @@
<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/slf4j-api-1.5.8.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/slf4j-simple-1.5.8.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/xmlenc-0.52.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/Hadoop/test/mockito-all-1.8.0.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/index/common/lucene-core-2.3.1.jar"/>
<classpathentry kind="lib" path="build/test/classes"/>
<classpathentry kind="lib" path="build/classes"/>
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Feb 3 03:06:52 2010
@@ -46,6 +46,9 @@
for loading the tokens in the user's ugi. This is required
for the copying of files from the hdfs. (ddas)
+ MAPREDUCE-1335. Adds SASL Kerberos/Digest authentication in MapReduce.
+ (Kan Zhang via ddas)
+
IMPROVEMENTS
MAPREDUCE-1198. Alternatively schedule different types of tasks in
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/AdminOperationsProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/AdminOperationsProtocol.java?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/AdminOperationsProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/AdminOperationsProtocol.java Wed Feb 3 03:06:52 2010
@@ -21,11 +21,14 @@
import java.io.IOException;
import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.KerberosInfo;
/**
* Protocol for admin operations. This is a framework-public interface and is
* NOT_TO_BE_USED_BY_USERS_DIRECTLY.
*/
+@KerberosInfo(JobContext.JOB_JOBTRACKER_ID)
public interface AdminOperationsProtocol extends VersionedProtocol {
/**
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java Wed Feb 3 03:06:52 2010
@@ -28,18 +28,19 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.TokenStorage;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.jvm.JvmMetrics;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.LogManager;
@@ -79,6 +80,11 @@
TokenCache.loadTaskTokenStorage(jobTokenFile, defaultConf);
LOG.debug("loading token. # keys =" +ts.numberOfSecretKeys() +
"; from file=" + jobTokenFile);
+ Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
+ jt.setService(new Text(address.getAddress().getHostAddress() + ":"
+ + address.getPort()));
+ UserGroupInformation current = UserGroupInformation.getCurrentUser();
+ current.addToken(jt);
TaskUmbilicalProtocol umbilical =
(TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
@@ -157,8 +163,6 @@
TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
JobConf job = new JobConf(task.getJobFile());
- // set job shuffle token
- Token<? extends TokenIdentifier> jt = TokenCache.getJobToken(ts);
// set the jobTokenFile into task
task.setJobTokenSecret(JobTokenSecretManager.
createSecretKey(jt.getPassword()));
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Wed Feb 3 03:06:52 2010
@@ -21,11 +21,14 @@
import java.io.IOException;
import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.KerberosInfo;
/**
* Protocol that a TaskTracker and the central JobTracker use to communicate.
* The JobTracker is the Server, which implements this protocol.
*/
+@KerberosInfo(JobContext.JOB_JOBTRACKER_ID)
interface InterTrackerProtocol extends VersionedProtocol {
/**
* version 3 introduced to replace
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Feb 3 03:06:52 2010
@@ -653,8 +653,8 @@
maxMapSlots : maxReduceSlots;
//set the num handlers to max*2 since canCommit may wait for the duration
//of a heartbeat RPC
- this.taskReportServer =
- RPC.getServer(this, bindAddress, tmpPort, 2 * max, false, this.fConf);
+ this.taskReportServer = RPC.getServer(this.getClass(), this, bindAddress,
+ tmpPort, 2 * max, false, this.fConf, this.jobTokenSecretManager);
this.taskReportServer.start();
// get the assigned address
@@ -987,7 +987,6 @@
* job as a starting point.
* @throws IOException
*/
- @SuppressWarnings("unchecked")
JobConf localizeJobFiles(Task t, RunningJob rjob)
throws IOException, InterruptedException {
JobID jobId = t.getJobID();
@@ -1001,8 +1000,7 @@
rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
TokenStorage ts = TokenCache.loadTokens(localJobTokenFile, fConf);
- Token<JobTokenIdentifier> jt =
- (Token<JobTokenIdentifier>)TokenCache.getJobToken(ts);
+ Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
if (jt != null) { //could be null in the case of some unit tests
getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Wed Feb 3 03:06:52 2010
@@ -22,11 +22,14 @@
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.mapred.JvmTask;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSelector;
+import org.apache.hadoop.security.token.TokenInfo;
/** Protocol that task child process uses to contact its parent process. The
* parent is a daemon which which polls the central master for a new map or
* reduce task and runs it as a child process. All communication between child
* and parent is via this protocol. */
+@TokenInfo(JobTokenSelector.class)
public interface TaskUmbilicalProtocol extends VersionedProtocol {
/**
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Wed Feb 3 03:06:52 2010
@@ -24,6 +24,7 @@
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.QueueAclsInfo;
@@ -35,12 +36,14 @@
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.security.TokenStorage;
import org.apache.hadoop.mapreduce.server.jobtracker.State;
+import org.apache.hadoop.security.KerberosInfo;
/**
* Protocol that a JobClient and the central JobTracker use to communicate. The
* JobClient can use these methods to submit a Job for execution, and learn about
* the current system status.
*/
+@KerberosInfo(JobContext.JOB_JOBTRACKER_ID)
public interface ClientProtocol extends VersionedProtocol {
/*
*Changing the versionID to 2L since the getTaskCompletionEvents method has
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java Wed Feb 3 03:06:52 2010
@@ -36,6 +36,7 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.TokenStorage;
import org.apache.hadoop.security.token.Token;
@@ -230,9 +231,10 @@
*
* @return job token
*/
+ @SuppressWarnings("unchecked")
@InterfaceAudience.Private
- public static Token<? extends TokenIdentifier> getJobToken(TokenStorage ts) {
- return ts.getToken(JOB_TOKEN);
+ public static Token<JobTokenIdentifier> getJobToken(TokenStorage ts) {
+ return (Token<JobTokenIdentifier>) ts.getToken(JOB_TOKEN);
}
static String buildDTServiceName(URI uri) {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java Wed Feb 3 03:06:52 2010
@@ -35,6 +35,13 @@
final static Text KIND_NAME = new Text("mapreduce.job");
/**
+ * Default constructor
+ */
+ public JobTokenIdentifier() {
+ this.jobid = new Text();
+ }
+
+ /**
* Create a job token identifier from a jobid
* @param jobid the jobid to use
*/
@@ -48,6 +55,12 @@
return KIND_NAME;
}
+ /** {@inheritDoc} */
+ @Override
+ public Text getUsername() {
+ return getJobId();
+ }
+
/**
* Get the jobid
* @return the jobid
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java Wed Feb 3 03:06:52 2010
@@ -124,4 +124,12 @@
return retrieveTokenSecret(identifier.getJobId().toString()).getEncoded();
}
+ /**
+ * Create an empty job token identifier
+ * @return a newly created empty job token identifier
+ */
+ @Override
+ public JobTokenIdentifier createIdentifier() {
+ return new JobTokenIdentifier();
+ }
}
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSelector.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSelector.java?rev=905875&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSelector.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSelector.java Wed Feb 3 03:06:52 2010
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security.token;
+
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+/**
+ * Selects, from a collection of tokens, the first job token (kind
+ * JobTokenIdentifier.KIND_NAME) whose service equals the requested service.
+ */
+@InterfaceAudience.Private
+public class JobTokenSelector implements TokenSelector<JobTokenIdentifier> {
+
+ @SuppressWarnings("unchecked") // covers the cast below, which relies on the KIND_NAME check
+ @Override
+ public Token<JobTokenIdentifier> selectToken(Text service,
+ Collection<Token<? extends TokenIdentifier>> tokens) {
+ if (service == null) { // nothing to match against, so no token is selected
+ return null;
+ }
+ for (Token<? extends TokenIdentifier> token : tokens) { // first match wins
+ if (JobTokenIdentifier.KIND_NAME.equals(token.getKind())
+ && service.equals(token.getService())) { // must match both kind and service
+ return (Token<JobTokenIdentifier>) token;
+ }
+ }
+ return null; // no job token for this service in the collection
+ }
+}
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java?rev=905875&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java Wed Feb 3 03:06:52 2010
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.*;
+import org.apache.commons.logging.impl.Log4JLogger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.SaslInputStream;
+import org.apache.hadoop.security.SaslRpcClient;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.log4j.Level;
+import org.junit.Test;
+
+/** Unit test that calls TaskUmbilicalProtocol over RPC with a job token added to the calling user. */
+public class TestUmbilicalProtocolWithJobToken {
+ private static final String ADDRESS = "0.0.0.0"; // bind address handed to RPC.getServer below
+
+ public static final Log LOG = LogFactory
+ .getLog(TestUmbilicalProtocolWithJobToken.class);
+
+ private static Configuration conf;
+ static { // runs at class initialization: set authentication to "kerberos" and install it in UGI
+ conf = new Configuration();
+ conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ }
+
+ static { // raise the IPC and SASL loggers to Level.ALL
+ ((Log4JLogger) Client.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger) Server.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger) SaslRpcClient.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger) SaslRpcServer.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger) SaslInputStream.LOG).getLogger().setLevel(Level.ALL);
+ }
+
+ @Test
+ public void testJobTokenRpc() throws Exception {
+ TaskUmbilicalProtocol mockTT = mock(TaskUmbilicalProtocol.class);
+ when(mockTT.getProtocolVersion(anyString(), anyLong())).thenReturn(
+ TaskUmbilicalProtocol.versionID); // the mock answers any version query with the real versionID
+
+ JobTokenSecretManager sm = new JobTokenSecretManager();
+ final Server server = RPC.getServer(TaskUmbilicalProtocol.class, mockTT,
+ ADDRESS, 0, 5, true, conf, sm); // NOTE(review): presumably port 0 = any free port, 5 handlers — confirm
+
+ server.start();
+
+ final UserGroupInformation current = UserGroupInformation.getCurrentUser();
+ final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ String jobId = current.getUserName(); // the current user name doubles as the job id
+ JobTokenIdentifier tokenId = new JobTokenIdentifier(new Text(jobId));
+ Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(tokenId, sm);
+ sm.addTokenForJob(jobId, token); // register the token with the secret manager under jobId
+ Text host = new Text(addr.getAddress().getHostAddress() + ":"
+ + addr.getPort()); // service string is "<ip>:<port>" of the server's connect address
+ token.setService(host);
+ LOG.info("Service IP address for token is " + host);
+ current.addToken(token); // attach the token to the current user before making the call
+ current.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ TaskUmbilicalProtocol proxy = null;
+ try {
+ proxy = (TaskUmbilicalProtocol) RPC.getProxy(
+ TaskUmbilicalProtocol.class, TaskUmbilicalProtocol.versionID,
+ addr, conf);
+ proxy.ping(null); // the single RPC under test; there is no assertion, so passing means no exception
+ } finally { // always stop the server, and release the proxy if one was obtained
+ server.stop();
+ if (proxy != null) {
+ RPC.stopProxy(proxy);
+ }
+ }
+ return null;
+ }
+ });
+ }
+
+}