You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2016/03/18 18:01:36 UTC

hadoop git commit: YARN-4686. MiniYARNCluster.start() returns before cluster is completely started. Contributed by Eric Badger.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 9b623fbaf -> 92b7e0d41


YARN-4686. MiniYARNCluster.start() returns before cluster is completely started. Contributed by Eric Badger.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/92b7e0d4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/92b7e0d4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/92b7e0d4

Branch: refs/heads/trunk
Commit: 92b7e0d41302b6b110927f99de5c2b4a4a93c5fd
Parents: 9b623fb
Author: Eric Payne <ep...@apache.org>
Authored: Fri Mar 18 16:11:06 2016 +0000
Committer: Eric Payne <ep...@apache.org>
Committed: Fri Mar 18 16:12:47 2016 +0000

----------------------------------------------------------------------
 .../hadoop/yarn/client/ProtocolHATestBase.java  |  3 +-
 .../hadoop/yarn/client/TestRMFailover.java      |  2 -
 .../yarn/client/api/impl/TestYarnClient.java    | 18 +++++
 .../nodemanager/NodeStatusUpdaterImpl.java      | 50 +++++++------
 .../hadoop/yarn/server/MiniYARNCluster.java     | 75 +++++++-------------
 .../yarn/server/TestMiniYARNClusterForHA.java   |  4 --
 6 files changed, 75 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/92b7e0d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
index cf7fcc5..f336b0f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
@@ -217,7 +217,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
   protected void verifyConnections() throws InterruptedException,
       YarnException {
     assertTrue("NMs failed to connect to the RM",
-        cluster.waitForNodeManagersToConnect(20000));
+        cluster.waitForNodeManagersToConnect(5000));
     verifyClientConnection();
   }
 
@@ -279,7 +279,6 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
     cluster.resetStartFailoverFlag(false);
     cluster.init(conf);
     cluster.start();
-    getAdminService(0).transitionToActive(req);
     assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
     verifyConnections();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92b7e0d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
