You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2014/03/14 19:15:50 UTC
svn commit: r1577647 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/a...
Author: jlowe
Date: Fri Mar 14 18:15:50 2014
New Revision: 1577647
URL: http://svn.apache.org/r1577647
Log:
MAPREDUCE-5769. Unregistration to RM should not be called if AM is crashed before registering with RM. Contributed by Rohith
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1577647&r1=1577646&r2=1577647&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Mar 14 18:15:50 2014
@@ -237,6 +237,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5688. TestStagingCleanup fails intermittently with JDK7 (Mit
Desai via jeagles)
+ MAPREDUCE-5769. Unregistration to RM should not be called if AM is crashed
+ before registering with RM (Rohith via jlowe)
+
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1577647&r1=1577646&r2=1577647&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Fri Mar 14 18:15:50 2014
@@ -87,6 +87,7 @@ public abstract class RMCommunicator ext
// Has a signal (SIGTERM etc) been issued?
protected volatile boolean isSignalled = false;
private volatile boolean shouldUnregister = true;
+ private boolean isApplicationMasterRegistered = false;
public RMCommunicator(ClientService clientService, AppContext context) {
super("RMCommunicator");
@@ -153,6 +154,7 @@ public abstract class RMCommunicator ext
}
RegisterApplicationMasterResponse response =
scheduler.registerApplicationMaster(request);
+ isApplicationMasterRegistered = true;
maxContainerCapability = response.getMaximumResourceCapability();
this.context.getClusterInfo().setMaxContainerCapability(
maxContainerCapability);
@@ -249,7 +251,7 @@ public abstract class RMCommunicator ext
LOG.warn("InterruptedException while stopping", ie);
}
}
- if(shouldUnregister) {
+ if (isApplicationMasterRegistered && shouldUnregister) {
unregister();
}
super.serviceStop();
@@ -328,4 +330,9 @@ public abstract class RMCommunicator ext
LOG.info("RMCommunicator notified that iSignalled is: "
+ isSignalled);
}
+
+ @VisibleForTesting
+ protected boolean isApplicationMasterRegistered() {
+ return isApplicationMasterRegistered;
+ }
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1577647&r1=1577646&r2=1577647&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Fri Mar 14 18:15:50 2014
@@ -1386,7 +1386,7 @@ public class TestRMContainerAllocator {
static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents
= new ArrayList<JobUpdatedNodesEvent>();
private MyResourceManager rm;
-
+ private boolean isUnregistered = false;
private static AppContext createAppContext(
ApplicationAttemptId appAttemptId, Job job) {
AppContext context = mock(AppContext.class);
@@ -1478,6 +1478,7 @@ public class TestRMContainerAllocator {
@Override
protected void unregister() {
+ isUnregistered=true;
}
@Override
@@ -1527,6 +1528,15 @@ public class TestRMContainerAllocator {
protected void startAllocatorThread() {
// override to NOT start thread
}
+
+ @Override
+ protected boolean isApplicationMasterRegistered() {
+ return super.isApplicationMasterRegistered();
+ }
+
+ public boolean isUnregistered() {
+ return isUnregistered;
+ }
}
@@ -1776,6 +1786,51 @@ public class TestRMContainerAllocator {
Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
}
+ @Test
+ public void testUnregistrationOnlyIfRegistered() throws Exception {
+ Configuration conf = new Configuration();
+ final MyResourceManager rm = new MyResourceManager(conf);
+ rm.start();
+ DrainDispatcher rmDispatcher =
+ (DrainDispatcher) rm.getRMContext().getDispatcher();
+
+ // Submit the application
+ RMApp rmApp = rm.submitApp(1024);
+ rmDispatcher.await();
+
+ MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 11264);
+ amNodeManager.nodeHeartbeat(true);
+ rmDispatcher.await();
+
+ final ApplicationAttemptId appAttemptId =
+ rmApp.getCurrentAppAttempt().getAppAttemptId();
+ rm.sendAMLaunched(appAttemptId);
+ rmDispatcher.await();
+
+ MRApp mrApp =
+ new MRApp(appAttemptId, ContainerId.newInstance(appAttemptId, 0), 10,
+ 0, false, this.getClass().getName(), true, 1) {
+ @Override
+ protected Dispatcher createDispatcher() {
+ return new DrainDispatcher();
+ }
+
+ protected ContainerAllocator createContainerAllocator(
+ ClientService clientService, AppContext context) {
+ return new MyContainerAllocator(rm, appAttemptId, context);
+ };
+ };
+
+ mrApp.submit(conf);
+ DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher();
+ MyContainerAllocator allocator =
+ (MyContainerAllocator) mrApp.getContainerAllocator();
+ amDispatcher.await();
+ Assert.assertTrue(allocator.isApplicationMasterRegistered());
+ mrApp.stop();
+ Assert.assertTrue(allocator.isUnregistered());
+ }
+
public static void main(String[] args) throws Exception {
TestRMContainerAllocator t = new TestRMContainerAllocator();
t.testSimple();