You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/06/13 17:58:42 UTC
svn commit: r1492721 [2/3] - in
/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project: ./
hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/di...
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java Thu Jun 13 15:58:38 2013
@@ -55,13 +55,7 @@ public class BreakableService extends Ab
}
private int convert(STATE state) {
- switch (state) {
- case NOTINITED: return 0;
- case INITED: return 1;
- case STARTED: return 2;
- case STOPPED: return 3;
- default: return 0;
- }
+ return state.getValue();
}
private void inc(STATE state) {
@@ -75,29 +69,27 @@ public class BreakableService extends Ab
private void maybeFail(boolean fail, String action) {
if (fail) {
- throw new BrokenLifecycleEvent(action);
+ throw new BrokenLifecycleEvent(this, action);
}
}
@Override
- public void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
inc(STATE.INITED);
maybeFail(failOnInit, "init");
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public void start() {
+ protected void serviceStart() {
inc(STATE.STARTED);
maybeFail(failOnStart, "start");
- super.start();
}
@Override
- public void stop() {
+ protected void serviceStop() {
inc(STATE.STOPPED);
maybeFail(failOnStop, "stop");
- super.stop();
}
public void setFailOnInit(boolean failOnInit) {
@@ -116,8 +108,13 @@ public class BreakableService extends Ab
* The exception explicitly raised on a failure
*/
public static class BrokenLifecycleEvent extends RuntimeException {
- BrokenLifecycleEvent(String action) {
- super("Lifecycle Failure during " + action);
+
+ final STATE state;
+
+ public BrokenLifecycleEvent(Service service, String action) {
+ super("Lifecycle Failure during " + action + " state is "
+ + service.getServiceState());
+ state = service.getServiceState();
}
}
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java Thu Jun 13 15:58:38 2013
@@ -19,10 +19,13 @@
package org.apache.hadoop.yarn.service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
public class TestServiceLifecycle extends ServiceAssert {
+ private static Log LOG = LogFactory.getLog(TestServiceLifecycle.class);
/**
* Walk the {@link BreakableService} through it's lifecycle,
@@ -59,13 +62,8 @@ public class TestServiceLifecycle extend
Configuration conf = new Configuration();
conf.set("test.init","t");
svc.init(conf);
- try {
- svc.init(new Configuration());
- fail("Expected a failure, got " + svc);
- } catch (IllegalStateException e) {
- //expected
- }
- assertStateCount(svc, Service.STATE.INITED, 2);
+ svc.init(new Configuration());
+ assertStateCount(svc, Service.STATE.INITED, 1);
assertServiceConfigurationContains(svc, "test.init");
}
@@ -78,21 +76,14 @@ public class TestServiceLifecycle extend
BreakableService svc = new BreakableService();
svc.init(new Configuration());
svc.start();
- try {
- svc.start();
- fail("Expected a failure, got " + svc);
- } catch (IllegalStateException e) {
- //expected
- }
- assertStateCount(svc, Service.STATE.STARTED, 2);
+ svc.start();
+ assertStateCount(svc, Service.STATE.STARTED, 1);
}
/**
* Verify that when a service is stopped more than once, no exception
- * is thrown, and the counter is incremented.
- * This is because the state change operations happen after the counter in
- * the subclass is incremented, even though stop is meant to be a no-op
+ * is thrown.
* @throws Throwable if necessary
*/
@Test
@@ -103,7 +94,7 @@ public class TestServiceLifecycle extend
svc.stop();
assertStateCount(svc, Service.STATE.STOPPED, 1);
svc.stop();
- assertStateCount(svc, Service.STATE.STOPPED, 2);
+ assertStateCount(svc, Service.STATE.STOPPED, 1);
}
@@ -124,12 +115,12 @@ public class TestServiceLifecycle extend
//expected
}
//the service state wasn't passed
- assertServiceStateCreated(svc);
+ assertServiceStateStopped(svc);
assertStateCount(svc, Service.STATE.INITED, 1);
+ assertStateCount(svc, Service.STATE.STOPPED, 1);
//now try to stop
svc.stop();
- //even after the stop operation, we haven't entered the state
- assertServiceStateCreated(svc);
+ assertStateCount(svc, Service.STATE.STOPPED, 1);
}
@@ -151,18 +142,12 @@ public class TestServiceLifecycle extend
//expected
}
//the service state wasn't passed
- assertServiceStateInited(svc);
- assertStateCount(svc, Service.STATE.INITED, 1);
- //now try to stop
- svc.stop();
- //even after the stop operation, we haven't entered the state
- assertServiceStateInited(svc);
+ assertServiceStateStopped(svc);
}
/**
* verify that when a service fails during its stop operation,
- * its state does not change, and the subclass invocation counter
- * increments.
+ * its state does not change.
* @throws Throwable if necessary
*/
@Test
@@ -177,42 +162,302 @@ public class TestServiceLifecycle extend
//expected
}
assertStateCount(svc, Service.STATE.STOPPED, 1);
- assertServiceStateStarted(svc);
- //now try again, and expect it to happen again
+ }
+
+ /**
+ * verify that when a service that is not started is stopped, the
+ * service enters the stopped state
+ * @throws Throwable on a failure
+ */
+ @Test
+ public void testStopUnstarted() throws Throwable {
+ BreakableService svc = new BreakableService();
+ svc.stop();
+ assertServiceStateStopped(svc);
+ assertStateCount(svc, Service.STATE.INITED, 0);
+ assertStateCount(svc, Service.STATE.STOPPED, 1);
+ }
+
+ /**
+ * Show that if the service failed during an init
+ * operation, stop was called.
+ */
+
+ @Test
+ public void testStopFailingInitAndStop() throws Throwable {
+ BreakableService svc = new BreakableService(true, false, true);
+ svc.register(new LoggingStateChangeListener());
try {
- svc.stop();
+ svc.init(new Configuration());
fail("Expected a failure, got " + svc);
} catch (BreakableService.BrokenLifecycleEvent e) {
+ assertEquals(Service.STATE.INITED, e.state);
+ }
+ //the service state is stopped
+ assertServiceStateStopped(svc);
+ assertEquals(Service.STATE.INITED, svc.getFailureState());
+
+ Throwable failureCause = svc.getFailureCause();
+ assertNotNull("Null failure cause in " + svc, failureCause);
+ BreakableService.BrokenLifecycleEvent cause =
+ (BreakableService.BrokenLifecycleEvent) failureCause;
+ assertNotNull("null state in " + cause + " raised by " + svc, cause.state);
+ assertEquals(Service.STATE.INITED, cause.state);
+ }
+
+ @Test
+ public void testInitNullConf() throws Throwable {
+ BreakableService svc = new BreakableService(false, false, false);
+ try {
+ svc.init(null);
+ LOG.warn("Null Configurations are permitted ");
+ } catch (ServiceStateException e) {
//expected
}
- assertStateCount(svc, Service.STATE.STOPPED, 2);
+ }
+
+ @Test
+ public void testServiceNotifications() throws Throwable {
+ BreakableService svc = new BreakableService(false, false, false);
+ BreakableStateChangeListener listener = new BreakableStateChangeListener();
+ svc.register(listener);
+ svc.init(new Configuration());
+ assertEventCount(listener, 1);
+ svc.start();
+ assertEventCount(listener, 2);
+ svc.stop();
+ assertEventCount(listener, 3);
+ svc.stop();
+ assertEventCount(listener, 3);
}
/**
- * verify that when a service that is not started is stopped, its counter
- * of stop calls is still incremented-and the service remains in its
- * original state..
+ * Test that when a service listener is unregistered, it stops being invoked
* @throws Throwable on a failure
*/
@Test
- public void testStopUnstarted() throws Throwable {
- BreakableService svc = new BreakableService();
+ public void testServiceNotificationsStopOnceUnregistered() throws Throwable {
+ BreakableService svc = new BreakableService(false, false, false);
+ BreakableStateChangeListener listener = new BreakableStateChangeListener();
+ svc.register(listener);
+ svc.init(new Configuration());
+ assertEventCount(listener, 1);
+ svc.unregister(listener);
+ svc.start();
+ assertEventCount(listener, 1);
svc.stop();
- assertServiceStateCreated(svc);
- assertStateCount(svc, Service.STATE.STOPPED, 1);
+ assertEventCount(listener, 1);
+ svc.stop();
+ }
- //stop failed, now it can be initialised
+ /**
+ * This test uses a service listener that unregisters itself during the callbacks.
+ * This a test that verifies the concurrency logic on the listener management
+ * code, that it doesn't throw any immutable state change exceptions
+ * if you change list membership during the notifications.
+ * The standard <code>AbstractService</code> implementation copies the list
+ * to an array in a <code>synchronized</code> block then iterates through
+ * the copy precisely to prevent this problem.
+ * @throws Throwable on a failure
+ */
+ @Test
+ public void testServiceNotificationsUnregisterDuringCallback() throws Throwable {
+ BreakableService svc = new BreakableService(false, false, false);
+ BreakableStateChangeListener listener =
+ new SelfUnregisteringBreakableStateChangeListener();
+ BreakableStateChangeListener l2 =
+ new BreakableStateChangeListener();
+ svc.register(listener);
+ svc.register(l2);
svc.init(new Configuration());
-
- //and try to stop again, with no state change but an increment
+ assertEventCount(listener, 1);
+ assertEventCount(l2, 1);
+ svc.unregister(listener);
+ svc.start();
+ assertEventCount(listener, 1);
+ assertEventCount(l2, 2);
svc.stop();
- assertServiceStateInited(svc);
- assertStateCount(svc, Service.STATE.STOPPED, 2);
+ assertEventCount(listener, 1);
+ svc.stop();
+ }
+
+ private static class SelfUnregisteringBreakableStateChangeListener
+ extends BreakableStateChangeListener {
+
+ @Override
+ public synchronized void stateChanged(Service service) {
+ super.stateChanged(service);
+ service.unregister(this);
+ }
+ }
- //once started, the service can be stopped reliably
+ private void assertEventCount(BreakableStateChangeListener listener,
+ int expected) {
+ assertEquals(listener.toString(), expected, listener.getEventCount());
+ }
+
+ @Test
+ public void testServiceFailingNotifications() throws Throwable {
+ BreakableService svc = new BreakableService(false, false, false);
+ BreakableStateChangeListener listener = new BreakableStateChangeListener();
+ listener.setFailingState(Service.STATE.STARTED);
+ svc.register(listener);
+ svc.init(new Configuration());
+ assertEventCount(listener, 1);
+ //start this; the listener failed but this won't show
svc.start();
- ServiceOperations.stop(svc);
- assertServiceStateStopped(svc);
- assertStateCount(svc, Service.STATE.STOPPED, 3);
+ //counter went up
+ assertEventCount(listener, 2);
+ assertEquals(1, listener.getFailureCount());
+ //stop the service -this doesn't fail
+ svc.stop();
+ assertEventCount(listener, 3);
+ assertEquals(1, listener.getFailureCount());
+ svc.stop();
+ }
+
+ /**
+ * This test verifies that you can block waiting for something to happen
+ * and use notifications to manage it
+ * @throws Throwable on a failure
+ */
+ @Test
+ public void testListenerWithNotifications() throws Throwable {
+ //this tests that a listener can get notified when a service is stopped
+ AsyncSelfTerminatingService service = new AsyncSelfTerminatingService(2000);
+ NotifyingListener listener = new NotifyingListener();
+ service.register(listener);
+ service.init(new Configuration());
+ service.start();
+ assertServiceInState(service, Service.STATE.STARTED);
+ long start = System.currentTimeMillis();
+ synchronized (listener) {
+ listener.wait(20000);
+ }
+ long duration = System.currentTimeMillis() - start;
+ assertEquals(Service.STATE.STOPPED, listener.notifyingState);
+ assertServiceInState(service, Service.STATE.STOPPED);
+ assertTrue("Duration of " + duration + " too long", duration < 10000);
+ }
+
+ @Test
+ public void testSelfTerminatingService() throws Throwable {
+ SelfTerminatingService service = new SelfTerminatingService();
+ BreakableStateChangeListener listener = new BreakableStateChangeListener();
+ service.register(listener);
+ service.init(new Configuration());
+ assertEventCount(listener, 1);
+ //start the service
+ service.start();
+ //and expect an event count of exactly two
+ assertEventCount(listener, 2);
+ }
+
+ @Test
+ public void testStartInInitService() throws Throwable {
+ Service service = new StartInInitService();
+ BreakableStateChangeListener listener = new BreakableStateChangeListener();
+ service.register(listener);
+ service.init(new Configuration());
+ assertServiceInState(service, Service.STATE.STARTED);
+ assertEventCount(listener, 1);
+ }
+
+ @Test
+ public void testStopInInitService() throws Throwable {
+ Service service = new StopInInitService();
+ BreakableStateChangeListener listener = new BreakableStateChangeListener();
+ service.register(listener);
+ service.init(new Configuration());
+ assertServiceInState(service, Service.STATE.STOPPED);
+ assertEventCount(listener, 1);
+ }
+
+ /**
+ * Listener that wakes up all threads waiting on it
+ */
+ private static class NotifyingListener implements ServiceStateChangeListener {
+ public Service.STATE notifyingState = Service.STATE.NOTINITED;
+
+ public synchronized void stateChanged(Service service) {
+ notifyingState = service.getServiceState();
+ this.notifyAll();
+ }
+ }
+
+ /**
+ * Service that terminates itself after starting and sleeping for a while
+ */
+ private static class AsyncSelfTerminatingService extends AbstractService
+ implements Runnable {
+ final int timeout;
+ private AsyncSelfTerminatingService(int timeout) {
+ super("AsyncSelfTerminatingService");
+ this.timeout = timeout;
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ new Thread(this).start();
+ super.serviceStart();
+ }
+
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(timeout);
+ } catch (InterruptedException ignored) {
+
+ }
+ this.stop();
+ }
+ }
+
+ /**
+ * Service that terminates itself in startup
+ */
+ private static class SelfTerminatingService extends AbstractService {
+ private SelfTerminatingService() {
+ super("SelfTerminatingService");
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ //start
+ super.serviceStart();
+ //then stop
+ stop();
+ }
+ }
+
+ /**
+ * Service that starts itself in init
+ */
+ private static class StartInInitService extends AbstractService {
+ private StartInInitService() {
+ super("StartInInitService");
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ start();
+ }
}
+
+ /**
+ * Service that starts itself in init
+ */
+ private static class StopInInitService extends AbstractService {
+ private StopInInitService() {
+ super("StopInInitService");
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ stop();
+ }
+ }
+
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java Thu Jun 13 15:58:38 2013
@@ -21,10 +21,15 @@ package org.apache.hadoop.yarn.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.YarnRuntimeException;
+import org.apache.hadoop.yarn.service.BreakableService;
import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.service.ServiceStateException;
import org.junit.Before;
import org.junit.Test;
@@ -34,6 +39,16 @@ public class TestCompositeService {
private static final int FAILED_SERVICE_SEQ_NUMBER = 2;
+ private static final Log LOG = LogFactory.getLog(TestCompositeService.class);
+
+ /**
+ * flag to state policy of CompositeService, and hence
+ * what to look for after trying to stop a service from another state
+ * (e.g inited)
+ */
+ private static final boolean STOP_ONLY_STARTED_SERVICES =
+ CompositeServiceImpl.isPolicyToStopOnlyStartedServices();
+
@Before
public void setup() {
CompositeServiceImpl.resetCounter();
@@ -59,6 +74,9 @@ public class TestCompositeService {
// Initialise the composite service
serviceManager.init(conf);
+ //verify they were all inited
+ assertInState(STATE.INITED, services);
+
// Verify the init() call sequence numbers for every service
for (int i = 0; i < NUM_OF_SERVICES; i++) {
assertEquals("For " + services[i]
@@ -67,11 +85,11 @@ public class TestCompositeService {
}
// Reset the call sequence numbers
- for (int i = 0; i < NUM_OF_SERVICES; i++) {
- services[i].reset();
- }
+ resetServices(services);
serviceManager.start();
+ //verify they were all started
+ assertInState(STATE.STARTED, services);
// Verify the start() call sequence numbers for every service
for (int i = 0; i < NUM_OF_SERVICES; i++) {
@@ -79,13 +97,12 @@ public class TestCompositeService {
+ " service, start() call sequence number should have been ", i,
services[i].getCallSequenceNumber());
}
+ resetServices(services);
- // Reset the call sequence numbers
- for (int i = 0; i < NUM_OF_SERVICES; i++) {
- services[i].reset();
- }
serviceManager.stop();
+ //verify they were all stopped
+ assertInState(STATE.STOPPED, services);
// Verify the stop() call sequence numbers for every service
for (int i = 0; i < NUM_OF_SERVICES; i++) {
@@ -104,6 +121,13 @@ public class TestCompositeService {
}
}
+ private void resetServices(CompositeServiceImpl[] services) {
+ // Reset the call sequence numbers
+ for (int i = 0; i < NUM_OF_SERVICES; i++) {
+ services[i].reset();
+ }
+ }
+
@Test
public void testServiceStartup() {
ServiceManager serviceManager = new ServiceManager("ServiceManager");
@@ -131,7 +155,7 @@ public class TestCompositeService {
fail("Exception should have been thrown due to startup failure of last service");
} catch (YarnRuntimeException e) {
for (int i = 0; i < NUM_OF_SERVICES - 1; i++) {
- if (i >= FAILED_SERVICE_SEQ_NUMBER) {
+ if (i >= FAILED_SERVICE_SEQ_NUMBER && STOP_ONLY_STARTED_SERVICES) {
// Failed service state should be INITED
assertEquals("Service state should have been ", STATE.INITED,
services[NUM_OF_SERVICES - 1].getServiceState());
@@ -171,15 +195,147 @@ public class TestCompositeService {
try {
serviceManager.stop();
} catch (YarnRuntimeException e) {
- for (int i = 0; i < NUM_OF_SERVICES - 1; i++) {
- assertEquals("Service state should have been ", STATE.STOPPED,
- services[NUM_OF_SERVICES].getServiceState());
- }
}
+ assertInState(STATE.STOPPED, services);
}
+ /**
+ * Assert that all services are in the same expected state
+ * @param expected expected state value
+ * @param services services to examine
+ */
+ private void assertInState(STATE expected, CompositeServiceImpl[] services) {
+ assertInState(expected, services,0, services.length);
+ }
+
+ /**
+ * Assert that all services are in the same expected state
+ * @param expected expected state value
+ * @param services services to examine
+ * @param start start offset
+ * @param finish finish offset: the count stops before this number
+ */
+ private void assertInState(STATE expected,
+ CompositeServiceImpl[] services,
+ int start, int finish) {
+ for (int i = start; i < finish; i++) {
+ Service service = services[i];
+ assertInState(expected, service);
+ }
+ }
+
+ private void assertInState(STATE expected, Service service) {
+ assertEquals("Service state should have been " + expected + " in "
+ + service,
+ expected,
+ service.getServiceState());
+ }
+
+ /**
+ * Shut down from not-inited: expect nothing to have happened
+ */
+ @Test
+ public void testServiceStopFromNotInited() {
+ ServiceManager serviceManager = new ServiceManager("ServiceManager");
+
+ // Add services
+ for (int i = 0; i < NUM_OF_SERVICES; i++) {
+ CompositeServiceImpl service = new CompositeServiceImpl(i);
+ serviceManager.addTestService(service);
+ }
+
+ CompositeServiceImpl[] services = serviceManager.getServices().toArray(
+ new CompositeServiceImpl[0]);
+ serviceManager.stop();
+ assertInState(STATE.NOTINITED, services);
+ }
+
+ /**
+ * Shut down from inited
+ */
+ @Test
+ public void testServiceStopFromInited() {
+ ServiceManager serviceManager = new ServiceManager("ServiceManager");
+
+ // Add services
+ for (int i = 0; i < NUM_OF_SERVICES; i++) {
+ CompositeServiceImpl service = new CompositeServiceImpl(i);
+ serviceManager.addTestService(service);
+ }
+
+ CompositeServiceImpl[] services = serviceManager.getServices().toArray(
+ new CompositeServiceImpl[0]);
+ serviceManager.init(new Configuration());
+ serviceManager.stop();
+ if (STOP_ONLY_STARTED_SERVICES) {
+ //this policy => no services were stopped
+ assertInState(STATE.INITED, services);
+ } else {
+ assertInState(STATE.STOPPED, services);
+ }
+ }
+
+ /**
+ * Use a null configuration & expect a failure
+ * @throws Throwable
+ */
+ @Test
+ public void testInitNullConf() throws Throwable {
+ ServiceManager serviceManager = new ServiceManager("testInitNullConf");
+
+ CompositeServiceImpl service = new CompositeServiceImpl(0);
+ serviceManager.addTestService(service);
+ try {
+ serviceManager.init(null);
+ LOG.warn("Null Configurations are permitted " + serviceManager);
+ } catch (ServiceStateException e) {
+ //expected
+ }
+ }
+
+ /**
+ * Walk the service through their lifecycle without any children;
+ * verify that it all works.
+ */
+ @Test
+ public void testServiceLifecycleNoChildren() {
+ ServiceManager serviceManager = new ServiceManager("ServiceManager");
+ serviceManager.init(new Configuration());
+ serviceManager.start();
+ serviceManager.stop();
+ }
+
+ @Test
+ public void testAddServiceInInit() throws Throwable {
+ BreakableService child = new BreakableService();
+ assertInState(STATE.NOTINITED, child);
+ CompositeServiceAddingAChild composite =
+ new CompositeServiceAddingAChild(child);
+ composite.init(new Configuration());
+ assertInState(STATE.INITED, child);
+ }
+
+ public static class CompositeServiceAddingAChild extends CompositeService{
+ Service child;
+
+ public CompositeServiceAddingAChild(Service child) {
+ super("CompositeServiceAddingAChild");
+ this.child = child;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ addService(child);
+ super.serviceInit(conf);
+ }
+ }
+
public static class CompositeServiceImpl extends CompositeService {
+ public static boolean isPolicyToStopOnlyStartedServices() {
+ return STOP_ONLY_STARTED_SERVICES;
+ }
+
private static int counter = -1;
private int callSequenceNumber = -1;
@@ -193,30 +349,30 @@ public class TestCompositeService {
}
@Override
- public synchronized void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
counter++;
callSequenceNumber = counter;
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public synchronized void start() {
+ protected void serviceStart() throws Exception {
if (throwExceptionOnStart) {
throw new YarnRuntimeException("Fake service start exception");
}
counter++;
callSequenceNumber = counter;
- super.start();
+ super.serviceStart();
}
@Override
- public synchronized void stop() {
+ protected void serviceStop() throws Exception {
counter++;
callSequenceNumber = counter;
if (throwExceptionOnStop) {
throw new YarnRuntimeException("Fake service stop exception");
}
- super.stop();
+ super.serviceStop();
}
public static int getCounter() {
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java Thu Jun 13 15:58:38 2013
@@ -30,7 +30,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.commons.logging.Log;
@@ -75,7 +74,7 @@ public class DeletionService extends Abs
}
@Override
- public void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("DeletionService #%d")
.build();
@@ -90,21 +89,23 @@ public class DeletionService extends Abs
}
sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
sched.setKeepAliveTime(60L, SECONDS);
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public void stop() {
- sched.shutdown();
- boolean terminated = false;
- try {
- terminated = sched.awaitTermination(10, SECONDS);
- } catch (InterruptedException e) {
- }
- if (terminated != true) {
- sched.shutdownNow();
+ protected void serviceStop() throws Exception {
+ if (sched != null) {
+ sched.shutdown();
+ boolean terminated = false;
+ try {
+ terminated = sched.awaitTermination(10, SECONDS);
+ } catch (InterruptedException e) {
+ }
+ if (terminated != true) {
+ sched.shutdownNow();
+ }
}
- super.stop();
+ super.serviceStop();
}
/**
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java Thu Jun 13 15:58:38 2013
@@ -113,7 +113,7 @@ public class LocalDirsHandlerService ext
*
*/
@Override
- public void init(Configuration config) {
+ protected void serviceInit(Configuration config) throws Exception {
// Clone the configuration as we may do modifications to dirs-list
Configuration conf = new Configuration(config);
diskHealthCheckInterval = conf.getLong(
@@ -126,7 +126,7 @@ public class LocalDirsHandlerService ext
YarnConfiguration.NM_MIN_HEALTHY_DISKS_FRACTION,
YarnConfiguration.DEFAULT_NM_MIN_HEALTHY_DISKS_FRACTION);
lastDisksCheckTime = System.currentTimeMillis();
- super.init(conf);
+ super.serviceInit(conf);
FileContext localFs;
try {
@@ -150,24 +150,24 @@ public class LocalDirsHandlerService ext
* Method used to start the disk health monitoring, if enabled.
*/
@Override
- public void start() {
+ protected void serviceStart() throws Exception {
if (isDiskHealthCheckerEnabled) {
dirsHandlerScheduler = new Timer("DiskHealthMonitor-Timer", true);
dirsHandlerScheduler.scheduleAtFixedRate(monitoringTimerTask,
diskHealthCheckInterval, diskHealthCheckInterval);
}
- super.start();
+ super.serviceStart();
}
/**
* Method used to terminate the disk health monitoring service.
*/
@Override
- public void stop() {
+ protected void serviceStop() throws Exception {
if (dirsHandlerScheduler != null) {
dirsHandlerScheduler.cancel();
}
- super.stop();
+ super.serviceStop();
}
/**
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java Thu Jun 13 15:58:38 2013
@@ -39,13 +39,13 @@ public class NodeHealthCheckerService ex
}
@Override
- public void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
if (NodeHealthScriptRunner.shouldRun(conf)) {
nodeHealthScriptRunner = new NodeHealthScriptRunner();
addService(nodeHealthScriptRunner);
}
addService(dirsHandler);
- super.init(conf);
+ super.serviceInit(conf);
}
/**
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthScriptRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthScriptRunner.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthScriptRunner.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthScriptRunner.java Thu Jun 13 15:58:38 2013
@@ -197,7 +197,7 @@ public class NodeHealthScriptRunner exte
* Method which initializes the values for the script path and interval time.
*/
@Override
- public void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
this.conf = conf;
this.nodeHealthScript =
conf.get(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_PATH);
@@ -209,6 +209,7 @@ public class NodeHealthScriptRunner exte
String[] args = conf.getStrings(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_OPTS,
new String[] {});
timer = new NodeHealthMonitorExecutor(args);
+ super.serviceInit(conf);
}
/**
@@ -216,7 +217,7 @@ public class NodeHealthScriptRunner exte
*
*/
@Override
- public void start() {
+ protected void serviceStart() throws Exception {
// if health script path is not configured don't start the thread.
if (!shouldRun(conf)) {
LOG.info("Not starting node health monitor");
@@ -226,6 +227,7 @@ public class NodeHealthScriptRunner exte
// Start the timer task immediately and
// then periodically at interval time.
nodeHealthScriptScheduler.scheduleAtFixedRate(timer, 0, intervalTime);
+ super.serviceStart();
}
/**
@@ -233,11 +235,13 @@ public class NodeHealthScriptRunner exte
*
*/
@Override
- public void stop() {
+ protected void serviceStop() {
if (!shouldRun(conf)) {
return;
}
- nodeHealthScriptScheduler.cancel();
+ if (nodeHealthScriptScheduler != null) {
+ nodeHealthScriptScheduler.cancel();
+ }
if (shexec != null) {
Process p = shexec.getProcess();
if (p != null) {
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Thu Jun 13 15:58:38 2013
@@ -128,7 +128,7 @@ public class NodeManager extends Composi
}
@Override
- public void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
@@ -192,31 +192,36 @@ public class NodeManager extends Composi
YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
SHUTDOWN_CLEANUP_SLOP_MS;
- super.init(conf);
+ super.serviceInit(conf);
// TODO add local dirs to del
}
@Override
- public void start() {
+ protected void serviceStart() throws Exception {
try {
doSecureLogin();
} catch (IOException e) {
throw new YarnRuntimeException("Failed NodeManager login", e);
}
- super.start();
+ super.serviceStart();
}
@Override
- public void stop() {
+ protected void serviceStop() throws Exception {
if (isStopping.getAndSet(true)) {
return;
}
-
- cleanupContainers(NodeManagerEventType.SHUTDOWN);
- super.stop();
+ if (context != null) {
+ cleanupContainers(NodeManagerEventType.SHUTDOWN);
+ }
+ super.serviceStop();
DefaultMetricsSystem.shutdown();
}
+ public String getName() {
+ return "NodeManager";
+ }
+
protected void resyncWithRM() {
//we do not want to block dispatcher thread here
new Thread() {
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Thu Jun 13 15:58:38 2013
@@ -80,7 +80,7 @@ public class NodeStatusUpdaterImpl exten
private InetSocketAddress rmAddress;
private Resource totalResource;
private int httpPort;
- private boolean isStopped;
+ private volatile boolean isStopped;
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private boolean tokenKeepAliveEnabled;
private long tokenRemovalDelayMs;
@@ -109,7 +109,7 @@ public class NodeStatusUpdaterImpl exten
}
@Override
- public synchronized void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
this.rmAddress = conf.getSocketAddr(
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
@@ -146,11 +146,11 @@ public class NodeStatusUpdaterImpl exten
" physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
" physical-cores=" + cpuCores + " virtual-cores=" + virtualCores);
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public void start() {
+ protected void serviceStart() throws Exception {
// NodeManager is the last service to start, so NodeId is available.
this.nodeId = this.context.getNodeId();
@@ -159,7 +159,7 @@ public class NodeStatusUpdaterImpl exten
// Registration has to be in start so that ContainerManager can get the
// perNM tokens needed to authenticate ContainerTokens.
registerWithRM();
- super.start();
+ super.serviceStart();
startStatusUpdater();
} catch (Exception e) {
String errorMessage = "Unexpected error starting NodeStatusUpdater";
@@ -169,10 +169,10 @@ public class NodeStatusUpdaterImpl exten
}
@Override
- public synchronized void stop() {
+ protected void serviceStop() throws Exception {
// Interrupt the updater.
this.isStopped = true;
- super.stop();
+ super.serviceStop();
}
protected void rebootNodeStatusUpdater() {
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java Thu Jun 13 15:58:38 2013
@@ -30,7 +30,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.AbstractService;
@@ -82,7 +81,7 @@ public class AuxServices extends Abstrac
}
@Override
- public void init(Configuration conf) {
+ public void serviceInit(Configuration conf) throws Exception {
Collection<String> auxNames = conf.getStringCollection(
YarnConfiguration.NM_AUX_SERVICES);
for (final String sName : auxNames) {
@@ -110,11 +109,11 @@ public class AuxServices extends Abstrac
throw e;
}
}
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public void start() {
+ public void serviceStart() throws Exception {
// TODO fork(?) services running as configured user
// monitor for health, shutdown/restart(?) if any should die
for (Map.Entry<String, AuxiliaryService> entry : serviceMap.entrySet()) {
@@ -127,11 +126,11 @@ public class AuxServices extends Abstrac
serviceMeta.put(name, meta);
}
}
- super.start();
+ super.serviceStart();
}
@Override
- public void stop() {
+ public void serviceStop() throws Exception {
try {
synchronized (serviceMap) {
for (Service service : serviceMap.values()) {
@@ -144,7 +143,7 @@ public class AuxServices extends Abstrac
serviceMeta.clear();
}
} finally {
- super.stop();
+ super.serviceStop();
}
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Thu Jun 13 15:58:38 2013
@@ -177,13 +177,13 @@ public class ContainerManagerImpl extend
}
@Override
- public void init(Configuration conf) {
+ public void serviceInit(Configuration conf) throws Exception {
LogHandler logHandler =
createLogHandler(conf, this.context, this.deletionService);
addIfService(logHandler);
dispatcher.register(LogHandlerEventType.class, logHandler);
- super.init(conf);
+ super.serviceInit(conf);
}
private void addIfService(Object object) {
@@ -220,7 +220,7 @@ public class ContainerManagerImpl extend
}
@Override
- public void start() {
+ protected void serviceStart() throws Exception {
// Enqueue user dirs in deletion context
@@ -254,7 +254,7 @@ public class ContainerManagerImpl extend
connectAddress.getPort());
((NodeManager.NMContext)context).setNodeId(nodeId);
LOG.info("ContainerManager started at " + connectAddress);
- super.start();
+ super.serviceStart();
}
void refreshServiceAcls(Configuration configuration,
@@ -263,14 +263,14 @@ public class ContainerManagerImpl extend
}
@Override
- public void stop() {
+ public void serviceStop() throws Exception {
if (auxiliaryServices.getServiceState() == STARTED) {
auxiliaryServices.unregister(this);
}
if (server != null) {
server.stop();
}
- super.stop();
+ super.serviceStop();
}
// Get the remoteUGI corresponding to the api call.
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Thu Jun 13 15:58:38 2013
@@ -91,20 +91,20 @@ public class ContainersLauncher extends
}
@Override
- public void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
try {
//TODO Is this required?
FileContext.getLocalFSFileContext(conf);
} catch (UnsupportedFileSystemException e) {
throw new YarnRuntimeException("Failed to start ContainersLauncher", e);
}
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public void stop() {
+ protected void serviceStop() throws Exception {
containerLauncher.shutdownNow();
- super.stop();
+ super.serviceStop();
}
@Override
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Thu Jun 13 15:58:38 2013
@@ -194,7 +194,7 @@ public class ResourceLocalizationService
}
@Override
- public void init(Configuration conf) {
+ public void serviceInit(Configuration conf) throws Exception {
this.validateConf(conf);
this.publicRsrc =
new LocalResourcesTrackerImpl(null, dispatcher, true, conf);
@@ -239,7 +239,7 @@ public class ResourceLocalizationService
localizerTracker = createLocalizerTracker(conf);
addService(localizerTracker);
dispatcher.register(LocalizerEventType.class, localizerTracker);
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
@@ -248,7 +248,7 @@ public class ResourceLocalizationService
}
@Override
- public void start() {
+ public void serviceStart() throws Exception {
cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
server = createServer();
@@ -257,7 +257,7 @@ public class ResourceLocalizationService
getConfig().updateConnectAddr(YarnConfiguration.NM_LOCALIZER_ADDRESS,
server.getListenerAddress());
LOG.info("Localizer started on port " + server.getPort());
- super.start();
+ super.serviceStart();
}
LocalizerTracker createLocalizerTracker(Configuration conf) {
@@ -288,12 +288,12 @@ public class ResourceLocalizationService
}
@Override
- public void stop() {
+ public void serviceStop() throws Exception {
if (server != null) {
server.stop();
}
cacheCleanup.shutdown();
- super.stop();
+ super.serviceStop();
}
@Override
@@ -536,9 +536,9 @@ public class ResourceLocalizationService
}
@Override
- public synchronized void start() {
+ public synchronized void serviceStart() throws Exception {
publicLocalizer.start();
- super.start();
+ super.serviceStart();
}
public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) {
@@ -559,12 +559,12 @@ public class ResourceLocalizationService
}
@Override
- public void stop() {
+ public void serviceStop() throws Exception {
for (LocalizerRunner localizer : privLocalizers.values()) {
localizer.interrupt();
}
publicLocalizer.interrupt();
- super.stop();
+ super.serviceStop();
}
@Override
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Thu Jun 13 15:58:38 2013
@@ -114,7 +114,7 @@ public class LogAggregationService exten
.build());
}
- public synchronized void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
this.remoteRootLogDir =
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
@@ -122,22 +122,22 @@ public class LogAggregationService exten
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public synchronized void start() {
+ protected void serviceStart() throws Exception {
// NodeId is only available during start, the following cannot be moved
// anywhere else.
this.nodeId = this.context.getNodeId();
- super.start();
+ super.serviceStart();
}
@Override
- public synchronized void stop() {
+ protected void serviceStop() throws Exception {
LOG.info(this.getName() + " waiting for pending aggregation during exit");
stopAggregators();
- super.stop();
+ super.serviceStop();
}
private void stopAggregators() {
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java Thu Jun 13 15:58:38 2013
@@ -69,16 +69,16 @@ public class NonAggregatingLogHandler ex
}
@Override
- public void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
// Default 3 hours.
this.deleteDelaySeconds =
conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 3 * 60 * 60);
sched = createScheduledThreadPoolExecutor(conf);
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public void stop() {
+ protected void serviceStop() throws Exception {
if (sched != null) {
sched.shutdown();
boolean isShutdown = false;
@@ -92,7 +92,7 @@ public class NonAggregatingLogHandler ex
sched.shutdownNow();
}
}
- super.stop();
+ super.serviceStop();
}
@SuppressWarnings("unchecked")
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java Thu Jun 13 15:58:38 2013
@@ -85,7 +85,7 @@ public class ContainersMonitorImpl exten
}
@Override
- public synchronized void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
this.monitoringInterval =
conf.getLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS,
YarnConfiguration.DEFAULT_NM_CONTAINER_MON_INTERVAL_MS);
@@ -151,7 +151,7 @@ public class ContainersMonitorImpl exten
1) + "). Thrashing might happen.");
}
}
- super.init(conf);
+ super.serviceInit(conf);
}
private boolean isEnabled() {
@@ -175,15 +175,15 @@ public class ContainersMonitorImpl exten
}
@Override
- public synchronized void start() {
+ protected void serviceStart() throws Exception {
if (this.isEnabled()) {
this.monitoringThread.start();
}
- super.start();
+ super.serviceStart();
}
@Override
- public synchronized void stop() {
+ protected void serviceStop() throws Exception {
if (this.isEnabled()) {
this.monitoringThread.interrupt();
try {
@@ -192,7 +192,7 @@ public class ContainersMonitorImpl exten
;
}
}
- super.stop();
+ super.serviceStop();
}
private static class ProcessTreeInfo {
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java Thu Jun 13 15:58:38 2013
@@ -53,12 +53,7 @@ public class WebServer extends AbstractS
}
@Override
- public synchronized void init(Configuration conf) {
- super.init(conf);
- }
-
- @Override
- public synchronized void start() {
+ protected void serviceStart() throws Exception {
String bindAddress = getConfig().get(YarnConfiguration.NM_WEBAPP_ADDRESS,
YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS);
LOG.info("Instantiating NMWebApp at " + bindAddress);
@@ -70,9 +65,9 @@ public class WebServer extends AbstractS
} catch (Exception e) {
String msg = "NMWebapps failed to start.";
LOG.error(msg, e);
- throw new YarnRuntimeException(msg);
+ throw new YarnRuntimeException(msg, e);
}
- super.start();
+ super.serviceStart();
}
public int getPort() {
@@ -80,11 +75,12 @@ public class WebServer extends AbstractS
}
@Override
- public synchronized void stop() {
+ protected void serviceStop() throws Exception {
if (this.webApp != null) {
+ LOG.debug("Stopping webapp");
this.webApp.stop();
}
- super.stop();
+ super.serviceStop();
}
public static class NMWebApp extends WebApp implements YarnWebParams {
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java Thu Jun 13 15:58:38 2013
@@ -138,9 +138,9 @@ public class TestDeletionService {
}
DeletionService del =
new DeletionService(new FakeDefaultContainerExecutor());
- del.init(new Configuration());
- del.start();
try {
+ del.init(new Configuration());
+ del.start();
for (Path p : content) {
assertTrue(lfs.util().exists(new Path(baseDirs.get(0), p)));
del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
@@ -176,9 +176,9 @@ public class TestDeletionService {
conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, -1);
exec.setConf(conf);
DeletionService del = new DeletionService(exec);
- del.init(conf);
- del.start();
try {
+ del.init(conf);
+ del.start();
for (Path p : dirs) {
del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo", p,
null);
@@ -201,9 +201,9 @@ public class TestDeletionService {
DeletionService del = new DeletionService(Mockito.mock(ContainerExecutor.class));
Configuration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 60);
- del.init(conf);
- del.start();
try {
+ del.init(conf);
+ del.start();
del.delete("dingo", new Path("/does/not/exist"));
} finally {
del.stop();
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java Thu Jun 13 15:58:38 2013
@@ -73,7 +73,8 @@ public class TestLocalDirsHandlerService
Assert.fail("Service should have thrown an exception due to wrong URI");
} catch (YarnRuntimeException e) {
}
- Assert.assertTrue("Service should not be inited", dirSvc.getServiceState()
- .compareTo(STATE.NOTINITED) == 0);
+ Assert.assertEquals("Service should not be inited",
+ STATE.STOPPED,
+ dirSvc.getServiceState());
}
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManager.java Thu Jun 13 15:58:38 2013
@@ -49,6 +49,8 @@ public class TestNodeManager {
} catch (YarnRuntimeException e) {
//PASS
assert(e.getCause().getMessage().contains("dummy executor init called"));
+ } finally {
+ nm.stop();
}
}
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java Thu Jun 13 15:58:38 2013
@@ -83,6 +83,7 @@ public class TestNodeManagerShutdown {
static final String user = "nobody";
private FileContext localFS;
private ContainerId cId;
+ private NodeManager nm;
@Before
public void setup() throws UnsupportedFileSystemException {
@@ -98,13 +99,16 @@ public class TestNodeManagerShutdown {
@After
public void tearDown() throws IOException, InterruptedException {
+ if (nm != null) {
+ nm.stop();
+ }
localFS.delete(new Path(basedir.getPath()), true);
}
@Test
public void testKillContainersOnShutdown() throws IOException,
YarnException {
- NodeManager nm = new TestNodeManager();
+ nm = new TestNodeManager();
nm.init(createNMConfig());
nm.start();
startContainer(nm, cId, localFS, tmpDir, processStartFile);
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Thu Jun 13 15:58:38 2013
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -78,8 +79,8 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
-import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.service.ServiceOperations;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -110,9 +111,7 @@ public class TestNodeStatusUpdater {
public void tearDown() {
this.registeredNodes.clear();
heartBeatID = 0;
- if (nm != null && nm.getServiceState() == STATE.STARTED) {
- nm.stop();
- }
+ ServiceOperations.stop(nm);
DefaultMetricsSystem.shutdown();
}
@@ -316,9 +315,11 @@ public class TestNodeStatusUpdater {
public ResourceTracker resourceTracker =
new MyResourceTracker(this.context);
private Context context;
- private final long waitStartTime;
+ private long waitStartTime;
private final long rmStartIntervalMS;
private final boolean rmNeverStart;
+ private volatile boolean triggered = false;
+ private long durationWhenTriggered = -1;
public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
@@ -331,14 +332,50 @@ public class TestNodeStatusUpdater {
}
@Override
+ protected void serviceStart() throws Exception {
+ //record the startup time
+ this.waitStartTime = System.currentTimeMillis();
+ super.serviceStart();
+ }
+
+ @Override
protected ResourceTracker getRMClient() {
- if(System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS
- || rmNeverStart) {
- throw new YarnRuntimeException("Faking RM start failure as start " +
- "delay timer has not expired.");
- } else {
- return resourceTracker;
+ if (!triggered) {
+ long t = System.currentTimeMillis();
+ long duration = t - waitStartTime;
+ if (duration <= rmStartIntervalMS
+ || rmNeverStart) {
+ throw new YarnRuntimeException("Faking RM start failure as start " +
+ "delay timer has not expired.");
+ } else {
+ //triggering
+ triggered = true;
+ durationWhenTriggered = duration;
+ }
}
+ return resourceTracker;
+ }
+
+ private boolean isTriggered() {
+ return triggered;
+ }
+
+ private long getWaitStartTime() {
+ return waitStartTime;
+ }
+
+ private long getDurationWhenTriggered() {
+ return durationWhenTriggered;
+ }
+
+ @Override
+ public String toString() {
+ return "MyNodeStatusUpdater4{" +
+ "rmNeverStart=" + rmNeverStart +
+ ", triggered=" + triggered +
+ ", duration=" + durationWhenTriggered +
+ ", rmStartIntervalMS=" + rmStartIntervalMS +
+ '}';
}
}
@@ -390,13 +427,10 @@ public class TestNodeStatusUpdater {
}
@Override
- public void stop() {
- super.stop();
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
isStopped = true;
- try {
- syncBarrier.await();
- } catch (Exception e) {
- }
+ syncBarrier.await(10000, TimeUnit.MILLISECONDS);
}
}
//
@@ -580,7 +614,9 @@ public class TestNodeStatusUpdater {
nodeStatus.setResponseId(heartBeatID);
NodeHeartbeatResponse nhResponse =
YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
- heartBeatNodeAction, null, null, null, 1000L);
+ heartBeatNodeAction,
+ null, null, null,
+ 1000L);
return nhResponse;
}
}
@@ -724,7 +760,7 @@ public class TestNodeStatusUpdater {
Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
Assert.assertEquals(numCleanups.get(), 1);
}
-
+
@Test
public void testNodeDecommision() throws Exception {
nm = getNodeManager(NodeAction.SHUTDOWN);
@@ -749,12 +785,35 @@ public class TestNodeStatusUpdater {
Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
}
+ private abstract class NodeManagerWithCustomNodeStatusUpdater extends NodeManager {
+ private NodeStatusUpdater updater;
+
+ private NodeManagerWithCustomNodeStatusUpdater() {
+ }
+
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher,
+ NodeHealthCheckerService healthChecker) {
+ updater = createUpdater(context, dispatcher, healthChecker);
+ return updater;
+ }
+
+ public NodeStatusUpdater getUpdater() {
+ return updater;
+ }
+
+ abstract NodeStatusUpdater createUpdater(Context context,
+ Dispatcher dispatcher,
+ NodeHealthCheckerService healthChecker);
+ }
+
@Test
- public void testNMShutdownForRegistrationFailure() {
+ public void testNMShutdownForRegistrationFailure() throws Exception {
- nm = new NodeManager() {
+ nm = new NodeManagerWithCustomNodeStatusUpdater() {
@Override
- protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ protected NodeStatusUpdater createUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
MyNodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater(
context, dispatcher, healthChecker, metrics);
@@ -765,14 +824,14 @@ public class TestNodeStatusUpdater {
return nodeStatusUpdater;
}
};
- verifyNodeStartFailure("org.apache.hadoop.yarn.YarnRuntimeException: "
- + "Recieved SHUTDOWN signal from Resourcemanager ,"
+ verifyNodeStartFailure(
+ "Recieved SHUTDOWN signal from Resourcemanager ,"
+ "Registration of NodeManager failed, "
+ "Message from ResourceManager: RM Shutting Down Node");
}
@Test (timeout = 150000)
- public void testNMConnectionToRM() {
+ public void testNMConnectionToRM() throws Exception {
final long delta = 50000;
final long connectionWaitSecs = 5;
final long connectionRetryIntervalSecs = 1;
@@ -786,9 +845,10 @@ public class TestNodeStatusUpdater {
connectionRetryIntervalSecs);
//Test NM try to connect to RM Several times, but finally fail
- nm = new NodeManager() {
+ NodeManagerWithCustomNodeStatusUpdater nmWithUpdater;
+ nm = nmWithUpdater = new NodeManagerWithCustomNodeStatusUpdater() {
@Override
- protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ protected NodeStatusUpdater createUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4(
context, dispatcher, healthChecker, metrics,
@@ -802,19 +862,25 @@ public class TestNodeStatusUpdater {
nm.start();
Assert.fail("NM should have failed to start due to RM connect failure");
} catch(Exception e) {
- Assert.assertTrue("NM should have tried re-connecting to RM during " +
+ long t = System.currentTimeMillis();
+ long duration = t - waitStartTime;
+ boolean waitTimeValid = (duration >= connectionWaitSecs * 1000)
+ && (duration < (connectionWaitSecs * 1000 + delta));
+ if(!waitTimeValid) {
+ //either the exception was too early, or it had a different cause.
+ //reject with the inner stack trace
+ throw new Exception("NM should have tried re-connecting to RM during " +
"period of at least " + connectionWaitSecs + " seconds, but " +
"stopped retrying within " + (connectionWaitSecs + delta/1000) +
- " seconds", (System.currentTimeMillis() - waitStartTime
- >= connectionWaitSecs*1000) && (System.currentTimeMillis()
- - waitStartTime < (connectionWaitSecs*1000+delta)));
+ " seconds: " + e, e);
+ }
}
//Test NM connect to RM, fail at first several attempts,
//but finally success.
- nm = new NodeManager() {
+ nm = nmWithUpdater = new NodeManagerWithCustomNodeStatusUpdater() {
@Override
- protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ protected NodeStatusUpdater createUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4(
context, dispatcher, healthChecker, metrics, rmStartIntervalMS,
@@ -822,20 +888,33 @@ public class TestNodeStatusUpdater {
return nodeStatusUpdater;
}
};
-
nm.init(conf);
+ NodeStatusUpdater updater = nmWithUpdater.getUpdater();
+ Assert.assertNotNull("Updater not yet created ", updater);
waitStartTime = System.currentTimeMillis();
try {
nm.start();
} catch (Exception ex){
- Assert.fail("NM should have started successfully " +
- "after connecting to RM.");
- }
- Assert.assertTrue("NM should have connected to RM within " + delta/1000
- +" seconds of RM starting up.",
- (System.currentTimeMillis() - waitStartTime >= rmStartIntervalMS)
- && (System.currentTimeMillis() - waitStartTime
- < (rmStartIntervalMS+delta)));
+ LOG.error("NM should have started successfully " +
+ "after connecting to RM.", ex);
+ throw ex;
+ }
+ long duration = System.currentTimeMillis() - waitStartTime;
+ MyNodeStatusUpdater4 myUpdater = (MyNodeStatusUpdater4) updater;
+ Assert.assertTrue("Updater was never started",
+ myUpdater.getWaitStartTime()>0);
+ Assert.assertTrue("NM started before updater triggered",
+ myUpdater.isTriggered());
+ Assert.assertTrue("NM should have connected to RM after "
+ +"the start interval of " + rmStartIntervalMS
+ +": actual " + duration
+ + " " + myUpdater,
+ (duration >= rmStartIntervalMS));
+ Assert.assertTrue("NM should have connected to RM less than "
+ + (rmStartIntervalMS + delta)
+ +" milliseconds of RM starting up: actual " + duration
+ + " " + myUpdater,
+ (duration < (rmStartIntervalMS + delta)));
}
/**
@@ -846,7 +925,7 @@ public class TestNodeStatusUpdater {
* only after NM_EXPIRY interval. See MAPREDUCE-2749.
*/
@Test
- public void testNoRegistrationWhenNMServicesFail() {
+ public void testNoRegistrationWhenNMServicesFail() throws Exception {
nm = new NodeManager() {
@Override
@@ -865,7 +944,7 @@ public class TestNodeStatusUpdater {
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
metrics, aclsManager, diskhandler) {
@Override
- public void start() {
+ protected void serviceStart() {
// Simulating failure of starting RPC server
throw new YarnRuntimeException("Starting of RPC Server failed");
}
@@ -961,7 +1040,7 @@ public class TestNodeStatusUpdater {
nm.init(conf);
nm.start();
try {
- syncBarrier.await();
+ syncBarrier.await(10000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
}
Assert.assertTrue(((MyNodeManager2) nm).isStopped);
@@ -1053,20 +1132,25 @@ public class TestNodeStatusUpdater {
}
}
- private void verifyNodeStartFailure(String errMessage) {
+ private void verifyNodeStartFailure(String errMessage) throws Exception {
+ Assert.assertNotNull("nm is null", nm);
YarnConfiguration conf = createNMConfig();
nm.init(conf);
try {
nm.start();
Assert.fail("NM should have failed to start. Didn't get exception!!");
} catch (Exception e) {
- Assert.assertEquals(errMessage, e.getCause()
- .getMessage());
+ //the version in trunk looked in the cause for equality
+ // and assumed failures were nested.
+ //this version assumes that error strings propagate to the base and
+ //use a contains() test only. It should be less brittle
+ if(!e.getMessage().contains(errMessage)) {
+ throw e;
+ }
}
- // the state change to stopped occurs only if the startup is success, else
- // state change doesn't occur
- Assert.assertEquals("NM state is wrong!", Service.STATE.INITED, nm
+ // the service should be stopped
+ Assert.assertEquals("NM state is wrong!", STATE.STOPPED, nm
.getServiceState());
Assert.assertEquals("Number of registered nodes is wrong!", 0,
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java Thu Jun 13 15:58:38 2013
@@ -202,8 +202,7 @@ public abstract class BaseContainerManag
@After
public void tearDown() throws IOException, InterruptedException {
- if (containerManager != null
- && containerManager.getServiceState() == STATE.STARTED) {
+ if (containerManager != null) {
containerManager.stop();
}
createContainerExecutor().deleteAsUser(user,
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java Thu Jun 13 15:58:38 2013
@@ -63,20 +63,20 @@ public class TestAuxServices {
}
public ArrayList<Integer> getAppIdsStopped() {
- return (ArrayList)this.stoppedApps.clone();
+ return (ArrayList<Integer>)this.stoppedApps.clone();
}
- @Override
- public void init(Configuration conf) {
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
remaining_init = conf.getInt(idef + ".expected.init", 0);
remaining_stop = conf.getInt(idef + ".expected.stop", 0);
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public void stop() {
+ protected void serviceStop() throws Exception {
assertEquals(0, remaining_init);
assertEquals(0, remaining_stop);
- super.stop();
+ super.serviceStop();
}
@Override
public void initApp(String user, ApplicationId appId, ByteBuffer data) {
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1492721&r1=1492720&r2=1492721&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Thu Jun 13 15:58:38 2013
@@ -237,7 +237,7 @@ public class TestResourceLocalizationSer
dirsHandler.init(conf);
DeletionService delService = new DeletionService(exec);
- delService.init(null);
+ delService.init(new Configuration());
delService.start();
ResourceLocalizationService rawService =