index cbc220a..f323351 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
@@ -142,7 +142,6 @@ public class TestRMFailover extends ClientBaseWithFixes {
     conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
     cluster.init(conf);
     cluster.start();
-    getAdminService(0).transitionToActive(req);
     assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
     verifyConnections();
 
@@ -231,7 +230,6 @@ public class TestRMFailover extends ClientBaseWithFixes {
     conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
     cluster.init(conf);
     cluster.start();
-    getAdminService(0).transitionToActive(req);
     assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
     verifyConnections();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92b7e0d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index 2c34b99..2d11d8a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -1195,6 +1195,24 @@ public class TestYarnClient {
       client.init(yarnConf);
       client.start();
 
+      int attempts;
+      for(attempts = 10; attempts > 0; attempts--) {
+        if (cluster.getResourceManager().getRMContext().getReservationSystem()
+            .getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity()
+            .getMemory() > 0) {
+          break;
+        }
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+      if (attempts <= 0) {
+        Assert.fail("Exhausted attempts in checking if node capacity was "
+            + "added to the plan");
+      }
+
       // create a reservation
       Clock clock = new UTCClock();
       long arrival = clock.getTime();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92b7e0d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 5806731..ad983fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -96,6 +96,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class);
 
   private final Object heartbeatMonitor = new Object();
+  private final Object shutdownMonitor = new Object();
 
   private final Context context;
   private final Dispatcher dispatcher;
@@ -240,15 +241,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   @Override
   protected void serviceStop() throws Exception {
     // the isStopped check is for avoiding multiple unregistrations.
-    if (this.registeredWithRM && !this.isStopped
-        && !isNMUnderSupervisionWithRecoveryEnabled()
-        && !context.getDecommissioned() && !failedToConnect) {
-      unRegisterNM();
+    synchronized(shutdownMonitor) {
+      if (this.registeredWithRM && !this.isStopped
+          && !isNMUnderSupervisionWithRecoveryEnabled()
+          && !context.getDecommissioned() && !failedToConnect) {
+        unRegisterNM();
+      }
+      // Interrupt the updater.
+      this.isStopped = true;
+      stopRMProxy();
+      super.serviceStop();
     }
-    // Interrupt the updater.
-    this.isStopped = true;
-    stopRMProxy();
-    super.serviceStop();
   }
 
   private boolean isNMUnderSupervisionWithRecoveryEnabled() {
@@ -275,19 +278,24 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
   protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
     // Interrupt the updater.
-    this.isStopped = true;
-
-    try {
-      statusUpdater.join();
-      registerWithRM();
-      statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater");
-      this.isStopped = false;
-      statusUpdater.start();
-      LOG.info("NodeStatusUpdater thread is reRegistered and restarted");
-    } catch (Exception e) {
-      String errorMessage = "Unexpected error rebooting NodeStatusUpdater";
-      LOG.error(errorMessage, e);
-      throw new YarnRuntimeException(e);
+    synchronized(shutdownMonitor) {
+      if(this.isStopped) {
+        LOG.info("Currently being shutdown. Aborting reboot");
+        return;
+      }
+      this.isStopped = true;
+      try {
+        statusUpdater.join();
+        registerWithRM();
+        statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater");
+        this.isStopped = false; // reset before start(), as the pre-patch code did
+        statusUpdater.start();
+        LOG.info("NodeStatusUpdater thread is reRegistered and restarted");
+      } catch (Exception e) {
+        String errorMessage = "Unexpected error rebooting NodeStatusUpdater";
+        LOG.error(errorMessage, e);
+        throw new YarnRuntimeException(e);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92b7e0d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index 024adc6..74b7732 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
@@ -76,6 +78,7 @@ import org.apache.hadoop.yarn.server.timeline.TimelineStore;
 import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
 import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -275,6 +278,12 @@ public class MiniYARNCluster extends CompositeService {
         conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
   }
 
+  @Override
+  protected synchronized void serviceStart() throws Exception {
+    super.serviceStart();
+    this.waitForNodeManagersToConnect(5000);
+  }
+
   private void setNonHARMConfigurationWithEphemeralPorts(Configuration conf) {
     String hostname = MiniYARNCluster.getHostname();
     conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0");
@@ -314,19 +323,7 @@ public class MiniYARNCluster extends CompositeService {
 
   private synchronized void startResourceManager(final int index) {
     try {
-      Thread rmThread = new Thread() {
-        public void run() {
-          resourceManagers[index].start();
-        }
-      };
-      rmThread.setName("RM-" + index);
-      rmThread.start();
-      int waitCount = 0;
-      while (resourceManagers[index].getServiceState() == STATE.INITED
-          && waitCount++ < 60) {
-        LOG.info("Waiting for RM to start...");
-        Thread.sleep(1500);
-      }
+      resourceManagers[index].start();
       if (resourceManagers[index].getServiceState() != STATE.STARTED) {
         // RM could have failed.
         throw new IOException(
@@ -456,6 +453,11 @@ public class MiniYARNCluster extends CompositeService {
     @Override
     protected synchronized void serviceStart() throws Exception {
       startResourceManager(index);
+      if(index == 0) {
+        resourceManagers[index].getRMContext().getRMAdminService()
+          .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo(
+            HAServiceProtocol.RequestSource.REQUEST_BY_USER_FORCED));
+      }
       Configuration conf = resourceManagers[index].getConfig();
       LOG.info("Starting resourcemanager " + index);
       LOG.info("MiniYARN ResourceManager address: " +
@@ -565,26 +567,12 @@ public class MiniYARNCluster extends CompositeService {
     }
 
     protected synchronized void serviceStart() throws Exception {
-      try {
-        new Thread() {
-          public void run() {
-            nodeManagers[index].start();
-          }
-        }.start();
-        int waitCount = 0;
-        while (nodeManagers[index].getServiceState() == STATE.INITED
-            && waitCount++ < 60) {
-          LOG.info("Waiting for NM " + index + " to start...");
-          Thread.sleep(1000);
-        }
-        if (nodeManagers[index].getServiceState() != STATE.STARTED) {
-          // RM could have failed.
-          throw new IOException("NodeManager " + index + " failed to start");
-        }
-        super.serviceStart();
-      } catch (Throwable t) {
-        throw new YarnRuntimeException(t);
+      nodeManagers[index].start();
+      if (nodeManagers[index].getServiceState() != STATE.STARTED) {
+        // NM could have failed.
+        throw new IOException("NodeManager " + index + " failed to start");
       }
+      super.serviceStart();
     }
 
     @Override
@@ -715,7 +703,7 @@ public class MiniYARNCluster extends CompositeService {
   /**
    * Wait for all the NodeManagers to connect to the ResourceManager.
    *
-   * @param timeout Time to wait (sleeps in 100 ms intervals) in milliseconds.
+   * @param timeout Time to wait (sleeps in 10 ms intervals) in milliseconds.
    * @return true if all NodeManagers connect to the (Active)
    * ResourceManager, false otherwise.
    * @throws YarnException
@@ -724,17 +712,19 @@ public class MiniYARNCluster extends CompositeService {
   public boolean waitForNodeManagersToConnect(long timeout)
       throws YarnException, InterruptedException {
     GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance();
-    for (int i = 0; i < timeout / 100; i++) {
+    for (int i = 0; i < timeout / 10; i++) {
       ResourceManager rm = getResourceManager();
       if (rm == null) {
         throw new YarnException("Can not find the active RM.");
       }
       else if (nodeManagers.length == rm.getClientRMService()
-            .getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) {
+          .getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) {
+        LOG.info("All Node Managers connected in MiniYARNCluster");
         return true;
       }
-      Thread.sleep(100);
+      Thread.sleep(10);
     }
+    LOG.info("Node Managers did not connect within " + timeout + "ms");
     return false;
   }
 
@@ -769,18 +759,7 @@ public class MiniYARNCluster extends CompositeService {
 
     @Override
     protected synchronized void serviceStart() throws Exception {
-
-      new Thread() {
-        public void run() {
-          appHistoryServer.start();
-        };
-      }.start();
-      int waitCount = 0;
-      while (appHistoryServer.getServiceState() == STATE.INITED
-          && waitCount++ < 60) {
-        LOG.info("Waiting for Timeline Server to start...");
-        Thread.sleep(1500);
-      }
+      appHistoryServer.start();
       if (appHistoryServer.getServiceState() != STATE.STARTED) {
         // AHS could have failed.
         IOException ioe = new IOException(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/92b7e0d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java
index e84d62e..384d1cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java
@@ -44,10 +44,6 @@ public class TestMiniYARNClusterForHA {
     cluster.init(conf);
     cluster.start();
 
-    cluster.getResourceManager(0).getRMContext().getRMAdminService()
-        .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo(
-            HAServiceProtocol.RequestSource.REQUEST_BY_USER));
-
     assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
   }