You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ka...@apache.org on 2014/01/10 21:15:01 UTC

svn commit: r1557248 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/serv...

Author: kasha
Date: Fri Jan 10 20:15:00 2014
New Revision: 1557248

URL: http://svn.apache.org/r1557248
Log:
YARN-1574. RMDispatcher should be reset on transition to standby. (Xuan Gong via kasha)

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1557248&r1=1557247&r2=1557248&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Fri Jan 10 20:15:00 2014
@@ -313,6 +313,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1293. Fixed TestContainerLaunch#testInvalidEnvSyntaxDiagnostics failure
     caused by non-English system locale. (Tsuyoshi OZAWA via jianhe)
 
+    YARN-1574. RMDispatcher should be reset on transition to standby. (Xuan Gong
+    via kasha)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java?rev=1557248&r1=1557247&r2=1557248&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java Fri Jan 10 20:15:00 2014
@@ -337,7 +337,34 @@ public class TestCompositeService {
     assertEquals("Incorrect number of services",
         1, testService.getServices().size());
   }
-  
+
+  @Test
+  public void testRemoveService() {
+    CompositeService testService = new CompositeService("TestService") {
+      @Override
+      public void serviceInit(Configuration conf) {
+        Integer notAService = new Integer(0);
+        assertFalse("Added an integer as a service",
+            addIfService(notAService));
+
+        Service service1 = new AbstractService("Service1") {};
+        addIfService(service1);
+
+        Service service2 = new AbstractService("Service2") {};
+        addIfService(service2);
+
+        Service service3 = new AbstractService("Service3") {};
+        addIfService(service3);
+
+        removeService(service1);
+      }
+    };
+
+    testService.init(new Configuration());
+    assertEquals("Incorrect number of services",
+        2, testService.getServices().size());
+  }
+
   public static class CompositeServiceAddingAChild extends CompositeService{
     Service child;
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1557248&r1=1557247&r2=1557248&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Fri Jan 10 20:15:00 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.security.Securi
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.service.Service;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
@@ -180,13 +181,11 @@ public class ResourceManager extends Com
     this.conf = conf;
     this.rmContext = new RMContextImpl();
 
-    rmDispatcher = createDispatcher();
+    // register the handlers for all AlwaysOn services using setupDispatcher().
+    rmDispatcher = setupDispatcher();
     addIfService(rmDispatcher);
     rmContext.setDispatcher(rmDispatcher);
 
-    rmDispatcher.register(RMFatalEventType.class,
-        new ResourceManager.RMFatalEventDispatcher(this.rmContext, this));
-
     adminService = createAdminService();
     addService(adminService);
     rmContext.setRMAdminService(adminService);
@@ -832,6 +831,7 @@ public class ResourceManager extends Com
         HAServiceProtocol.HAServiceState.ACTIVE) {
       stopActiveServices();
       if (initialize) {
+        resetDispatcher();
         createAndInitActiveServices();
       }
     }
@@ -994,4 +994,24 @@ public class ResourceManager extends Com
       YarnConfiguration.YARN_HTTP_POLICY_KEY,
       YarnConfiguration.YARN_HTTP_POLICY_DEFAULT)));
   }
+
+  /**
+   * Create a dispatcher and register the handlers for AlwaysOn services.
+   */
+  private Dispatcher setupDispatcher() {
+    Dispatcher dispatcher = createDispatcher();
+    dispatcher.register(RMFatalEventType.class,
+        new ResourceManager.RMFatalEventDispatcher(this.rmContext, this));
+    return dispatcher;
+  }
+
+  private void resetDispatcher() {
+    Dispatcher dispatcher = setupDispatcher();
+    ((Service)dispatcher).init(this.conf);
+    ((Service)dispatcher).start();
+    removeService((Service)rmDispatcher);
+    rmDispatcher = dispatcher;
+    addIfService(rmDispatcher);
+    rmContext.setDispatcher(rmDispatcher);
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java?rev=1557248&r1=1557247&r2=1557248&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java Fri Jan 10 20:15:00 2014
@@ -26,8 +26,11 @@ import org.apache.hadoop.ha.HAServicePro
 import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.ha.HealthCheckFailedException;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -222,4 +225,81 @@ public class TestRMHA {
     checkMonitorHealth();
     checkActiveRMFunctionality();
   }
+
+  @Test
+  public void testRMDispatcherForHA() throws IOException {
+    String errorMessageForEventHandler =
+        "Expect to get the same number of handlers";
+    String errorMessageForService = "Expect to get the same number of services";
+    Configuration conf = new YarnConfiguration(configuration);
+    rm = new MockRM(conf) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return new MyCountingDispatcher();
+      }
+    };
+    rm.init(conf);
+    int expectedEventHandlerCount =
+        ((MyCountingDispatcher) rm.getRMContext().getDispatcher())
+            .getEventHandlerCount();
+    int expectedServiceCount = rm.getServices().size();
+    assertTrue(expectedEventHandlerCount != 0);
+
+    StateChangeRequestInfo requestInfo = new StateChangeRequestInfo(
+        HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+    assertEquals(STATE_ERR, HAServiceState.INITIALIZING,
+        rm.adminService.getServiceStatus().getState());
+    assertFalse("RM is ready to become active before being started",
+        rm.adminService.getServiceStatus().isReadyToBecomeActive());
+    rm.start();
+
+    //call transitions to standby and active a couple of times
+    rm.adminService.transitionToStandby(requestInfo);
+    rm.adminService.transitionToActive(requestInfo);
+    rm.adminService.transitionToStandby(requestInfo);
+    rm.adminService.transitionToActive(requestInfo);
+    rm.adminService.transitionToStandby(requestInfo);
+
+    rm.adminService.transitionToActive(requestInfo);
+    assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
+        ((MyCountingDispatcher) rm.getRMContext().getDispatcher())
+        .getEventHandlerCount());
+    assertEquals(errorMessageForService, expectedServiceCount,
+        rm.getServices().size());
+
+    rm.adminService.transitionToStandby(requestInfo);
+    assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
+        ((MyCountingDispatcher) rm.getRMContext().getDispatcher())
+        .getEventHandlerCount());
+    assertEquals(errorMessageForService, expectedServiceCount,
+        rm.getServices().size());
+
+    rm.stop();
+  }
+
+  @SuppressWarnings("rawtypes")
+  class MyCountingDispatcher extends AbstractService implements Dispatcher {
+
+    private int eventHandlerCount;
+
+    public MyCountingDispatcher() {
+      super("MyCountingDispatcher");
+      this.eventHandlerCount = 0;
+    }
+
+    @Override
+    public EventHandler getEventHandler() {
+      return null;
+    }
+
+    @Override
+    public void register(Class<? extends Enum> eventType, EventHandler handler) {
+      this.eventHandlerCount ++;
+    }
+
+    public int getEventHandlerCount() {
+      return this.eventHandlerCount;
+    }
+  }
 }