You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2016/03/21 21:14:50 UTC
[37/50] [abbrv] hadoop git commit: YARN-4686. MiniYARNCluster.start() returns before cluster is completely started. Contributed by Eric Badger.
YARN-4686. MiniYARNCluster.start() returns before cluster is completely started. Contributed by Eric Badger.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/92b7e0d4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/92b7e0d4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/92b7e0d4
Branch: refs/heads/YARN-3368
Commit: 92b7e0d41302b6b110927f99de5c2b4a4a93c5fd
Parents: 9b623fb
Author: Eric Payne <ep...@apache.org>
Authored: Fri Mar 18 16:11:06 2016 +0000
Committer: Eric Payne <ep...@apache.org>
Committed: Fri Mar 18 16:12:47 2016 +0000
----------------------------------------------------------------------
.../hadoop/yarn/client/ProtocolHATestBase.java | 3 +-
.../hadoop/yarn/client/TestRMFailover.java | 2 -
.../yarn/client/api/impl/TestYarnClient.java | 18 +++++
.../nodemanager/NodeStatusUpdaterImpl.java | 50 +++++++------
.../hadoop/yarn/server/MiniYARNCluster.java | 75 +++++++-------------
.../yarn/server/TestMiniYARNClusterForHA.java | 4 --
6 files changed, 75 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92b7e0d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
index cf7fcc5..f336b0f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
@@ -217,7 +217,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
protected void verifyConnections() throws InterruptedException,
YarnException {
assertTrue("NMs failed to connect to the RM",
- cluster.waitForNodeManagersToConnect(20000));
+ cluster.waitForNodeManagersToConnect(5000));
verifyClientConnection();
}
@@ -279,7 +279,6 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
cluster.resetStartFailoverFlag(false);
cluster.init(conf);
cluster.start();
- getAdminService(0).transitionToActive(req);
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
verifyConnections();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92b7e0d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
index cbc220a..f323351 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
@@ -142,7 +142,6 @@ public class TestRMFailover extends ClientBaseWithFixes {
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
cluster.init(conf);
cluster.start();
- getAdminService(0).transitionToActive(req);
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
verifyConnections();
@@ -231,7 +230,6 @@ public class TestRMFailover extends ClientBaseWithFixes {
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
cluster.init(conf);
cluster.start();
- getAdminService(0).transitionToActive(req);
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
verifyConnections();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92b7e0d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
index 2c34b99..2d11d8a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
@@ -1195,6 +1195,24 @@ public class TestYarnClient {
client.init(yarnConf);
client.start();
+ int attempts;
+ for(attempts = 10; attempts > 0; attempts--) {
+ if (cluster.getResourceManager().getRMContext().getReservationSystem()
+ .getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity()
+ .getMemory() > 0) {
+ break;
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ if (attempts <= 0) {
+ Assert.fail("Exhausted attempts in checking if node capacity was "
+ + "added to the plan");
+ }
+
// create a reservation
Clock clock = new UTCClock();
long arrival = clock.getTime();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92b7e0d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 5806731..ad983fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -96,6 +96,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private static final Log LOG = LogFactory.getLog(NodeStatusUpdaterImpl.class);
private final Object heartbeatMonitor = new Object();
+ private final Object shutdownMonitor = new Object();
private final Context context;
private final Dispatcher dispatcher;
@@ -240,15 +241,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
@Override
protected void serviceStop() throws Exception {
// the isStopped check is for avoiding multiple unregistrations.
- if (this.registeredWithRM && !this.isStopped
- && !isNMUnderSupervisionWithRecoveryEnabled()
- && !context.getDecommissioned() && !failedToConnect) {
- unRegisterNM();
+ synchronized(shutdownMonitor) {
+ if (this.registeredWithRM && !this.isStopped
+ && !isNMUnderSupervisionWithRecoveryEnabled()
+ && !context.getDecommissioned() && !failedToConnect) {
+ unRegisterNM();
+ }
+ // Interrupt the updater.
+ this.isStopped = true;
+ stopRMProxy();
+ super.serviceStop();
}
- // Interrupt the updater.
- this.isStopped = true;
- stopRMProxy();
- super.serviceStop();
}
private boolean isNMUnderSupervisionWithRecoveryEnabled() {
@@ -275,19 +278,24 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
// Interrupt the updater.
- this.isStopped = true;
-
- try {
- statusUpdater.join();
- registerWithRM();
- statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater");
- this.isStopped = false;
- statusUpdater.start();
- LOG.info("NodeStatusUpdater thread is reRegistered and restarted");
- } catch (Exception e) {
- String errorMessage = "Unexpected error rebooting NodeStatusUpdater";
- LOG.error(errorMessage, e);
- throw new YarnRuntimeException(e);
+ synchronized(shutdownMonitor) {
+ if(this.isStopped) {
+ LOG.info("Currently being shutdown. Aborting reboot");
+ return;
+ }
+ this.isStopped = true;
+ try {
+ statusUpdater.join();
+ registerWithRM();
+ statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater");
+ statusUpdater.start();
+ this.isStopped = false;
+ LOG.info("NodeStatusUpdater thread is reRegistered and restarted");
+ } catch (Exception e) {
+ String errorMessage = "Unexpected error rebooting NodeStatusUpdater";
+ LOG.error(errorMessage, e);
+ throw new YarnRuntimeException(e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92b7e0d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index 024adc6..74b7732 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
@@ -76,6 +78,7 @@ import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -275,6 +278,12 @@ public class MiniYARNCluster extends CompositeService {
conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
}
+ @Override
+ protected synchronized void serviceStart() throws Exception {
+ super.serviceStart();
+ this.waitForNodeManagersToConnect(5000);
+ }
+
private void setNonHARMConfigurationWithEphemeralPorts(Configuration conf) {
String hostname = MiniYARNCluster.getHostname();
conf.set(YarnConfiguration.RM_ADDRESS, hostname + ":0");
@@ -314,19 +323,7 @@ public class MiniYARNCluster extends CompositeService {
private synchronized void startResourceManager(final int index) {
try {
- Thread rmThread = new Thread() {
- public void run() {
- resourceManagers[index].start();
- }
- };
- rmThread.setName("RM-" + index);
- rmThread.start();
- int waitCount = 0;
- while (resourceManagers[index].getServiceState() == STATE.INITED
- && waitCount++ < 60) {
- LOG.info("Waiting for RM to start...");
- Thread.sleep(1500);
- }
+ resourceManagers[index].start();
if (resourceManagers[index].getServiceState() != STATE.STARTED) {
// RM could have failed.
throw new IOException(
@@ -456,6 +453,11 @@ public class MiniYARNCluster extends CompositeService {
@Override
protected synchronized void serviceStart() throws Exception {
startResourceManager(index);
+ if(index == 0) {
+ resourceManagers[index].getRMContext().getRMAdminService()
+ .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo(
+ HAServiceProtocol.RequestSource.REQUEST_BY_USER_FORCED));
+ }
Configuration conf = resourceManagers[index].getConfig();
LOG.info("Starting resourcemanager " + index);
LOG.info("MiniYARN ResourceManager address: " +
@@ -565,26 +567,12 @@ public class MiniYARNCluster extends CompositeService {
}
protected synchronized void serviceStart() throws Exception {
- try {
- new Thread() {
- public void run() {
- nodeManagers[index].start();
- }
- }.start();
- int waitCount = 0;
- while (nodeManagers[index].getServiceState() == STATE.INITED
- && waitCount++ < 60) {
- LOG.info("Waiting for NM " + index + " to start...");
- Thread.sleep(1000);
- }
- if (nodeManagers[index].getServiceState() != STATE.STARTED) {
- // RM could have failed.
- throw new IOException("NodeManager " + index + " failed to start");
- }
- super.serviceStart();
- } catch (Throwable t) {
- throw new YarnRuntimeException(t);
+ nodeManagers[index].start();
+ if (nodeManagers[index].getServiceState() != STATE.STARTED) {
+ // NM could have failed.
+ throw new IOException("NodeManager " + index + " failed to start");
}
+ super.serviceStart();
}
@Override
@@ -715,7 +703,7 @@ public class MiniYARNCluster extends CompositeService {
/**
* Wait for all the NodeManagers to connect to the ResourceManager.
*
- * @param timeout Time to wait (sleeps in 100 ms intervals) in milliseconds.
+ * @param timeout Time to wait (sleeps in 10 ms intervals) in milliseconds.
* @return true if all NodeManagers connect to the (Active)
* ResourceManager, false otherwise.
* @throws YarnException
@@ -724,17 +712,19 @@ public class MiniYARNCluster extends CompositeService {
public boolean waitForNodeManagersToConnect(long timeout)
throws YarnException, InterruptedException {
GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance();
- for (int i = 0; i < timeout / 100; i++) {
+ for (int i = 0; i < timeout / 10; i++) {
ResourceManager rm = getResourceManager();
if (rm == null) {
throw new YarnException("Can not find the active RM.");
}
else if (nodeManagers.length == rm.getClientRMService()
- .getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) {
+ .getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) {
+ LOG.info("All Node Managers connected in MiniYARNCluster");
return true;
}
- Thread.sleep(100);
+ Thread.sleep(10);
}
+ LOG.info("Node Managers did not connect within 5000ms");
return false;
}
@@ -769,18 +759,7 @@ public class MiniYARNCluster extends CompositeService {
@Override
protected synchronized void serviceStart() throws Exception {
-
- new Thread() {
- public void run() {
- appHistoryServer.start();
- };
- }.start();
- int waitCount = 0;
- while (appHistoryServer.getServiceState() == STATE.INITED
- && waitCount++ < 60) {
- LOG.info("Waiting for Timeline Server to start...");
- Thread.sleep(1500);
- }
+ appHistoryServer.start();
if (appHistoryServer.getServiceState() != STATE.STARTED) {
// AHS could have failed.
IOException ioe = new IOException(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92b7e0d4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java
index e84d62e..384d1cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYARNClusterForHA.java
@@ -44,10 +44,6 @@ public class TestMiniYARNClusterForHA {
cluster.init(conf);
cluster.start();
- cluster.getResourceManager(0).getRMContext().getRMAdminService()
- .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo(
- HAServiceProtocol.RequestSource.REQUEST_BY_USER));
-
assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
}