You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/18 13:00:37 UTC

[22/64] incubator-brooklyn git commit: [BROOKLYN-162] Refactor package in ./core/util

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/sshj/SshjTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/sshj/SshjTool.java b/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/sshj/SshjTool.java
new file mode 100644
index 0000000..8262729
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/sshj/SshjTool.java
@@ -0,0 +1,1091 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.internal.ssh.sshj;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Throwables.getCausalChain;
+import static com.google.common.collect.Iterables.any;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import net.schmizz.sshj.connection.ConnectionException;
+import net.schmizz.sshj.connection.channel.direct.PTYMode;
+import net.schmizz.sshj.connection.channel.direct.Session;
+import net.schmizz.sshj.connection.channel.direct.Session.Command;
+import net.schmizz.sshj.connection.channel.direct.Session.Shell;
+import net.schmizz.sshj.connection.channel.direct.SessionChannel;
+import net.schmizz.sshj.sftp.FileAttributes;
+import net.schmizz.sshj.sftp.SFTPClient;
+import net.schmizz.sshj.transport.TransportException;
+import net.schmizz.sshj.xfer.InMemorySourceFile;
+
+import org.apache.brooklyn.core.internal.BrooklynFeatureEnablement;
+import org.apache.brooklyn.core.util.internal.ssh.BackoffLimitedRetryHandler;
+import org.apache.brooklyn.core.util.internal.ssh.ShellTool;
+import org.apache.brooklyn.core.util.internal.ssh.SshAbstractTool;
+import org.apache.brooklyn.core.util.internal.ssh.SshTool;
+import org.apache.commons.io.input.ProxyInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.exceptions.RuntimeTimeoutException;
+import brooklyn.util.io.FileUtil;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.stream.KnownSizeInputStream;
+import brooklyn.util.stream.StreamGobbler;
+import brooklyn.util.stream.Streams;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.CountingOutputStream;
+import com.google.common.net.HostAndPort;
+import com.google.common.primitives.Ints;
+
+/**
+ * For ssh and scp-style commands, using the sshj library.
+ */
+public class SshjTool extends SshAbstractTool implements SshTool {
+
+    /*
+     * TODO synchronization of connect/disconnect needs revisited!
+     * Saw SshjToolIntegrationTest.testExecBigConcurrentCommand fail with:
+     *     Caused by: java.lang.AssertionError
+     *         at net.schmizz.sshj.SSHClient.auth(SSHClient.java:204)
+     * i.e. another thread had called disconnect just before the failing thread
+     * did SSHClient.auth.
+     * Having multiple threads call connect/disconnect is going to be brittle. With
+     * our retries we can get away with it usually, but it's not good!
+     *
+     * TODO need to upgrade sshj version from 0.8.1 to 0.9, but jclouds 1.7.2 still 
+     * relies on 0.8.1. In 0.9, it fixes the https://github.com/shikhar/sshj/issues/89
+     * so does not throw AssertionError.
+     */
+
+    private static final Logger LOG = LoggerFactory.getLogger(SshjTool.class);
+
+    protected final int sshTries;
+    protected final long sshTriesTimeout;
+    protected final BackoffLimitedRetryHandler backoffLimitedRetryHandler;
+
+    /** Terminal type name for {@code allocatePTY} option. */
+    final static String TERM = "vt100"; // "dumb"
+    
+    private class CloseFtpChannelOnCloseInputStream extends ProxyInputStream {
+        private final SFTPClient sftp;
+
+        private CloseFtpChannelOnCloseInputStream(InputStream proxy, SFTPClient sftp) {
+            super(proxy);
+            this.sftp = sftp;
+        }
+
+        @Override
+        public void close() throws IOException {
+            super.close();
+            closeWhispering(sftp, this);
+        }
+    }
+
+    private final SshjClientConnection sshClientConnection;
+
+    public static SshjToolBuilder builder() {
+        return new SshjToolBuilder();
+    }
+    
+    public static class SshjToolBuilder extends Builder<SshjTool, SshjToolBuilder> {
+    }
+    
+    public static class Builder<T extends SshjTool, B extends Builder<T,B>> extends AbstractSshToolBuilder<T,B> {
+        protected long connectTimeout;
+        protected long sessionTimeout;
+        protected int sshTries = 4;  //allow 4 tries by default, much safer
+        protected long sshTriesTimeout = 2*60*1000;  //allow 2 minutes by default (so if too slow trying sshTries times, abort anyway)
+        protected long sshRetryDelay = 50L;
+        
+        @Override
+        public B from(Map<String,?> props) {
+            super.from(props);
+            sshTries = getOptionalVal(props, PROP_SSH_TRIES);
+            sshTriesTimeout = getOptionalVal(props, PROP_SSH_TRIES_TIMEOUT);
+            sshRetryDelay = getOptionalVal(props, PROP_SSH_RETRY_DELAY);
+            connectTimeout = getOptionalVal(props, PROP_CONNECT_TIMEOUT);
+            sessionTimeout = getOptionalVal(props, PROP_SESSION_TIMEOUT);
+            return self();
+        }
+        public B connectTimeout(int val) {
+            this.connectTimeout = val; return self();
+        }
+        public B sessionTimeout(int val) {
+            this.sessionTimeout = val; return self();
+        }
+        public B sshRetries(int val) {
+            this.sshTries = val; return self();
+        }
+        public B sshRetriesTimeout(int val) {
+            this.sshTriesTimeout = val; return self();
+        }
+        public B sshRetryDelay(long val) {
+            this.sshRetryDelay = val; return self();
+        }
+        @Override
+        @SuppressWarnings("unchecked")
+        public T build() {
+            return (T) new SshjTool(this);
+        }
+    }
+
+    public SshjTool(Map<String,?> map) {
+        this(builder().from(map));
+    }
+    
+    protected SshjTool(Builder<?,?> builder) {
+        super(builder);
+        
+        sshTries = builder.sshTries;
+        sshTriesTimeout = builder.sshTriesTimeout;
+        backoffLimitedRetryHandler = new BackoffLimitedRetryHandler(sshTries, builder.sshRetryDelay);
+
+        sshClientConnection = SshjClientConnection.builder()
+                .hostAndPort(HostAndPort.fromParts(host, port))
+                .username(user)
+                .password(password)
+                .privateKeyPassphrase(privateKeyPassphrase)
+                .privateKeyData(privateKeyData)
+                .privateKeyFile(privateKeyFile)
+                .strictHostKeyChecking(strictHostKeyChecking)
+                .connectTimeout(builder.connectTimeout)
+                .sessionTimeout(builder.sessionTimeout)
+                .build();
+        
+        if (LOG.isTraceEnabled()) LOG.trace("Created SshTool {} ({})", this, System.identityHashCode(this));
+    }
+    
+    @Override
+    public void connect() {
+        try {
+            if (LOG.isTraceEnabled()) LOG.trace("Connecting SshjTool {} ({})", this, System.identityHashCode(this));
+            acquire(sshClientConnection);
+        } catch (Exception e) {
+            if (LOG.isDebugEnabled()) LOG.debug(toString()+" failed to connect (rethrowing)", e);
+            throw propagate(e, "failed to connect");
+        }
+    }
+
+    @Override
+    @Deprecated // see super
+    public void connect(int maxAttempts) {
+        connect(); // FIXME Should callers instead configure sshTries? But that would apply to all ssh attempts
+    }
+
+    @Override
+    public void disconnect() {
+        if (LOG.isTraceEnabled()) LOG.trace("Disconnecting SshjTool {} ({})", this, System.identityHashCode(this));
+        try {
+            Stopwatch perfStopwatch = Stopwatch.createStarted();
+            sshClientConnection.clear();
+            if (LOG.isTraceEnabled()) LOG.trace("SSH Performance: {} disconnect took {}", sshClientConnection.getHostAndPort(), Time.makeTimeStringRounded(perfStopwatch));
+        } catch (Exception e) {
+            throw Exceptions.propagate(e);
+        }
+    }
+
+    @Override
+    public boolean isConnected() {
+        return sshClientConnection.isConnected() && sshClientConnection.isAuthenticated();
+    }
+    
+    @Override
+    public int copyToServer(java.util.Map<String,?> props, byte[] contents, String pathAndFileOnRemoteServer) {
+        return copyToServer(props, newInputStreamSupplier(contents), contents.length, pathAndFileOnRemoteServer);
+    }
+    
+    @Override
+    public int copyToServer(Map<String,?> props, InputStream contents, String pathAndFileOnRemoteServer) {
+        /* sshj needs to:
+         *   1) to know the length of the InputStream to copy the file to perform copy; and
+         *   2) re-read the input stream on retry if the first attempt fails.
+         * For now, write it to a file, unless caller supplies a KnownSizeInputStream
+         * 
+         * (We could have a switch where we hold it in memory if less than some max size,
+         * but most the routines should supply a string or byte array or similar,
+         * so we probably don't come here too often.)
+         */
+        if (contents instanceof KnownSizeInputStream) {
+            return copyToServer(props, Suppliers.ofInstance(contents), ((KnownSizeInputStream)contents).length(), pathAndFileOnRemoteServer);
+        } else {
+            File tempFile = writeTempFile(contents);
+            try {
+                return copyToServer(props, tempFile, pathAndFileOnRemoteServer);
+            } finally {
+                tempFile.delete();
+            }
+        }
+    }
+    
+    @Override
+    public int copyToServer(Map<String,?> props, File localFile, String pathAndFileOnRemoteServer) {
+        return copyToServer(props, newInputStreamSupplier(localFile), (int)localFile.length(), pathAndFileOnRemoteServer);
+    }
+    
+    private int copyToServer(Map<String,?> props, Supplier<InputStream> contentsSupplier, long length, String pathAndFileOnRemoteServer) {
+        acquire(new PutFileAction(props, pathAndFileOnRemoteServer, contentsSupplier, length));
+        return 0; // TODO Can we assume put will have thrown exception if failed? Rather than exit code != 0?
+    }
+
+
+    @Override
+    public int copyFromServer(Map<String,?> props, String pathAndFileOnRemoteServer, File localFile) {
+        InputStream contents = acquire(new GetFileAction(pathAndFileOnRemoteServer));
+        try {
+            FileUtil.copyTo(contents, localFile);
+            return 0; // TODO Can we assume put will have thrown exception if failed? Rather than exit code != 0?
+        } finally {
+            Streams.closeQuietly(contents);
+        }
+    }
+
+    /**
+     * This creates a script containing the user's commands, copies it to the remote server, and
+     * executes the script. The script is then deleted.
+     * <p>
+     * Executing commands directly is fraught with dangers! Here are other options, and their problems:
+     * <ul>
+     *   <li>Use execCommands, rather than shell.
+     *       The user's environment will not be setup normally (e.g. ~/.bash_profile will not have been sourced)
+     *       so things like wget may not be on the PATH.
+     *   <li>Send the stream of commands to the shell.
+     *       But characters being sent can be lost.
+     *       Try the following (e.g. in an OS X terminal):
+     *        - sleep 5
+     *        - <paste a command that is 1000s of characters long>
+     *       Only the first 1024 characters appear. The rest are lost.
+     *       If sending a stream of commands, you need to be careful not send the next (big) command while the
+     *       previous one is still executing.
+     *   <li>Send a stream to the shell, but spot when the previous command has completed.
+     *       e.g. by looking for the prompt (but what if the commands being executed change the prompt?)
+     *       e.g. by putting every second command as "echo <uid>", and waiting for the stdout.
+     *       This gets fiddly...
+     * </ul>
+     * 
+     * So on balance, the script-based approach seems most reliable, even if there is an overhead
+     * of separate message(s) for copying the file!
+     * 
+     * Another consideration is long-running scripts. On some clouds when executing a script that takes 
+     * several minutes, we have seen it fail with -1 (e.g. 1 in 20 times). This suggests the ssh connection
+     * is being dropped. To avoid this problem, we can execute the script asynchronously, writing to files
+     * the stdout/stderr/pid/exitStatus. We then periodically poll to retrieve the contents of these files.
+     * Use {@link #PROP_EXEC_ASYNC} to force this mode of execution.
+     */
+    @Override
+    public int execScript(final Map<String,?> props, final List<String> commands, final Map<String,?> env) {
+        Boolean execAsync = getOptionalVal(props, PROP_EXEC_ASYNC);
+        if (Boolean.TRUE.equals(execAsync) && BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC)) {
+            return execScriptAsyncAndPoll(props, commands, env);
+        } else {
+            if (Boolean.TRUE.equals(execAsync)) {
+                if (LOG.isDebugEnabled()) LOG.debug("Ignoring ssh exec-async configuration, because feature is disabled");
+            }
+            return new ToolAbstractExecScript(props) {
+                public int run() {
+                    String scriptContents = toScript(props, commands, env);
+                    if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {} as script: {}", host, scriptContents);
+                    copyToServer(ImmutableMap.of("permissions", "0700"), scriptContents.getBytes(), scriptPath);
+                    return asInt(acquire(new ShellAction(buildRunScriptCommand(), out, err, execTimeout)), -1);
+                }
+            }.run();
+        }
+    }
+
+    /**
+     * Executes the script in the background (`nohup ... &`), and then executes other ssh commands to poll for the
+     * stdout, stderr and exit code of that original process (which will each have been written to separate files).
+     * 
+     * The polling is a "long poll". That is, it executes a long-running ssh command to retrieve the stdout, etc.
+     * If that long-poll command fails, then we just execute another one to pick up from where it left off.
+     * This means we do not need to execute many ssh commands (which are expensive), but can still return promptly
+     * when the command completes.
+     * 
+     * Much of this was motivated by https://issues.apache.org/jira/browse/BROOKLYN-106, which is no longer
+     * an issue. The retries (e.g. in the upload-script) are arguably overkill given that {@link #acquire(SshAction)}
+     * will already retry. However, leaving this in place as it could prove useful when working with flakey
+     * networks in the future.
+     * 
+     * TODO There are (probably) issues with this method when using {@link ShellTool#PROP_RUN_AS_ROOT}.
+     * I (Aled) saw the .pid file having an owner of root:root, and a failure message in stderr of:
+     *   -bash: line 3: /tmp/brooklyn-20150113-161203056-XMEo-move_install_dir_from_user_to_.pid: Permission denied
+     */
+    protected int execScriptAsyncAndPoll(final Map<String,?> props, final List<String> commands, final Map<String,?> env) {
+        return new ToolAbstractAsyncExecScript(props) {
+            private int maxConsecutiveSshFailures = 3;
+            private Duration maxDelayBetweenPolls = Duration.seconds(20);
+            private Duration pollTimeout = getOptionalVal(props, PROP_EXEC_ASYNC_POLLING_TIMEOUT, Duration.FIVE_MINUTES);
+            private int iteration = 0;
+            private int consecutiveSshFailures = 0;
+            private int stdoutCount = 0;
+            private int stderrCount = 0;
+            private Stopwatch timer;
+            
+            public int run() {
+                timer = Stopwatch.createStarted();
+                final String scriptContents = toScript(props, commands, env);
+                if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {} as async script: {}", host, scriptContents);
+                
+                // Upload script; try repeatedly because have seen timeout intermittently on vcloud-director (BROOKLYN-106 related).
+                boolean uploadSuccess = Repeater.create("async script upload on "+SshjTool.this.toString()+" (for "+getSummary()+")")
+                        .backoffTo(maxDelayBetweenPolls)
+                        .limitIterationsTo(3)
+                        .rethrowException()
+                        .until(new Callable<Boolean>() {
+                            @Override
+                            public Boolean call() throws Exception {
+                                iteration++;
+                                if (LOG.isDebugEnabled()) {
+                                    String msg = "Uploading (iteration="+iteration+") for async script on "+SshjTool.this.toString()+" (for "+getSummary()+")";
+                                    if (iteration == 1) {
+                                        LOG.trace(msg);
+                                    } else {
+                                        LOG.debug(msg);
+                                    }
+                                }
+                                copyToServer(ImmutableMap.of("permissions", "0700"), scriptContents.getBytes(), scriptPath);
+                                return true;
+                            }})
+                        .run();
+                
+                if (!uploadSuccess) {
+                    // Unexpected! Should have either returned true or have rethrown the exception; should never get false.
+                    String msg = "Unexpected state: repeated failure for async script upload on "+SshjTool.this.toString()+" ("+getSummary()+")";
+                    LOG.warn(msg+"; rethrowing");
+                    throw new IllegalStateException(msg);
+                }
+                
+                // Execute script asynchronously
+                int execResult = asInt(acquire(new ShellAction(buildRunScriptCommand(), out, err, execTimeout)), -1);
+                if (execResult != 0) return execResult;
+
+                // Long polling to get the status
+                try {
+                    final AtomicReference<Integer> result = new AtomicReference<Integer>();
+                    boolean success = Repeater.create("async script long-poll on "+SshjTool.this.toString()+" (for "+getSummary()+")")
+                            .backoffTo(maxDelayBetweenPolls)
+                            .limitTimeTo(execTimeout)
+                            .until(new Callable<Boolean>() {
+                                @Override
+                                public Boolean call() throws Exception {
+                                    iteration++;
+                                    if (LOG.isDebugEnabled()) LOG.debug("Doing long-poll (iteration="+iteration+") for async script to complete on "+SshjTool.this.toString()+" (for "+getSummary()+")");
+                                    Integer exitstatus = longPoll();
+                                    result.set(exitstatus);
+                                    return exitstatus != null;
+                                }})
+                            .run();
+                    
+                    if (!success) {
+                        // Timed out
+                        String msg = "Timeout for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")";
+                        LOG.warn(msg+"; rethrowing");
+                        throw new TimeoutException(msg);
+                    }
+                    
+                    return result.get();
+                    
+                } catch (Exception e) {
+                    LOG.debug("Problem polling for async script on "+SshjTool.this.toString()+" (for "+getSummary()+"); rethrowing after deleting temporary files", e);
+                    throw Exceptions.propagate(e);
+                } finally {
+                    // Delete the temporary files created (and the `tail -c` commands that might have been left behind by long-polls).
+                    // Using pollTimeout so doesn't wait forever, but waits for a reasonable (configurable) length of time.
+                    // TODO also execute this if the `buildRunScriptCommand` fails, as that might have left files behind?
+                    try {
+                        int execDeleteResult = asInt(acquire(new ShellAction(deleteTemporaryFilesCommand(), out, err, pollTimeout)), -1);
+                        if (execDeleteResult != 0) {
+                            LOG.debug("Problem deleting temporary files of async script on "+SshjTool.this.toString()+" (for "+getSummary()+"): exit status "+execDeleteResult);
+                        }
+                    } catch (Exception e) {
+                        Exceptions.propagateIfFatal(e);
+                        LOG.debug("Problem deleting temporary files of async script on "+SshjTool.this.toString()+" (for "+getSummary()+"); continuing", e);
+                    }
+                }
+            }
+            
+            Integer longPoll() throws IOException {
+                // Long-polling to get stdout, stderr + exit status of async task.
+                // If our long-poll disconnects, we will just re-execute.
+                // We wrap the stdout/stderr so that we can get the size count. 
+                // If we disconnect, we will pick up from that char of the stream.
+                // TODO Additional stdout/stderr written by buildLongPollCommand() could interfere, 
+                //      causing us to miss some characters.
+                Duration nextPollTimeout = Duration.min(pollTimeout, Duration.millis(execTimeout.toMilliseconds()-timer.elapsed(TimeUnit.MILLISECONDS)));
+                CountingOutputStream countingOut = (out == null) ? null : new CountingOutputStream(out);
+                CountingOutputStream countingErr = (err == null) ? null : new CountingOutputStream(err);
+                List<String> pollCommand = buildLongPollCommand(stdoutCount, stderrCount, nextPollTimeout);
+                Duration sshJoinTimeout = nextPollTimeout.add(Duration.TEN_SECONDS);
+                ShellAction action = new ShellAction(pollCommand, countingOut, countingErr, sshJoinTimeout);
+                
+                int longPollResult;
+                try {
+                    longPollResult = asInt(acquire(action, 3, nextPollTimeout), -1);
+                } catch (RuntimeTimeoutException e) {
+                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll timed out on "+SshjTool.this.toString()+" (for "+getSummary()+"): "+e);
+                    return null;
+                }
+                stdoutCount += (countingOut == null) ? 0 : countingOut.getCount();
+                stderrCount += (countingErr == null) ? 0 : countingErr.getCount();
+                
+                if (longPollResult == 0) {
+                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll succeeded (exit status 0) on "+SshjTool.this.toString()+" (for "+getSummary()+")");
+                    return longPollResult; // success
+                    
+                } else if (longPollResult == -1) {
+                    // probably a connection failure; try again
+                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll received exit status -1; will retry on "+SshjTool.this.toString()+" (for "+getSummary()+")");
+                    return null;
+
+                } else if (longPollResult == 125) {
+                    // 125 is the special code for timeout in long-poll (see buildLongPollCommand).
+                    // However, there is a tiny chance that the underlying command might have returned that exact exit code!
+                    // Don't treat a timeout as a "consecutiveSshFailure".
+                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll received exit status "+longPollResult+"; most likely timeout; retrieving actual status on "+SshjTool.this.toString()+" (for "+getSummary()+")");
+                    return retrieveStatusCommand();
+
+                } else {
+                    // want to double-check whether this is the exit-code from the async process, or
+                    // some unexpected failure in our long-poll command.
+                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll received exit status "+longPollResult+"; retrieving actual status on "+SshjTool.this.toString()+" (for "+getSummary()+")");
+                    Integer result = retrieveStatusCommand();
+                    if (result != null) {
+                        return result;
+                    }
+                }
+                    
+                consecutiveSshFailures++;
+                if (consecutiveSshFailures > maxConsecutiveSshFailures) {
+                    LOG.warn("Aborting on "+consecutiveSshFailures+" consecutive ssh connection errors (return -1) when polling for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")");
+                    return -1;
+                } else {
+                    LOG.info("Retrying after ssh connection error when polling for async script to complete on "+SshjTool.this.toString()+" ("+getSummary()+")");
+                    return null;
+                }
+            }
+            
+            Integer retrieveStatusCommand() throws IOException {
+                // want to double-check whether this is the exit-code from the async process, or
+                // some unexpected failure in our long-poll command.
+                ByteArrayOutputStream statusOut = new ByteArrayOutputStream();
+                ByteArrayOutputStream statusErr = new ByteArrayOutputStream();
+                int statusResult = asInt(acquire(new ShellAction(buildRetrieveStatusCommand(), statusOut, statusErr, execTimeout)), -1);
+                
+                if (statusResult == 0) {
+                    // The status we retrieved really is valid; return it.
+                    // TODO How to ensure no additional output in stdout/stderr when parsing below?
+                    String statusOutStr = new String(statusOut.toByteArray()).trim();
+                    if (Strings.isEmpty(statusOutStr)) {
+                        // suggests not yet completed; will retry with long-poll
+                        if (LOG.isDebugEnabled()) LOG.debug("Long-poll retrieved status directly; command successful but no result available on "+SshjTool.this.toString()+" (for "+getSummary()+")");
+                        return null;
+                    } else {
+                        if (LOG.isDebugEnabled()) LOG.debug("Long-poll retrieved status directly; returning '"+statusOutStr+"' on "+SshjTool.this.toString()+" (for "+getSummary()+")");
+                        int result = Integer.parseInt(statusOutStr);
+                        return result;
+                    }
+
+                } else if (statusResult == -1) {
+                    // probably a connection failure; try again with long-poll
+                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll retrieving status directly received exit status -1; will retry on "+SshjTool.this.toString()+" (for "+getSummary()+")");
+                    return null;
+                    
+                } else {
+                    if (out != null) {
+                        out.write(toUTF8ByteArray("retrieving status failed with exit code "+statusResult+" (stdout follow)"));
+                        out.write(statusOut.toByteArray());
+                    }
+                    if (err != null) {
+                        err.write(toUTF8ByteArray("retrieving status failed with exit code "+statusResult+" (stderr follow)"));
+                        err.write(statusErr.toByteArray());
+                    }
+                    
+                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll retrieving status failed; returning "+statusResult+" on "+SshjTool.this.toString()+" (for "+getSummary()+")");
+                    return statusResult;
+                }
+            }
+        }.run();
+    }
+    
+    public int execShellDirect(Map<String,?> props, List<String> commands, Map<String,?> env) {
+        OutputStream out = getOptionalVal(props, PROP_OUT_STREAM);
+        OutputStream err = getOptionalVal(props, PROP_ERR_STREAM);
+        Duration execTimeout = getOptionalVal(props, PROP_EXEC_TIMEOUT);
+        
+        List<String> cmdSequence = toCommandSequence(commands, env);
+        List<String> allcmds = ImmutableList.<String>builder()
+                .add(getOptionalVal(props, PROP_DIRECT_HEADER))
+                .addAll(cmdSequence)
+                .add("exit $?")
+                .build();
+        
+        if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {}: {}", host, allcmds);
+        
+        Integer result = acquire(new ShellAction(allcmds, out, err, execTimeout));
+        if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {} completed: return status {}", host, result);
+        return asInt(result, -1);
+    }
+
+    @Override
+    public int execCommands(Map<String,?> props, List<String> commands, Map<String,?> env) {
+        if (Boolean.FALSE.equals(props.get("blocks"))) {
+            throw new IllegalArgumentException("Cannot exec non-blocking: command="+commands);
+        }
+        
+        // If async is set, then do it as execScript
+        Boolean execAsync = getOptionalVal(props, PROP_EXEC_ASYNC);
+        if (Boolean.TRUE.equals(execAsync) && BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC)) {
+            return execScriptAsyncAndPoll(props, commands, env);
+        }
+
+        OutputStream out = getOptionalVal(props, PROP_OUT_STREAM);
+        OutputStream err = getOptionalVal(props, PROP_ERR_STREAM);
+        String separator = getOptionalVal(props, PROP_SEPARATOR);
+        Duration execTimeout = getOptionalVal(props, PROP_EXEC_TIMEOUT);
+
+        List<String> allcmds = toCommandSequence(commands, env);
+        String singlecmd = Joiner.on(separator).join(allcmds);
+
+        if (Boolean.TRUE.equals(getOptionalVal(props, PROP_RUN_AS_ROOT))) {
+            LOG.warn("Cannot run as root when executing as command; run as a script instead (will run as normal user): "+singlecmd);
+        }
+        
+        if (LOG.isTraceEnabled()) LOG.trace("Running command at {}: {}", host, singlecmd);
+        
+        Command result = acquire(new ExecAction(singlecmd, out, err, execTimeout));
+        if (LOG.isTraceEnabled()) LOG.trace("Running command at {} completed: exit code {}", host, result.getExitStatus());
+        // can be null if no exit status is received (observed on kill `ps aux | grep thing-to-grep-for | awk {print $2}`
+        if (result.getExitStatus()==null) LOG.warn("Null exit status running at {}: {}", host, singlecmd);
+        
+        return asInt(result.getExitStatus(), -1);
+    }
+
+    protected void checkConnected() {
+        if (!isConnected()) {
+            throw new IllegalStateException(String.format("(%s) ssh not connected!", toString()));
+        }
+    }
+
+    protected void backoffForAttempt(int retryAttempt, String message) {
+        backoffLimitedRetryHandler.imposeBackoffExponentialDelay(retryAttempt, message);
+    }
+
+    protected <T, C extends SshAction<T>> T acquire(C action) {
+        return acquire(action, sshTries, sshTriesTimeout == 0 ? Duration.PRACTICALLY_FOREVER : Duration.millis(sshTriesTimeout));
+    }
+    
+    protected <T, C extends SshAction<T>> T acquire(C action, int sshTries, Duration sshTriesTimeout) {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        
+        for (int i = 0; i < sshTries; i++) {
+            try {
+                action.clear();
+                if (LOG.isTraceEnabled()) LOG.trace(">> ({}) acquiring {}", toString(), action);
+                Stopwatch perfStopwatch = Stopwatch.createStarted();
+                
+                T returnVal;
+                try {
+                    returnVal = action.create();
+                } catch (AssertionError e) {
+                    /*
+                     * TODO In net.schmizz.sshj.SSHClient.auth(SSHClient.java:204) throws AssertionError
+                     * if not connected. This can happen if another thread has called disconnect
+                     * concurrently. This is changed in sshj v0.9.0 to instead throw an IllegalStateException.
+                     * 
+                     * For now, we'll retry. See "TODO" at top of class about synchronization.
+                     */
+                    throw new IllegalStateException("Problem in "+toString()+" for "+action, e);
+                }
+                
+                if (LOG.isTraceEnabled()) LOG.trace("<< ({}) acquired {}", toString(), returnVal);
+                if (LOG.isTraceEnabled()) LOG.trace("SSH Performance: {} {} took {}", new Object[] {
+                        sshClientConnection.getHostAndPort(), 
+                        action.getClass().getSimpleName() != null ? action.getClass().getSimpleName() : action, 
+                        Time.makeTimeStringRounded(perfStopwatch)});
+                return returnVal;
+            } catch (Exception e) {
+                // uninformative net.schmizz.sshj.connection.ConnectionException: 
+                //    Request failed (reason=UNKNOWN) may mean remote Subsytem is disabled (e.g. for FTP)
+                // if key is missing, get a UserAuth error
+                String errorMessage = String.format("(%s) error acquiring %s", toString(), action);
+                String fullMessage = String.format("%s (attempt %s/%s, in time %s/%s)", 
+                        errorMessage, (i+1), sshTries, Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)), 
+                        (sshTriesTimeout.equals(Duration.PRACTICALLY_FOREVER) ? "unlimited" : Time.makeTimeStringRounded(sshTriesTimeout)));
+                try {
+                    disconnect();
+                } catch (Exception e2) {
+                    LOG.debug("<< ("+toString()+") error closing connection: "+e+" / "+e2, e);
+                }
+                if (i + 1 == sshTries) {
+                    LOG.debug("<< {} (rethrowing, out of retries): {}", fullMessage, e.getMessage());
+                    throw propagate(e, fullMessage + "; out of retries");
+                } else if (sshTriesTimeout.isShorterThan(stopwatch)) {
+                    LOG.debug("<< {} (rethrowing, out of time - max {}): {}", new Object[] { fullMessage, Time.makeTimeStringRounded(sshTriesTimeout), e.getMessage() });
+                    throw new RuntimeTimeoutException(fullMessage + "; out of time", e);
+                } else {
+                    if (LOG.isDebugEnabled()) LOG.debug("<< {}: {}", fullMessage, e.getMessage());
+                    backoffForAttempt(i + 1, errorMessage + ": " + e.getMessage());
+                    if (action != sshClientConnection)
+                        connect();
+                    continue;
+                }
+            }
+        }
+        assert false : "should not reach here";
+        return null;
+    }
+
+    private final SshAction<SFTPClient> sftpConnection = new SshAction<SFTPClient>() {
+
+        private SFTPClient sftp;
+
+        @Override
+        public void clear() {
+            closeWhispering(sftp, this);
+            sftp = null;
+        }
+
+        @Override
+        public SFTPClient create() throws IOException {
+            checkConnected();
+            sftp = sshClientConnection.ssh.newSFTPClient();
+            return sftp;
+        }
+
+        @Override
+        public String toString() {
+            return "SFTPClient()";
+        }
+    };
+
+    private class GetFileAction implements SshAction<InputStream> {
+        private final String path;
+        private SFTPClient sftp;
+
+        GetFileAction(String path) {
+            this.path = checkNotNull(path, "path");
+        }
+
+        @Override
+        public void clear() throws IOException {
+            closeWhispering(sftp, this);
+            sftp = null;
+        }
+
+        @Override
+        public InputStream create() throws Exception {
+            sftp = acquire(sftpConnection);
+            return new CloseFtpChannelOnCloseInputStream(
+                    sftp.getSFTPEngine().open(path).getInputStream(), sftp);
+        }
+
+        @Override
+        public String toString() {
+            return "Payload(path=[" + path + "])";
+        }
+    }
+
+    private class PutFileAction implements SshAction<Void> {
+        // TODO support backup as a property?
+        
+        private SFTPClient sftp;
+        private final String path;
+        private final int permissionsMask;
+        private final long lastModificationDate;
+        private final long lastAccessDate;
+        private final int uid;
+        private final Supplier<InputStream> contentsSupplier;
+        private final Integer length;
+        
+        PutFileAction(Map<String,?> props, String path, Supplier<InputStream> contentsSupplier, long length) {
+            String permissions = getOptionalVal(props, PROP_PERMISSIONS);
+            long lastModificationDateVal = getOptionalVal(props, PROP_LAST_MODIFICATION_DATE);
+            long lastAccessDateVal = getOptionalVal(props, PROP_LAST_ACCESS_DATE);
+            if (lastAccessDateVal <= 0 ^ lastModificationDateVal <= 0) {
+                lastAccessDateVal = Math.max(lastAccessDateVal, lastModificationDateVal);
+                lastModificationDateVal = Math.max(lastAccessDateVal, lastModificationDateVal);
+            }
+            this.permissionsMask = Integer.parseInt(permissions, 8);
+            this.lastAccessDate = lastAccessDateVal;
+            this.lastModificationDate = lastModificationDateVal;
+            this.uid = getOptionalVal(props, PROP_OWNER_UID);
+            this.path = checkNotNull(path, "path");
+            this.contentsSupplier = checkNotNull(contentsSupplier, "contents");
+            this.length = Ints.checkedCast(checkNotNull((long)length, "size"));
+        }
+
+        @Override
+        public void clear() {
+            closeWhispering(sftp, this);
+            sftp = null;
+        }
+
+        @Override
+        public Void create() throws Exception {
+            final AtomicReference<InputStream> inputStreamRef = new AtomicReference<InputStream>();
+            sftp = acquire(sftpConnection);
+            try {
+                sftp.put(new InMemorySourceFile() {
+                    @Override public String getName() {
+                        return path;
+                    }
+                    @Override public long getLength() {
+                        return length;
+                    }
+                    @Override public InputStream getInputStream() throws IOException {
+                        InputStream contents = contentsSupplier.get();
+                        inputStreamRef.set(contents);
+                        return contents;
+                    }
+                }, path);
+                sftp.chmod(path, permissionsMask);
+                if (uid != -1) {
+                    sftp.chown(path, uid);
+                }
+                if (lastAccessDate > 0) {
+                    sftp.setattr(path, new FileAttributes.Builder()
+                            .withAtimeMtime(lastAccessDate, lastModificationDate)
+                            .build());
+                }
+            } finally {
+                closeWhispering(inputStreamRef.get(), this);
+            }
+            return null;
+        }
+
+        @Override
+        public String toString() {
+            return "Put(path=[" + path + " "+length+"])";
+        }
+    }
+
+    // TODO simpler not to use predicates
+    @VisibleForTesting
+    Predicate<String> causalChainHasMessageContaining(final Exception from) {
+        return new Predicate<String>() {
+            @Override
+            public boolean apply(final String input) {
+                return any(getCausalChain(from), new Predicate<Throwable>() {
+                    @Override
+                    public boolean apply(Throwable throwable) {
+                        return (throwable.toString().contains(input))
+                                || (throwable.getMessage() != null && throwable.getMessage().contains(input));
+                    }
+                });
+            }
+        };
+    }
+    
+    protected SshAction<Session> newSessionAction() {
+
+        return new SshAction<Session>() {
+
+            private Session session = null;
+
+            @Override
+            public void clear() throws TransportException, ConnectionException {
+                closeWhispering(session, this);
+                session = null;
+            }
+
+            @Override
+            public Session create() throws Exception {
+                checkConnected();
+                session = sshClientConnection.ssh.startSession();
+                if (allocatePTY) {
+                    session.allocatePTY(TERM, 80, 24, 0, 0, Collections.<PTYMode, Integer> emptyMap());
+                }
+                return session;
+            }
+
+            @Override
+            public String toString() {
+                return "Session()";
+            }
+        };
+
+    }
+
+    class ExecAction implements SshAction<Command> {
+        private final String command;
+        private final OutputStream out;
+        private final OutputStream err;
+        private final Duration timeout;
+        
+        private Session session;
+        private Shell shell;
+        private StreamGobbler outgobbler;
+        private StreamGobbler errgobbler;
+        
+        ExecAction(String command, OutputStream out, OutputStream err, Duration timeout) {
+            this.command = checkNotNull(command, "command");
+            this.out = out;
+            this.err = err;
+            Duration sessionTimeout = (sshClientConnection.getSessionTimeout() == 0) 
+                    ? Duration.PRACTICALLY_FOREVER 
+                    : Duration.millis(sshClientConnection.getSessionTimeout());
+            this.timeout = (timeout == null) ? sessionTimeout : Duration.min(timeout, sessionTimeout);
+        }
+
+        @Override
+        public void clear() throws TransportException, ConnectionException {
+            closeWhispering(session, this);
+            closeWhispering(shell, this);
+            closeWhispering(outgobbler, this);
+            closeWhispering(errgobbler, this);
+            session = null;
+            shell = null;
+        }
+
+        @Override
+        public Command create() throws Exception {
+            try {
+                session = acquire(newSessionAction());
+                
+                Command output = session.exec(checkNotNull(command, "command"));
+                
+                if (out != null) {
+                    outgobbler = new StreamGobbler(output.getInputStream(), out, (Logger)null);
+                    outgobbler.start();
+                }
+                if (err != null) {
+                    errgobbler = new StreamGobbler(output.getErrorStream(), err, (Logger)null);
+                    errgobbler.start();
+                }
+                try {
+                    output.join((int)Math.min(timeout.toMilliseconds(), Integer.MAX_VALUE), TimeUnit.MILLISECONDS);
+                    return output;
+                    
+                } finally {
+                    // wait for all stdout/stderr to have been re-directed
+                    try {
+                        // Don't use forever (i.e. 0) because BROOKLYN-106: ssh hangs
+                        long joinTimeout = 10*1000;
+                        if (outgobbler != null) outgobbler.join(joinTimeout);
+                        if (errgobbler != null) errgobbler.join(joinTimeout);
+                    } catch (InterruptedException e) {
+                        LOG.warn("Interrupted gobbling streams from ssh: "+command, e);
+                        Thread.currentThread().interrupt();
+                    }
+                }
+                
+            } finally {
+                clear();
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "Exec(command=[" + command + "])";
+        }
+    }
+
+    class ShellAction implements SshAction<Integer> {
+        @VisibleForTesting
+        final List<String> commands;
+        @VisibleForTesting
+        final OutputStream out;
+        @VisibleForTesting
+        final OutputStream err;
+        
+        private Session session;
+        private Shell shell;
+        private StreamGobbler outgobbler;
+        private StreamGobbler errgobbler;
+        private Duration timeout;
+
+        ShellAction(List<String> commands, OutputStream out, OutputStream err, Duration timeout) {
+            this.commands = checkNotNull(commands, "commands");
+            this.out = out;
+            this.err = err;
+            Duration sessionTimeout = (sshClientConnection.getSessionTimeout() == 0) 
+                    ? Duration.PRACTICALLY_FOREVER 
+                    : Duration.millis(sshClientConnection.getSessionTimeout());
+            this.timeout = (timeout == null) ? sessionTimeout : Duration.min(timeout, sessionTimeout);
+        }
+
+        @Override
+        public void clear() throws TransportException, ConnectionException {
+            closeWhispering(session, this);
+            closeWhispering(shell, this);
+            closeWhispering(outgobbler, this);
+            closeWhispering(errgobbler, this);
+            session = null;
+            shell = null;
+        }
+
+        @Override
+        public Integer create() throws Exception {
+            try {
+                session = acquire(newSessionAction());
+                
+                shell = session.startShell();
+                
+                if (out != null) {
+                    InputStream outstream = shell.getInputStream();
+                    outgobbler = new StreamGobbler(outstream, out, (Logger)null);
+                    outgobbler.start();
+                }
+                if (err != null) {
+                    InputStream errstream = shell.getErrorStream();
+                    errgobbler = new StreamGobbler(errstream, err, (Logger)null);
+                    errgobbler.start();
+                }
+                
+                OutputStream output = shell.getOutputStream();
+
+                for (CharSequence cmd : commands) {
+                    try {
+                        output.write(toUTF8ByteArray(cmd+"\n"));
+                        output.flush();
+                    } catch (ConnectionException e) {
+                        if (!shell.isOpen()) {
+                            // shell is closed; presumably the user command did `exit`
+                            if (LOG.isDebugEnabled()) LOG.debug("Shell closed to {} when executing {}", SshjTool.this.toString(), commands);
+                            break;
+                        } else {
+                            throw e;
+                        }
+                    }
+                }
+                // workaround attempt for SSHJ deadlock - https://github.com/shikhar/sshj/issues/105
+                synchronized (shell.getOutputStream()) {
+                    shell.sendEOF();
+                }
+                closeWhispering(output, this);
+                
+                boolean timedOut = false;
+                try {
+                    long timeoutMillis = Math.min(timeout.toMilliseconds(), Integer.MAX_VALUE);
+                    long timeoutEnd = System.currentTimeMillis() + timeoutMillis;
+                    Exception last = null;
+                    do {
+                        if (!shell.isOpen() && ((SessionChannel)session).getExitStatus()!=null)
+                            // shell closed, and exit status returned
+                            break;
+                        boolean endBecauseReturned =
+                            // if either condition is satisfied, then wait 1s in hopes the other does, then return
+                            (!shell.isOpen() || ((SessionChannel)session).getExitStatus()!=null);
+                        try {
+                            shell.join(1000, TimeUnit.MILLISECONDS);
+                        } catch (ConnectionException e) {
+                            last = e;
+                        }
+                        if (endBecauseReturned) {
+                            // shell is still open, ie some process is running
+                            // but we have a result code, so main shell is finished
+                            // we waited one second extra to allow any background process 
+                            // which is nohupped to really be in the background (#162)
+                            // now let's bail out
+                            break;
+                        }
+                    } while (System.currentTimeMillis() < timeoutEnd);
+                    if (shell.isOpen() && ((SessionChannel)session).getExitStatus()==null) {
+                        LOG.debug("Timeout ({}) in SSH shell to {}", timeout, this);
+                        // we timed out, or other problem -- reproduce the error.
+                        // The shell.join should always have thrown ConnectionExceptoin (looking at code of
+                        // AbstractChannel), but javadoc of Channel doesn't explicity say that so play it safe.
+                        timedOut = true;
+                        throw (last != null) ? last : new TimeoutException("Timeout after "+timeout+" executing "+this);
+                    }
+                    return ((SessionChannel)session).getExitStatus();
+                } finally {
+                    // wait for all stdout/stderr to have been re-directed
+                    closeWhispering(shell, this);
+                    shell = null;
+                    try {
+                        // Don't use forever (i.e. 0) because BROOKLYN-106: ssh hangs
+                        long joinTimeout = (timedOut) ? 1000 : 10*1000;
+                        if (outgobbler != null) {
+                            outgobbler.join(joinTimeout);
+                            outgobbler.close();
+                        }
+                        if (errgobbler != null) {
+                            errgobbler.join(joinTimeout);
+                            errgobbler.close();
+                        }
+                    } catch (InterruptedException e) {
+                        LOG.warn("Interrupted gobbling streams from ssh: "+commands, e);
+                        Thread.currentThread().interrupt();
+                    }
+                }
+                
+            } finally {
+                clear();
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "Shell(command=[" + commands + "])";
+        }
+    }
+
+    private byte[] toUTF8ByteArray(String string) {
+        return org.bouncycastle.util.Strings.toUTF8ByteArray(string);
+    }
+    
+    private Supplier<InputStream> newInputStreamSupplier(final byte[] contents) {
+        return new Supplier<InputStream>() {
+            @Override public InputStream get() {
+                return new ByteArrayInputStream(contents);
+            }
+        };
+    }
+
+    private Supplier<InputStream> newInputStreamSupplier(final File file) {
+        return new Supplier<InputStream>() {
+            @Override public InputStream get() {
+                try {
+                    return new FileInputStream(file);
+                } catch (FileNotFoundException e) {
+                    throw Exceptions.propagate(e);
+                }
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/javalang/ReflectionScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/javalang/ReflectionScanner.java b/core/src/main/java/org/apache/brooklyn/core/util/javalang/ReflectionScanner.java
new file mode 100644
index 0000000..e0b824a
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/javalang/ReflectionScanner.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.javalang;
+
+import java.lang.annotation.Annotation;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.reflections.ReflectionUtils;
+import org.reflections.Reflections;
+import org.reflections.Store;
+import org.reflections.scanners.Scanner;
+import org.reflections.scanners.SubTypesScanner;
+import org.reflections.scanners.TypeAnnotationsScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.reflections.util.FilterBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.util.text.Strings;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+/** Facade on {@link Reflections} which logs warnings for unloadable classes but does not fail */
+public class ReflectionScanner {
+
+    private static final Logger log = LoggerFactory.getLogger(ReflectionScanner.class);
+    
+    protected final ClassLoader[] classLoaders;
+    protected final Reflections reflections;
+
+    /** scanner which will look in the given urls 
+     * (or if those are null attempt to infer from the first entry in the classloaders,
+     * although currently that seems to only pick up directories, not JAR's),
+     * optionally filtering for the given prefix;
+     * any or all arguments can be null to accept all (and use default classpath for classloading).
+     **/
+    public ReflectionScanner(
+            final Iterable<URL> urlsToScan, 
+            final String optionalPrefix,
+            final ClassLoader ...classLoaders) {
+        reflections = new Reflections(new ConfigurationBuilder() {
+            {
+                final Predicate<String> filter = 
+                        Strings.isNonEmpty(optionalPrefix) ? new FilterBuilder.Include(FilterBuilder.prefix(optionalPrefix)) : null;
+
+                if (urlsToScan!=null)
+                    setUrls(ImmutableSet.copyOf(urlsToScan));
+                else if (classLoaders.length>0 && classLoaders[0]!=null)
+                    setUrls(
+                            ClasspathHelper.forPackage(Strings.isNonEmpty(optionalPrefix) ? optionalPrefix : "",
+                                    asClassLoaderVarArgs(classLoaders[0])));
+                
+                if (filter!=null) filterInputsBy(filter);
+
+                Scanner typeScanner = new TypeAnnotationsScanner();
+                if (filter!=null) typeScanner = typeScanner.filterResultsBy(filter);
+                Scanner subTypeScanner = new SubTypesScanner();
+                if (filter!=null) subTypeScanner = subTypeScanner.filterResultsBy(filter);
+                setScanners(typeScanner, subTypeScanner);
+                
+                for (ClassLoader cl: classLoaders)
+                    if (cl!=null) addClassLoader(cl);
+            }
+        });
+        this.classLoaders = Iterables.toArray(Iterables.filter(Arrays.asList(classLoaders), Predicates.notNull()), ClassLoader.class);
+    }
+
+    private static ClassLoader[] asClassLoaderVarArgs(final ClassLoader classLoaderToSearch) {
+        return classLoaderToSearch==null ? new ClassLoader[0] : new ClassLoader[] { classLoaderToSearch };
+    }
+
+    public Store getStore() {
+        return reflections.getStore();
+    }
+    
+    /** overrides delegate so as to log rather than throw exception if a class cannot be loaded */
+    public <T> Set<Class<? extends T>> getSubTypesOf(final Class<T> type) {
+        Set<String> subTypes = getStore().getSubTypesOf(type.getName());
+        return ImmutableSet.copyOf(this.<T>forNames(subTypes, "sub-type of "+type));
+    }
+    
+    /** overrides delegate so as to log rather than throw exception if a class cannot be loaded */
+    public Set<Class<?>> getTypesAnnotatedWith(Class<? extends Annotation> annotation) {
+        Set<String> annotatedWith = getStore().getTypesAnnotatedWith(annotation.getName());
+        return ImmutableSet.copyOf(this.forNames(annotatedWith, "annotated "+annotation.getName()));
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <T> List<Class<? extends T>> forNames(Set<String> classNames, final String context) {
+        List<Class<? extends T>> result = new ArrayList<Class<? extends T>>();
+        for (String className : classNames) {
+            //noinspection unchecked
+            try {
+                Class<? extends T> clazz = (Class<? extends T>) loadClass(className);
+                if (clazz != null) {
+                    result.add(clazz);
+                } else {
+                    log.warn("Unable to instantiate '"+className+"' ("+context+")");
+                }
+            } catch (Throwable e) {
+                log.warn("Unable to instantiate '"+className+"' ("+context+"): "+e);
+            }
+        }
+        return result;
+    }
+    
+    protected Class<?> loadClass(String className) {
+        return ReflectionUtils.forName(className, classLoaders);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/javalang/UrlClassLoader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/javalang/UrlClassLoader.java b/core/src/main/java/org/apache/brooklyn/core/util/javalang/UrlClassLoader.java
new file mode 100644
index 0000000..afd0c8e
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/javalang/UrlClassLoader.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.javalang;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+
+import org.apache.brooklyn.core.util.ResourceUtils;
+
+import brooklyn.util.exceptions.Exceptions;
+
+/** like URLClassLoader (and delegates to it) but:
+ * * has a nice toString
+ * * supports var args constructor
+ * * supports file://~/xxx semantics (remaps it to user.home); 
+ *      ideally we'd also support mvn, classpath, osgi, etc
+ */
+public class UrlClassLoader extends URLClassLoader {
+
+    private URL[] urls;
+
+    public UrlClassLoader(URL ...urls) {
+        super(tidy(urls));
+        this.urls = urls;
+    }
+
+    public UrlClassLoader(String ...urls) {
+        this(asUrls(urls));
+    }
+    
+    private static URL[] asUrls(String[] urls) {
+        URL[] urlo = new URL[urls.length];
+        try {
+        for (int i=0; i<urls.length; i++)
+            urlo[i] = new URL(urls[i]);
+        } catch (MalformedURLException e) {
+            throw Exceptions.propagate(e);
+        }
+        return urlo;
+    }
+
+    private static URL[] tidy(URL[] urls) {
+        for (int i=0; i<urls.length; i++)
+            urls[i] = ResourceUtils.tidy(urls[i]);
+        return urls;
+    }
+
+    @Override
+    public String toString() {
+        return "UrlClassLoader"+Arrays.asList(urls);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/mutex/MutexSupport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/mutex/MutexSupport.java b/core/src/main/java/org/apache/brooklyn/core/util/mutex/MutexSupport.java
new file mode 100644
index 0000000..2a90f95
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/mutex/MutexSupport.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.mutex;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.brooklyn.core.util.task.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableMap;
+
+public class MutexSupport implements WithMutexes {
+
+    private static final Logger log = LoggerFactory.getLogger(MutexSupport.class);
+    private final Map<String,SemaphoreWithOwners> semaphores = new LinkedHashMap<String,SemaphoreWithOwners>();
+
+    protected synchronized SemaphoreWithOwners getSemaphore(String mutexId) {
+        return getSemaphore(mutexId, false);
+    }
+    // NB: the map could be "lock-striped" (hashed on mutexId) to avoid the central lock 
+    protected synchronized SemaphoreWithOwners getSemaphore(String mutexId, boolean requestBeforeReturning) {
+        SemaphoreWithOwners s = semaphores.get(mutexId);
+        if (s==null) {
+            s = new SemaphoreWithOwners(mutexId);
+            semaphores.put(mutexId, s);
+        }
+        if (requestBeforeReturning) s.indicateCallingThreadWillRequest();
+        return s;
+    }
+    /** forces deletion of the given mutex if it is unused; 
+     * normally not required as is done automatically on close
+     * (but possibly needed where there are cancelations and risk of memory leaks) */
+    public synchronized void cleanupMutex(String mutexId) {
+        SemaphoreWithOwners s = semaphores.get(mutexId);
+        if (!s.isInUse()) semaphores.remove(mutexId);
+    }
+    public synchronized void cleanup() {
+        Iterator<SemaphoreWithOwners> si = semaphores.values().iterator();
+        while (si.hasNext()) {
+            SemaphoreWithOwners s = si.next();
+            if (!s.isInUse()) si.remove();
+        }
+    }
+
+    @Override
+    public synchronized boolean hasMutex(String mutexId) {
+        SemaphoreWithOwners s = semaphores.get(mutexId);
+        if (s!=null) return s.isCallingThreadAnOwner();
+        return false;
+    }
+    
+    @Override
+    public void acquireMutex(String mutexId, String description) throws InterruptedException {
+        SemaphoreWithOwners s = getSemaphore(mutexId, true);
+        if (description!=null) Tasks.setBlockingDetails(description+" - waiting for "+mutexId);
+        if (log.isDebugEnabled())
+            log.debug("Acquiring mutex: "+mutexId+"@"+this+" - "+description);
+        s.acquire(); 
+        if (description!=null) Tasks.setBlockingDetails(null);
+        s.setDescription(description);
+        if (log.isDebugEnabled())
+            log.debug("Acquired mutex: "+mutexId+"@"+this+" - "+description);
+    }
+
+    @Override
+    public boolean tryAcquireMutex(String mutexId, String description) {
+        SemaphoreWithOwners s = getSemaphore(mutexId, true);
+        if (s.tryAcquire()) {
+            if (log.isDebugEnabled())
+                log.debug("Acquired mutex (opportunistic): "+mutexId+"@"+this+" - "+description);
+            s.setDescription(description);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public synchronized void releaseMutex(String mutexId) {
+        SemaphoreWithOwners s;
+        if (log.isDebugEnabled())
+            log.debug("Releasing mutex: "+mutexId+"@"+this);
+        synchronized (this) { s = semaphores.get(mutexId); }
+        if (s==null) throw new IllegalStateException("No mutex known for '"+mutexId+"'");
+        s.release();
+        cleanupMutex(mutexId);
+    }
+    
+    @Override
+    public synchronized String toString() {
+        return super.toString()+"["+semaphores.size()+" semaphores: "+semaphores.values()+"]";
+    }
+    
+    /** Returns the semaphores in use at the time the method is called, for inspection purposes (and testing).
+     * The semaphores used by this class may change over time so callers are strongly discouraged
+     * from manipulating the semaphore objects themselves. 
+     */
+    public synchronized Map<String,SemaphoreWithOwners> getAllSemaphores() {
+        return ImmutableMap.<String,SemaphoreWithOwners>copyOf(semaphores);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/mutex/SemaphoreForTasks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/mutex/SemaphoreForTasks.java b/core/src/main/java/org/apache/brooklyn/core/util/mutex/SemaphoreForTasks.java
new file mode 100644
index 0000000..7e805b0
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/mutex/SemaphoreForTasks.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.mutex;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.core.util.task.Tasks;
+
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.collections.MutableSet;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+
+/** A subclass of {@link SemaphoreWithOwners} 
+ * which additionally sets Task blocking information. 
+ * <p>
+ * TODO As tasks are distributed this should support distribution across the management context. */
+public class SemaphoreForTasks extends SemaphoreWithOwners {
+    
+    private static final long serialVersionUID = 7898283056223005952L;
+    
+    /** unused at present, but wanted on the API for when this may be federated */
+    @SuppressWarnings("unused")
+    private final ManagementContext mgmt;
+    
+    final private MutableList<Task<?>> owningTasks = new MutableList<Task<?>>();
+    final private MutableSet<Task<?>> requestingTasks = new MutableSet<Task<?>>();
+
+    public SemaphoreForTasks(String name, ManagementContext mgmt) {
+        super(name);
+        this.mgmt = Preconditions.checkNotNull(mgmt);
+    }
+    
+    public SemaphoreForTasks(String name, int permits, boolean fair, ManagementContext mgmt) {
+        super(name, permits, fair);
+        this.mgmt = Preconditions.checkNotNull(mgmt);
+    }
+    
+    public synchronized Set<Task<?>> getRequestingTasks() {
+        return ImmutableSet.copyOf(requestingTasks);
+    }
+    
+    public synchronized List<Task<?>> getOwningTasks() {
+        return ImmutableList.copyOf(owningTasks);
+    }
+
+    @Override
+    protected synchronized void onRequesting() {
+        if (!owningTasks.isEmpty() || !requestingTasks.isEmpty()) {
+            Tasks.setBlockingTask( !requestingTasks.isEmpty() ? Iterables.getLast(requestingTasks) : Iterables.getFirst(owningTasks, null) );
+            Tasks.setBlockingDetails("Waiting on semaphore "+getName()+" ("+getDescription()+"); "
+                + "queued at "+Time.makeDateString()+" when "+getRequestingThreads().size()+" ahead in queue");
+        }
+        requestingTasks.addIfNotNull(Tasks.current());
+        super.onRequesting();
+    }
+    
+    @Override
+    protected synchronized void onRequestFinished() {
+        super.onRequestFinished();
+        requestingTasks.removeIfNotNull(Tasks.current());
+        
+        Tasks.resetBlockingDetails();
+        Tasks.resetBlockingTask();
+    }
+    
+    @Override
+    protected synchronized void onAcquired(int permits) {
+        super.onAcquired(permits);
+        for (int i=0; i<permits; i++)
+            owningTasks.appendIfNotNull(Tasks.current());
+    }
+    
+    @Override
+    protected synchronized void onReleased(int permits) {
+        super.onReleased(permits);
+        for (int i=0; i<permits; i++)
+            owningTasks.removeIfNotNull(Tasks.current());
+    }
+    
+    @Override
+    public synchronized String toString() {
+        return super.toString()+"["
+            + "owningTasks="+owningTasks
+            + "; requestingTasks="+requestingTasks+"]";
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/mutex/SemaphoreWithOwners.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/mutex/SemaphoreWithOwners.java b/core/src/main/java/org/apache/brooklyn/core/util/mutex/SemaphoreWithOwners.java
new file mode 100644
index 0000000..0144d4d
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/mutex/SemaphoreWithOwners.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.mutex;
+
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import brooklyn.util.exceptions.Exceptions;
+
+import com.google.common.collect.ImmutableList;
+
+/** a subclass of {@link Semaphore} 
+ * which tracks who created and released the semaphores,
+ * and which requires the same thread to release as created it. */
+public class SemaphoreWithOwners extends Semaphore {
+    public SemaphoreWithOwners(String name) {
+        this(name, 1, true);
+    }
+    public SemaphoreWithOwners(String name, int permits, boolean fair) {
+        super(permits, fair);
+        this.name = name;
+    }
+    private static final long serialVersionUID = -5303474637353009454L;
+    final private List<Thread> owningThreads = new ArrayList<Thread>();
+    final private Set<Thread> requestingThreads = new LinkedHashSet<Thread>();
+    
+    @Override
+    public void acquire() throws InterruptedException {
+        try {
+            onRequesting();
+            super.acquire();
+            onAcquired(1);
+        } finally {
+            onRequestFinished();
+        }
+    }
+    @Override
+    public void acquire(int permits) throws InterruptedException {
+        try {
+            onRequesting();
+            super.acquire(permits);
+            onAcquired(permits);
+        } finally {
+            onRequestFinished();
+        }
+    }
+    @Override
+    public void acquireUninterruptibly() {
+        try {
+            onRequesting();
+            super.acquireUninterruptibly();
+            onAcquired(1);
+        } finally {
+            onRequestFinished();
+        }
+    }
+    @Override
+    public void acquireUninterruptibly(int permits) {
+        try {
+            onRequesting();
+            super.acquireUninterruptibly(permits);
+            onAcquired(permits);
+        } finally {
+            onRequestFinished();
+        }
+    }
+
+    public void acquireUnchecked() {
+        try {
+            acquire();
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
+        }
+    }
+    public void acquireUnchecked(int numPermits) {
+        try {
+            acquire(numPermits);
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
+        }
+    }
+    
+    @Override
+    public boolean tryAcquire() {
+        try {
+            onRequesting();
+            if (super.tryAcquire()) {
+                onAcquired(1);
+                return true;
+            }
+            return false;
+        } finally {
+            onRequestFinished();
+        }
+    }
+    @Override
+    public boolean tryAcquire(int permits) {
+        try {
+            onRequesting();
+            if (super.tryAcquire(permits)) {
+                onAcquired(permits);
+                return true;
+            }
+            return false;
+        } finally {
+            onRequestFinished();
+        }
+    }
+    @Override
+    public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {
+        try {
+            onRequesting();
+            if (super.tryAcquire(permits, timeout, unit)) {
+                onAcquired(permits);
+                return true;
+            }
+            return false;
+        } finally {
+            onRequestFinished();
+        }
+    }
+    @Override
+    public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
+        try {
+            onRequesting();
+            if (super.tryAcquire(timeout, unit)) {
+                onAcquired(1);
+                return true;
+            }
+            return false;
+        } finally {
+            onRequestFinished();
+        }
+    }
+
+    /** invoked when a caller successfully acquires a mutex, before {@link #onRequestFinished()} */
+    protected synchronized void onAcquired(int permits) {
+        for (int i=0; i<permits; i++) owningThreads.add(Thread.currentThread());
+    }
+    /** invoked when a caller is about to request a semaphore (before it might block);
+     * guaranteed to call {@link #onRequestFinished()} after the blocking,
+     * with a call to {@link #onAcquired(int)} beforehand if the acquisition was successful */
+    protected synchronized void onRequesting() {
+        requestingThreads.add(Thread.currentThread());
+    }
+    /** invoked when a caller has completed requesting a mutex, whether successful, aborted, or interrupted */
+    protected synchronized void onRequestFinished() {
+        requestingThreads.remove(Thread.currentThread());
+    }
+
+    @Override
+    public void release() {
+        super.release();
+        onReleased(1);
+    }
+    @Override
+    public void release(int permits) {
+        super.release(permits);
+        onReleased(permits);
+    }
+
+    /** invoked when a caller has released permits */
+    protected synchronized void onReleased(int permits) {
+        boolean result = true;
+        for (int i=0; i<permits; i++) result = owningThreads.remove(Thread.currentThread()) & result;
+        if (!result) throw new IllegalStateException("Thread "+Thread.currentThread()+" which released "+this+" did not own it.");  
+    }
+
+    /** true iff there are any owners or any requesters (callers blocked trying to acquire) */
+    public synchronized boolean isInUse() {
+        return !owningThreads.isEmpty() || !requestingThreads.isEmpty();
+    }
+
+    /** true iff the calling thread is one of the owners */ 
+    public synchronized boolean isCallingThreadAnOwner() {
+        return owningThreads.contains(Thread.currentThread());
+    }
+
+    private final String name;
+    /** constructor-time supplied name */
+    public String getName() { return name; }
+
+    private String description;
+    public void setDescription(String description) { this.description = description; }
+    /** caller supplied description */
+    public String getDescription() { return description; }
+
+    /** unmodifiable view of threads owning the permits; threads with multiple permits listed multiply */
+    public synchronized List<Thread> getOwningThreads() {
+        return ImmutableList.<Thread>copyOf(owningThreads);
+    }
+    /** unmodifiable view of threads requesting access (blocked or briefly trying to acquire);
+     * this is guaranteed to be cleared _after_ getOwners 
+     * (synchronizing on this class while reading both fields will give canonical access) */
+    public synchronized List<Thread> getRequestingThreads() {
+        return ImmutableList.<Thread>copyOf(requestingThreads);
+    }
+    
+    @Override
+    public synchronized String toString() {
+        return super.toString()+"["+name+"; description="+description+"; owning="+owningThreads+"; requesting="+requestingThreads+"]";
+    }
+    
+    /** Indicate that the calling thread is going to acquire or tryAcquire, 
+     * in order to set up the semaphore's isInUse() value appropriately for certain checks.
+     * It *must* do so after invoking this method. */ 
+    public void indicateCallingThreadWillRequest() {
+        requestingThreads.add(Thread.currentThread());
+    }
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/mutex/WithMutexes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/util/mutex/WithMutexes.java b/core/src/main/java/org/apache/brooklyn/core/util/mutex/WithMutexes.java
new file mode 100644
index 0000000..82eab02
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/util/mutex/WithMutexes.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.util.mutex;
+
+/** interface which allows multiple callers to co-operate using named mutexes, inspectably,
+ * and containing implementation as inner class
+ * <p>
+ * MutexSupport is a common implementation of this.
+ * mixin code frequently delegates to this, 
+ * as shown in the test case's WithMutexesTest.SampleWithMutexesDelegatingMixin class 
+ **/
+public interface WithMutexes {
+
+    /** returns true if the calling thread has the mutex with the given ID */
+    public boolean hasMutex(String mutexId);
+    
+    /** acquires a mutex, if available, otherwise blocks on its becoming available;
+     * caller must release after use */
+    public void acquireMutex(String mutexId, String description) throws InterruptedException;
+
+    /** acquires a mutex and returns true, if available; otherwise immediately returns false;
+     * caller must release after use if this returns true */
+    public boolean tryAcquireMutex(String mutexId, String description);
+
+    /** releases a mutex, triggering another thread to use it or cleaning it up if no one else is waiting;
+     * this should only be called by the mutex owner (thread) */
+    public void releaseMutex(String mutexId);
+    
+}