You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by go...@apache.org on 2016/08/25 20:19:40 UTC
[11/46] incubator-slider git commit: SLIDER-1165 Create
yarn-native-services branch on Slider corresponding to the
yarn-native-services branch on Hadoop
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/server/services/security/StoresGenerator.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/security/StoresGenerator.java b/slider-core/src/main/java/org/apache/slider/server/services/security/StoresGenerator.java
deleted file mode 100644
index 226250f..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/security/StoresGenerator.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.slider.server.services.security;
-
-import org.apache.slider.core.conf.AggregateConf;
-import org.apache.slider.core.conf.MapOperations;
-import org.apache.slider.core.exceptions.SliderException;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
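- * Generates the security stores requested for a component, using the keystore and truststore generators.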
- */
-public class StoresGenerator {
-
- static CertificateManager certMgr = new CertificateManager();
- private static SecurityStoreGenerator[] GENERATORS = {
- new KeystoreGenerator(certMgr), new TruststoreGenerator(certMgr)
- };
-
- public static SecurityStore[] generateSecurityStores(String hostname,
- String containerId,
- String role,
- AggregateConf instanceDefinition,
- MapOperations compOps)
- throws SliderException, IOException {
- //discover which stores need generation based on the passwords configured
- List<SecurityStore> files = new ArrayList<SecurityStore>();
- for (SecurityStoreGenerator generator : GENERATORS) {
- if (generator.isStoreRequested(compOps)) {
- SecurityStore store = generator.generate(hostname,
- containerId,
- instanceDefinition,
- compOps,
- role);
- if (store != null) {
- files.add(store);
- }
- }
- }
-
- if (files.isEmpty()) {
- throw new SliderException("Security stores were requested but none were "
- + "generated. Check the AM logs and ensure "
- + "passwords are configured for the components "
- + "requiring the stores.");
- }
- return files.toArray(new SecurityStore[files.size()]);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/server/services/security/TruststoreGenerator.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/security/TruststoreGenerator.java b/slider-core/src/main/java/org/apache/slider/server/services/security/TruststoreGenerator.java
deleted file mode 100644
index d16dcbd..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/security/TruststoreGenerator.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.slider.server.services.security;
-
-import org.apache.slider.common.SliderKeys;
-import org.apache.slider.core.conf.AggregateConf;
-import org.apache.slider.core.conf.MapOperations;
-import org.apache.slider.core.exceptions.SliderException;
-
-import java.io.IOException;
-
-/**
- * Generates a container truststore when a store password is available for the component.
- */
-public class TruststoreGenerator extends AbstractSecurityStoreGenerator {
-
-
- public TruststoreGenerator(CertificateManager certificateMgr) {
- super(certificateMgr);
- }
-
- @Override
- public SecurityStore generate(String hostname, String containerId,
- AggregateConf instanceDefinition,
- MapOperations compOps, String role)
- throws SliderException, IOException {
- SecurityStore truststore = null;
- String password = getStorePassword(
- instanceDefinition.getAppConf().credentials, compOps, role);
- if (password != null) {
- truststore = certificateMgr.generateContainerTruststore(containerId,
- role, password);
- }
- return truststore;
- }
-
- @Override
- String getPassword(MapOperations compOps) {
- return compOps.get(
- compOps.get(SliderKeys.COMP_TRUSTSTORE_PASSWORD_PROPERTY_KEY));
- }
-
- @Override
- String getAlias(MapOperations compOps) {
- return compOps.getOption(SliderKeys.COMP_TRUSTSTORE_PASSWORD_ALIAS_KEY,
- SliderKeys.COMP_TRUSTSTORE_PASSWORD_ALIAS_DEFAULT);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
deleted file mode 100644
index 1622309..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import org.apache.hadoop.registry.client.api.RegistryConstants;
-import org.apache.hadoop.registry.client.api.RegistryOperations;
-import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.slider.common.tools.ConfigHelper;
-import org.apache.slider.common.tools.SliderUtils;
-import org.apache.slider.core.exceptions.BadCommandArgumentsException;
-import org.apache.slider.core.exceptions.BadConfigException;
-import org.apache.slider.core.zk.ZookeeperUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base service for the standard slider client/server services
- */
-public abstract class AbstractSliderLaunchedService extends
- LaunchedWorkflowCompositeService {
- private static final Logger log =
- LoggerFactory.getLogger(AbstractSliderLaunchedService.class);
-
- protected AbstractSliderLaunchedService(String name) {
- super(name);
- // make sure all the yarn configs get loaded
- YarnConfiguration conf = new YarnConfiguration();
- ConfigHelper.registerDeprecatedConfigItems();
- }
-
- /**
- * look up the registry quorum from the config
- * @return the quorum string
- * @throws BadConfigException if it is not there or invalid
- */
- public String lookupZKQuorum() throws BadConfigException {
-
- String registryQuorum = getConfig().get(RegistryConstants.KEY_REGISTRY_ZK_QUORUM);
-
- // if the registry quorum is not set: trouble
- if (SliderUtils.isUnset(registryQuorum)) {
- throw new BadConfigException(
- "No Zookeeper quorum provided in the"
- + " configuration property " + RegistryConstants.KEY_REGISTRY_ZK_QUORUM
- );
- }
- ZookeeperUtils.splitToHostsAndPortsStrictly(registryQuorum);
- return registryQuorum;
- }
-
- /**
- * Create, adopt, and start the YARN registration service
- * @return the registry operations service, already deployed as a child
- * of the AbstractSliderLaunchedService instance.
- */
- public RegistryOperations startRegistryOperationsService()
- throws BadConfigException {
-
- // push back the slider registry entry if needed
- String quorum = lookupZKQuorum();
- RegistryOperations registryWriterService =
- createRegistryOperationsInstance();
- deployChildService(registryWriterService);
- return registryWriterService;
- }
-
- /**
- * Create the registry operations instance. This is to allow
- * subclasses to instantiate a subclass service
- * @return an instance to match to the lifecycle of this service
- */
- protected RegistryOperations createRegistryOperationsInstance() {
- return RegistryOperationsFactory.createInstance("YarnRegistry", getConfig());
- }
-
- /**
- * Utility method to require an argument to be set (non null, non-empty)
- * @param argname argument name
- * @param value value
- * @throws BadCommandArgumentsException if the condition is not met
- */
- protected static void requireArgumentSet(String argname, String value)
- throws BadCommandArgumentsException {
- require(isSet(value), "Required argument %s missing", argname );
- }
-
- /**
- * Require a condition to hold; throw {@link BadCommandArgumentsException} if not.
- * The exception text is the formatted message.
- * @param condition condition
- * @param message string to format
- * @param args list of arguments to format.
- * @throws BadCommandArgumentsException
- */
- protected static void require(boolean condition, String message,
- Object... args)
- throws BadCommandArgumentsException {
- if (!condition) {
- throw new BadCommandArgumentsException(message, args);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/server/services/utility/EndOfServiceWaiter.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/EndOfServiceWaiter.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/EndOfServiceWaiter.java
deleted file mode 100644
index 40ceab8..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/EndOfServiceWaiter.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.service.ServiceStateChangeListener;
-
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Wait for a service to stop.
- *
- * WARNING: the notification may come in as soon as the service enters
- * the stopped state: it may take some time for the actual stop operation
- * to complete.
- */
-public class EndOfServiceWaiter implements ServiceStateChangeListener {
-
- private final AtomicBoolean finished = new AtomicBoolean(false);
- private final String name;
- private Service service;
-
- /**
- * Wait for a service; use the service name as this instance's name
- * @param service service
- */
- public EndOfServiceWaiter(Service service) {
- this(service.getName(), service);
- }
-
-
- /**
- * Wait for a service
- * @param name name for messages
- * @param service service
- */
- public EndOfServiceWaiter(String name, Service service) {
- this.name = name;
- this.service = service;
- service.registerServiceListener(this);
- }
-
- public synchronized void waitForServiceToStop(long timeout) throws
- InterruptedException, TimeoutException {
- service.waitForServiceToStop(timeout);
- if (!finished.get()) {
- wait(timeout);
- if (!finished.get()) {
- throw new TimeoutException(name
- + " did not finish after " + timeout +
- " milliseconds");
- }
- }
- }
-
- /**
- * Wait for service state change callbacks; notify self if the service has
- * now stopped
- * @param service service
- */
- @Override
- public synchronized void stateChanged(Service service) {
- if (service.isInState(Service.STATE.STOPPED)) {
- finished.set(true);
- notify();
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java
deleted file mode 100644
index bcd1969..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.Service;
-import org.apache.slider.core.main.LauncherExitCodes;
-import org.apache.slider.core.main.RunService;
-import org.apache.slider.server.services.workflow.WorkflowCompositeService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This is a workflow composite service which can be launched from the CLI
- * ... catches the arguments and implements a stub runService operation.
- */
-public class LaunchedWorkflowCompositeService extends WorkflowCompositeService
- implements RunService {
- private static final Logger log = LoggerFactory.getLogger(
- LaunchedWorkflowCompositeService.class);
- private String[] argv;
-
- public LaunchedWorkflowCompositeService(String name) {
- super(name);
- }
-
- public LaunchedWorkflowCompositeService(String name, Service... children) {
- super(name, children);
- }
-
- /**
- * Test for a string being unset, using the groovy definition of true/false for a string
- * @param s string to check
- * @return true iff the string is null or empty
- */
- protected static boolean isUnset(String s) {
- return StringUtils.isEmpty(s);
- }
-
- protected static boolean isSet(String s) {
- return StringUtils.isNotEmpty(s);
- }
-
- protected String[] getArgv() {
- return argv;
- }
-
- /**
- * Pre-init argument binding
- * @param config the initial configuration built up by the
- * service launcher.
- * @param args list of arguments passed to the command line
- * after any launcher-specific commands have been stripped.
- * @return the configuration
- * @throws Exception
- */
- @Override
- public Configuration bindArgs(Configuration config, String... args) throws
- Exception {
- this.argv = args;
- if (log.isDebugEnabled()) {
- log.debug("Binding {} Arguments:", args.length);
-
- StringBuilder builder = new StringBuilder();
- for (String arg : args) {
- builder.append('"').append(arg).append("\" ");
- }
- log.debug(builder.toString());
- }
- return config;
- }
-
- @Override
- public int runService() throws Throwable {
- return LauncherExitCodes.EXIT_SUCCESS;
- }
-
- @Override
- public synchronized void addService(Service service) {
- Preconditions.checkArgument(service != null, "null service argument");
- super.addService(service);
- }
-
- /**
- * Run a child service -initing and starting it if this
- * service has already passed those parts of its own lifecycle
- * @param service the service to start
- */
- protected boolean deployChildService(Service service) {
- service.init(getConfig());
- addService(service);
- if (isInState(STATE.STARTED)) {
- service.start();
- return true;
- }
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/server/services/utility/PatternValidator.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/PatternValidator.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/PatternValidator.java
deleted file mode 100644
index 6ab9de6..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/PatternValidator.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import org.apache.slider.server.appmaster.web.rest.RestPaths;
-
-import java.util.regex.Pattern;
-
-/**
- * Utility class to validate strings against a predefined pattern.
- */
-public class PatternValidator {
-
- public static final String E_INVALID_NAME =
- "Invalid name %s does not match the pattern pattern %s ";
- private final Pattern valid;
- private final String pattern;
-
- public PatternValidator(String pattern) {
- this.pattern = pattern;
- valid = Pattern.compile(pattern);
- }
-
- /**
- * Validate the name -restricting it to the set defined in
- * {@link RestPaths#PUBLISHED_CONFIGURATION_REGEXP}
- * @param name name to validate
- * @throws IllegalArgumentException if not a valid name
- */
- public void validate(String name) {
- if (!matches(name)) {
- throw new IllegalArgumentException(
- String.format(E_INVALID_NAME, name, pattern));
- }
- }
-
- /**
- * Query to see if the pattern matches
- * @param name name to validate
- * @return true if the string matches the pattern
- */
- public boolean matches(String name) {
- return valid.matcher(name).matches();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/server/services/utility/WebAppService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/WebAppService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/WebAppService.java
deleted file mode 100644
index ebfcb99..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/WebAppService.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.webapp.WebApp;
-
-/**
- * Contains a webapp reference and stops it in teardown if non-null
- * <p>
- * It does not start the application.
- * Access to the field is not synchronized across threads; it is the
- * responsibility of the caller.
- */
-public class WebAppService<T extends WebApp> extends AbstractService {
-
- private volatile T webApp;
-
- public WebAppService(String name) {
- super(name);
- }
-
- public WebAppService(String name, T app) {
- super(name);
- webApp = app;
- }
-
- public T getWebApp() {
- return webApp;
- }
-
- public void setWebApp(T webApp) {
- this.webApp = webApp;
- }
-
-
- @Override
- protected void serviceStart() throws Exception {
-
- }
-
- /**
- * Stop operation stops the webapp; sets the reference to null
- * @throws Exception
- */
- @Override
- protected void serviceStop() throws Exception {
- if (webApp != null) {
- webApp.stop();
- webApp = null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
deleted file mode 100644
index 8b711aa..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import org.apache.hadoop.service.AbstractService;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-/**
- * Service that closes the closeable supplied during shutdown, if not null.
- *
- * As the Service interface itself extends Closeable, this service
- * can be used to shut down other services if desired.
- */
-public class ClosingService<C extends Closeable> extends AbstractService {
-
- private C closeable;
-
- public ClosingService(String name) {
- super(name);
- }
-
- /**
- * Construct an instance of the service
- * @param name service name
- * @param closeable closeable to close (may be null)
- */
- public ClosingService(String name,
- C closeable) {
- super(name);
- this.closeable = closeable;
- }
-
- /**
- * Construct an instance of the service, using the default name
- * @param closeable closeable to close (may be null)
- */
- public ClosingService(C closeable) {
- this("ClosingService", closeable);
- }
-
-
- /**
- * Get the closeable
- * @return the closeable
- */
- public synchronized C getCloseable() {
- return closeable;
- }
-
- /**
- * Set or update the closeable.
- * @param closeable
- */
- public synchronized void setCloseable(C closeable) {
- this.closeable = closeable;
- }
-
- /**
- * Stop routine will close the closeable (if not null) and set the
- * reference to null afterwards.
- * This operation does not raise any exception on the close, though it does
- * record it.
- */
- @Override
- protected void serviceStop() {
- C target = getCloseable();
- if (target != null) {
- try {
- target.close();
- } catch (IOException ioe) {
- noteFailure(ioe);
- }
- setCloseable(null);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
deleted file mode 100644
index 352be49..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import org.apache.hadoop.service.ServiceStateException;
-import org.apache.slider.core.main.ServiceLaunchException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Service wrapper for an external program that is launched and can/will terminate.
- * This service is notified when the subprocess terminates, and stops itself
- * and converts a non-zero exit code into a failure exception.
- *
- * <p>
- * Key Features:
- * <ol>
- * <li>The property {@link #executionTimeout} can be set to set a limit
- * on the duration of a process</li>
- * <li>Output is streamed to the output logger provided</li>.
- * <li>The most recent lines of output are saved to a linked list</li>.
- * <li>A synchronous callback, {@link LongLivedProcessLifecycleEvent}, is raised on the start
- * and finish of a process.</li>
- * </ol>
- *
- * Usage:
- * <p></p>
- * The service can be built in the constructor, {@link #ForkedProcessService(String, Map, List)},
- * or have its simple constructor used to instantiate the service, then the
- * {@link #build(Map, List)} command used to define the environment variables
- * and list of commands to execute. One of these two options MUST be exercised
- * before calling the services's {@link #start()} method.
- * <p></p>
- * The forked process is executed in the service's {@link #serviceStart()} method;
- * if still running when the service is stopped, {@link #serviceStop()} will
- * attempt to stop it.
- * <p></p>
- *
- * The service delegates process execution to {@link LongLivedProcess},
- * receiving callbacks via the {@link LongLivedProcessLifecycleEvent}.
- * When the service receives a callback notifying that the process has completed,
- * it calls its {@link #stop()} method. If the error code was non-zero,
- * the service is logged as having failed.
- */
-public class ForkedProcessService
- extends WorkflowExecutorService<ExecutorService>
- implements LongLivedProcessLifecycleEvent, Runnable {
-
- /**
- * Log for the forked master process
- */
- private static final Logger LOG =
- LoggerFactory.getLogger(ForkedProcessService.class);
-
- private final AtomicBoolean processTerminated = new AtomicBoolean(false);
- private boolean processStarted = false;
- private LongLivedProcess process;
- private int executionTimeout = -1;
- private int timeoutCode = 1;
- /**
- log to log to; defaults to this service log
- */
- private Logger processLog = LOG;
-
- /**
- * Exit code set when the spawned process exits
- */
- private AtomicInteger exitCode = new AtomicInteger(0);
-
- /**
- * Create an instance of the service
- * @param name a name
- */
- public ForkedProcessService(String name) {
- super(name);
- }
-
- /**
- * Create an instance of the service, set up the process
- * @param name a name
- * @param commandList list of commands is inserted on the front
- * @param env environment variables above those generated by
- * @throws IOException IO problems
- */
- public ForkedProcessService(String name,
- Map<String, String> env,
- List<String> commandList) throws IOException {
- super(name);
- build(env, commandList);
- }
-
- @Override //AbstractService
- protected void serviceStart() throws Exception {
- if (process == null) {
- throw new ServiceStateException("Process not yet configured");
- }
- //now spawn the process -expect updates via callbacks
- process.start();
- }
-
- @Override //AbstractService
- protected void serviceStop() throws Exception {
- completed();
- stopForkedProcess();
- }
-
- private void stopForkedProcess() {
- if (process != null) {
- process.stop();
- }
- }
-
- /**
- * Set the process log. This may be null for "do not log"
- * @param processLog process log
- */
- public void setProcessLog(Logger processLog) {
- this.processLog = processLog;
- process.setProcessLog(processLog);
- }
-
- /**
- * Set the timeout by which time a process must have finished -or -1 for forever
- * @param timeout timeout in milliseconds
- */
- public void setTimeout(int timeout, int code) {
- this.executionTimeout = timeout;
- this.timeoutCode = code;
- }
-
- /**
- * Build the process to execute when the service is started
- * @param commandList list of commands is inserted on the front
- * @param env environment variables above those generated by
- * @throws IOException IO problems
- */
- public void build(Map<String, String> env,
- List<String> commandList)
- throws IOException {
- assert process == null;
-
- process = new LongLivedProcess(getName(), processLog, commandList);
- process.setLifecycleCallback(this);
- //set the env variable mapping
- process.putEnvMap(env);
- }
-
- @Override // notification from executed process
- public synchronized void onProcessStarted(LongLivedProcess process) {
- LOG.debug("Process has started");
- processStarted = true;
- if (executionTimeout > 0) {
- setExecutor(ServiceThreadFactory.singleThreadExecutor(getName(), true));
- execute(this);
- }
- }
-
- @Override // notification from executed process
- public void onProcessExited(LongLivedProcess process,
- int uncorrected,
- int code) {
- try {
- synchronized (this) {
- completed();
- //note whether or not the service had already stopped
- LOG.debug("Process has exited with exit code {}", code);
- if (code != 0) {
- reportFailure(code, getName() + " failed with code " + code);
- }
- }
- } finally {
- stop();
- }
- }
-
- private void reportFailure(int code, String text) {
- //error
- ServiceLaunchException execEx = new ServiceLaunchException(code, text);
- LOG.debug("Noting failure", execEx);
- noteFailure(execEx);
- }
-
- /**
- * handle timeout response by escalating it to a failure
- */
- @Override
- public void run() {
- try {
- synchronized (processTerminated) {
- if (!processTerminated.get()) {
- processTerminated.wait(executionTimeout);
- }
- }
-
- } catch (InterruptedException e) {
- //assume signalled; exit
- }
- //check the status; if the marker isn't true, bail
- if (!processTerminated.getAndSet(true)) {
- LOG.info("process timeout: reporting error code {}", timeoutCode);
-
- //timeout
- if (isInState(STATE.STARTED)) {
- //trigger a failure
- stopForkedProcess();
- }
- reportFailure(timeoutCode, getName() + ": timeout after " + executionTimeout
- + " millis: exit code =" + timeoutCode);
- }
- }
-
- /**
- * Note the process as having completed.
- * The process marked as terminated
- * -and anything synchronized on <code>processTerminated</code>
- * is notified
- */
- protected void completed() {
- processTerminated.set(true);
- synchronized (processTerminated) {
- processTerminated.notify();
- }
- }
-
- public boolean isProcessTerminated() {
- return processTerminated.get();
- }
-
- public synchronized boolean isProcessStarted() {
- return processStarted;
- }
-
- /**
- * Is a process running: between started and terminated
- * @return true if the process is up.
- */
- public synchronized boolean isProcessRunning() {
- return processStarted && !isProcessTerminated();
- }
-
-
- public Integer getExitCode() {
- return process.getExitCode();
- }
-
- public int getExitCodeSignCorrected() {
- Integer exitCode = process.getExitCodeSignCorrected();
- if (exitCode == null) return -1;
- return exitCode;
- }
-
- /**
- * Get the recent output from the process, or [] if not defined
- * @return a possibly empty list
- */
- public List<String> getRecentOutput() {
- return process != null
- ? process.getRecentOutput()
- : new LinkedList<String>();
- }
-
- /**
- * Get the recent output from the process, or [] if not defined
- *
- * @param finalOutput flag to indicate "wait for the final output of the process"
- * @param duration the duration, in ms,
- * to wait for recent output to become non-empty
- * @return a possibly empty list
- */
- public List<String> getRecentOutput(boolean finalOutput, int duration) {
- if (process == null) {
- return new LinkedList<>();
- }
- return process.getRecentOutput(finalOutput, duration);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
deleted file mode 100644
index 9e9e7ac..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
+++ /dev/null
@@ -1,598 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Execute a long-lived process.
- *
- * <p>
- * Hadoop's {@link org.apache.hadoop.util.Shell} class assumes it is executing
- * a short lived application; this class allows for the process to run for the
- * life of the Java process that forked it.
- * It is designed to be embedded inside a YARN service, though this is not
- * the sole way that it can be used
- * <p>
- * Key Features:
- * <ol>
- * <li>Output is streamed to the output logger provided</li>.
- * <li>the input stream is closed as soon as the process starts.</li>
- * <li>The most recent lines of output are saved to a linked list</li>.
- * <li>A synchronous callback, {@link LongLivedProcessLifecycleEvent},
- * is raised on the start and finish of a process.</li>
- * </ol>
- *
- */
-public class LongLivedProcess implements Runnable {
- /**
- * Limit on number of lines to retain in the "recent" line list:{@value}
- */
- public static final int RECENT_LINE_LOG_LIMIT = 64;
-
- /**
- * Const defining the time in millis between polling for new text.
- */
- private static final int STREAM_READER_SLEEP_TIME = 200;
-
- /**
- * limit on the length of a stream before it triggers an automatic newline.
- */
- private static final int LINE_LENGTH = 256;
- private final ProcessBuilder processBuilder;
- private Process process;
- private Integer exitCode = null;
- private final String name;
- private final ExecutorService processExecutor;
- private final ExecutorService logExecutor;
-
- private ProcessStreamReader processStreamReader;
- //list of recent lines, recorded for extraction into reports
- private final List<String> recentLines = new LinkedList<>();
- private int recentLineLimit = RECENT_LINE_LOG_LIMIT;
- private LongLivedProcessLifecycleEvent lifecycleCallback;
- private final AtomicBoolean finalOutputProcessed = new AtomicBoolean(false);
-
- /**
- * Log supplied in the constructor for the spawned process -accessible
- * to inner classes
- */
- private Logger processLog;
-
- /**
- * Class log -accessible to inner classes
- */
- private static final Logger LOG = LoggerFactory.getLogger(LongLivedProcess.class);
-
- /**
- * flag to indicate that the process is done
- */
- private final AtomicBoolean finished = new AtomicBoolean(false);
-
- /**
- * Create an instance
- * @param name process name
- * @param processLog log for output (or null)
- * @param commands command list
- */
- public LongLivedProcess(String name,
- Logger processLog,
- List<String> commands) {
- Preconditions.checkArgument(commands != null, "commands");
-
- this.name = name;
- this.processLog = processLog;
- ServiceThreadFactory factory = new ServiceThreadFactory(name, true);
- processExecutor = Executors.newSingleThreadExecutor(factory);
- logExecutor = Executors.newSingleThreadExecutor(factory);
- processBuilder = new ProcessBuilder(commands);
- processBuilder.redirectErrorStream(false);
- }
-
- /**
- * Set the limit on recent lines to retain
- * @param recentLineLimit size of rolling list of recent lines.
- */
- public void setRecentLineLimit(int recentLineLimit) {
- this.recentLineLimit = recentLineLimit;
- }
-
- /**
- * Set an optional application exit callback
- * @param lifecycleCallback callback to notify on application exit
- */
- public void setLifecycleCallback(LongLivedProcessLifecycleEvent lifecycleCallback) {
- this.lifecycleCallback = lifecycleCallback;
- }
-
- /**
- * Add an entry to the environment
- * @param envVar envVar -must not be null
- * @param val value
- */
- public void setEnv(String envVar, String val) {
- Preconditions.checkArgument(envVar != null, "envVar");
- Preconditions.checkArgument(val != null, "val");
- processBuilder.environment().put(envVar, val);
- }
-
- /**
- * Bulk set the environment from a map. This does
- * not replace the existing environment, just extend it/overwrite single
- * entries.
- * @param map map to add
- */
- public void putEnvMap(Map<String, String> map) {
- for (Map.Entry<String, String> entry : map.entrySet()) {
- String val = entry.getValue();
- String key = entry.getKey();
- setEnv(key, val);
- }
- }
-
- /**
- * Get the process environment
- * @param variable environment variable
- * @return the value or null if there is no match
- */
- public String getEnv(String variable) {
- return processBuilder.environment().get(variable);
- }
-
- /**
- * Set the process log. Ignored once the process starts
- * @param processLog new log ... may be null
- */
- public void setProcessLog(Logger processLog) {
- this.processLog = processLog;
- }
-
- /**
- * Get the process reference
- * @return the process -null if the process is not started
- */
- public Process getProcess() {
- return process;
- }
-
- /**
- * Get the process builder -this can be manipulated
- * up to the start() operation. As there is no synchronization
- * around it, it must only be used in the same thread setting up the commmand.
- * @return the process builder
- */
- public ProcessBuilder getProcessBuilder() {
- return processBuilder;
- }
-
- /**
- * Get the command list
- * @return the comands
- */
- public List<String> getCommands() {
- return processBuilder.command();
- }
-
- public String getCommand() {
- return getCommands().get(0);
- }
-
- /**
- * probe to see if the process is running
- * @return true iff the process has been started and is not yet finished
- */
- public boolean isRunning() {
- return process != null && !finished.get();
- }
-
- /**
- * Get the exit code: null until the process has finished
- * @return the exit code or null
- */
- public Integer getExitCode() {
- return exitCode;
- }
-
- /**
- * Get the exit code sign corrected: null until the process has finished
- * @return the exit code or null
- */
- public Integer getExitCodeSignCorrected() {
- Integer result;
- if (exitCode != null) {
- result = (exitCode << 24) >> 24;
- } else {
- result = null;
- }
- return result;
- }
-
- /**
- * Stop the process if it is running.
- * This will trigger an application completion event with the given exit code
- */
- public void stop() {
- if (!isRunning()) {
- return;
- }
- process.destroy();
- }
-
- /**
- * Get a text description of the builder suitable for log output
- * @return a multiline string
- */
- protected String describeBuilder() {
- StringBuilder buffer = new StringBuilder();
- for (String arg : processBuilder.command()) {
- buffer.append('"').append(arg).append("\" ");
- }
- return buffer.toString();
- }
-
- /**
- * Dump the environment to a string builder
- * @param buffer the buffer to append to
- */
- public void dumpEnv(StringBuilder buffer) {
- buffer.append("\nEnvironment\n-----------");
- Map<String, String> env = processBuilder.environment();
- Set<String> keys = env.keySet();
- List<String> sortedKeys = new ArrayList<String>(keys);
- Collections.sort(sortedKeys);
- for (String key : sortedKeys) {
- buffer.append(key).append("=").append(env.get(key)).append('\n');
- }
- }
-
- /**
- * Exec the process
- * @return the process
- * @throws IOException on aany failure to start the process
- * @throws FileNotFoundException if the process could not be found
- */
- private Process spawnChildProcess() throws IOException {
- if (process != null) {
- throw new IOException("Process already started");
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Spawning process:\n " + describeBuilder());
- }
- try {
- process = processBuilder.start();
- } catch (IOException e) {
- // on windows, upconvert DOS error 2 from ::CreateProcess()
- // to its real meaning: FileNotFound
- if (e.toString().contains("CreateProcess error=2")) {
- FileNotFoundException fnfe =
- new FileNotFoundException(e.toString());
- fnfe.initCause(e);
- throw fnfe;
- } else {
- throw e;
- }
- }
- return process;
- }
-
- /**
- * Entry point for waiting for the program to finish
- */
- @Override // Runnable
- public void run() {
- Preconditions.checkNotNull(process, "null process");
- LOG.debug("Lifecycle callback thread running");
- //notify the callback that the process has started
- if (lifecycleCallback != null) {
- lifecycleCallback.onProcessStarted(this);
- }
- try {
- //close stdin for the process
- IOUtils.closeStream(process.getOutputStream());
- exitCode = process.waitFor();
- } catch (InterruptedException e) {
- LOG.debug("Process wait interrupted -exiting thread", e);
- } finally {
- //here the process has finished
- LOG.debug("process {} has finished", name);
- //tell the logger it has to finish too
- finished.set(true);
-
- // shut down the threads
- logExecutor.shutdown();
- try {
- logExecutor.awaitTermination(60, TimeUnit.SECONDS);
- } catch (InterruptedException ignored) {
- //ignored
- }
-
- //now call the callback if it is set
- if (lifecycleCallback != null) {
- lifecycleCallback.onProcessExited(this, exitCode,
- getExitCodeSignCorrected());
- }
- }
- }
-
- /**
- * Spawn the application
- * @throws IOException IO problems
- */
- public void start() throws IOException {
-
- spawnChildProcess();
- processStreamReader =
- new ProcessStreamReader(processLog, STREAM_READER_SLEEP_TIME);
- logExecutor.submit(processStreamReader);
- processExecutor.submit(this);
- }
-
- /**
- * Get the lines of recent output
- * @return the last few lines of output; an empty list if there are none
- * or the process is not actually running
- */
- public synchronized List<String> getRecentOutput() {
- return new ArrayList<String>(recentLines);
- }
-
- /**
- * @return whether lines of recent output are empty
- */
- public synchronized boolean isRecentOutputEmpty() {
- return recentLines.isEmpty();
- }
-
- /**
- * Query to see if the final output has been processed
- * @return
- */
- public boolean isFinalOutputProcessed() {
- return finalOutputProcessed.get();
- }
-
- /**
- * Get the recent output from the process, or [] if not defined
- *
- * @param finalOutput flag to indicate "wait for the final output of the process"
- * @param duration the duration, in ms,
- * ro wait for recent output to become non-empty
- * @return a possibly empty list
- */
- public List<String> getRecentOutput(boolean finalOutput, int duration) {
- long start = System.currentTimeMillis();
- while (System.currentTimeMillis() - start <= duration) {
- boolean finishedOutput;
- if (finalOutput) {
- // final flag means block until all data is done
- finishedOutput = isFinalOutputProcessed();
- } else {
- // there is some output
- finishedOutput = !isRecentOutputEmpty();
- }
- if (finishedOutput) {
- break;
- }
- try {
- Thread.sleep(100);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- break;
- }
- }
- return getRecentOutput();
- }
-
- /**
- * add the recent line to the list of recent lines; deleting
- * an earlier on if the limit is reached.
- *
- * Implementation note: yes, a circular array would be more
- * efficient, especially with some power of two as the modulo,
- * but is it worth the complexity and risk of errors for
- * something that is only called once per line of IO?
- * @param line line to record
- * @param isErrorStream is the line from the error stream
- * @param logger logger to log to - null for no logging
- */
- private synchronized void recordRecentLine(String line,
- boolean isErrorStream,
- Logger logger) {
- if (line == null) {
- return;
- }
- String entry = (isErrorStream ? "[ERR] " : "[OUT] ") + line;
- recentLines.add(entry);
- if (recentLines.size() > recentLineLimit) {
- recentLines.remove(0);
- }
- if (logger != null) {
- if (isErrorStream) {
- logger.warn(line);
- } else {
- logger.info(line);
- }
- }
- }
-
- /**
- * Class to read data from the two process streams, and, when run in a thread
- * to keep running until the <code>done</code> flag is set.
- * Lines are fetched from stdout and stderr and logged at info and error
- * respectively.
- */
-
- private class ProcessStreamReader implements Runnable {
- private final Logger streamLog;
- private final int sleepTime;
-
- /**
- * Create an instance
- * @param streamLog log -or null to disable logging (recent entries
- * will still be retained)
- * @param sleepTime time to sleep when stopping
- */
- private ProcessStreamReader(Logger streamLog, int sleepTime) {
- this.streamLog = streamLog;
- this.sleepTime = sleepTime;
- }
-
- /**
- * Return a character if there is one, -1 if nothing is ready yet
- * @param reader reader
- * @return the value from the reader, or -1 if it is not ready
- * @throws IOException IO problems
- */
- private int readCharNonBlocking(BufferedReader reader) throws IOException {
- if (reader.ready()) {
- return reader.read();
- } else {
- return -1;
- }
- }
-
- /**
- * Read in a line, or, if the limit has been reached, the buffer
- * so far
- * @param reader source of data
- * @param line line to build
- * @param limit limit of line length
- * @return true if the line can be printed
- * @throws IOException IO trouble
- */
- @SuppressWarnings("NestedAssignment")
- private boolean readAnyLine(BufferedReader reader,
- StringBuilder line,
- int limit)
- throws IOException {
- int next;
- while ((-1 != (next = readCharNonBlocking(reader)))) {
- if (next != '\n') {
- line.append((char) next);
- limit--;
- if (line.length() > limit) {
- //enough has been read in to print it any
- return true;
- }
- } else {
- //line end return flag to say so
- return true;
- }
- }
- //here the end of the stream is hit, or the limit
- return false;
- }
-
-
- @Override //Runnable
- @SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
- public void run() {
- BufferedReader errReader = null;
- BufferedReader outReader = null;
- StringBuilder outLine = new StringBuilder(LINE_LENGTH);
- StringBuilder errorLine = new StringBuilder(LINE_LENGTH);
- try {
- errReader = new BufferedReader(
- new InputStreamReader(process.getErrorStream()));
- outReader = new BufferedReader(
- new InputStreamReader(process.getInputStream()));
- while (!finished.get()) {
- boolean processed = false;
- if (readAnyLine(errReader, errorLine, LINE_LENGTH)) {
- recordRecentLine(errorLine.toString(), true, streamLog);
- errorLine.setLength(0);
- processed = true;
- }
- if (readAnyLine(outReader, outLine, LINE_LENGTH)) {
- recordRecentLine(outLine.toString(), false, streamLog);
- outLine.setLength(0);
- processed |= true;
- }
- if (!processed && !finished.get()) {
- //nothing processed: wait a bit for data.
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- //ignore this, rely on the done flag
- LOG.debug("Ignoring ", e);
- }
- }
- }
- // finished: cleanup
-
- //print the current error line then stream through the rest
- recordFinalOutput(errReader, errorLine, true, streamLog);
- //now do the info line
- recordFinalOutput(outReader, outLine, false, streamLog);
-
- } catch (Exception ignored) {
- LOG.warn("encountered {}", ignored, ignored);
- //process connection has been torn down
- } finally {
- // close streams
- IOUtils.closeStream(errReader);
- IOUtils.closeStream(outReader);
- //mark output as done
- finalOutputProcessed.set(true);
- }
- }
-
- /**
- * Record the final output of a process stream
- * @param reader reader of output
- * @param lineBuilder string builder into which line is built
- * @param isErrorStream flag to indicate whether or not this is the
- * is the line from the error stream
- * @param logger logger to log to
- * @throws IOException
- */
- protected void recordFinalOutput(BufferedReader reader,
- StringBuilder lineBuilder, boolean isErrorStream, Logger logger) throws
- IOException {
- String line = lineBuilder.toString();
- recordRecentLine(line, isErrorStream, logger);
- line = reader.readLine();
- while (line != null) {
- recordRecentLine(line, isErrorStream, logger);
- line = reader.readLine();
- if (Thread.interrupted()) {
- break;
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
deleted file mode 100644
index a13b508..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-/**
- * Callback when a long-lived application exits
- */
-public interface LongLivedProcessLifecycleEvent {
-
- /**
- * Callback when a process is started
- * @param process the process invoking the callback
- */
- void onProcessStarted(LongLivedProcess process);
-
- /**
- * Callback when a process has finished
- * @param process the process invoking the callback
- * @param exitCode exit code from the process
- * @param signCorrectedCode the code- as sign corrected
- */
- void onProcessExited(LongLivedProcess process,
- int exitCode,
- int signCorrectedCode);
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
deleted file mode 100644
index a123584..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import org.apache.hadoop.service.Service;
-
-import java.util.List;
-
-/**
- * Interface for accessing services that contain one or more child
- * services.
- */
-public interface ServiceParent extends Service {
-
- /**
- * Add a child service. It must be in a consistent state with the
- * service to which it is being added.
- * @param service the service to add.
- */
- void addService(Service service);
-
- /**
- * Get an unmodifiable list of services
- * @return a list of child services at the time of invocation -
- * added services will not be picked up.
- */
- List<Service> getServices();
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java
deleted file mode 100644
index 5ebf77c..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.service.Service;
-
-import java.util.concurrent.Callable;
-
-/**
- * A callable which terminates its owner; it also catches any
- * exception raised and can serve it back.
- *
- */
-public class ServiceTerminatingCallable<V> implements Callable<V> {
-
- private final Service owner;
- private Exception exception;
- /**
- * This is the callback
- */
- private final Callable<V> callable;
-
-
- /**
- * Create an instance. If the owner is null, the owning service
- * is not terminated.
- * @param owner owning service -can be null
- * @param callable callback.
- */
- public ServiceTerminatingCallable(Service owner,
- Callable<V> callable) {
- Preconditions.checkArgument(callable != null, "null callable");
- this.owner = owner;
- this.callable = callable;
- }
-
-
- /**
- * Get the owning service
- * @return the service to receive notification when
- * the runnable completes.
- */
- public Service getOwner() {
- return owner;
- }
-
- /**
- * Any exception raised by inner <code>action's</code> run.
- * @return an exception or null.
- */
- public Exception getException() {
- return exception;
- }
-
- /**
- * Delegates the call to the callable supplied in the constructor,
- * then calls the stop() operation on its owner. Any exception
- * is caught, noted and rethrown
- * @return the outcome of the delegated call operation
- * @throws Exception if one was raised.
- */
- @Override
- public V call() throws Exception {
- try {
- return callable.call();
- } catch (Exception e) {
- exception = e;
- throw e;
- } finally {
- if (owner != null) {
- owner.stop();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
deleted file mode 100644
index dc591df..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.service.Service;
-
-/**
- * A runnable which terminates its owner after running; it also catches any
- * exception raised and can serve it back.
- */
-public class ServiceTerminatingRunnable implements Runnable {
-
- private final Service owner;
- private final Runnable action;
- private Exception exception;
-
- /**
- * Create an instance.
- * @param owner owning service
- * @param action action to execute before terminating the service
- */
- public ServiceTerminatingRunnable(Service owner, Runnable action) {
- Preconditions.checkArgument(owner != null, "null owner");
- Preconditions.checkArgument(action != null, "null action");
- this.owner = owner;
- this.action = action;
- }
-
- /**
- * Get the owning service.
- * @return the service to receive notification when
- * the runnable completes.
- */
- public Service getOwner() {
- return owner;
- }
-
- /**
- * Any exception raised by inner <code>action's</code> run.
- * @return an exception or null.
- */
- public Exception getException() {
- return exception;
- }
-
- @Override
- public void run() {
- try {
- action.run();
- } catch (Exception e) {
- exception = e;
- }
- owner.stop();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
deleted file mode 100644
index 737197b..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import com.google.common.base.Preconditions;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * A thread factory that creates threads (possibly daemon threads)
- * using the name and naming policy supplied.
- * The thread counter starts at 1, increments atomically,
- * and is supplied as the second argument in the format string.
- *
- * A static method, {@link #singleThreadExecutor(String, boolean)},
- * exists to simplify the construction of an executor with a single well-named
- * thread.
- *
- * Example
- * <pre>
- * ExecutorService exec = ServiceThreadFactory.singleThreadExecutor("live", true);
- * </pre>
- */
-public class ServiceThreadFactory implements ThreadFactory {
-
- private static final AtomicInteger counter = new AtomicInteger(1);
-
- /**
- * Default format for thread names: {@value}.
- */
- public static final String DEFAULT_NAMING_FORMAT = "%s-%03d";
- private final String name;
- private final boolean daemons;
- private final String namingFormat;
-
- /**
- * Create an instance
- * @param name base thread name
- * @param daemons flag to indicate the threads should be marked as daemons
- * @param namingFormat format string to generate thread names from
- */
- public ServiceThreadFactory(String name,
- boolean daemons,
- String namingFormat) {
- Preconditions.checkArgument(name != null, "null name");
- Preconditions.checkArgument(namingFormat != null, "null naming format");
- this.name = name;
- this.daemons = daemons;
- this.namingFormat = namingFormat;
- }
-
- /**
- * Create an instance with the default naming format.
- * @param name base thread name
- * @param daemons flag to indicate the threads should be marked as daemons
- */
- public ServiceThreadFactory(String name,
- boolean daemons) {
- this(name, daemons, DEFAULT_NAMING_FORMAT);
- }
-
- @Override
- public Thread newThread(Runnable r) {
- Preconditions.checkArgument(r != null, "null runnable");
- String threadName =
- String.format(namingFormat, name, counter.getAndIncrement());
- Thread thread = new Thread(r, threadName);
- thread.setDaemon(daemons);
- return thread;
- }
-
- /**
- * Create a single thread executor using this naming policy.
- * @param name base thread name
- * @param daemons flag to indicate the threads should be marked as daemons
- * @return an executor
- */
- public static ExecutorService singleThreadExecutor(String name,
- boolean daemons) {
- return Executors.newSingleThreadExecutor(
- new ServiceThreadFactory(name, daemons));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java
deleted file mode 100644
index 65d14b7..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A service that calls the supplied callback when it is started -after the
- * given delay.
- *
- * It can be configured to stop itself after the callback has
- * completed, marking any exception raised as the exception of this service.
- * The notifications come in on a callback thread -a thread that is only
- * started in this service's <code>start()</code> operation.
- */
-public class WorkflowCallbackService<V> extends
- WorkflowScheduledExecutorService<ScheduledExecutorService> {
- protected static final Logger LOG =
- LoggerFactory.getLogger(WorkflowCallbackService.class);
-
- /**
- * This is the callback.
- */
- private final Callable<V> callback;
- private final int delay;
- private final ServiceTerminatingCallable<V> command;
-
- private ScheduledFuture<V> scheduledFuture;
-
- /**
- * Create an instance of the service
- * @param name service name
- * @param callback callback to invoke
- * @param delay delay -or 0 for no delay
- * @param terminate terminate this service after the callback?
- */
- public WorkflowCallbackService(String name,
- Callable<V> callback,
- int delay,
- boolean terminate) {
- super(name);
- Preconditions.checkNotNull(callback, "Null callback argument");
- this.callback = callback;
- this.delay = delay;
- command = new ServiceTerminatingCallable<V>(
- terminate ? this : null,
- callback);
- }
-
- public ScheduledFuture<V> getScheduledFuture() {
- return scheduledFuture;
- }
-
- @Override
- protected void serviceStart() throws Exception {
- LOG.debug("Notifying {} after a delay of {} millis", callback, delay);
- ScheduledExecutorService executorService =
- Executors.newSingleThreadScheduledExecutor(
- new ServiceThreadFactory(getName(), true));
- setExecutor(executorService);
- scheduledFuture =
- executorService.schedule(command, delay, TimeUnit.MILLISECONDS);
- }
-
- /**
- * Stop the service.
- * If there is any exception noted from any executed notification,
- * note the exception in this class
- * @throws Exception exception.
- */
- @Override
- protected void serviceStop() throws Exception {
- super.serviceStop();
- // propagate any failure
- if (getCallbackException() != null) {
- throw getCallbackException();
- }
- }
-
- /**
- * Get the exception raised by a callback. Will always be null if the
- * callback has not been executed; will only be non-null after a failure.
- * @return the exception raised by the callback, or null
- */
- public Exception getCallbackException() {
- return command.getException();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
deleted file mode 100644
index 9c653f3..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.service.ServiceStateChangeListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * An extended composite service which stops itself if any child service
- * fails, or when all its children have successfully stopped without failure.
- *
- * Lifecycle
- * <ol>
- * <li>If any child exits with a failure: this service stops, propagating
- * the exception.</li>
- * <li>When all child services have stopped, this service stops itself</li>
- * </ol>
- *
- */
-public class WorkflowCompositeService extends CompositeService
- implements ServiceParent, ServiceStateChangeListener {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(WorkflowCompositeService.class);
-
- /**
- * Deadlock-avoiding overridden config for slider services; see SLIDER-1052
- */
- private volatile Configuration configuration;
-
- /**
- * Construct an instance
- * @param name name of this service instance
- */
- public WorkflowCompositeService(String name) {
- super(name);
- }
-
- @Override
- public Configuration getConfig() {
- return configuration;
- }
-
- @Override
- protected void setConfig(Configuration conf) {
- super.setConfig(conf);
- configuration = conf;
- }
-
- /**
- * Construct an instance with the default name.
- */
- public WorkflowCompositeService() {
- this("WorkflowCompositeService");
- }
-
- /**
- * Varargs constructor
- * @param name name of this service instance
- * @param children children
- */
- public WorkflowCompositeService(String name, Service... children) {
- this(name);
- for (Service child : children) {
- addService(child);
- }
- }
-
- /**
- * Construct with a list of children
- * @param name name of this service instance
- * @param children children to add
- */
- public WorkflowCompositeService(String name, List<Service> children) {
- this(name);
- for (Service child : children) {
- addService(child);
- }
- }
-
- /**
- * Add a service, and register it
- * @param service the {@link Service} to be added.
- * Important: do not add a service to a parent during your own serviceInit/start,
- * in Hadoop 2.2; you will trigger a ConcurrentModificationException.
- */
- @Override
- public synchronized void addService(Service service) {
- Preconditions.checkArgument(service != null, "null service argument");
- service.registerServiceListener(this);
- super.addService(service);
- }
-
- /**
- * When this service is started, any service stopping with a failure
- * exception is converted immediately into a failure of this service,
- * storing the failure and stopping ourselves.
- * @param child the service that has changed.
- */
- @Override
- public void stateChanged(Service child) {
- //if that child stopped while we are running:
- if (isInState(STATE.STARTED) && child.isInState(STATE.STOPPED)) {
- // a child service has stopped
- //did the child fail? if so: propagate
- Throwable failureCause = child.getFailureCause();
- if (failureCause != null) {
- LOG.info("Child service " + child + " failed", failureCause);
- //failure. Convert to an exception
- Exception e = (failureCause instanceof Exception) ?
- (Exception) failureCause : new Exception(failureCause);
- //flip ourselves into the failed state
- noteFailure(e);
- stop();
- } else {
- LOG.info("Child service completed {}", child);
- if (areAllChildrenStopped()) {
- LOG.info("All children are halted: stopping");
- stop();
- }
- }
- }
- }
-
- /**
- * Probe to query if all children are stopped -simply
- * by taking a snapshot of the child service list and enumerating
- * their state.
- * The state of the children may change during this operation -that will
- * not get picked up.
- * @return true if all the children are stopped.
- */
- private boolean areAllChildrenStopped() {
- List<Service> children = getServices();
- boolean stopped = true;
- for (Service child : children) {
- if (!child.isInState(STATE.STOPPED)) {
- stopped = false;
- break;
- }
- }
- return stopped;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java
deleted file mode 100644
index 7409d32..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.service.AbstractService;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-/**
- * A service that hosts an executor -when the service is stopped,
- * {@link ExecutorService#shutdownNow()} is invoked.
- */
-public class WorkflowExecutorService<E extends ExecutorService> extends AbstractService {
-
- private E executor;
-
- /**
- * Construct an instance with the given name -but
- * no executor
- * @param name service name
- */
- public WorkflowExecutorService(String name) {
- this(name, null);
- }
-
- /**
- * Construct an instance with the given name and executor
- * @param name service name
- * @param executor executor
- */
- public WorkflowExecutorService(String name,
- E executor) {
- super(name);
- this.executor = executor;
- }
-
- /**
- * Get the executor
- * @return the executor
- */
- public synchronized E getExecutor() {
- return executor;
- }
-
- /**
- * Set the executor. Only valid if the current one is null
- * @param executor executor
- */
- public synchronized void setExecutor(E executor) {
- Preconditions.checkState(this.executor == null,
- "Executor already set");
- this.executor = executor;
- }
-
- /**
- * Execute the runnable with the executor (which
- * must have been created already)
- * @param runnable runnable to execute
- */
- public void execute(Runnable runnable) {
- getExecutor().execute(runnable);
- }
-
- /**
- * Submit a callable
- * @param callable callable
- * @param <V> type of the final get
- * @return a future to wait on
- */
- public <V> Future<V> submit(Callable<V> callable) {
- return getExecutor().submit(callable);
- }
-
- /**
- * Stop the service: halt the executor.
- * @throws Exception exception.
- */
- @Override
- protected void serviceStop() throws Exception {
- stopExecutor();
- super.serviceStop();
- }
-
- /**
- * Stop the executor if it is not null.
- * This uses {@link ExecutorService#shutdownNow()}
- * and so does not block until running tasks have completed.
- */
- protected synchronized void stopExecutor() {
- if (executor != null) {
- executor.shutdownNow();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java
deleted file mode 100644
index b71530f..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.service.AbstractService;
-
-import java.net.InetSocketAddress;
-
-/**
- * A YARN service that maps the start/stop lifecycle of an RPC server
- * to the YARN service lifecycle.
- */
-public class WorkflowRpcService extends AbstractService {
-
- /** RPC server*/
- private final Server server;
-
- /**
- * Construct an instance
- * @param name service name
- * @param server RPC server to start and stop with this service
- */
- public WorkflowRpcService(String name, Server server) {
- super(name);
- Preconditions.checkArgument(server != null, "Null server");
- this.server = server;
- }
-
- /**
- * Get the server
- * @return the server
- */
- public Server getServer() {
- return server;
- }
-
- /**
- * Get the socket address of this server
- * @return the address this server is listening on
- */
- public InetSocketAddress getConnectAddress() {
- return NetUtils.getConnectAddress(server);
- }
-
- @Override
- protected void serviceStart() throws Exception {
- super.serviceStart();
- server.start();
- }
-
- @Override
- protected void serviceStop() throws Exception {
- if (server != null) {
- server.stop();
- }
- }
-}