You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/02/02 20:51:40 UTC
svn commit: r1563694 - in
/hadoop/common/trunk/hadoop-common-project/hadoop-common: CHANGES.txt
src/main/java/org/apache/hadoop/service/CompositeService.java
src/test/java/org/apache/hadoop/service/TestCompositeService.java
Author: kasha
Date: Sun Feb 2 19:51:39 2014
New Revision: 1563694
URL: http://svn.apache.org/r1563694
Log:
HADOOP-10085. CompositeService should allow adding services while being inited. (Steve Loughran via kasha)
Added:
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/TestCompositeService.java
Modified:
hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1563694&r1=1563693&r2=1563694&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Sun Feb 2 19:51:39 2014
@@ -313,6 +313,9 @@ Release 2.4.0 - UNRELEASED
HADOOP-10320. Javadoc in InterfaceStability.java lacks final </ul>.
(René Nyffenegger via cnauroth)
+
+ HADOOP-10085. CompositeService should allow adding services while being
+ inited. (Steve Loughran via kasha)
Release 2.3.0 - UNRELEASED
Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java?rev=1563694&r1=1563693&r2=1563694&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java (original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java Sun Feb 2 19:51:39 2014
@@ -19,7 +19,6 @@
package org.apache.hadoop.service;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -54,13 +53,13 @@ public class CompositeService extends Ab
}
/**
- * Get an unmodifiable list of services
+ * Get a cloned list of services
* @return a list of child services at the time of invocation -
* added services will not be picked up.
*/
public List<Service> getServices() {
synchronized (serviceList) {
- return Collections.unmodifiableList(serviceList);
+ return new ArrayList<Service>(serviceList);
}
}
Added: hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/TestCompositeService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/TestCompositeService.java?rev=1563694&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/TestCompositeService.java (added)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/service/TestCompositeService.java Sun Feb 2 19:51:39 2014
@@ -0,0 +1,663 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.service;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service.STATE;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestCompositeService {
+
+ private static final int NUM_OF_SERVICES = 5;
+
+ private static final int FAILED_SERVICE_SEQ_NUMBER = 2;
+
+ private static final Log LOG = LogFactory.getLog(TestCompositeService.class);
+
+ /**
+ * flag to state policy of CompositeService, and hence
+ * what to look for after trying to stop a service from another state
+ * (e.g inited)
+ */
+ private static final boolean STOP_ONLY_STARTED_SERVICES =
+ CompositeServiceImpl.isPolicyToStopOnlyStartedServices();
+
+ @Before
+ public void setup() {
+ CompositeServiceImpl.resetCounter();
+ }
+
+ @Test
+ public void testCallSequence() {
+ ServiceManager serviceManager = new ServiceManager("ServiceManager");
+
+ // Add services
+ for (int i = 0; i < NUM_OF_SERVICES; i++) {
+ CompositeServiceImpl service = new CompositeServiceImpl(i);
+ serviceManager.addTestService(service);
+ }
+
+ CompositeServiceImpl[] services = serviceManager.getServices().toArray(
+ new CompositeServiceImpl[0]);
+
+ assertEquals("Number of registered services ", NUM_OF_SERVICES,
+ services.length);
+
+ Configuration conf = new Configuration();
+ // Initialise the composite service
+ serviceManager.init(conf);
+
+ //verify they were all inited
+ assertInState(STATE.INITED, services);
+
+ // Verify the init() call sequence numbers for every service
+ for (int i = 0; i < NUM_OF_SERVICES; i++) {
+ assertEquals("For " + services[i]
+ + " service, init() call sequence number should have been ", i,
+ services[i].getCallSequenceNumber());
+ }
+
+ // Reset the call sequence numbers
+ resetServices(services);
+
+ serviceManager.start();
+ //verify they were all started
+ assertInState(STATE.STARTED, services);
+
+ // Verify the start() call sequence numbers for every service
+ for (int i = 0; i < NUM_OF_SERVICES; i++) {
+ assertEquals("For " + services[i]
+ + " service, start() call sequence number should have been ", i,
+ services[i].getCallSequenceNumber());
+ }
+ resetServices(services);
+
+
+ serviceManager.stop();
+ //verify they were all stopped
+ assertInState(STATE.STOPPED, services);
+
+ // Verify the stop() call sequence numbers for every service
+ for (int i = 0; i < NUM_OF_SERVICES; i++) {
+ assertEquals("For " + services[i]
+ + " service, stop() call sequence number should have been ",
+ ((NUM_OF_SERVICES - 1) - i), services[i].getCallSequenceNumber());
+ }
+
+ // Try to stop again. This should be a no-op.
+ serviceManager.stop();
+ // Verify that stop() call sequence numbers for every service don't change.
+ for (int i = 0; i < NUM_OF_SERVICES; i++) {
+ assertEquals("For " + services[i]
+ + " service, stop() call sequence number should have been ",
+ ((NUM_OF_SERVICES - 1) - i), services[i].getCallSequenceNumber());
+ }
+ }
+
+ private void resetServices(CompositeServiceImpl[] services) {
+ // Reset the call sequence numbers
+ for (int i = 0; i < NUM_OF_SERVICES; i++) {
+ services[i].reset();
+ }
+ }
+
+ @Test
+ public void testServiceStartup() {
+ ServiceManager serviceManager = new ServiceManager("ServiceManager");
+
+ // Add services
+ for (int i = 0; i < NUM_OF_SERVICES; i++) {
+ CompositeServiceImpl service = new CompositeServiceImpl(i);
+ if (i == FAILED_SERVICE_SEQ_NUMBER) {
+ service.setThrowExceptionOnStart(true);
+ }
+ serviceManager.addTestService(service);
+ }
+
+ CompositeServiceImpl[] services = serviceManager.getServices().toArray(
+ new CompositeServiceImpl[0]);
+
+ Configuration conf = new Configuration();
+
+ // Initialise the composite service
+ serviceManager.init(conf);
+
+ // Start the composite service
+ try {
+ serviceManager.start();
+ fail("Exception should have been thrown due to startup failure of last service");
+ } catch (ServiceTestRuntimeException e) {
+ for (int i = 0; i < NUM_OF_SERVICES - 1; i++) {
+ if (i >= FAILED_SERVICE_SEQ_NUMBER && STOP_ONLY_STARTED_SERVICES) {
+ // Failed service state should be INITED
+ assertEquals("Service state should have been ", STATE.INITED,
+ services[NUM_OF_SERVICES - 1].getServiceState());
+ } else {
+ assertEquals("Service state should have been ", STATE.STOPPED,
+ services[i].getServiceState());
+ }
+ }
+
+ }
+ }
+
+ @Test
+ public void testServiceStop() {
+ ServiceManager serviceManager = new ServiceManager("ServiceManager");
+
+ // Add services
+ for (int i = 0; i < NUM_OF_SERVICES; i++) {
+ CompositeServiceImpl service = new CompositeServiceImpl(i);
+ if (i == FAILED_SERVICE_SEQ_NUMBER) {
+ service.setThrowExceptionOnStop(true);
+ }
+ serviceManager.addTestService(service);
+ }
+
+ CompositeServiceImpl[] services = serviceManager.getServices().toArray(
+ new CompositeServiceImpl[0]);
+
+ Configuration conf = new Configuration();
+
+ // Initialise the composite service
+ serviceManager.init(conf);
+
+ serviceManager.start();
+
+ // Stop the composite service
+ try {
+ serviceManager.stop();
+ } catch (ServiceTestRuntimeException e) {
+ }
+ assertInState(STATE.STOPPED, services);
+ }
+
+ /**
+ * Assert that all services are in the same expected state
+ * @param expected expected state value
+ * @param services services to examine
+ */
+ private void assertInState(STATE expected, CompositeServiceImpl[] services) {
+ assertInState(expected, services,0, services.length);
+ }
+
+ /**
+ * Assert that all services are in the same expected state
+ * @param expected expected state value
+ * @param services services to examine
+ * @param start start offset
+ * @param finish finish offset: the count stops before this number
+ */
+ private void assertInState(STATE expected,
+ CompositeServiceImpl[] services,
+ int start, int finish) {
+ for (int i = start; i < finish; i++) {
+ Service service = services[i];
+ assertInState(expected, service);
+ }
+ }
+
+ private void assertInState(STATE expected, Service service) {
+ assertEquals("Service state should have been " + expected + " in "
+ + service,
+ expected,
+ service.getServiceState());
+ }
+
+ /**
+ * Shut down from not-inited: expect nothing to have happened
+ */
+ @Test
+ public void testServiceStopFromNotInited() {
+ ServiceManager serviceManager = new ServiceManager("ServiceManager");
+
+ // Add services
+ for (int i = 0; i < NUM_OF_SERVICES; i++) {
+ CompositeServiceImpl service = new CompositeServiceImpl(i);
+ serviceManager.addTestService(service);
+ }
+
+ CompositeServiceImpl[] services = serviceManager.getServices().toArray(
+ new CompositeServiceImpl[0]);
+ serviceManager.stop();
+ assertInState(STATE.NOTINITED, services);
+ }
+
+ /**
+ * Shut down from inited
+ */
+ @Test
+ public void testServiceStopFromInited() {
+ ServiceManager serviceManager = new ServiceManager("ServiceManager");
+
+ // Add services
+ for (int i = 0; i < NUM_OF_SERVICES; i++) {
+ CompositeServiceImpl service = new CompositeServiceImpl(i);
+ serviceManager.addTestService(service);
+ }
+
+ CompositeServiceImpl[] services = serviceManager.getServices().toArray(
+ new CompositeServiceImpl[0]);
+ serviceManager.init(new Configuration());
+ serviceManager.stop();
+ if (STOP_ONLY_STARTED_SERVICES) {
+ //this policy => no services were stopped
+ assertInState(STATE.INITED, services);
+ } else {
+ assertInState(STATE.STOPPED, services);
+ }
+ }
+
+ /**
+ * Use a null configuration & expect a failure
+ * @throws Throwable
+ */
+ @Test
+ public void testInitNullConf() throws Throwable {
+ ServiceManager serviceManager = new ServiceManager("testInitNullConf");
+
+ CompositeServiceImpl service = new CompositeServiceImpl(0);
+ serviceManager.addTestService(service);
+ try {
+ serviceManager.init(null);
+ LOG.warn("Null Configurations are permitted " + serviceManager);
+ } catch (ServiceStateException e) {
+ //expected
+ }
+ }
+
+ /**
+ * Walk the service through their lifecycle without any children;
+ * verify that it all works.
+ */
+ @Test
+ public void testServiceLifecycleNoChildren() {
+ ServiceManager serviceManager = new ServiceManager("ServiceManager");
+ serviceManager.init(new Configuration());
+ serviceManager.start();
+ serviceManager.stop();
+ }
+
+ @Test
+ public void testAddServiceInInit() throws Throwable {
+ BreakableService child = new BreakableService();
+ assertInState(STATE.NOTINITED, child);
+ CompositeServiceAddingAChild composite =
+ new CompositeServiceAddingAChild(child);
+ composite.init(new Configuration());
+ assertInState(STATE.INITED, child);
+ }
+
+ @Test (timeout = 1000)
+ public void testAddIfService() {
+ CompositeService testService = new CompositeService("TestService") {
+ Service service;
+ @Override
+ public void serviceInit(Configuration conf) {
+ Integer notAService = new Integer(0);
+ assertFalse("Added an integer as a service",
+ addIfService(notAService));
+
+ service = new AbstractService("Service") {};
+ assertTrue("Unable to add a service", addIfService(service));
+ }
+ };
+
+ testService.init(new Configuration());
+ assertEquals("Incorrect number of services",
+ 1, testService.getServices().size());
+ }
+
+ @Test(timeout = 1000)
+ public void testAddInitedSiblingInInit() throws Throwable {
+ CompositeService parent = new CompositeService("parent");
+ BreakableService sibling = new BreakableService();
+ sibling.init(new Configuration());
+ parent.addService(new AddSiblingService(parent,
+ sibling,
+ STATE.INITED));
+ parent.init(new Configuration());
+ parent.start();
+ parent.stop();
+ assertEquals("Incorrect number of services",
+ 2, parent.getServices().size());
+ }
+
+ @Test(timeout = 1000)
+ public void testAddUninitedSiblingInInit() throws Throwable {
+ CompositeService parent = new CompositeService("parent");
+ BreakableService sibling = new BreakableService();
+ parent.addService(new AddSiblingService(parent,
+ sibling,
+ STATE.INITED));
+ parent.init(new Configuration());
+ try {
+ parent.start();
+ fail("Expected an exception, got " + parent);
+ } catch (ServiceStateException e) {
+ //expected
+ }
+ parent.stop();
+ assertEquals("Incorrect number of services",
+ 2, parent.getServices().size());
+ }
+
+ @Test
+ public void testRemoveService() {
+ CompositeService testService = new CompositeService("TestService") {
+ @Override
+ public void serviceInit(Configuration conf) {
+ Integer notAService = new Integer(0);
+ assertFalse("Added an integer as a service",
+ addIfService(notAService));
+
+ Service service1 = new AbstractService("Service1") {};
+ addIfService(service1);
+
+ Service service2 = new AbstractService("Service2") {};
+ addIfService(service2);
+
+ Service service3 = new AbstractService("Service3") {};
+ addIfService(service3);
+
+ removeService(service1);
+ }
+ };
+
+ testService.init(new Configuration());
+ assertEquals("Incorrect number of services",
+ 2, testService.getServices().size());
+ }
+
+ @Test(timeout = 1000)
+ public void testAddStartedChildBeforeInit() throws Throwable {
+ CompositeService parent = new CompositeService("parent");
+ BreakableService child = new BreakableService();
+ child.init(new Configuration());
+ child.start();
+ AddSiblingService.addChildToService(parent, child);
+ try {
+ parent.init(new Configuration());
+ fail("Expected an exception, got " + parent);
+ } catch (ServiceStateException e) {
+ //expected
+ }
+ parent.stop();
+ }
+
+ @Test(timeout = 1000)
+ public void testAddStoppedChildBeforeInit() throws Throwable {
+ CompositeService parent = new CompositeService("parent");
+ BreakableService child = new BreakableService();
+ child.init(new Configuration());
+ child.start();
+ child.stop();
+ AddSiblingService.addChildToService(parent, child);
+ try {
+ parent.init(new Configuration());
+ fail("Expected an exception, got " + parent);
+ } catch (ServiceStateException e) {
+ //expected
+ }
+ parent.stop();
+ }
+
+ @Test(timeout = 1000)
+ public void testAddStartedSiblingInStart() throws Throwable {
+ CompositeService parent = new CompositeService("parent");
+ BreakableService sibling = new BreakableService();
+ sibling.init(new Configuration());
+ sibling.start();
+ parent.addService(new AddSiblingService(parent,
+ sibling,
+ STATE.STARTED));
+ parent.init(new Configuration());
+ parent.start();
+ parent.stop();
+ assertEquals("Incorrect number of services",
+ 2, parent.getServices().size());
+ }
+
+ @Test(timeout = 1000)
+ public void testAddUninitedSiblingInStart() throws Throwable {
+ CompositeService parent = new CompositeService("parent");
+ BreakableService sibling = new BreakableService();
+ parent.addService(new AddSiblingService(parent,
+ sibling,
+ STATE.STARTED));
+ parent.init(new Configuration());
+ assertInState(STATE.NOTINITED, sibling);
+ parent.start();
+ parent.stop();
+ assertEquals("Incorrect number of services",
+ 2, parent.getServices().size());
+ }
+
+ @Test(timeout = 1000)
+ public void testAddStartedSiblingInInit() throws Throwable {
+ CompositeService parent = new CompositeService("parent");
+ BreakableService sibling = new BreakableService();
+ sibling.init(new Configuration());
+ sibling.start();
+ parent.addService(new AddSiblingService(parent,
+ sibling,
+ STATE.INITED));
+ parent.init(new Configuration());
+ assertInState(STATE.STARTED, sibling);
+ parent.start();
+ assertInState(STATE.STARTED, sibling);
+ parent.stop();
+ assertEquals("Incorrect number of services",
+ 2, parent.getServices().size());
+ assertInState(STATE.STOPPED, sibling);
+ }
+
+ @Test(timeout = 1000)
+ public void testAddStartedSiblingInStop() throws Throwable {
+ CompositeService parent = new CompositeService("parent");
+ BreakableService sibling = new BreakableService();
+ sibling.init(new Configuration());
+ sibling.start();
+ parent.addService(new AddSiblingService(parent,
+ sibling,
+ STATE.STOPPED));
+ parent.init(new Configuration());
+ parent.start();
+ parent.stop();
+ assertEquals("Incorrect number of services",
+ 2, parent.getServices().size());
+ }
+
+ public static class CompositeServiceAddingAChild extends CompositeService{
+ Service child;
+
+ public CompositeServiceAddingAChild(Service child) {
+ super("CompositeServiceAddingAChild");
+ this.child = child;
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ addService(child);
+ super.serviceInit(conf);
+ }
+ }
+
+ public static class ServiceTestRuntimeException extends RuntimeException {
+ public ServiceTestRuntimeException(String message) {
+ super(message);
+ }
+ }
+
+ /**
+ * This is a composite service that keeps a count of the number of lifecycle
+ * events called, and can be set to throw a {@link ServiceTestRuntimeException }
+ * during service start or stop
+ */
+ public static class CompositeServiceImpl extends CompositeService {
+
+ public static boolean isPolicyToStopOnlyStartedServices() {
+ return STOP_ONLY_STARTED_SERVICES;
+ }
+
+ private static int counter = -1;
+
+ private int callSequenceNumber = -1;
+
+ private boolean throwExceptionOnStart;
+
+ private boolean throwExceptionOnStop;
+
+ public CompositeServiceImpl(int sequenceNumber) {
+ super(Integer.toString(sequenceNumber));
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ counter++;
+ callSequenceNumber = counter;
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ if (throwExceptionOnStart) {
+ throw new ServiceTestRuntimeException("Fake service start exception");
+ }
+ counter++;
+ callSequenceNumber = counter;
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ counter++;
+ callSequenceNumber = counter;
+ if (throwExceptionOnStop) {
+ throw new ServiceTestRuntimeException("Fake service stop exception");
+ }
+ super.serviceStop();
+ }
+
+ public static int getCounter() {
+ return counter;
+ }
+
+ public int getCallSequenceNumber() {
+ return callSequenceNumber;
+ }
+
+ public void reset() {
+ callSequenceNumber = -1;
+ counter = -1;
+ }
+
+ public static void resetCounter() {
+ counter = -1;
+ }
+
+ public void setThrowExceptionOnStart(boolean throwExceptionOnStart) {
+ this.throwExceptionOnStart = throwExceptionOnStart;
+ }
+
+ public void setThrowExceptionOnStop(boolean throwExceptionOnStop) {
+ this.throwExceptionOnStop = throwExceptionOnStop;
+ }
+
+ @Override
+ public String toString() {
+ return "Service " + getName();
+ }
+
+ }
+
+ /**
+ * Composite service that makes the addService method public to all
+ */
+ public static class ServiceManager extends CompositeService {
+
+ public void addTestService(CompositeService service) {
+ addService(service);
+ }
+
+ public ServiceManager(String name) {
+ super(name);
+ }
+ }
+
+ public static class AddSiblingService extends CompositeService {
+ private final CompositeService parent;
+ private final Service serviceToAdd;
+ private STATE triggerState;
+
+ public AddSiblingService(CompositeService parent,
+ Service serviceToAdd,
+ STATE triggerState) {
+ super("ParentStateManipulatorService");
+ this.parent = parent;
+ this.serviceToAdd = serviceToAdd;
+ this.triggerState = triggerState;
+ }
+
+ /**
+ * Add the serviceToAdd to the parent if this service
+ * is in the state requested
+ */
+ private void maybeAddSibling() {
+ if (getServiceState() == triggerState) {
+ parent.addService(serviceToAdd);
+ }
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ maybeAddSibling();
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ maybeAddSibling();
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ maybeAddSibling();
+ super.serviceStop();
+ }
+
+ /**
+ * Expose addService method
+ * @param parent parent service
+ * @param child child to add
+ */
+ public static void addChildToService(CompositeService parent, Service child) {
+ parent.addService(child);
+ }
+ }
+}