You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/06/13 17:54:42 UTC
svn commit: r1492718 [2/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/
had...
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceOperations.java Thu Jun 13 15:54:38 2013
@@ -21,6 +21,11 @@ package org.apache.hadoop.yarn.service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ShutdownHookManager;
+
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.List;
/**
* This class contains a set of methods to work with services, especially
@@ -33,74 +38,6 @@ public final class ServiceOperations {
}
/**
- * Verify that that a service is in a given state.
- * @param state the actual state a service is in
- * @param expectedState the desired state
- * @throws IllegalStateException if the service state is different from
- * the desired state
- */
- public static void ensureCurrentState(Service.STATE state,
- Service.STATE expectedState) {
- if (state != expectedState) {
- throw new IllegalStateException("For this operation, the " +
- "current service state must be "
- + expectedState
- + " instead of " + state);
- }
- }
-
- /**
- * Initialize a service.
- * <p/>
- * The service state is checked <i>before</i> the operation begins.
- * This process is <i>not</i> thread safe.
- * @param service a service that must be in the state
- * {@link Service.STATE#NOTINITED}
- * @param configuration the configuration to initialize the service with
- * @throws RuntimeException on a state change failure
- * @throws IllegalStateException if the service is in the wrong state
- */
-
- public static void init(Service service, Configuration configuration) {
- Service.STATE state = service.getServiceState();
- ensureCurrentState(state, Service.STATE.NOTINITED);
- service.init(configuration);
- }
-
- /**
- * Start a service.
- * <p/>
- * The service state is checked <i>before</i> the operation begins.
- * This process is <i>not</i> thread safe.
- * @param service a service that must be in the state
- * {@link Service.STATE#INITED}
- * @throws RuntimeException on a state change failure
- * @throws IllegalStateException if the service is in the wrong state
- */
-
- public static void start(Service service) {
- Service.STATE state = service.getServiceState();
- ensureCurrentState(state, Service.STATE.INITED);
- service.start();
- }
-
- /**
- * Initialize then start a service.
- * <p/>
- * The service state is checked <i>before</i> the operation begins.
- * This process is <i>not</i> thread safe.
- * @param service a service that must be in the state
- * {@link Service.STATE#NOTINITED}
- * @param configuration the configuration to initialize the service with
- * @throws RuntimeException on a state change failure
- * @throws IllegalStateException if the service is in the wrong state
- */
- public static void deploy(Service service, Configuration configuration) {
- init(service, configuration);
- start(service);
- }
-
- /**
* Stop a service.
* <p/>Do nothing if the service is null or not
* in a state in which it can be/needs to be stopped.
@@ -111,10 +48,7 @@ public final class ServiceOperations {
*/
public static void stop(Service service) {
if (service != null) {
- Service.STATE state = service.getServiceState();
- if (state == Service.STATE.STARTED) {
- service.stop();
- }
+ service.stop();
}
}
@@ -127,14 +61,93 @@ public final class ServiceOperations {
* @return any exception that was caught; null if none was.
*/
public static Exception stopQuietly(Service service) {
+ return stopQuietly(LOG, service);
+ }
+
+ /**
+ * Stop a service; if it is null do nothing. Exceptions (but not other
+ * Throwables) are caught and logged at warn level. This operation is
+ * intended to be used in cleanup operations.
+ *
+ * @param log the log to warn at
+ * @param service a service; may be null
+ * @return any exception that was caught; null if none was.
+ * @see ServiceOperations#stopQuietly(Service)
+ */
+ public static Exception stopQuietly(Log log, Service service) {
try {
stop(service);
} catch (Exception e) {
- LOG.warn("When stopping the service " + service.getName()
- + " : " + e,
+ log.warn("When stopping the service " + service.getName()
+ + " : " + e,
e);
return e;
}
return null;
}
+
+
+ /**
+ * Class to manage a list of {@link ServiceStateChangeListener} instances,
+ * including a notification loop that is robust against changes to the list
+ * during the notification process.
+ */
+ public static class ServiceListeners {
+ /**
+ * List of state change listeners; it is final to guarantee
+ * that it will never be null.
+ */
+ private final List<ServiceStateChangeListener> listeners =
+ new ArrayList<ServiceStateChangeListener>();
+
+ /**
+ * Thread-safe addition of a new listener to the end of the list.
+ * Attempts to re-register a listener that is already registered
+ * will be ignored.
+ * @param l listener to add
+ */
+ public synchronized void add(ServiceStateChangeListener l) {
+ if(!listeners.contains(l)) {
+ listeners.add(l);
+ }
+ }
+
+ /**
+ * Remove any registration of a listener from the listener list.
+ * @param l listener to remove
+ * @return true if the listener was found (and then removed)
+ */
+ public synchronized boolean remove(ServiceStateChangeListener l) {
+ return listeners.remove(l);
+ }
+
+ /**
+ * Reset the listener list, removing every registered listener.
+ */
+ public synchronized void reset() {
+ listeners.clear();
+ }
+
+ /**
+ * Notify all listeners that a service has changed state, blocking until
+ * every notification is issued. A snapshot of the list is taken first, so
+ * concurrent additions or removals are not visible to this pass. An
+ * exception thrown by a listener propagates and skips later listeners.
+ * @param service the service that has changed state
+ */
+ public void notifyListeners(Service service) {
+ //take a very fast snapshot of the callback list
+ //very much like CopyOnWriteArrayList, only more minimal
+ ServiceStateChangeListener[] callbacks;
+ synchronized (this) {
+ callbacks = listeners.toArray(new ServiceStateChangeListener[listeners.size()]);
+ }
+ //iterate through the snapshot outside the synchronized block,
+ //ensuring that listener registration/unregistration doesn't break anything
+ for (ServiceStateChangeListener l : callbacks) {
+ l.stateChanged(service);
+ }
+ }
+ }
+
}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateException.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateException.java?rev=1492718&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateException.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateException.java Thu Jun 13 15:54:38 2013
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+import org.apache.hadoop.yarn.YarnRuntimeException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Exception that is raised on state change operations.
+ */
+public class ServiceStateException extends YarnRuntimeException {
+
+ public ServiceStateException(String message) {
+ super(message);
+ }
+
+ public ServiceStateException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ServiceStateException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Convert any exception into a {@link RuntimeException}.
+ * If the caught exception already is of that type -including
+ * a {@link YarnException} it is typecast to a {@link RuntimeException}
+ * and returned.
+ *
+ * All other exception types are wrapped in a new instance of
+ * ServiceStateException
+ * @param fault exception or throwable
+ * @return the fault itself if it is a RuntimeException, else a new wrapper
+ */
+ public static RuntimeException convert(Throwable fault) {
+ if (fault instanceof RuntimeException) {
+ return (RuntimeException) fault;
+ } else {
+ return new ServiceStateException(fault);
+ }
+ }
+
+ /**
+ * Convert any exception into a {@link RuntimeException}.
+ * If the caught exception already is of that type -including
+ * a {@link YarnException} it is typecast to a {@link RuntimeException}
+ * and returned.
+ *
+ * All other exception types are wrapped in a new instance of
+ * ServiceStateException
+ * @param text text to use if a new exception is created
+ * @param fault exception or throwable
+ * @return the fault itself if it is a RuntimeException, else a new wrapper
+ */
+ public static RuntimeException convert(String text, Throwable fault) {
+ if (fault instanceof RuntimeException) {
+ return (RuntimeException) fault;
+ } else {
+ return new ServiceStateException(text, fault);
+ }
+ }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateModel.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateModel.java?rev=1492718&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateModel.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/ServiceStateModel.java Thu Jun 13 15:54:38 2013
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+/**
+ * Implements the service state model for YARN.
+ */
+public class ServiceStateModel {
+
+ /**
+ * Map of all valid state transitions
+ * [current] [proposed1, proposed2, ...]
+ */
+ private static final boolean[][] statemap =
+ {
+ // uninited inited started stopped
+ /* uninited */ {false, true, false, true},
+ /* inited */ {false, true, true, true},
+ /* started */ {false, false, true, true},
+ /* stopped */ {false, false, false, true},
+ };
+
+ /**
+ * The state of the service
+ */
+ private volatile Service.STATE state;
+
+ /**
+ * The name of the service: used in exceptions
+ */
+ private String name;
+
+ /**
+ * Create the service state model in the {@link Service.STATE#NOTINITED}
+ * state.
+ */
+ public ServiceStateModel(String name) {
+ this(name, Service.STATE.NOTINITED);
+ }
+
+ /**
+ * Create a named service state model instance in the chosen state.
+ * @param state the starting state (name labels exceptions and toString())
+ */
+ public ServiceStateModel(String name, Service.STATE state) {
+ this.state = state;
+ this.name = name;
+ }
+
+ /**
+ * Query the service state. This is a non-blocking operation.
+ * @return the state
+ */
+ public Service.STATE getState() {
+ return state;
+ }
+
+ /**
+ * Query whether the model is currently in a specific state.
+ * @param proposed the state to compare the current state against
+ * @return true if the current state equals the given state
+ */
+ public boolean isInState(Service.STATE proposed) {
+ return state.equals(proposed);
+ }
+
+ /**
+ * Verify that the service is in a given state.
+ * @param expectedState the desired state
+ * @throws ServiceStateException if the service state is different from
+ * the desired state
+ */
+ public void ensureCurrentState(Service.STATE expectedState) {
+ if (state != expectedState) {
+ throw new ServiceStateException(name+ ": for this operation, the " +
+ "current service state must be "
+ + expectedState
+ + " instead of " + state);
+ }
+ }
+
+ /**
+ * Enter a state -thread safe.
+ *
+ * @param proposed proposed new state
+ * @return the original state
+ * @throws ServiceStateException if the transition is not permitted
+ */
+ public synchronized Service.STATE enterState(Service.STATE proposed) {
+ checkStateTransition(name, state, proposed);
+ Service.STATE oldState = state;
+ //atomic write of the new state
+ state = proposed;
+ return oldState;
+ }
+
+ /**
+ * Check that a state transition is valid and
+ * throw a {@link ServiceStateException} if not
+ * @param name name of the service (can be null)
+ * @param state current state
+ * @param proposed proposed new state
+ */
+ public static void checkStateTransition(String name,
+ Service.STATE state,
+ Service.STATE proposed) {
+ if (!isValidStateTransition(state, proposed)) {
+ throw new ServiceStateException(name + " cannot enter state "
+ + proposed + " from state " + state);
+ }
+ }
+
+ /**
+ * Is a state transition valid?
+ * There is no special case for current==proposed: the table decides, and
+ * as labelled it permits re-entering every state except uninited.
+ *
+ * using an array kills off all branch misprediction costs, at the expense
+ * of cache line misses.
+ *
+ * @param current current state
+ * @param proposed proposed new state
+ * @return true if the transition to a new state is valid
+ */
+ public static boolean isValidStateTransition(Service.STATE current,
+ Service.STATE proposed) {
+ boolean[] row = statemap[current.getValue()]; // NOTE(review): assumes getValue() is a 0-3 index matching the table labels - confirm
+ return row[proposed.getValue()];
+ }
+
+ /**
+ * Return the state text, prefixed with "name: " if the name is non-empty.
+ * @return the current state's description (throws NPE on a null name)
+ */
+ @Override
+ public String toString() {
+ return (name.isEmpty() ? "" : ((name) + ": "))
+ + state.toString();
+ }
+
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java Thu Jun 13 15:54:38 2013
@@ -49,21 +49,21 @@ public abstract class AbstractLiveliness
}
@Override
- public void start() {
+ protected void serviceStart() throws Exception {
assert !stopped : "starting when already stopped";
checkerThread = new Thread(new PingChecker());
checkerThread.setName("Ping Checker");
checkerThread.start();
- super.start();
+ super.serviceStart();
}
@Override
- public void stop() {
+ protected void serviceStop() throws Exception {
stopped = true;
if (checkerThread != null) {
checkerThread.interrupt();
}
- super.stop();
+ super.serviceStop();
}
protected abstract void expire(O ob);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableService.java Thu Jun 13 15:54:38 2013
@@ -55,13 +55,7 @@ public class BreakableService extends Ab
}
private int convert(STATE state) {
- switch (state) {
- case NOTINITED: return 0;
- case INITED: return 1;
- case STARTED: return 2;
- case STOPPED: return 3;
- default: return 0;
- }
+ return state.getValue();
}
private void inc(STATE state) {
@@ -75,29 +69,27 @@ public class BreakableService extends Ab
private void maybeFail(boolean fail, String action) {
if (fail) {
- throw new BrokenLifecycleEvent(action);
+ throw new BrokenLifecycleEvent(this, action);
}
}
@Override
- public void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
inc(STATE.INITED);
maybeFail(failOnInit, "init");
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public void start() {
+ protected void serviceStart() {
inc(STATE.STARTED);
maybeFail(failOnStart, "start");
- super.start();
}
@Override
- public void stop() {
+ protected void serviceStop() {
inc(STATE.STOPPED);
maybeFail(failOnStop, "stop");
- super.stop();
}
public void setFailOnInit(boolean failOnInit) {
@@ -116,8 +108,13 @@ public class BreakableService extends Ab
* The exception explicitly raised on a failure
*/
public static class BrokenLifecycleEvent extends RuntimeException {
- BrokenLifecycleEvent(String action) {
- super("Lifecycle Failure during " + action);
+
+ final STATE state;
+
+ public BrokenLifecycleEvent(Service service, String action) {
+ super("Lifecycle Failure during " + action + " state is "
+ + service.getServiceState());
+ state = service.getServiceState();
}
}
}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableStateChangeListener.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableStateChangeListener.java?rev=1492718&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableStateChangeListener.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/BreakableStateChangeListener.java Thu Jun 13 15:54:38 2013
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A state change listener that logs the number of state change events received,
+ * and the last state invoked.
+ *
+ * It can be configured to fail during a state change event
+ */
+public class BreakableStateChangeListener
+ implements ServiceStateChangeListener {
+
+ private final String name;
+
+ private int eventCount;
+ private int failureCount;
+ private Service lastService;
+ private Service.STATE lastState = Service.STATE.NOTINITED;
+ //no callbacks are ever received for this state, so it
+ //can be used as an 'undefined' (i.e. never fail) marker.
+ private Service.STATE failingState = Service.STATE.NOTINITED;
+ private List<Service.STATE> stateEventList = new ArrayList<Service.STATE>(4);
+
+ public BreakableStateChangeListener() {
+ this( "BreakableStateChangeListener");
+ }
+
+ public BreakableStateChangeListener(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public synchronized void stateChanged(Service service) {
+ eventCount++;
+ lastService = service;
+ lastState = service.getServiceState();
+ stateEventList.add(lastState);
+ if (lastState == failingState) { // the event is recorded above before failing
+ failureCount++;
+ throw new BreakableService.BrokenLifecycleEvent(service,
+ "Failure entering "
+ + lastState
+ + " for "
+ + service.getName());
+ }
+ }
+
+ public synchronized int getEventCount() {
+ return eventCount;
+ }
+
+ public synchronized Service getLastService() {
+ return lastService;
+ }
+
+ public synchronized Service.STATE getLastState() {
+ return lastState;
+ }
+
+ public synchronized void setFailingState(Service.STATE failingState) {
+ this.failingState = failingState;
+ }
+
+ public synchronized int getFailureCount() {
+ return failureCount;
+ }
+
+ public List<Service.STATE> getStateEventList() { // NOTE(review): unsynchronized, unlike the other getters
+ return stateEventList; // exposes the internal mutable list, not a copy
+ }
+
+ @Override
+ public synchronized String toString() {
+ String s =
+ name + " - event count = " + eventCount + " last state " + lastState;
+ StringBuilder history = new StringBuilder(stateEventList.size()*10);
+ for (Service.STATE state: stateEventList) {
+ history.append(state).append(" ");
+ }
+ return s + " [ " + history + "]";
+ }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestGlobalStateChangeListener.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestGlobalStateChangeListener.java?rev=1492718&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestGlobalStateChangeListener.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestGlobalStateChangeListener.java Thu Jun 13 15:54:38 2013
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Test global state changes. It is critical for all tests to clean up the
+ * global listener afterwards to avoid interfering with follow-on tests.
+ *
+ * One listener, {@link #listener} is defined which is automatically
+ * unregistered on cleanup. All other listeners must be unregistered in the
+ * finally clauses of the tests.
+ */
+public class TestGlobalStateChangeListener extends ServiceAssert {
+
+ BreakableStateChangeListener listener = new BreakableStateChangeListener("listener");
+
+
+ private void register() {
+ register(listener);
+ }
+
+ private boolean unregister() {
+ return unregister(listener);
+ }
+
+ private void register(ServiceStateChangeListener l) {
+ AbstractService.registerGlobalListener(l);
+ }
+
+ private boolean unregister(ServiceStateChangeListener l) {
+ return AbstractService.unregisterGlobalListener(l);
+ }
+
+ /**
+ * After every test case reset the list of global listeners.
+ */
+ @After
+ public void cleanup() {
+ AbstractService.resetGlobalListeners();
+ }
+
+ /**
+ * Assert that the last state of the listener is that the test expected.
+ * @param breakable a breakable listener
+ * @param state the expected state
+ */
+ public void assertListenerState(BreakableStateChangeListener breakable,
+ Service.STATE state) {
+ assertEquals("Wrong state in " + breakable, state, breakable.getLastState());
+ }
+
+ /**
+ * Assert that the number of state change notifications matches expectations.
+ * @param breakable the listener
+ * @param count the expected count.
+ */
+ public void assertListenerEventCount(BreakableStateChangeListener breakable,
+ int count) {
+ assertEquals("Wrong event count in " + breakable, count,
+ breakable.getEventCount());
+ }
+
+ /**
+ * Test that register/unregister works
+ */
+ @Test
+ public void testRegisterListener() {
+ register();
+ assertTrue("listener not registered", unregister());
+ }
+
+ /**
+ * Test that double registration results in one registration only.
+ */
+ @Test
+ public void testRegisterListenerTwice() {
+ register();
+ register();
+ assertTrue("listener not registered", unregister());
+ //there should be no listener to unregister the second time
+ assertFalse("listener double registered", unregister());
+ }
+
+ /**
+ * Test that the {@link BreakableStateChangeListener} is picking up
+ * the state changes and that its last event field is as expected.
+ */
+ @Test
+ public void testEventHistory() {
+ register();
+ BreakableService service = new BreakableService();
+ assertListenerState(listener, Service.STATE.NOTINITED);
+ assertEquals(0, listener.getEventCount());
+ service.init(new Configuration());
+ assertListenerState(listener, Service.STATE.INITED);
+ assertSame(service, listener.getLastService());
+ assertListenerEventCount(listener, 1);
+
+ service.start();
+ assertListenerState(listener, Service.STATE.STARTED);
+ assertListenerEventCount(listener, 2);
+
+ service.stop();
+ assertListenerState(listener, Service.STATE.STOPPED);
+ assertListenerEventCount(listener, 3);
+ }
+
+ /**
+ * This test triggers a failure in the listener - the expectation is that the
+ * service has already reached its desired state, purely because the
+ * notifications take place afterwards.
+ *
+ */
+ @Test
+ public void testListenerFailure() {
+ listener.setFailingState(Service.STATE.INITED);
+ register();
+ BreakableStateChangeListener l2 = new BreakableStateChangeListener();
+ register(l2);
+ BreakableService service = new BreakableService();
+ service.init(new Configuration());
+ //expected notifications to fail
+
+ //still should record its invocation
+ assertListenerState(listener, Service.STATE.INITED);
+ assertListenerEventCount(listener, 1);
+
+ //and second listener didn't get notified of anything
+ assertListenerEventCount(l2, 0);
+
+ //service should still consider itself inited
+ assertServiceStateInited(service);
+ service.start();
+ service.stop();
+ }
+
+ /**
+ * Create a chain of listeners and set one in the middle to fail; verify that
+ * those in front got called, and those after did not.
+ */
+ @Test
+ public void testListenerChain() {
+
+ //create and register the listeners
+ LoggingStateChangeListener logListener = new LoggingStateChangeListener();
+ register(logListener);
+ BreakableStateChangeListener l0 = new BreakableStateChangeListener("l0");
+ register(l0);
+ listener.setFailingState(Service.STATE.STARTED);
+ register();
+ BreakableStateChangeListener l3 = new BreakableStateChangeListener("l3");
+ register(l3);
+
+ //create and init a service.
+ BreakableService service = new BreakableService();
+ service.init(new Configuration());
+ assertServiceStateInited(service);
+ assertListenerState(l0, Service.STATE.INITED);
+ assertListenerState(listener, Service.STATE.INITED);
+ assertListenerState(l3, Service.STATE.INITED);
+
+ service.start();
+ //expect that listener l0 and the failing listener are in start, but
+ //not the final one
+ assertServiceStateStarted(service);
+ assertListenerState(l0, Service.STATE.STARTED);
+ assertListenerEventCount(l0, 2);
+ assertListenerState(listener, Service.STATE.STARTED);
+ assertListenerEventCount(listener, 2);
+ //this is the listener that is not expected to have been invoked
+ assertListenerState(l3, Service.STATE.INITED);
+ assertListenerEventCount(l3, 1);
+
+ //stop the service
+ service.stop();
+ //listeners are all updated
+ assertListenerEventCount(l0, 3);
+ assertListenerEventCount(listener, 3);
+ assertListenerEventCount(l3, 2);
+ //can all be unregistered in any order
+ unregister(logListener);
+ unregister(l0);
+ unregister(l3);
+
+ //check that the listeners are all unregistered, even
+ //though they were registered in a different order.
+ //rather than do this by doing unregister checks, a new service is created
+ service = new BreakableService();
+ //this service is initialized
+ service.init(new Configuration());
+ //it is asserted that the event count has not changed for the unregistered
+ //listeners
+ assertListenerEventCount(l0, 3);
+ assertListenerEventCount(l3, 2);
+ //except for the one listener that was not unregistered, which
+ //has incremented by one
+ assertListenerEventCount(listener, 4);
+ }
+
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/service/TestServiceLifecycle.java Thu Jun 13 15:54:38 2013
@@ -19,10 +19,13 @@
package org.apache.hadoop.yarn.service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
public class TestServiceLifecycle extends ServiceAssert {
+ private static Log LOG = LogFactory.getLog(TestServiceLifecycle.class);
/**
* Walk the {@link BreakableService} through it's lifecycle,
@@ -59,13 +62,8 @@ public class TestServiceLifecycle extend
Configuration conf = new Configuration();
conf.set("test.init","t");
svc.init(conf);
- try {
- svc.init(new Configuration());
- fail("Expected a failure, got " + svc);
- } catch (IllegalStateException e) {
- //expected
- }
- assertStateCount(svc, Service.STATE.INITED, 2);
+ svc.init(new Configuration());
+ assertStateCount(svc, Service.STATE.INITED, 1);
assertServiceConfigurationContains(svc, "test.init");
}
@@ -78,21 +76,14 @@ public class TestServiceLifecycle extend
BreakableService svc = new BreakableService();
svc.init(new Configuration());
svc.start();
- try {
- svc.start();
- fail("Expected a failure, got " + svc);
- } catch (IllegalStateException e) {
- //expected
- }
- assertStateCount(svc, Service.STATE.STARTED, 2);
+ svc.start();
+ assertStateCount(svc, Service.STATE.STARTED, 1);
}
/**
* Verify that when a service is stopped more than once, no exception
- * is thrown, and the counter is incremented.
- * This is because the state change operations happen after the counter in
- * the subclass is incremented, even though stop is meant to be a no-op
+ * is thrown.
* @throws Throwable if necessary
*/
@Test
@@ -103,7 +94,7 @@ public class TestServiceLifecycle extend
svc.stop();
assertStateCount(svc, Service.STATE.STOPPED, 1);
svc.stop();
- assertStateCount(svc, Service.STATE.STOPPED, 2);
+ assertStateCount(svc, Service.STATE.STOPPED, 1);
}
@@ -124,12 +115,12 @@ public class TestServiceLifecycle extend
//expected
}
//the service state wasn't passed
- assertServiceStateCreated(svc);
+ assertServiceStateStopped(svc);
assertStateCount(svc, Service.STATE.INITED, 1);
+ assertStateCount(svc, Service.STATE.STOPPED, 1);
//now try to stop
svc.stop();
- //even after the stop operation, we haven't entered the state
- assertServiceStateCreated(svc);
+ assertStateCount(svc, Service.STATE.STOPPED, 1);
}
@@ -151,18 +142,12 @@ public class TestServiceLifecycle extend
//expected
}
//the service state wasn't passed
- assertServiceStateInited(svc);
- assertStateCount(svc, Service.STATE.INITED, 1);
- //now try to stop
- svc.stop();
- //even after the stop operation, we haven't entered the state
- assertServiceStateInited(svc);
+ assertServiceStateStopped(svc);
}
/**
* verify that when a service fails during its stop operation,
- * its state does not change, and the subclass invocation counter
- * increments.
+ * its state does not change.
* @throws Throwable if necessary
*/
@Test
@@ -177,42 +162,302 @@ public class TestServiceLifecycle extend
//expected
}
assertStateCount(svc, Service.STATE.STOPPED, 1);
- assertServiceStateStarted(svc);
- //now try again, and expect it to happen again
+ }
+
+ /**
+ * verify that when a service that is not started is stopped, the
+ * service enters the stopped state
+ * @throws Throwable on a failure
+ */
+ @Test
+ public void testStopUnstarted() throws Throwable {
+ BreakableService svc = new BreakableService();
+ svc.stop();
+ assertServiceStateStopped(svc);
+ assertStateCount(svc, Service.STATE.INITED, 0);
+ assertStateCount(svc, Service.STATE.STOPPED, 1);
+ }
+
+ /**
+ * Show that if the service failed during an init
+ * operation, stop was called.
+ */
+
+ @Test
+ public void testStopFailingInitAndStop() throws Throwable {
+ BreakableService svc = new BreakableService(true, false, true);
+ svc.register(new LoggingStateChangeListener());
try {
- svc.stop();
+ svc.init(new Configuration());
fail("Expected a failure, got " + svc);
} catch (BreakableService.BrokenLifecycleEvent e) {
+ assertEquals(Service.STATE.INITED, e.state);
+ }
+ //the service state is stopped
+ assertServiceStateStopped(svc);
+ assertEquals(Service.STATE.INITED, svc.getFailureState());
+
+ Throwable failureCause = svc.getFailureCause();
+ assertNotNull("Null failure cause in " + svc, failureCause);
+ BreakableService.BrokenLifecycleEvent cause =
+ (BreakableService.BrokenLifecycleEvent) failureCause;
+ assertNotNull("null state in " + cause + " raised by " + svc, cause.state);
+ assertEquals(Service.STATE.INITED, cause.state);
+ }
+
+ @Test
+ public void testInitNullConf() throws Throwable {
+ BreakableService svc = new BreakableService(false, false, false);
+ try {
+ svc.init(null);
+ LOG.warn("Null Configurations are permitted ");
+ } catch (ServiceStateException e) {
//expected
}
- assertStateCount(svc, Service.STATE.STOPPED, 2);
+ }
+
+ @Test
+ public void testServiceNotifications() throws Throwable {
+ BreakableService svc = new BreakableService(false, false, false);
+ BreakableStateChangeListener listener = new BreakableStateChangeListener();
+ svc.register(listener);
+ svc.init(new Configuration());
+ assertEventCount(listener, 1);
+ svc.start();
+ assertEventCount(listener, 2);
+ svc.stop();
+ assertEventCount(listener, 3);
+ svc.stop();
+ assertEventCount(listener, 3);
}
/**
- * verify that when a service that is not started is stopped, its counter
- * of stop calls is still incremented-and the service remains in its
- * original state..
+ * Test that when a service listener is unregistered, it stops being invoked
* @throws Throwable on a failure
*/
@Test
- public void testStopUnstarted() throws Throwable {
- BreakableService svc = new BreakableService();
+ public void testServiceNotificationsStopOnceUnregistered() throws Throwable {
+ BreakableService svc = new BreakableService(false, false, false);
+ BreakableStateChangeListener listener = new BreakableStateChangeListener();
+ svc.register(listener);
+ svc.init(new Configuration());
+ assertEventCount(listener, 1);
+ svc.unregister(listener);
+ svc.start();
+ assertEventCount(listener, 1);
svc.stop();
- assertServiceStateCreated(svc);
- assertStateCount(svc, Service.STATE.STOPPED, 1);
+ assertEventCount(listener, 1);
+ svc.stop();
+ }
- //stop failed, now it can be initialised
+ /**
+ * This test uses a service listener that unregisters itself during the callbacks.
+ * This is a test that verifies the concurrency logic on the listener management
+ * code, that it doesn't throw any immutable state change exceptions
+ * if you change list membership during the notifications.
+ * The standard <code>AbstractService</code> implementation copies the list
+ * to an array in a <code>synchronized</code> block then iterates through
+ * the copy precisely to prevent this problem.
+ * @throws Throwable on a failure
+ */
+ @Test
+ public void testServiceNotificationsUnregisterDuringCallback() throws Throwable {
+ BreakableService svc = new BreakableService(false, false, false);
+ BreakableStateChangeListener listener =
+ new SelfUnregisteringBreakableStateChangeListener();
+ BreakableStateChangeListener l2 =
+ new BreakableStateChangeListener();
+ svc.register(listener);
+ svc.register(l2);
svc.init(new Configuration());
-
- //and try to stop again, with no state change but an increment
+ assertEventCount(listener, 1);
+ assertEventCount(l2, 1);
+ svc.unregister(listener);
+ svc.start();
+ assertEventCount(listener, 1);
+ assertEventCount(l2, 2);
svc.stop();
- assertServiceStateInited(svc);
- assertStateCount(svc, Service.STATE.STOPPED, 2);
+ assertEventCount(listener, 1);
+ svc.stop();
+ }
+
+ private static class SelfUnregisteringBreakableStateChangeListener
+ extends BreakableStateChangeListener {
+
+ @Override
+ public synchronized void stateChanged(Service service) {
+ super.stateChanged(service);
+ service.unregister(this);
+ }
+ }
- //once started, the service can be stopped reliably
+ private void assertEventCount(BreakableStateChangeListener listener,
+ int expected) {
+ assertEquals(listener.toString(), expected, listener.getEventCount());
+ }
+
+ @Test
+ public void testServiceFailingNotifications() throws Throwable {
+ BreakableService svc = new BreakableService(false, false, false);
+ BreakableStateChangeListener listener = new BreakableStateChangeListener();
+ listener.setFailingState(Service.STATE.STARTED);
+ svc.register(listener);
+ svc.init(new Configuration());
+ assertEventCount(listener, 1);
+ //start this; the listener failed but this won't show
svc.start();
- ServiceOperations.stop(svc);
- assertServiceStateStopped(svc);
- assertStateCount(svc, Service.STATE.STOPPED, 3);
+ //counter went up
+ assertEventCount(listener, 2);
+ assertEquals(1, listener.getFailureCount());
+ //stop the service -this doesn't fail
+ svc.stop();
+ assertEventCount(listener, 3);
+ assertEquals(1, listener.getFailureCount());
+ svc.stop();
+ }
+
+ /**
+ * This test verifies that you can block waiting for something to happen
+ * and use notifications to manage it
+ * @throws Throwable on a failure
+ */
+ @Test
+ public void testListenerWithNotifications() throws Throwable {
+ //this tests that a listener can get notified when a service is stopped
+ AsyncSelfTerminatingService service = new AsyncSelfTerminatingService(2000);
+ NotifyingListener listener = new NotifyingListener();
+ service.register(listener);
+ service.init(new Configuration());
+ service.start();
+ assertServiceInState(service, Service.STATE.STARTED);
+ long start = System.currentTimeMillis();
+ synchronized (listener) {
+ listener.wait(20000);
+ }
+ long duration = System.currentTimeMillis() - start;
+ assertEquals(Service.STATE.STOPPED, listener.notifyingState);
+ assertServiceInState(service, Service.STATE.STOPPED);
+ assertTrue("Duration of " + duration + " too long", duration < 10000);
+ }
+
+ @Test
+ public void testSelfTerminatingService() throws Throwable {
+ SelfTerminatingService service = new SelfTerminatingService();
+ BreakableStateChangeListener listener = new BreakableStateChangeListener();
+ service.register(listener);
+ service.init(new Configuration());
+ assertEventCount(listener, 1);
+ //start the service
+ service.start();
+ //and expect an event count of exactly two
+ assertEventCount(listener, 2);
+ }
+
+ @Test
+ public void testStartInInitService() throws Throwable {
+ Service service = new StartInInitService();
+ BreakableStateChangeListener listener = new BreakableStateChangeListener();
+ service.register(listener);
+ service.init(new Configuration());
+ assertServiceInState(service, Service.STATE.STARTED);
+ assertEventCount(listener, 1);
+ }
+
+ @Test
+ public void testStopInInitService() throws Throwable {
+ Service service = new StopInInitService();
+ BreakableStateChangeListener listener = new BreakableStateChangeListener();
+ service.register(listener);
+ service.init(new Configuration());
+ assertServiceInState(service, Service.STATE.STOPPED);
+ assertEventCount(listener, 1);
+ }
+
+ /**
+ * Listener that wakes up all threads waiting on it
+ */
+ private static class NotifyingListener implements ServiceStateChangeListener {
+ public Service.STATE notifyingState = Service.STATE.NOTINITED;
+
+ public synchronized void stateChanged(Service service) {
+ notifyingState = service.getServiceState();
+ this.notifyAll();
+ }
+ }
+
+ /**
+ * Service that terminates itself after starting and sleeping for a while
+ */
+ private static class AsyncSelfTerminatingService extends AbstractService
+ implements Runnable {
+ final int timeout;
+ private AsyncSelfTerminatingService(int timeout) {
+ super("AsyncSelfTerminatingService");
+ this.timeout = timeout;
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ new Thread(this).start();
+ super.serviceStart();
+ }
+
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(timeout);
+ } catch (InterruptedException ignored) {
+
+ }
+ this.stop();
+ }
+ }
+
+ /**
+ * Service that terminates itself in startup
+ */
+ private static class SelfTerminatingService extends AbstractService {
+ private SelfTerminatingService() {
+ super("SelfTerminatingService");
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ //start
+ super.serviceStart();
+ //then stop
+ stop();
+ }
+ }
+
+ /**
+ * Service that starts itself in init
+ */
+ private static class StartInInitService extends AbstractService {
+ private StartInInitService() {
+ super("StartInInitService");
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ start();
+ }
}
+
+ /**
+ * Service that stops itself in init
+ */
+ private static class StopInInitService extends AbstractService {
+ private StopInInitService() {
+ super("StopInInitService");
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ stop();
+ }
+ }
+
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java Thu Jun 13 15:54:38 2013
@@ -21,10 +21,15 @@ package org.apache.hadoop.yarn.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.YarnRuntimeException;
+import org.apache.hadoop.yarn.service.BreakableService;
import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.service.ServiceStateException;
import org.junit.Before;
import org.junit.Test;
@@ -34,6 +39,16 @@ public class TestCompositeService {
private static final int FAILED_SERVICE_SEQ_NUMBER = 2;
+ private static final Log LOG = LogFactory.getLog(TestCompositeService.class);
+
+ /**
+ * flag to state policy of CompositeService, and hence
+ * what to look for after trying to stop a service from another state
+ * (e.g. inited)
+ */
+ private static final boolean STOP_ONLY_STARTED_SERVICES =
+ CompositeServiceImpl.isPolicyToStopOnlyStartedServices();
+
@Before
public void setup() {
CompositeServiceImpl.resetCounter();
@@ -59,6 +74,9 @@ public class TestCompositeService {
// Initialise the composite service
serviceManager.init(conf);
+ //verify they were all inited
+ assertInState(STATE.INITED, services);
+
// Verify the init() call sequence numbers for every service
for (int i = 0; i < NUM_OF_SERVICES; i++) {
assertEquals("For " + services[i]
@@ -67,11 +85,11 @@ public class TestCompositeService {
}
// Reset the call sequence numbers
- for (int i = 0; i < NUM_OF_SERVICES; i++) {
- services[i].reset();
- }
+ resetServices(services);
serviceManager.start();
+ //verify they were all started
+ assertInState(STATE.STARTED, services);
// Verify the start() call sequence numbers for every service
for (int i = 0; i < NUM_OF_SERVICES; i++) {
@@ -79,13 +97,12 @@ public class TestCompositeService {
+ " service, start() call sequence number should have been ", i,
services[i].getCallSequenceNumber());
}
+ resetServices(services);
- // Reset the call sequence numbers
- for (int i = 0; i < NUM_OF_SERVICES; i++) {
- services[i].reset();
- }
serviceManager.stop();
+ //verify they were all stopped
+ assertInState(STATE.STOPPED, services);
// Verify the stop() call sequence numbers for every service
for (int i = 0; i < NUM_OF_SERVICES; i++) {
@@ -104,6 +121,13 @@ public class TestCompositeService {
}
}
+ private void resetServices(CompositeServiceImpl[] services) {
+ // Reset the call sequence numbers
+ for (int i = 0; i < NUM_OF_SERVICES; i++) {
+ services[i].reset();
+ }
+ }
+
@Test
public void testServiceStartup() {
ServiceManager serviceManager = new ServiceManager("ServiceManager");
@@ -131,7 +155,7 @@ public class TestCompositeService {
fail("Exception should have been thrown due to startup failure of last service");
} catch (YarnRuntimeException e) {
for (int i = 0; i < NUM_OF_SERVICES - 1; i++) {
- if (i >= FAILED_SERVICE_SEQ_NUMBER) {
+ if (i >= FAILED_SERVICE_SEQ_NUMBER && STOP_ONLY_STARTED_SERVICES) {
// Failed service state should be INITED
assertEquals("Service state should have been ", STATE.INITED,
services[NUM_OF_SERVICES - 1].getServiceState());
@@ -171,15 +195,147 @@ public class TestCompositeService {
try {
serviceManager.stop();
} catch (YarnRuntimeException e) {
- for (int i = 0; i < NUM_OF_SERVICES - 1; i++) {
- assertEquals("Service state should have been ", STATE.STOPPED,
- services[NUM_OF_SERVICES].getServiceState());
- }
}
+ assertInState(STATE.STOPPED, services);
}
+ /**
+ * Assert that all services are in the same expected state
+ * @param expected expected state value
+ * @param services services to examine
+ */
+ private void assertInState(STATE expected, CompositeServiceImpl[] services) {
+ assertInState(expected, services,0, services.length);
+ }
+
+ /**
+ * Assert that all services are in the same expected state
+ * @param expected expected state value
+ * @param services services to examine
+ * @param start start offset
+ * @param finish finish offset: the count stops before this number
+ */
+ private void assertInState(STATE expected,
+ CompositeServiceImpl[] services,
+ int start, int finish) {
+ for (int i = start; i < finish; i++) {
+ Service service = services[i];
+ assertInState(expected, service);
+ }
+ }
+
+ private void assertInState(STATE expected, Service service) {
+ assertEquals("Service state should have been " + expected + " in "
+ + service,
+ expected,
+ service.getServiceState());
+ }
+
+ /**
+ * Shut down from not-inited: expect nothing to have happened
+ */
+ @Test
+ public void testServiceStopFromNotInited() {
+ ServiceManager serviceManager = new ServiceManager("ServiceManager");
+
+ // Add services
+ for (int i = 0; i < NUM_OF_SERVICES; i++) {
+ CompositeServiceImpl service = new CompositeServiceImpl(i);
+ serviceManager.addTestService(service);
+ }
+
+ CompositeServiceImpl[] services = serviceManager.getServices().toArray(
+ new CompositeServiceImpl[0]);
+ serviceManager.stop();
+ assertInState(STATE.NOTINITED, services);
+ }
+
+ /**
+ * Shut down from inited
+ */
+ @Test
+ public void testServiceStopFromInited() {
+ ServiceManager serviceManager = new ServiceManager("ServiceManager");
+
+ // Add services
+ for (int i = 0; i < NUM_OF_SERVICES; i++) {
+ CompositeServiceImpl service = new CompositeServiceImpl(i);
+ serviceManager.addTestService(service);
+ }
+
+ CompositeServiceImpl[] services = serviceManager.getServices().toArray(
+ new CompositeServiceImpl[0]);
+ serviceManager.init(new Configuration());
+ serviceManager.stop();
+ if (STOP_ONLY_STARTED_SERVICES) {
+ //this policy => no services were stopped
+ assertInState(STATE.INITED, services);
+ } else {
+ assertInState(STATE.STOPPED, services);
+ }
+ }
+
+ /**
+ * Use a null configuration: init may fail with a ServiceStateException, or succeed (a warning is then logged)
+ * @throws Throwable
+ */
+ @Test
+ public void testInitNullConf() throws Throwable {
+ ServiceManager serviceManager = new ServiceManager("testInitNullConf");
+
+ CompositeServiceImpl service = new CompositeServiceImpl(0);
+ serviceManager.addTestService(service);
+ try {
+ serviceManager.init(null);
+ LOG.warn("Null Configurations are permitted " + serviceManager);
+ } catch (ServiceStateException e) {
+ //expected
+ }
+ }
+
+ /**
+ * Walk the service through their lifecycle without any children;
+ * verify that it all works.
+ */
+ @Test
+ public void testServiceLifecycleNoChildren() {
+ ServiceManager serviceManager = new ServiceManager("ServiceManager");
+ serviceManager.init(new Configuration());
+ serviceManager.start();
+ serviceManager.stop();
+ }
+
+ @Test
+ public void testAddServiceInInit() throws Throwable {
+ BreakableService child = new BreakableService();
+ assertInState(STATE.NOTINITED, child);
+ CompositeServiceAddingAChild composite =
+ new CompositeServiceAddingAChild(child);
+ composite.init(new Configuration());
+ assertInState(STATE.INITED, child);
+ }
+
+ public static class CompositeServiceAddingAChild extends CompositeService{
+ Service child;
+
+ public CompositeServiceAddingAChild(Service child) {
+ super("CompositeServiceAddingAChild");
+ this.child = child;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ addService(child);
+ super.serviceInit(conf);
+ }
+ }
+
public static class CompositeServiceImpl extends CompositeService {
+ public static boolean isPolicyToStopOnlyStartedServices() {
+ return STOP_ONLY_STARTED_SERVICES;
+ }
+
private static int counter = -1;
private int callSequenceNumber = -1;
@@ -193,30 +349,30 @@ public class TestCompositeService {
}
@Override
- public synchronized void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
counter++;
callSequenceNumber = counter;
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public synchronized void start() {
+ protected void serviceStart() throws Exception {
if (throwExceptionOnStart) {
throw new YarnRuntimeException("Fake service start exception");
}
counter++;
callSequenceNumber = counter;
- super.start();
+ super.serviceStart();
}
@Override
- public synchronized void stop() {
+ protected void serviceStop() throws Exception {
counter++;
callSequenceNumber = counter;
if (throwExceptionOnStop) {
throw new YarnRuntimeException("Fake service stop exception");
}
- super.stop();
+ super.serviceStop();
}
public static int getCounter() {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java Thu Jun 13 15:54:38 2013
@@ -30,7 +30,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.commons.logging.Log;
@@ -75,7 +74,7 @@ public class DeletionService extends Abs
}
@Override
- public void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("DeletionService #%d")
.build();
@@ -90,21 +89,23 @@ public class DeletionService extends Abs
}
sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
sched.setKeepAliveTime(60L, SECONDS);
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public void stop() {
- sched.shutdown();
- boolean terminated = false;
- try {
- terminated = sched.awaitTermination(10, SECONDS);
- } catch (InterruptedException e) {
- }
- if (terminated != true) {
- sched.shutdownNow();
+ protected void serviceStop() throws Exception {
+ if (sched != null) {
+ sched.shutdown();
+ boolean terminated = false;
+ try {
+ terminated = sched.awaitTermination(10, SECONDS);
+ } catch (InterruptedException e) {
+ }
+ if (terminated != true) {
+ sched.shutdownNow();
+ }
}
- super.stop();
+ super.serviceStop();
}
/**
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java Thu Jun 13 15:54:38 2013
@@ -113,7 +113,7 @@ public class LocalDirsHandlerService ext
*
*/
@Override
- public void init(Configuration config) {
+ protected void serviceInit(Configuration config) throws Exception {
// Clone the configuration as we may do modifications to dirs-list
Configuration conf = new Configuration(config);
diskHealthCheckInterval = conf.getLong(
@@ -126,7 +126,7 @@ public class LocalDirsHandlerService ext
YarnConfiguration.NM_MIN_HEALTHY_DISKS_FRACTION,
YarnConfiguration.DEFAULT_NM_MIN_HEALTHY_DISKS_FRACTION);
lastDisksCheckTime = System.currentTimeMillis();
- super.init(conf);
+ super.serviceInit(conf);
FileContext localFs;
try {
@@ -150,24 +150,24 @@ public class LocalDirsHandlerService ext
* Method used to start the disk health monitoring, if enabled.
*/
@Override
- public void start() {
+ protected void serviceStart() throws Exception {
if (isDiskHealthCheckerEnabled) {
dirsHandlerScheduler = new Timer("DiskHealthMonitor-Timer", true);
dirsHandlerScheduler.scheduleAtFixedRate(monitoringTimerTask,
diskHealthCheckInterval, diskHealthCheckInterval);
}
- super.start();
+ super.serviceStart();
}
/**
* Method used to terminate the disk health monitoring service.
*/
@Override
- public void stop() {
+ protected void serviceStop() throws Exception {
if (dirsHandlerScheduler != null) {
dirsHandlerScheduler.cancel();
}
- super.stop();
+ super.serviceStop();
}
/**
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java Thu Jun 13 15:54:38 2013
@@ -39,13 +39,13 @@ public class NodeHealthCheckerService ex
}
@Override
- public void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
if (NodeHealthScriptRunner.shouldRun(conf)) {
nodeHealthScriptRunner = new NodeHealthScriptRunner();
addService(nodeHealthScriptRunner);
}
addService(dirsHandler);
- super.init(conf);
+ super.serviceInit(conf);
}
/**
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthScriptRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthScriptRunner.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthScriptRunner.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthScriptRunner.java Thu Jun 13 15:54:38 2013
@@ -197,7 +197,7 @@ public class NodeHealthScriptRunner exte
* Method which initializes the values for the script path and interval time.
*/
@Override
- public void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
this.conf = conf;
this.nodeHealthScript =
conf.get(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_PATH);
@@ -209,6 +209,7 @@ public class NodeHealthScriptRunner exte
String[] args = conf.getStrings(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_OPTS,
new String[] {});
timer = new NodeHealthMonitorExecutor(args);
+ super.serviceInit(conf);
}
/**
@@ -216,7 +217,7 @@ public class NodeHealthScriptRunner exte
*
*/
@Override
- public void start() {
+ protected void serviceStart() throws Exception {
// if health script path is not configured don't start the thread.
if (!shouldRun(conf)) {
LOG.info("Not starting node health monitor");
@@ -226,6 +227,7 @@ public class NodeHealthScriptRunner exte
// Start the timer task immediately and
// then periodically at interval time.
nodeHealthScriptScheduler.scheduleAtFixedRate(timer, 0, intervalTime);
+ super.serviceStart();
}
/**
@@ -233,11 +235,13 @@ public class NodeHealthScriptRunner exte
*
*/
@Override
- public void stop() {
+ protected void serviceStop() {
if (!shouldRun(conf)) {
return;
}
- nodeHealthScriptScheduler.cancel();
+ if (nodeHealthScriptScheduler != null) {
+ nodeHealthScriptScheduler.cancel();
+ }
if (shexec != null) {
Process p = shexec.getProcess();
if (p != null) {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Thu Jun 13 15:54:38 2013
@@ -128,7 +128,7 @@ public class NodeManager extends Composi
}
@Override
- public void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
@@ -192,31 +192,36 @@ public class NodeManager extends Composi
YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
SHUTDOWN_CLEANUP_SLOP_MS;
- super.init(conf);
+ super.serviceInit(conf);
// TODO add local dirs to del
}
@Override
- public void start() {
+ protected void serviceStart() throws Exception {
try {
doSecureLogin();
} catch (IOException e) {
throw new YarnRuntimeException("Failed NodeManager login", e);
}
- super.start();
+ super.serviceStart();
}
@Override
- public void stop() {
+ protected void serviceStop() throws Exception {
if (isStopping.getAndSet(true)) {
return;
}
-
- cleanupContainers(NodeManagerEventType.SHUTDOWN);
- super.stop();
+ if (context != null) {
+ cleanupContainers(NodeManagerEventType.SHUTDOWN);
+ }
+ super.serviceStop();
DefaultMetricsSystem.shutdown();
}
+ public String getName() {
+ return "NodeManager";
+ }
+
protected void resyncWithRM() {
//we do not want to block dispatcher thread here
new Thread() {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Thu Jun 13 15:54:38 2013
@@ -80,7 +80,7 @@ public class NodeStatusUpdaterImpl exten
private InetSocketAddress rmAddress;
private Resource totalResource;
private int httpPort;
- private boolean isStopped;
+ private volatile boolean isStopped;
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private boolean tokenKeepAliveEnabled;
private long tokenRemovalDelayMs;
@@ -109,7 +109,7 @@ public class NodeStatusUpdaterImpl exten
}
@Override
- public synchronized void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
this.rmAddress = conf.getSocketAddr(
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
@@ -146,11 +146,11 @@ public class NodeStatusUpdaterImpl exten
" physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
" physical-cores=" + cpuCores + " virtual-cores=" + virtualCores);
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public void start() {
+ protected void serviceStart() throws Exception {
// NodeManager is the last service to start, so NodeId is available.
this.nodeId = this.context.getNodeId();
@@ -159,7 +159,7 @@ public class NodeStatusUpdaterImpl exten
// Registration has to be in start so that ContainerManager can get the
// perNM tokens needed to authenticate ContainerTokens.
registerWithRM();
- super.start();
+ super.serviceStart();
startStatusUpdater();
} catch (Exception e) {
String errorMessage = "Unexpected error starting NodeStatusUpdater";
@@ -169,10 +169,10 @@ public class NodeStatusUpdaterImpl exten
}
@Override
- public synchronized void stop() {
+ protected void serviceStop() throws Exception {
// Interrupt the updater.
this.isStopped = true;
- super.stop();
+ super.serviceStop();
}
protected void rebootNodeStatusUpdater() {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java Thu Jun 13 15:54:38 2013
@@ -30,7 +30,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.AbstractService;
@@ -82,7 +81,7 @@ public class AuxServices extends Abstrac
}
@Override
- public void init(Configuration conf) {
+ public void serviceInit(Configuration conf) throws Exception {
Collection<String> auxNames = conf.getStringCollection(
YarnConfiguration.NM_AUX_SERVICES);
for (final String sName : auxNames) {
@@ -110,11 +109,11 @@ public class AuxServices extends Abstrac
throw e;
}
}
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public void start() {
+ public void serviceStart() throws Exception {
// TODO fork(?) services running as configured user
// monitor for health, shutdown/restart(?) if any should die
for (Map.Entry<String, AuxiliaryService> entry : serviceMap.entrySet()) {
@@ -127,11 +126,11 @@ public class AuxServices extends Abstrac
serviceMeta.put(name, meta);
}
}
- super.start();
+ super.serviceStart();
}
@Override
- public void stop() {
+ public void serviceStop() throws Exception {
try {
synchronized (serviceMap) {
for (Service service : serviceMap.values()) {
@@ -144,7 +143,7 @@ public class AuxServices extends Abstrac
serviceMeta.clear();
}
} finally {
- super.stop();
+ super.serviceStop();
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Thu Jun 13 15:54:38 2013
@@ -177,13 +177,13 @@ public class ContainerManagerImpl extend
}
@Override
- public void init(Configuration conf) {
+ public void serviceInit(Configuration conf) throws Exception {
LogHandler logHandler =
createLogHandler(conf, this.context, this.deletionService);
addIfService(logHandler);
dispatcher.register(LogHandlerEventType.class, logHandler);
- super.init(conf);
+ super.serviceInit(conf);
}
private void addIfService(Object object) {
@@ -220,7 +220,7 @@ public class ContainerManagerImpl extend
}
@Override
- public void start() {
+ protected void serviceStart() throws Exception {
// Enqueue user dirs in deletion context
@@ -254,7 +254,7 @@ public class ContainerManagerImpl extend
connectAddress.getPort());
((NodeManager.NMContext)context).setNodeId(nodeId);
LOG.info("ContainerManager started at " + connectAddress);
- super.start();
+ super.serviceStart();
}
void refreshServiceAcls(Configuration configuration,
@@ -263,14 +263,14 @@ public class ContainerManagerImpl extend
}
@Override
- public void stop() {
+ public void serviceStop() throws Exception {
if (auxiliaryServices.getServiceState() == STARTED) {
auxiliaryServices.unregister(this);
}
if (server != null) {
server.stop();
}
- super.stop();
+ super.serviceStop();
}
// Get the remoteUGI corresponding to the api call.
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Thu Jun 13 15:54:38 2013
@@ -91,20 +91,20 @@ public class ContainersLauncher extends
}
@Override
- public void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
try {
//TODO Is this required?
FileContext.getLocalFSFileContext(conf);
} catch (UnsupportedFileSystemException e) {
throw new YarnRuntimeException("Failed to start ContainersLauncher", e);
}
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public void stop() {
+ protected void serviceStop() throws Exception {
containerLauncher.shutdownNow();
- super.stop();
+ super.serviceStop();
}
@Override
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Thu Jun 13 15:54:38 2013
@@ -194,7 +194,7 @@ public class ResourceLocalizationService
}
@Override
- public void init(Configuration conf) {
+ public void serviceInit(Configuration conf) throws Exception {
this.validateConf(conf);
this.publicRsrc =
new LocalResourcesTrackerImpl(null, dispatcher, true, conf);
@@ -239,7 +239,7 @@ public class ResourceLocalizationService
localizerTracker = createLocalizerTracker(conf);
addService(localizerTracker);
dispatcher.register(LocalizerEventType.class, localizerTracker);
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
@@ -248,7 +248,7 @@ public class ResourceLocalizationService
}
@Override
- public void start() {
+ public void serviceStart() throws Exception {
cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
server = createServer();
@@ -257,7 +257,7 @@ public class ResourceLocalizationService
getConfig().updateConnectAddr(YarnConfiguration.NM_LOCALIZER_ADDRESS,
server.getListenerAddress());
LOG.info("Localizer started on port " + server.getPort());
- super.start();
+ super.serviceStart();
}
LocalizerTracker createLocalizerTracker(Configuration conf) {
@@ -288,12 +288,12 @@ public class ResourceLocalizationService
}
@Override
- public void stop() {
+ public void serviceStop() throws Exception {
if (server != null) {
server.stop();
}
cacheCleanup.shutdown();
- super.stop();
+ super.serviceStop();
}
@Override
@@ -536,9 +536,9 @@ public class ResourceLocalizationService
}
@Override
- public synchronized void start() {
+ public synchronized void serviceStart() throws Exception {
publicLocalizer.start();
- super.start();
+ super.serviceStart();
}
public LocalizerHeartbeatResponse processHeartbeat(LocalizerStatus status) {
@@ -559,12 +559,12 @@ public class ResourceLocalizationService
}
@Override
- public void stop() {
+ public void serviceStop() throws Exception {
for (LocalizerRunner localizer : privLocalizers.values()) {
localizer.interrupt();
}
publicLocalizer.interrupt();
- super.stop();
+ super.serviceStop();
}
@Override
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Thu Jun 13 15:54:38 2013
@@ -114,7 +114,7 @@ public class LogAggregationService exten
.build());
}
- public synchronized void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
this.remoteRootLogDir =
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
@@ -122,22 +122,22 @@ public class LogAggregationService exten
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public synchronized void start() {
+ protected void serviceStart() throws Exception {
// NodeId is only available during start, the following cannot be moved
// anywhere else.
this.nodeId = this.context.getNodeId();
- super.start();
+ super.serviceStart();
}
@Override
- public synchronized void stop() {
+ protected void serviceStop() throws Exception {
LOG.info(this.getName() + " waiting for pending aggregation during exit");
stopAggregators();
- super.stop();
+ super.serviceStop();
}
private void stopAggregators() {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java Thu Jun 13 15:54:38 2013
@@ -69,16 +69,16 @@ public class NonAggregatingLogHandler ex
}
@Override
- public void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
// Default 3 hours.
this.deleteDelaySeconds =
conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 3 * 60 * 60);
sched = createScheduledThreadPoolExecutor(conf);
- super.init(conf);
+ super.serviceInit(conf);
}
@Override
- public void stop() {
+ protected void serviceStop() throws Exception {
if (sched != null) {
sched.shutdown();
boolean isShutdown = false;
@@ -92,7 +92,7 @@ public class NonAggregatingLogHandler ex
sched.shutdownNow();
}
}
- super.stop();
+ super.serviceStop();
}
@SuppressWarnings("unchecked")
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java?rev=1492718&r1=1492717&r2=1492718&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java Thu Jun 13 15:54:38 2013
@@ -85,7 +85,7 @@ public class ContainersMonitorImpl exten
}
@Override
- public synchronized void init(Configuration conf) {
+ protected void serviceInit(Configuration conf) throws Exception {
this.monitoringInterval =
conf.getLong(YarnConfiguration.NM_CONTAINER_MON_INTERVAL_MS,
YarnConfiguration.DEFAULT_NM_CONTAINER_MON_INTERVAL_MS);
@@ -151,7 +151,7 @@ public class ContainersMonitorImpl exten
1) + "). Thrashing might happen.");
}
}
- super.init(conf);
+ super.serviceInit(conf);
}
private boolean isEnabled() {
@@ -175,15 +175,15 @@ public class ContainersMonitorImpl exten
}
@Override
- public synchronized void start() {
+ protected void serviceStart() throws Exception {
if (this.isEnabled()) {
this.monitoringThread.start();
}
- super.start();
+ super.serviceStart();
}
@Override
- public synchronized void stop() {
+ protected void serviceStop() throws Exception {
if (this.isEnabled()) {
this.monitoringThread.interrupt();
try {
@@ -192,7 +192,7 @@ public class ContainersMonitorImpl exten
;
}
}
- super.stop();
+ super.serviceStop();
}
private static class ProcessTreeInfo {