You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/09/03 00:15:10 UTC

hadoop git commit: MAPREDUCE-6324. Fixed MapReduce uber jobs to not fail the update of AM-RM tokens when they roll over. Contributed by Jason Lowe.

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6.1 6ade6b505 -> 262275d56


MAPREDUCE-6324. Fixed MapReduce uber jobs to not fail the update of AM-RM tokens when they roll over. Contributed by Jason Lowe.

(cherry picked from commit 9fc32c5c4d1d5f50c605bdb0e3b13f44c86660c8)
(cherry picked from commit 32dc13d907a416049bdb7deff429725bd6dbcb49)
(cherry picked from commit aad56fe3a2f29e73a013b9afa9b44b151a34e0f3)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/262275d5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/262275d5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/262275d5

Branch: refs/heads/branch-2.6.1
Commit: 262275d561a3d138e67942a35b7c15f1ef01bfc8
Parents: 6ade6b5
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Mon Apr 27 14:58:16 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Wed Sep 2 15:12:21 2015 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../v2/app/local/LocalContainerAllocator.java   |  28 +++-
 .../app/local/TestLocalContainerAllocator.java  | 152 +++++++++++++++++--
 3 files changed, 172 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/262275d5/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 0c1ae62..f4716d8 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -32,6 +32,9 @@ Release 2.6.1 - UNRELEASED
     MAPREDUCE-6238. MR2 can't run local jobs with -libjars command options
     which is a regression from MR1 (zxu via rkanter)
 
+    MAPREDUCE-6324. Fixed MapReduce uber jobs to not fail the udpate of AM-RM
+    tokens when they roll-over. (Jason Lowe via vinodkv)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/262275d5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
index 74dfb39..aed1023 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
@@ -18,11 +18,13 @@
 
 package org.apache.hadoop.mapreduce.v2.app.local;
 
