You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by co...@apache.org on 2013/04/03 20:35:32 UTC
svn commit: r1464147 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java...
Author: cos
Date: Wed Apr 3 18:35:31 2013
New Revision: 1464147
URL: http://svn.apache.org/r1464147
Log:
MAPREDUCE-5088. MR Client gets a renewer token exception while Oozie is submitting a job (daryn)
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/ (props changed)
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (contents, props changed)
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
Propchange: hadoop/common/branches/branch-2/hadoop-mapreduce-project/
------------------------------------------------------------------------------
Merged /hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project:r1463804
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1464147&r1=1464146&r2=1464147&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed Apr 3 18:35:31 2013
@@ -125,6 +125,9 @@ Release 2.0.4-alpha - UNRELEASED
BUG FIXES
+ MAPREDUCE-5088. MR Client gets a renewer token exception while Oozie is
+ submitting a job (Daryn Sharp via cos)
+
MAPREDUCE-5117. Changed MRClientProtocolPBClientImpl to be closeable and thus
fix failures in renewal of HistoryServer's delegations tokens. (Siddharth
Seth via vinodkv)
Propchange: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/CHANGES.txt:r1463804
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java?rev=1464147&r1=1464146&r2=1464147&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java Wed Apr 3 18:35:31 2013
@@ -138,15 +138,6 @@ import org.apache.hadoop.util.ToolRunner
public class JobClient extends CLI {
public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
- /* notes that get delegation token was called. Again this is hack for oozie
- * to make sure we add history server delegation tokens to the credentials
- * for the job. Since the api only allows one delegation token to be returned,
- * we have to add this hack.
- */
- private boolean getDelegationTokenCalled = false;
- /* do we need a HS delegation token for this client */
- static final String HS_DELEGATION_TOKEN_REQUIRED
- = "mapreduce.history.server.delegationtoken.required";
static{
ConfigUtil.loadResources();
@@ -569,10 +560,6 @@ public class JobClient extends CLI {
try {
conf.setBooleanIfUnset("mapred.mapper.new-api", false);
conf.setBooleanIfUnset("mapred.reducer.new-api", false);
- if (getDelegationTokenCalled) {
- conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled);
- getDelegationTokenCalled = false;
- }
Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () {
@Override
public Job run() throws IOException, ClassNotFoundException,
@@ -1171,7 +1158,6 @@ public class JobClient extends CLI {
*/
public Token<DelegationTokenIdentifier>
getDelegationToken(final Text renewer) throws IOException, InterruptedException {
- getDelegationTokenCalled = true;
return clientUgi.doAs(new
PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
public Token<DelegationTokenIdentifier> run() throws IOException,
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1464147&r1=1464146&r2=1464147&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Wed Apr 3 18:35:31 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
+import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -87,6 +88,10 @@ public class ResourceMgrDelegate extends
return oldMetrics;
}
+ InetSocketAddress getConnectAddress() {
+ return rmAddress;
+ }
+
@SuppressWarnings("rawtypes")
public Token getDelegationToken(Text renewer) throws IOException,
InterruptedException {
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1464147&r1=1464146&r2=1464147&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Wed Apr 3 18:35:31 2013
@@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
@@ -81,6 +82,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.client.RMTokenSelector;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.ProtoUtils;
@@ -90,7 +92,7 @@ import com.google.common.annotations.Vis
/**
* This class enables the current JobClient (0.22 hadoop) to run on YARN.
*/
-@SuppressWarnings({ "rawtypes", "unchecked" })
+@SuppressWarnings("unchecked")
public class YARNRunner implements ClientProtocol {
private static final Log LOG = LogFactory.getLog(YARNRunner.class);
@@ -101,14 +103,6 @@ public class YARNRunner implements Clien
private Configuration conf;
private final FileContext defaultFileContext;
- /* usually is false unless the jobclient get delegation token is
- * called. This is a hack wherein we do return a token from RM
- * on getDelegationtoken but due to the restricted api on jobclient
- * we just add a job history DT token when submitting a job.
- */
- private static final boolean DEFAULT_HS_DELEGATION_TOKEN_REQUIRED =
- false;
-
/**
* Yarn runner incapsulates the client interface of
* yarn
@@ -186,6 +180,28 @@ public class YARNRunner implements Clien
}
@VisibleForTesting
+ void addHistoyToken(Credentials ts) throws IOException, InterruptedException {
+ /* check if we have a hsproxy, if not, no need */
+ MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
+ if (UserGroupInformation.isSecurityEnabled() && (hsProxy != null)) {
+ /*
+ * an RM delegation token already in the credentials is taken as a sign that
+ * one was requested up front (the oozie case); as a hack, add a history
+ * server delegation token to the credentials too, unless one is present
+ */
+ RMTokenSelector tokenSelector = new RMTokenSelector();
+ Text service = SecurityUtil.buildTokenService(resMgrDelegate
+ .getConnectAddress());
+ if (tokenSelector.selectToken(service, ts.getAllTokens()) != null) {
+ Text hsService = SecurityUtil.buildTokenService(hsProxy
+ .getConnectAddress());
+ if (ts.getToken(hsService) == null) {
+ ts.addToken(hsService, getDelegationTokenFromHS(hsProxy));
+ }
+ }
+ }
+ }
+
+ @VisibleForTesting
Token<?> getDelegationTokenFromHS(MRClientProtocol hsProxy)
throws IOException, InterruptedException {
GetDelegationTokenRequest request = recordFactory
@@ -263,18 +279,8 @@ public class YARNRunner implements Clien
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {
- /* check if we have a hsproxy, if not, no need */
- MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
- if (hsProxy != null) {
- // JobClient will set this flag if getDelegationToken is called, if so, get
- // the delegation tokens for the HistoryServer also.
- if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED,
- DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
- Token hsDT = getDelegationTokenFromHS(hsProxy);
- ts.addToken(hsDT.getService(), hsDT);
- }
- }
-
+ addHistoyToken(ts);
+
// Upload only in security mode: TODO
Path applicationTokensFile =
new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java?rev=1464147&r1=1464146&r2=1464147&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java Wed Apr 3 18:35:31 2013
@@ -20,8 +20,10 @@ package org.apache.hadoop.mapred;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -30,6 +32,7 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.List;
@@ -39,28 +42,24 @@ import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.ClientCache;
-import org.apache.hadoop.mapred.ClientServiceDelegate;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Master;
-import org.apache.hadoop.mapred.ResourceMgrDelegate;
-import org.apache.hadoop.mapred.YARNRunner;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
@@ -69,21 +68,27 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Appender;
import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
@@ -146,7 +151,7 @@ public class TestYARNRunner extends Test
}
- @Test
+ @Test(timeout=20000)
public void testJobKill() throws Exception {
clientDelegate = mock(ClientServiceDelegate.class);
when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
@@ -171,7 +176,7 @@ public class TestYARNRunner extends Test
verify(clientDelegate).killJob(jobId);
}
- @Test
+ @Test(timeout=20000)
public void testJobSubmissionFailure() throws Exception {
when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))).
thenReturn(appId);
@@ -193,7 +198,7 @@ public class TestYARNRunner extends Test
}
}
- @Test
+ @Test(timeout=20000)
public void testResourceMgrDelegate() throws Exception {
/* we not want a mock of resource mgr delegate */
final ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class);
@@ -259,8 +264,88 @@ public class TestYARNRunner extends Test
delegate.getQueueAclsForCurrentUser();
verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class));
}
-
- @Test
+
+ @Test(timeout=20000)
+ public void testGetHSDelegationToken() throws Exception {
+ try {
+ Configuration conf = new Configuration();
+
+ // Setup mock service
+ InetSocketAddress mockRmAddress = new InetSocketAddress("localhost", 4444);
+ Text rmTokenSevice = SecurityUtil.buildTokenService(mockRmAddress);
+
+ InetSocketAddress mockHsAddress = new InetSocketAddress("localhost", 9200);
+ Text hsTokenSevice = SecurityUtil.buildTokenService(mockHsAddress);
+
+ // Setup mock rm token
+ RMDelegationTokenIdentifier tokenIdentifier = new RMDelegationTokenIdentifier(
+ new Text("owner"), new Text("renewer"), new Text("real"));
+ Token<RMDelegationTokenIdentifier> token = new Token<RMDelegationTokenIdentifier>(
+ new byte[0], new byte[0], tokenIdentifier.getKind(), rmTokenSevice);
+ token.setKind(RMDelegationTokenIdentifier.KIND_NAME);
+
+ // Setup mock history token
+ DelegationToken historyToken = BuilderUtils.newDelegationToken(
+ new byte[0], MRDelegationTokenIdentifier.KIND_NAME.toString(),
+ new byte[0], hsTokenSevice.toString());
+ GetDelegationTokenResponse getDtResponse = Records
+ .newRecord(GetDelegationTokenResponse.class);
+ getDtResponse.setDelegationToken(historyToken);
+
+ // mock services
+ MRClientProtocol mockHsProxy = mock(MRClientProtocol.class);
+ doReturn(mockHsAddress).when(mockHsProxy).getConnectAddress();
+ doReturn(getDtResponse).when(mockHsProxy).getDelegationToken(
+ any(GetDelegationTokenRequest.class));
+
+ ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
+ doReturn(mockRmAddress).when(rmDelegate).getConnectAddress();
+
+ ClientCache clientCache = mock(ClientCache.class);
+ doReturn(mockHsProxy).when(clientCache).getInitializedHSProxy();
+
+ Credentials creds = new Credentials();
+
+ YARNRunner yarnRunner = new YARNRunner(conf, rmDelegate, clientCache);
+
+ // No HS token if no RM token
+ yarnRunner.addHistoyToken(creds);
+ verify(mockHsProxy, times(0)).getDelegationToken(
+ any(GetDelegationTokenRequest.class));
+
+ // No HS token if RM token, but security disabled.
+ creds.addToken(new Text("rmdt"), token);
+ yarnRunner.addHistoyToken(creds);
+ verify(mockHsProxy, times(0)).getDelegationToken(
+ any(GetDelegationTokenRequest.class));
+
+ conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION,
+ "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ creds = new Credentials();
+
+ // No HS token if no RM token, security enabled
+ yarnRunner.addHistoyToken(creds);
+ verify(mockHsProxy, times(0)).getDelegationToken(
+ any(GetDelegationTokenRequest.class));
+
+ // HS token if RM token present, security enabled
+ creds.addToken(new Text("rmdt"), token);
+ yarnRunner.addHistoyToken(creds);
+ verify(mockHsProxy, times(1)).getDelegationToken(
+ any(GetDelegationTokenRequest.class));
+
+ // No additional call to get HS token if RM and HS token present
+ yarnRunner.addHistoyToken(creds);
+ verify(mockHsProxy, times(1)).getDelegationToken(
+ any(GetDelegationTokenRequest.class));
+ } finally {
+ // Back to defaults.
+ UserGroupInformation.setConfiguration(new Configuration());
+ }
+ }
+
+ @Test(timeout=20000)
public void testHistoryServerToken() throws Exception {
//Set the master principal in the config
conf.set(YarnConfiguration.RM_PRINCIPAL,"foo@LOCAL");
@@ -303,7 +388,7 @@ public class TestYARNRunner extends Test
});
}
- @Test
+ @Test(timeout=20000)
public void testAMAdminCommandOpts() throws Exception {
JobConf jobConf = new JobConf();
@@ -366,7 +451,7 @@ public class TestYARNRunner extends Test
assertTrue("AM admin command opts is after user command opts.", adminIndex < userIndex);
}
}
- @Test
+ @Test(timeout=20000)
public void testWarnCommandOpts() throws Exception {
Logger logger = Logger.getLogger(YARNRunner.class);