You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2014/05/30 20:57:16 UTC
[09/12] git commit: SLIDER-94 more workflow tests and javadocs;
sequence service now correctly terminates if started without any
child services
SLIDER-94 more workflow tests and javadocs; sequence service now correctly terminates if started without any child services
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/2cdb4b74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/2cdb4b74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/2cdb4b74
Branch: refs/heads/develop
Commit: 2cdb4b7442742cba8f2e77f01226a6873f8677d4
Parents: 21cdfbd
Author: Steve Loughran <st...@apache.org>
Authored: Fri May 30 18:46:34 2014 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Fri May 30 18:46:34 2014 +0100
----------------------------------------------------------------------
.../providers/AbstractProviderService.java | 2 +-
.../server/appmaster/RoleLaunchService.java | 39 +++---
.../server/appmaster/SliderAppMaster.java | 6 +-
.../utility/AbstractSliderLaunchedService.java | 2 +-
.../server/services/utility/ClosingService.java | 58 ---------
.../LaunchedWorkflowCompositeService.java | 113 +++++++++++++++++
.../WorkflowCompositeLaunchedService.java | 117 ------------------
.../services/workflow/ClosingService.java | 58 +++++++++
.../services/workflow/ForkedProcessService.java | 104 +++++++---------
.../services/workflow/LongLivedProcess.java | 21 ++--
.../LongLivedProcessLifecycleEvent.java | 38 ++++++
.../workflow/ProcessLifecycleEventCallback.java | 29 -----
.../server/services/workflow/ServiceParent.java | 5 +
.../workflow/ServiceTerminatingRunnable.java | 57 +++++++++
.../services/workflow/ServiceThreadFactory.java | 39 +++++-
.../workflow/WorkflowCompositeService.java | 32 ++++-
.../workflow/WorkflowEventCallback.java | 6 +-
.../workflow/WorkflowEventNotifyingService.java | 73 +++++++----
.../workflow/WorkflowExecutorService.java | 73 +++++++++++
.../workflow/WorkflowSequenceService.java | 106 +++++++++++-----
.../server/services/workflow/package-info.java | 120 +++++++++++++++++++
.../workflow/TestWorkflowCompositeService.java | 15 +--
.../workflow/TestWorkflowSequenceService.java | 47 ++++++--
.../workflow/WorkflowServiceTestBase.java | 48 ++++++--
.../accumulo/AccumuloProviderService.java | 3 +-
25 files changed, 828 insertions(+), 383 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
index f4a872c..da9f1d1 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
@@ -181,7 +181,7 @@ public abstract class AbstractProviderService
* @return the forkes service
*/
protected ForkedProcessService latestProcess() {
- Service current = getCurrentService();
+ Service current = getActiveService();
Service prev = getPreviousService();
Service latest = current != null ? current : prev;
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java
index d90eeb6..7a09115 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java
@@ -18,8 +18,8 @@
package org.apache.slider.server.appmaster;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.core.conf.AggregateConf;
@@ -29,6 +29,8 @@ import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.ProviderService;
import org.apache.slider.server.appmaster.state.RoleInstance;
import org.apache.slider.server.appmaster.state.RoleStatus;
+import org.apache.slider.server.services.workflow.ServiceThreadFactory;
+import org.apache.slider.server.services.workflow.WorkflowExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,11 +38,12 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executors;
/**
* A service for launching containers
*/
-public class RoleLaunchService extends AbstractService {
+public class RoleLaunchService extends WorkflowExecutorService {
protected static final Logger log =
LoggerFactory.getLogger(RoleLaunchService.class);
/**
@@ -101,14 +104,15 @@ public class RoleLaunchService extends AbstractService {
* @param provider the provider
* @param fs filesystem
* @param generatedConfDirPath path in the FS for the generated dir
- * @param envVars
- * @param launcherTmpDirPath
+ * @param envVars environment variables
+ * @param launcherTmpDirPath path for a temporary data in the launch process
*/
public RoleLaunchService(ContainerStartOperation startOperation,
ProviderService provider,
SliderFileSystem fs,
Path generatedConfDirPath,
- Map<String, String> envVars, Path launcherTmpDirPath) {
+ Map<String, String> envVars,
+ Path launcherTmpDirPath) {
super(ROLE_LAUNCH_SERVICE);
containerStarter = startOperation;
this.fs = fs;
@@ -118,6 +122,14 @@ public class RoleLaunchService extends AbstractService {
this.envVars = envVars;
}
+
+ @Override
+ public void init(Configuration conf) {
+ super.init(conf);
+ setExecutor(Executors.newCachedThreadPool(
+ new ServiceThreadFactory(ROLE_LAUNCH_SERVICE, true)));
+ }
+
@Override
protected void serviceStop() throws Exception {
joinAllLaunchedThreads();
@@ -138,16 +150,13 @@ public class RoleLaunchService extends AbstractService {
assert provider.isSupportedRole(roleName) : "unsupported role";
RoleLaunchService.RoleLauncher launcher =
new RoleLaunchService.RoleLauncher(container,
- role.getProviderRole(),
- clusterSpec,
- clusterSpec.getResourceOperations()
- .getOrAddComponent(roleName),
- clusterSpec.getAppConfOperations()
- .getOrAddComponent(roleName) );
+ role.getProviderRole(),
+ clusterSpec,
+ clusterSpec.getResourceOperations() .getOrAddComponent( roleName),
+ clusterSpec.getAppConfOperations().getOrAddComponent(roleName));
launchThread(launcher,
- String.format("%s-%s", roleName,
- container.getId().toString())
- );
+ String.format("%s-%s", roleName, container.getId().toString())
+ );
}
@@ -190,7 +199,7 @@ public class RoleLaunchService extends AbstractService {
//first: take a snapshot of the thread list
List<Thread> liveThreads;
synchronized (launchThreads) {
- liveThreads = new ArrayList<Thread>(launchThreads.values());
+ liveThreads = new ArrayList<>(launchThreads.values());
}
int size = liveThreads.size();
if (size > 0) {
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index d75c788..25a9c7a 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -1321,7 +1321,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
// didn't start, so don't register
providerService.start();
// and send the started event ourselves
- eventCallbackEvent();
+ eventCallbackEvent(null);
}
}
@@ -1331,7 +1331,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
/* =================================================================== */
@Override // EventCallback
- public void eventCallbackEvent() {
+ public void eventCallbackEvent(Object parameter) {
// signalled that the child process is up.
appState.noteAMLive();
// now ask for the cluster nodes
@@ -1382,6 +1382,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
exitCode,
mappedProcessExitCode);
}
+ } else {
+ super.stateChanged(service);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
index 6870be6..6c0edb8 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
@@ -37,7 +37,7 @@ import static org.apache.slider.common.SliderXmlConfKeys.REGISTRY_PATH;
* Base service for the standard slider client/server services
*/
public abstract class AbstractSliderLaunchedService extends
- WorkflowCompositeLaunchedService {
+ LaunchedWorkflowCompositeService {
private static final Logger log =
LoggerFactory.getLogger(AbstractSliderLaunchedService.class);
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/utility/ClosingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/ClosingService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/ClosingService.java
deleted file mode 100644
index 4696fce..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/ClosingService.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.service.AbstractService;
-
-import java.io.Closeable;
-
-/**
- * Service that closes the closeable supplied during shutdown, if not null.
- */
-public class ClosingService<C extends Closeable> extends AbstractService {
-
- private volatile C closeable;
-
-
- public ClosingService(String name,
- C closeable) {
- super(name);
- this.closeable = closeable;
- }
-
- public Closeable getCloseable() {
- return closeable;
- }
-
- public void setCloseable(C closeable) {
- this.closeable = closeable;
- }
-
- /**
- * Stop routine will close the closeable -if not null - and set the
- * reference to null afterwards
- * @throws Exception
- */
- @Override
- protected void serviceStop() throws Exception {
- IOUtils.closeStream(closeable);
- closeable = null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java
new file mode 100644
index 0000000..b5d11e7
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.utility;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.slider.core.main.LauncherExitCodes;
+import org.apache.slider.core.main.RunService;
+import org.apache.slider.server.services.workflow.WorkflowCompositeService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LaunchedWorkflowCompositeService extends WorkflowCompositeService
+ implements RunService {
+ private static final Logger log = LoggerFactory.getLogger(
+ LaunchedWorkflowCompositeService.class);
+ private String[] argv;
+
+ public LaunchedWorkflowCompositeService(String name) {
+ super(name);
+ }
+
+ public LaunchedWorkflowCompositeService(String name, Service... children) {
+ super(name, children);
+ }
+
+ /**
+ * Implementation of set-ness, groovy definition of true/false for a string
+ * @param s
+ * @return
+ */
+ protected static boolean isUnset(String s) {
+ return StringUtils.isEmpty(s);
+ }
+
+ protected static boolean isSet(String s) {
+ return StringUtils.isNotEmpty(s);
+ }
+
+ protected String[] getArgv() {
+ return argv;
+ }
+
+ /**
+ * Pre-init argument binding
+ * @param config the initial configuration build up by the
+ * service launcher.
+ * @param args argument list list of arguments passed to the command line
+ * after any launcher-specific commands have been stripped.
+ * @return the configuration
+ * @throws Exception
+ */
+ @Override
+ public Configuration bindArgs(Configuration config, String... args) throws
+ Exception {
+ this.argv = args;
+ if (log.isDebugEnabled()) {
+ log.debug("Binding {} Arguments:", args.length);
+
+ StringBuilder builder = new StringBuilder();
+ for (String arg : args) {
+ builder.append('"').append(arg).append("\" ");
+ }
+ log.debug(builder.toString());
+ }
+ return config;
+ }
+
+ @Override
+ public int runService() throws Throwable {
+ return LauncherExitCodes.EXIT_SUCCESS;
+ }
+
+ @Override
+ public synchronized void addService(Service service) {
+ Preconditions.checkNotNull(service, "null service");
+ super.addService(service);
+ }
+
+ /**
+ * Run a child service -initing and starting it if this
+ * service has already passed those parts of its own lifecycle
+ * @param service the service to start
+ */
+ protected boolean deployChildService(Service service) {
+ service.init(getConfig());
+ addService(service);
+ if (isInState(STATE.STARTED)) {
+ service.start();
+ return true;
+ }
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/utility/WorkflowCompositeLaunchedService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/WorkflowCompositeLaunchedService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/WorkflowCompositeLaunchedService.java
deleted file mode 100644
index 6e9614b..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/WorkflowCompositeLaunchedService.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.Service;
-import org.apache.slider.core.main.LauncherExitCodes;
-import org.apache.slider.core.main.RunService;
-import org.apache.slider.server.services.workflow.WorkflowCompositeService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class WorkflowCompositeLaunchedService extends WorkflowCompositeService
- implements RunService {
- private static final Logger log = LoggerFactory.getLogger(
- WorkflowCompositeLaunchedService.class);
- private String[] argv;
-
- public WorkflowCompositeLaunchedService(String name) {
- super(name);
- }
-
- public WorkflowCompositeLaunchedService() {
- super("WorkflowCompositeLaunchedService");
- }
-
- public WorkflowCompositeLaunchedService(String name, Service... children) {
- super(name, children);
- }
-
- /**
- * Implementation of set-ness, groovy definition of true/false for a string
- * @param s
- * @return
- */
- protected static boolean isUnset(String s) {
- return StringUtils.isEmpty(s);
- }
-
- protected static boolean isSet(String s) {
- return StringUtils.isNotEmpty(s);
- }
-
- protected String[] getArgv() {
- return argv;
- }
-
- /**
- * Pre-init argument binding
- * @param config the initial configuration build up by the
- * service launcher.
- * @param args argument list list of arguments passed to the command line
- * after any launcher-specific commands have been stripped.
- * @return the configuration
- * @throws Exception
- */
- @Override
- public Configuration bindArgs(Configuration config, String... args) throws
- Exception {
- this.argv = args;
- if (log.isDebugEnabled()) {
- log.debug("Binding {} Arguments:", args.length);
-
- StringBuilder builder = new StringBuilder();
- for (String arg : args) {
- builder.append('"').append(arg).append("\" ");
- }
- log.debug(builder.toString());
- }
- return config;
- }
-
- @Override
- public int runService() throws Throwable {
- return LauncherExitCodes.EXIT_SUCCESS;
- }
-
- @Override
- public synchronized void addService(Service service) {
- Preconditions.checkNotNull(service, "null service");
- super.addService(service);
- }
-
- /**
- * Run a child service -initing and starting it if this
- * service has already passed those parts of its own lifecycle
- * @param service the service to start
- */
- protected boolean deployChildService(Service service) {
- service.init(getConfig());
- addService(service);
- if (isInState(STATE.STARTED)) {
- service.start();
- return true;
- }
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
new file mode 100644
index 0000000..7c9054c
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.service.AbstractService;
+
+import java.io.Closeable;
+
+/**
+ * Service that closes the closeable supplied during shutdown, if not null.
+ */
+public class ClosingService<C extends Closeable> extends AbstractService {
+
+ private volatile C closeable;
+
+
+ public ClosingService(String name,
+ C closeable) {
+ super(name);
+ this.closeable = closeable;
+ }
+
+ public Closeable getCloseable() {
+ return closeable;
+ }
+
+ public void setCloseable(C closeable) {
+ this.closeable = closeable;
+ }
+
+ /**
+ * Stop routine will close the closeable -if not null - and set the
+ * reference to null afterwards
+ * @throws Exception
+ */
+ @Override
+ protected void serviceStop() throws Exception {
+ IOUtils.closeStream(closeable);
+ closeable = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
index 3ddeb2e..1d6ccae 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
@@ -18,12 +18,7 @@
package org.apache.slider.server.services.workflow;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.ServiceStateException;
-import org.apache.slider.common.tools.SliderUtils;
-import org.apache.slider.core.exceptions.SliderException;
-import org.apache.slider.core.main.ExitCodeProvider;
import org.apache.slider.core.main.ServiceLaunchException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,25 +35,20 @@ import java.util.concurrent.atomic.AtomicInteger;
* This service is notified when the subprocess terminates, and stops itself
* and converts a non-zero exit code into a failure exception
*/
-public class ForkedProcessService extends AbstractService implements
- ProcessLifecycleEventCallback,
- ExitCodeProvider,
- Runnable {
+public class ForkedProcessService extends WorkflowExecutorService implements
+ LongLivedProcessLifecycleEvent, Runnable {
/**
* Log for the forked master process
*/
- protected static final Logger log =
+ private static final Logger LOG =
LoggerFactory.getLogger(ForkedProcessService.class);
- private final String name;
private final AtomicBoolean processTerminated = new AtomicBoolean(false);
- ;
private boolean processStarted = false;
private LongLivedProcess process;
private Map<String, String> environment;
private List<String> commands;
- private String commandLine;
private int executionTimeout = -1;
private int timeoutCode = 1;
@@ -66,22 +56,19 @@ public class ForkedProcessService extends AbstractService implements
* Exit code set when the spawned process exits
*/
private AtomicInteger exitCode = new AtomicInteger(0);
- private Thread timeoutThread;
+ /**
+ * Create an instance of the service
+ * @param name a name
+ */
public ForkedProcessService(String name) {
super(name);
- this.name = name;
- }
-
- @Override //AbstractService
- protected void serviceInit(Configuration conf) throws Exception {
- super.serviceInit(conf);
}
@Override //AbstractService
protected void serviceStart() throws Exception {
if (process == null) {
- throw new ServiceStateException("Subprocess not yet configured");
+ throw new ServiceStateException("Process not yet configured");
}
//now spawn the process -expect updates via callbacks
process.spawnApplication();
@@ -90,6 +77,10 @@ public class ForkedProcessService extends AbstractService implements
@Override //AbstractService
protected void serviceStop() throws Exception {
completed(0);
+ stopForkedProcess();
+ }
+
+ private void stopForkedProcess() {
if (process != null) {
process.stop();
}
@@ -106,45 +97,40 @@ public class ForkedProcessService extends AbstractService implements
/**
* Build the process to execute when the service is started
- * @param commands list of commands is inserted on the front
+ * @param commandList list of commands is inserted on the front
* @param env environment variables above those generated by
* @throws IOException IO problems
- * @throws SliderException anything internal
*/
- public void build(Map<String, String> environment,
- List<String> commands) throws
- IOException,
- SliderException {
+ public void build(Map<String, String> env,
+ List<String> commandList)
+ throws IOException {
assert process == null;
- this.commands = commands;
- this.commandLine = SliderUtils.join(commands, " ", false);
- this.environment = environment;
- process = new LongLivedProcess(getName(), log, commands);
+ this.commands = commandList;
+ this.environment = env;
+ process = new LongLivedProcess(getName(), LOG, commandList);
process.setLifecycleCallback(this);
//set the env variable mapping
- process.putEnvMap(environment);
+ process.putEnvMap(env);
}
@Override // ApplicationEventHandler
- public synchronized void onProcessStarted(LongLivedProcess application) {
- log.info("Process has started");
+ public synchronized void onProcessStarted(LongLivedProcess process) {
+ LOG.info("Process has started");
processStarted = true;
if (executionTimeout > 0) {
- timeoutThread = new Thread(this);
- timeoutThread.start();
+ setExecutor(ServiceThreadFactory.newSingleThreadExecutor(getName(), true));
+ execute(this);
}
}
@Override // ApplicationEventHandler
- public void onProcessExited(LongLivedProcess application,
- int exitC) {
+ public void onProcessExited(LongLivedProcess process, int code) {
synchronized (this) {
- completed(exitC);
+ completed(code);
//note whether or not the service had already stopped
- log.info("Process has exited with exit code {}", exitC);
- if (exitC != 0) {
- reportFailure(exitC, name + " failed with code " +
- exitC);
+ LOG.info("Process has exited with exit code {}", code);
+ if (code != 0) {
+ reportFailure(code, getName() + " failed with code " + code);
}
}
//now stop itself
@@ -153,13 +139,11 @@ public class ForkedProcessService extends AbstractService implements
}
}
- private void reportFailure(int exitC, String text) {
- this.exitCode.set(exitC);
+ private void reportFailure(int code, String text) {
+ this.exitCode.set(code);
//error
- ServiceLaunchException execEx =
- new ServiceLaunchException(exitC,
- text);
- log.debug("Noting failure", execEx);
+ ServiceLaunchException execEx = new ServiceLaunchException(code, text);
+ LOG.debug("Noting failure", execEx);
noteFailure(execEx);
}
@@ -180,20 +164,27 @@ public class ForkedProcessService extends AbstractService implements
}
//check the status; if the marker isn't true, bail
if (!processTerminated.getAndSet(true)) {
- log.info("process timeout: reporting error code {}", timeoutCode);
+ LOG.info("process timeout: reporting error code {}", timeoutCode);
//timeout
if (isInState(STATE.STARTED)) {
//trigger a failure
- process.stop();
+ stopForkedProcess();
}
- reportFailure(timeoutCode, name + ": timeout after " + executionTimeout
+ reportFailure(timeoutCode, getName() + ": timeout after " + executionTimeout
+ " millis: exit code =" + timeoutCode);
}
}
- protected void completed(int exitCode) {
- this.exitCode.set(exitCode);
+ /**
+ * Note the process as having completed.
+ * The exit code is stored, the process marked as terminated
+ * -and anything synchronized on <code>processTerminated</code>
+ * is notified
+ * @param code exit code
+ */
+ protected void completed(int code) {
+ exitCode.set(code);
processTerminated.set(true);
synchronized (processTerminated) {
processTerminated.notify();
@@ -209,15 +200,10 @@ public class ForkedProcessService extends AbstractService implements
}
- @Override // ExitCodeProvider
public int getExitCode() {
return exitCode.get();
}
- public String getCommandLine() {
- return commandLine;
- }
-
/**
* Get the recent output from the process, or [] if not defined
* @return a possibly empty list
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
index 12fe3f4..22a4c00 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
@@ -39,13 +39,14 @@ import java.util.concurrent.TimeUnit;
/**
* Execute a long-lived process.
*
- * Hadoop's Shell class assumes it is executing
+ * Hadoop's {@link org.apache.hadoop.util.Shell} class assumes it is executing
* a short lived application; this class allows for the process to run for the
* life of the Java process that forked it.
*/
public class LongLivedProcess implements Runnable {
public static final int STREAM_READER_SLEEP_TIME = 200;
public static final int RECENT_LINE_LOG_LIMIT = 64;
+ public static final int LINE_LENGTH = 256;
private final ProcessBuilder processBuilder;
private Process process;
private Exception exception;
@@ -58,7 +59,7 @@ public class LongLivedProcess implements Runnable {
//list of recent lines, recorded for extraction into reports
private final List<String> recentLines = new LinkedList<String>();
private final int recentLineLimit = RECENT_LINE_LOG_LIMIT;
- private ProcessLifecycleEventCallback lifecycleCallback;
+ private LongLivedProcessLifecycleEvent lifecycleCallback;
/**
@@ -103,7 +104,7 @@ public class LongLivedProcess implements Runnable {
* Set an optional application exit callback
* @param lifecycleCallback callback to notify on application exit
*/
- public void setLifecycleCallback(ProcessLifecycleEventCallback lifecycleCallback) {
+ public void setLifecycleCallback(LongLivedProcessLifecycleEvent lifecycleCallback) {
Preconditions.checkNotNull(lifecycleCallback, "null lifecycleCallback");
this.lifecycleCallback = lifecycleCallback;
}
@@ -330,6 +331,12 @@ public class LongLivedProcess implements Runnable {
this.sleepTime = sleepTime;
}
+ /**
+ * Return a character if there is one, -1 if nothing is ready yet
+ * @param reader reader
+ * @return the value from the reader, or -1 if it is not ready
+ * @throws IOException IO problems
+ */
private int readCharNonBlocking(BufferedReader reader) throws IOException {
if (reader.ready()) {
return reader.read();
@@ -376,8 +383,8 @@ public class LongLivedProcess implements Runnable {
public void run() {
BufferedReader errReader = null;
BufferedReader outReader = null;
- StringBuilder outLine = new StringBuilder(256);
- StringBuilder errorLine = new StringBuilder(256);
+ StringBuilder outLine = new StringBuilder(LINE_LENGTH);
+ StringBuilder errorLine = new StringBuilder(LINE_LENGTH);
try {
errReader = new BufferedReader(
new InputStreamReader(process.getErrorStream()));
@@ -385,14 +392,14 @@ public class LongLivedProcess implements Runnable {
new InputStreamReader(process.getInputStream()));
while (!finished) {
boolean processed = false;
- if (readAnyLine(errReader, errorLine, 256)) {
+ if (readAnyLine(errReader, errorLine, LINE_LENGTH)) {
String line = errorLine.toString();
recordRecentLine(line, true);
streamLog.warn(line);
errorLine.setLength(0);
processed = true;
}
- if (readAnyLine(outReader, outLine, 256)) {
+ if (readAnyLine(outReader, outLine, LINE_LENGTH)) {
String line = outLine.toString();
recordRecentLine(line, false);
streamLog.info(line);
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
new file mode 100644
index 0000000..af83ed0
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+/**
+ * Callback when a long-lived application exits
+ */
+public interface LongLivedProcessLifecycleEvent {
+
+ /**
+ * Callback when a process is started
+ * @param process the process invoking the callback
+ */
+ void onProcessStarted(LongLivedProcess process);
+
+ /**
+ * Callback when a process has finished
+ * @param process the process invoking the callback
+ * @param exitCode exit code from the process
+ */
+ void onProcessExited(LongLivedProcess process, int exitCode);
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/ProcessLifecycleEventCallback.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ProcessLifecycleEventCallback.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ProcessLifecycleEventCallback.java
deleted file mode 100644
index 2dd2a2b..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ProcessLifecycleEventCallback.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-/**
- * Callback when a long-lived application exits
- */
-public interface ProcessLifecycleEventCallback {
-
- void onProcessStarted(LongLivedProcess application);
-
- void onProcessExited(LongLivedProcess application, int exitCode);
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
index 8bad60e..a123584 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
@@ -28,6 +28,11 @@ import java.util.List;
*/
public interface ServiceParent extends Service {
+ /**
+ * Add a child service. It must be in a consistent state with the
+ * service to which it is being added.
+ * @param service the service to add.
+ */
void addService(Service service);
/**
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
new file mode 100644
index 0000000..f2563ec
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.service.Service;
+
+/**
+ * A runnable which terminates its owner.
+ */
+public class ServiceTerminatingRunnable implements Runnable {
+
+ private final Service owner;
+ private final Runnable action;
+ private Exception exception;
+
+ public ServiceTerminatingRunnable(Service owner, Runnable action) {
+ Preconditions.checkNotNull(owner, "null owner");
+ Preconditions.checkNotNull(action, "null action");
+ this.owner = owner;
+ this.action = action;
+ }
+
+ public Service getOwner() {
+ return owner;
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+
+ @Override
+ public void run() {
+ try {
+ action.run();
+ } catch (Exception e) {
+ exception = e;
+ }
+ owner.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
index be7c54d..b1d235d 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
@@ -20,6 +20,8 @@ package org.apache.slider.server.services.workflow;
import com.google.common.base.Preconditions;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
@@ -27,16 +29,34 @@ import java.util.concurrent.atomic.AtomicInteger;
* A thread factory that creates threads (possibly daemon threads)
* using the name and naming policy supplied.
* The thread counter starts at 1, increments atomically,
- * and is supplied as the second argument in the format string
+ * and is supplied as the second argument in the format string.
+ *
+ * A static method, {@link #newSingleThreadExecutor(String, boolean)},
+ * exists to simplify the construction of an executor with a single well-named
+ * threads.
+ *
+ * Example
+ * <pre>
+ * ExecutorService exec = ServiceThreadFactory.newSingleThreadExecutor("live", true)
+ * </pre>
*/
public class ServiceThreadFactory implements ThreadFactory {
private static AtomicInteger counter = new AtomicInteger(1);
+ /**
+ * Default format for thread names: {@value}
+ */
public static final String DEFAULT_NAMING_FORMAT = "%s-%03d";
private final String name;
private final boolean daemons;
private final String namingFormat;
+ /**
+ * Create an instance
+ * @param name base thread name
+ * @param daemons flag to indicate the threads should be marked as daemons
+ * @param namingFormat format string to generate thread names from
+ */
public ServiceThreadFactory(String name,
boolean daemons,
String namingFormat) {
@@ -47,6 +67,12 @@ public class ServiceThreadFactory implements ThreadFactory {
this.namingFormat = namingFormat;
}
+ /**
+ *
+ * Create an instance with the default naming format
+ * @param name base thread name
+ * @param daemons flag to indicate the threads should be marked as daemons
+ */
public ServiceThreadFactory(String name,
boolean daemons) {
this(name, daemons, DEFAULT_NAMING_FORMAT);
@@ -60,4 +86,15 @@ public class ServiceThreadFactory implements ThreadFactory {
return new Thread(r, threadName);
}
+ /**
+ * Create a single thread executor using this naming policy
+ * @param name base thread name
+ * @param daemons flag to indicate the threads should be marked as daemons
+ * @return an executor
+ */
+ public static ExecutorService newSingleThreadExecutor(String name,
+ boolean daemons) {
+ return Executors.newSingleThreadExecutor(
+ new ServiceThreadFactory(name, daemons));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
index c9df6c5..e556422 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
@@ -18,6 +18,7 @@
package org.apache.slider.server.services.workflow;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.ServiceStateChangeListener;
@@ -41,20 +42,28 @@ import java.util.List;
public class WorkflowCompositeService extends CompositeService
implements ServiceParent, ServiceStateChangeListener {
- private static final Logger log =
+ private static final Logger LOG =
LoggerFactory.getLogger(WorkflowCompositeService.class);
+ /**
+ * Construct an instance
+ * @param name name of this service instance
+ */
public WorkflowCompositeService(String name) {
super(name);
}
+ /**
+ * Construct an instance with the default name.
+ */
public WorkflowCompositeService() {
this("WorkflowCompositeService");
}
/**
* Varargs constructor
+ * @param name name of this service instance
* @param children children
*/
public WorkflowCompositeService(String name, Service... children) {
@@ -63,9 +72,11 @@ public class WorkflowCompositeService extends CompositeService
addService(child);
}
}
+
/**
- * Varargs constructor
- * @param children children
+ * Construct with a list of children
+ * @param name name of this service instance
+ * @param children children to add
*/
public WorkflowCompositeService(String name, List<Service> children) {
this(name);
@@ -82,6 +93,7 @@ public class WorkflowCompositeService extends CompositeService
*/
@Override
public synchronized void addService(Service service) {
+ Preconditions.checkNotNull(service, "null service argument");
service.registerServiceListener(this);
super.addService(service);
}
@@ -100,7 +112,7 @@ public class WorkflowCompositeService extends CompositeService
//did the child fail? if so: propagate
Throwable failureCause = child.getFailureCause();
if (failureCause != null) {
- log.info("Child service " + child + " failed", failureCause);
+ LOG.info("Child service " + child + " failed", failureCause);
//failure. Convert to an exception
Exception e = (failureCause instanceof Exception) ?
(Exception) failureCause : new Exception(failureCause);
@@ -108,15 +120,23 @@ public class WorkflowCompositeService extends CompositeService
noteFailure(e);
stop();
} else {
- log.info("Child service completed {}", child);
+ LOG.info("Child service completed {}", child);
if (areAllChildrenStopped()) {
- log.info("All children are halted: stopping");
+ LOG.info("All children are halted: stopping");
stop();
}
}
}
}
+ /**
+ * Probe to query if all children are stopped -simply
+ * by taking a snapshot of the child service list and enumerating
+ * their state.
+ * The state of the children may change during this operation -that will
+ * not get picked up.
+ * @return true if all the children are stopped.
+ */
private boolean areAllChildrenStopped() {
List<Service> children = getServices();
boolean stopped = true;
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
index dc51bac..a0f954e 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
@@ -18,8 +18,12 @@
package org.apache.slider.server.services.workflow;
+/**
+ * This is the callback triggered by the {@link WorkflowEventNotifyingService}
+ * when it generates a notification
+ */
public interface WorkflowEventCallback {
- public void eventCallbackEvent();
+ public void eventCallbackEvent(Object parameter);
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
index a9244eb..d504d13 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
@@ -19,65 +19,94 @@
package org.apache.slider.server.services.workflow;
import com.google.common.base.Preconditions;
-import org.apache.hadoop.service.AbstractService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
/**
* A service that calls the supplied callback when it is started -after the
* given delay, then stops itself.
- * The notifications come in on a different thread
+ * The notifications come in on a callback thread -a thread that is only
+ * started in this service's <code>start()</code> operation.
*/
-public class WorkflowEventNotifyingService extends AbstractService implements Runnable {
- protected static final Logger log =
+public class WorkflowEventNotifyingService extends WorkflowExecutorService
+ implements Runnable {
+ protected static final Logger LOG =
LoggerFactory.getLogger(WorkflowEventNotifyingService.class);
private final WorkflowEventCallback callback;
private final int delay;
- private ExecutorService executor;
+ private final ServiceTerminatingRunnable command;
+ private final Object parameter;
+ /**
+ * Create an instance of the service
+ * @param name service name
+ * @param callback callback to invoke
+ * @param parameter optional parameter for the callback
+ * @param delay delay -or 0 for no delay
+ */
public WorkflowEventNotifyingService(String name,
- WorkflowEventCallback callback, int delay) {
+ WorkflowEventCallback callback,
+ Object parameter,
+ int delay) {
super(name);
Preconditions.checkNotNull(callback, "Null callback argument");
this.callback = callback;
this.delay = delay;
+ this.parameter = parameter;
+ command = new ServiceTerminatingRunnable(this, this);
}
- public WorkflowEventNotifyingService(WorkflowEventCallback callback, int delay) {
- this("WorkflowEventNotifyingService", callback, delay);
+ /**
+ * Create an instance of the service
+ * @param callback callback to invoke
+ * @param parameter optional parameter for the callback
+ * @param delay delay -or 0 for no delay
+ */
+ public WorkflowEventNotifyingService(WorkflowEventCallback callback,
+ Object parameter,
+ int delay) {
+ this("WorkflowEventNotifyingService", callback, parameter, delay);
}
@Override
protected void serviceStart() throws Exception {
- log.debug("Notifying {} after a delay of {} millis", callback, delay);
- executor = Executors.newSingleThreadExecutor(
- new ServiceThreadFactory(getName(), true));
- executor.execute(this);
+ LOG.debug("Notifying {} after a delay of {} millis", callback, delay);
+ setExecutor(ServiceThreadFactory.newSingleThreadExecutor(getName(), true));
+ execute(command);
}
+ /**
+ * Stop the service.
+ * If there is any exception noted from any executed notification,
+ * note the exception in this class
+ * @throws Exception exception.
+ */
@Override
protected void serviceStop() throws Exception {
super.serviceStop();
- if (executor != null) {
- executor.shutdownNow();
+ // propagate any failure
+ if (command != null && command.getException() != null) {
+ noteFailure(command.getException());
}
}
+ /**
+ * Perform the work in a different thread. Relies on
+ * the {@link ServiceTerminatingRunnable} to trigger
+ * the service halt on this thread.
+ */
@Override // Runnable
public void run() {
if (delay > 0) {
try {
Thread.sleep(delay);
- } catch (InterruptedException interrupted) {
- return;
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted: {} in runnable", e, e);
}
}
- log.debug("Notifying {}", callback);
- callback.eventCallbackEvent();
- stop();
+ LOG.debug("Notifying {}", callback);
+ callback.eventCallbackEvent(parameter);
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java
new file mode 100644
index 0000000..a61ac05
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.service.AbstractService;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * A service that hosts an executor -in shutdown it is stopped.
+ */
+public class WorkflowExecutorService extends AbstractService {
+
+ private ExecutorService executor;
+
+ public WorkflowExecutorService(String name) {
+ super(name);
+ }
+
+ public ExecutorService getExecutor() {
+ return executor;
+ }
+
+ protected void setExecutor(ExecutorService executor) {
+ this.executor = executor;
+ }
+
+ /**
+ * Execute the runnable with the executor (which
+ * must have been created already)
+ * @param runnable runnable to execute
+ */
+ protected void execute(Runnable runnable) {
+ executor.execute(runnable);
+ }
+
+ /**
+ * Stop the service: halt the executor.
+ * @throws Exception exception.
+ */
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ stopExecutor();
+ }
+
+ /**
+ * Stop the executor if it is not null.
+ * This uses {@link ExecutorService#shutdownNow()}
+ * and so does not block until they have completed.
+ */
+ protected void stopExecutor() {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java
index 946166b..c42e784 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java
@@ -18,6 +18,7 @@
package org.apache.slider.server.services.workflow;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.ServiceStateChangeListener;
@@ -31,15 +32,34 @@ import java.util.List;
/**
* This resembles the YARN CompositeService, except that it
- * starts one service after another: it's init & start operations
- * only work with one service
+ * starts one service after another
+ *
+ * Workflow
+ * <ol>
+ * <li>When the <code>WorkflowSequenceService</code> instance is
+ * initialized, it only initializes itself.</li>
+ *
+ * <li>When the <code>WorkflowSequenceService</code> instance is
+ * started, it initializes then starts the first of its children.
+ * If there are no children, it immediately stops.</li>
+ *
+ * <li>When the active child stops, it did not fail, and the parent has not
+ * stopped -then the next service is initialized and started. If there is no
+ * remaining child the parent service stops.</li>
+ *
+ * <li>If the active child did fail, the parent service notes the exception
+ * and stops -effectively propagating up the failure.
+ * </li>
+ * </ol>
+ *
+ * New service instances MAY be added to a running instance -but no guarantees
+ * can be made as to whether or not they will be run.
*/
public class WorkflowSequenceService extends AbstractService implements
- ServiceParent,
- ServiceStateChangeListener {
+ ServiceParent, ServiceStateChangeListener {
- private static final Logger log =
+ private static final Logger LOG =
LoggerFactory.getLogger(WorkflowSequenceService.class);
/**
@@ -48,21 +68,29 @@ public class WorkflowSequenceService extends AbstractService implements
private final List<Service> serviceList = new ArrayList<Service>();
/**
- * The current service.
+ * The currently active service.
* Volatile -may change & so should be read into a
* local variable before working with
*/
- private volatile Service currentService;
- /*
+ private volatile Service activeService;
+
+ /**
the previous service -the last one that finished.
- Null if one did not finish yet
+ null if one did not finish yet
*/
private volatile Service previousService;
+ /**
+ * Construct an instance
+ * @param name service name
+ */
public WorkflowSequenceService(String name) {
super(name);
}
+ /**
+ * Construct an instance with the default name
+ */
public WorkflowSequenceService() {
this("WorkflowSequenceService");
}
@@ -70,11 +98,21 @@ public class WorkflowSequenceService extends AbstractService implements
/**
* Create a service sequence with the given list of services
* @param name service name
- * @param offspring initial sequence
+ * @param children initial sequence
*/
- public WorkflowSequenceService(String name, Service... offspring) {
+ public WorkflowSequenceService(String name, Service... children) {
super(name);
- for (Service service : offspring) {
+ for (Service service : children) {
+ addService(service);
+ }
+ } /**
+ * Create a service sequence with the given list of services
+ * @param name service name
+ * @param children initial sequence
+ */
+ public WorkflowSequenceService(String name, List<Service> children) {
+ super(name);
+ for (Service service : children) {
addService(service);
}
}
@@ -83,10 +121,14 @@ public class WorkflowSequenceService extends AbstractService implements
* Get the current service -which may be null
* @return service running
*/
- public Service getCurrentService() {
- return currentService;
+ public Service getActiveService() {
+ return activeService;
}
+ /**
+ * Get the previously active service
+ * @return the service last run, or null if there is none.
+ */
public Service getPreviousService() {
return previousService;
}
@@ -97,16 +139,19 @@ public class WorkflowSequenceService extends AbstractService implements
*/
@Override
protected void serviceStart() throws Exception {
- startNextService();
+ if (!startNextService()) {
+ //nothing to start -so stop
+ stop();
+ }
}
@Override
protected void serviceStop() throws Exception {
//stop current service.
//this triggers a callback that is caught and ignored
- Service current = currentService;
+ Service current = activeService;
previousService = current;
- currentService = null;
+ activeService = null;
if (current != null) {
current.stop();
}
@@ -124,7 +169,7 @@ public class WorkflowSequenceService extends AbstractService implements
public synchronized boolean startNextService() {
if (isInState(STATE.STOPPED)) {
//downgrade to a failed
- log.debug("Not starting next service -{} is stopped", this);
+ LOG.debug("Not starting next service -{} is stopped", this);
return false;
}
if (!isInState(STATE.STARTED)) {
@@ -136,10 +181,10 @@ public class WorkflowSequenceService extends AbstractService implements
//nothing left to run
return false;
}
- if (currentService != null && currentService.getFailureCause() != null) {
+ if (activeService != null && activeService.getFailureCause() != null) {
//did the last service fail? Is this caused by some premature callback?
- log.debug("Not starting next service due to a failure of {}",
- currentService);
+ LOG.debug("Not starting next service due to a failure of {}",
+ activeService);
return false;
}
//bear in mind that init & start can fail, which
@@ -148,7 +193,7 @@ public class WorkflowSequenceService extends AbstractService implements
//the start-next-service logic is skipped.
//now, what does that mean w.r.t exit states?
- currentService = null;
+ activeService = null;
Service head = serviceList.remove(0);
try {
@@ -161,7 +206,7 @@ public class WorkflowSequenceService extends AbstractService implements
}
//at this point the service must have explicitly started & not failed,
//else an exception would have been raised
- currentService = head;
+ activeService = head;
return true;
}
@@ -175,7 +220,7 @@ public class WorkflowSequenceService extends AbstractService implements
public void stateChanged(Service service) {
// only react to the state change when it is the current service
// and it has entered the STOPPED state
- if (service == currentService && service.isInState(STATE.STOPPED)) {
+ if (service == activeService && service.isInState(STATE.STOPPED)) {
onServiceCompleted(service);
}
}
@@ -185,8 +230,8 @@ public class WorkflowSequenceService extends AbstractService implements
* @param service service that has completed
*/
protected synchronized void onServiceCompleted(Service service) {
- log.info("Running service stopped: {}", service);
- previousService = currentService;
+ LOG.info("Running service stopped: {}", service);
+ previousService = activeService;
//start the next service if we are not stopped ourselves
@@ -218,7 +263,7 @@ public class WorkflowSequenceService extends AbstractService implements
} else {
//not started, so just note that the current service
//has gone away
- currentService = null;
+ activeService = null;
}
}
@@ -227,9 +272,10 @@ public class WorkflowSequenceService extends AbstractService implements
* {@link WorkflowSequenceService}
* @param service the {@link Service} to be added
*/
- @Override //Parent
+ @Override
public synchronized void addService(Service service) {
- log.debug("Adding service {} ", service.getName());
+ Preconditions.checkNotNull(service, "null service argument");
+ LOG.debug("Adding service {} ", service.getName());
synchronized (serviceList) {
serviceList.add(service);
}
@@ -247,7 +293,7 @@ public class WorkflowSequenceService extends AbstractService implements
@Override // Object
public synchronized String toString() {
- return super.toString() + "; current service " + currentService
+ return super.toString() + "; current service " + activeService
+ "; queued service count=" + serviceList.size();
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
new file mode 100644
index 0000000..89275c8
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+/**
+
+ <h2>
+ Introduction
+ </h2>
+ This package contains classes which can be aggregated to build up
+ complex workflows of services: sequences of operations, callbacks
+ and composite services with a shared lifespan.
+
+ Core concepts:
+ <ol>
+ <li>
+ Workflow service instances have a limited lifespan, and will self-terminate when
+ they consider it time</li>
+ <li>
+ Workflow Services that have children implement the {@link org.apache.slider.server.services.workflow.ServiceParent}
+ class, which provides (thread-safe) access to the children -allowing new children
+ to be added, and existing children to be ennumerated
+ </li>
+ <li>
+ Workflow Services are designed to be aggregated, to be composed to produce larger
+ composite services which than perform ordered operations, notify other services
+ when work has completed, and to propagate failure up the service hierarchy.
+ </li>
+ <li>
+ Workflow Services may be subclassed to extend their behavior, or to use them
+ in specific applications. Just as the standard {@link org.apache.hadoop.service.CompositeService}
+ is often subclassed to aggregate child services, the {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
+ can be used instead -adding the feature that failing services trigger automatic
+ parent shutdown. If that is the desired operational mode of a class,
+ swapping the composite service implementation may be sufficient to adopt it.
+ </li>
+ </ol>
+
+ <h2>
+ How do the workflow services differ from the standard <code>CompositeService</code>?
+ </h2>
+
+ The {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
+ shares the same model of "child services, all inited and started together".
+ Where it differs is that if any child service stops -either due to a failure
+ or to an action which invokes that service's <code>stop()</code> method.
+
+ In contrast, the original <code>CompositeService</code> class starts its children
+ in its <code>start()</code> method, but does not listen or react to any
+ child service halting. As a result, changes in child state are not detected
+ or propagated.
+
+ If a child service runs until completed -that is it will not be stopped until
+ instructed to do so, and if it is only the parent service that attempts to
+ stop the child, then this difference is unimportant.
+
+ However, if any service that depends upon all it child services running -
+ and if those child services are written so as to stop when they fail, using
+ the <code>WorkflowCompositeService</code> as a base class will enable the
+ parent service to be automatically notified of a child stopping.
+
+ The {@link org.apache.slider.server.services.workflow.WorkflowSequenceService}
+ resembles the composite service in API, but its workflow is different. It
+ initializes and starts its children one-by-one, only starting the second after
+ the first one succeeds, the third after the second, etc. If any service in
+ the sequence fails, the parent <code>WorkflowSequenceService</code> stops,
+ reporting the same exception.
+
+
+ <h2>
+ Other workflow services
+ </h2>
+
+ <ul>
+ <li>{@link org.apache.slider.server.services.workflow.WorkflowEventNotifyingService }:
+ Notifies callbacks when a workflow reaches a specific point (potentially after a delay).</li>
+ <li>{@link org.apache.slider.server.services.workflow.ForkedProcessService}:
+ Executes a process when started, and binds to the life of that process. When the
+ process terminates, so does the service -and vice versa.</li>
+ <li>{@link }: </li>
+ <li>{@link }: </li>
+ </ul>
+
+Lower level classes
+ <ul>
+ <li>{@link org.apache.slider.server.services.workflow.WorkflowExecutorService }:
+ This is a base class for YARN services that use an {@link java.util.concurrent.ExecutorService}.
+ for managing asynchronous operations: it stops the executor when the service is
+ stopped.
+ </li>
+ <li>{@link org.apache.slider.server.services.workflow.ForkedProcessService}:
+ Executes a process when started, and binds to the life of that process. When the
+ process terminates, so does the service -and vice versa.</li>
+ <li>{@link org.apache.slider.server.services.workflow.LongLivedProcess}:
+ The inner class used to managed the forked process. When called directly it
+ offers more features.</li>
+ <li>{@link org.apache.slider.server.services.workflow.ClosingService}:
+ A parameterized service to close the <code>Closeable</code> passed in -used for cleaning
+ up references.</li>
+ </ul>
+
+
+
+ */
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java
index 03cf998..d4f90a5 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java
@@ -62,22 +62,12 @@ public class TestWorkflowCompositeService extends WorkflowServiceTestBase {
}
@Test
- public void testOneLongLivedChild() throws Throwable {
- MockService one = new MockService("one", false, 500);
- MockService two = new MockService("two", false, 100);
- ServiceParent parent = startService(one, two);
- waitForParentToStop(parent);
- assertStopped(one);
- assertStopped(two);
- }
-
- @Test
public void testNotificationChild() throws Throwable {
- EventCallbackHandler ecb = new EventCallbackHandler();
MockService one = new MockService("one", false, 100);
+ EventCallbackHandler ecb = new EventCallbackHandler();
WorkflowEventNotifyingService ens =
- new WorkflowEventNotifyingService(ecb, 100);
+ new WorkflowEventNotifyingService(ecb, "hello", 100);
MockService two = new MockService("two", false, 100);
ServiceParent parent = startService(one, ens, two);
waitForParentToStop(parent);
@@ -85,6 +75,7 @@ public class TestWorkflowCompositeService extends WorkflowServiceTestBase {
assertStopped(ens);
assertStopped(two);
assertTrue(ecb.notified);
+ assertEquals("hello", ecb.result);
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java
index 986057a..80b70a2 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java
@@ -28,22 +28,27 @@ public class TestWorkflowSequenceService extends WorkflowServiceTestBase {
private static final Logger
log = LoggerFactory.getLogger(TestWorkflowSequenceService.class);
-
@Test
public void testSingleSequence() throws Throwable {
- ServiceParent ss = startService(new MockService());
- ss.stop();
+ ServiceParent parent = startService(new MockService());
+ parent.stop();
+ }
+
+ @Test
+ public void testEmptySequence() throws Throwable {
+ ServiceParent parent = startService();
+ waitForParentToStop(parent);
}
@Test
public void testSequence() throws Throwable {
MockService one = new MockService("one", false, 100);
MockService two = new MockService("two", false, 100);
- ServiceParent ss = startService(one, two);;
- assert ss.waitForServiceToStop(1000);
- assert one.isInState(Service.STATE.STOPPED);
- assert two.isInState(Service.STATE.STOPPED);
- assert ((WorkflowSequenceService)ss).getPreviousService().equals(two);
+ ServiceParent parent = startService(one, two);
+ waitForParentToStop(parent);
+ assertStopped(one);
+ assertStopped(two);
+ assert ((WorkflowSequenceService)parent).getPreviousService().equals(two);
}
@Test
@@ -51,14 +56,15 @@ public class TestWorkflowSequenceService extends WorkflowServiceTestBase {
EventCallbackHandler ecb = new EventCallbackHandler();
MockService one = new MockService("one", false, 100);
WorkflowEventNotifyingService ens =
- new WorkflowEventNotifyingService(ecb, 100);
+ new WorkflowEventNotifyingService(ecb, 3, 100);
MockService two = new MockService("two", false, 100);
- ServiceParent ss = startService(one, ens, two);
- assert ss.waitForServiceToStop(1000);
+ ServiceParent parent = startService(one, ens, two);
+ waitForParentToStop(parent);
assertStopped(one);
assertStopped(ens);
assertStopped(two);
assertTrue(ecb.notified);
+ assertEquals(3, ecb.result);
}
@Test
@@ -114,6 +120,25 @@ public class TestWorkflowSequenceService extends WorkflowServiceTestBase {
assertStopped(two);
}
+
+ @Test
+ public void testAddChild() throws Throwable {
+ MockService one = new MockService("one", false, 5000);
+ MockService two = new MockService("two", false, 100);
+ ServiceParent parent = startService(one, two);
+ EventCallbackHandler ecb = new EventCallbackHandler();
+ WorkflowEventNotifyingService ens =
+ new WorkflowEventNotifyingService(ecb, "hello", 100);
+ parent.addService(ens);
+ waitForParentToStop(parent, 10000);
+ assertStopped(one);
+ assertStopped(two);
+ assertStopped(ens);
+ assertStopped(two);
+ assertTrue(ecb.notified);
+ assertEquals("hello", ecb.result);
+ }
+
public WorkflowSequenceService buildService(Service... services) {
WorkflowSequenceService parent =
new WorkflowSequenceService("test", services);
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
index d8d87e4..657cc31 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
@@ -21,6 +21,8 @@ package org.apache.slider.server.services.workflow;
import org.apache.hadoop.service.Service;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,12 +30,20 @@ public abstract class WorkflowServiceTestBase extends Assert {
private static final Logger
log = LoggerFactory.getLogger(WorkflowServiceTestBase.class);
+
+ /**
+ * Set the timeout for every test
+ */
+ @Rule
+ public Timeout testTimeout = new Timeout(15000);
+
@Before
public void nameThread() {
Thread.currentThread().setName("JUnit");
}
+
- public void assertInState(Service service, Service.STATE expected) {
+ protected void assertInState(Service service, Service.STATE expected) {
Service.STATE actual = service.getServiceState();
if (actual != expected) {
fail("Service " + service.getName() + " in state " + actual
@@ -41,18 +51,18 @@ public abstract class WorkflowServiceTestBase extends Assert {
}
}
- public void assertStopped(Service service) {
+ protected void assertStopped(Service service) {
assertInState(service, Service.STATE.STOPPED);
}
- void logState(ServiceParent p) {
+ protected void logState(ServiceParent p) {
logService(p);
for (Service s : p.getServices()) {
logService(s);
}
}
- public void logService(Service s) {
+ protected void logService(Service s) {
log.info(s.toString());
Throwable failureCause = s.getFailureCause();
if (failureCause != null) {
@@ -61,30 +71,48 @@ public abstract class WorkflowServiceTestBase extends Assert {
}
}
- public void waitForParentToStop(ServiceParent parent) {
- boolean stop = parent.waitForServiceToStop(1000);
+ /**
+ * Wait a second for the service parent to stop
+ * @param parent the service to wait for
+ */
+ protected void waitForParentToStop(ServiceParent parent) {
+ waitForParentToStop(parent, 1000);
+ }
+
+ /**
+ * Wait for the service parent to stop
+ * @param parent the service to wait for
+ * @param timeout time in milliseconds
+ */
+ protected void waitForParentToStop(ServiceParent parent, int timeout) {
+ boolean stop = parent.waitForServiceToStop(timeout);
if (!stop) {
logState(parent);
- fail("Service failed to stop :" + parent);
+ fail("Service failed to stop : after " + timeout +" millis " + parent);
}
}
- public abstract ServiceParent buildService(Service... services);
+ protected abstract ServiceParent buildService(Service... services);
- public ServiceParent startService(Service... services) {
+ protected ServiceParent startService(Service... services) {
ServiceParent parent = buildService(services);
//expect service to start and stay started
parent.start();
return parent;
}
+ /**
+ * Class to log when an event callback happens
+ */
public static class EventCallbackHandler implements WorkflowEventCallback {
public volatile boolean notified = false;
+ public Object result;
@Override
- public void eventCallbackEvent() {
+ public void eventCallbackEvent(Object parameter) {
log.info("EventCallback");
notified = true;
+ result = parameter;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
----------------------------------------------------------------------
diff --git a/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java b/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
index f653aa5..2210ff3 100644
--- a/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
+++ b/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
@@ -332,7 +332,8 @@ public class AccumuloProviderService extends AbstractProviderService implements
//callback to AM to trigger cluster review is set up to happen after
//the init/verify action has succeeded
WorkflowEventNotifyingService notifier = new WorkflowEventNotifyingService(execInProgress,
- internalOperations.getGlobalOptions().getOptionInt(
+ null,
+ internalOperations.getGlobalOptions().getOptionInt(
OptionKeys.INTERNAL_CONTAINER_STARTUP_DELAY,
OptionKeys.DEFAULT_CONTAINER_STARTUP_DELAY));
// register the service for lifecycle management;