You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2014/06/03 17:30:46 UTC
[2/6] git commit: SLIDER-94 : move to Callable based event
notification; drop event notifying service
SLIDER-94 : move to Callable based event notification; drop event notifying service
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/5192fc7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/5192fc7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/5192fc7a
Branch: refs/heads/develop
Commit: 5192fc7a93ff6b27c9e9705e6b81d8d52332943d
Parents: 6165828
Author: Steve Loughran <st...@apache.org>
Authored: Tue Jun 3 16:16:29 2014 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Tue Jun 3 16:17:29 2014 +0100
----------------------------------------------------------------------
.../providers/AbstractProviderService.java | 2 +-
.../slider/providers/ProviderCompleted.java | 29 ++++
.../providers/ProviderCompletedCallable.java | 38 +++++
.../slider/providers/ProviderService.java | 3 +-
.../providers/agent/AgentProviderService.java | 4 +-
.../slideram/SliderAMProviderService.java | 4 +-
.../server/appmaster/SliderAppMaster.java | 14 +-
.../services/workflow/ForkedProcessService.java | 1 +
.../workflow/ServiceTerminatingCallable.java | 11 +-
.../workflow/WorkflowCallbackService.java | 24 ++-
.../workflow/WorkflowEventCallback.java | 31 ----
.../workflow/WorkflowEventNotifyingService.java | 114 -------------
.../server/services/workflow/package-info.java | 158 +++++++++++--------
.../model/mock/MockProviderService.groovy | 4 +-
.../apache/slider/test/SliderTestUtils.groovy | 8 +-
.../workflow/TestForkedProcessService.java | 140 ----------------
.../TestServiceTerminatingRunnable.java | 64 --------
.../workflow/TestWorkflowCompositeService.java | 2 +-
.../TestWorkflowForkedProcessService.java | 140 ++++++++++++++++
.../workflow/TestWorkflowSequenceService.java | 4 +-
.../TestWorkflowServiceTerminatingRunnable.java | 64 ++++++++
.../workflow/WorkflowServiceTestBase.java | 17 --
.../accumulo/AccumuloProviderService.java | 26 +--
.../providers/hbase/HBaseProviderService.java | 4 +-
24 files changed, 428 insertions(+), 478 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
index da9f1d1..7c9b38e 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
@@ -169,7 +169,7 @@ public abstract class AbstractProviderService
}
}
ForkedProcessService lastProc = latestProcess();
- if (lastProc == null) {
+ if (lastProc == null || !lastProc.isProcessTerminated()) {
return 0;
} else {
return lastProc.getExitCode();
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/providers/ProviderCompleted.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/ProviderCompleted.java b/slider-core/src/main/java/org/apache/slider/providers/ProviderCompleted.java
new file mode 100644
index 0000000..f6ff4fd
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/providers/ProviderCompleted.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+/**
+ * This is the callback triggered by the {@link ProviderCompletedCallable}
+ * when it generates a notification
+ */
+public interface ProviderCompleted {
+
+ public void eventCallbackEvent(Object parameter);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/providers/ProviderCompletedCallable.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/ProviderCompletedCallable.java b/slider-core/src/main/java/org/apache/slider/providers/ProviderCompletedCallable.java
new file mode 100644
index 0000000..47939c9
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/providers/ProviderCompletedCallable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+import java.util.concurrent.Callable;
+
+public class ProviderCompletedCallable implements Callable<Object> {
+
+ private final ProviderCompleted callback;
+ private final Object parameter;
+
+ public ProviderCompletedCallable(ProviderCompleted callback, Object parameter) {
+ this.callback = callback;
+ this.parameter = parameter;
+ }
+
+ @Override
+ public Object call() throws Exception {
+ callback.eventCallbackEvent(parameter);
+ return parameter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
index ab09a8d..d77135c 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
@@ -34,7 +34,6 @@ import org.apache.slider.core.registry.info.ServiceInstanceData;
import org.apache.slider.server.appmaster.state.StateAccessForProviders;
import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations;
import org.apache.slider.server.services.registry.RegistryViewForProviders;
-import org.apache.slider.server.services.workflow.WorkflowEventCallback;
import java.io.File;
import java.io.IOException;
@@ -81,7 +80,7 @@ public interface ProviderService extends ProviderCore, Service,
boolean exec(AggregateConf instanceDefinition,
File confDir,
Map<String, String> env,
- WorkflowEventCallback execInProgress) throws IOException,
+ ProviderCompleted execInProgress) throws IOException,
SliderException;
/**
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index 8948746..f62198c 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -46,6 +46,7 @@ import org.apache.slider.core.registry.info.CustomRegistryConstants;
import org.apache.slider.core.registry.info.RegisteredEndpoint;
import org.apache.slider.core.registry.info.ServiceInstanceData;
import org.apache.slider.providers.AbstractProviderService;
+import org.apache.slider.providers.ProviderCompleted;
import org.apache.slider.providers.ProviderCore;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.ProviderUtils;
@@ -66,7 +67,6 @@ import org.apache.slider.server.appmaster.web.rest.agent.Register;
import org.apache.slider.server.appmaster.web.rest.agent.RegistrationResponse;
import org.apache.slider.server.appmaster.web.rest.agent.RegistrationStatus;
import org.apache.slider.server.appmaster.web.rest.agent.StatusCommand;
-import org.apache.slider.server.services.workflow.WorkflowEventCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -307,7 +307,7 @@ public class AgentProviderService extends AbstractProviderService implements
public boolean exec(AggregateConf instanceDefinition,
File confDir,
Map<String, String> env,
- WorkflowEventCallback execInProgress) throws
+ ProviderCompleted execInProgress) throws
IOException,
SliderException {
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
index 9667c8f..1610954 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
@@ -39,12 +39,12 @@ import org.apache.slider.core.registry.info.RegisteredEndpoint;
import org.apache.slider.core.registry.info.RegistryView;
import org.apache.slider.core.registry.info.ServiceInstanceData;
import org.apache.slider.providers.AbstractProviderService;
+import org.apache.slider.providers.ProviderCompleted;
import org.apache.slider.providers.ProviderCore;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.agent.AgentKeys;
import org.apache.slider.server.appmaster.PublishedArtifacts;
import org.apache.slider.server.appmaster.web.rest.RestPaths;
-import org.apache.slider.server.services.workflow.WorkflowEventCallback;
import java.io.File;
import java.io.IOException;
@@ -94,7 +94,7 @@ public class SliderAMProviderService extends AbstractProviderService implements
public boolean exec(AggregateConf instanceDefinition,
File confDir,
Map<String, String> env,
- WorkflowEventCallback execInProgress) throws IOException, SliderException {
+ ProviderCompleted execInProgress) throws IOException, SliderException {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index 2457b4d..e126261 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -88,6 +88,7 @@ import org.apache.slider.core.registry.info.CustomRegistryConstants;
import org.apache.slider.core.registry.info.RegisteredEndpoint;
import org.apache.slider.core.registry.info.RegistryNaming;
import org.apache.slider.core.registry.info.ServiceInstanceData;
+import org.apache.slider.providers.ProviderCompleted;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.ProviderService;
import org.apache.slider.providers.SliderProviderFactory;
@@ -112,7 +113,6 @@ import org.apache.slider.server.appmaster.web.WebAppApiImpl;
import org.apache.slider.server.appmaster.web.rest.RestPaths;
import org.apache.slider.server.services.registry.SliderRegistryService;
import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
-import org.apache.slider.server.services.workflow.WorkflowEventCallback;
import org.apache.slider.server.services.workflow.WorkflowRpcService;
import org.apache.slider.server.services.utility.WebAppService;
import org.slf4j.Logger;
@@ -149,7 +149,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
SliderClusterProtocol,
ServiceStateChangeListener,
RoleKeys,
- WorkflowEventCallback,
+ ProviderCompleted,
ContainerStartOperation {
protected static final Logger log =
LoggerFactory.getLogger(SliderAppMaster.class);
@@ -1321,7 +1321,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
// didn't start, so don't register
providerService.start();
// and send the started event ourselves
- eventCallbackEvent(this, null, null);
+ eventCallbackEvent(null);
}
}
@@ -1330,10 +1330,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
/* EventCallback from the child or ourselves directly */
/* =================================================================== */
- @Override // EventCallback
- public void eventCallbackEvent(Object caller,
- Object parameter,
- Exception exception) {
+ @Override // ProviderCompleted
+ public void eventCallbackEvent(Object parameter) {
// signalled that the child process is up.
appState.noteAMLive();
// now ask for the cluster nodes
@@ -1357,7 +1355,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
*/
@Override //ServiceStateChangeListener
public void stateChanged(Service service) {
- if (service == providerService) {
+ if (service == providerService && service.isInState(STATE.STOPPED)) {
//its the current master process in play
int exitCode = providerService.getExitCode();
int mappedProcessExitCode =
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
index 141ab7d..900e722 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
@@ -250,6 +250,7 @@ public class ForkedProcessService extends AbstractWorkflowExecutorService implem
public int getExitCode() {
return process.getExitCode();
}
+
public int getExitCodeSignCorrected() {
return process.getExitCodeSignCorrected();
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java
index 3ec1428..5ebf77c 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java
@@ -38,9 +38,14 @@ public class ServiceTerminatingCallable<V> implements Callable<V> {
private final Callable<V> callable;
+ /**
+ * Create an instance. If the owner is null, the owning service
+ * is not terminated.
+ * @param owner owning service -can be null
+ * @param callable callback.
+ */
public ServiceTerminatingCallable(Service owner,
Callable<V> callable) {
- Preconditions.checkArgument(owner != null, "null owner");
Preconditions.checkArgument(callable != null, "null callable");
this.owner = owner;
this.callable = callable;
@@ -79,7 +84,9 @@ public class ServiceTerminatingCallable<V> implements Callable<V> {
exception = e;
throw e;
} finally {
- owner.stop();
+ if (owner != null) {
+ owner.stop();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java
index 5bf0803..6c50798 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java
@@ -30,7 +30,8 @@ import java.util.concurrent.TimeUnit;
/**
* A service that calls the supplied callback when it is started -after the
- * given delay, then stops itself.
+ * given delay. It can be configured to stop itself after the callback has
+ * completed, marking any exception raised as the exception of this service.
* The notifications come in on a callback thread -a thread that is only
* started in this service's <code>start()</code> operation.
*/
@@ -53,15 +54,19 @@ public class WorkflowCallbackService<V> extends
* @param name service name
* @param callback callback to invoke
* @param delay delay -or 0 for no delay
+ * @param terminate terminate this service after the callback?
*/
public WorkflowCallbackService(String name,
Callable<V> callback,
- int delay) {
+ int delay,
+ boolean terminate) {
super(name);
Preconditions.checkNotNull(callback, "Null callback argument");
this.callback = callback;
this.delay = delay;
- command = new ServiceTerminatingCallable<V>(this, callback);
+ command = new ServiceTerminatingCallable<V>(
+ terminate ? this : null,
+ callback);
}
public ScheduledFuture<V> getScheduledFuture() {
@@ -89,9 +94,18 @@ public class WorkflowCallbackService<V> extends
protected void serviceStop() throws Exception {
super.serviceStop();
// propagate any failure
- if (command.getException() != null) {
- throw command.getException();
+ if (getCallbackException() != null) {
+ throw getCallbackException();
}
}
+ /**
+ * Get the exception raised by a callback. Will always be null if the
+ * callback has not been executed; will only be non-null after a failure.
+ * @return the exception raised by the callback, or null if none was raised
+ */
+ public Exception getCallbackException() {
+ return command.getException();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
deleted file mode 100644
index fe28e38..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-/**
- * This is the callback triggered by the {@link WorkflowEventNotifyingService}
- * when it generates a notification
- */
-public interface WorkflowEventCallback {
-
- public void eventCallbackEvent(Object caller,
- Object parameter,
- Exception exception);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
deleted file mode 100644
index 3c776d1..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Callable;
-
-/**
- * A service that calls the supplied callback when it is started -after the
- * given delay, then stops itself.
- * The notifications come in on a callback thread -a thread that is only
- * started in this service's <code>start()</code> operation.
- */
-public class WorkflowEventNotifyingService<V> extends
- AbstractWorkflowExecutorService
- implements Runnable {
- protected static final Logger LOG =
- LoggerFactory.getLogger(WorkflowEventNotifyingService.class);
- private final WorkflowEventCallback callback;
- private final int delay;
- private final ServiceTerminatingRunnable command;
- private final Object parameter;
-
- /**
- * Create an instance of the service
- * @param name service name
- * @param callback callback to invoke
- * @param parameter optional parameter for the callback
- * @param delay delay -or 0 for no delay
- */
- public WorkflowEventNotifyingService(String name,
- WorkflowEventCallback callback,
- Object parameter,
- int delay) {
- super(name);
- Preconditions.checkNotNull(callback, "Null callback argument");
- this.callback = callback;
- this.delay = delay;
- this.parameter = parameter;
- this.command = new ServiceTerminatingRunnable(this, this);
- }
-
- /**
- * Create an instance of the service
- * @param callback callback to invoke
- * @param parameter optional parameter for the callback
- * @param delay delay -or 0 for no delay
- */
- public WorkflowEventNotifyingService(WorkflowEventCallback callback,
- Object parameter,
- int delay) {
- this("WorkflowEventNotifyingService", callback, parameter, delay);
- }
-
- @Override
- protected void serviceStart() throws Exception {
- LOG.debug("Notifying {} after a delay of {} millis", callback, delay);
- setExecutor(ServiceThreadFactory.singleThreadExecutor(getName(), true));
- execute(command);
- }
-
- /**
- * Stop the service.
- * If there is any exception noted from any executed notification,
- * note the exception in this class
- * @throws Exception exception.
- */
- @Override
- protected void serviceStop() throws Exception {
- super.serviceStop();
- // propagate any failure
- if (command.getException() != null) {
- noteFailure(command.getException());
- }
- }
-
- /**
- * Perform the work in a different thread. Relies on
- * the {@link ServiceTerminatingRunnable} to trigger
- * the service halt on this thread.
- */
- @Override // Runnable
- public void run() {
- if (delay > 0) {
- try {
- Thread.sleep(delay);
- } catch (InterruptedException e) {
- LOG.debug("Interrupted: {} in runnable", e, e);
- }
- }
- LOG.debug("Notifying {}", callback);
- callback.eventCallbackEvent(this, parameter, null);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
index 4dd2cc7..fab1b9f 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
@@ -20,44 +20,49 @@ package org.apache.slider.server.services.workflow;
/**
- <h2>
- Introduction
- </h2>
+<p>
This package contains classes which can be aggregated to build up
complex workflows of services: sequences of operations, callbacks
and composite services with a shared lifespan.
+ </p>
+<h2>
Core concepts:
- <ol>
- <li>
- Workflow service instances have a limited lifespan, and will self-terminate when
- they consider it time</li>
- <li>
- Workflow Services that have children implement the
- {@link org.apache.slider.server.services.workflow.ServiceParent}
- class, which provides (thread-safe) access to the children -allowing new children
- to be added, and existing children to be ennumerated
- </li>
- <li>
- Workflow Services are designed to be aggregated, to be composed to produce larger
- composite services which than perform ordered operations, notify other services
- when work has completed, and to propagate failure up the service hierarchy.
- </li>
- <li>
- Workflow Services may be subclassed to extend their behavior, or to use them
- in specific applications. Just as the standard
- {@link org.apache.hadoop.service.CompositeService}
- is often subclassed to aggregate child services, the
- {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
- can be used instead -adding the feature that failing services trigger automatic
- parent shutdown. If that is the desired operational mode of a class,
- swapping the composite service implementation may be sufficient to adopt it.
- </li>
- </ol>
+</h2>
+
- <h2>
- How do the workflow services differ from the standard YARN services?
- </h2>
+<p>
+The Workflow Services are a set of Hadoop YARN services, all implementing
+the {@link org.apache.hadoop.service.Service} API.
+They are designed to be aggregated, to be composed to produce larger
+composite services which then perform ordered operations, notify other services
+when work has completed, and to propagate failure up the service hierarchy.
+</p>
+<p>
+Service instances may have a limited lifespan, and may self-terminate when
+they consider it appropriate.</p>
+<p>
+Workflow Services that have children implement the
+{@link org.apache.slider.server.services.workflow.ServiceParent}
+class, which provides (thread-safe) access to the children -allowing new children
+to be added, and existing children to be enumerated. They implement policies
+on how to react to the termination of children -so can sequence operations
+which terminate themselves when complete.
+</p>
+
+<p>
+Workflow Services may be subclassed to extend their behavior, or to use them
+in specific applications. Just as the standard
+{@link org.apache.hadoop.service.CompositeService}
+is often subclassed to aggregate child services, the
+{@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
+can be used instead -adding the feature that failing services trigger automatic
+parent shutdown. If that is the desired operational mode of a class,
+swapping the composite service implementation may be sufficient to adopt it.
+</p>
+
+
+<h2> How do the workflow services differ from the standard YARN services? </h2>
<p>
@@ -69,40 +74,55 @@ package org.apache.slider.server.services.workflow;
Where it differs is that if any child service stops -either due to a failure
or to an action which invokes that service's
{@link org.apache.hadoop.service.Service#stop()} method.
- </p><p>
-
- In contrast, the original <code>CompositeService</code> class starts its children
- in its{@link org.apache.hadoop.service.Service#start()} method, but does not listen or react to any
- child service halting. As a result, changes in child state are not detected
- or propagated.
- </p><p>
-
- If a child service runs until completed -that is it will not be stopped until
- instructed to do so, and if it is only the parent service that attempts to
- stop the child, then this difference is unimportant.
- </p><p>
-
- However, if any service that depends upon all it child services running -
- and if those child services are written so as to stop when they fail, using
- the <code>WorkflowCompositeService</code> as a base class will enable the
- parent service to be automatically notified of a child stopping.
-
- </p><p>
- The {@link org.apache.slider.server.services.workflow.WorkflowSequenceService}
- resembles the composite service in API, but its workflow is different. It
- initializes and starts its children one-by-one, only starting the second after
- the first one succeeds, the third after the second, etc. If any service in
- the sequence fails, the parent <code>WorkflowSequenceService</code> stops,
- reporting the same exception.
</p>
<p>
- The {@link org.apache.slider.server.services.workflow.ForkedProcessService}:
- Executes a process when started, and binds to the life of that process. When the
- process terminates, so does the service -and vice versa. This service enables
- external processes to be executed as part of a sequence of operations -or,
- using the {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
- in parallel with other services, terminating the process when the other services
- stop -and vice versa.
+
+In contrast, the original <code>CompositeService</code> class starts its children
+in its {@link org.apache.hadoop.service.Service#start()} method, but does not
+listen or react to any child service halting. As a result, changes in child
+state are not automatically detected or propagated, other than failures in
+the actual init() and start() methods.
+</p>
+
+<p>
+If a child service runs until completed -that is it will not be stopped until
+instructed to do so, and if it is only the parent service that attempts to
+stop the child, then this difference is unimportant.
+</p>
+<p>
+
+However, if a service depends upon all its child services running -
+and if those child services are written so as to stop when they fail, using
+the <code>WorkflowCompositeService</code> as a base class will enable the
+parent service to be automatically notified of a child stopping.
+
+</p>
+<p>
+The {@link org.apache.slider.server.services.workflow.WorkflowSequenceService}
+resembles the composite service in API, but its workflow is different. It
+initializes and starts its children one-by-one, only starting the second after
+the first one succeeds, the third after the second, etc. If any service in
+the sequence fails, the parent <code>WorkflowSequenceService</code> stops,
+reporting the same exception.
+</p>
+
+<p>
+The {@link org.apache.slider.server.services.workflow.ForkedProcessService}:
+Executes a process when started, and binds to the life of that process. When the
+process terminates, so does the service -and vice versa. This service enables
+external processes to be executed as part of a sequence of operations -or,
+using the {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
+in parallel with other services, terminating the process when the other services
+stop -and vice versa.
+</p>
+
+<p>
+The {@link org.apache.slider.server.services.workflow.WorkflowCallbackService}
+executes a {@link java.util.concurrent.Callable} callback a specified delay
+after the service is started, then potentially terminates itself.
+This is useful for callbacks when a workflow reaches a specific point
+-or simply for executing arbitrary code in the workflow.
+
</p>
@@ -110,18 +130,16 @@ package org.apache.slider.server.services.workflow;
Other Workflow Services
</h2>
- There are some minor services that have proven useful within aggregate workflows,
- and simply in applications which are built from composite YARN services.
-
+There are some minor services that have proven useful within aggregate workflows,
+and simply in applications which are built from composite YARN services.
+
<ul>
<li>{@link org.apache.slider.server.services.workflow.WorkflowRpcService }:
Maintains a reference to an RPC {@link org.apache.hadoop.ipc.Server} instance.
When the service is started, so is the RPC server. Similarly, when the service
is stopped, so is the RPC server instance.
</li>
- <li>{@link org.apache.slider.server.services.workflow.WorkflowEventNotifyingService }:
- Notifies callbacks when a workflow reaches a specific point (potentially after a delay).
- </li>
+
<li>{@link org.apache.slider.server.services.workflow.ClosingService}: Closes
an instance of {@link java.io.Closeable} when the service is stopped. This
is purely a housekeeping class.
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
index e82deb0..361fc2e 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
@@ -42,7 +42,7 @@ import org.apache.slider.server.appmaster.web.rest.agent.Register
import org.apache.slider.server.appmaster.web.rest.agent.RegistrationResponse
import org.apache.slider.server.appmaster.web.rest.agent.RegistrationStatus
import org.apache.slider.server.services.registry.RegistryViewForProviders
-import org.apache.slider.server.services.workflow.WorkflowEventCallback
+import org.apache.slider.providers.ProviderCompleted
class MockProviderService implements ProviderService {
@@ -145,7 +145,7 @@ class MockProviderService implements ProviderService {
AggregateConf instanceDefinition,
File confDir,
Map<String, String> env,
- WorkflowEventCallback execInProgress) throws IOException, SliderException {
+ ProviderCompleted execInProgress) throws IOException, SliderException {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy b/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy
index 2045f11..cb95fd2 100644
--- a/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy
@@ -481,8 +481,8 @@ class SliderTestUtils extends Assert {
ServiceLauncher<SliderClient> serviceLauncher =
new ServiceLauncher<SliderClient>(SliderClient.name);
serviceLauncher.launchService(conf,
- toArray(args),
- false);
+ toArray(args),
+ false, true);
return serviceLauncher
}
@@ -493,8 +493,8 @@ class SliderTestUtils extends Assert {
ServiceLauncher serviceLauncher =
new ServiceLauncher(serviceClass.name);
serviceLauncher.launchService(conf,
- toArray(args),
- false);
+ toArray(args),
+ false, true);
return serviceLauncher;
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java
deleted file mode 100644
index 1ac89a5..0000000
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.ServiceOperations;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Test the long lived process by executing a command that works and a command
- * that fails
- */
-public class TestForkedProcessService extends WorkflowServiceTestBase {
- private static final Logger
- log = LoggerFactory.getLogger(TestForkedProcessService.class);
-
- private static final Logger
- processLog =
- LoggerFactory.getLogger("org.apache.hadoop.services.workflow.Process");
-
-
- private ForkedProcessService process;
- private File testDir = new File("target");
- private ProcessCommandFactory commandFactory;
- private Map<String, String> env = new HashMap<String, String>();
-
- @Before
- public void setupProcesses() {
- commandFactory = ProcessCommandFactory.createProcessCommandFactory();
- }
-
- @After
- public void stopProcesses() {
- ServiceOperations.stop(process);
- }
-
- @Test
- public void testLs() throws Throwable {
-
- initProcess(commandFactory.ls(testDir));
- exec();
- assertFalse(process.isProcessRunning());
- assertEquals(0, process.getExitCode());
-
- assertStringInOutput("test-classes", getFinalOutput());
- // assert that the service did not fail
- assertNull(process.getFailureCause());
- }
-
- @Test
- public void testExitCodes() throws Throwable {
-
- initProcess(commandFactory.exitFalse());
- exec();
- assertFalse(process.isProcessRunning());
- int exitCode = process.getExitCode();
- assertTrue(exitCode != 0);
- int corrected = process.getExitCodeSignCorrected();
- assertEquals(1, corrected);
- // assert that the exit code was uprated to a service failure
- assertNotNull(process.getFailureCause());
-
- }
-
- @Test
- public void testEcho() throws Throwable {
-
- String echoText = "hello, world";
- initProcess(commandFactory.echo(echoText));
- exec();
-
- assertEquals(0, process.getExitCode());
- assertStringInOutput(echoText, getFinalOutput());
-
- }
-
- @Test
- public void testSetenv() throws Throwable {
-
- String var = "TEST_RUN";
- String val = "TEST-RUN-ENV-VALUE";
- env.put(var, val);
- initProcess(commandFactory.env());
- exec();
-
- assertEquals(0, process.getExitCode());
- assertStringInOutput(val, getFinalOutput());
- }
-
- /**
- * Get the final output. includes a quick sleep for the tail output
- * @return the last output
- */
- private List<String> getFinalOutput() {
- return process.getRecentOutput();
- }
-
- private ForkedProcessService initProcess(List<String> commands) throws
- IOException {
- process = new ForkedProcessService(name.getMethodName(), env,
- commands);
- process.init(new Configuration());
-
- return process;
- }
-
- public void exec() throws InterruptedException {
- assertNotNull(process);
- EndOfServiceWaiter waiter = new EndOfServiceWaiter(process);
- process.start();
- waiter.waitForServiceToStop(5000);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestServiceTerminatingRunnable.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestServiceTerminatingRunnable.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestServiceTerminatingRunnable.java
deleted file mode 100644
index ffedd6e..0000000
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestServiceTerminatingRunnable.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import org.junit.Test;
-
-
-public class TestServiceTerminatingRunnable extends WorkflowServiceTestBase {
-
- @Test
- public void testNoservice() throws Throwable {
-
- try {
- new ServiceTerminatingRunnable(null, new SimpleRunnable());
- fail("unexpected ");
- } catch (IllegalArgumentException e) {
-
- // expected
- }
- }
-
-
- @Test
- public void testBasicRun() throws Throwable {
-
- WorkflowCompositeService svc = run(new WorkflowCompositeService());
- ServiceTerminatingRunnable runnable = new ServiceTerminatingRunnable(svc,
- new SimpleRunnable());
-
- // synchronous in-thread execution
- runnable.run();
- assertStopped(svc);
- }
-
- @Test
- public void testFailureRun() throws Throwable {
-
- WorkflowCompositeService svc = run(new WorkflowCompositeService());
- ServiceTerminatingRunnable runnable =
- new ServiceTerminatingRunnable(svc, new SimpleRunnable(true));
-
- // synchronous in-thread execution
- runnable.run();
- assertStopped(svc);
- assertNotNull(runnable.getException());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java
index afd5a61..5780149 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java
@@ -66,7 +66,7 @@ public class TestWorkflowCompositeService extends ParentWorkflowTestBase {
MockService one = new MockService("one", false, 100);
CallableHandler handler = new CallableHandler("hello");
WorkflowCallbackService<String> ens =
- new WorkflowCallbackService<String>("handler", handler, 100);
+ new WorkflowCallbackService<String>("handler", handler, 100, true);
MockService two = new MockService("two", false, 100);
ServiceParent parent = startService(one, ens, two);
waitForParentToStop(parent);
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowForkedProcessService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowForkedProcessService.java
new file mode 100644
index 0000000..29d5578
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowForkedProcessService.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.ServiceOperations;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test the long lived process by executing a command that works and a command
+ * that fails
+ */
+public class TestWorkflowForkedProcessService extends WorkflowServiceTestBase {
+ private static final Logger
+ log = LoggerFactory.getLogger(TestWorkflowForkedProcessService.class);
+
+ private static final Logger
+ processLog =
+ LoggerFactory.getLogger("org.apache.hadoop.services.workflow.Process");
+
+
+ private ForkedProcessService process;
+ private File testDir = new File("target");
+ private ProcessCommandFactory commandFactory;
+ private Map<String, String> env = new HashMap<String, String>();
+
+ @Before
+ public void setupProcesses() {
+ commandFactory = ProcessCommandFactory.createProcessCommandFactory();
+ }
+
+ @After
+ public void stopProcesses() {
+ ServiceOperations.stop(process);
+ }
+
+ @Test
+ public void testLs() throws Throwable {
+
+ initProcess(commandFactory.ls(testDir));
+ exec();
+ assertFalse(process.isProcessRunning());
+ assertEquals(0, process.getExitCode());
+
+ assertStringInOutput("test-classes", getFinalOutput());
+ // assert that the service did not fail
+ assertNull(process.getFailureCause());
+ }
+
+ @Test
+ public void testExitCodes() throws Throwable {
+
+ initProcess(commandFactory.exitFalse());
+ exec();
+ assertFalse(process.isProcessRunning());
+ int exitCode = process.getExitCode();
+ assertTrue(exitCode != 0);
+ int corrected = process.getExitCodeSignCorrected();
+ assertEquals(1, corrected);
+ // assert that the exit code was uprated to a service failure
+ assertNotNull(process.getFailureCause());
+
+ }
+
+ @Test
+ public void testEcho() throws Throwable {
+
+ String echoText = "hello, world";
+ initProcess(commandFactory.echo(echoText));
+ exec();
+
+ assertEquals(0, process.getExitCode());
+ assertStringInOutput(echoText, getFinalOutput());
+
+ }
+
+ @Test
+ public void testSetenv() throws Throwable {
+
+ String var = "TEST_RUN";
+ String val = "TEST-RUN-ENV-VALUE";
+ env.put(var, val);
+ initProcess(commandFactory.env());
+ exec();
+
+ assertEquals(0, process.getExitCode());
+ assertStringInOutput(val, getFinalOutput());
+ }
+
+ /**
+ * Get the final output. includes a quick sleep for the tail output
+ * @return the last output
+ */
+ private List<String> getFinalOutput() {
+ return process.getRecentOutput();
+ }
+
+ private ForkedProcessService initProcess(List<String> commands) throws
+ IOException {
+ process = new ForkedProcessService(name.getMethodName(), env,
+ commands);
+ process.init(new Configuration());
+
+ return process;
+ }
+
+ public void exec() throws InterruptedException {
+ assertNotNull(process);
+ EndOfServiceWaiter waiter = new EndOfServiceWaiter(process);
+ process.start();
+ waiter.waitForServiceToStop(5000);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java
index a581bc9..581e3ed 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java
@@ -57,7 +57,7 @@ public class TestWorkflowSequenceService extends ParentWorkflowTestBase {
MockService one = new MockService("one", false, 100);
CallableHandler handler = new CallableHandler("hello");
WorkflowCallbackService<String> ens =
- new WorkflowCallbackService<String>("handler", handler, 100);
+ new WorkflowCallbackService<String>("handler", handler, 100, true);
MockService two = new MockService("two", false, 100);
ServiceParent parent = startService(one, ens, two);
waitForParentToStop(parent);
@@ -131,7 +131,7 @@ public class TestWorkflowSequenceService extends ParentWorkflowTestBase {
ServiceParent parent = startService(one, two);
CallableHandler handler = new CallableHandler("hello");
WorkflowCallbackService<String> ens =
- new WorkflowCallbackService<String>("handler", handler, 100);
+ new WorkflowCallbackService<String>("handler", handler, 100, true);
parent.addService(ens);
waitForParentToStop(parent, 10000);
assertStopped(one);
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowServiceTerminatingRunnable.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowServiceTerminatingRunnable.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowServiceTerminatingRunnable.java
new file mode 100644
index 0000000..15be1dc
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowServiceTerminatingRunnable.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.junit.Test;
+
+
+public class TestWorkflowServiceTerminatingRunnable extends WorkflowServiceTestBase {
+
+ @Test
+ public void testNoservice() throws Throwable {
+
+ try {
+ new ServiceTerminatingRunnable(null, new SimpleRunnable());
+ fail("unexpected ");
+ } catch (IllegalArgumentException e) {
+
+ // expected
+ }
+ }
+
+
+ @Test
+ public void testBasicRun() throws Throwable {
+
+ WorkflowCompositeService svc = run(new WorkflowCompositeService());
+ ServiceTerminatingRunnable runnable = new ServiceTerminatingRunnable(svc,
+ new SimpleRunnable());
+
+ // synchronous in-thread execution
+ runnable.run();
+ assertStopped(svc);
+ }
+
+ @Test
+ public void testFailureRun() throws Throwable {
+
+ WorkflowCompositeService svc = run(new WorkflowCompositeService());
+ ServiceTerminatingRunnable runnable =
+ new ServiceTerminatingRunnable(svc, new SimpleRunnable(true));
+
+ // synchronous in-thread execution
+ runnable.run();
+ assertStopped(svc);
+ assertNotNull(runnable.getException());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
index 8adaa1d..3049d8f 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
@@ -93,23 +93,6 @@ public abstract class WorkflowServiceTestBase extends Assert {
}
/**
- * Class to log when an event callback happens
- */
- public static class EventCallbackHandler implements WorkflowEventCallback {
- public volatile boolean notified = false;
- public Object result;
-
- @Override
- public void eventCallbackEvent(Object caller,
- Object parameter,
- Exception exception) {
- log.info("EventCallback");
- notified = true;
- result = parameter;
- }
- }
-
- /**
* Handler for callable events
*/
public static class CallableHandler implements Callable<String> {
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
----------------------------------------------------------------------
diff --git a/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java b/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
index b43751d..df927a9 100644
--- a/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
+++ b/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.slider.common.SliderKeys;
@@ -39,6 +40,8 @@ import org.apache.slider.core.exceptions.BadCommandArgumentsException;
import org.apache.slider.core.exceptions.BadConfigException;
import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.providers.AbstractProviderService;
+import org.apache.slider.providers.ProviderCompleted;
+import org.apache.slider.providers.ProviderCompletedCallable;
import org.apache.slider.providers.ProviderCore;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.ProviderUtils;
@@ -46,9 +49,9 @@ import org.apache.slider.common.tools.SliderFileSystem;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.core.zk.BlockingZKWatcher;
import org.apache.slider.common.tools.ConfigHelper;
-import org.apache.slider.server.services.workflow.WorkflowEventCallback;
-import org.apache.slider.server.services.workflow.WorkflowEventNotifyingService;
+import org.apache.slider.server.services.utility.WorkflowEventNotifyingService;
import org.apache.slider.server.services.workflow.ForkedProcessService;
+import org.apache.slider.server.services.workflow.WorkflowCallbackService;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
@@ -264,7 +267,7 @@ public class AccumuloProviderService extends AbstractProviderService implements
public boolean exec(AggregateConf instanceDefinition,
File confDir,
Map<String, String> env,
- WorkflowEventCallback execInProgress)
+ ProviderCompleted execInProgress)
throws IOException, SliderException {
@@ -330,12 +333,17 @@ public class AccumuloProviderService extends AbstractProviderService implements
//callback to AM to trigger cluster review is set up to happen after
//the init/verify action has succeeded
- WorkflowEventNotifyingService notifier = new WorkflowEventNotifyingService(
- execInProgress,
- null,
- internalOperations.getGlobalOptions().getOptionInt(
- OptionKeys.INTERNAL_CONTAINER_STARTUP_DELAY,
- OptionKeys.DEFAULT_CONTAINER_STARTUP_DELAY));
+ int delay = internalOperations.getGlobalOptions().getOptionInt(
+ OptionKeys.INTERNAL_CONTAINER_STARTUP_DELAY,
+ OptionKeys.DEFAULT_CONTAINER_STARTUP_DELAY);
+ ProviderCompletedCallable completedCallable =
+ new ProviderCompletedCallable(execInProgress, null);
+ Service notifier = new WorkflowCallbackService<>(
+ "accumulo notifier",
+ completedCallable,
+ delay,
+ true);
+
// register the service for lifecycle management;
// this service is started after the accumulo process completes
addService(notifier);
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseProviderService.java
----------------------------------------------------------------------
diff --git a/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseProviderService.java b/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseProviderService.java
index be68e4a..407f39a 100644
--- a/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseProviderService.java
+++ b/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseProviderService.java
@@ -38,6 +38,7 @@ import org.apache.slider.core.registry.docstore.PublishedConfigSet;
import org.apache.slider.core.registry.docstore.PublishedConfiguration;
import org.apache.slider.core.registry.info.ServiceInstanceData;
import org.apache.slider.providers.AbstractProviderService;
+import org.apache.slider.providers.ProviderCompleted;
import org.apache.slider.providers.ProviderCore;
import org.apache.slider.providers.ProviderRole;
import org.apache.slider.providers.ProviderUtils;
@@ -50,7 +51,6 @@ import org.apache.slider.server.appmaster.web.rest.agent.HeartBeatResponse;
import org.apache.slider.server.appmaster.web.rest.agent.Register;
import org.apache.slider.server.appmaster.web.rest.agent.RegistrationResponse;
import org.apache.slider.server.appmaster.web.rest.agent.RegistrationStatus;
-import org.apache.slider.server.services.workflow.WorkflowEventCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -240,7 +240,7 @@ public class HBaseProviderService extends AbstractProviderService implements
public boolean exec(AggregateConf instanceDefinition,
File confDir,
Map<String, String> env,
- WorkflowEventCallback execInProgress) throws
+ ProviderCompleted execInProgress) throws
IOException,
SliderException {