You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2014/05/30 20:57:09 UTC
[02/12] git commit: SLIDER-94: moving the workflow services into
their own package and give the names planned for YARN;
review and clean up ServiceLauncher (which now uses SLF4J(
SLIDER-94: moving the workflow services into their own package and give the names planned for YARN; review and clean up ServiceLauncher (which now uses SLF4J(
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/8f4b8647
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/8f4b8647
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/8f4b8647
Branch: refs/heads/develop
Commit: 8f4b86471acaa0fda346937d30dceb574d512626
Parents: 917de1f
Author: Steve Loughran <st...@apache.org>
Authored: Wed May 28 20:34:03 2014 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Wed May 28 20:34:03 2014 +0100
----------------------------------------------------------------------
.../slider/core/main/LauncherExitCodes.java | 2 +-
.../core/main/ServiceLaunchException.java | 22 +-
.../slider/core/main/ServiceLauncher.java | 85 +++---
.../slider/core/main/ServiceShutdownHook.java | 25 +-
.../providers/AbstractProviderService.java | 18 +-
.../slider/providers/ProviderService.java | 4 +-
.../providers/agent/AgentProviderService.java | 4 +-
.../slideram/SliderAMProviderService.java | 4 +-
.../server/appmaster/SliderAppMaster.java | 4 +-
.../utility/AbstractSliderLaunchedService.java | 2 +-
.../utility/CompoundLaunchedService.java | 134 ----------
.../services/utility/CompoundService.java | 115 ---------
.../server/services/utility/EventCallback.java | 25 --
.../services/utility/EventNotifyingService.java | 63 -----
.../services/utility/ForkedProcessService.java | 233 -----------------
.../slider/server/services/utility/Parent.java | 39 ---
.../services/utility/SequenceService.java | 243 ------------------
.../services/utility/SliderServiceUtils.java | 29 ---
.../WorkflowCompositeLaunchedService.java | 135 ++++++++++
.../services/workflow/ForkedProcessService.java | 233 +++++++++++++++++
.../server/services/workflow/ServiceParent.java | 39 +++
.../workflow/WorkflowCompositeService.java | 131 ++++++++++
.../workflow/WorkflowEventCallback.java | 25 ++
.../workflow/WorkflowEventNotifyingService.java | 73 ++++++
.../workflow/WorkflowSequenceService.java | 257 +++++++++++++++++++
.../model/mock/MockProviderService.groovy | 4 +-
.../services/utility/TestCompoundService.groovy | 38 +--
.../services/utility/TestSequenceService.groovy | 31 ++-
.../accumulo/AccumuloProviderService.java | 10 +-
.../providers/hbase/HBaseProviderService.java | 4 +-
30 files changed, 1039 insertions(+), 992 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/core/main/LauncherExitCodes.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/main/LauncherExitCodes.java b/slider-core/src/main/java/org/apache/slider/core/main/LauncherExitCodes.java
index b172260..2b19e5f 100644
--- a/slider-core/src/main/java/org/apache/slider/core/main/LauncherExitCodes.java
+++ b/slider-core/src/main/java/org/apache/slider/core/main/LauncherExitCodes.java
@@ -20,7 +20,7 @@ package org.apache.slider.core.main;
/*
- * YARN Codes,
+ * Common Exit codes
* Exit codes from 32 up are relative to a base value that
* we put a fair way up from the the base numbers, so that
* applications can have their own set of failures
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/core/main/ServiceLaunchException.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/main/ServiceLaunchException.java b/slider-core/src/main/java/org/apache/slider/core/main/ServiceLaunchException.java
index 8277a51..d84e7f2 100644
--- a/slider-core/src/main/java/org/apache/slider/core/main/ServiceLaunchException.java
+++ b/slider-core/src/main/java/org/apache/slider/core/main/ServiceLaunchException.java
@@ -30,22 +30,42 @@ public class ServiceLaunchException extends YarnException
implements ExitCodeProvider, LauncherExitCodes {
private final int exitCode;
-
+
+ /**
+ * Create an exception with the specific exit code
+ * @param exitCode exit code
+ * @param cause cause of the exception
+ */
public ServiceLaunchException(int exitCode, Throwable cause) {
super(cause);
this.exitCode = exitCode;
}
+ /**
+ * Create an exception with the specific exit code and text
+ * @param exitCode exit code
+ * @param message message to use in exception
+ */
public ServiceLaunchException(int exitCode, String message) {
super(message);
this.exitCode = exitCode;
}
+ /**
+ * Create an exception with the specific exit code, text and cause
+ * @param exitCode exit code
+ * @param message message to use in exception
+ * @param cause cause of the exception
+ */
public ServiceLaunchException(int exitCode, String message, Throwable cause) {
super(message, cause);
this.exitCode = exitCode;
}
+ /**
+ * Get the exit code
+ * @return the exit code
+ */
@Override
public int getExitCode() {
return exitCode;
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/core/main/ServiceLauncher.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/main/ServiceLauncher.java b/slider-core/src/main/java/org/apache/slider/core/main/ServiceLauncher.java
index e5e72fa..95bd15e 100644
--- a/slider-core/src/main/java/org/apache/slider/core/main/ServiceLauncher.java
+++ b/slider-core/src/main/java/org/apache/slider/core/main/ServiceLauncher.java
@@ -18,8 +18,7 @@
package org.apache.slider.core.main;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.Service;
@@ -27,6 +26,8 @@ import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -65,16 +66,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
@SuppressWarnings("UseOfSystemOutOrSystemErr")
public class ServiceLauncher<S extends Service>
implements LauncherExitCodes, IrqHandler.Interrupted {
- private static final Log LOG = LogFactory.getLog(ServiceLauncher.class);
+ private static final Logger LOG = LoggerFactory.getLogger(
+ ServiceLauncher.class);
+
protected static final int PRIORITY = 30;
public static final String NAME = "ServiceLauncher";
- /**
- * name of class for entry point strings: {@value}
- */
- public static final String ENTRY_POINT =
- "org.apache.hadoop.yarn.service.launcher." + NAME;
-
public static final String USAGE_MESSAGE =
"Usage: " + NAME + " classname [--conf <conf file>] <service arguments> | ";
@@ -87,7 +84,7 @@ public class ServiceLauncher<S extends Service>
private volatile S service;
private int serviceExitCode;
- private final List<IrqHandler> interruptHandlers = new ArrayList<>(1);
+ private final List<IrqHandler> interruptHandlers = new ArrayList<IrqHandler>(1);
private Configuration configuration;
private String serviceClassName;
private static AtomicBoolean signalAlreadyReceived = new AtomicBoolean(false);
@@ -153,16 +150,12 @@ public class ServiceLauncher<S extends Service>
* @throws IllegalAccessException not allowed at the class
* @throws InstantiationException not allowed to instantiate it
* @throws InterruptedException thread interrupted
- * @throws IOException any IO exception
+ * @throws Throwable any other failure
*/
public int launchService(Configuration conf,
String[] processedArgs,
boolean addShutdownHook)
- throws Throwable,
- ClassNotFoundException,
- InstantiationException,
- IllegalAccessException,
- ExitUtil.ExitException {
+ throws Throwable {
instantiateService(conf);
@@ -179,7 +172,8 @@ public class ServiceLauncher<S extends Service>
//if its a runService, pass in the conf and arguments before init)
runService = (RunService) service;
configuration = runService.bindArgs(configuration, processedArgs);
- assert configuration != null : "null configuration returned by bindArgs()";
+ Preconditions.checkNotNull(configuration,
+ "null configuration returned by bindArgs()");
}
//some class constructors init; here this is picked up on.
@@ -191,7 +185,7 @@ public class ServiceLauncher<S extends Service>
if (runService != null) {
//assume that runnable services are meant to run from here
exitCode = runService.runService();
- LOG.debug("Service exited with exit code " + exitCode);
+ LOG.debug("Service exited with exit code {}", exitCode);
} else {
//run the service until it stops or an interrupt happens on a different thread.
@@ -212,26 +206,27 @@ public class ServiceLauncher<S extends Service>
* @throws ClassNotFoundException no such class
* @throws InstantiationException no empty constructor,
* problems with dependencies
- * @throws IllegalAccessException no access rights
+ * @throws ClassNotFoundException classname not on the classpath
+ * @throws IllegalAccessException not allowed at the class
+ * @throws InstantiationException not allowed to instantiate it
+ * @throws InterruptedException thread interrupted
+ * @throws Throwable any other failure
*/
- public Service instantiateService(Configuration conf) throws
- ClassNotFoundException,
- InstantiationException,
- IllegalAccessException,
- ExitUtil.ExitException,
- NoSuchMethodException,
- InvocationTargetException {
+ public Service instantiateService(Configuration conf)
+ throws ClassNotFoundException, InstantiationException, IllegalAccessException,
+ ExitUtil.ExitException, NoSuchMethodException, InvocationTargetException {
+ Preconditions.checkNotNull(conf, "null configuration");
configuration = conf;
//Instantiate the class -this requires the service to have a public
// zero-argument constructor
Class<?> serviceClass =
- this.getClass().getClassLoader().loadClass(serviceClassName);
+ this.getClass().getClassLoader().loadClass(serviceClassName);
Object instance = serviceClass.getConstructor().newInstance();
if (!(instance instanceof Service)) {
//not a service
throw new ExitUtil.ExitException(EXIT_BAD_CONFIGURATION,
- "Not a Service class: " + serviceClassName);
+ "Not a Service class: " + serviceClassName);
}
service = (S) instance;
@@ -241,20 +236,18 @@ public class ServiceLauncher<S extends Service>
/**
* Register this class as the handler for the control-C interrupt.
* Can be overridden for testing.
- * @throws IOException on a failure to add the handler
*/
- protected void registerInterruptHandler() throws IOException {
+ protected void registerInterruptHandler() {
try {
interruptHandlers.add(new IrqHandler(IrqHandler.CONTROL_C, this));
interruptHandlers.add(new IrqHandler(IrqHandler.SIGTERM, this));
} catch (IOException e) {
- error("Signal handler setup failed : " + e, e);
+ error("Signal handler setup failed : {}" + e, e);
}
}
/**
- * The service has been interrupted.
- * Trigger something resembling an elegant shutdown;
+ * The service has been interrupted -try to shut down the service.
* Give the service time to do this before the exit operation is called
* @param interruptData the interrupted data.
*/
@@ -287,14 +280,24 @@ public class ServiceLauncher<S extends Service>
exit(EXIT_INTERRUPTED, message);
}
+ /**
+ * Print a warning: currently this goes to stderr
+ * @param text
+ */
protected void warn(String text) {
System.err.println(text);
}
+ /**
+ * Report an error. The message is printed to stderr; the exception
+ * is logged via the current logger.
+ * @param message message for the user
+ * @param thrown the exception thrown
+ */
protected void error(String message, Throwable thrown) {
String text = "Exception: " + message;
- System.err.println(text);
+ warn(text);
LOG.error(text, thrown);
}
@@ -432,7 +435,7 @@ public class ServiceLauncher<S extends Service>
if (failureState == Service.STATE.STOPPED) {
//the failure occurred during shutdown, not important enough to bother
//the user as it may just scare them
- LOG.debug("Failure during shutdown: " + failure, failure);
+ LOG.debug("Failure during shutdown:{} ", failure, failure);
} else {
//throw it for the catch handlers to deal with
throw failure;
@@ -441,8 +444,8 @@ public class ServiceLauncher<S extends Service>
}
exitException = new ExitUtil.ExitException(exitCode,
"In " + serviceClassName);
- //either the service succeeded, or an error raised during shutdown,
- //which we don't worry that much about
+ // either the service succeeded, or an error raised during shutdown,
+ // which we don't worry that much about
} catch (ExitUtil.ExitException ee) {
exitException = ee;
} catch (Throwable thrown) {
@@ -451,17 +454,17 @@ public class ServiceLauncher<S extends Service>
if (message == null) {
message = thrown.toString();
}
- LOG.error(message) ;
if (thrown instanceof ExitCodeProvider) {
exitCode = ((ExitCodeProvider) thrown).getExitCode();
if (LOG.isDebugEnabled()) {
- LOG.debug("While running " + getServiceName() + ": " + message, thrown);
+ LOG.debug("While running {}: {}", getServiceName(), message, thrown);
}
+ LOG.error(message);
} else {
- //not any of the service launcher exceptions -assume something worse
+ // not any of the service launcher exceptions -assume something worse
error(message, thrown);
exitCode = EXIT_EXCEPTION_THROWN;
- }
+ }
exitException = new ExitUtil.ExitException(exitCode, message);
exitException.initCause(thrown);
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/core/main/ServiceShutdownHook.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/main/ServiceShutdownHook.java b/slider-core/src/main/java/org/apache/slider/core/main/ServiceShutdownHook.java
index 82e0e27..9fd9c02 100644
--- a/slider-core/src/main/java/org/apache/slider/core/main/ServiceShutdownHook.java
+++ b/slider-core/src/main/java/org/apache/slider/core/main/ServiceShutdownHook.java
@@ -18,10 +18,10 @@
package org.apache.slider.core.main;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.util.ShutdownHookManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.lang.ref.WeakReference;
@@ -33,13 +33,14 @@ import java.lang.ref.WeakReference;
* been stopped and deferenced elsewhere.
*/
public class ServiceShutdownHook implements Runnable {
- private static final Log LOG = LogFactory.getLog(ServiceShutdownHook.class);
+ private static final Logger LOG = LoggerFactory.getLogger(
+ ServiceShutdownHook.class);
- private WeakReference<Service> serviceRef;
+ private final WeakReference<Service> serviceRef;
private Runnable hook;
public ServiceShutdownHook(Service service) {
- serviceRef = new WeakReference<>(service);
+ serviceRef = new WeakReference<Service>(service);
}
public void register(int priority) {
@@ -48,20 +49,24 @@ public class ServiceShutdownHook implements Runnable {
ShutdownHookManager.get().addShutdownHook(hook, priority);
}
- public void unregister() {
+ public synchronized void unregister() {
if (hook != null) {
try {
ShutdownHookManager.get().removeShutdownHook(hook);
} catch (IllegalStateException e) {
- LOG.info("Failed to unregister shutdown hook",e);
+ LOG.info("Failed to unregister shutdown hook: {}", e, e);
}
hook = null;
}
}
-// @Override
+ @Override
public void run() {
- Service service = serviceRef.get();
+ Service service;
+ synchronized (this) {
+ service = serviceRef.get();
+ serviceRef.clear();
+ }
if (service == null) {
return;
}
@@ -69,7 +74,7 @@ public class ServiceShutdownHook implements Runnable {
// Stop the Service
service.stop();
} catch (Throwable t) {
- LOG.info("Error stopping " + service.getName(), t);
+ LOG.info("Error stopping {}: {}", service.getName(), t);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
index a06134b..f4a872c 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
@@ -33,9 +33,9 @@ import org.apache.slider.core.registry.info.ServiceInstanceData;
import org.apache.slider.server.appmaster.state.StateAccessForProviders;
import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations;
import org.apache.slider.server.services.registry.RegistryViewForProviders;
-import org.apache.slider.server.services.utility.ForkedProcessService;
-import org.apache.slider.server.services.utility.Parent;
-import org.apache.slider.server.services.utility.SequenceService;
+import org.apache.slider.server.services.workflow.ForkedProcessService;
+import org.apache.slider.server.services.workflow.ServiceParent;
+import org.apache.slider.server.services.workflow.WorkflowSequenceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +55,7 @@ import java.util.Map;
* upstream
*/
public abstract class AbstractProviderService
- extends SequenceService
+ extends WorkflowSequenceService
implements
ProviderCore,
SliderKeys,
@@ -189,8 +189,8 @@ public abstract class AbstractProviderService
return (ForkedProcessService) latest;
} else {
//its a composite object, so look inside it for a process
- if (latest instanceof Parent) {
- return getFPSFromParentService((Parent) latest);
+ if (latest instanceof ServiceParent) {
+ return getFPSFromParentService((ServiceParent) latest);
} else {
//no match
return null;
@@ -201,11 +201,11 @@ public abstract class AbstractProviderService
/**
* Given a parent service, find the one that is a forked process
- * @param parent parent
+ * @param serviceParent parent
* @return the forked process service or null if there is none
*/
- protected ForkedProcessService getFPSFromParentService(Parent parent) {
- List<Service> services = parent.getServices();
+ protected ForkedProcessService getFPSFromParentService(ServiceParent serviceParent) {
+ List<Service> services = serviceParent.getServices();
for (Service s : services) {
if (s instanceof ForkedProcessService) {
return (ForkedProcessService) s;
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
index 8d2462e..ab09a8d 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
@@ -34,7 +34,7 @@ import org.apache.slider.core.registry.info.ServiceInstanceData;
import org.apache.slider.server.appmaster.state.StateAccessForProviders;
import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations;
import org.apache.slider.server.services.registry.RegistryViewForProviders;
-import org.apache.slider.server.services.utility.EventCallback;
+import org.apache.slider.server.services.workflow.WorkflowEventCallback;
import java.io.File;
import java.io.IOException;
@@ -81,7 +81,7 @@ public interface ProviderService extends ProviderCore, Service,
boolean exec(AggregateConf instanceDefinition,
File confDir,
Map<String, String> env,
- EventCallback execInProgress) throws IOException,
+ WorkflowEventCallback execInProgress) throws IOException,
SliderException;
/**
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index 6d3d0e1..8948746 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -66,7 +66,7 @@ import org.apache.slider.server.appmaster.web.rest.agent.Register;
import org.apache.slider.server.appmaster.web.rest.agent.RegistrationResponse;
import org.apache.slider.server.appmaster.web.rest.agent.RegistrationStatus;
import org.apache.slider.server.appmaster.web.rest.agent.StatusCommand;
-import org.apache.slider.server.services.utility.EventCallback;
+import org.apache.slider.server.services.workflow.WorkflowEventCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -307,7 +307,7 @@ public class AgentProviderService extends AbstractProviderService implements
public boolean exec(AggregateConf instanceDefinition,
File confDir,
Map<String, String> env,
- EventCallback execInProgress) throws
+ WorkflowEventCallback execInProgress) throws
IOException,
SliderException {
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
index 09e8229..9667c8f 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
@@ -44,7 +44,7 @@ import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.agent.AgentKeys;
import org.apache.slider.server.appmaster.PublishedArtifacts;
import org.apache.slider.server.appmaster.web.rest.RestPaths;
-import org.apache.slider.server.services.utility.EventCallback;
+import org.apache.slider.server.services.workflow.WorkflowEventCallback;
import java.io.File;
import java.io.IOException;
@@ -94,7 +94,7 @@ public class SliderAMProviderService extends AbstractProviderService implements
public boolean exec(AggregateConf instanceDefinition,
File confDir,
Map<String, String> env,
- EventCallback execInProgress) throws IOException, SliderException {
+ WorkflowEventCallback execInProgress) throws IOException, SliderException {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index 3f54e27..d75c788 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -112,7 +112,7 @@ import org.apache.slider.server.appmaster.web.WebAppApiImpl;
import org.apache.slider.server.appmaster.web.rest.RestPaths;
import org.apache.slider.server.services.registry.SliderRegistryService;
import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
-import org.apache.slider.server.services.utility.EventCallback;
+import org.apache.slider.server.services.workflow.WorkflowEventCallback;
import org.apache.slider.server.services.utility.RpcService;
import org.apache.slider.server.services.utility.WebAppService;
import org.slf4j.Logger;
@@ -149,7 +149,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
SliderClusterProtocol,
ServiceStateChangeListener,
RoleKeys,
- EventCallback,
+ WorkflowEventCallback,
ContainerStartOperation {
protected static final Logger log =
LoggerFactory.getLogger(SliderAppMaster.class);
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
index 5d37c32..6bdd376 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
@@ -36,7 +36,7 @@ import static org.apache.slider.common.SliderXmlConfKeys.REGISTRY_PATH;
* Base service for the standard slider client/server services
*/
public abstract class AbstractSliderLaunchedService extends
- CompoundLaunchedService {
+ WorkflowCompositeLaunchedService {
private static final Logger log =
LoggerFactory.getLogger(AbstractSliderLaunchedService.class);
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/server/services/utility/CompoundLaunchedService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/CompoundLaunchedService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/CompoundLaunchedService.java
deleted file mode 100644
index 692da38..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/CompoundLaunchedService.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.Service;
-import org.apache.slider.core.exceptions.BadCommandArgumentsException;
-import org.apache.slider.core.main.LauncherExitCodes;
-import org.apache.slider.core.main.RunService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CompoundLaunchedService extends CompoundService
- implements RunService {
- private static final Logger log = LoggerFactory.getLogger(
- CompoundLaunchedService.class);
- private String[] argv;
-
- public CompoundLaunchedService(String name) {
- super(name);
- }
-
- public CompoundLaunchedService() {
- super("CompoundLaunchedService");
- }
-
- public CompoundLaunchedService(Service... children) {
- super(children);
- }
-
- /**
- * Implementation of set-ness, groovy definition of true/false for a string
- * @param s
- * @return
- */
- protected static boolean isUnset(String s) {
- return StringUtils.isEmpty(s);
- }
-
- protected static boolean isSet(String s) {
- return StringUtils.isNotEmpty(s);
- }
-
- protected String[] getArgv() {
- return argv;
- }
-
- /**
- * Pre-init argument binding
- * @param config the initial configuration build up by the
- * service launcher.
- * @param args argument list list of arguments passed to the command line
- * after any launcher-specific commands have been stripped.
- * @return the configuration
- * @throws Exception
- */
- @Override
- public Configuration bindArgs(Configuration config, String... args) throws
- Exception {
- this.argv = args;
- if (log.isDebugEnabled()) {
- log.debug("Binding {} Arguments:", args.length);
-
- StringBuilder builder = new StringBuilder();
- for (String arg : args) {
- builder.append('"').append(arg).append("\" ");
- }
- log.debug(builder.toString());
- }
- return config;
- }
-
- @Override
- public int runService() throws Throwable {
- return LauncherExitCodes.EXIT_SUCCESS;
- }
-
- @Override
- public void addService(Service service) {
- Preconditions.checkNotNull(service, "null service");
- super.addService(service);
- }
-
- /**
- * Run a child service -initing and starting it if this
- * service has already passed those parts of its own lifecycle
- * @param service the service to start
- */
- protected boolean deployChildService(Service service) {
- service.init(getConfig());
- addService(service);
- if (isInState(STATE.STARTED)) {
- service.start();
- return true;
- }
- return false;
- }
-
- protected void requireArgumentSet(String argname, String argfield)
- throws BadCommandArgumentsException {
- if (isUnset(argfield)) {
- throw new BadCommandArgumentsException("Required argument "
- + argname
- + " missing");
- }
- }
-
- protected void requireArgumentSet(String argname, Object argfield) throws
- BadCommandArgumentsException {
- if (argfield == null) {
- throw new BadCommandArgumentsException("Required argument "
- + argname
- + " missing");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/server/services/utility/CompoundService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/CompoundService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/CompoundService.java
deleted file mode 100644
index 4e97842..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/CompoundService.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.service.ServiceStateChangeListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * An extended composite service which not only makes the
- * addService method public, it auto-registers
- * itself as a listener for state change events.
- *
- * When all child services has stopped, this service stops itself.
- */
-public class CompoundService extends CompositeService implements Parent,
- ServiceStateChangeListener {
-
- private static final Logger log =
- LoggerFactory.getLogger(CompoundService.class);
-
- public CompoundService(String name) {
- super(name);
- }
-
-
- public CompoundService() {
- super("CompoundService");
- }
-
- /**
- * Varargs constructor
- * @param children children
- */
- public CompoundService(Service ... children) {
- this();
- for (Service child : children) {
- addService(child);
- }
- }
-
- /**
- * Add a service, and register it
- * @param service the {@link Service} to be added.
- * Important: do not add a service to a parent during your own serviceInit/start,
- * in Hadoop 2.2; you will trigger a ConcurrentModificationException.
- */
- @Override
- public void addService(Service service) {
- service.registerServiceListener(this);
- super.addService(service);
- }
-
- /**
- * When this service is started, any service stopping with a failure
- * exception is converted immediately into a failure of this service,
- * storing the failure and stopping ourselves.
- * @param child the service that has changed.
- */
- @Override
- public void stateChanged(Service child) {
- //if that child stopped while we are running:
- if (isInState(STATE.STARTED) && child.isInState(STATE.STOPPED)) {
- // a child service has stopped
- //did the child fail? if so: propagate
- Throwable failureCause = child.getFailureCause();
- if (failureCause != null) {
- log.info("Child service " + child + " failed", failureCause);
- //failure. Convert to an exception
- Exception e = SliderServiceUtils.convertToException(failureCause);
- //flip ourselves into the failed state
- noteFailure(e);
- stop();
- } else {
- log.info("Child service completed {}", child);
- if (areAllChildrenStopped()) {
- log.info("All children are halted: stopping");
- stop();
- }
- }
- }
- }
-
- private boolean areAllChildrenStopped() {
- List<Service> children = getServices();
- boolean stopped = true;
- for (Service child : children) {
- if (!child.isInState(STATE.STOPPED)) {
- stopped = false;
- break;
- }
- }
- return stopped;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/server/services/utility/EventCallback.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/EventCallback.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/EventCallback.java
deleted file mode 100644
index 7af463d..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/EventCallback.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-public interface EventCallback {
-
- public void eventCallbackEvent();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/server/services/utility/EventNotifyingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/EventNotifyingService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/EventNotifyingService.java
deleted file mode 100644
index e8db69e..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/EventNotifyingService.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import org.apache.hadoop.service.AbstractService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A service that calls the supplied callback when it is started -after the
- * given delay, then stops itself.
- * Because it calls in on a different thread, it can be used for callbacks
- * that don't
- */
-public class EventNotifyingService extends AbstractService implements Runnable {
- protected static final Logger log =
- LoggerFactory.getLogger(EventNotifyingService.class);
- private final EventCallback callback;
- private final int delay;
-
- public EventNotifyingService(EventCallback callback, int delay) {
- super("EventNotifyingService");
- assert callback != null;
- this.callback = callback;
- this.delay = delay;
- }
-
- @Override
- protected void serviceStart() throws Exception {
- log.debug("Notifying {} after a delay of {} millis", callback, delay);
- new Thread(this, "event").start();
- }
-
- @Override
- public void run() {
- if (delay > 0) {
- try {
- Thread.sleep(delay);
- } catch (InterruptedException ignored) {
-
- }
- }
- log.debug("Notifying {}", callback);
- callback.eventCallbackEvent();
- stop();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/server/services/utility/ForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/ForkedProcessService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/ForkedProcessService.java
deleted file mode 100644
index e6610bb..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/ForkedProcessService.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.service.ServiceStateException;
-import org.apache.slider.common.tools.SliderUtils;
-import org.apache.slider.core.exceptions.SliderException;
-import org.apache.slider.core.main.ExitCodeProvider;
-import org.apache.slider.core.main.ServiceLaunchException;
-import org.apache.slider.server.exec.ApplicationEventHandler;
-import org.apache.slider.server.exec.RunLongLivedApp;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Service wrapper for an external program that is launched and can/will terminate.
- * This service is notified when the subprocess terminates, and stops itself
- * and converts a non-zero exit code into a failure exception
- */
-public class ForkedProcessService extends AbstractService implements
- ApplicationEventHandler,
- ExitCodeProvider,
- Runnable {
-
- /**
- * Log for the forked master process
- */
- protected static final Logger log =
- LoggerFactory.getLogger(ForkedProcessService.class);
-
- private final String name;
- private final AtomicBoolean processTerminated = new AtomicBoolean(false);
- ;
- private boolean processStarted = false;
- private RunLongLivedApp process;
- private Map<String, String> environment;
- private List<String> commands;
- private String commandLine;
- private int executionTimeout = -1;
- private int timeoutCode = 1;
-
- /**
- * Exit code set when the spawned process exits
- */
- private AtomicInteger exitCode = new AtomicInteger(0);
- private Thread timeoutThread;
-
- public ForkedProcessService(String name) {
- super(name);
- this.name = name;
- }
-
- @Override //AbstractService
- protected void serviceInit(Configuration conf) throws Exception {
- super.serviceInit(conf);
- }
-
- @Override //AbstractService
- protected void serviceStart() throws Exception {
- if (process == null) {
- throw new ServiceStateException("Subprocess not yet configured");
- }
- //now spawn the process -expect updates via callbacks
- process.spawnApplication();
- }
-
- @Override //AbstractService
- protected void serviceStop() throws Exception {
- completed(0);
- if (process != null) {
- process.stop();
- }
- }
-
- /**
- * Set the timeout by which time a process must have finished -or -1 for forever
- * @param timeout timeout in milliseconds
- */
- public void setTimeout(int timeout, int code) {
- this.executionTimeout = timeout;
- this.timeoutCode = code;
- }
-
- /**
- * Build the process to execute when the service is started
- * @param commands list of commands is inserted on the front
- * @param env environment variables above those generated by
- * @throws IOException IO problems
- * @throws SliderException anything internal
- */
- public void build(Map<String, String> environment,
- List<String> commands) throws
- IOException,
- SliderException {
- assert process == null;
- this.commands = commands;
- this.commandLine = SliderUtils.join(commands, " ", false);
- this.environment = environment;
- process = new RunLongLivedApp(log, commands);
- process.setApplicationEventHandler(this);
- //set the env variable mapping
- process.putEnvMap(environment);
- }
-
- @Override // ApplicationEventHandler
- public synchronized void onApplicationStarted(RunLongLivedApp application) {
- log.info("Process has started");
- processStarted = true;
- if (executionTimeout > 0) {
- timeoutThread = new Thread(this);
- timeoutThread.start();
- }
- }
-
- @Override // ApplicationEventHandler
- public void onApplicationExited(RunLongLivedApp application,
- int exitC) {
- synchronized (this) {
- completed(exitC);
- //note whether or not the service had already stopped
- log.info("Process has exited with exit code {}", exitC);
- if (exitC != 0) {
- reportFailure(exitC, name + " failed with code " +
- exitC);
- }
- }
- //now stop itself
- if (!isInState(STATE.STOPPED)) {
- stop();
- }
- }
-
- private void reportFailure(int exitC, String text) {
- this.exitCode.set(exitC);
- //error
- ServiceLaunchException execEx =
- new ServiceLaunchException(exitC,
- text);
- log.debug("Noting failure", execEx);
- noteFailure(execEx);
- }
-
- /**
- * handle timeout response by escalating it to a failure
- */
- @Override
- public void run() {
- try {
- synchronized (processTerminated) {
- if (!processTerminated.get()) {
- processTerminated.wait(executionTimeout);
- }
- }
-
- } catch (InterruptedException e) {
- //assume signalled; exit
- }
- //check the status; if the marker isn't true, bail
- if (!processTerminated.getAndSet(true)) {
- log.info("process timeout: reporting error code {}", timeoutCode);
-
- //timeout
- if (isInState(STATE.STARTED)) {
- //trigger a failure
- process.stop();
- }
- reportFailure(timeoutCode, name + ": timeout after " + executionTimeout
- + " millis: exit code =" + timeoutCode);
- }
- }
-
- protected void completed(int exitCode) {
- this.exitCode.set(exitCode);
- processTerminated.set(true);
- synchronized (processTerminated) {
- processTerminated.notify();
- }
- }
-
- public boolean isProcessTerminated() {
- return processTerminated.get();
- }
-
- public synchronized boolean isProcessStarted() {
- return processStarted;
- }
-
-
- @Override // ExitCodeProvider
- public int getExitCode() {
- return exitCode.get();
- }
-
- public String getCommandLine() {
- return commandLine;
- }
-
- /**
- * Get the recent output from the process, or [] if not defined
- * @return a possibly empty list
- */
- public List<String> getRecentOutput() {
- return process != null
- ? process.getRecentOutput()
- : new LinkedList<String>();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/server/services/utility/Parent.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/Parent.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/Parent.java
deleted file mode 100644
index ea1769c..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/Parent.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import org.apache.hadoop.service.Service;
-
-import java.util.List;
-
-/**
- * Interface that services with public methods to manipulate child services
- * should implement
- */
-public interface Parent extends Service {
-
- void addService(Service service);
-
- /**
- * Get an unmodifiable list of services
- * @return a list of child services at the time of invocation -
- * added services will not be picked up.
- */
- List<Service> getServices();
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/server/services/utility/SequenceService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/SequenceService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/SequenceService.java
deleted file mode 100644
index 5136645..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/SequenceService.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.service.ServiceStateChangeListener;
-import org.apache.hadoop.service.ServiceStateException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * This resembles the YARN CompositeService, except that it
- * starts one service after another: it's init & start operations
- * only work with one service
- */
-
-public class SequenceService extends AbstractService implements Parent,
- ServiceStateChangeListener {
-
- private static final Logger log =
- LoggerFactory.getLogger(SequenceService.class);
-
- /**
- * list of services
- */
- private final List<Service> serviceList = new ArrayList<>();
-
- /**
- * The current service.
- * Volatile -may change & so should be read into a
- * local variable before working with
- */
- private volatile Service currentService;
- /*
- the previous service -the last one that finished.
- Null if one did not finish yet
- */
- private volatile Service previousService;
-
- /**
- * Create a service sequence with the given list of services
- * @param name service name
- * @param offspring initial sequence
- */
- public SequenceService(String name, Service... offspring) {
- super(name);
- for (Service service : offspring) {
- addService(service);
- }
- }
-
- /**
- * Get the current service -which may be null
- * @return service running
- */
- public Service getCurrentService() {
- return currentService;
- }
-
- public Service getPreviousService() {
- return previousService;
- }
-
- /**
- * When started
- * @throws Exception
- */
- @Override
- protected void serviceStart() throws Exception {
- startNextService();
- }
-
- @Override
- protected void serviceStop() throws Exception {
- //stop current service.
- //this triggers a callback that is caught and ignored
- Service current = currentService;
- previousService = current;
- currentService = null;
- if (current != null) {
- current.stop();
- }
- }
-
-
- /**
- * Start the next service in the list.
- * Return false if there are no more services to run, or this
- * service has stopped
- * @return true if a service was started
- * @throws RuntimeException from any init or start failure
- * @throws ServiceStateException if this call is made before
- * the service is started
- */
- public synchronized boolean startNextService() {
- if (isInState(STATE.STOPPED)) {
- //downgrade to a failed
- log.debug("Not starting next service -{} is stopped", this);
- return false;
- }
- if (!isInState(STATE.STARTED)) {
- //reject attempts to start a service too early
- throw new ServiceStateException(
- "Cannot start a child service when not started");
- }
- if (serviceList.isEmpty()) {
- //nothing left to run
- return false;
- }
- if (currentService != null && currentService.getFailureCause() != null) {
- //did the last service fail? Is this caused by some premature callback?
- log.debug("Not starting next service due to a failure of {}",
- currentService);
- return false;
- }
- //bear in mind that init & start can fail, which
- //can trigger re-entrant calls into the state change listener.
- //by setting the current service to null
- //the start-next-service logic is skipped.
- //now, what does that mean w.r.t exit states?
-
- currentService = null;
- Service head = serviceList.remove(0);
-
- try {
- head.init(getConfig());
- head.registerServiceListener(this);
- head.start();
- } catch (RuntimeException e) {
- noteFailure(e);
- throw e;
- }
- //at this point the service must have explicitly started & not failed,
- //else an exception would have been raised
- currentService = head;
- return true;
- }
-
- /**
- * State change event relays service stop events to
- * {@link #onServiceCompleted(Service)}. Subclasses can
- * extend that with extra logic
- * @param service the service that has changed.
- */
- @Override
- public void stateChanged(Service service) {
- if (service == currentService && service.isInState(STATE.STOPPED)) {
- onServiceCompleted(service);
- }
- }
-
- /**
- * handler for service completion: base class starts the next service
- * @param service service that has completed
- */
- protected synchronized void onServiceCompleted(Service service) {
- log.info("Running service stopped: {}", service);
- previousService = currentService;
-
-
- //start the next service if we are not stopped ourselves
- if (isInState(STATE.STARTED)) {
-
- //did the service fail? if so: propagate
- Throwable failureCause = service.getFailureCause();
- if (failureCause != null) {
- Exception e = SliderServiceUtils.convertToException(failureCause);
- noteFailure(e);
- stop();
- }
-
- //start the next service
- boolean started;
- try {
- started = startNextService();
- } catch (Exception e) {
- //something went wrong here
- noteFailure(e);
- started = false;
- }
- if (!started) {
- //no start because list is empty
- //stop and expect the notification to go upstream
- stop();
- }
- } else {
- //not started, so just note that the current service
- //has gone away
- currentService = null;
- }
- }
-
- /**
- * Add the passed {@link Service} to the list of services managed by this
- * {@link SequenceService}
- * @param service the {@link Service} to be added
- */
- @Override //Parent
- public synchronized void addService(Service service) {
- log.debug("Adding service {} ", service.getName());
- synchronized (serviceList) {
- serviceList.add(service);
- }
- }
-
- /**
- * Get an unmodifiable list of services
- * @return a list of child services at the time of invocation -
- * added services will not be picked up.
- */
- @Override //Parent
- public synchronized List<Service> getServices() {
- return Collections.unmodifiableList(serviceList);
- }
-
- @Override // Object
- public synchronized String toString() {
- return super.toString() + "; current service " + currentService
- + "; queued service count=" + serviceList.size();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/server/services/utility/SliderServiceUtils.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/SliderServiceUtils.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/SliderServiceUtils.java
deleted file mode 100644
index 4fc1525..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/SliderServiceUtils.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-public class SliderServiceUtils {
-
- public static Exception convertToException(Throwable failureCause) {
- return (failureCause instanceof Exception) ?
- (Exception)failureCause
- : new Exception(failureCause);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/server/services/utility/WorkflowCompositeLaunchedService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/WorkflowCompositeLaunchedService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/WorkflowCompositeLaunchedService.java
new file mode 100644
index 0000000..ea20393
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/utility/WorkflowCompositeLaunchedService.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.utility;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.slider.core.exceptions.BadCommandArgumentsException;
+import org.apache.slider.core.main.LauncherExitCodes;
+import org.apache.slider.core.main.RunService;
+import org.apache.slider.server.services.workflow.WorkflowCompositeService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WorkflowCompositeLaunchedService extends WorkflowCompositeService
+ implements RunService {
+ private static final Logger log = LoggerFactory.getLogger(
+ WorkflowCompositeLaunchedService.class);
+ private String[] argv;
+
+ public WorkflowCompositeLaunchedService(String name) {
+ super(name);
+ }
+
+ public WorkflowCompositeLaunchedService() {
+ super("CompoundLaunchedService");
+ }
+
+ public WorkflowCompositeLaunchedService(Service... children) {
+ super(children);
+ }
+
+ /**
+ * Implementation of set-ness, groovy definition of true/false for a string
+ * @param s
+ * @return
+ */
+ protected static boolean isUnset(String s) {
+ return StringUtils.isEmpty(s);
+ }
+
+ protected static boolean isSet(String s) {
+ return StringUtils.isNotEmpty(s);
+ }
+
+ protected String[] getArgv() {
+ return argv;
+ }
+
+ /**
+ * Pre-init argument binding
+ * @param config the initial configuration build up by the
+ * service launcher.
+ * @param args argument list list of arguments passed to the command line
+ * after any launcher-specific commands have been stripped.
+ * @return the configuration
+ * @throws Exception
+ */
+ @Override
+ public Configuration bindArgs(Configuration config, String... args) throws
+ Exception {
+ this.argv = args;
+ if (log.isDebugEnabled()) {
+ log.debug("Binding {} Arguments:", args.length);
+
+ StringBuilder builder = new StringBuilder();
+ for (String arg : args) {
+ builder.append('"').append(arg).append("\" ");
+ }
+ log.debug(builder.toString());
+ }
+ return config;
+ }
+
+ @Override
+ public int runService() throws Throwable {
+ return LauncherExitCodes.EXIT_SUCCESS;
+ }
+
+ @Override
+ public void addService(Service service) {
+ Preconditions.checkNotNull(service, "null service");
+ super.addService(service);
+ }
+
+ /**
+ * Run a child service -initing and starting it if this
+ * service has already passed those parts of its own lifecycle
+ * @param service the service to start
+ */
+ protected boolean deployChildService(Service service) {
+ service.init(getConfig());
+ addService(service);
+ if (isInState(STATE.STARTED)) {
+ service.start();
+ return true;
+ }
+ return false;
+ }
+
+ protected void requireArgumentSet(String argname, String argfield)
+ throws BadCommandArgumentsException {
+ if (isUnset(argfield)) {
+ throw new BadCommandArgumentsException("Required argument "
+ + argname
+ + " missing");
+ }
+ }
+
+ protected void requireArgumentSet(String argname, Object argfield) throws
+ BadCommandArgumentsException {
+ if (argfield == null) {
+ throw new BadCommandArgumentsException("Required argument "
+ + argname
+ + " missing");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
new file mode 100644
index 0000000..3210cbf
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.core.exceptions.SliderException;
+import org.apache.slider.core.main.ExitCodeProvider;
+import org.apache.slider.core.main.ServiceLaunchException;
+import org.apache.slider.server.exec.ApplicationEventHandler;
+import org.apache.slider.server.exec.RunLongLivedApp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Service wrapper for an external program that is launched and can/will terminate.
+ * This service is notified when the subprocess terminates, and stops itself
+ * and converts a non-zero exit code into a failure exception
+ */
+public class ForkedProcessService extends AbstractService implements
+ ApplicationEventHandler,
+ ExitCodeProvider,
+ Runnable {
+
+ /**
+ * Log for the forked master process
+ */
+ protected static final Logger log =
+ LoggerFactory.getLogger(ForkedProcessService.class);
+
+ private final String name;
+ private final AtomicBoolean processTerminated = new AtomicBoolean(false);
+ ;
+ private boolean processStarted = false;
+ private RunLongLivedApp process;
+ private Map<String, String> environment;
+ private List<String> commands;
+ private String commandLine;
+ private int executionTimeout = -1;
+ private int timeoutCode = 1;
+
+ /**
+ * Exit code set when the spawned process exits
+ */
+ private AtomicInteger exitCode = new AtomicInteger(0);
+ private Thread timeoutThread;
+
+ public ForkedProcessService(String name) {
+ super(name);
+ this.name = name;
+ }
+
+ @Override //AbstractService
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ }
+
+ @Override //AbstractService
+ protected void serviceStart() throws Exception {
+ if (process == null) {
+ throw new ServiceStateException("Subprocess not yet configured");
+ }
+ //now spawn the process -expect updates via callbacks
+ process.spawnApplication();
+ }
+
+ @Override //AbstractService
+ protected void serviceStop() throws Exception {
+ completed(0);
+ if (process != null) {
+ process.stop();
+ }
+ }
+
+ /**
+ * Set the timeout by which time a process must have finished -or -1 for forever
+ * @param timeout timeout in milliseconds
+ */
+ public void setTimeout(int timeout, int code) {
+ this.executionTimeout = timeout;
+ this.timeoutCode = code;
+ }
+
+ /**
+ * Build the process to execute when the service is started
+ * @param commands list of commands is inserted on the front
+ * @param env environment variables above those generated by
+ * @throws IOException IO problems
+ * @throws SliderException anything internal
+ */
+ public void build(Map<String, String> environment,
+ List<String> commands) throws
+ IOException,
+ SliderException {
+ assert process == null;
+ this.commands = commands;
+ this.commandLine = SliderUtils.join(commands, " ", false);
+ this.environment = environment;
+ process = new RunLongLivedApp(log, commands);
+ process.setApplicationEventHandler(this);
+ //set the env variable mapping
+ process.putEnvMap(environment);
+ }
+
+ @Override // ApplicationEventHandler
+ public synchronized void onApplicationStarted(RunLongLivedApp application) {
+ log.info("Process has started");
+ processStarted = true;
+ if (executionTimeout > 0) {
+ timeoutThread = new Thread(this);
+ timeoutThread.start();
+ }
+ }
+
+ @Override // ApplicationEventHandler
+ public void onApplicationExited(RunLongLivedApp application,
+ int exitC) {
+ synchronized (this) {
+ completed(exitC);
+ //note whether or not the service had already stopped
+ log.info("Process has exited with exit code {}", exitC);
+ if (exitC != 0) {
+ reportFailure(exitC, name + " failed with code " +
+ exitC);
+ }
+ }
+ //now stop itself
+ if (!isInState(STATE.STOPPED)) {
+ stop();
+ }
+ }
+
+ private void reportFailure(int exitC, String text) {
+ this.exitCode.set(exitC);
+ //error
+ ServiceLaunchException execEx =
+ new ServiceLaunchException(exitC,
+ text);
+ log.debug("Noting failure", execEx);
+ noteFailure(execEx);
+ }
+
+ /**
+ * handle timeout response by escalating it to a failure
+ */
+ @Override
+ public void run() {
+ try {
+ synchronized (processTerminated) {
+ if (!processTerminated.get()) {
+ processTerminated.wait(executionTimeout);
+ }
+ }
+
+ } catch (InterruptedException e) {
+ //assume signalled; exit
+ }
+ //check the status; if the marker isn't true, bail
+ if (!processTerminated.getAndSet(true)) {
+ log.info("process timeout: reporting error code {}", timeoutCode);
+
+ //timeout
+ if (isInState(STATE.STARTED)) {
+ //trigger a failure
+ process.stop();
+ }
+ reportFailure(timeoutCode, name + ": timeout after " + executionTimeout
+ + " millis: exit code =" + timeoutCode);
+ }
+ }
+
+ protected void completed(int exitCode) {
+ this.exitCode.set(exitCode);
+ processTerminated.set(true);
+ synchronized (processTerminated) {
+ processTerminated.notify();
+ }
+ }
+
+ public boolean isProcessTerminated() {
+ return processTerminated.get();
+ }
+
+ public synchronized boolean isProcessStarted() {
+ return processStarted;
+ }
+
+
+ @Override // ExitCodeProvider
+ public int getExitCode() {
+ return exitCode.get();
+ }
+
+ public String getCommandLine() {
+ return commandLine;
+ }
+
+ /**
+ * Get the recent output from the process, or [] if not defined
+ * @return a possibly empty list
+ */
+ public List<String> getRecentOutput() {
+ return process != null
+ ? process.getRecentOutput()
+ : new LinkedList<String>();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
new file mode 100644
index 0000000..8bad60e
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.service.Service;
+
+import java.util.List;
+
+/**
+ * Interface for accessing services that contain one or more child
+ * services.
+ */
+public interface ServiceParent extends Service {
+
+ void addService(Service service);
+
+ /**
+ * Get an unmodifiable list of services
+ * @return a list of child services at the time of invocation -
+ * added services will not be picked up.
+ */
+ List<Service> getServices();
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
new file mode 100644
index 0000000..8c62525
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceStateChangeListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * An extended composite service which stops itself if any child service
+ * fails, or when all its children have successfully stopped without failure.
+ *
+ * Lifecycle
+ * <ol>
+ * <li>If any child exits with a failure: this service stops, propagating
+ * the exception.</li>
+ * <li>When all child services has stopped, this service stops itself</li>
+ * </ol>
+ *
+ */
+public class WorkflowCompositeService extends CompositeService
+ implements ServiceParent, ServiceStateChangeListener {
+
+ private static final Logger log =
+ LoggerFactory.getLogger(WorkflowCompositeService.class);
+
+ public WorkflowCompositeService(String name) {
+ super(name);
+ }
+
+
+ public WorkflowCompositeService() {
+ this("WorkflowCompositeService");
+ }
+
+ /**
+ * Varargs constructor
+ * @param children children
+ */
+ public WorkflowCompositeService(String name, Service... children) {
+ this(name);
+ for (Service child : children) {
+ addService(child);
+ }
+ }
+
+ /**
+ * Varargs constructor
+ * @param children children
+ */
+ public WorkflowCompositeService(Service... children) {
+ this("WorkflowCompositeService", children);
+ }
+
+ /**
+ * Add a service, and register it
+ * @param service the {@link Service} to be added.
+ * Important: do not add a service to a parent during your own serviceInit/start,
+ * in Hadoop 2.2; you will trigger a ConcurrentModificationException.
+ */
+ @Override
+ public synchronized void addService(Service service) {
+ service.registerServiceListener(this);
+ super.addService(service);
+ }
+
+ /**
+ * When this service is started, any service stopping with a failure
+ * exception is converted immediately into a failure of this service,
+ * storing the failure and stopping ourselves.
+ * @param child the service that has changed.
+ */
+ @Override
+ public void stateChanged(Service child) {
+ //if that child stopped while we are running:
+ if (isInState(STATE.STARTED) && child.isInState(STATE.STOPPED)) {
+ // a child service has stopped
+ //did the child fail? if so: propagate
+ Throwable failureCause = child.getFailureCause();
+ if (failureCause != null) {
+ log.info("Child service " + child + " failed", failureCause);
+ //failure. Convert to an exception
+ Exception e = (failureCause instanceof Exception) ?
+ (Exception) failureCause
+ : new Exception(
+ failureCause);
+ //flip ourselves into the failed state
+ noteFailure(e);
+ stop();
+ } else {
+ log.info("Child service completed {}", child);
+ if (areAllChildrenStopped()) {
+ log.info("All children are halted: stopping");
+ stop();
+ }
+ }
+ }
+ }
+
+ private boolean areAllChildrenStopped() {
+ List<Service> children = getServices();
+ boolean stopped = true;
+ for (Service child : children) {
+ if (!child.isInState(STATE.STOPPED)) {
+ stopped = false;
+ break;
+ }
+ }
+ return stopped;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
new file mode 100644
index 0000000..dc51bac
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+public interface WorkflowEventCallback {
+
+ public void eventCallbackEvent();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/8f4b8647/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
new file mode 100644
index 0000000..03f6945
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.service.AbstractService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executor;
+
+/**
+ * A service that calls the supplied callback when it is started -after the
+ * given delay, then stops itself.
+ * Because it calls in on a different thread, it can be used for callbacks
+ * that don't
+ */
+public class WorkflowEventNotifyingService extends AbstractService implements Runnable {
+ protected static final Logger log =
+ LoggerFactory.getLogger(WorkflowEventNotifyingService.class);
+ private final WorkflowEventCallback callback;
+ private final int delay;
+ private Executor executor;
+
+
+ public WorkflowEventNotifyingService(String name,
+ WorkflowEventCallback callback, int delay) {
+ super(name);
+ Preconditions.checkNotNull(callback, "Null callback argument");
+ this.callback = callback;
+ this.delay = delay;
+ }
+
+ public WorkflowEventNotifyingService(WorkflowEventCallback callback, int delay) {
+ this("WorkflowEventNotifyingService", callback, delay);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ log.debug("Notifying {} after a delay of {} millis", callback, delay);
+ new Thread(this, getName()).start();
+ }
+
+ @Override
+ public void run() {
+ if (delay > 0) {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException ignored) {
+
+ }
+ }
+ log.debug("Notifying {}", callback);
+ callback.eventCallbackEvent();
+ stop();
+ }
+}