You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/09/03 00:15:10 UTC
hadoop git commit: MAPREDUCE-6324. Fixed MapReduce uber jobs to not
fail the update of AM-RM tokens when they roll-over. Contributed by Jason
Lowe.
Repository: hadoop
Updated Branches:
refs/heads/branch-2.6.1 6ade6b505 -> 262275d56
MAPREDUCE-6324. Fixed MapReduce uber jobs to not fail the update of AM-RM tokens when they roll-over. Contributed by Jason Lowe.
(cherry picked from commit 9fc32c5c4d1d5f50c605bdb0e3b13f44c86660c8)
(cherry picked from commit 32dc13d907a416049bdb7deff429725bd6dbcb49)
(cherry picked from commit aad56fe3a2f29e73a013b9afa9b44b151a34e0f3)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/262275d5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/262275d5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/262275d5
Branch: refs/heads/branch-2.6.1
Commit: 262275d561a3d138e67942a35b7c15f1ef01bfc8
Parents: 6ade6b5
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Mon Apr 27 14:58:16 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Wed Sep 2 15:12:21 2015 -0700
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../v2/app/local/LocalContainerAllocator.java | 28 +++-
.../app/local/TestLocalContainerAllocator.java | 152 +++++++++++++++++--
3 files changed, 172 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/262275d5/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 0c1ae62..f4716d8 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -32,6 +32,9 @@ Release 2.6.1 - UNRELEASED
MAPREDUCE-6238. MR2 can't run local jobs with -libjars command options
which is a regression from MR1 (zxu via rkanter)
+ MAPREDUCE-6324. Fixed MapReduce uber jobs to not fail the udpate of AM-RM
+ tokens when they roll-over. (Jason Lowe via vinodkv)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/262275d5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
index 74dfb39..aed1023 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
@@ -18,11 +18,13 @@
package org.apache.hadoop.mapreduce.v2.app.local;
+import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@@ -35,17 +37,22 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssigned
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
/**
* Allocates containers locally. Doesn't allocate a real container;
@@ -99,8 +106,9 @@ public class LocalContainerAllocator extends RMCommunicator
AllocateRequest.newInstance(this.lastResponseID,
super.getApplicationProgress(), new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>(), null);
+ AllocateResponse allocateResponse = null;
try {
- scheduler.allocate(allocateRequest);
+ allocateResponse = scheduler.allocate(allocateRequest);
// Reset retry count if no exception occurred.
retrystartTime = System.currentTimeMillis();
} catch (ApplicationAttemptNotFoundException e) {
@@ -131,6 +139,24 @@ public class LocalContainerAllocator extends RMCommunicator
// continue to attempt to contact the RM.
throw e;
}
+
+ if (allocateResponse != null) {
+ this.lastResponseID = allocateResponse.getResponseId();
+ Token token = allocateResponse.getAMRMToken();
+ if (token != null) {
+ updateAMRMToken(token);
+ }
+ }
+ }
+
+ private void updateAMRMToken(Token token) throws IOException {
+ org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
+ new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token
+ .getIdentifier().array(), token.getPassword().array(), new Text(
+ token.getKind()), new Text(token.getService()));
+ UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
+ currentUGI.addToken(amrmToken);
+ amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig()));
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/hadoop/blob/262275d5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
index 90dbe48..f901ed8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
@@ -22,23 +22,43 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;
@@ -48,8 +68,13 @@ public class TestLocalContainerAllocator {
public void testRMConnectionRetry() throws Exception {
// verify the connection exception is thrown
// if we haven't exhausted the retry interval
+ ApplicationMasterProtocol mockScheduler =
+ mock(ApplicationMasterProtocol.class);
+ when(mockScheduler.allocate(isA(AllocateRequest.class)))
+ .thenThrow(RPCUtil.getRemoteException(new IOException("forcefail")));
Configuration conf = new Configuration();
- LocalContainerAllocator lca = new StubbedLocalContainerAllocator();
+ LocalContainerAllocator lca =
+ new StubbedLocalContainerAllocator(mockScheduler);
lca.init(conf);
lca.start();
try {
@@ -63,7 +88,7 @@ public class TestLocalContainerAllocator {
// verify YarnRuntimeException is thrown when the retry interval has expired
conf.setLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, 0);
- lca = new StubbedLocalContainerAllocator();
+ lca = new StubbedLocalContainerAllocator(mockScheduler);
lca.init(conf);
lca.start();
try {
@@ -76,12 +101,84 @@ public class TestLocalContainerAllocator {
}
}
+ @Test
+ public void testAllocResponseId() throws Exception {
+ ApplicationMasterProtocol scheduler = new MockScheduler();
+ Configuration conf = new Configuration();
+ LocalContainerAllocator lca =
+ new StubbedLocalContainerAllocator(scheduler);
+ lca.init(conf);
+ lca.start();
+
+ // do two heartbeats to verify the response ID is being tracked
+ lca.heartbeat();
+ lca.heartbeat();
+ lca.close();
+ }
+
+ @Test
+ public void testAMRMTokenUpdate() throws Exception {
+ Configuration conf = new Configuration();
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(1, 1), 1);
+ AMRMTokenIdentifier oldTokenId = new AMRMTokenIdentifier(attemptId, 1);
+ AMRMTokenIdentifier newTokenId = new AMRMTokenIdentifier(attemptId, 2);
+ Token<AMRMTokenIdentifier> oldToken = new Token<AMRMTokenIdentifier>(
+ oldTokenId.getBytes(), "oldpassword".getBytes(), oldTokenId.getKind(),
+ new Text());
+ Token<AMRMTokenIdentifier> newToken = new Token<AMRMTokenIdentifier>(
+ newTokenId.getBytes(), "newpassword".getBytes(), newTokenId.getKind(),
+ new Text());
+
+ MockScheduler scheduler = new MockScheduler();
+ scheduler.amToken = newToken;
+
+ final LocalContainerAllocator lca =
+ new StubbedLocalContainerAllocator(scheduler);
+ lca.init(conf);
+ lca.start();
+
+ UserGroupInformation testUgi = UserGroupInformation.createUserForTesting(
+ "someuser", new String[0]);
+ testUgi.addToken(oldToken);
+ testUgi.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ lca.heartbeat();
+ return null;
+ }
+ });
+ lca.close();
+
+ // verify there is only one AMRM token in the UGI and it matches the
+ // updated token from the RM
+ int tokenCount = 0;
+ Token<? extends TokenIdentifier> ugiToken = null;
+ for (Token<? extends TokenIdentifier> token : testUgi.getTokens()) {
+ if (AMRMTokenIdentifier.KIND_NAME.equals(token.getKind())) {
+ ugiToken = token;
+ ++tokenCount;
+ }
+ }
+
+ Assert.assertEquals("too many AMRM tokens", 1, tokenCount);
+ Assert.assertArrayEquals("token identifier not updated",
+ newToken.getIdentifier(), ugiToken.getIdentifier());
+ Assert.assertArrayEquals("token password not updated",
+ newToken.getPassword(), ugiToken.getPassword());
+ Assert.assertEquals("AMRM token service not updated",
+ new Text(ClientRMProxy.getAMRMTokenService(conf)),
+ ugiToken.getService());
+ }
+
private static class StubbedLocalContainerAllocator
extends LocalContainerAllocator {
+ private ApplicationMasterProtocol scheduler;
- public StubbedLocalContainerAllocator() {
+ public StubbedLocalContainerAllocator(ApplicationMasterProtocol scheduler) {
super(mock(ClientService.class), createAppContext(),
"nmhost", 1, 2, null);
+ this.scheduler = scheduler;
}
@Override
@@ -99,13 +196,6 @@ public class TestLocalContainerAllocator {
@Override
protected ApplicationMasterProtocol createSchedulerProxy() {
- ApplicationMasterProtocol scheduler = mock(ApplicationMasterProtocol.class);
- try {
- when(scheduler.allocate(isA(AllocateRequest.class)))
- .thenThrow(RPCUtil.getRemoteException(new IOException("forcefail")));
- } catch (YarnException e) {
- } catch (IOException e) {
- }
return scheduler;
}
@@ -126,4 +216,46 @@ public class TestLocalContainerAllocator {
return ctx;
}
}
+
+ private static class MockScheduler implements ApplicationMasterProtocol {
+ int responseId = 0;
+ Token<AMRMTokenIdentifier> amToken = null;
+
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return null;
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return null;
+ }
+
+ @Override
+ public AllocateResponse allocate(AllocateRequest request)
+ throws YarnException, IOException {
+ Assert.assertEquals("response ID mismatch",
+ responseId, request.getResponseId());
+ ++responseId;
+ org.apache.hadoop.yarn.api.records.Token yarnToken = null;
+ if (amToken != null) {
+ yarnToken = org.apache.hadoop.yarn.api.records.Token.newInstance(
+ amToken.getIdentifier(), amToken.getKind().toString(),
+ amToken.getPassword(), amToken.getService().toString());
+ }
+ return AllocateResponse.newInstance(responseId,
+ Collections.<ContainerStatus>emptyList(),
+ Collections.<Container>emptyList(),
+ Collections.<NodeReport>emptyList(),
+ Resources.none(), null, 1, null,
+ Collections.<NMToken>emptyList(),
+ yarnToken,
+ Collections.<ContainerResourceIncrease>emptyList(),
+ Collections.<ContainerResourceDecrease>emptyList());
+ }
+ }
}