+import java.io.IOException;
 import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@@ -35,17 +37,22 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssigned
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 
 /**
  * Allocates containers locally. Doesn't allocate a real container;
@@ -99,8 +106,9 @@ public class LocalContainerAllocator extends RMCommunicator
         AllocateRequest.newInstance(this.lastResponseID,
           super.getApplicationProgress(), new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>(), null);
+    AllocateResponse allocateResponse = null;
     try {
-      scheduler.allocate(allocateRequest);
+      allocateResponse = scheduler.allocate(allocateRequest);
       // Reset retry count if no exception occurred.
       retrystartTime = System.currentTimeMillis();
     } catch (ApplicationAttemptNotFoundException e) {
@@ -131,6 +139,24 @@ public class LocalContainerAllocator extends RMCommunicator
       // continue to attempt to contact the RM.
       throw e;
     }
+
+    if (allocateResponse != null) {
+      this.lastResponseID = allocateResponse.getResponseId();
+      Token token = allocateResponse.getAMRMToken();
+      if (token != null) {
+        updateAMRMToken(token);
+      }
+    }
+  }
+
+  private void updateAMRMToken(Token token) throws IOException {
+    org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
+        new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token
+          .getIdentifier().array(), token.getPassword().array(), new Text(
+          token.getKind()), new Text(token.getService()));
+    UserGroupInformation currentUGI = UserGroupInformation.getCurrentUser();
+    currentUGI.addToken(amrmToken);
+    amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig()));
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/hadoop/blob/262275d5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
index 90dbe48..f901ed8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
@@ -22,23 +22,43 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -48,8 +68,13 @@ public class TestLocalContainerAllocator {
   public void testRMConnectionRetry() throws Exception {
     // verify the connection exception is thrown
     // if we haven't exhausted the retry interval
+    ApplicationMasterProtocol mockScheduler =
+        mock(ApplicationMasterProtocol.class);
+    when(mockScheduler.allocate(isA(AllocateRequest.class)))
+      .thenThrow(RPCUtil.getRemoteException(new IOException("forcefail")));
     Configuration conf = new Configuration();
-    LocalContainerAllocator lca = new StubbedLocalContainerAllocator();
+    LocalContainerAllocator lca =
+        new StubbedLocalContainerAllocator(mockScheduler);
     lca.init(conf);
     lca.start();
     try {
@@ -63,7 +88,7 @@ public class TestLocalContainerAllocator {
 
     // verify YarnRuntimeException is thrown when the retry interval has expired
     conf.setLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, 0);
-    lca = new StubbedLocalContainerAllocator();
+    lca = new StubbedLocalContainerAllocator(mockScheduler);
     lca.init(conf);
     lca.start();
     try {
@@ -76,12 +101,84 @@ public class TestLocalContainerAllocator {
     }
   }
 
+  @Test
+  public void testAllocResponseId() throws Exception {
+    ApplicationMasterProtocol scheduler = new MockScheduler();
+    Configuration conf = new Configuration();
+    LocalContainerAllocator lca =
+        new StubbedLocalContainerAllocator(scheduler);
+    lca.init(conf);
+    lca.start();
+
+    // do two heartbeats to verify the response ID is being tracked
+    lca.heartbeat();
+    lca.heartbeat();
+    lca.close();
+  }
+
+  @Test
+  public void testAMRMTokenUpdate() throws Exception {
+    Configuration conf = new Configuration();
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(1, 1), 1);
+    AMRMTokenIdentifier oldTokenId = new AMRMTokenIdentifier(attemptId, 1);
+    AMRMTokenIdentifier newTokenId = new AMRMTokenIdentifier(attemptId, 2);
+    Token<AMRMTokenIdentifier> oldToken = new Token<AMRMTokenIdentifier>(
+        oldTokenId.getBytes(), "oldpassword".getBytes(), oldTokenId.getKind(),
+        new Text());
+    Token<AMRMTokenIdentifier> newToken = new Token<AMRMTokenIdentifier>(
+        newTokenId.getBytes(), "newpassword".getBytes(), newTokenId.getKind(),
+        new Text());
+
+    MockScheduler scheduler = new MockScheduler();
+    scheduler.amToken = newToken;
+
+    final LocalContainerAllocator lca =
+        new StubbedLocalContainerAllocator(scheduler);
+    lca.init(conf);
+    lca.start();
+
+    UserGroupInformation testUgi = UserGroupInformation.createUserForTesting(
+        "someuser", new String[0]);
+    testUgi.addToken(oldToken);
+    testUgi.doAs(new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            lca.heartbeat();
+            return null;
+          }
+    });
+    lca.close();
+
+    // verify there is only one AMRM token in the UGI and it matches the
+    // updated token from the RM
+    int tokenCount = 0;
+    Token<? extends TokenIdentifier> ugiToken = null;
+    for (Token<? extends TokenIdentifier> token : testUgi.getTokens()) {
+      if (AMRMTokenIdentifier.KIND_NAME.equals(token.getKind())) {
+        ugiToken = token;
+        ++tokenCount;
+      }
+    }
+
+    Assert.assertEquals("too many AMRM tokens", 1, tokenCount);
+    Assert.assertArrayEquals("token identifier not updated",
+        newToken.getIdentifier(), ugiToken.getIdentifier());
+    Assert.assertArrayEquals("token password not updated",
+        newToken.getPassword(), ugiToken.getPassword());
+    Assert.assertEquals("AMRM token service not updated",
+        new Text(ClientRMProxy.getAMRMTokenService(conf)),
+        ugiToken.getService());
+  }
+
   private static class StubbedLocalContainerAllocator
     extends LocalContainerAllocator {
+    private ApplicationMasterProtocol scheduler;
 
-    public StubbedLocalContainerAllocator() {
+    public StubbedLocalContainerAllocator(ApplicationMasterProtocol scheduler) {
       super(mock(ClientService.class), createAppContext(),
           "nmhost", 1, 2, null);
+      this.scheduler = scheduler;
     }
 
     @Override
@@ -99,13 +196,6 @@ public class TestLocalContainerAllocator {
 
     @Override
     protected ApplicationMasterProtocol createSchedulerProxy() {
-      ApplicationMasterProtocol scheduler = mock(ApplicationMasterProtocol.class);
-      try {
-        when(scheduler.allocate(isA(AllocateRequest.class)))
-          .thenThrow(RPCUtil.getRemoteException(new IOException("forcefail")));
-      } catch (YarnException e) {
-      } catch (IOException e) {
-      }
       return scheduler;
     }
 
@@ -126,4 +216,46 @@ public class TestLocalContainerAllocator {
       return ctx;
     }
   }
+
+  private static class MockScheduler implements ApplicationMasterProtocol {
+    int responseId = 0;
+    Token<AMRMTokenIdentifier> amToken = null;
+
+    @Override
+    public RegisterApplicationMasterResponse registerApplicationMaster(
+        RegisterApplicationMasterRequest request) throws YarnException,
+        IOException {
+      return null;
+    }
+
+    @Override
+    public FinishApplicationMasterResponse finishApplicationMaster(
+        FinishApplicationMasterRequest request) throws YarnException,
+        IOException {
+      return null;
+    }
+
+    @Override
+    public AllocateResponse allocate(AllocateRequest request)
+        throws YarnException, IOException {
+      Assert.assertEquals("response ID mismatch",
+          responseId, request.getResponseId());
+      ++responseId;
+      org.apache.hadoop.yarn.api.records.Token yarnToken = null;
+      if (amToken != null) {
+        yarnToken = org.apache.hadoop.yarn.api.records.Token.newInstance(
+            amToken.getIdentifier(), amToken.getKind().toString(),
+            amToken.getPassword(), amToken.getService().toString());
+      }
+      return AllocateResponse.newInstance(responseId,
+          Collections.<ContainerStatus>emptyList(),
+          Collections.<Container>emptyList(),
+          Collections.<NodeReport>emptyList(),
+          Resources.none(), null, 1, null,
+          Collections.<NMToken>emptyList(),
+          yarnToken,
+          Collections.<ContainerResourceIncrease>emptyList(),
+          Collections.<ContainerResourceDecrease>emptyList());
+    }
+  }
 }