You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2017/09/12 00:35:14 UTC
[12/84] [abbrv] hadoop git commit: YARN-7050. Post cleanup after
YARN-6903, removal of org.apache.slider package. Contributed by Jian He
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d44876af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/PortProbe.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/PortProbe.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/PortProbe.java
deleted file mode 100644
index da122da..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/PortProbe.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.servicemonitor;
-
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.yarn.service.compinstance.ComponentInstance;
-import org.apache.slider.common.tools.SliderUtils;
-import org.apache.slider.server.appmaster.state.RoleInstance;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.Map;
-
-/**
- * Probe for a port being open.
- */
-public class PortProbe extends Probe {
- protected static final Logger log = LoggerFactory.getLogger(PortProbe.class);
- private final int port;
- private final int timeout;
-
- public PortProbe(int port, int timeout) {
- super("Port probe of " + port + " for " + timeout + "ms", null);
- this.port = port;
- this.timeout = timeout;
- }
-
- public static PortProbe create(Map<String, String> props)
- throws IOException {
- int port = getPropertyInt(props, PORT_PROBE_PORT, null);
-
- if (port >= 65536) {
- throw new IOException(PORT_PROBE_PORT + " " + port + " is out of " +
- "range");
- }
-
- int timeout = getPropertyInt(props, PORT_PROBE_CONNECT_TIMEOUT,
- PORT_PROBE_CONNECT_TIMEOUT_DEFAULT);
-
- return new PortProbe(port, timeout);
- }
-
- /**
- * Try to connect to the (host,port); a failure to connect within
- * the specified timeout is a failure.
- * @param instance role instance
- * @return the outcome
- */
- @Override
- public ProbeStatus ping(ComponentInstance instance) {
- ProbeStatus status = new ProbeStatus();
-
- if (instance.getContainerStatus() == null || SliderUtils
- .isEmpty(instance.getContainerStatus().getIPs())) {
- status.fail(this, new IOException(
- instance.getCompInstanceName() + ": IP is not available yet"));
- return status;
- }
-
- String ip = instance.getContainerStatus().getIPs().get(0);
- InetSocketAddress sockAddr = new InetSocketAddress(ip, port);
- Socket socket = new Socket();
- try {
- if (log.isDebugEnabled()) {
- log.debug(instance.getCompInstanceName() + ": Connecting " + sockAddr
- .toString() + ", timeout=" + MonitorUtils
- .millisToHumanTime(timeout));
- }
- socket.connect(sockAddr, timeout);
- status.succeed(this);
- } catch (Throwable e) {
- String error =
- instance.getCompInstanceName() + ": Probe " + sockAddr + " failed";
- log.debug(error, e);
- status.fail(this, new IOException(error, e));
- } finally {
- IOUtils.closeSocket(socket);
- }
- return status;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d44876af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/Probe.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/Probe.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/Probe.java
deleted file mode 100644
index 4809b45..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/Probe.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.servicemonitor;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.service.compinstance.ComponentInstance;
-import org.apache.slider.server.appmaster.state.RoleInstance;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Base class of all probes.
- */
-public abstract class Probe implements MonitorKeys {
-
- protected final Configuration conf;
- private String name;
-
- /**
- * Create a probe of a specific name
- *
- * @param name probe name
- * @param conf configuration being stored.
- */
- public Probe(String name, Configuration conf) {
- this.name = name;
- this.conf = conf;
- }
-
-
- protected void setName(String name) {
- this.name = name;
- }
-
- public String getName() {
- return name;
- }
-
-
- @Override
- public String toString() {
- return getName();
- }
-
- public static String getProperty(Map<String, String> props, String name,
- String defaultValue) throws IOException {
- String value = props.get(name);
- if (StringUtils.isEmpty(value)) {
- if (defaultValue == null) {
- throw new IOException(name + " not specified");
- }
- return defaultValue;
- }
- return value;
- }
-
- public static int getPropertyInt(Map<String, String> props, String name,
- Integer defaultValue) throws IOException {
- String value = props.get(name);
- if (StringUtils.isEmpty(value)) {
- if (defaultValue == null) {
- throw new IOException(name + " not specified");
- }
- return defaultValue;
- }
- return Integer.parseInt(value);
- }
-
- /**
- * perform any prelaunch initialization
- */
- public void init() throws IOException {
-
- }
-
- /**
- * Ping the endpoint. All exceptions must be caught and included in the
- * (failure) status.
- *
- * @param instance instance to ping
- * @return the status
- */
- public abstract ProbeStatus ping(ComponentInstance instance);
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d44876af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeStatus.java
deleted file mode 100644
index 24668bd..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeStatus.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.servicemonitor;
-
-import java.io.Serializable;
-import java.util.Date;
-
-/**
- * Status message of a probe. This is designed to be sent over the wire, though the exception
- * Had better be unserializable at the far end if that is to work.
- */
-public final class ProbeStatus implements Serializable {
- private static final long serialVersionUID = 165468L;
-
- private long timestamp;
- private String timestampText;
- private boolean success;
- private boolean realOutcome;
- private String message;
- private Throwable thrown;
- private transient Probe originator;
-
- public ProbeStatus() {
- }
-
- public ProbeStatus(long timestamp, String message, Throwable thrown) {
- this.success = false;
- this.message = message;
- this.thrown = thrown;
- setTimestamp(timestamp);
- }
-
- public ProbeStatus(long timestamp, String message) {
- this.success = true;
- setTimestamp(timestamp);
- this.message = message;
- this.thrown = null;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
- timestampText = new Date(timestamp).toString();
- }
-
- public boolean isSuccess() {
- return success;
- }
-
- /**
- * Set both the success and the real outcome bits to the same value
- * @param success the new value
- */
- public void setSuccess(boolean success) {
- this.success = success;
- realOutcome = success;
- }
-
- public String getTimestampText() {
- return timestampText;
- }
-
- public boolean getRealOutcome() {
- return realOutcome;
- }
-
- public String getMessage() {
- return message;
- }
-
- public void setMessage(String message) {
- this.message = message;
- }
-
- public Throwable getThrown() {
- return thrown;
- }
-
- public void setThrown(Throwable thrown) {
- this.thrown = thrown;
- }
-
- /**
- * Get the probe that generated this result. May be null
- * @return a possibly null reference to a probe
- */
- public Probe getOriginator() {
- return originator;
- }
-
- /**
- * The probe has succeeded -capture the current timestamp, set
- * success to true, and record any other data needed.
- * @param probe probe
- */
- public void succeed(Probe probe) {
- finish(probe, true, probe.getName(), null);
- }
-
- /**
- * A probe has failed either because the test returned false, or an exception
- * was thrown. The {@link #success} field is set to false, any exception
- * thrown is recorded.
- * @param probe probe that failed
- * @param thrown an exception that was thrown.
- */
- public void fail(Probe probe, Throwable thrown) {
- finish(probe, false, "Failure in " + probe, thrown);
- }
-
- public void finish(Probe probe, boolean succeeded, String text, Throwable thrown) {
- setTimestamp(System.currentTimeMillis());
- setSuccess(succeeded);
- originator = probe;
- message = text;
- this.thrown = thrown;
- }
-
- @Override
- public String toString() {
- LogEntryBuilder builder = new LogEntryBuilder("Probe Status");
- builder.elt("time", timestampText)
- .elt("outcome", (success ? "success" : "failure"));
-
- if (success != realOutcome) {
- builder.elt("originaloutcome", (realOutcome ? "success" : "failure"));
- }
- builder.elt("message", message);
- if (thrown != null) {
- builder.elt("exception", thrown);
- }
-
- return builder.toString();
- }
-
- /**
- * Flip the success bit on while the real outcome bit is kept false
- */
- public void markAsSuccessful() {
- success = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d44876af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
deleted file mode 100644
index 43f0e4e..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import org.apache.hadoop.registry.client.api.RegistryConstants;
-import org.apache.hadoop.registry.client.api.RegistryOperations;
-import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.slider.common.tools.ConfigHelper;
-import org.apache.slider.common.tools.SliderUtils;
-import org.apache.slider.core.exceptions.BadCommandArgumentsException;
-import org.apache.slider.core.exceptions.BadConfigException;
-import org.apache.slider.core.zk.ZookeeperUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base service for the standard slider client/server services
- */
-public abstract class AbstractSliderLaunchedService extends
- LaunchedWorkflowCompositeService {
- private static final Logger log =
- LoggerFactory.getLogger(AbstractSliderLaunchedService.class);
-
- protected AbstractSliderLaunchedService(String name) {
- super(name);
- // make sure all the yarn configs get loaded
- ConfigHelper.registerDeprecatedConfigItems();
- }
-
- /**
- * look up the registry quorum from the config
- * @return the quorum string
- * @throws BadConfigException if it is not there or invalid
- */
- public String lookupZKQuorum() throws BadConfigException {
-
- String registryQuorum = getConfig().get(RegistryConstants.KEY_REGISTRY_ZK_QUORUM);
-
- // though if neither is set: trouble
- if (SliderUtils.isUnset(registryQuorum)) {
- throw new BadConfigException(
- "No Zookeeper quorum provided in the"
- + " configuration property " + RegistryConstants.KEY_REGISTRY_ZK_QUORUM
- );
- }
- ZookeeperUtils.splitToHostsAndPortsStrictly(registryQuorum);
- return registryQuorum;
- }
-
- /**
- * Create, adopt ,and start the YARN registration service
- * @return the registry operations service, already deployed as a child
- * of the AbstractSliderLaunchedService instance.
- */
- public RegistryOperations startRegistryOperationsService()
- throws BadConfigException {
-
- // push back the slider registry entry if needed
- RegistryOperations registryWriterService =
- createRegistryOperationsInstance();
- deployChildService(registryWriterService);
- return registryWriterService;
- }
-
- /**
- * Create the registry operations instance. This is to allow
- * subclasses to instantiate a subclass service
- * @return an instance to match to the lifecycle of this service
- */
- protected RegistryOperations createRegistryOperationsInstance() {
- return RegistryOperationsFactory.createInstance("YarnRegistry", getConfig());
- }
-
- /**
- * Utility method to require an argument to be set (non null, non-empty)
- * @param argname argument name
- * @param value value
- * @throws BadCommandArgumentsException if the condition is not met
- */
- protected static void requireArgumentSet(String argname, String value)
- throws BadCommandArgumentsException {
- require(isSet(value), "Required argument %s missing", argname );
- }
-
- /**
- * Require a condition to hold; throw {@link BadCommandArgumentsException} if not.
- * The exception text is the formatted message.
- * @param condition condition
- * @param message string to format
- * @param args list of arguments to format.
- * @throws BadCommandArgumentsException
- */
- protected static void require(boolean condition, String message,
- Object... args)
- throws BadCommandArgumentsException {
- if (!condition) {
- throw new BadCommandArgumentsException(message, args);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d44876af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/EndOfServiceWaiter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/EndOfServiceWaiter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/EndOfServiceWaiter.java
deleted file mode 100644
index 40ceab8..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/EndOfServiceWaiter.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.service.ServiceStateChangeListener;
-
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Wait for a service to stop.
- *
- * WARNING: the notification may come in as soon as the service enters
- * the stopped state: it may take some time for the actual stop operation
- * to complete.
- */
-public class EndOfServiceWaiter implements ServiceStateChangeListener {
-
- private final AtomicBoolean finished = new AtomicBoolean(false);
- private final String name;
- private Service service;
-
- /**
- * Wait for a service; use the service name as this instance's name
- * @param service service
- */
- public EndOfServiceWaiter(Service service) {
- this(service.getName(), service);
- }
-
-
- /**
- * Wait for a service
- * @param name name for messages
- * @param service service
- */
- public EndOfServiceWaiter(String name, Service service) {
- this.name = name;
- this.service = service;
- service.registerServiceListener(this);
- }
-
- public synchronized void waitForServiceToStop(long timeout) throws
- InterruptedException, TimeoutException {
- service.waitForServiceToStop(timeout);
- if (!finished.get()) {
- wait(timeout);
- if (!finished.get()) {
- throw new TimeoutException(name
- + " did not finish after " + timeout +
- " milliseconds");
- }
- }
- }
-
- /**
- * Wait for service state change callbacks; notify self if the service has
- * now stopped
- * @param service service
- */
- @Override
- public synchronized void stateChanged(Service service) {
- if (service.isInState(Service.STATE.STOPPED)) {
- finished.set(true);
- notify();
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d44876af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java
deleted file mode 100644
index bcd1969..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.Service;
-import org.apache.slider.core.main.LauncherExitCodes;
-import org.apache.slider.core.main.RunService;
-import org.apache.slider.server.services.workflow.WorkflowCompositeService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is a workflow compositoe service which can be launched from the CLI
- * ... catches the arguments and implements a stub runService operation.
- */
-public class LaunchedWorkflowCompositeService extends WorkflowCompositeService
- implements RunService {
- private static final Logger log = LoggerFactory.getLogger(
- LaunchedWorkflowCompositeService.class);
- private String[] argv;
-
- public LaunchedWorkflowCompositeService(String name) {
- super(name);
- }
-
- public LaunchedWorkflowCompositeService(String name, Service... children) {
- super(name, children);
- }
-
- /**
- * Implementation of set-ness, groovy definition of true/false for a string
- * @param s
- * @return true iff the string is non-null and non-empty
- */
- protected static boolean isUnset(String s) {
- return StringUtils.isEmpty(s);
- }
-
- protected static boolean isSet(String s) {
- return StringUtils.isNotEmpty(s);
- }
-
- protected String[] getArgv() {
- return argv;
- }
-
- /**
- * Pre-init argument binding
- * @param config the initial configuration build up by the
- * service launcher.
- * @param args argument list list of arguments passed to the command line
- * after any launcher-specific commands have been stripped.
- * @return the configuration
- * @throws Exception
- */
- @Override
- public Configuration bindArgs(Configuration config, String... args) throws
- Exception {
- this.argv = args;
- if (log.isDebugEnabled()) {
- log.debug("Binding {} Arguments:", args.length);
-
- StringBuilder builder = new StringBuilder();
- for (String arg : args) {
- builder.append('"').append(arg).append("\" ");
- }
- log.debug(builder.toString());
- }
- return config;
- }
-
- @Override
- public int runService() throws Throwable {
- return LauncherExitCodes.EXIT_SUCCESS;
- }
-
- @Override
- public synchronized void addService(Service service) {
- Preconditions.checkArgument(service != null, "null service argument");
- super.addService(service);
- }
-
- /**
- * Run a child service -initing and starting it if this
- * service has already passed those parts of its own lifecycle
- * @param service the service to start
- */
- protected boolean deployChildService(Service service) {
- service.init(getConfig());
- addService(service);
- if (isInState(STATE.STARTED)) {
- service.start();
- return true;
- }
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d44876af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/PatternValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/PatternValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/PatternValidator.java
deleted file mode 100644
index 6ab9de6..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/PatternValidator.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import org.apache.slider.server.appmaster.web.rest.RestPaths;
-
-import java.util.regex.Pattern;
-
-/**
- * Utility class to validate strings against a predefined pattern.
- */
-public class PatternValidator {
-
- public static final String E_INVALID_NAME =
- "Invalid name %s does not match the pattern pattern %s ";
- private final Pattern valid;
- private final String pattern;
-
- public PatternValidator(String pattern) {
- this.pattern = pattern;
- valid = Pattern.compile(pattern);
- }
-
- /**
- * Validate the name -restricting it to the set defined in
- * {@link RestPaths#PUBLISHED_CONFIGURATION_REGEXP}
- * @param name name to validate
- * @throws IllegalArgumentException if not a valid name
- */
- public void validate(String name) {
- if (!matches(name)) {
- throw new IllegalArgumentException(
- String.format(E_INVALID_NAME, name, pattern));
- }
- }
-
- /**
- * Query to see if the pattern matches
- * @param name name to validate
- * @return true if the string matches the pattern
- */
- public boolean matches(String name) {
- return valid.matcher(name).matches();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d44876af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/WebAppService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/WebAppService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/WebAppService.java
deleted file mode 100644
index ebfcb99..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/utility/WebAppService.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.webapp.WebApp;
-
-/**
- * Contains a webapp reference and stops it in teardown if non-null
- * <p>
- * It does not start the application.
- * Access to the field is not synchronized across threads; it is the
- * responsibility of the caller.
- */
-public class WebAppService<T extends WebApp> extends AbstractService {
-
- private volatile T webApp;
-
- public WebAppService(String name) {
- super(name);
- }
-
- public WebAppService(String name, T app) {
- super(name);
- webApp = app;
- }
-
- public T getWebApp() {
- return webApp;
- }
-
- public void setWebApp(T webApp) {
- this.webApp = webApp;
- }
-
-
- @Override
- protected void serviceStart() throws Exception {
-
- }
-
- /**
- * Stop operation stops the webapp; sets the reference to null
- * @throws Exception
- */
- @Override
- protected void serviceStop() throws Exception {
- if (webApp != null) {
- webApp.stop();
- webApp = null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d44876af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
deleted file mode 100644
index 8b711aa..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import org.apache.hadoop.service.AbstractService;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * Service that closes the closeable supplied during shutdown, if not null.
- *
- * As the Service interface itself extends Closeable, this service
- * can be used to shut down other services if desired.
- *
- * @param <C> type of the closeable held
- */
-public class ClosingService<C extends Closeable> extends AbstractService {
-
-  private C closeable;
-
-  /**
-   * Construct an instance of the service without a closeable; one may be
-   * supplied later through {@link #setCloseable(Closeable)}.
-   * @param name service name
-   */
-  public ClosingService(String name) {
-    super(name);
-  }
-
-  /**
-   * Construct an instance of the service
-   * @param name service name
-   * @param closeable closeable to close (may be null)
-   */
-  public ClosingService(String name,
-      C closeable) {
-    super(name);
-    this.closeable = closeable;
-  }
-
-  /**
-   * Construct an instance of the service, using the default name
-   * @param closeable closeable to close (may be null)
-   */
-  public ClosingService(C closeable) {
-    this("ClosingService", closeable);
-  }
-
-
-  /**
-   * Get the closeable
-   * @return the closeable, or null if there is none
-   */
-  public synchronized C getCloseable() {
-    return closeable;
-  }
-
-  /**
-   * Set or update the closeable.
-   * @param closeable the new closeable (may be null)
-   */
-  public synchronized void setCloseable(C closeable) {
-    this.closeable = closeable;
-  }
-
-  /**
-   * Stop routine will close the closeable -if not null- and set the
-   * reference to null afterwards.
-   * This operation does not raise any exception on the close, though it does
-   * record it through {@code noteFailure()}.
-   */
-  @Override
-  protected void serviceStop() {
-    C target = getCloseable();
-    if (target != null) {
-      try {
-        target.close();
-      } catch (IOException ioe) {
-        noteFailure(ioe);
-      }
-      synchronized (this) {
-        // only clear the reference if it is still the instance just closed:
-        // a closeable installed concurrently via setCloseable() must not
-        // be silently discarded
-        if (closeable == target) {
-          closeable = null;
-        }
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d44876af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
deleted file mode 100644
index 352be49..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import org.apache.hadoop.service.ServiceStateException;
-import org.apache.slider.core.main.ServiceLaunchException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Service wrapper for an external program that is launched and can/will
- * terminate. This service is notified when the subprocess terminates, stops
- * itself, and converts a non-zero exit code into a failure exception.
- *
- * <p>
- * Key Features:
- * <ol>
- *   <li>A timeout can be set through {@link #setTimeout(int, int)} to limit
- *   the duration of a process.</li>
- *   <li>Output is streamed to the output logger provided.</li>
- *   <li>The most recent lines of output are saved to a linked list.</li>
- *   <li>A synchronous callback, {@link LongLivedProcessLifecycleEvent},
- *   is raised on the start and finish of a process.</li>
- * </ol>
- *
- * Usage:
- * <p>
- * The service can be built in the constructor,
- * {@link #ForkedProcessService(String, Map, List)},
- * or have its simple constructor used to instantiate the service, then the
- * {@link #build(Map, List)} command used to define the environment variables
- * and list of commands to execute. One of these two options MUST be exercised
- * before calling the service's {@link #start()} method.
- * <p>
- * The forked process is executed in the service's {@link #serviceStart()}
- * method; if still running when the service is stopped,
- * {@link #serviceStop()} will attempt to stop it.
- * <p>
- * The service delegates process execution to {@link LongLivedProcess},
- * receiving callbacks via the {@link LongLivedProcessLifecycleEvent}.
- * When the service receives a callback notifying that the process has
- * completed, it calls its {@link #stop()} method. If the exit code was
- * non-zero, the service is noted as having failed.
- */
-public class ForkedProcessService
-    extends WorkflowExecutorService<ExecutorService>
-    implements LongLivedProcessLifecycleEvent, Runnable {
-
-  /**
-   * Log for the forked master process
-   */
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ForkedProcessService.class);
-
-  /**
-   * Termination flag; also the monitor the timeout thread waits on.
-   */
-  private final AtomicBoolean processTerminated = new AtomicBoolean(false);
-  private boolean processStarted = false;
-  private LongLivedProcess process;
-  /**
-   * Timeout in milliseconds; values &lt;= 0 disable the timeout.
-   */
-  private int executionTimeout = -1;
-  /**
-   * Exit code reported when the process times out.
-   */
-  private int timeoutCode = 1;
-  /**
-   * log to log to; defaults to this service log
-   */
-  private Logger processLog = LOG;
-
-  /**
-   * Create an instance of the service
-   * @param name a name
-   */
-  public ForkedProcessService(String name) {
-    super(name);
-  }
-
-  /**
-   * Create an instance of the service, set up the process
-   * @param name a name
-   * @param env environment variables to add to the process environment
-   * @param commandList list of commands to execute
-   * @throws IOException IO problems
-   */
-  public ForkedProcessService(String name,
-      Map<String, String> env,
-      List<String> commandList) throws IOException {
-    super(name);
-    build(env, commandList);
-  }
-
-  /**
-   * Start the configured process.
-   * @throws ServiceStateException if no process has been built
-   * @throws Exception on any failure to start the process
-   */
-  @Override //AbstractService
-  protected void serviceStart() throws Exception {
-    if (process == null) {
-      throw new ServiceStateException("Process not yet configured");
-    }
-    //now spawn the process -expect updates via callbacks
-    process.start();
-  }
-
-  /**
-   * Mark the process as completed (releasing any timeout waiter), then
-   * stop the forked process if there is one.
-   */
-  @Override //AbstractService
-  protected void serviceStop() throws Exception {
-    completed();
-    stopForkedProcess();
-  }
-
-  private void stopForkedProcess() {
-    if (process != null) {
-      process.stop();
-    }
-  }
-
-  /**
-   * Set the process log. This may be null for "do not log".
-   * If the process has not been built yet, the log is retained and
-   * handed to the process when {@link #build(Map, List)} is called.
-   * @param processLog process log
-   */
-  public void setProcessLog(Logger processLog) {
-    this.processLog = processLog;
-    // the process only exists after build(); before that the field
-    // above is all that needs updating
-    if (process != null) {
-      process.setProcessLog(processLog);
-    }
-  }
-
-  /**
-   * Set the timeout by which time a process must have finished
-   * -or -1 for forever
-   * @param timeout timeout in milliseconds
-   * @param code exit code to report if the timeout is reached
-   */
-  public void setTimeout(int timeout, int code) {
-    this.executionTimeout = timeout;
-    this.timeoutCode = code;
-  }
-
-  /**
-   * Build the process to execute when the service is started
-   * @param env environment variables to add to the process environment
-   * (may be null for "none")
-   * @param commandList list of commands to execute
-   * @throws IOException IO problems
-   */
-  public void build(Map<String, String> env,
-      List<String> commandList)
-      throws IOException {
-    assert process == null;
-
-    process = new LongLivedProcess(getName(), processLog, commandList);
-    process.setLifecycleCallback(this);
-    //set the env variable mapping
-    if (env != null) {
-      process.putEnvMap(env);
-    }
-  }
-
-  /**
-   * Notification that the process has started: if a timeout is set,
-   * this service is scheduled as the timeout watcher.
-   * @param process the process invoking the callback
-   */
-  @Override // notification from executed process
-  public synchronized void onProcessStarted(LongLivedProcess process) {
-    LOG.debug("Process has started");
-    processStarted = true;
-    if (executionTimeout > 0) {
-      setExecutor(ServiceThreadFactory.singleThreadExecutor(getName(), true));
-      execute(this);
-    }
-  }
-
-  /**
-   * Notification that the process has exited: a non-zero code is noted
-   * as a failure, and the service is always stopped.
-   * @param process the process invoking the callback
-   * @param uncorrected exit code as returned by the process
-   * @param code the sign-corrected exit code
-   */
-  @Override // notification from executed process
-  public void onProcessExited(LongLivedProcess process,
-      int uncorrected,
-      int code) {
-    try {
-      synchronized (this) {
-        completed();
-        //note whether or not the service had already stopped
-        LOG.debug("Process has exited with exit code {}", code);
-        if (code != 0) {
-          reportFailure(code, getName() + " failed with code " + code);
-        }
-      }
-    } finally {
-      stop();
-    }
-  }
-
-  private void reportFailure(int code, String text) {
-    //error
-    ServiceLaunchException execEx = new ServiceLaunchException(code, text);
-    LOG.debug("Noting failure", execEx);
-    noteFailure(execEx);
-  }
-
-  /**
-   * Timeout watcher: waits until the process is marked as terminated or
-   * the execution timeout has elapsed, and escalates a timeout to a failure.
-   */
-  @Override
-  public void run() {
-    final int timeout = executionTimeout;
-    try {
-      final long deadline = System.currentTimeMillis() + timeout;
-      synchronized (processTerminated) {
-        // wait in a loop against a deadline: Object.wait() may return
-        // spuriously, which must not be mistaken for a timeout
-        long remaining = timeout;
-        while (!processTerminated.get() && remaining > 0) {
-          processTerminated.wait(remaining);
-          remaining = deadline - System.currentTimeMillis();
-        }
-      }
-
-    } catch (InterruptedException e) {
-      //assume signalled; exit
-    }
-    //check the status; if the marker isn't true, bail
-    if (!processTerminated.getAndSet(true)) {
-      LOG.info("process timeout: reporting error code {}", timeoutCode);
-
-      //timeout
-      if (isInState(STATE.STARTED)) {
-        //trigger a failure
-        stopForkedProcess();
-      }
-      reportFailure(timeoutCode, getName() + ": timeout after " + timeout
-          + " millis: exit code =" + timeoutCode);
-    }
-  }
-
-  /**
-   * Note the process as having completed.
-   * The process is marked as terminated
-   * -and anything waiting on <code>processTerminated</code>
-   * is notified
-   */
-  protected void completed() {
-    processTerminated.set(true);
-    synchronized (processTerminated) {
-      processTerminated.notifyAll();
-    }
-  }
-
-  /**
-   * @return true once the process has been marked as terminated
-   */
-  public boolean isProcessTerminated() {
-    return processTerminated.get();
-  }
-
-  /**
-   * @return true once the process-started callback has been received
-   */
-  public synchronized boolean isProcessStarted() {
-    return processStarted;
-  }
-
-  /**
-   * Is a process running: between started and terminated
-   * @return true if the process is up.
-   */
-  public synchronized boolean isProcessRunning() {
-    return processStarted && !isProcessTerminated();
-  }
-
-
-  /**
-   * Get the exit code of the process.
-   * @return the exit code, or null if the process has not been built
-   * or has not yet finished
-   */
-  public Integer getExitCode() {
-    return process != null ? process.getExitCode() : null;
-  }
-
-  /**
-   * Get the sign-corrected exit code of the process.
-   * @return the sign-corrected exit code, or -1 if the process has not
-   * been built or has not yet finished
-   */
-  public int getExitCodeSignCorrected() {
-    Integer corrected =
-        process != null ? process.getExitCodeSignCorrected() : null;
-    if (corrected == null) {
-      return -1;
-    }
-    return corrected;
-  }
-
-  /**
-   * Get the recent output from the process, or [] if not defined
-   * @return a possibly empty list
-   */
-  public List<String> getRecentOutput() {
-    return process != null
-        ? process.getRecentOutput()
-        : new LinkedList<String>();
-  }
-
-  /**
-   * Get the recent output from the process, or [] if not defined
-   *
-   * @param finalOutput flag to indicate "wait for the final output of the process"
-   * @param duration the duration, in ms,
-   * to wait for recent output to become non-empty
-   * @return a possibly empty list
-   */
-  public List<String> getRecentOutput(boolean finalOutput, int duration) {
-    if (process == null) {
-      return new LinkedList<>();
-    }
-    return process.getRecentOutput(finalOutput, duration);
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d44876af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
deleted file mode 100644
index 90a8d40..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
+++ /dev/null
@@ -1,599 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Execute a long-lived process.
- *
- * <p>
- * Hadoop's {@link org.apache.hadoop.util.Shell} class assumes it is executing
- * a short lived application; this class allows for the process to run for the
- * life of the Java process that forked it.
- * It is designed to be embedded inside a YARN service, though this is not
- * the sole way that it can be used
- * <p>
- * Key Features:
- * <ol>
- *   <li>Output is streamed to the output logger provided.</li>
- *   <li>The input stream is closed as soon as the process starts.</li>
- *   <li>The most recent lines of output are saved to a linked list.</li>
- *   <li>A synchronous callback, {@link LongLivedProcessLifecycleEvent},
- *   is raised on the start and finish of a process.</li>
- * </ol>
- *
- */
-public class LongLivedProcess implements Runnable {
-  /**
-   * Limit on number of lines to retain in the "recent" line list:{@value}
-   */
-  public static final int RECENT_LINE_LOG_LIMIT = 64;
-
-  /**
-   * Const defining the time in millis between polling for new text.
-   */
-  private static final int STREAM_READER_SLEEP_TIME = 200;
-
-  /**
-   * limit on the length of a stream before it triggers an automatic newline.
-   */
-  private static final int LINE_LENGTH = 256;
-
-  /**
-   * Exit code handed to the lifecycle callback when the wait for the
-   * process was interrupted and no real exit code is available.
-   */
-  private static final int UNKNOWN_EXIT_CODE = -1;
-
-  private final ProcessBuilder processBuilder;
-  private Process process;
-  /**
-   * Written by the process-waiting thread, read by arbitrary callers:
-   * volatile so that the value is visible once set.
-   */
-  private volatile Integer exitCode = null;
-  private final String name;
-  private final ExecutorService processExecutor;
-  private final ExecutorService logExecutor;
-
-  private ProcessStreamReader processStreamReader;
-  //list of recent lines, recorded for extraction into reports
-  private final List<String> recentLines = new LinkedList<>();
-  private int recentLineLimit = RECENT_LINE_LOG_LIMIT;
-  private LongLivedProcessLifecycleEvent lifecycleCallback;
-  private final AtomicBoolean finalOutputProcessed = new AtomicBoolean(false);
-
-  /**
-   * Log supplied in the constructor for the spawned process -accessible
-   * to inner classes
-   */
-  private Logger processLog;
-
-  /**
-   * Class log -accessible to inner classes
-   */
-  private static final Logger LOG = LoggerFactory.getLogger(LongLivedProcess.class);
-
-  /**
-   * flag to indicate that the process is done
-   */
-  private final AtomicBoolean finished = new AtomicBoolean(false);
-
-  /**
-   * Create an instance
-   * @param name process name
-   * @param processLog log for output (or null)
-   * @param commands command list
-   */
-  public LongLivedProcess(String name,
-      Logger processLog,
-      List<String> commands) {
-    Preconditions.checkArgument(commands != null, "commands");
-
-    this.name = name;
-    this.processLog = processLog;
-    ServiceThreadFactory factory = new ServiceThreadFactory(name, true);
-    processExecutor = Executors.newSingleThreadExecutor(factory);
-    logExecutor = Executors.newSingleThreadExecutor(factory);
-    processBuilder = new ProcessBuilder(commands);
-    processBuilder.redirectErrorStream(false);
-  }
-
-  /**
-   * Set the limit on recent lines to retain
-   * @param recentLineLimit size of rolling list of recent lines.
-   */
-  public void setRecentLineLimit(int recentLineLimit) {
-    this.recentLineLimit = recentLineLimit;
-  }
-
-  /**
-   * Set an optional lifecycle callback
-   * @param lifecycleCallback callback to notify on process start and exit
-   */
-  public void setLifecycleCallback(LongLivedProcessLifecycleEvent lifecycleCallback) {
-    this.lifecycleCallback = lifecycleCallback;
-  }
-
-  /**
-   * Add an entry to the environment
-   * @param envVar envVar -must not be null
-   * @param val value -must not be null
-   */
-  public void setEnv(String envVar, String val) {
-    Preconditions.checkArgument(envVar != null, "envVar");
-    Preconditions.checkArgument(val != null, "val");
-    processBuilder.environment().put(envVar, val);
-  }
-
-  /**
-   * Bulk set the environment from a map. This does
-   * not replace the existing environment, just extend it/overwrite single
-   * entries.
-   * @param map map to add; a null map is treated as empty
-   */
-  public void putEnvMap(Map<String, String> map) {
-    if (map == null) {
-      return;
-    }
-    for (Map.Entry<String, String> entry : map.entrySet()) {
-      setEnv(entry.getKey(), entry.getValue());
-    }
-  }
-
-  /**
-   * Get the process environment
-   * @param variable environment variable
-   * @return the value or null if there is no match
-   */
-  public String getEnv(String variable) {
-    return processBuilder.environment().get(variable);
-  }
-
-  /**
-   * Set the process log. Ignored once the process starts
-   * @param processLog new log ... may be null
-   */
-  public void setProcessLog(Logger processLog) {
-    this.processLog = processLog;
-  }
-
-  /**
-   * Get the process reference
-   * @return the process -null if the process is not started
-   */
-  public Process getProcess() {
-    return process;
-  }
-
-  /**
-   * Get the process builder -this can be manipulated
-   * up to the start() operation. As there is no synchronization
-   * around it, it must only be used in the same thread setting up the command.
-   * @return the process builder
-   */
-  public ProcessBuilder getProcessBuilder() {
-    return processBuilder;
-  }
-
-  /**
-   * Get the command list
-   * @return the commands
-   */
-  public List<String> getCommands() {
-    return processBuilder.command();
-  }
-
-  /**
-   * Get the first entry of the command list.
-   * @return the first command
-   */
-  public String getCommand() {
-    return getCommands().get(0);
-  }
-
-  /**
-   * probe to see if the process is running
-   * @return true iff the process has been started and is not yet finished
-   */
-  public boolean isRunning() {
-    return process != null && !finished.get();
-  }
-
-  /**
-   * Get the exit code: null until the process has finished
-   * @return the exit code or null
-   */
-  public Integer getExitCode() {
-    return exitCode;
-  }
-
-  /**
-   * Get the exit code sign corrected: null until the process has finished.
-   * The low eight bits of the exit code are interpreted as a signed byte.
-   * @return the sign-corrected exit code or null
-   */
-  public Integer getExitCodeSignCorrected() {
-    // single read of the volatile field
-    Integer code = exitCode;
-    if (code == null) {
-      return null;
-    }
-    return (code << 24) >> 24;
-  }
-
-  /**
-   * Stop the process if it is running.
-   * The process is destroyed; the lifecycle callback is raised by the
-   * process-waiting thread once the process has actually exited.
-   */
-  public void stop() {
-    if (!isRunning()) {
-      return;
-    }
-    process.destroy();
-  }
-
-  /**
-   * Get a text description of the builder suitable for log output
-   * @return the quoted command-line arguments, space separated
-   */
-  protected String describeBuilder() {
-    StringBuilder buffer = new StringBuilder();
-    for (String arg : processBuilder.command()) {
-      buffer.append('"').append(arg).append("\" ");
-    }
-    return buffer.toString();
-  }
-
-  /**
-   * Dump the environment to a string builder, one sorted
-   * <code>key=value</code> entry per line under a header.
-   * @param buffer the buffer to append to
-   */
-  public void dumpEnv(StringBuilder buffer) {
-    // the header ends with a newline so that the first entry
-    // starts on its own line
-    buffer.append("\nEnvironment\n-----------\n");
-    Map<String, String> env = processBuilder.environment();
-    Set<String> keys = env.keySet();
-    List<String> sortedKeys = new ArrayList<String>(keys);
-    Collections.sort(sortedKeys);
-    for (String key : sortedKeys) {
-      buffer.append(key).append("=").append(env.get(key)).append('\n');
-    }
-  }
-
-  /**
-   * Exec the process
-   * @return the process
-   * @throws IOException on any failure to start the process
-   * @throws FileNotFoundException if the process could not be found
-   */
-  private Process spawnChildProcess() throws IOException {
-    if (process != null) {
-      throw new IOException("Process already started");
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Spawning process:\n " + describeBuilder());
-    }
-    try {
-      process = processBuilder.start();
-    } catch (IOException e) {
-      // on windows, upconvert DOS error 2 from ::CreateProcess()
-      // to its real meaning: FileNotFound
-      if (e.toString().contains("CreateProcess error=2")) {
-        FileNotFoundException fnfe =
-            new FileNotFoundException(e.toString());
-        fnfe.initCause(e);
-        throw fnfe;
-      } else {
-        throw e;
-      }
-    }
-    return process;
-  }
-
-  /**
-   * Entry point for waiting for the program to finish
-   */
-  @Override // Runnable
-  public void run() {
-    Preconditions.checkNotNull(process, "null process");
-    LOG.debug("Lifecycle callback thread running");
-    //notify the callback that the process has started
-    if (lifecycleCallback != null) {
-      lifecycleCallback.onProcessStarted(this);
-    }
-    try {
-      //close stdin for the process
-      IOUtils.closeStream(process.getOutputStream());
-      exitCode = process.waitFor();
-    } catch (InterruptedException e) {
-      LOG.debug("Process wait interrupted -exiting thread", e);
-    } finally {
-      //here the process has finished
-      LOG.debug("process {} has finished", name);
-      //tell the logger it has to finish too
-      finished.set(true);
-
-      // shut down the threads
-      logExecutor.shutdown();
-      try {
-        logExecutor.awaitTermination(60, TimeUnit.SECONDS);
-      } catch (InterruptedException ignored) {
-        //ignored
-      }
-      // this task is the only one ever submitted to the process executor;
-      // shutting it down lets its worker thread exit once this method returns
-      processExecutor.shutdown();
-
-      //now call the callback if it is set
-      if (lifecycleCallback != null) {
-        // the exit code is null if the wait was interrupted; the callback
-        // takes primitive ints, so substitute a marker rather than
-        // failing with a NullPointerException on unboxing
-        Integer code = exitCode;
-        Integer corrected = getExitCodeSignCorrected();
-        lifecycleCallback.onProcessExited(this,
-            code != null ? code : UNKNOWN_EXIT_CODE,
-            corrected != null ? corrected : UNKNOWN_EXIT_CODE);
-      }
-    }
-  }
-
-  /**
-   * Spawn the application
-   * @throws IOException IO problems
-   */
-  public void start() throws IOException {
-
-    spawnChildProcess();
-    processStreamReader =
-        new ProcessStreamReader(processLog, STREAM_READER_SLEEP_TIME);
-    logExecutor.submit(processStreamReader);
-    processExecutor.submit(this);
-  }
-
-  /**
-   * Get the lines of recent output
-   * @return a copy of the last few lines of output; an empty list if
-   * there are none
-   */
-  public synchronized List<String> getRecentOutput() {
-    return new ArrayList<String>(recentLines);
-  }
-
-  /**
-   * @return whether lines of recent output are empty
-   */
-  public synchronized boolean isRecentOutputEmpty() {
-    return recentLines.isEmpty();
-  }
-
-  /**
-   * Query to see if the final output has been processed
-   * @return true once the stream reader has finished
-   */
-  public boolean isFinalOutputProcessed() {
-    return finalOutputProcessed.get();
-  }
-
-  /**
-   * Get the recent output from the process, or [] if not defined
-   *
-   * @param finalOutput flag to indicate "wait for the final output of the process"
-   * @param duration the duration, in ms,
-   * to wait for recent output to become non-empty
-   * @return a possibly empty list
-   */
-  public List<String> getRecentOutput(boolean finalOutput, int duration) {
-    long start = System.currentTimeMillis();
-    while (System.currentTimeMillis() - start <= duration) {
-      boolean finishedOutput;
-      if (finalOutput) {
-        // final flag means block until all data is done
-        finishedOutput = isFinalOutputProcessed();
-      } else {
-        // there is some output
-        finishedOutput = !isRecentOutputEmpty();
-      }
-      if (finishedOutput) {
-        break;
-      }
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException ie) {
-        Thread.currentThread().interrupt();
-        break;
-      }
-    }
-    return getRecentOutput();
-  }
-
-  /**
-   * add the recent line to the list of recent lines; deleting
-   * an earlier one if the limit is reached.
-   *
-   * Implementation note: yes, a circular array would be more
-   * efficient, especially with some power of two as the modulo,
-   * but is it worth the complexity and risk of errors for
-   * something that is only called once per line of IO?
-   * @param line line to record
-   * @param isErrorStream is the line from the error stream
-   * @param logger logger to log to - null for no logging
-   */
-  private synchronized void recordRecentLine(String line,
-      boolean isErrorStream,
-      Logger logger) {
-    if (line == null) {
-      return;
-    }
-    String entry = (isErrorStream ? "[ERR] " : "[OUT] ") + line;
-    recentLines.add(entry);
-    if (recentLines.size() > recentLineLimit) {
-      recentLines.remove(0);
-    }
-    if (logger != null) {
-      if (isErrorStream) {
-        logger.warn(line);
-      } else {
-        logger.info(line);
-      }
-    }
-  }
-
-  /**
-   * Class to read data from the two process streams, and, when run in a thread
-   * to keep running until the <code>finished</code> flag is set.
-   * Lines are fetched from stdout and stderr and logged at info and warn
-   * respectively.
-   */
-
-  private class ProcessStreamReader implements Runnable {
-    private final Logger streamLog;
-    private final int sleepTime;
-
-    /**
-     * Create an instance
-     * @param streamLog log -or null to disable logging (recent entries
-     * will still be retained)
-     * @param sleepTime time in millis to sleep when no data is available
-     */
-    private ProcessStreamReader(Logger streamLog, int sleepTime) {
-      this.streamLog = streamLog;
-      this.sleepTime = sleepTime;
-    }
-
-    /**
-     * Return a character if there is one, -1 if nothing is ready yet
-     * @param reader reader
-     * @return the value from the reader, or -1 if it is not ready
-     * @throws IOException IO problems
-     */
-    private int readCharNonBlocking(BufferedReader reader) throws IOException {
-      if (reader.ready()) {
-        return reader.read();
-      } else {
-        return -1;
-      }
-    }
-
-    /**
-     * Read in a line, or, if the limit has been reached, the buffer
-     * so far. Characters read before the data ran out stay in
-     * <code>line</code>, so a later call continues the same line.
-     * @param reader source of data
-     * @param line line to build
-     * @param limit limit of line length
-     * @return true if the line can be printed
-     * @throws IOException IO trouble
-     */
-    @SuppressWarnings("NestedAssignment")
-    private boolean readAnyLine(BufferedReader reader,
-        StringBuilder line,
-        int limit)
-        throws IOException {
-      int next;
-      while ((-1 != (next = readCharNonBlocking(reader)))) {
-        if (next == '\n') {
-          //line end: return flag to say so
-          return true;
-        }
-        line.append((char) next);
-        // compare the accumulated length against the unmodified limit:
-        // decrementing the limit as well would split lines at half of it
-        if (line.length() >= limit) {
-          //enough has been read in to print it anyway
-          return true;
-        }
-      }
-      //here no more data is currently available
-      return false;
-    }
-
-
-    @Override //Runnable
-    @SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
-    public void run() {
-      BufferedReader errReader = null;
-      BufferedReader outReader = null;
-      StringBuilder outLine = new StringBuilder(LINE_LENGTH);
-      StringBuilder errorLine = new StringBuilder(LINE_LENGTH);
-      try {
-        errReader = new BufferedReader(
-            new InputStreamReader(process.getErrorStream(), "UTF-8"));
-        outReader = new BufferedReader(
-            new InputStreamReader(process.getInputStream(), "UTF-8"));
-        while (!finished.get()) {
-          boolean processed = false;
-          if (readAnyLine(errReader, errorLine, LINE_LENGTH)) {
-            recordRecentLine(errorLine.toString(), true, streamLog);
-            errorLine.setLength(0);
-            processed = true;
-          }
-          if (readAnyLine(outReader, outLine, LINE_LENGTH)) {
-            recordRecentLine(outLine.toString(), false, streamLog);
-            outLine.setLength(0);
-            processed = true;
-          }
-          if (!processed && !finished.get()) {
-            //nothing processed: wait a bit for data.
-            try {
-              Thread.sleep(sleepTime);
-            } catch (InterruptedException e) {
-              //ignore this, rely on the done flag
-              LOG.debug("Ignoring ", e);
-            }
-          }
-        }
-        // finished: cleanup
-
-        //print the current error line then stream through the rest
-        recordFinalOutput(errReader, errorLine, true, streamLog);
-        //now do the info line
-        recordFinalOutput(outReader, outLine, false, streamLog);
-
-      } catch (Exception e) {
-        LOG.warn("encountered {}", e, e);
-        //process connection has been torn down
-      } finally {
-        // close streams
-        IOUtils.closeStream(errReader);
-        IOUtils.closeStream(outReader);
-        //mark output as done
-        finalOutputProcessed.set(true);
-      }
-    }
-
-    /**
-     * Record the final output of a process stream: any partial line
-     * already buffered, then every remaining line of the stream.
-     * @param reader reader of output
-     * @param lineBuilder string builder into which line is built
-     * @param isErrorStream flag to indicate whether or not this is
-     * the line from the error stream
-     * @param logger logger to log to
-     * @throws IOException on a failure to read the stream
-     */
-    protected void recordFinalOutput(BufferedReader reader,
-        StringBuilder lineBuilder, boolean isErrorStream, Logger logger) throws
-        IOException {
-      // only record the buffered text if there is any: an empty builder
-      // means no partial line is pending, and recording it would add a
-      // spurious blank entry to the log and the recent-line list
-      if (lineBuilder.length() > 0) {
-        recordRecentLine(lineBuilder.toString(), isErrorStream, logger);
-        lineBuilder.setLength(0);
-      }
-      String line = reader.readLine();
-      while (line != null) {
-        recordRecentLine(line, isErrorStream, logger);
-        line = reader.readLine();
-        if (Thread.interrupted()) {
-          break;
-        }
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d44876af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
deleted file mode 100644
index a13b508..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-/**
- * Callbacks for the lifecycle events of a long-lived process: one is
- * raised when the process is started, the other when it has exited.
- */
-public interface LongLivedProcessLifecycleEvent {
-
- /**
- * Callback when a process is started
- * @param process the process invoking the callback
- */
- void onProcessStarted(LongLivedProcess process);
-
- /**
- * Callback when a process has finished
- * @param process the process invoking the callback
- * @param exitCode exit code from the process
- * @param signCorrectedCode the exit code after sign correction.
- * NOTE(review): the correction is computed by the caller and is not
- * visible in this interface; confirm its exact semantics there.
- */
- void onProcessExited(LongLivedProcess process,
- int exitCode,
- int signCorrectedCode);
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d44876af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
deleted file mode 100644
index a123584..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import org.apache.hadoop.service.Service;
-
-import java.util.List;
-
-/**
- * Interface for accessing services that contain one or more child
- * services. It extends {@link Service}, so a parent is itself a service.
- */
-public interface ServiceParent extends Service {
-
- /**
- * Add a child service. It must be in a consistent state with the
- * service to which it is being added.
- * @param service the service to add.
- */
- void addService(Service service);
-
- /**
- * Get an unmodifiable list of the child services.
- * @return a list of child services at the time of invocation;
- * services added after the call will not appear in the returned list.
- */
- List<Service> getServices();
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d44876af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java
deleted file mode 100644
index 5ebf77c..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.service.Service;
-
-import java.util.concurrent.Callable;
-
-/**
- * A {@link Callable} which wraps another callable and, once that callable
- * has completed, stops the owning service (when one was supplied).
- * Any exception raised by the wrapped callable is recorded so that it can
- * be retrieved later, and is then rethrown.
- */
-public class ServiceTerminatingCallable<V> implements Callable<V> {
-
-  /**
-   * The wrapped callback to which {@link #call()} delegates.
-   */
-  private final Callable<V> callable;
-
-  /** Service to stop after the callback; null means "stop nothing". */
-  private final Service owner;
-
-  /** Exception raised by the callback, or null if none has been caught. */
-  private Exception exception;
-
-  /**
-   * Create an instance. If the owner is null, no service is stopped
-   * when the callable completes.
-   * @param owner owning service -can be null
-   * @param callable callback; must not be null
-   */
-  public ServiceTerminatingCallable(Service owner,
-      Callable<V> callable) {
-    Preconditions.checkArgument(callable != null, "null callable");
-    this.callable = callable;
-    this.owner = owner;
-  }
-
-  /**
-   * Get the owning service.
-   * @return the service which is stopped when the callable completes;
-   * may be null.
-   */
-  public Service getOwner() {
-    return owner;
-  }
-
-  /**
-   * Get any exception raised by the wrapped callable.
-   * @return an exception or null.
-   */
-  public Exception getException() {
-    return exception;
-  }
-
-  /**
-   * Delegates the call to the callable supplied in the constructor,
-   * then calls the stop() operation on the owner, if there is one.
-   * Any exception raised by the callable is caught, noted and rethrown;
-   * the owner is stopped in that case too.
-   * @return the outcome of the delegated call operation
-   * @throws Exception if one was raised.
-   */
-  @Override
-  public V call() throws Exception {
-    final V outcome;
-    try {
-      outcome = callable.call();
-    } catch (Exception e) {
-      // remember the failure for getException() before passing it on
-      exception = e;
-      throw e;
-    } finally {
-      if (owner != null) {
-        owner.stop();
-      }
-    }
-    return outcome;
-  }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d44876af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
deleted file mode 100644
index dc591df..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.service.Service;
-
-/**
- * A runnable which terminates its owner after running; it also catches any
- * exception raised by the action and can serve it back.
- */
-public class ServiceTerminatingRunnable implements Runnable {
-
-  private final Service owner;
-  private final Runnable action;
-  private Exception exception;
-
-  /**
-   * Create an instance.
-   * @param owner owning service; must not be null
-   * @param action action to execute before terminating the service;
-   * must not be null
-   */
-  public ServiceTerminatingRunnable(Service owner, Runnable action) {
-    Preconditions.checkArgument(owner != null, "null owner");
-    Preconditions.checkArgument(action != null, "null action");
-    this.owner = owner;
-    this.action = action;
-  }
-
-  /**
-   * Get the owning service.
-   * @return the service which is stopped when the runnable completes.
-   */
-  public Service getOwner() {
-    return owner;
-  }
-
-  /**
-   * Any exception raised by the inner <code>action's</code> run.
-   * @return an exception or null.
-   */
-  public Exception getException() {
-    return exception;
-  }
-
-  /**
-   * Run the action, then stop the owner. An exception raised by the action
-   * is caught and noted, not rethrown. The owner is stopped in a
-   * <code>finally</code> clause so that it is also stopped when the action
-   * fails with a throwable which is not an {@link Exception} (an
-   * {@link Error}); such a throwable still propagates to the caller.
-   */
-  @Override
-  public void run() {
-    try {
-      action.run();
-    } catch (Exception e) {
-      exception = e;
-    } finally {
-      // always terminate the owner, whatever the outcome of the action
-      owner.stop();
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d44876af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
deleted file mode 100644
index 737197b..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import com.google.common.base.Preconditions;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * A thread factory that creates threads (possibly daemon threads)
- * using the name and naming policy supplied.
- * The thread counter starts at 1, increments atomically,
- * and is supplied as the second argument in the format string.
- * The counter is static, so it is shared by every instance of this factory.
- *
- * A static method, {@link #singleThreadExecutor(String, boolean)},
- * exists to simplify the construction of an executor with a single
- * well-named thread.
- *
- * Example
- * <pre>
- * ExecutorService exec = ServiceThreadFactory.singleThreadExecutor("live", true)
- * </pre>
- */
-public class ServiceThreadFactory implements ThreadFactory {
-
-  /**
-   * Default format for thread names: {@value}.
-   */
-  public static final String DEFAULT_NAMING_FORMAT = "%s-%03d";
-
-  /** Source of thread numbers; common to all factories. */
-  private static final AtomicInteger counter = new AtomicInteger(1);
-
-  private final String name;
-  private final boolean daemons;
-  private final String namingFormat;
-
-  /**
-   * Create an instance with the default naming format.
-   * @param name base thread name
-   * @param daemons flag to indicate the threads should be marked as daemons
-   */
-  public ServiceThreadFactory(String name,
-      boolean daemons) {
-    this(name, daemons, DEFAULT_NAMING_FORMAT);
-  }
-
-  /**
-   * Create an instance.
-   * @param name base thread name; must not be null
-   * @param daemons flag to indicate the threads should be marked as daemons
-   * @param namingFormat format string to generate thread names from: the
-   * base name is its first argument, the thread number its second;
-   * must not be null
-   */
-  public ServiceThreadFactory(String name,
-      boolean daemons,
-      String namingFormat) {
-    Preconditions.checkArgument(name != null, "null name");
-    Preconditions.checkArgument(namingFormat != null, "null naming format");
-    this.name = name;
-    this.daemons = daemons;
-    this.namingFormat = namingFormat;
-  }
-
-  /**
-   * Create a thread for the runnable, named from the naming format,
-   * the base name and the next value of the counter.
-   * @param r the runnable to execute; must not be null
-   * @return a new, unstarted thread with its daemon flag set as configured
-   */
-  @Override
-  public Thread newThread(Runnable r) {
-    Preconditions.checkArgument(r != null, "null runnable");
-    Thread worker = new Thread(r,
-        String.format(namingFormat, name, counter.getAndIncrement()));
-    worker.setDaemon(daemons);
-    return worker;
-  }
-
-  /**
-   * Create a single thread executor using this naming policy.
-   * @param name base thread name
-   * @param daemons flag to indicate the threads should be marked as daemons
-   * @return an executor
-   */
-  public static ExecutorService singleThreadExecutor(String name,
-      boolean daemons) {
-    ServiceThreadFactory factory = new ServiceThreadFactory(name, daemons);
-    return Executors.newSingleThreadExecutor(factory);
-  }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d44876af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java
deleted file mode 100644
index 65d14b7..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A service that calls the supplied callback when it is started -after the
- * given delay.
- *
- * It can be configured to stop itself after the callback has
- * completed, marking any exception raised as the exception of this service.
- * The notifications come in on a callback thread -a thread that is only
- * created in this service's <code>serviceStart()</code> operation.
- */
-public class WorkflowCallbackService<V> extends
-    WorkflowScheduledExecutorService<ScheduledExecutorService> {
-  protected static final Logger LOG =
-      LoggerFactory.getLogger(WorkflowCallbackService.class);
-
-  /**
-   * This is the callback.
-   */
-  private final Callable<V> callback;
-
-  /** Delay in milliseconds before the callback is invoked. */
-  private final int delay;
-
-  /** The callback, wrapped so that it can stop this service afterwards. */
-  private final ServiceTerminatingCallable<V> command;
-
-  /** Future of the scheduled callback; null until the service is started. */
-  private ScheduledFuture<V> scheduledFuture;
-
-  /**
-   * Create an instance of the service
-   * @param name service name
-   * @param callback callback to invoke; must not be null
-   * @param delay delay in milliseconds -or 0 for no delay
-   * @param terminate terminate this service after the callback?
-   */
-  public WorkflowCallbackService(String name,
-      Callable<V> callback,
-      int delay,
-      boolean terminate) {
-    super(name);
-    Preconditions.checkNotNull(callback, "Null callback argument");
-    this.callback = callback;
-    this.delay = delay;
-    // only hand this service over as the owner if it is to be terminated
-    if (terminate) {
-      command = new ServiceTerminatingCallable<V>(this, callback);
-    } else {
-      command = new ServiceTerminatingCallable<V>(null, callback);
-    }
-  }
-
-  /**
-   * Get the future of the scheduled callback.
-   * @return the future, or null if the service has not been started
-   */
-  public ScheduledFuture<V> getScheduledFuture() {
-    return scheduledFuture;
-  }
-
-  /**
-   * Create a single-threaded scheduled executor with daemon threads named
-   * after this service, register it as the executor of the service and
-   * schedule the callback on it after the configured delay.
-   * @throws Exception on any failure
-   */
-  @Override
-  protected void serviceStart() throws Exception {
-    LOG.debug("Notifying {} after a delay of {} millis", callback, delay);
-    ServiceThreadFactory threadFactory =
-        new ServiceThreadFactory(getName(), true);
-    ScheduledExecutorService scheduler =
-        Executors.newSingleThreadScheduledExecutor(threadFactory);
-    setExecutor(scheduler);
-    scheduledFuture = scheduler.schedule(command, delay, TimeUnit.MILLISECONDS);
-  }
-
-  /**
-   * Stop the service.
-   * If an exception was noted from the executed callback, it is
-   * rethrown here once the superclass has been stopped.
-   * @throws Exception the exception raised by the callback, or one raised
-   * while stopping the superclass.
-   */
-  @Override
-  protected void serviceStop() throws Exception {
-    super.serviceStop();
-    // propagate any failure
-    Exception failure = getCallbackException();
-    if (failure != null) {
-      throw failure;
-    }
-  }
-
-  /**
-   * Get the exception raised by the callback. Will always be null if the
-   * callback has not been executed; will only be non-null after the
-   * callback has failed with an exception.
-   * @return the exception raised by the callback, or null
-   */
-  public Exception getCallbackException() {
-    return command.getException();
-  }
-
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org