You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/19 13:09:38 UTC
[20/72] [abbrv] incubator-brooklyn git commit: BROOKLYN-162 - apply
org.apache package prefix to software-base, tidying package names,
and moving a few sensory things to core
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/software/base/VanillaWindowsProcessWinRmDriver.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/VanillaWindowsProcessWinRmDriver.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/VanillaWindowsProcessWinRmDriver.java
new file mode 100644
index 0000000..f7d6ccf
--- /dev/null
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/VanillaWindowsProcessWinRmDriver.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.software.base;
+
+import org.apache.brooklyn.api.internal.EntityLocal;
+import org.apache.brooklyn.entity.core.Attributes;
+import org.apache.brooklyn.location.basic.WinRmMachineLocation;
+import org.apache.brooklyn.util.net.UserAndHostAndPort;
+
+public class VanillaWindowsProcessWinRmDriver extends AbstractSoftwareProcessWinRmDriver implements VanillaWindowsProcessDriver {
+
+ public VanillaWindowsProcessWinRmDriver(EntityLocal entity, WinRmMachineLocation location) {
+ super(entity, location);
+ }
+
+ @Override
+ public void start() {
+ WinRmMachineLocation machine = (WinRmMachineLocation) location;
+ UserAndHostAndPort winrmAddress = UserAndHostAndPort.fromParts(machine.getUser(), machine.getAddress().getHostName(), machine.config().get(WinRmMachineLocation.WINRM_PORT));
+ getEntity().setAttribute(Attributes.WINRM_ADDRESS, winrmAddress);
+
+ super.start();
+ }
+
+ @Override
+ public void preInstall() {
+ super.preInstall();
+ executeCommand(VanillaWindowsProcess.PRE_INSTALL_COMMAND, VanillaWindowsProcess.PRE_INSTALL_POWERSHELL_COMMAND, true);
+ if (entity.getConfig(VanillaWindowsProcess.PRE_INSTALL_REBOOT_REQUIRED)) {
+ rebootAndWait();
+ }
+ }
+
+ @Override
+ public void install() {
+ // TODO: Follow install path of VanillaSoftwareProcessSshDriver
+ executeCommand(VanillaWindowsProcess.INSTALL_COMMAND, VanillaWindowsProcess.INSTALL_POWERSHELL_COMMAND, true);
+ if (entity.getConfig(VanillaWindowsProcess.INSTALL_REBOOT_REQUIRED)) {
+ rebootAndWait();
+ }
+ }
+
+ @Override
+ public void customize() {
+ // TODO: Follow customize path of VanillaSoftwareProcessSshDriver
+ executeCommand(VanillaWindowsProcess.CUSTOMIZE_COMMAND, VanillaWindowsProcess.CUSTOMIZE_POWERSHELL_COMMAND, true);
+ if (entity.getConfig(VanillaWindowsProcess.CUSTOMIZE_REBOOT_REQUIRED)) {
+ rebootAndWait();
+ }
+ }
+
+ @Override
+ public void launch() {
+ executeCommand(VanillaWindowsProcess.LAUNCH_COMMAND, VanillaWindowsProcess.LAUNCH_POWERSHELL_COMMAND, true);
+ }
+
+ @Override
+ public boolean isRunning() {
+ return executeCommand(VanillaWindowsProcess.CHECK_RUNNING_COMMAND,
+ VanillaWindowsProcess.CHECK_RUNNING_POWERSHELL_COMMAND, false).getStatusCode() == 0;
+ }
+
+ @Override
+ public void stop() {
+ executeCommand(VanillaWindowsProcess.STOP_COMMAND, VanillaWindowsProcess.STOP_POWERSHELL_COMMAND, true);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/MachineLifecycleEffectorTasks.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/MachineLifecycleEffectorTasks.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/MachineLifecycleEffectorTasks.java
new file mode 100644
index 0000000..c46277b
--- /dev/null
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/MachineLifecycleEffectorTasks.java
@@ -0,0 +1,951 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.software.base.lifecycle;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.location.MachineLocation;
+import org.apache.brooklyn.api.location.MachineManagementMixins;
+import org.apache.brooklyn.api.location.MachineProvisioningLocation;
+import org.apache.brooklyn.api.location.NoMachinesAvailableException;
+import org.apache.brooklyn.api.location.MachineManagementMixins.SuspendsMachines;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.config.Sanitizer;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.effector.core.EffectorBody;
+import org.apache.brooklyn.effector.core.Effectors;
+import org.apache.brooklyn.entity.core.Attributes;
+import org.apache.brooklyn.entity.core.BrooklynConfigKeys;
+import org.apache.brooklyn.entity.core.Entities;
+import org.apache.brooklyn.entity.core.EntityInternal;
+import org.apache.brooklyn.entity.lifecycle.Lifecycle;
+import org.apache.brooklyn.entity.lifecycle.ServiceStateLogic;
+import org.apache.brooklyn.entity.machine.MachineInitTasks;
+import org.apache.brooklyn.entity.machine.ProvidesProvisioningFlags;
+import org.apache.brooklyn.entity.software.base.SoftwareProcess;
+import org.apache.brooklyn.entity.software.base.SoftwareProcess.RestartSoftwareParameters;
+import org.apache.brooklyn.entity.software.base.SoftwareProcess.StopSoftwareParameters;
+import org.apache.brooklyn.entity.software.base.SoftwareProcess.RestartSoftwareParameters.RestartMachineMode;
+import org.apache.brooklyn.entity.software.base.SoftwareProcess.StopSoftwareParameters.StopMode;
+import org.apache.brooklyn.entity.stock.EffectorStartableImpl.StartParameters;
+import org.apache.brooklyn.entity.trait.Startable;
+import org.apache.brooklyn.entity.trait.StartableMethods;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import org.apache.brooklyn.location.basic.AbstractLocation;
+import org.apache.brooklyn.location.basic.LocalhostMachineProvisioningLocation;
+import org.apache.brooklyn.location.basic.Locations;
+import org.apache.brooklyn.location.basic.Machines;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+import org.apache.brooklyn.location.cloud.CloudLocationConfig;
+import org.apache.brooklyn.sensor.feed.ConfigToAttributes;
+import org.apache.brooklyn.sensor.ssh.SshEffectorTasks;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.config.ConfigBag;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.net.UserAndHostAndPort;
+import org.apache.brooklyn.util.os.Os;
+import org.apache.brooklyn.util.ssh.BashCommands;
+import org.apache.brooklyn.util.text.Strings;
+import org.apache.brooklyn.util.time.Duration;
+
+/**
+ * Default skeleton for start/stop/restart tasks on machines.
+ * <p>
+ * Knows how to provision machines, making use of {@link ProvidesProvisioningFlags#obtainProvisioningFlags(MachineProvisioningLocation)},
+ * and provides hooks for injecting behaviour at common places.
+ * <p>
+ * Methods are designed for overriding, with the convention that *Async methods should queue (and not block).
+ * The following methods are commonly overridden (and you can safely queue tasks, block, or return immediately in them):
+ * <ul>
+ * <li> {@link #startProcessesAtMachine(Supplier)} (required)
+ * <li> {@link #stopProcessesAtMachine()} (required, but can be left blank if you assume the VM will be destroyed)
+ * <li> {@link #preStartCustom(MachineLocation)}
+ * <li> {@link #postStartCustom()}
+ * <li> {@link #preStopCustom()}
+ * <li> {@link #postStopCustom()}
+ * </ul>
+ * Note methods at this level typically look after the {@link Attributes#SERVICE_STATE} sensor.
+ *
+ * @since 0.6.0
+ */
+@Beta
+public abstract class MachineLifecycleEffectorTasks {
+
+ private static final Logger log = LoggerFactory.getLogger(MachineLifecycleEffectorTasks.class);
+
+ public static final ConfigKey<Boolean> ON_BOX_BASE_DIR_RESOLVED = ConfigKeys.newBooleanConfigKey("onbox.base.dir.resolved",
+ "Whether the on-box base directory has been resolved (for internal use)");
+
+ public static final ConfigKey<Collection<? extends Location>> LOCATIONS = StartParameters.LOCATIONS;
+ public static final ConfigKey<Duration> STOP_PROCESS_TIMEOUT = ConfigKeys.newConfigKey(Duration.class,
+ "process.stop.timeout", "How long to wait for the processes to be stopped; use null to mean forever", Duration.TWO_MINUTES);
+
+ protected final MachineInitTasks machineInitTasks = new MachineInitTasks();
+
+ /** Attaches lifecycle effectors (start, restart, stop) to the given entity post-creation. */
+ public void attachLifecycleEffectors(Entity entity) {
+ ((EntityInternal) entity).getMutableEntityType().addEffector(newStartEffector());
+ ((EntityInternal) entity).getMutableEntityType().addEffector(newRestartEffector());
+ ((EntityInternal) entity).getMutableEntityType().addEffector(newStopEffector());
+ }
+
+ /**
+ * Return an effector suitable for setting in a {@code public static final} or attaching dynamically.
+ * <p>
+ * The effector overrides the corresponding effector from {@link Startable} with
+ * the behaviour in this lifecycle class instance.
+ */
+ public Effector<Void> newStartEffector() {
+ return Effectors.effector(Startable.START).impl(newStartEffectorTask()).build();
+ }
+
+ /** @see {@link #newStartEffector()} */
+ public Effector<Void> newRestartEffector() {
+ return Effectors.effector(Startable.RESTART)
+ .parameter(RestartSoftwareParameters.RESTART_CHILDREN)
+ .parameter(RestartSoftwareParameters.RESTART_MACHINE)
+ .impl(newRestartEffectorTask())
+ .build();
+ }
+
+ /** @see {@link #newStartEffector()} */
+ public Effector<Void> newStopEffector() {
+ return Effectors.effector(Startable.STOP)
+ .parameter(StopSoftwareParameters.STOP_PROCESS_MODE)
+ .parameter(StopSoftwareParameters.STOP_MACHINE_MODE)
+ .impl(newStopEffectorTask())
+ .build();
+ }
+
+ /** @see {@link #newStartEffector()} */
+ public Effector<Void> newSuspendEffector() {
+ return Effectors.effector(Void.class, "suspend")
+ .description("Suspend the process/service represented by an entity")
+ .parameter(StopSoftwareParameters.STOP_PROCESS_MODE)
+ .parameter(StopSoftwareParameters.STOP_MACHINE_MODE)
+ .impl(newSuspendEffectorTask())
+ .build();
+ }
+
+ /**
+ * Returns the {@link EffectorBody} which supplies the implementation for the start effector.
+ * <p>
+ * Calls {@link #start(Collection)} in this class.
+ */
+ public EffectorBody<Void> newStartEffectorTask() {
+ // TODO included anonymous inner class for backwards compatibility with persisted state.
+ new EffectorBody<Void>() {
+ @Override
+ public Void call(ConfigBag parameters) {
+ Collection<? extends Location> locations = null;
+
+ Object locationsRaw = parameters.getStringKey(LOCATIONS.getName());
+ locations = Locations.coerceToCollection(entity().getManagementContext(), locationsRaw);
+
+ if (locations==null) {
+ // null/empty will mean to inherit from parent
+ locations = Collections.emptyList();
+ }
+
+ start(locations);
+ return null;
+ }
+ };
+ return new StartEffectorBody();
+ }
+
+ private class StartEffectorBody extends EffectorBody<Void> {
+ @Override
+ public Void call(ConfigBag parameters) {
+ Collection<? extends Location> locations = null;
+
+ Object locationsRaw = parameters.getStringKey(LOCATIONS.getName());
+ locations = Locations.coerceToCollection(entity().getManagementContext(), locationsRaw);
+
+ if (locations == null) {
+ // null/empty will mean to inherit from parent
+ locations = Collections.emptyList();
+ }
+
+ start(locations);
+ return null;
+ }
+
+ }
+
+ /**
+ * Calls {@link #restart(ConfigBag)}.
+ *
+ * @see {@link #newStartEffectorTask()}
+ */
+ public EffectorBody<Void> newRestartEffectorTask() {
+ // TODO included anonymous inner class for backwards compatibility with persisted state.
+ new EffectorBody<Void>() {
+ @Override
+ public Void call(ConfigBag parameters) {
+ restart(parameters);
+ return null;
+ }
+ };
+ return new RestartEffectorBody();
+ }
+
+ private class RestartEffectorBody extends EffectorBody<Void> {
+ @Override
+ public Void call(ConfigBag parameters) {
+ restart(parameters);
+ return null;
+ }
+ }
+
+ /**
+ * Calls {@link #stop(ConfigBag)}.
+ *
+ * @see {@link #newStartEffectorTask()}
+ */
+ public EffectorBody<Void> newStopEffectorTask() {
+ // TODO included anonymous inner class for backwards compatibility with persisted state.
+ new EffectorBody<Void>() {
+ @Override
+ public Void call(ConfigBag parameters) {
+ stop(parameters);
+ return null;
+ }
+ };
+ return new StopEffectorBody();
+ }
+
+ private class StopEffectorBody extends EffectorBody<Void> {
+ @Override
+ public Void call(ConfigBag parameters) {
+ stop(parameters);
+ return null;
+ }
+ }
+
+ /**
+ * Calls {@link #suspend(ConfigBag)}.
+ *
+ * @see {@link #newStartEffectorTask()}
+ */
+ public EffectorBody<Void> newSuspendEffectorTask() {
+ return new SuspendEffectorBody();
+ }
+
+ private class SuspendEffectorBody extends EffectorBody<Void> {
+ @Override
+ public Void call(ConfigBag parameters) {
+ suspend(parameters);
+ return null;
+ }
+ }
+
+ protected EntityInternal entity() {
+ return (EntityInternal) BrooklynTaskTags.getTargetOrContextEntity(Tasks.current());
+ }
+
+ protected Location getLocation(@Nullable Collection<? extends Location> locations) {
+ if (locations==null || locations.isEmpty()) locations = entity().getLocations();
+ if (locations.isEmpty()) {
+ MachineProvisioningLocation<?> provisioner = entity().getAttribute(SoftwareProcess.PROVISIONING_LOCATION);
+ if (provisioner!=null) locations = Arrays.<Location>asList(provisioner);
+ }
+ locations = Locations.getLocationsCheckingAncestors(locations, entity());
+
+ Maybe<MachineLocation> ml = Locations.findUniqueMachineLocation(locations);
+ if (ml.isPresent()) return ml.get();
+
+ if (locations.isEmpty())
+ throw new IllegalArgumentException("No locations specified when starting "+entity());
+ if (locations.size() != 1 || Iterables.getOnlyElement(locations)==null)
+ throw new IllegalArgumentException("Ambiguous locations detected when starting "+entity()+": "+locations);
+ return Iterables.getOnlyElement(locations);
+ }
+
+ /** runs the tasks needed to start, wrapped by setting {@link Attributes#SERVICE_STATE_EXPECTED} appropriately */
+ public void start(Collection<? extends Location> locations) {
+ ServiceStateLogic.setExpectedState(entity(), Lifecycle.STARTING);
+ try {
+ startInLocations(locations);
+ DynamicTasks.waitForLast();
+ ServiceStateLogic.setExpectedState(entity(), Lifecycle.RUNNING);
+ } catch (Throwable t) {
+ ServiceStateLogic.setExpectedState(entity(), Lifecycle.ON_FIRE);
+ throw Exceptions.propagate(t);
+ }
+ }
+
+ /** Asserts there is a single location and calls {@link #startInLocation(Location)} with that location. */
+ protected void startInLocations(Collection<? extends Location> locations) {
+ startInLocation(getLocation(locations));
+ }
+
+ /** Dispatches to the appropriate method(s) to start in the given location. */
+ protected void startInLocation(final Location location) {
+ Supplier<MachineLocation> locationS = null;
+ if (location instanceof MachineProvisioningLocation) {
+ Task<MachineLocation> machineTask = provisionAsync((MachineProvisioningLocation<?>)location);
+ locationS = Tasks.supplier(machineTask);
+ } else if (location instanceof MachineLocation) {
+ locationS = Suppliers.ofInstance((MachineLocation)location);
+ }
+ Preconditions.checkState(locationS != null, "Unsupported location "+location+", when starting "+entity());
+
+ final Supplier<MachineLocation> locationSF = locationS;
+ preStartAtMachineAsync(locationSF);
+ DynamicTasks.queue("start (processes)", new StartProcessesAtMachineTask(locationSF));
+ postStartAtMachineAsync();
+ }
+
+ private class StartProcessesAtMachineTask implements Runnable {
+ private final Supplier<MachineLocation> machineSupplier;
+ private StartProcessesAtMachineTask(Supplier<MachineLocation> machineSupplier) {
+ this.machineSupplier = machineSupplier;
+ }
+ @Override
+ public void run() {
+ startProcessesAtMachine(machineSupplier);
+ }
+ }
+
+ /**
+ * Returns a queued {@link Task} which provisions a machine in the given location
+ * and returns that machine. The task can be used as a supplier to subsequent methods.
+ */
+ protected Task<MachineLocation> provisionAsync(final MachineProvisioningLocation<?> location) {
+ return DynamicTasks.queue(Tasks.<MachineLocation>builder().name("provisioning (" + location.getDisplayName() + ")").body(
+ new ProvisionMachineTask(location)).build());
+ }
+
+ private class ProvisionMachineTask implements Callable<MachineLocation> {
+ final MachineProvisioningLocation<?> location;
+
+ private ProvisionMachineTask(MachineProvisioningLocation<?> location) {
+ this.location = location;
+ }
+
+ public MachineLocation call() throws Exception {
+ // Blocks if a latch was configured.
+ entity().getConfig(BrooklynConfigKeys.PROVISION_LATCH);
+ final Map<String, Object> flags = obtainProvisioningFlags(location);
+ if (!(location instanceof LocalhostMachineProvisioningLocation))
+ log.info("Starting {}, obtaining a new location instance in {} with ports {}", new Object[]{entity(), location, flags.get("inboundPorts")});
+ entity().setAttribute(SoftwareProcess.PROVISIONING_LOCATION, location);
+ MachineLocation machine;
+ try {
+ machine = Tasks.withBlockingDetails("Provisioning machine in " + location, new ObtainLocationTask(location, flags));
+ if (machine == null)
+ throw new NoMachinesAvailableException("Failed to obtain machine in " + location.toString());
+ } catch (Exception e) {
+ throw Exceptions.propagate(e);
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("While starting {}, obtained new location instance {}", entity(),
+ (machine instanceof SshMachineLocation ?
+ machine + ", details " + ((SshMachineLocation) machine).getUser() + ":" + Sanitizer.sanitize(((SshMachineLocation) machine).config().getLocalBag())
+ : machine));
+ return machine;
+ }
+ }
+
+ private static class ObtainLocationTask implements Callable<MachineLocation> {
+ final MachineProvisioningLocation<?> location;
+ final Map<String, Object> flags;
+
+ private ObtainLocationTask(MachineProvisioningLocation<?> location, Map<String, Object> flags) {
+ this.flags = flags;
+ this.location = location;
+ }
+
+ public MachineLocation call() throws NoMachinesAvailableException {
+ return location.obtain(flags);
+ }
+ }
+
+ /** Wraps a call to {@link #preStartCustom(MachineLocation)}, after setting the hostname and address. */
+ protected void preStartAtMachineAsync(final Supplier<MachineLocation> machineS) {
+ DynamicTasks.queue("pre-start", new PreStartTask(machineS.get()));
+ }
+
+ private class PreStartTask implements Runnable {
+ final MachineLocation machine;
+ private PreStartTask(MachineLocation machine) {
+ this.machine = machine;
+ }
+ public void run() {
+ log.info("Starting {} on machine {}", entity(), machine);
+ Collection<Location> oldLocs = entity().getLocations();
+ if (!oldLocs.isEmpty()) {
+ List<MachineLocation> oldSshLocs = ImmutableList.copyOf(Iterables.filter(oldLocs, MachineLocation.class));
+ if (!oldSshLocs.isEmpty()) {
+ // check if existing locations are compatible
+ log.debug("Entity " + entity() + " had machine locations " + oldSshLocs + " when starting at " + machine + "; checking if they are compatible");
+ for (MachineLocation oldLoc : oldSshLocs) {
+ // machines are deemed compatible if hostname and address are the same, or they are localhost
+ // this allows a machine create by jclouds to then be defined with an ip-based spec
+ if (!"localhost".equals(machine.getConfig(AbstractLocation.ORIGINAL_SPEC))) {
+ checkLocationParametersCompatible(machine, oldLoc, "hostname",
+ oldLoc.getAddress().getHostName(), machine.getAddress().getHostName());
+ checkLocationParametersCompatible(machine, oldLoc, "address",
+ oldLoc.getAddress().getHostAddress(), machine.getAddress().getHostAddress());
+ }
+ }
+ log.debug("Entity " + entity() + " old machine locations " + oldSshLocs + " were compatible, removing them to start at " + machine);
+ entity().removeLocations(oldSshLocs);
+ }
+ }
+ entity().addLocations(ImmutableList.of((Location) machine));
+
+ // elsewhere we rely on (public) hostname being set _after_ subnet_hostname
+ // (to prevent the tiny possibility of races resulting in hostname being returned
+ // simply because subnet is still being looked up)
+ Maybe<String> lh = Machines.getSubnetHostname(machine);
+ Maybe<String> la = Machines.getSubnetIp(machine);
+ if (lh.isPresent()) entity().setAttribute(Attributes.SUBNET_HOSTNAME, lh.get());
+ if (la.isPresent()) entity().setAttribute(Attributes.SUBNET_ADDRESS, la.get());
+ entity().setAttribute(Attributes.HOSTNAME, machine.getAddress().getHostName());
+ entity().setAttribute(Attributes.ADDRESS, machine.getAddress().getHostAddress());
+ if (machine instanceof SshMachineLocation) {
+ @SuppressWarnings("resource")
+ SshMachineLocation sshMachine = (SshMachineLocation) machine;
+ UserAndHostAndPort sshAddress = UserAndHostAndPort.fromParts(sshMachine.getUser(), sshMachine.getAddress().getHostName(), sshMachine.getPort());
+ entity().setAttribute(Attributes.SSH_ADDRESS, sshAddress);
+ }
+
+ if (Boolean.TRUE.equals(entity().getConfig(SoftwareProcess.OPEN_IPTABLES))) {
+ if (machine instanceof SshMachineLocation) {
+ Iterable<Integer> inboundPorts = (Iterable<Integer>) machine.config().get(CloudLocationConfig.INBOUND_PORTS);
+ machineInitTasks.openIptablesAsync(inboundPorts, (SshMachineLocation)machine);
+ } else {
+ log.warn("Ignoring flag OPEN_IPTABLES on non-ssh location {}", machine);
+ }
+ }
+ if (Boolean.TRUE.equals(entity().getConfig(SoftwareProcess.STOP_IPTABLES))) {
+ if (machine instanceof SshMachineLocation) {
+ machineInitTasks.stopIptablesAsync((SshMachineLocation)machine);
+ } else {
+ log.warn("Ignoring flag STOP_IPTABLES on non-ssh location {}", machine);
+ }
+ }
+ if (Boolean.TRUE.equals(entity().getConfig(SoftwareProcess.DONT_REQUIRE_TTY_FOR_SUDO))) {
+ if (machine instanceof SshMachineLocation) {
+ machineInitTasks.dontRequireTtyForSudoAsync((SshMachineLocation)machine);
+ } else {
+ log.warn("Ignoring flag DONT_REQUIRE_TTY_FOR_SUDO on non-ssh location {}", machine);
+ }
+ }
+ resolveOnBoxDir(entity(), machine);
+ preStartCustom(machine);
+ }
+ }
+
+ /**
+ * Resolves the on-box dir.
+ * <p>
+ * Initialize and pre-create the right onbox working dir, if an ssh machine location.
+ * Logs a warning if not.
+ */
+ @SuppressWarnings("deprecation")
+ public static String resolveOnBoxDir(EntityInternal entity, MachineLocation machine) {
+ String base = entity.getConfig(BrooklynConfigKeys.ONBOX_BASE_DIR);
+ if (base==null) base = machine.getConfig(BrooklynConfigKeys.ONBOX_BASE_DIR);
+ if (base!=null && Boolean.TRUE.equals(entity.getConfig(ON_BOX_BASE_DIR_RESOLVED))) return base;
+ if (base==null) base = entity.getManagementContext().getConfig().getConfig(BrooklynConfigKeys.ONBOX_BASE_DIR);
+ if (base==null) base = entity.getConfig(BrooklynConfigKeys.BROOKLYN_DATA_DIR);
+ if (base==null) base = machine.getConfig(BrooklynConfigKeys.BROOKLYN_DATA_DIR);
+ if (base==null) base = entity.getManagementContext().getConfig().getConfig(BrooklynConfigKeys.BROOKLYN_DATA_DIR);
+ if (base==null) base = "~/brooklyn-managed-processes";
+ if (base.equals("~")) base=".";
+ if (base.startsWith("~/")) base = "."+base.substring(1);
+
+ String resolvedBase = null;
+ if (entity.getConfig(BrooklynConfigKeys.SKIP_ON_BOX_BASE_DIR_RESOLUTION) || machine.getConfig(BrooklynConfigKeys.SKIP_ON_BOX_BASE_DIR_RESOLUTION)) {
+ if (log.isDebugEnabled()) log.debug("Skipping on-box base dir resolution for "+entity+" at "+machine);
+ if (!Os.isAbsolutish(base)) base = "~/"+base;
+ resolvedBase = Os.tidyPath(base);
+ } else if (machine instanceof SshMachineLocation) {
+ SshMachineLocation ms = (SshMachineLocation)machine;
+ ProcessTaskWrapper<Integer> baseTask = SshEffectorTasks.ssh(
+ BashCommands.alternatives("mkdir -p \"${BASE_DIR}\"",
+ BashCommands.chain(
+ BashCommands.sudo("mkdir -p \"${BASE_DIR}\""),
+ BashCommands.sudo("chown "+ms.getUser()+" \"${BASE_DIR}\""))),
+ "cd ~",
+ "cd ${BASE_DIR}",
+ "echo BASE_DIR_RESULT':'`pwd`:BASE_DIR_RESULT")
+ .environmentVariable("BASE_DIR", base)
+ .requiringExitCodeZero()
+ .summary("initializing on-box base dir "+base).newTask();
+ DynamicTasks.queueIfPossible(baseTask).orSubmitAsync(entity);
+ resolvedBase = Strings.getFragmentBetween(baseTask.block().getStdout(), "BASE_DIR_RESULT:", ":BASE_DIR_RESULT");
+ }
+ if (resolvedBase==null) {
+ if (!Os.isAbsolutish(base)) base = "~/"+base;
+ resolvedBase = Os.tidyPath(base);
+ log.warn("Could not resolve on-box directory for "+entity+" at "+machine+"; using "+resolvedBase+", though this may not be accurate at the target (and may fail shortly)");
+ }
+ entity.setConfig(BrooklynConfigKeys.ONBOX_BASE_DIR, resolvedBase);
+ entity.setConfig(ON_BOX_BASE_DIR_RESOLVED, true);
+ return resolvedBase;
+ }
+
+ protected void checkLocationParametersCompatible(MachineLocation oldLoc, MachineLocation newLoc, String paramSummary,
+ Object oldParam, Object newParam) {
+ if (oldParam==null || newParam==null || !oldParam.equals(newParam))
+ throw new IllegalStateException("Cannot start "+entity()+" in "+newLoc+" as it has already been started with incompatible location "+oldLoc+" " +
+ "("+paramSummary+" not compatible: "+oldParam+" / "+newParam+"); "+newLoc+" may require manual removal.");
+ }
+
+ /**
+ * Default pre-start hooks.
+ * <p>
+ * Can be extended by subclasses if needed.
+ */
+ protected void preStartCustom(MachineLocation machine) {
+ ConfigToAttributes.apply(entity());
+
+ // Opportunity to block startup until other dependent components are available
+ Object val = entity().getConfig(SoftwareProcess.START_LATCH);
+ if (val != null) log.debug("{} finished waiting for start-latch; continuing...", entity(), val);
+ }
+
+ protected Map<String, Object> obtainProvisioningFlags(final MachineProvisioningLocation<?> location) {
+ if (entity() instanceof ProvidesProvisioningFlags) {
+ return ((ProvidesProvisioningFlags)entity()).obtainProvisioningFlags(location).getAllConfig();
+ }
+ return MutableMap.<String, Object>of();
+ }
+
+ protected abstract String startProcessesAtMachine(final Supplier<MachineLocation> machineS);
+
+ protected void postStartAtMachineAsync() {
+ DynamicTasks.queue("post-start", new PostStartTask());
+ }
+
+ private class PostStartTask implements Runnable {
+ public void run() {
+ postStartCustom();
+ }
+ }
+
+ /**
+ * Default post-start hooks.
+ * <p>
+ * Can be extended by subclasses, and typically will wait for confirmation of start.
+ * The service not set to running until after this. Also invoked following a restart.
+ */
+ protected void postStartCustom() {
+ // nothing by default
+ }
+
+ /**
+ * whether when 'auto' mode is specified, the machine should be stopped when the restart effector is called
+ * <p>
+ * with {@link MachineLifecycleEffectorTasks}, a machine will always get created on restart if there wasn't one already
+ * (unlike certain subclasses which might attempt a shortcut process-level restart)
+ * so there is no reason for default behaviour of restart to throw away a provisioned machine,
+ * hence default impl returns <code>false</code>.
+ * <p>
+ * if it is possible to tell that a machine is unhealthy, or if {@link #restart(ConfigBag)} is overridden,
+ * then it might be appropriate to return <code>true</code> here.
+ */
+ protected boolean getDefaultRestartStopsMachine() {
+ return false;
+ }
+
+ /**
+ * Default restart implementation for an entity.
+ * <p>
+ * Stops processes if possible, then starts the entity again.
+ */
+ public void restart(ConfigBag parameters) {
+ ServiceStateLogic.setExpectedState(entity(), Lifecycle.STOPPING);
+
+ RestartMachineMode isRestartMachine = parameters.get(RestartSoftwareParameters.RESTART_MACHINE_TYPED);
+ if (isRestartMachine==null)
+ isRestartMachine=RestartMachineMode.AUTO;
+ if (isRestartMachine==RestartMachineMode.AUTO)
+ isRestartMachine = getDefaultRestartStopsMachine() ? RestartMachineMode.TRUE : RestartMachineMode.FALSE;
+
+ // Calling preStopCustom without a corresponding postStopCustom invocation
+ // doesn't look right so use a separate callback pair; Also depending on the arguments
+ // stop() could be called which will call the {pre,post}StopCustom on its own.
+ DynamicTasks.queue("pre-restart", new PreRestartTask());
+
+ if (isRestartMachine==RestartMachineMode.FALSE) {
+ DynamicTasks.queue("stopping (process)", new StopProcessesAtMachineTask());
+ } else {
+ DynamicTasks.queue("stopping (machine)", new StopMachineTask());
+ }
+
+ DynamicTasks.queue("starting", new StartInLocationsTask());
+ restartChildren(parameters);
+ DynamicTasks.queue("post-restart", new PostRestartTask());
+
+ DynamicTasks.waitForLast();
+ ServiceStateLogic.setExpectedState(entity(), Lifecycle.RUNNING);
+ }
+
+ private class PreRestartTask implements Runnable {
+ @Override
+ public void run() {
+ preRestartCustom();
+ }
+ }
+ private class PostRestartTask implements Runnable {
+ @Override
+ public void run() {
+ postRestartCustom();
+ }
+ }
+ private class StartInLocationsTask implements Runnable {
+ @Override
+ public void run() {
+ // startInLocations will look up the location, and provision a machine if necessary
+ // (if it remembered the provisioning location)
+ ServiceStateLogic.setExpectedState(entity(), Lifecycle.STARTING);
+ startInLocations(null);
+ }
+ }
+
+ protected void restartChildren(ConfigBag parameters) {
+ // TODO should we consult ChildStartableMode?
+
+ Boolean isRestartChildren = parameters.get(RestartSoftwareParameters.RESTART_CHILDREN);
+ if (isRestartChildren==null || !isRestartChildren) {
+ return;
+ }
+
+ if (isRestartChildren) {
+ DynamicTasks.queue(StartableMethods.restartingChildren(entity(), parameters));
+ return;
+ }
+
+ throw new IllegalArgumentException("Invalid value '"+isRestartChildren+"' for "+RestartSoftwareParameters.RESTART_CHILDREN.getName());
+ }
+
+ /**
+ * Default stop implementation for an entity.
+ * <p>
+ * Aborts if already stopped, otherwise sets state {@link Lifecycle#STOPPING} then
+ * invokes {@link #preStopCustom()}, {@link #stopProcessesAtMachine()}, then finally
+ * {@link #stopAnyProvisionedMachines()} and sets state {@link Lifecycle#STOPPED}.
+ * If no errors were encountered call {@link #postStopCustom()} at the end.
+ */
+ public void stop(ConfigBag parameters) {
+ doStop(parameters, new StopAnyProvisionedMachinesTask());
+ }
+
+ /**
+ * As {@link #stop} but calling {@link #suspendAnyProvisionedMachines} rather than
+ * {@link #stopAnyProvisionedMachines}.
+ */
+ public void suspend(ConfigBag parameters) {
+ doStop(parameters, new SuspendAnyProvisionedMachinesTask());
+ }
+
+ protected void doStop(ConfigBag parameters, Callable<StopMachineDetails<Integer>> stopTask) {
+ preStopConfirmCustom();
+
+ log.info("Stopping {} in {}", entity(), entity().getLocations());
+
+ StopMode stopMachineMode = getStopMachineMode(parameters);
+ StopMode stopProcessMode = parameters.get(StopSoftwareParameters.STOP_PROCESS_MODE);
+
+ DynamicTasks.queue("pre-stop", new PreStopCustomTask());
+
+ Maybe<MachineLocation> machine = Machines.findUniqueMachineLocation(entity().getLocations());
+ Task<String> stoppingProcess = null;
+ if (canStop(stopProcessMode, entity())) {
+ stoppingProcess = DynamicTasks.queue("stopping (process)", new StopProcessesAtMachineTask());
+ }
+
+ Task<StopMachineDetails<Integer>> stoppingMachine = null;
+ if (canStop(stopMachineMode, machine.isAbsent())) {
+ // Release this machine (even if error trying to stop process - we rethrow that after)
+ stoppingMachine = DynamicTasks.queue("stopping (machine)", stopTask);
+
+ DynamicTasks.drain(entity().getConfig(STOP_PROCESS_TIMEOUT), false);
+
+ // shutdown the machine if stopping process fails or takes too long
+ synchronized (stoppingMachine) {
+ // task also used as mutex by DST when it submits it; ensure it only submits once!
+ if (!stoppingMachine.isSubmitted()) {
+ // force the stoppingMachine task to run by submitting it here
+ StringBuilder msg = new StringBuilder("Submitting machine stop early in background for ").append(entity());
+ if (stoppingProcess == null) {
+ msg.append(". Process stop skipped, pre-stop not finished?");
+ } else {
+ msg.append(" because process stop has ").append(
+ (stoppingProcess.isDone() ? "finished abnormally" : "not finished"));
+ }
+ log.warn(msg.toString());
+ Entities.submit(entity(), stoppingMachine);
+ }
+ }
+ }
+
+ try {
+ // This maintains previous behaviour of silently squashing any errors on the stoppingProcess task if the
+ // stoppingMachine exits with a nonzero value
+ boolean checkStopProcesses = (stoppingProcess != null && (stoppingMachine == null || stoppingMachine.get().value == 0));
+
+ if (checkStopProcesses) {
+ // TODO we should test for destruction above, not merely successful "stop", as things like localhost and ssh won't be destroyed
+ DynamicTasks.waitForLast();
+ if (machine.isPresent()) {
+ // throw early errors *only if* there is a machine and we have not destroyed it
+ stoppingProcess.get();
+ }
+ }
+ } catch (Throwable e) {
+ ServiceStateLogic.setExpectedState(entity(), Lifecycle.ON_FIRE);
+ Exceptions.propagate(e);
+ }
+ entity().setAttribute(SoftwareProcess.SERVICE_UP, false);
+ ServiceStateLogic.setExpectedState(entity(), Lifecycle.STOPPED);
+
+ DynamicTasks.queue("post-stop", new PostStopCustomTask());
+
+ if (log.isDebugEnabled()) log.debug("Stopped software process entity "+entity());
+ }
+
+ private class StopAnyProvisionedMachinesTask implements Callable<StopMachineDetails<Integer>> {
+ public StopMachineDetails<Integer> call() {
+ return stopAnyProvisionedMachines();
+ }
+ }
+
+ private class SuspendAnyProvisionedMachinesTask implements Callable<StopMachineDetails<Integer>> {
+ public StopMachineDetails<Integer> call() {
+ return suspendAnyProvisionedMachines();
+ }
+ }
+
+ private class StopProcessesAtMachineTask implements Callable<String> {
+ public String call() {
+ DynamicTasks.markInessential();
+ stopProcessesAtMachine();
+ DynamicTasks.waitForLast();
+ return "Stop processes completed with no errors.";
+ }
+ }
+
+ private class StopMachineTask implements Callable<String> {
+ public String call() {
+ DynamicTasks.markInessential();
+ stop(ConfigBag.newInstance().configure(StopSoftwareParameters.STOP_MACHINE_MODE, StopMode.IF_NOT_STOPPED));
+ DynamicTasks.waitForLast();
+ return "Stop of machine completed with no errors.";
+ }
+ }
+
+ private class PreStopCustomTask implements Callable<String> {
+ public String call() {
+ if (entity().getAttribute(SoftwareProcess.SERVICE_STATE_ACTUAL) == Lifecycle.STOPPED) {
+ log.debug("Skipping stop of entity " + entity() + " when already stopped");
+ return "Already stopped";
+ }
+ ServiceStateLogic.setExpectedState(entity(), Lifecycle.STOPPING);
+ entity().setAttribute(SoftwareProcess.SERVICE_UP, false);
+ preStopCustom();
+ return null;
+ }
+ }
+
+ private class PostStopCustomTask implements Callable<Void> {
+ public Void call() {
+ postStopCustom();
+ return null;
+ }
+ }
+
+ public static StopMode getStopMachineMode(ConfigBag parameters) {
+ final StopMode stopMachineMode = parameters.get(StopSoftwareParameters.STOP_MACHINE_MODE);
+ return stopMachineMode;
+ }
+
+ public static boolean canStop(StopMode stopMode, Entity entity) {
+ boolean isEntityStopped = entity.getAttribute(SoftwareProcess.SERVICE_STATE_ACTUAL)==Lifecycle.STOPPED;
+ return canStop(stopMode, isEntityStopped);
+ }
+
+ protected static boolean canStop(StopMode stopMode, boolean isStopped) {
+ return stopMode == StopMode.ALWAYS ||
+ stopMode == StopMode.IF_NOT_STOPPED && !isStopped;
+ }
+
+ /**
+ * Override to check whether stop can be executed.
+ * Throw if stop should be aborted.
+ */
+ protected void preStopConfirmCustom() {
+ // nothing needed here
+ }
+
+ protected void preStopCustom() {
+ // nothing needed here
+ }
+
+ protected void postStopCustom() {
+ // nothing needed here
+ }
+
+ protected void preRestartCustom() {
+ // nothing needed here
+ }
+
+ protected void postRestartCustom() {
+ // nothing needed here
+ }
+
+ public static class StopMachineDetails<T> implements Serializable {
+ private static final long serialVersionUID = 3256747214315895431L;
+ final String message;
+ final T value;
+ protected StopMachineDetails(String message, T value) {
+ this.message = message;
+ this.value = value;
+ }
+ @Override
+ public String toString() {
+ return message;
+ }
+ }
+
+ /**
+ * Return string message of result.
+ * <p>
+ * Can run synchronously or not, caller will submit/queue as needed, and will block on any submitted tasks.
+ */
+ protected abstract String stopProcessesAtMachine();
+
+ /**
+ * Stop and release the {@link MachineLocation} the entity is provisioned at.
+ * <p>
+ * Can run synchronously or not, caller will submit/queue as needed, and will block on any submitted tasks.
+ */
+ protected StopMachineDetails<Integer> stopAnyProvisionedMachines() {
+ @SuppressWarnings("unchecked")
+ MachineProvisioningLocation<MachineLocation> provisioner = entity().getAttribute(SoftwareProcess.PROVISIONING_LOCATION);
+
+ if (Iterables.isEmpty(entity().getLocations())) {
+ log.debug("No machine decommissioning necessary for "+entity()+" - no locations");
+ return new StopMachineDetails<Integer>("No machine decommissioning necessary - no locations", 0);
+ }
+
+ // Only release this machine if we ourselves provisioned it (e.g. it might be running other services)
+ if (provisioner==null) {
+ log.debug("No machine decommissioning necessary for "+entity()+" - did not provision");
+ return new StopMachineDetails<Integer>("No machine decommissioning necessary - did not provision", 0);
+ }
+
+ Location machine = getLocation(null);
+ if (!(machine instanceof MachineLocation)) {
+ log.debug("No decommissioning necessary for "+entity()+" - not a machine location ("+machine+")");
+ return new StopMachineDetails<Integer>("No machine decommissioning necessary - not a machine ("+machine+")", 0);
+ }
+
+ clearEntityLocationAttributes(machine);
+ provisioner.release((MachineLocation)machine);
+
+ return new StopMachineDetails<Integer>("Decommissioned "+machine, 1);
+ }
+
+ /**
+ * Suspend the {@link MachineLocation} the entity is provisioned at.
+ * <p>
+ * Expects the entity's {@link SoftwareProcess#PROVISIONING_LOCATION provisioner} to be capable of
+ * {@link SuspendsMachines suspending machines}.
+ *
+ * @throws java.lang.UnsupportedOperationException if the entity's provisioner cannot suspend machines.
+ * @see MachineManagementMixins.SuspendsMachines
+ */
+ protected StopMachineDetails<Integer> suspendAnyProvisionedMachines() {
+ @SuppressWarnings("unchecked")
+ MachineProvisioningLocation<MachineLocation> provisioner = entity().getAttribute(SoftwareProcess.PROVISIONING_LOCATION);
+
+ if (Iterables.isEmpty(entity().getLocations())) {
+ log.debug("No machine decommissioning necessary for " + entity() + " - no locations");
+ return new StopMachineDetails<>("No machine suspend necessary - no locations", 0);
+ }
+
+ // Only release this machine if we ourselves provisioned it (e.g. it might be running other services)
+ if (provisioner == null) {
+ log.debug("No machine decommissioning necessary for " + entity() + " - did not provision");
+ return new StopMachineDetails<>("No machine suspend necessary - did not provision", 0);
+ }
+
+ Location machine = getLocation(null);
+ if (!(machine instanceof MachineLocation)) {
+ log.debug("No decommissioning necessary for " + entity() + " - not a machine location (" + machine + ")");
+ return new StopMachineDetails<>("No machine suspend necessary - not a machine (" + machine + ")", 0);
+ }
+
+ if (!(provisioner instanceof SuspendsMachines)) {
+ log.debug("Location provisioner ({}) cannot suspend machines", provisioner);
+ throw new UnsupportedOperationException("Location provisioner cannot suspend machines: " + provisioner);
+ }
+
+ clearEntityLocationAttributes(machine);
+ SuspendsMachines.class.cast(provisioner).suspendMachine(MachineLocation.class.cast(machine));
+
+ return new StopMachineDetails<>("Suspended " + machine, 1);
+ }
+
+ /**
+ * Nulls the attached entity's hostname, address, subnet hostname and subnet address sensors
+ * and removes the given machine from its locations.
+ */
+ protected void clearEntityLocationAttributes(Location machine) {
+ entity().removeLocations(ImmutableList.of(machine));
+ entity().setAttribute(Attributes.HOSTNAME, null);
+ entity().setAttribute(Attributes.ADDRESS, null);
+ entity().setAttribute(Attributes.SUBNET_HOSTNAME, null);
+ entity().setAttribute(Attributes.SUBNET_ADDRESS, null);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/NaiveScriptRunner.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/NaiveScriptRunner.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/NaiveScriptRunner.java
new file mode 100644
index 0000000..29b729e
--- /dev/null
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/NaiveScriptRunner.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.software.base.lifecycle;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.brooklyn.util.core.task.ssh.SshTasks;
+
+/** Marks something which can run scripts. Called "Naive" because it hides too much of the complexity,
+ * about script execution and other ssh-related tasks (put, etc). The {@link SshTasks} approach seems better.
+ * <p>
+ * Not gone so far as deprecating (yet, in 0.6.0) although we might. Feedback welcome.
+ * @since 0.6.0 */
+public interface NaiveScriptRunner {
+
+ /** Runs a script and returns the result code */
+ int execute(List<String> script, String summaryForLogging);
+
+ /** Runs a script and returns the result code, supporting flags including:
+ * out, err as output/error streams;
+ * logPrefix, prefix string to put in log output;
+ * env, map of environment vars to pass to shell environment */
+ @SuppressWarnings("rawtypes")
+ int execute(Map flags, List<String> script, String summaryForLogging);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/ScriptHelper.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/ScriptHelper.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/ScriptHelper.java
new file mode 100644
index 0000000..bb974c5
--- /dev/null
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/ScriptHelper.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.software.base.lifecycle;
+
+import static java.lang.String.format;
+import groovy.lang.Closure;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.mgmt.ExecutionContext;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.mgmt.TaskQueueingContext;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+import org.apache.brooklyn.util.GroovyJavaMethods;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.internal.ssh.ShellTool;
+import org.apache.brooklyn.util.core.mutex.WithMutexes;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.TaskBuilder;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
+import org.apache.brooklyn.util.stream.Streams;
+import org.apache.brooklyn.util.text.Identifiers;
+import org.apache.brooklyn.util.text.Strings;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+
+public class ScriptHelper {
+
+ public static final Logger log = LoggerFactory.getLogger(ScriptHelper.class);
+
+ protected final NaiveScriptRunner runner;
+ public final String summary;
+
+ public final ScriptPart header = new ScriptPart(this);
+ public final ScriptPart body = new ScriptPart(this);
+ public final ScriptPart footer = new ScriptPart(this);
+
+ @SuppressWarnings("rawtypes")
+ protected final Map flags = new LinkedHashMap();
+ protected Predicate<? super Integer> resultCodeCheck = Predicates.alwaysTrue();
+ protected Predicate<? super ScriptHelper> executionCheck = Predicates.alwaysTrue();
+
+ protected boolean isTransient = false;
+ protected boolean isInessential = false;
+ protected boolean closeSshConnection = false;
+ protected boolean gatherOutput = false;
+ protected boolean noExtraOutput = false;
+ protected ByteArrayOutputStream stdout, stderr;
+ protected Task<Integer> task;
+
+ public ScriptHelper(NaiveScriptRunner runner, String summary) {
+ this.runner = runner;
+ this.summary = summary;
+ }
+
+ /**
+ * Takes a closure which accepts this ScriptHelper and returns true or false
+ * as to whether the script needs to run (or can throw error if desired)
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public ScriptHelper executeIf(Closure c) {
+ Predicate<ScriptHelper> predicate = GroovyJavaMethods.predicateFromClosure(c);
+ return executeIf(predicate);
+ }
+
+ public ScriptHelper executeIf(Predicate<? super ScriptHelper> c) {
+ executionCheck = c;
+ return this;
+ }
+
+ public ScriptHelper skipIfBodyEmpty() {
+ Predicate<ScriptHelper> p = new Predicate<ScriptHelper>() {
+ @Override
+ public boolean apply(ScriptHelper input) {
+ return !input.body.isEmpty();
+ }
+ };
+
+ return executeIf(p);
+ }
+
+ public ScriptHelper failIfBodyEmpty() {
+ Predicate<ScriptHelper> p = new Predicate<ScriptHelper>() {
+ @Override
+ public boolean apply(ScriptHelper input) {
+ if (input.body.isEmpty()) {
+ throw new IllegalStateException("body empty for " + summary);
+ }
+ return true;
+ }
+ };
+
+ return executeIf(p);
+ }
+
+ public ScriptHelper failOnNonZeroResultCode(boolean val) {
+ if (val) {
+ failOnNonZeroResultCode();
+ } else {
+ requireResultCode(Predicates.alwaysTrue());
+ }
+ return this;
+ }
+
+ public ScriptHelper failOnNonZeroResultCode() {
+ return updateTaskAndFailOnNonZeroResultCode();
+ }
+
+ public ScriptHelper failOnNonZeroResultCodeWithoutUpdatingTask() {
+ requireResultCode(Predicates.equalTo(0));
+ return this;
+ }
+
+ public ScriptHelper updateTaskAndFailOnNonZeroResultCode() {
+ gatherOutput();
+ // a failure listener would be a cleaner way
+
+ resultCodeCheck = new Predicate<Integer>() {
+ @Override
+ public boolean apply(@Nullable Integer input) {
+ if (input==0) return true;
+
+ try {
+ String notes = "";
+ if (!getResultStderr().isEmpty())
+ notes += "STDERR\n" + getResultStderr()+"\n";
+ if (!getResultStdout().isEmpty())
+ notes += "\n" + "STDOUT\n" + getResultStdout()+"\n";
+ Tasks.setExtraStatusDetails(notes.trim());
+ } catch (Exception e) {
+ log.warn("Unable to collect additional metadata on failure of "+summary+": "+e);
+ }
+
+ return false;
+ }
+ };
+
+ return this;
+ }
+
+ /**
+ * Convenience for error-checking the result.
+ * <p/>
+ * Takes closure which accepts bash exit code (integer),
+ * and returns false if it is invalid. Default is that this resultCodeCheck
+ * closure always returns true (and the exit code is made available to the
+ * caller if they care)
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public ScriptHelper requireResultCode(Closure integerFilter) {
+ Predicate<Integer> objectPredicate = GroovyJavaMethods.predicateFromClosure(integerFilter);
+ return requireResultCode(objectPredicate);
+ }
+
+ public ScriptHelper requireResultCode(Predicate<? super Integer> integerFilter) {
+ resultCodeCheck = integerFilter;
+ return this;
+ }
+
+ protected Runnable mutexAcquire = new Runnable() {
+ public void run() {
+ }
+ };
+
+ protected Runnable mutexRelease = new Runnable() {
+ public void run() {
+ }
+ };
+
+ /**
+ * indicates that the script should acquire the given mutexId on the given mutexSupport
+ * and maintain it for the duration of script execution;
+ * typically used to prevent parallel scripts from conflicting in access to a resource
+ * (e.g. a folder, or a config file used by a process)
+ */
+ public ScriptHelper useMutex(final WithMutexes mutexSupport, final String mutexId, final String description) {
+ mutexAcquire = new Runnable() {
+ public void run() {
+ try {
+ mutexSupport.acquireMutex(mutexId, description);
+ } catch (InterruptedException e) {
+ throw new RuntimeInterruptedException(e);
+ }
+ }
+ };
+
+ mutexRelease = new Runnable() {
+ public void run() {
+ mutexSupport.releaseMutex(mutexId);
+ }
+ };
+
+ return this;
+ }
+
+ public ScriptHelper gatherOutput() {
+ return gatherOutput(true);
+ }
+ public ScriptHelper gatherOutput(boolean gather) {
+ gatherOutput = gather;
+ return this;
+ }
+
+ /**
+ * Indicate that no extra output should be appended to stdout.
+ * <p>
+ * By default Brooklyn appends a message like
+ * "<tt>Executed /tmp/brooklyn-20141010-164855950...sh, result 0</tt>"
+ * to script output.
+ */
+ public ScriptHelper noExtraOutput() {
+ return noExtraOutput(true);
+ }
+
+ /**
+ * @see #noExtraOutput()
+ */
+ private ScriptHelper noExtraOutput(boolean output) {
+ this.noExtraOutput = output;
+ return this;
+ }
+
+ /** The connection should be closed and disconnected once the commands have executed. */
+ public ScriptHelper closeSshConnection() {
+ closeSshConnection = true;
+ return this;
+ }
+
+ /** Unique ID for the command execution; ensures new SSH connection from the pool. */
+ public ScriptHelper uniqueSshConnection() {
+ setFlag(SshMachineLocation.UNIQUE_ID, Identifiers.makeRandomBase64Id(32));
+ return this;
+ }
+
+ /** indicates explicitly that the task can be safely forgotten about after it runs; useful for things like
+ * check_running which run repeatedly */
+ public void setTransient() {
+ isTransient = true;
+ }
+
+ public void setInessential() {
+ isInessential = true;
+ }
+
+ public ScriptHelper inessential() {
+ isInessential = true;
+ return this;
+ }
+
+ /** creates a task which will execute this script; note this can only be run once per instance of this class */
+ public synchronized Task<Integer> newTask() {
+ if (task!=null) throw new IllegalStateException("task can only be generated once");
+ TaskBuilder<Integer> tb = Tasks.<Integer>builder().name("ssh: "+summary).body(
+ new Callable<Integer>() {
+ public Integer call() throws Exception {
+ return executeInternal();
+ }
+ });
+
+ try {
+ ByteArrayOutputStream stdin = new ByteArrayOutputStream();
+ for (String line: getLines()) {
+ stdin.write(line.getBytes());
+ stdin.write("\n".getBytes());
+ }
+ tb.tag(BrooklynTaskTags.tagForStreamSoft(BrooklynTaskTags.STREAM_STDIN, stdin));
+ } catch (IOException e) {
+ log.warn("Error registering stream "+BrooklynTaskTags.STREAM_STDIN+" on "+tb+": "+e, e);
+ }
+
+ Map<?,?> env = (Map<?,?>) flags.get("env");
+ if (env!=null) {
+ // if not explicitly set, env will come from getShellEnv in AbstractSoftwareProcessSshDriver.execute,
+ // which will also update this tag appropriately
+ tb.tag(BrooklynTaskTags.tagForEnvStream(BrooklynTaskTags.STREAM_ENV, env));
+ }
+
+ if (gatherOutput) {
+ stdout = new ByteArrayOutputStream();
+ tb.tag(BrooklynTaskTags.tagForStreamSoft(BrooklynTaskTags.STREAM_STDOUT, stdout));
+ stderr = new ByteArrayOutputStream();
+ tb.tag(BrooklynTaskTags.tagForStreamSoft(BrooklynTaskTags.STREAM_STDERR, stderr));
+ }
+ task = tb.build();
+ if (isTransient) BrooklynTaskTags.setTransient(task);
+ if (isInessential) BrooklynTaskTags.setInessential(task);
+ return task;
+ }
+
+ /** returns the task, if it has been constructed, or null; use {@link #newTask()} to build
+ * (if it is null and you need a task) */
+ public Task<Integer> peekTask() {
+ return task;
+ }
+
+ /** queues the task for execution if we are in a {@link TaskQueueingContext} (e.g. EffectorTaskFactory);
+ * or if we aren't in a queueing context, it will submit the task (assuming there is an {@link ExecutionContext}
+ * _and_ block until completion, throwing on error */
+ @Beta
+ public Task<Integer> queue() {
+ return DynamicTasks.queueIfPossible(newTask()).orSubmitAndBlock().getTask();
+ }
+
+ public int execute() {
+ if (DynamicTasks.getTaskQueuingContext()!=null) {
+ return queue().getUnchecked();
+ } else {
+ return executeInternal();
+ }
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public int executeInternal() {
+ if (!executionCheck.apply(this)) {
+ return 0;
+ }
+
+ List<String> lines = getLines();
+ if (log.isTraceEnabled()) log.trace("executing: {} - {}", summary, lines);
+
+ int result;
+ try {
+ mutexAcquire.run();
+ Map flags = getFlags();
+ if (closeSshConnection) {
+ flags.put("close", true);
+ }
+ if (gatherOutput) {
+ if (stdout==null) stdout = new ByteArrayOutputStream();
+ if (stderr==null) stderr = new ByteArrayOutputStream();
+ flags.put("out", stdout);
+ flags.put("err", stderr);
+ }
+ flags.put(ShellTool.PROP_NO_EXTRA_OUTPUT.getName(), noExtraOutput);
+ result = runner.execute(flags, lines, summary);
+ } catch (RuntimeInterruptedException e) {
+ throw logWithDetailsAndThrow(format("Execution failed, invocation error for %s: %s", summary, e.getMessage()), e);
+ } catch (Exception e) {
+ throw logWithDetailsAndThrow(format("Execution failed, invocation error for %s: %s", summary, e.getMessage()), e);
+ } finally {
+ mutexRelease.run();
+ }
+ if (log.isTraceEnabled()) log.trace("finished executing: {} - result code {}", summary, result);
+
+ if (!resultCodeCheck.apply(result)) {
+ throw logWithDetailsAndThrow(format("Execution failed, invalid result %s for %s", result, summary), null);
+ }
+ return result;
+ }
+
+ protected RuntimeException logWithDetailsAndThrow(String message, Throwable optionalCause) {
+ log.warn(message+" (throwing)");
+ Streams.logStreamTail(log, "STDERR of problem in "+Tasks.current(), stderr, 1024);
+ Streams.logStreamTail(log, "STDOUT of problem in "+Tasks.current(), stdout, 1024);
+ Streams.logStreamTail(log, "STDIN of problem in "+Tasks.current(), Streams.byteArrayOfString(Strings.join(getLines(),"\n")), 4096);
+ if (optionalCause!=null) throw new IllegalStateException(message, optionalCause);
+ throw new IllegalStateException(message);
+ }
+
+ @SuppressWarnings("rawtypes")
+ public Map getFlags() {
+ return flags;
+ }
+
+ @SuppressWarnings("unchecked")
+ public ScriptHelper setFlag(String flag, Object value) {
+ flags.put(flag, value);
+ return this;
+ }
+
+ public <T> ScriptHelper setFlag(ConfigKey<T> flag, T value) {
+ return setFlag(flag.getName(), value);
+ }
+
+ /** ensures the script runs with no environment variables; by default they will be inherited */
+ public ScriptHelper environmentVariablesReset() {
+ return environmentVariablesReset(MutableMap.of());
+ }
+
+ /** overrides the default environment variables to use the given set; by default they will be inherited.
+ * TODO would be nice to have a way to add just a few, but there is no way currently to access the
+ * getShellEnvironment() from the driver which is what gets inherited (at execution time) */
+ public ScriptHelper environmentVariablesReset(Map<?,?> envVarsToSet) {
+ setFlag("env", envVarsToSet);
+ return this;
+ }
+
+ public List<String> getLines() {
+ List<String> result = new LinkedList<String>();
+ result.addAll(header.lines);
+ result.addAll(body.lines);
+ result.addAll(footer.lines);
+ return result;
+ }
+
+ public String getResultStdout() {
+ if (stdout==null) throw new IllegalStateException("output not available on "+this+"; ensure gatherOutput(true) is set");
+ return stdout.toString();
+ }
+ public String getResultStderr() {
+ if (stderr==null) throw new IllegalStateException("output not available on "+this+"; ensure gatherOutput(true) is set");
+ return stderr.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/ScriptPart.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/ScriptPart.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/ScriptPart.java
new file mode 100644
index 0000000..9174122
--- /dev/null
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/lifecycle/ScriptPart.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.software.base.lifecycle;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+public class ScriptPart {
+ protected ScriptHelper helper;
+ protected List<String> lines = new LinkedList<String>();
+
+ public ScriptPart(ScriptHelper helper) {
+ this.helper = helper;
+ }
+
+ public ScriptHelper append(CharSequence line) {
+ lines.add(line.toString());
+ return helper;
+ }
+
+ public ScriptHelper append(Collection<? extends CharSequence> lines) {
+ for (CharSequence line : lines) {
+ append(line);
+ }
+ return helper;
+ }
+
+ public ScriptHelper append(CharSequence... lines) {
+ return append(Arrays.asList(lines));
+ }
+
+ public ScriptHelper prepend(CharSequence line) {
+ lines.add(0, line.toString());
+ return helper;
+ }
+
+ public ScriptHelper prepend(Collection<? extends CharSequence> lines) {
+ List<CharSequence> reversedLines = new ArrayList<CharSequence>(lines);
+ Collections.reverse(reversedLines);
+ for (CharSequence line : reversedLines) {
+ prepend(line);
+ }
+ return helper;
+ }
+
+ public ScriptHelper prepend(CharSequence... lines) {
+ return prepend(Arrays.asList(lines));
+ }
+
+ public ScriptHelper reset(CharSequence line) {
+ return reset(Arrays.asList(line));
+ }
+
+ public ScriptHelper reset(List<? extends CharSequence> ll) {
+ lines.clear();
+ return append(ll);
+ }
+
+ public boolean isEmpty() {
+ return lines.isEmpty();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/system_service/EntityLaunchListener.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/system_service/EntityLaunchListener.java b/software/base/src/main/java/org/apache/brooklyn/entity/system_service/EntityLaunchListener.java
new file mode 100644
index 0000000..9557277
--- /dev/null
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/system_service/EntityLaunchListener.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.system_service;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.mgmt.ExecutionManager;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.sensor.SensorEvent;
+import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.EffectorCallTag;
+import org.apache.brooklyn.entity.lifecycle.Lifecycle;
+import org.apache.brooklyn.util.core.task.Tasks;
+
+public class EntityLaunchListener implements Runnable, SensorEventListener<Lifecycle> {
+ private static final String SSH_LAUNCH_TASK_PREFIX = "ssh: launching";
+ private static final String LAUNCH_CHECK_SKIP_TAG = "system-service-update";
+
+ private final AtomicReference<Task<?>> launchTaskRef = new AtomicReference<Task<?>>();
+ private final SystemServiceEnricher enricher;
+
+ public EntityLaunchListener(SystemServiceEnricher enricher) {
+ this.enricher = checkNotNull(enricher, "enricher");
+ }
+
+ @Override
+ public void onEvent(SensorEvent<Lifecycle> event) {
+ if (event.getValue() == Lifecycle.RUNNING) {
+ Task<?>launchTask = getLatestLaunchTask(enricher.getEntity());
+ if (launchTask != null) {
+ launchTaskRef.set(launchTask);
+ if (!launchTask.isDone()) {
+ launchTask.addListener(this, enricher.getEntityExecutionContext());
+ }
+ if (launchTask.isDone()) {
+ run();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ Task<?> launchTask = launchTaskRef.getAndSet(null);
+ if (launchTask == null) return;
+ if (launchTask.isError()) return;
+ enricher.onLaunched(launchTask);
+ }
+
+ private Task<?> getLatestLaunchTask(Entity entity) {
+ Task<?> startEffector = null;
+ ExecutionManager executionmgr = enricher.getManagementContext().getExecutionManager();
+ Set<Task<?>> entityTasks = BrooklynTaskTags.getTasksInEntityContext(executionmgr, entity);
+ for (Task<?> t : entityTasks) {
+ if (BrooklynTaskTags.isEffectorTask(t)) {
+ EffectorCallTag effectorTag = BrooklynTaskTags.getEffectorCallTag(t, false);
+ if (SystemServiceEnricher.LAUNCH_EFFECTOR_NAMES.contains(effectorTag.getEffectorName()) &&
+ !BrooklynTaskTags.hasTag(t, LAUNCH_CHECK_SKIP_TAG)) {
+ if (startEffector == null || startEffector.getStartTimeUtc() < t.getStartTimeUtc()) {
+ startEffector = t;
+ }
+ BrooklynTaskTags.addTagDynamically(t, LAUNCH_CHECK_SKIP_TAG);
+ }
+ }
+ }
+ if (startEffector != null) {
+ Task<?> launchTask = findSshLaunchChild(startEffector);
+ if (launchTask != null) {
+ return launchTask;
+ }
+ }
+ return null;
+ }
+
+ private Task<?> findSshLaunchChild(Task<?> t) {
+ Iterable<Task<?>> children = Tasks.children(t);
+ for (Task<?> c : children) {
+ if (c.getDisplayName().startsWith(SSH_LAUNCH_TASK_PREFIX)) {
+ return c;
+ }
+ }
+ for (Task<?> c : children) {
+ Task<?> launchTask = findSshLaunchChild(c);
+ if (launchTask != null) {
+ return launchTask;
+ }
+ }
+ return null;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/system_service/InitdServiceInstaller.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/system_service/InitdServiceInstaller.java b/software/base/src/main/java/org/apache/brooklyn/entity/system_service/InitdServiceInstaller.java
new file mode 100644
index 0000000..a0724f2
--- /dev/null
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/system_service/InitdServiceInstaller.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.system_service;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.File;
+import java.util.Map;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.objs.HasShortName;
+import org.apache.brooklyn.api.sensor.Enricher;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.effector.core.EffectorTasks;
+import org.apache.brooklyn.entity.core.Attributes;
+import org.apache.brooklyn.entity.core.EntityInternal;
+import org.apache.brooklyn.entity.software.base.SoftwareProcess;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+import org.apache.brooklyn.location.cloud.names.AbstractCloudMachineNamer;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.ResourceUtils;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.core.task.ssh.SshPutTaskWrapper;
+import org.apache.brooklyn.util.core.task.ssh.SshTasks;
+import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
+import org.apache.brooklyn.util.core.text.TemplateProcessor;
+import org.apache.brooklyn.util.os.Os;
+import org.apache.brooklyn.util.ssh.BashCommands;
+
+
+public class InitdServiceInstaller implements SystemServiceInstaller {
+ private static final ConfigKey<String> SERVICE_TEMPLATE = ConfigKeys.newStringConfigKey(
+ "service.initd.service_template", "URL of the template to be used as the /etc/init.d service", "classpath:///org/apache/brooklyn/entity/system_service/service.sh");
+
+ private final Entity entity;
+ private final Enricher enricher;
+
+ public InitdServiceInstaller(Entity entity, Enricher enricher) {
+ this.entity = checkNotNull(entity, "entity");
+ this.enricher = checkNotNull(enricher, "enricher");
+ }
+
+ @Override
+ public Task<?> getServiceInstallTask() {
+ ResourceUtils resource = new ResourceUtils(this);
+ String pidFile = entity.getAttribute(SoftwareProcess.PID_FILE);
+ String template = resource.getResourceAsString(enricher.config().get(SERVICE_TEMPLATE));
+ String serviceName = getServiceName();
+ SshMachineLocation sshMachine = EffectorTasks.getSshMachine(entity);
+ Map<String, Object> params = MutableMap.<String, Object>of(
+ "service.launch_script", Os.mergePaths(getRunDir(), getStartScriptName()),
+ "service.name", serviceName,
+ "service.user", sshMachine.getUser(),
+ "service.log_path", getLogLocation());
+ if (pidFile != null) {
+ params.put("service.pid_file", pidFile);
+ }
+ String service = TemplateProcessor.processTemplateContents(template, (EntityInternal)entity, params);
+ String tmpServicePath = Os.mergePaths(getRunDir(), serviceName);
+ String servicePath = "/etc/init.d/" + serviceName;
+ SshPutTaskWrapper putServiceTask = SshTasks.newSshPutTaskFactory(sshMachine, tmpServicePath)
+ .contents(service)
+ .newTask();
+ ProcessTaskWrapper<Integer> installServiceTask = SshTasks.newSshExecTaskFactory(sshMachine,
+ BashCommands.chain(
+ BashCommands.sudo("mv " + tmpServicePath + " " + servicePath),
+ BashCommands.sudo("chmod 0755 " + servicePath),
+ BashCommands.sudo("chkconfig --add " + serviceName),
+ BashCommands.sudo("chkconfig " + serviceName + " on")))
+ .requiringExitCodeZero()
+ .newTask();
+
+ return Tasks.<Void>builder()
+ .name("install (init.d)")
+ .description("Install init.d service")
+ .add(putServiceTask)
+ .add(installServiceTask)
+ .build();
+ }
+
+ private String getServiceName() {
+ String serviceNameTemplate = enricher.config().get(SystemServiceEnricher.SERVICE_NAME);
+ return serviceNameTemplate
+ .replace("${id}", entity.getId())
+ .replace("${entity_name}", getEntityName());
+ }
+
+ private CharSequence getEntityName() {
+ String name;
+ if (entity instanceof HasShortName) {
+ name = ((HasShortName)entity).getShortName();
+ } else if (entity instanceof Entity) {
+ name = ((Entity)entity).getDisplayName();
+ } else {
+ name = "brooklyn-service";
+ }
+ return AbstractCloudMachineNamer.sanitize(name.toString()).toLowerCase();
+ }
+
+ private String getStartScriptName() {
+ return enricher.config().get(SystemServiceEnricher.LAUNCH_SCRIPT_NAME);
+ }
+
+ private String getRunDir() {
+ return entity.getAttribute(SoftwareProcess.RUN_DIR);
+ }
+
+ private String getLogLocation() {
+ String logFileLocation = entity.getAttribute(Attributes.LOG_FILE_LOCATION);
+ if (logFileLocation != null) {
+ return new File(logFileLocation).getParent();
+ } else {
+ return "/tmp";
+ }
+ }
+
+}