You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/12/23 12:06:34 UTC

[11/71] [abbrv] incubator-brooklyn git commit: Merge commit 'e430723' into reorg2

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
----------------------------------------------------------------------
diff --cc brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index 0000000,02277a1..d90b1a1
mode 000000,100644..100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@@ -1,0 -1,782 +1,783 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.brooklyn.util.core.task;
+ 
+ import static com.google.common.base.Preconditions.checkNotNull;
+ 
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.Iterator;
+ import java.util.LinkedHashMap;
+ import java.util.LinkedHashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.CancellationException;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.CopyOnWriteArrayList;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.ScheduledExecutorService;
+ import java.util.concurrent.ScheduledThreadPoolExecutor;
+ import java.util.concurrent.SynchronousQueue;
+ import java.util.concurrent.ThreadFactory;
+ import java.util.concurrent.ThreadPoolExecutor;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicLong;
+ 
+ import org.apache.brooklyn.api.mgmt.ExecutionManager;
+ import org.apache.brooklyn.api.mgmt.HasTaskChildren;
+ import org.apache.brooklyn.api.mgmt.Task;
+ import org.apache.brooklyn.api.mgmt.TaskAdaptable;
+ import org.apache.brooklyn.core.BrooklynFeatureEnablement;
++import org.apache.brooklyn.core.config.Sanitizer;
+ import org.apache.brooklyn.util.collections.MutableList;
+ import org.apache.brooklyn.util.exceptions.Exceptions;
+ import org.apache.brooklyn.util.text.Identifiers;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import com.google.common.annotations.Beta;
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.base.CaseFormat;
+ import com.google.common.base.Preconditions;
+ import com.google.common.collect.ImmutableList;
+ import com.google.common.collect.Sets;
+ import com.google.common.util.concurrent.ExecutionList;
+ import com.google.common.util.concurrent.ListenableFuture;
+ import com.google.common.util.concurrent.ThreadFactoryBuilder;
+ 
+ /**
+  * Manages the execution of atomic tasks and scheduled (recurring) tasks,
+  * including setting tags and invoking callbacks.
+  */
+ public class BasicExecutionManager implements ExecutionManager {
+     private static final Logger log = LoggerFactory.getLogger(BasicExecutionManager.class);
+ 
+     private static final boolean RENAME_THREADS = BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_RENAME_THREADS);
+     
+     /** Holder for the thread-local which records the task being run by the current thread. */
+     private static class PerThreadCurrentTaskHolder {
+         public static final ThreadLocal<Task<?>> perThreadCurrentTask = new ThreadLocal<Task<?>>();
+     }
+ 
+     /** Returns the shared thread-local in which this manager records the task running in each thread. */
+     public static ThreadLocal<Task<?>> getPerThreadCurrentTask() {
+         return PerThreadCurrentTaskHolder.perThreadCurrentTask;
+     }
+ 
+     private final ThreadFactory threadFactory;
+     
+     private final ThreadFactory daemonThreadFactory;
+     
+     private final ExecutorService runner;
+         
+     private final ScheduledExecutorService delayedRunner;
+     
+     // TODO Could have a set of all knownTasks; but instead we're having a separate set per tag,
+     // so the same task could be listed multiple times if it has multiple tags...
+ 
+     //access to this field AND to members in this field is synchronized, 
+     //to allow us to preserve order while guaranteeing thread-safe
+     //(but more testing is needed before we are completely sure it is thread-safe!)
+     //synch blocks are as finely grained as possible for efficiency;
+     //NB CopyOnWriteArraySet is a perf bottleneck, and the simple map makes it easier to remove when a tag is empty
+     private Map<Object,Set<Task<?>>> tasksByTag = new HashMap<Object,Set<Task<?>>>();
+     
+     private ConcurrentMap<String,Task<?>> tasksById = new ConcurrentHashMap<String,Task<?>>();
+ 
+     private ConcurrentMap<Object, TaskScheduler> schedulerByTag = new ConcurrentHashMap<Object, TaskScheduler>();
+ 
+     /** count of all tasks submitted, including finished */
+     private final AtomicLong totalTaskCount = new AtomicLong();
+     
+     /** tasks submitted but not yet done (or in cases of interruption/cancelled not yet GC'd) */
+     private Map<String,String> incompleteTaskIds = new ConcurrentHashMap<String,String>();
+     
+     /** tasks started but not yet finished */
+     private final AtomicInteger activeTaskCount = new AtomicInteger();
+     
+     private final List<ExecutionListener> listeners = new CopyOnWriteArrayList<ExecutionListener>();
+     
+     private final static ThreadLocal<String> threadOriginalName = new ThreadLocal<String>() {
+         protected String initialValue() {
+             // should not happen, as only access is in _afterEnd with a check that _beforeStart was invoked 
+             log.warn("No original name recorded for thread "+Thread.currentThread().getName()+"; task "+Tasks.current());
+             return "brooklyn-thread-pool-"+Identifiers.makeRandomId(8);
+         }
+     };
+     
+     /**
+      * Creates the thread factories and the two executors used by this manager.
+      * Note that this invokes the overridable {@link #newThreadFactory(String)} during construction.
+      *
+      * @param contextid identifier passed to {@link #newThreadFactory(String)}, 
+      *        which by default embeds it in the names of the threads it creates
+      */
+     public BasicExecutionManager(String contextid) {
+         threadFactory = newThreadFactory(contextid);
+         daemonThreadFactory = new ThreadFactoryBuilder()
+                 .setThreadFactory(threadFactory)
+                 .setDaemon(true)
+                 .build();
+                 
+         // like Executors.newCachedThreadPool(daemonThreadFactory), but idle threads time out after 10s rather than 60s for better shutdown!
+         runner = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 
+                 daemonThreadFactory);
+             
+         // scheduler with a core pool of one thread, used to fire scheduled tasks once their delay has elapsed
+         delayedRunner = new ScheduledThreadPoolExecutor(1, daemonThreadFactory);
+     }
+     
+     /** Logs, at error level, any exception which escapes a thread created by the default thread factory. */
+     private final static class UncaughtExceptionHandlerImplementation implements Thread.UncaughtExceptionHandler {
+         @Override
+         public void uncaughtException(Thread t, Throwable e) {
+             log.error("Uncaught exception in thread "+t.getName(), e);
+         }
+     }
+     
+     /** 
+      * Creates the base thread factory; for use by overriders to supply a custom thread factory.
+      * The default names threads <code>brooklyn-execmanager-&lt;contextid&gt;-&lt;n&gt;</code>
+      * and installs a handler which logs uncaught exceptions.
+      * <p>
+      * But be extremely careful: called by constructor, so before sub-class' constructor will
+      * have been invoked!
+      *
+      * @param contextid identifier to embed in the thread names
+      * @return the factory from which the daemon thread factory used by the executors is derived
+      */
+     protected ThreadFactory newThreadFactory(String contextid) {
+         return new ThreadFactoryBuilder()
+                 .setNameFormat("brooklyn-execmanager-"+contextid+"-%d")
+                 .setUncaughtExceptionHandler(new UncaughtExceptionHandlerImplementation())
+                 .build();
+     }
+     
+     /** Immediately shuts down both the task runner and the delayed (scheduled) runner. */
+     public void shutdownNow() {
+         runner.shutdownNow();
+         delayedRunner.shutdownNow();
+     }
+     
+     /** Registers a listener to be told (via <code>onTaskDone</code>) when tasks submitted here are done. */
+     public void addListener(ExecutionListener listener) {
+         listeners.add(listener);
+     }
+     
+     /** Unregisters a listener previously added with {@link #addListener(ExecutionListener)}; no-op if it is not registered. */
+     public void removeListener(ExecutionListener listener) {
+         listeners.remove(listener);
+     }
+     
+     /**
+      * Deletes the given tag, including all tasks using this tag.
+      * 
+      * Useful, for example, if an entity is being expunged so that we don't keep holding
+      * a reference to it as a tag.
+      * <p>
+      * The tasks are deleted (recursively, see {@link #deleteTask(Task)}) from a snapshot of the
+      * tag's task set, so concurrent additions to that set cannot break the iteration.
+      *
+      * @param tag the tag to forget; nothing happens if no tasks are recorded against it
+      */
+     public void deleteTag(Object tag) {
+         Set<Task<?>> tasks;
+         synchronized (tasksByTag) {
+             tasks = tasksByTag.remove(tag);
+         }
+         if (tasks == null) return;
+         
+         // the per-tag sets are Collections.synchronizedSet wrappers, which must be locked by the caller
+         // while iterating; a submitter which looked up this set just before it was removed above
+         // may still add to it, so copy it under its own lock (as getTasksWithTag does)
+         List<Task<?>> snapshot;
+         synchronized (tasks) {
+             snapshot = ImmutableList.copyOf(tasks);
+         }
+         for (Task<?> task : snapshot) {
+             deleteTask(task);
+         }
+     }
+ 
+     /**
+      * Forgets the given task and, if it was known to this manager and has children,
+      * recursively forgets each of its children too.
+      */
+     public void deleteTask(Task<?> task) {
+         if (deleteTaskNonRecursive(task) && task instanceof HasTaskChildren) {
+             // copy first, so the recursion is not affected by changes to the live child collection
+             for (Task<?> child : ImmutableList.copyOf(((HasTaskChildren)task).getChildren())) {
+                 deleteTask(child);
+             }
+         }
+     }
+ 
+     /**
+      * Removes the task from every per-tag set (dropping any tag left with no tasks), 
+      * from the id index, and from the incomplete-task record; children are not touched.
+      *
+      * @return whether the task was present in the id index
+      */
+     protected boolean deleteTaskNonRecursive(Task<?> task) {
+         checkNotNull(task, "task");
+         for (Object tag : task.getTags()) {
+             synchronized (tasksByTag) {
+                 Set<Task<?>> tagged = tasksWithTagLiveOrNull(tag);
+                 if (tagged == null) continue;
+                 tagged.remove(task);
+                 if (tagged.isEmpty()) {
+                     tasksByTag.remove(tag);
+                 }
+             }
+         }
+         Task<?> known = tasksById.remove(task.getId());
+         incompleteTaskIds.remove(task.getId());
+         if (known == null) {
+             return false;
+         }
+         if (known.isSubmitted() && !known.isDone()) {
+             log.warn("Deleting submitted task before completion: "+known+"; this task will continue to run in the background outwith "+this+", but perhaps it should have been cancelled?");
+         }
+         return true;
+     }
+ 
+     /** Whether the task runner has been shut down (the delayed runner is not consulted). */
+     public boolean isShutdown() {
+         return runner.isShutdown();
+     }
+     
+     /** count of all tasks ever submitted to this manager, including those which have finished */
+     public long getTotalTasksSubmitted() {
+         return totalTaskCount.get();
+     }
+     
+     /** count of tasks submitted but not ended, i.e. the current number of ids recorded as incomplete */
+     public long getNumIncompleteTasks() {
+         return incompleteTaskIds.size();
+     }
+     
+     /** count of tasks started but not ended (incremented on before-start, decremented on after-end in the same thread) */
+     public long getNumActiveTasks() {
+         return activeTaskCount.get();
+     }
+ 
+     /** count of tasks kept in memory (i.e. still in the id index), often including ended tasks */
+     public long getNumInMemoryTasks() {
+         return tasksById.size();
+     }
+ 
+     /** Returns the live set of tasks for the given (non-null) tag, registering a new empty synchronized set if there is none. */
+     private Set<Task<?>> tasksWithTagCreating(Object tag) {
+         Preconditions.checkNotNull(tag);
+         synchronized (tasksByTag) {
+             Set<Task<?>> existing = tasksWithTagLiveOrNull(tag);
+             if (existing != null) {
+                 return existing;
+             }
+             Set<Task<?>> created = Collections.synchronizedSet(new LinkedHashSet<Task<?>>());
+             tasksByTag.put(tag, created);
+             return created;
+         }
+     }
+ 
+     /** exposes live view, for internal use only;
+      * returns the actual (mutable) set held for the tag, or null if no tasks are recorded against it.
+      * callers which iterate over the result should synchronize on it, as the other methods in this class do. */
+     @Beta
+     public Set<Task<?>> tasksWithTagLiveOrNull(Object tag) {
+         synchronized (tasksByTag) {
+             return tasksByTag.get(tag);
+         }
+     }
+ 
+     /** Looks up a task by id among those still held in memory by this manager; null if not known. */
+     @Override
+     public Task<?> getTask(String id) {
+         return tasksById.get(id);
+     }
+     
+     /** returns a copy of all tasks currently in the id index;
+      * not on interface because potentially expensive */
+     public List<Task<?>> getAllTasks() {
+         // not sure if synching makes any difference; have not observed CME's yet
+         // (and so far this is only called when a CME was caught on a previous operation)
+         synchronized (tasksById) {
+             return MutableList.copyOf(tasksById.values());
+         }
+     }
+     
+     /** Returns an unmodifiable snapshot (in insertion order) of the tasks recorded against the tag; empty if none. */
+     @Override
+     public Set<Task<?>> getTasksWithTag(Object tag) {
+         Set<Task<?>> live = tasksWithTagLiveOrNull(tag);
+         if (live != null) {
+             Set<Task<?>> snapshot;
+             // copy while holding the live set's lock, as others may be adding to it
+             synchronized (live) {
+                 snapshot = new LinkedHashSet<Task<?>>(live);
+             }
+             return Collections.unmodifiableSet(snapshot);
+         }
+         return Collections.emptySet();
+     }
+     
+     /** Returns an unmodifiable set containing every task recorded against at least one of the given tags. */
+     @Override
+     public Set<Task<?>> getTasksWithAnyTag(Iterable<?> tags) {
+         Set<Task<?>> union = new LinkedHashSet<Task<?>>();
+         for (Object tag : tags) {
+             Set<Task<?>> live = tasksWithTagLiveOrNull(tag);
+             if (live == null) continue;
+             // addAll iterates the live set, so hold its lock meanwhile
+             synchronized (live) {
+                 union.addAll(live);
+             }
+         }
+         return Collections.unmodifiableSet(union);
+     }
+ 
+     /** returns the tasks which carry every one of the given tags, as an unmodifiable set;
+      * only works with at least one tag; returns empty if no tags */
+     @Override
+     public Set<Task<?>> getTasksWithAllTags(Iterable<?> tags) {
+         //NB: retrieval for multiple tags could be made (much) more efficient (if/when it is used with multiple tags!)
+         //by starting from the least-used tag and then checking each of its tasks for the other tags
+         //(second-least used next, then third-least used, etc)
+         Set<Task<?>> intersection = new LinkedHashSet<Task<?>>();
+         Iterator<?> remaining = tags.iterator();
+         if (remaining.hasNext()) {
+             // seed with the tasks of the first tag, then narrow down by each subsequent tag
+             intersection.addAll(getTasksWithTag(remaining.next()));
+             while (remaining.hasNext()) {
+                 intersection.retainAll(getTasksWithTag(remaining.next()));
+             }
+         }
+         return Collections.unmodifiableSet(intersection);
+     }
+ 
+     /** live view of all tasks in the id index (not a copy), for internal use only */
+     @Beta
+     public Collection<Task<?>> allTasksLive() { return tasksById.values(); }
+     
+     /** Returns an unmodifiable snapshot of all tags which currently have tasks recorded against them. */
+     public Set<Object> getTaskTags() { 
+         synchronized (tasksByTag) {
+             return Collections.unmodifiableSet(Sets.newLinkedHashSet(tasksByTag.keySet())); 
+         }
+     }
+ 
+     // convenience overloads: each wraps its argument in a BasicTask where needed, and supplies
+     // a fresh mutable flags map where none is given, then delegates to submit(Map, TaskAdaptable)
+     public Task<?> submit(Runnable r) { return submit(new LinkedHashMap<Object,Object>(1), r); }
+     public Task<?> submit(Map<?,?> flags, Runnable r) { return submit(flags, new BasicTask<Void>(flags, r)); }
+ 
+     public <T> Task<T> submit(Callable<T> c) { return submit(new LinkedHashMap<Object,Object>(1), c); }
+     public <T> Task<T> submit(Map<?,?> flags, Callable<T> c) { return submit(flags, new BasicTask<T>(flags, c)); }
+ 
+     public <T> Task<T> submit(TaskAdaptable<T> t) { return submit(new LinkedHashMap<Object,Object>(1), t); }
+     /**
+      * Submits the given task for execution, unless it already has an internal future
+      * in which case it is returned as is. The check and the submission are done
+      * while holding the task's monitor.
+      *
+      * @param flags submission flags; the "tag" and "tags" entries, if present, are removed from this map on submission
+      * @param task the task, or something adaptable to a task
+      */
+     public <T> Task<T> submit(Map<?,?> flags, TaskAdaptable<T> task) {
+         if (!(task instanceof Task))
+             task = task.asTask();
+         synchronized (task) {
+             if (((TaskInternal<?>)task).getInternalFuture()!=null) return (Task<T>)task;
+             return submitNewTask(flags, (Task<T>) task);
+         }
+     }
+ 
+     /** As {@link #scheduleWith(Map, Task)} with no flags. */
+     public <T> Task<T> scheduleWith(Task<T> task) { return scheduleWith(Collections.emptyMap(), task); }
+     /** Submits the given task, unless it already has an internal future in which case it is returned as is;
+      * the check and the submission are done while holding the task's monitor. */
+     public <T> Task<T> scheduleWith(Map<?,?> flags, Task<T> task) {
+         synchronized (task) {
+             if (((TaskInternal<?>)task).getInternalFuture()!=null) return task;
+             return submitNewTask(flags, task);
+         }
+     }
+ 
+     /** Registers a scheduled task (id index, total count, before-submit hook) and then schedules its first iteration. */
+     protected Task<?> submitNewScheduledTask(final Map<?,?> flags, final ScheduledTask task) {
+         tasksById.put(task.getId(), task);
+         totalTaskCount.incrementAndGet();
+         
+         beforeSubmitScheduledTaskAllIterations(flags, task);
+         
+         return submitSubsequentScheduledTask(flags, task);
+     }
+     
+     /**
+      * Schedules the next iteration of the given scheduled task on the delayed runner, after the task's
+      * current delay; or, if the task is already done, runs the end-of-all-iterations hook instead.
+      *
+      * @return the scheduled task passed in
+      */
+     @SuppressWarnings("unchecked")
+     protected Task<?> submitSubsequentScheduledTask(final Map<?,?> flags, final ScheduledTask task) {
+         if (task.isDone()) {
+             afterEndScheduledTaskAllIterations(flags, task);
+             return task;
+         }
+         task.internalFuture = delayedRunner.schedule(new ScheduledTaskCallable(task, flags),
+             task.delay.toNanoseconds(), TimeUnit.NANOSECONDS);
+         return task;
+     }
+ 
+     /**
+      * The callable handed to the delayed runner for each iteration of a {@link ScheduledTask}.
+      * When it fires it creates the per-iteration task, wraps that task's job so that once the job
+      * finishes the scheduled task is either resubmitted or ended, and submits the per-iteration task.
+      */
+     protected class ScheduledTaskCallable implements Callable<Object> {
+         public ScheduledTask task;
+         public Map<?,?> flags;
+ 
+         public ScheduledTaskCallable(ScheduledTask task, Map<?, ?> flags) {
+             this.task = task;
+             this.flags = flags;
+         }
+ 
+         /** Creates, wraps and submits the task for this iteration; returns the submitted per-iteration task. */
+         @SuppressWarnings({ "rawtypes", "unchecked" })
+         public Object call() {
+             // record the start time on the first iteration only (-1 is treated as "not yet started")
+             if (task.startTimeUtc==-1) task.startTimeUtc = System.currentTimeMillis();
+             TaskInternal<?> taskScheduled = null;
+             try {
+                 beforeStartScheduledTaskSubmissionIteration(flags, task);
+                 taskScheduled = (TaskInternal<?>) task.newTask();
+                 taskScheduled.setSubmittedByTask(task);
+                 final Callable<?> oldJob = taskScheduled.getJob();
+                 final TaskInternal<?> taskScheduledF = taskScheduled;
+                 // replace the job with a wrapper which runs the original job and then decides what happens next
+                 taskScheduled.setJob(new Callable() { public Object call() {
+                     boolean shouldResubmit = true;
+                     task.recentRun = taskScheduledF;
+                     try {
+                         // wake up anyone waiting on the scheduled task now that an iteration has started
+                         synchronized (task) {
+                             task.notifyAll();
+                         }
+                         Object result;
+                         try {
+                             result = oldJob.call();
+                             task.lastThrownType = null;
+                         } catch (Exception e) {
+                             shouldResubmit = shouldResubmitOnException(oldJob, e);
+                             throw Exceptions.propagate(e);
+                         }
+                         return result;
+                     } finally {
+                         // do in finally block in case we were interrupted
+                         if (shouldResubmit) {
+                             resubmit();
+                         } else {
+                             afterEndScheduledTaskAllIterations(flags, task);
+                         }
+                     }
+                 }});
+                 task.nextRun = taskScheduled;
+                 // submit through the current execution context if there is one, otherwise directly to this manager
+                 BasicExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext();
+                 if (ec!=null) return ec.submit(taskScheduled);
+                 else return submit(taskScheduled);
+             } finally {
+                 afterEndScheduledTaskSubmissionIteration(flags, task, taskScheduled);
+             }
+         }
+ 
+         /** Counts the completed run and, if the task is periodic and not cancelled, schedules the next iteration after one period. */
+         private void resubmit() {
+             task.runCount++;
+             // NOTE(review): when there is no period (or the task is cancelled) nothing further happens here,
+             // so it looks like afterEndScheduledTaskAllIterations is not invoked on this path -- confirm that is intended
+             if (task.period!=null && !task.isCancelled()) {
+                 task.delay = task.period;
+                 submitSubsequentScheduledTask(flags, task);
+             }
+         }
+ 
+         /**
+          * Decides, and logs, whether the scheduled task should run again after an iteration's job threw.
+          * Returns false if the current task was interrupted or if the scheduled task is set to cancel on exception;
+          * otherwise true, logging at debug the first time a given exception type is seen and at trace for repeats.
+          */
+         private boolean shouldResubmitOnException(Callable<?> oldJob, Exception e) {
+             String message = "Error executing " + oldJob + " (scheduled job of " + task + " - " + task.getDescription() + ")";
+             if (Tasks.isInterrupted()) {
+                 log.debug(message + "; cancelling scheduled execution: " + e);
+                 return false;
+             } else if (task.cancelOnException) {
+                 log.warn(message + "; cancelling scheduled execution.", e);
+                 return false;
+             } else {
+                 message += "; resubmitting task and throwing: " + e;
+                 if (!e.getClass().equals(task.lastThrownType)) {
+                     task.lastThrownType = e.getClass();
+                     message += " (logging subsequent exceptions at trace)";
+                     log.debug(message);
+                 } else {
+                     message += " (repeat exception)";
+                     log.trace(message);
+                 }
+                 return true;
+             }
+         }
+ 
+         @Override
+         public String toString() {
+             return "ScheduledTaskCallable["+task+","+flags+"]";
+         }
+     }
+ 
+     /**
+      * The callable actually handed to the executor (or scheduler) for an atomic task:
+      * runs the before-start hook, the task's job (unless the task is already cancelled),
+      * and the after-end hook, then rethrows any error and finally runs the task's listeners.
+      * <p>
+      * Renaming of the thread for the duration of the task is left entirely to the before-start / after-end hooks.
+      * This class used to rename the thread as well, before the hook ran, which caused the hook to record that
+      * decorated name as the thread's "original" name and restore it last; pooled threads thus kept a
+      * name which grew with every task they ran.
+      */
+     private final class SubmissionCallable<T> implements Callable<T> {
+         private final Map<?, ?> flags;
+         private final Task<T> task;
+ 
+         private SubmissionCallable(Map<?, ?> flags, Task<T> task) {
+             this.flags = flags;
+             this.task = task;
+         }
+ 
+         /**
+          * Runs the task's job in the calling thread.
+          *
+          * @return the result of the job
+          * @throws CancellationException (propagated) if the task was cancelled before its job was invoked
+          */
+         @Override
+         public T call() {
+             try {
+                 T result = null;
+                 Throwable error = null;
+                 // remembered so that the pool thread's name is always put back, whatever the hooks do
+                 final String oldThreadName = Thread.currentThread().getName();
+                 try {
+                     beforeStartAtomicTask(flags, task);
+                     if (!task.isCancelled()) {
+                         result = ((TaskInternal<T>)task).getJob().call();
+                     } else throw new CancellationException();
+                 } catch(Throwable e) {
+                     error = e;
+                 } finally {
+                     try {
+                         afterEndAtomicTask(flags, task);
+                     } finally {
+                         // restore last, i.e. after the hook has done its own restoring, so the true original name wins
+                         if (RENAME_THREADS) {
+                             Thread.currentThread().setName(oldThreadName);
+                         }
+                     }
+                 }
+                 if (error!=null) {
+                     /* we throw, after logging debug.
+                      * the throw means the error is available for task submitters to monitor.
+                      * however it is possible no one is monitoring it, in which case we will have debug logging only for errors.
+                      * (the alternative, of warn-level logging in lots of places where we don't want it, seems worse!) 
+                      */
+                     if (log.isDebugEnabled()) {
+                         // debug only here, because most submitters will handle failures
+                         log.debug("Exception running task "+task+" (rethrowing): "+error.getMessage(), error);
+                         if (log.isTraceEnabled())
+                             log.trace("Trace for exception running task "+task+" (rethrowing): "+error.getMessage(), error);
+                     }
+                     throw Exceptions.propagate(error);
+                 }
+                 return result;
+             } finally {
+                 ((TaskInternal<?>)task).runListeners();
+             }
+         }
+ 
+         @Override
+         public String toString() {
+             return "BEM.call("+task+","+flags+")";
+         }
+     }
+ 
+     /**
+      * Future given to a submitted task as its internal future; it forwards to the executor's future
+      * but on cancel also cancels the task itself and runs the task's listeners.
+      */
+     private final static class ListenableForwardingFutureForTask<T> extends ListenableForwardingFuture<T> {
+         private final Task<T> task;
+ 
+         private ListenableForwardingFutureForTask(Future<T> delegate, ExecutionList list, Task<T> task) {
+             super(delegate, list);
+             this.task = task;
+         }
+ 
+         /** Cancels the task (if not already cancelled) and the delegate future; true if either cancel call returned true. */
+         @Override
+         public boolean cancel(boolean mayInterruptIfRunning) {
+             boolean result = false;
+             if (!task.isCancelled()) result |= task.cancel(mayInterruptIfRunning);
+             result |= super.cancel(mayInterruptIfRunning);
+             // listeners are run on every cancel call, whatever the outcome above
+             ((TaskInternal<?>)task).runListeners();
+             return result;
+         }
+     }
+ 
+     /**
+      * Listener added to each submitted task; when run it triggers the task's own listeners and then
+      * notifies every {@link ExecutionListener} registered on this manager that the task is done.
+      * Exceptions from either are logged as warnings and do not prevent the remaining listeners being notified.
+      */
+     private final class SubmissionListenerToCallOtherListeners<T> implements Runnable {
+         private final Task<T> task;
+ 
+         private SubmissionListenerToCallOtherListeners(Task<T> task) {
+             this.task = task;
+         }
+ 
+         @Override
+         public void run() {
+             try {
+                 ((TaskInternal<?>)task).runListeners();
+             } catch (Exception e) {
+                 log.warn("Error running task listeners for task "+task+" done", e);
+             }
+             
+             for (ExecutionListener listener : listeners) {
+                 try {
+                     listener.onTaskDone(task);
+                 } catch (Exception e) {
+                     log.warn("Error running execution listener "+listener+" of task "+task+" done", e);
+                 }
+             }
+         }
+     }
+ 
+     /**
+      * Submits a task which has not been submitted before. Scheduled tasks are handed to
+      * {@link #submitNewScheduledTask(Map, ScheduledTask)}; any other task is registered, wrapped in a
+      * {@link SubmissionCallable}, and passed to the scheduler associated with one of its tags if there is one,
+      * or else to the default runner. The resulting future is installed as the task's internal future.
+      *
+      * @return the task passed in
+      * @throws NullPointerException if a non-scheduled task has no job
+      */
+     @SuppressWarnings("unchecked")
+     protected <T> Task<T> submitNewTask(final Map<?,?> flags, final Task<T> task) {
+         if (log.isTraceEnabled()) log.trace("Submitting task {} ({}), with flags {}, and tags {}, job {}", 
 -                new Object[] {task.getId(), task, flags, task.getTags(), 
++                new Object[] {task.getId(), task, Sanitizer.sanitize(flags), task.getTags(), 
+                 (task instanceof TaskInternal ? ((TaskInternal<T>)task).getJob() : "<unavailable>")});
+         
+         if (task instanceof ScheduledTask)
+             return (Task<T>) submitNewScheduledTask(flags, (ScheduledTask)task);
+         
+         tasksById.put(task.getId(), task);
+         totalTaskCount.incrementAndGet();
+         
+         beforeSubmitAtomicTask(flags, task);
+         
+         // NOTE(review): this check runs after the task has been registered and counted above,
+         // so a task rejected here appears to stay in the bookkeeping -- consider checking first
+         if (((TaskInternal<T>)task).getJob() == null) 
+             throw new NullPointerException("Task "+task+" submitted with with null job: job must be supplied.");
+         
+         Callable<T> job = new SubmissionCallable<T>(flags, task);
+         
+         // If there's a scheduler then use that; otherwise execute it directly
+         Set<TaskScheduler> schedulers = null;
+         for (Object tago: task.getTags()) {
+             TaskScheduler scheduler = getTaskSchedulerForTag(tago);
+             if (scheduler!=null) {
+                 if (schedulers==null) schedulers = new LinkedHashSet<TaskScheduler>(2);
+                 schedulers.add(scheduler);
+             }
+         }
+         Future<T> future;
+         if (schedulers!=null && !schedulers.isEmpty()) {
+             if (schedulers.size()>1) log.warn("multiple schedulers detected, using only the first, for "+task+": "+schedulers);
+             future = schedulers.iterator().next().submit(job);
+         } else {
+             future = runner.submit(job);
+         }
+         // on completion, listeners get triggered above; here, below we ensure they get triggered on cancel
+         // (and we make sure the same ExecutionList is used in the future as in the task)
+         ListenableFuture<T> listenableFuture = new ListenableForwardingFutureForTask<T>(future, ((TaskInternal<T>)task).getListeners(), task);
+         // doesn't matter whether the listener is added to the listenableFuture or the task,
+         // except that for the task we can more easily wrap it so that it only logs debug if the executor is shutdown
+         // (avoid a bunch of ugly warnings in tests which start and stop things a lot!)
+         // [probably even nicer to run this in the same thread, it doesn't do much; but that is messier to implement]
+         ((TaskInternal<T>)task).addListener(new SubmissionListenerToCallOtherListeners<T>(task), runner);
+         
+         ((TaskInternal<T>)task).initInternalFuture(listenableFuture);
+         
+         return task;
+     }
+     
+     /** hook invoked once when a scheduled task is first submitted, before any of its iterations is scheduled */
+     protected void beforeSubmitScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) {
+         internalBeforeSubmit(flags, task);
+     }
+     /** hook invoked when an atomic (non-scheduled) task is submitted, before it is passed to an executor */
+     protected void beforeSubmitAtomicTask(Map<?,?> flags, Task<?> task) {
+         internalBeforeSubmit(flags, task);
+     }
+     /** invoked when a task is submitted: records it as incomplete, sets its submitter (the current task, if any)
+      * and submit time, moves any "tag" / "tags" entries from the flags on to the task's tags
+      * (removing them from the flags map, which must therefore be mutable if it contains them),
+      * and indexes the task under each of its tags */
+     protected void internalBeforeSubmit(Map<?,?> flags, Task<?> task) {
+         incompleteTaskIds.put(task.getId(), task.getId());
+         
+         Task<?> currentTask = Tasks.current();
+         if (currentTask!=null) ((TaskInternal<?>)task).setSubmittedByTask(currentTask);
+         ((TaskInternal<?>)task).setSubmitTimeUtc(System.currentTimeMillis());
+         
+         // NOTE(review): the "tags" value is cast to Collection unchecked -- presumably callers always supply a collection
+         if (flags.get("tag")!=null) ((TaskInternal<?>)task).getMutableTags().add(flags.remove("tag"));
+         if (flags.get("tags")!=null) ((TaskInternal<?>)task).getMutableTags().addAll((Collection<?>)flags.remove("tags"));
+ 
+         for (Object tag: ((TaskInternal<?>)task).getTags()) {
+             tasksWithTagCreating(tag).add(task);
+         }
+     }
+ 
+     /** hook invoked at the start of each firing of a scheduled task, before its per-iteration task is created and submitted */
+     protected void beforeStartScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> task) {
+         internalBeforeStart(flags, task);
+     }
+     /** hook invoked in the executing thread just before an atomic task's job is run */
+     protected void beforeStartAtomicTask(Map<?,?> flags, Task<?> task) {
+         internalBeforeStart(flags, task);
+     }
+     
+     /** invoked in a task's thread when a task is starting to run (may be some time after submitted), 
+      * but before doing any of the task's work, so that we can update bookkeeping and notify callbacks.
+      * <p>
+      * always increments the active count and invokes the "newTaskStartCallback" flag (if any);
+      * the thread, current-task and start-time bookkeeping is skipped if the task is already cancelled. */
+     protected void internalBeforeStart(Map<?,?> flags, Task<?> task) {
+         activeTaskCount.incrementAndGet();
+         
+         //set thread _before_ start time, so we won't get a null thread when there is a start-time
+         if (log.isTraceEnabled()) log.trace(""+this+" beforeStart, task: "+task);
+         if (!task.isCancelled()) {
+             Thread thread = Thread.currentThread();
+             ((TaskInternal<?>)task).setThread(thread);
+             if (RENAME_THREADS) {
+                 // remember the name the thread has now, so that internalAfterEnd can put it back
+                 threadOriginalName.set(thread.getName());
+                 String newThreadName = "brooklyn-" + CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, task.getDisplayName().replace(" ", "")) + "-" + task.getId().substring(0, 8);
+                 thread.setName(newThreadName);
+             }
+             PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task);
+             ((TaskInternal<?>)task).setStartTimeUtc(System.currentTimeMillis());
+         }
+         ExecutionUtils.invoke(flags.get("newTaskStartCallback"), task);
+     }
+ 
+     /** normally (if not interrupted) called once for each call to {@link #beforeSubmitScheduledTaskAllIterations(Map, Task)};
+      * marks the scheduled task as ended without touching the per-thread state */
+     protected void afterEndScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) {
+         internalAfterEnd(flags, task, false, true);
+     }
+     /** called once for each call to {@link #beforeStartScheduledTaskSubmissionIteration(Map, Task)},
+      * with a per-iteration task generated by the surrounding scheduled task
+      * (which may be null, and which this default implementation does not use);
+      * clears the per-thread state but does not mark the scheduled task as ended */
+     protected void afterEndScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> scheduledTask, Task<?> taskIteration) {
+         internalAfterEnd(flags, scheduledTask, true, false);
+     }
+     /** called once for each task on which {@link #beforeStartAtomicTask(Map, Task)} is invoked,
+      * and normally (if not interrupted prior to start) 
+      * called once for each task on which {@link #beforeSubmitAtomicTask(Map, Task)} is invoked;
+      * both clears the per-thread state and marks the task as ended */
+     protected void afterEndAtomicTask(Map<?,?> flags, Task<?> task) {
+         internalAfterEnd(flags, task, true, true);
+     }
+     /**
+      * Common end-of-task bookkeeping. 
+      * Normally (if not interrupted) called once for each call to {@link #internalBeforeSubmit(Map, Task)};
+      * and, for atomic tasks and scheduled-task submission iterations, always called once 
+      * if {@link #internalBeforeStart(Map, Task)} was invoked, in the same thread as that method.
+      *
+      * @param startedInThisThread whether {@link #internalBeforeStart(Map, Task)} was called in this thread for this task,
+      *        in which case the active count and the per-thread state (current task, thread name, task's thread) are reset
+      * @param isEndingAllIterations whether the task is now finished altogether, in which case it is no longer
+      *        counted as incomplete, the "newTaskEndCallback" flag (if any) is invoked, and its end time is set
+      */
+     protected void internalAfterEnd(Map<?,?> flags, Task<?> task, boolean startedInThisThread, boolean isEndingAllIterations) {
+         if (log.isTraceEnabled()) log.trace(this+" afterEnd, task: "+task);
+         if (startedInThisThread) {
+             activeTaskCount.decrementAndGet();
+         }
+         if (isEndingAllIterations) {
+             incompleteTaskIds.remove(task.getId());
+             ExecutionUtils.invoke(flags.get("newTaskEndCallback"), task);
+             ((TaskInternal<?>)task).setEndTimeUtc(System.currentTimeMillis());
+         }
+ 
+         if (startedInThisThread) {
+             PerThreadCurrentTaskHolder.perThreadCurrentTask.remove();
+             //clear thread _after_ endTime set, so we won't get a null thread when there is no end-time
+             if (RENAME_THREADS) {
+                 Thread runningThread = task.getThread();
+                 if (runningThread!=null) {
+                     // put back the name which was recorded when the task started
+                     runningThread.setName(threadOriginalName.get());
+                     threadOriginalName.remove();
+                 } else {
+                     log.warn("BasicTask.afterEnd invoked without corresponding beforeStart");
+                 }
+             }
+             ((TaskInternal<?>)task).setThread(null);
+         }
+         // wake up anyone waiting on the task
+         synchronized (task) { task.notifyAll(); }
+     }
+ 
+     public TaskScheduler getTaskSchedulerForTag(Object tag) {
+         return schedulerByTag.get(tag);
+     }
+     
+     /**
+      * Registers a scheduler of the given class for the tag, unless a compatible one is already registered.
+      * <p>
+      * A new instance is created reflectively (and named after the tag if it implements {@link CanSetName}),
+      * then handed to {@link #setTaskSchedulerForTag(Object, TaskScheduler)}.
+      *
+      * @throws IllegalStateException if a scheduler of an incompatible class is already registered for the tag
+      */
+     public void setTaskSchedulerForTag(Object tag, Class<? extends TaskScheduler> scheduler) {
+         synchronized (schedulerByTag) {
+             TaskScheduler existing = getTaskSchedulerForTag(tag);
+             if (existing!=null && scheduler.isAssignableFrom(existing.getClass())) {
+                 /* already have such an instance */
+                 return;
+             }
+             if (existing!=null) {
+                 //might support multiple in future...
+                 throw new IllegalStateException("Not allowed to set multiple TaskSchedulers on ExecutionManager tag (tag "+tag+", has "+existing+", setting new "+scheduler+")");
+             }
+             try {
+                 TaskScheduler created = scheduler.newInstance();
+                 // allow scheduler to have a nice name, for logging etc
+                 if (created instanceof CanSetName) {
+                     ((CanSetName)created).setName(""+tag);
+                 }
+                 setTaskSchedulerForTag(tag, created);
+             } catch (InstantiationException | IllegalAccessException e) {
+                 throw Exceptions.propagate(e);
+             }
+         }
+     }
+     
+     /**
+      * Defines a {@link TaskScheduler} to run on all subsequently submitted jobs with the given tag.
+      *
+      * Maximum of one allowed currently. Resubmissions of the same scheduler (or scheduler class)
+      * allowed. If changing, you must call {@link #clearTaskSchedulerForTag(Object)} between the two.
+      * <p>
+      * The existing registration is checked before anything is modified, so a rejected call
+      * leaves the previously registered scheduler in place and does not touch the new one.
+      *
+      * @throws IllegalStateException if a different scheduler is already registered for the tag
+      * @see #setTaskSchedulerForTag(Object, Class)
+      */
+     public void setTaskSchedulerForTag(Object tag, TaskScheduler scheduler) {
+         synchronized (schedulerByTag) {
+             Object old = schedulerByTag.get(tag);
+             if (old!=null && old!=scheduler) {
+                 //might support multiple in future...
+                 throw new IllegalStateException("Not allowed to set multiple TaskSchedulers on ExecutionManager tag (tag "+tag+")");
+             }
+ 
+             // give the scheduler its executor before it is stored in the map
+             scheduler.injectExecutor(runner);
+             schedulerByTag.put(tag, scheduler);
+         }
+     }
+ 
+     /**
+      * Forgets that any scheduler was associated with a tag.
+      *
+      * @return true if a scheduler had been registered for the tag, false if there was none
+      * @see #setTaskSchedulerForTag(Object, TaskScheduler)
+      * @see #setTaskSchedulerForTag(Object, Class)
+      */
+     public boolean clearTaskSchedulerForTag(Object tag) {
+         synchronized (schedulerByTag) {
+             return schedulerByTag.remove(tag) != null;
+         }
+     }
+     
+     @VisibleForTesting
+     public ConcurrentMap<Object, TaskScheduler> getSchedulerByTag() {
+         return schedulerByTag; // the internal map itself is returned, not a copy
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/system/internal/SystemProcessTaskFactory.java
----------------------------------------------------------------------
diff --cc brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/system/internal/SystemProcessTaskFactory.java
index 0000000,ab87480..20e6180
mode 000000,100644..100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/system/internal/SystemProcessTaskFactory.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/system/internal/SystemProcessTaskFactory.java
@@@ -1,0 -1,130 +1,131 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.brooklyn.util.core.task.system.internal;
+ 
+ import java.io.File;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
++import org.apache.brooklyn.core.config.Sanitizer;
+ import org.apache.brooklyn.location.ssh.SshMachineLocation;
+ import org.apache.brooklyn.util.collections.MutableMap;
+ import org.apache.brooklyn.util.core.config.ConfigBag;
+ import org.apache.brooklyn.util.core.internal.ssh.ShellTool;
+ import org.apache.brooklyn.util.core.internal.ssh.process.ProcessTool;
+ import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
+ 
+ import com.google.common.base.Function;
+ 
+ /**
+  * Task factory whose tasks execute their commands through a {@link ProcessTool}
+  * (reported in logs as running on "local host") instead of on an {@link SshMachineLocation}.
+  * <p>
+  * An optional working directory and login-shell flag can be supplied;
+  * attempts to set a machine are ignored with a warning.
+  */
+ public class SystemProcessTaskFactory<T extends SystemProcessTaskFactory<T,RET>,RET> extends AbstractProcessTaskFactory<T, RET> {
+ 
+     private static final Logger log = LoggerFactory.getLogger(SystemProcessTaskFactory.class);
+     
+     // FIXME Plumb this through?!
+     private File directory;
+     private Boolean loginShell;
+ 
+     public SystemProcessTaskFactory(String ...commands) {
+         super(commands);
+     }
+     
+     /** Sets the working directory to pass to the process tool; marks the factory dirty. */
+     public T directory(File directory) {
+         markDirty();
+         this.directory = directory;
+         return self();
+     }
+     
+     /** Sets whether a login shell is requested from the process tool; marks the factory dirty. */
+     public T loginShell(boolean loginShell) {
+         markDirty();
+         this.loginShell = loginShell;
+         return self();
+     }
+     
+     /** Not supported for this factory: the machine is ignored, a warning is logged,
+      * and (at debug level) a stack trace showing where the attempt came from. */
+     @Override
+     public T machine(SshMachineLocation machine) {
+         log.warn("Not permitted to set machines on "+this+" (ignoring - "+machine+")");
+         if (log.isDebugEnabled())
+             log.debug("Source of attempt to set machines on "+this+" ("+machine+")",
+                     new Throwable("Source of attempt to set machines on "+this+" ("+machine+")"));
+         return self();
+     }
+ 
+     @Override
+     public ProcessTaskWrapper<RET> newTask() {
+         return new SystemProcessTaskWrapper();
+     }
+ 
+     /** Task wrapper which runs the factory's commands via {@link #newExecWithLoggingHelpers()}. */
+     protected class SystemProcessTaskWrapper extends ProcessTaskWrapper<RET> {
+         protected final String taskTypeShortName;
+         
+         public SystemProcessTaskWrapper() {
+             this("Process");
+         }
+         public SystemProcessTaskWrapper(String taskTypeShortName) {
+             super(SystemProcessTaskFactory.this);
+             this.taskTypeShortName = taskTypeShortName;
+         }
+         /** Adds the factory's directory and login-shell settings (when set) to the config used for running. */
+         @Override
+         protected ConfigBag getConfigForRunning() {
+             ConfigBag result = super.getConfigForRunning();
+             // write to the bag that is actually returned, so the settings reach the caller
+             // whether or not the superclass hands back a copy of its config
+             if (directory != null) result.put(ProcessTool.PROP_DIRECTORY, directory.getAbsolutePath());
+             if (loginShell != null) result.put(ProcessTool.PROP_LOGIN_SHELL, loginShell);
+             return result;
+         }
+         @Override
+         protected void run(ConfigBag config) {
+             if (Boolean.FALSE.equals(this.runAsScript)) {
+                 this.exitCode = newExecWithLoggingHelpers().execCommands(config.getAllConfig(), getSummary(), getCommands(), getShellEnvironment());
+             } else { // runAsScript = null or TRUE
+                 this.exitCode = newExecWithLoggingHelpers().execScript(config.getAllConfig(), getSummary(), getCommands(), getShellEnvironment());
+             }
+         }
+         @Override
+         protected String taskTypeShortName() { return taskTypeShortName; }
+     }
+     
+     /** Creates the helper used to execute commands or scripts; it always applies the task to a new {@link ProcessTool}. */
+     protected ExecWithLoggingHelpers newExecWithLoggingHelpers() {
+         return new ExecWithLoggingHelpers("Process") {
+             @Override
+             protected <U> U execWithTool(MutableMap<String, Object> props, Function<ShellTool, U> task) {
+                 // properties typically passed to both
+                 if (log.isDebugEnabled() && props!=null && !props.isEmpty())
 -                    log.debug("Ignoring flags "+props+" when running "+this);
++                    log.debug("Ignoring flags "+Sanitizer.sanitize(props)+" when running "+this);
+                 return task.apply(new ProcessTool());
+             }
+             @Override
+             protected void preExecChecks() {}
+             @Override
+             protected String constructDefaultLoggingPrefix(ConfigBag execFlags) {
+                 return "system.exec";
+             }
+             @Override
+             protected String getTargetName() {
+                 return "local host";
+             }
+         }.logger(log);
+     }
+ 
+     /** concrete instance (for generics) */
+     public static class ConcreteSystemProcessTaskFactory<RET> extends SystemProcessTaskFactory<ConcreteSystemProcessTaskFactory<RET>, RET> {
+         public ConcreteSystemProcessTaskFactory(String ...commands) {
+             super(commands);
+         }
+     }
+     
+ }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/catalog/internal/CatalogItemComparatorTest.java
----------------------------------------------------------------------
diff --cc brooklyn-server/core/src/test/java/org/apache/brooklyn/core/catalog/internal/CatalogItemComparatorTest.java
index 0000000,e7d43cd..3c8ce83
mode 000000,100644..100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/catalog/internal/CatalogItemComparatorTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/catalog/internal/CatalogItemComparatorTest.java
@@@ -1,0 -1,82 +1,86 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.brooklyn.core.catalog.internal;
+ 
+ import static org.testng.Assert.assertEquals;
+ import static org.testng.Assert.assertTrue;
+ 
 -import org.testng.annotations.Test;
+ import org.apache.brooklyn.api.catalog.CatalogItem;
 -import org.apache.brooklyn.core.catalog.internal.CatalogItemBuilder;
 -import org.apache.brooklyn.core.catalog.internal.CatalogItemComparator;
++import org.testng.annotations.Test;
+ 
+ public class CatalogItemComparatorTest {
+     private static final String RC2 = "10.5.8-rc2";
+     private static final String STABLE = "10.5.8";
+ 
++    @SuppressWarnings({ "unchecked", "rawtypes" })
+     @Test
+     public void testComparison() {
+         compare("0.0.1", "0.0.2", 1); // positive result: the lower version sorts after the higher one
+         compare("0.0.2", "0.0.1", -1);
+         compare("0.0.1-qual", "0.0.2", 1);
+         compare("0.0.1.qual", "0.0.2", 1);
 -        compare("0.0.1-qual", "0.0.1_qual", 0);
++        
++        // NB: semantics of this changed in 090-SNAPSHOT not to be 0 unless identical
++        // (remove when we're used to this)
++//        compare("0.0.1-qual", "0.0.1_qual", 0);
++        
+         compare("0.0.1.qual", "0.0.1.qual", 0);
+         compare("0.0.1", "0.0.2-SNAPSHOT", -1); // negative result: the plain version sorts before the -SNAPSHOT one
+         compare("0.0.1", "0.0.2.SNAPSHOT", -1);
+         compare("0.0.0_SNAPSHOT", "0.0.1-SNAPSHOT-20141111114709760", 1);
+         compare("0.0.0.SNAPSHOT", "0.0.1.SNAPSHOT-20141111114709760", 1);
+         compare("2.0", "2.0.1-BUILD", 1);
+         compare("2.0", "2.0.1.BUILD", 1);
+         compare("2.0.1", "2.0-BUILD", -1);
+         compare("2.0.1", "2.0.0.BUILD", -1);
+         compare("2.0", "2.0-BUILD", -1);
+         // Note not true for .qualifier: compare("2.0", "2.0.0.BUILD", -1);
+         compare("2.1", "2.0-BUILD", -1);
+         compare("2.1", "2.0.0.BUILD", -1);
+         compare("1", "1.3", 1);
+         compare("1-beta", "1-rc2", 1);
+         // Note not true for .qualifier: compare("1.0.0.beta", "1.0.0.rc2", 1);
+         compare("1-beta1", "1-beta10", 1);
+ 
+         compare(STABLE, "10.5", -1);
+         compare(STABLE, STABLE, 0);
+ 
+         compare(STABLE, "10.6", 1);
+         compare(STABLE, "10.5.8.1", 1);
+ 
+         compare("10.5.8-rc2", "10.5.8-rc3", 1) ;
+         compare("10.5.8-rc2", "10.5.8-rc1", -1);
+ 
+         compare(STABLE, RC2, -1);
+ 
+         CatalogItemComparator cmp = CatalogItemComparator.INSTANCE;
+         assertTrue(cmp.compare(v(RC2), v("10.5.8-beta1")) == cmp.compare(v(RC2), v("10.5.8-beta3"))); // rc2 must compare the same way against either beta
+     }
+     
++    @SuppressWarnings({ "unchecked", "rawtypes" })
+     private void compare(String v1, String v2, int expected) { // asserts cmp(v1,v2)==expected and, symmetrically, cmp(v2,v1)==-expected
+         CatalogItemComparator cmp = CatalogItemComparator.INSTANCE;
+         assertEquals(cmp.compare(v(v1), v(v2)), expected);
+         assertEquals(cmp.compare(v(v2), v(v1)), -expected);
+     }
+     
+     private CatalogItem<?, ?> v(String version) { // builds a catalog item via newEntity("xxx", version); only the version varies between items
+         return CatalogItemBuilder.newEntity("xxx", version).build();
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/AttributeMapTest.java
----------------------------------------------------------------------
diff --cc brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/AttributeMapTest.java
index 0000000,c1ae306..77ba9c6
mode 000000,100644..100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/AttributeMapTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/AttributeMapTest.java
@@@ -1,0 -1,228 +1,248 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.brooklyn.core.entity;
+ 
+ import static org.testng.Assert.assertEquals;
+ import static org.testng.Assert.assertTrue;
+ 
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.List;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.Future;
+ 
+ import org.apache.brooklyn.api.entity.Application;
+ import org.apache.brooklyn.api.entity.EntitySpec;
+ import org.apache.brooklyn.api.sensor.AttributeSensor;
+ import org.apache.brooklyn.core.sensor.AttributeMap;
+ import org.apache.brooklyn.core.sensor.Sensors;
+ import org.apache.brooklyn.core.test.entity.TestApplication;
+ import org.apache.brooklyn.core.test.entity.TestEntity;
+ import org.apache.brooklyn.core.test.entity.TestEntityImpl;
+ import org.apache.brooklyn.test.Asserts;
+ import org.apache.brooklyn.util.collections.MutableMap;
+ import org.apache.brooklyn.util.guava.Maybe;
+ import org.testng.annotations.AfterMethod;
+ import org.testng.annotations.BeforeMethod;
+ import org.testng.annotations.Test;
+ 
+ import com.google.common.base.Function;
+ import com.google.common.base.Functions;
+ import com.google.common.collect.ImmutableList;
+ import com.google.common.collect.Iterables;
+ import com.google.common.collect.Lists;
+ 
+ public class AttributeMapTest {
+     final int NUM_TASKS = Math.min(500 * Runtime.getRuntime().availableProcessors(), 1000);
+ 
+     Application app;
+     TestEntity entity;
+     TestEntityImpl entityImpl;
+     AttributeMap map;
+     ExecutorService executor;
+     
+     @BeforeMethod(alwaysRun=true)
+     public void setUp() {
+         app = TestApplication.Factory.newManagedInstanceForTests();
+         TestEntity entity = app.addChild(EntitySpec.create(TestEntity.class));
+         entityImpl = (TestEntityImpl) Entities.deproxy(entity);
+         map = new AttributeMap(entityImpl, Collections.synchronizedMap(MutableMap.<Collection<String>,Object>of()));
+         executor = Executors.newCachedThreadPool();
+     }
+     
+     @AfterMethod(alwaysRun=true)
+     public void tearDown() {
+         if (executor != null) executor.shutdownNow();
+         if (app != null) Entities.destroyAll(app.getManagementContext());
+     }
+     
+     // See ENGR-2111
+     @Test
+     public void testConcurrentUpdatesDoNotCauseConcurrentModificationException() throws Exception {
+         List<Future<?>> futures = Lists.newArrayList();
+         
+         for (int i = 0; i < NUM_TASKS; i++) {
+             final AttributeSensor<String> nextSensor = Sensors.newStringSensor("attributeMapTest.exampleSensor"+i, "");
+             Future<?> future = executor.submit(newUpdateMapRunnable(map, nextSensor, "a"));
+             futures.add(future);
+         }
+         
+         for (Future<?> future : futures) {
+             future.get();
+         }
+     }
+     
+     @Test
+     public void testConcurrentUpdatesAndGetsDoNotCauseConcurrentModificationException() throws Exception {
+         List<Future<?>> futures = Lists.newArrayList();
+         
+         for (int i = 0; i < NUM_TASKS; i++) {
+             final AttributeSensor<String> nextSensor = Sensors.newStringSensor("attributeMapTest.exampleSensor"+i, "");
+             Future<?> future = executor.submit(newUpdateMapRunnable(map, nextSensor, "a"));
+             Future<?> future2 = executor.submit(newGetAttributeCallable(map, nextSensor));
+             futures.add(future);
+             futures.add(future2);
+         }
+ 
+         for (Future<?> future : futures) {
+             future.get();
+         }
+     }
+     
+     @Test
++    public void testConcurrentUpdatesAllApplied() throws Exception {
++        List<Future<?>> futures = Lists.newArrayList();
++        
++        for (int i = 0; i < NUM_TASKS; i++) {
++            AttributeSensor<Integer> nextSensor = Sensors.newIntegerSensor("attributeMapTest.exampleSensor"+i);
++            Future<?> future = executor.submit(newUpdateMapRunnable(map, nextSensor, i));
++            futures.add(future);
++        }
++
++        for (Future<?> future : futures) {
++            future.get();
++        }
++        
++        for (int i = 0; i < NUM_TASKS; i++) {
++            AttributeSensor<Integer> nextSensor = Sensors.newIntegerSensor("attributeMapTest.exampleSensor"+i);
++            assertEquals(map.getValue(nextSensor), (Integer)i);
++        }
++    }
++
++    @Test
+     public void testStoredSensorsCanBeRetrieved() throws Exception {
+         AttributeSensor<String> sensor1 = Sensors.newStringSensor("a", "");
+         AttributeSensor<String> sensor2 = Sensors.newStringSensor("b.c", "");
+         
+         map.update(sensor1, "1val");
+         map.update(sensor2, "2val");
+         
+         assertEquals(map.getValue(sensor1), "1val");
+         assertEquals(map.getValue(sensor2), "2val");
+         
+         assertEquals(map.getValue(ImmutableList.of("a")), "1val");
+         assertEquals(map.getValue(ImmutableList.of("b","c")), "2val");
+     }
+         
+     @Test
+     public void testStoredByPathCanBeRetrieved() throws Exception {
+         AttributeSensor<String> sensor1 = Sensors.newStringSensor("a", "");
+         AttributeSensor<String> sensor2 = Sensors.newStringSensor("b.c", "");
+         
+         map.update(ImmutableList.of("a"), "1val");
+         map.update(ImmutableList.of("b", "c"), "2val");
+         
+         assertEquals(map.getValue(sensor1), "1val");
+         assertEquals(map.getValue(sensor2), "2val");
+         
+         assertEquals(map.getValue(ImmutableList.of("a")), "1val");
+         assertEquals(map.getValue(ImmutableList.of("b","c")), "2val");
+     }
+         
+     @Test
+     public void testCanStoreSensorThenChildSensor() throws Exception {
+         AttributeSensor<String> sensor = Sensors.newStringSensor("a", "");
+         AttributeSensor<String> childSensor = Sensors.newStringSensor("a.b", "");
+         
+         map.update(sensor, "parentValue");
+         map.update(childSensor, "childValue");
+         
+         assertEquals(map.getValue(childSensor), "childValue");
+         assertEquals(map.getValue(sensor), "parentValue");
+     }
+     
+     @Test
+     public void testCanStoreChildThenParentSensor() throws Exception {
+         AttributeSensor<String> sensor = Sensors.newStringSensor("a", "");
+         AttributeSensor<String> childSensor = Sensors.newStringSensor("a.b", "");
+         
+         map.update(childSensor, "childValue");
+         map.update(sensor, "parentValue");
+         
+         assertEquals(map.getValue(childSensor), "childValue");
+         assertEquals(map.getValue(sensor), "parentValue");
+     }
+     
+     @Test
+     public void testConcurrentModifyAttributeCalls() throws Exception {
+         AttributeSensor<Integer> sensor = Sensors.newIntegerSensor("a", "");
+         
+         Function<Integer, Maybe<Integer>> modifier = new Function<Integer, Maybe<Integer>>() {
+             @Override public Maybe<Integer> apply(Integer input) {
+                 return Maybe.of((input == null) ? 1 : input + 1);
+             }
+         };
+         
+         List<Future<?>> futures = Lists.newArrayList();
+         
+         for (int i = 0; i < NUM_TASKS; i++) {
+             Future<?> future = executor.submit(newModifyAttributeCallable(map, sensor, modifier));
+             futures.add(future);
+         }
+ 
+         for (Future<?> future : futures) {
+             future.get();
+         }
+ 
+         assertEquals(map.getValue(sensor), Integer.valueOf(NUM_TASKS));
+     }
+     
+     @Test
+     public void testModifyAttributeReturningAbsentDoesNotEmit() throws Exception {
+         AttributeSensor<Integer> sensor = Sensors.newIntegerSensor("a", "");
+         AttributeSensor<Integer> childSensor = Sensors.newIntegerSensor("a.b", "");
+         
+         final RecordingSensorEventListener<Object> listener = new RecordingSensorEventListener<>();
+         entityImpl.subscriptions().subscribe(entityImpl, sensor, listener);
+         
+         map.modify(childSensor, Functions.constant(Maybe.<Integer>absent()));
+         
+         Asserts.succeedsContinually(new Runnable() {
+             @Override public void run() {
+                 assertTrue(Iterables.isEmpty(listener.getEvents()), "events="+listener.getEvents());
+             }});
+     }
+     
+     protected <T> Runnable newUpdateMapRunnable(final AttributeMap map, final AttributeSensor<T> attribute, final T val) {
+         return new Runnable() {
+             @Override public void run() {
+                 map.update(attribute, val);
+             }
+         };
+     }
+     
+     protected <T> Callable<T> newGetAttributeCallable(final AttributeMap map, final AttributeSensor<T> attribute) {
+         return new Callable<T>() {
+             @Override public T call() {
+                 return map.getValue(attribute);
+             }
+         };
+     }
+     
+     protected <T> Callable<T> newModifyAttributeCallable(final AttributeMap map, final AttributeSensor<T> attribute, final Function<? super T, Maybe<T>> modifier) {
+         return new Callable<T>() {
+             @Override public T call() {
+                 return map.modify(attribute, modifier);
+             }
+         };
+     }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/EntityConcurrencyTest.java
----------------------------------------------------------------------
diff --cc brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/EntityConcurrencyTest.java
index 0000000,0000000..f606226
new file mode 100644
--- /dev/null
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/EntityConcurrencyTest.java
@@@ -1,0 -1,0 +1,275 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package org.apache.brooklyn.core.entity;
++
++import static org.testng.Assert.assertEquals;
++
++import java.util.List;
++import java.util.concurrent.Executors;
++
++import org.apache.brooklyn.api.entity.EntitySpec;
++import org.apache.brooklyn.api.location.Location;
++import org.apache.brooklyn.api.location.LocationSpec;
++import org.apache.brooklyn.api.policy.PolicySpec;
++import org.apache.brooklyn.api.sensor.AttributeSensor;
++import org.apache.brooklyn.api.sensor.EnricherSpec;
++import org.apache.brooklyn.config.ConfigKey;
++import org.apache.brooklyn.core.config.ConfigKeys;
++import org.apache.brooklyn.core.enricher.BasicEnricherTest;
++import org.apache.brooklyn.core.feed.AbstractFeed;
++import org.apache.brooklyn.core.location.SimulatedLocation;
++import org.apache.brooklyn.core.policy.basic.BasicPolicyTest;
++import org.apache.brooklyn.core.sensor.Sensors;
++import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
++import org.apache.brooklyn.core.test.entity.TestEntity;
++import org.apache.brooklyn.entity.group.BasicGroup;
++import org.apache.brooklyn.entity.stock.BasicEntity;
++import org.apache.brooklyn.test.Asserts;
++import org.testng.annotations.AfterMethod;
++import org.testng.annotations.BeforeMethod;
++import org.testng.annotations.Test;
++
++import com.google.common.base.Predicates;
++import com.google.common.collect.ImmutableList;
++import com.google.common.collect.Lists;
++import com.google.common.util.concurrent.Futures;
++import com.google.common.util.concurrent.ListenableFuture;
++import com.google.common.util.concurrent.ListeningExecutorService;
++import com.google.common.util.concurrent.MoreExecutors;
++
++public class EntityConcurrencyTest extends BrooklynAppUnitTestSupport {
++    TestEntity entity;
++    ListeningExecutorService executor;
++    
++    /**
++     * Runs the superclass set-up, then adds a {@link TestEntity} child to {@code app}
++     * and creates the cached-thread-pool executor that the tests submit work to.
++     */
++    @BeforeMethod(alwaysRun=true)
++    @Override
++    public void setUp() throws Exception {
++        super.setUp();
++        final EntitySpec<TestEntity> entitySpec = EntitySpec.create(TestEntity.class);
++        entity = app.addChild(entitySpec);
++        executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
++    }
++    
++    /**
++     * Runs the superclass tear-down and then stops the test executor.
++     * The executor is shut down in a {@code finally} block so that its pool threads
++     * are not leaked when the superclass tear-down throws.
++     */
++    @AfterMethod(alwaysRun=true)
++    @Override
++    public void tearDown() throws Exception {
++        try {
++            super.tearDown();
++        } finally {
++            // executor is null if setUp failed before creating it
++            if (executor != null) executor.shutdownNow();
++        }
++    }
++    
++    /**
++     * Sets a distinct sensor from each of many concurrent tasks, then checks that
++     * every sensor reads back the value its task wrote.
++     */
++    @Test
++    public void testConcurrentSetAttribute() throws Exception {
++        // 500 tasks per available processor, capped at 1000.
++        final int taskCount = Math.min(500 * Runtime.getRuntime().availableProcessors(), 1000);
++        final List<ListenableFuture<?>> pending = Lists.newArrayList();
++
++        for (int idx = 0; idx < taskCount; idx++) {
++            final int expected = idx;
++            final AttributeSensor<Integer> sensor = Sensors.newIntegerSensor("EntityConcurrencyTest.exampleSensor"+idx);
++            pending.add(executor.submit(new Runnable() {
++                @Override
++                public void run() {
++                    entity.sensors().set(sensor, expected);
++                }
++            }));
++        }
++
++        // Block until every task is done; get() fails if any task threw.
++        Futures.allAsList(pending).get();
++
++        for (int idx = 0; idx < taskCount; idx++) {
++            final AttributeSensor<Integer> sensor = Sensors.newIntegerSensor("EntityConcurrencyTest.exampleSensor"+idx);
++            final Integer actual = entity.sensors().get(sensor);
++            assertEquals(actual, (Integer)idx, "i="+idx);
++        }
++    }
++    
++    /**
++     * Sets a distinct config key from each of many concurrent tasks, then checks
++     * that every key reads back the value its task wrote.
++     */
++    @Test
++    public void testConcurrentSetConfig() throws Exception {
++        // 500 tasks per available processor, capped at 1000.
++        final int taskCount = Math.min(500 * Runtime.getRuntime().availableProcessors(), 1000);
++        final List<ListenableFuture<?>> pending = Lists.newArrayList();
++
++        for (int idx = 0; idx < taskCount; idx++) {
++            final int expected = idx;
++            final ConfigKey<Integer> key = ConfigKeys.newIntegerConfigKey("EntityConcurrencyTest.exampleConfig"+idx);
++            pending.add(executor.submit(new Runnable() {
++                @Override
++                public void run() {
++                    entity.config().set(key, expected);
++                }
++            }));
++        }
++
++        // Block until every task is done; get() fails if any task threw.
++        Futures.allAsList(pending).get();
++
++        for (int idx = 0; idx < taskCount; idx++) {
++            final ConfigKey<Integer> key = ConfigKeys.newIntegerConfigKey("EntityConcurrencyTest.exampleConfig"+idx);
++            final Integer actual = entity.config().get(key);
++            assertEquals(actual, (Integer)idx, "i="+idx);
++        }
++    }
++    
++    /**
++     * Adds a distinct integer tag from each of many concurrent tasks, then compares
++     * the entity's tags (ignoring order) with the list of tags that were submitted.
++     */
++    @Test
++    public void testConcurrentAddTag() throws Exception {
++        // 500 tasks per available processor, capped at 1000.
++        final int taskCount = Math.min(500 * Runtime.getRuntime().availableProcessors(), 1000);
++        final List<Integer> expectedTags = Lists.newArrayList();
++        final List<ListenableFuture<?>> pending = Lists.newArrayList();
++
++        for (int idx = 0; idx < taskCount; idx++) {
++            final int tag = idx;
++            expectedTags.add(tag);
++            pending.add(executor.submit(new Runnable() {
++                @Override
++                public void run() {
++                    entity.tags().addTag(tag);
++                }
++            }));
++        }
++
++        // Block until every task is done; get() fails if any task threw.
++        Futures.allAsList(pending).get();
++
++        Asserts.assertEqualsIgnoringOrder(entity.tags().getTags(), expectedTags);
++    }
++    
++    /**
++     * Creates 100 groups up front, adds the entity to each of them from a separate
++     * task, then compares {@code entity.groups()} (ignoring order) with those groups.
++     */
++    @Test
++    public void testConcurrentAddGroup() throws Exception {
++        final int groupCount = 100;
++
++        // Groups are created sequentially; only the addMember calls run concurrently.
++        final List<BasicGroup> groups = Lists.newArrayList();
++        for (int idx = 0; idx < groupCount; idx++) {
++            final BasicGroup created = app.addChild(EntitySpec.create(BasicGroup.class));
++            groups.add(created);
++        }
++
++        final List<ListenableFuture<?>> pending = Lists.newArrayList();
++        for (final BasicGroup group : groups) {
++            pending.add(executor.submit(new Runnable() {
++                @Override
++                public void run() {
++                    group.addMember(entity);
++                }
++            }));
++        }
++
++        // Block until every task is done; get() fails if any task threw.
++        Futures.allAsList(pending).get();
++
++        Asserts.assertEqualsIgnoringOrder(entity.groups(), groups);
++    }
++    
++    /**
++     * Adds 100 {@link BasicEntity} children to the entity from concurrent tasks, then
++     * checks the child count and that the children match the {@link BasicEntity}
++     * instances known to the entity manager.
++     */
++    @Test
++    public void testConcurrentAddChild() throws Exception {
++        final int taskCount = 100;
++
++        // The task holds no per-submission state, so one instance is submitted repeatedly.
++        final Runnable addChildTask = new Runnable() {
++            @Override
++            public void run() {
++                entity.addChild(EntitySpec.create(BasicEntity.class));
++            }
++        };
++
++        final List<ListenableFuture<?>> pending = Lists.newArrayList();
++        for (int submitted = 0; submitted < taskCount; submitted++) {
++            pending.add(executor.submit(addChildTask));
++        }
++
++        // Block until every task is done; get() fails if any task threw.
++        Futures.allAsList(pending).get();
++
++        assertEquals(entity.getChildren().size(), taskCount);
++        Asserts.assertEqualsIgnoringOrder(entity.getChildren(), mgmt.getEntityManager().findEntities(Predicates.instanceOf(BasicEntity.class)));
++    }
++    
++    /**
++     * Creates 100 simulated locations up front, adds each to the entity from a
++     * separate task, then compares the entity's locations (ignoring order) with them.
++     */
++    @Test
++    public void testConcurrentAddLocation() throws Exception {
++        final int locationCount = 100;
++
++        // Locations are created sequentially; only the addLocations calls run concurrently.
++        final List<Location> locations = Lists.newArrayList();
++        for (int idx = 0; idx < locationCount; idx++) {
++            final Location created = mgmt.getLocationManager().createLocation(LocationSpec.create(SimulatedLocation.class));
++            locations.add(created);
++        }
++
++        final List<ListenableFuture<?>> pending = Lists.newArrayList();
++        for (final Location location : locations) {
++            pending.add(executor.submit(new Runnable() {
++                @Override
++                public void run() {
++                    entity.addLocations(ImmutableList.of(location));
++                }
++            }));
++        }
++
++        // Block until every task is done; get() fails if any task threw.
++        Futures.allAsList(pending).get();
++
++        Asserts.assertEqualsIgnoringOrder(entity.getLocations(), locations);
++    }
++    
++    /**
++     * Adds 100 policies to the entity from concurrent tasks and checks that the
++     * policy count grew by exactly that many.
++     */
++    @Test
++    public void testConcurrentAddPolicy() throws Exception {
++        final int taskCount = 100;
++
++        // Baseline taken before any task is submitted.
++        final int policiesBefore = entity.policies().size();
++
++        // The task holds no per-submission state, so one instance is submitted repeatedly.
++        final Runnable addPolicyTask = new Runnable() {
++            @Override
++            public void run() {
++                entity.policies().add(PolicySpec.create(BasicPolicyTest.MyPolicy.class));
++            }
++        };
++
++        final List<ListenableFuture<?>> pending = Lists.newArrayList();
++        for (int submitted = 0; submitted < taskCount; submitted++) {
++            pending.add(executor.submit(addPolicyTask));
++        }
++
++        // Block until every task is done; get() fails if any task threw.
++        Futures.allAsList(pending).get();
++
++        assertEquals(entity.policies().size(), taskCount+policiesBefore);
++    }
++    
++    /**
++     * Adds 100 enrichers to the entity from concurrent tasks and checks that the
++     * enricher count grew by exactly that many.
++     */
++    @Test
++    public void testConcurrentAddEnricher() throws Exception {
++        final int taskCount = 100;
++
++        // Baseline taken before any task is submitted.
++        final int enrichersBefore = entity.enrichers().size();
++
++        // The task holds no per-submission state, so one instance is submitted repeatedly.
++        final Runnable addEnricherTask = new Runnable() {
++            @Override
++            public void run() {
++                entity.enrichers().add(EnricherSpec.create(BasicEnricherTest.MyEnricher.class));
++            }
++        };
++
++        final List<ListenableFuture<?>> pending = Lists.newArrayList();
++        for (int submitted = 0; submitted < taskCount; submitted++) {
++            pending.add(executor.submit(addEnricherTask));
++        }
++
++        // Block until every task is done; get() fails if any task threw.
++        Futures.allAsList(pending).get();
++
++        assertEquals(entity.enrichers().size(), taskCount+enrichersBefore);
++    }
++    
++    /**
++     * Adds 100 feeds to the entity from concurrent tasks and checks that the entity
++     * reports exactly that many feeds afterwards.
++     */
++    @Test
++    public void testConcurrentAddFeed() throws Exception {
++        final int taskCount = 100;
++
++        // A fresh MyFeed is created on each run, so one task instance can be reused.
++        final Runnable addFeedTask = new Runnable() {
++            @Override
++            public void run() {
++                entity.feeds().addFeed(new MyFeed());
++            }
++        };
++
++        final List<ListenableFuture<?>> pending = Lists.newArrayList();
++        for (int submitted = 0; submitted < taskCount; submitted++) {
++            pending.add(executor.submit(addFeedTask));
++        }
++
++        // Block until every task is done; get() fails if any task threw.
++        Futures.allAsList(pending).get();
++
++        assertEquals(entity.feeds().getFeeds().size(), taskCount);
++    }
++    /** Feed with no behaviour of its own; used by {@code testConcurrentAddFeed} as something to add. */
++    private static class MyFeed extends AbstractFeed {
++    }
++}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/EntityFunctionsTest.java
----------------------------------------------------------------------
diff --cc brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/EntityFunctionsTest.java
index 0000000,7b68d28..3b68738
mode 000000,100644..100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/EntityFunctionsTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/EntityFunctionsTest.java
@@@ -1,0 -1,77 +1,83 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.brooklyn.core.entity;
+ 
+ import static org.testng.Assert.assertEquals;
+ import static org.testng.Assert.assertNull;
+ 
+ import org.apache.brooklyn.api.entity.EntitySpec;
+ import org.apache.brooklyn.api.location.Location;
+ import org.apache.brooklyn.core.entity.EntityFunctions;
+ import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+ import org.apache.brooklyn.core.test.entity.TestEntity;
+ import org.testng.annotations.BeforeMethod;
+ import org.testng.annotations.Test;
+ 
+ import com.google.common.base.Predicates;
+ import com.google.common.collect.ImmutableList;
+ 
+ /**
+  * Tests for the function factories on {@link EntityFunctions}: each test builds a
+  * function, applies it, and checks the value it yields for the {@link TestEntity}
+  * created in {@link #setUp()}.
+  */
+ public class EntityFunctionsTest extends BrooklynAppUnitTestSupport {
+ 
+     private TestEntity entity;
+     private Location loc;
+     
+     /** Creates a managed child entity named "mydisplayname" and resolves the "localhost" location. */
+     @BeforeMethod(alwaysRun=true)
+     @Override
+     public void setUp() throws Exception {
+         super.setUp();
+         entity = app.createAndManageChild(EntitySpec.create(TestEntity.class).displayName("mydisplayname"));
+         loc = app.getManagementContext().getLocationRegistry().resolve("localhost");
+     }
+ 
+     /** The attribute function yields the sensor value that was set, and null for a sensor this test never sets. */
+     @Test
+     public void testAttribute() throws Exception {
+         entity.sensors().set(TestEntity.NAME, "myname");
+         assertEquals(EntityFunctions.attribute(TestEntity.NAME).apply(entity), "myname");
+         assertNull(EntityFunctions.attribute(TestEntity.SEQUENCE).apply(entity));
+     }
++
++    /** The two-argument form is given the entity up front; here it is applied to an unrelated object and still yields the entity's value. */
++    @Test
++    public void testEntityAttributeTest() {
++        entity.sensors().set(TestEntity.NAME, "myname");
++        assertEquals(EntityFunctions.attribute(entity, TestEntity.NAME).apply(new Object()), "myname");
++    }
+     
+     /** The config function yields the config value that was set, and null for a key this test never sets. */
+     @Test
+     public void testConfig() throws Exception {
+         entity.config().set(TestEntity.CONF_NAME, "myname");
+         assertEquals(EntityFunctions.config(TestEntity.CONF_NAME).apply(entity), "myname");
+         assertNull(EntityFunctions.config(TestEntity.CONF_OBJECT).apply(entity));
+     }
+     
+     /** The display-name function yields the name given to the entity spec in {@link #setUp()}. */
+     @Test
+     public void testDisplayName() throws Exception {
+         assertEquals(EntityFunctions.displayName().apply(entity), "mydisplayname");
+     }
+     
+     /** The id function yields the same value as {@code entity.getId()}. */
+     @Test
+     public void testId() throws Exception {
+         assertEquals(EntityFunctions.id().apply(entity), entity.getId());
+     }
+     
+     /** With a single location added and an always-true predicate, the function yields that location. */
+     @Test
+     public void testLocationMatching() throws Exception {
+         entity.addLocations(ImmutableList.of(loc));
+         assertEquals(EntityFunctions.locationMatching(Predicates.alwaysTrue()).apply(entity), loc);
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/typereg/BasicBrooklynTypeRegistryTest.java
----------------------------------------------------------------------
diff --cc brooklyn-server/core/src/test/java/org/apache/brooklyn/core/typereg/BasicBrooklynTypeRegistryTest.java
index 0000000,0000000..6f2f573
new file mode 100644
--- /dev/null
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/typereg/BasicBrooklynTypeRegistryTest.java
@@@ -1,0 -1,0 +1,186 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package org.apache.brooklyn.core.typereg;
++
++import org.apache.brooklyn.api.typereg.RegisteredType;
++import org.apache.brooklyn.core.test.BrooklynMgmtUnitTestSupport;
++import org.apache.brooklyn.test.Asserts;
++import org.apache.brooklyn.util.collections.MutableList;
++import org.apache.brooklyn.util.collections.MutableSet;
++import org.testng.Assert;
++import org.testng.annotations.Test;
++
++import com.google.common.base.Predicates;
++import com.google.common.collect.Iterables;
++
++public class BasicBrooklynTypeRegistryTest extends BrooklynMgmtUnitTestSupport {
++
++    private BasicBrooklynTypeRegistry registry() {
++        return (BasicBrooklynTypeRegistry) mgmt.getTypeRegistry();
++    }
++    
++    /** Registers {@code type} in the local unpersisted registry without forcing. */
++    private void add(RegisteredType type) {
++        registry().addToLocalUnpersistedTypeRegistry(type, false);
++    }
++    private void add(RegisteredType type, boolean canForce) {
++        registry().addToLocalUnpersistedTypeRegistry(type, canForce);
++    }
++    
++    // Two bean types sharing the symbolic name "item.A", at versions "1" and "2".
++    private static final RegisteredType SAMPLE_TYPE =
++        RegisteredTypes.bean("item.A", "1", new BasicTypeImplementationPlan("ignore", null), String.class);
++    private static final RegisteredType SAMPLE_TYPE2 =
++        RegisteredTypes.bean("item.A", "2", new BasicTypeImplementationPlan("ignore", null), String.class);
++    
++    /**
++     * Checks that the sample type cannot be found before it is added, and that after
++     * adding it is found by listing, by name and version, by id, and by a name filter.
++     */
++    @Test
++    public void testAddAndGet() {
++        final String symbolicName = SAMPLE_TYPE.getSymbolicName();
++        final String version = SAMPLE_TYPE.getVersion();
++        final String id = SAMPLE_TYPE.getId();
++
++        // not yet registered
++        Assert.assertFalse( Iterables.contains(registry().getAll(), SAMPLE_TYPE) );
++        Assert.assertNull( registry().get(symbolicName, version) );
++        Assert.assertNull( registry().get(id) );
++
++        add(SAMPLE_TYPE);
++
++        // registered: every lookup route finds it
++        Assert.assertTrue( Iterables.contains(registry().getAll(), SAMPLE_TYPE) );
++        Assert.assertEquals( registry().get(symbolicName, version), SAMPLE_TYPE );
++        Assert.assertEquals( registry().get(id), SAMPLE_TYPE );
++
++        final Iterable<?> matchingByName = registry().getMatching(RegisteredTypePredicates.symbolicName(symbolicName));
++        Assert.assertTrue( Iterables.contains(matchingByName, SAMPLE_TYPE) );
++    }
++
++    /**
++     * Checks the rules for adding a second type with an id that is already registered:
++     * a forced add is expected to replace the entry, an unforced add of a different
++     * instance is expected to fail, and only one entry remains throughout.
++     */
++    @Test
++    public void testCantAddSameIdUnlessSameInstanceOrForced() {
++        final String id = SAMPLE_TYPE.getId();
++        final String symbolicName = SAMPLE_TYPE.getSymbolicName();
++
++        add(SAMPLE_TYPE);
++
++        // a separate instance with the same name and version, force-added over the original
++        final RegisteredType duplicate = RegisteredTypes.bean("item.A", "1", new BasicTypeImplementationPlan("ignore", null), String.class);
++        add(duplicate, true);
++        Assert.assertNotEquals( registry().get(id), SAMPLE_TYPE );
++
++        // force the original back in
++        add(SAMPLE_TYPE, true);
++        Assert.assertEquals( registry().get(id), SAMPLE_TYPE );
++
++        // without force, the duplicate must be rejected
++        try {
++            add(duplicate);
++            Asserts.shouldHaveFailedPreviously();
++        } catch (Exception e) {
++            Asserts.expectedFailureContains(e, symbolicName);
++        }
++
++        // only one entry
++        final Iterable<?> matchingByName = registry().getMatching(RegisteredTypePredicates.symbolicName(symbolicName));
++        Assert.assertEquals( Iterables.size(matchingByName), 1);
++        // unversioned request returns sample
++        Assert.assertEquals( registry().get(symbolicName), SAMPLE_TYPE );
++    }
++
++    /**
++     * Registers versions "1" and "2" of the same symbolic name and checks that both
++     * are retrievable by id, and that a lookup by name alone yields version "2".
++     */
++    @Test
++    public void testGettingBestVersion() {
++        final RegisteredType[] bothVersions = { SAMPLE_TYPE, SAMPLE_TYPE2 };
++        for (RegisteredType type : bothVersions) {
++            add(type);
++        }
++
++        for (RegisteredType type : bothVersions) {
++            Assert.assertTrue( Iterables.contains(registry().getAll(), type) );
++        }
++        for (RegisteredType type : bothVersions) {
++            Assert.assertEquals( registry().get(type.getId()), type );
++        }
++        Assert.assertNotEquals( registry().get(SAMPLE_TYPE2.getId()), SAMPLE_TYPE );
++
++        final String symbolicName = SAMPLE_TYPE.getSymbolicName();
++        final Iterable<?> matchingByName = registry().getMatching(RegisteredTypePredicates.symbolicName(symbolicName));
++        Assert.assertEquals( Iterables.size(matchingByName), 2);
++
++        // unversioned request returns latest
++        Assert.assertEquals( registry().get(symbolicName), SAMPLE_TYPE2 );
++    }
++
++    /**
++     * Checks that combining a name filter with a subtype filter yields the sample
++     * type for {@code String} and nothing for {@code Integer}.
++     */
++    @Test
++    public void testGetWithFilter() {
++        add(SAMPLE_TYPE);
++        final String symbolicName = SAMPLE_TYPE.getSymbolicName();
++
++        final Iterable<?> stringSubtypes = registry().getMatching(Predicates.and(
++            RegisteredTypePredicates.symbolicName(symbolicName),
++            RegisteredTypePredicates.subtypeOf(String.class)
++            ));
++        Assert.assertEquals( Iterables.size(stringSubtypes), 1 );
++
++        final Iterable<?> integerSubtypes = registry().getMatching(Predicates.and(
++            RegisteredTypePredicates.symbolicName(symbolicName),
++            RegisteredTypePredicates.subtypeOf(Integer.class)
++            ));
++        Assert.assertTrue( Iterables.isEmpty(integerSubtypes) );
++    }
++    
++    /**
++     * Checks that a lookup by id with a loading context yields the sample type when
++     * the context asks for a {@code String} bean and null when it asks for an
++     * {@code Integer} bean.
++     */
++    @Test
++    public void testGetWithContext() {
++        add(SAMPLE_TYPE);
++        Assert.assertEquals( registry().get(SAMPLE_TYPE.getId(),  
++            RegisteredTypeLoadingContexts.bean(String.class)), SAMPLE_TYPE );
++        // assertNull rather than assertEquals(..., null), matching testAddAndGet
++        Assert.assertNull( registry().get(SAMPLE_TYPE.getId(),  
++            RegisteredTypeLoadingContexts.bean(Integer.class)) );
++    }
++
++    /**
++     * Registers versions "1", "2" and an aliased "1.1" of the same symbolic name and
++     * checks that the aliases resolve to the "1.1" type, that a lookup by the plain
++     * symbolic name still yields version "2", and that alias filters and loading
++     * contexts are honoured.
++     */
++    @Test
++    public void testAlias() {
++        add(SAMPLE_TYPE);
++        add(SAMPLE_TYPE2);
++        
++        // local is named for the version actually registered ("1.1")
++        RegisteredType sampleType11WithAliases = RegisteredTypes.addAliases(
++            RegisteredTypes.bean("item.A", "1.1", new BasicTypeImplementationPlan("ignore", null), String.class),
++            MutableList.of("my_a", "the_a"));
++        add(sampleType11WithAliases);
++        Assert.assertEquals(sampleType11WithAliases.getAliases(), MutableSet.of("my_a", "the_a"));
++        
++        Assert.assertEquals( Iterables.size(registry().getMatching(
++            RegisteredTypePredicates.symbolicName(SAMPLE_TYPE.getSymbolicName()))), 3);
++        
++        Assert.assertEquals( registry().get("my_a"), sampleType11WithAliases );
++        Assert.assertEquals( registry().get("the_a"), sampleType11WithAliases );
++        Assert.assertEquals( registry().get(sampleType11WithAliases.getId()), sampleType11WithAliases );
++        
++        // but unadorned type still returns v2
++        Assert.assertEquals( registry().get(sampleType11WithAliases.getSymbolicName()), SAMPLE_TYPE2 );
++        
++        // and filters work
++        Assert.assertEquals( registry().getMatching(RegisteredTypePredicates.alias("the_a")),
++            MutableList.of(sampleType11WithAliases) );
++        Assert.assertEquals( registry().get("my_a",  
++            RegisteredTypeLoadingContexts.bean(String.class)), sampleType11WithAliases );
++        // assertNull rather than assertEquals(..., null), matching testAddAndGet
++        Assert.assertNull( registry().get("my_a",  
++            RegisteredTypeLoadingContexts.bean(Integer.class)) );
++    }
++
++    /**
++     * Registers versions "1", "2" and a tagged "1.1" of the same symbolic name and
++     * checks that tag filters find the tagged type, that a tag cannot be used as a
++     * lookup name, and that a lookup by the plain symbolic name yields version "2".
++     */
++    @Test
++    public void testTags() {
++        add(SAMPLE_TYPE);
++        add(SAMPLE_TYPE2);
++        
++        // local is named for the version actually registered ("1.1")
++        RegisteredType sampleType11WithTags = RegisteredTypes.addTags(
++            RegisteredTypes.bean("item.A", "1.1", new BasicTypeImplementationPlan("ignore", null), String.class),
++            MutableList.of("my_a", "the_a"));
++        add(sampleType11WithTags);
++        Assert.assertEquals(sampleType11WithTags.getTags(), MutableSet.of("my_a", "the_a"));
++        
++        Assert.assertEquals( Iterables.size(registry().getMatching(
++            RegisteredTypePredicates.symbolicName(SAMPLE_TYPE.getSymbolicName()))), 3);
++        
++        Assert.assertEquals( registry().get(sampleType11WithTags.getId()), sampleType11WithTags );
++        
++        // and filters work
++        Assert.assertEquals( registry().getMatching(RegisteredTypePredicates.tag("the_a")),
++            MutableList.of(sampleType11WithTags) );
++        
++        // but can't lookup by tag as a get
++        Assert.assertNull( registry().get("my_a") );
++        
++        // and unadorned type still returns v2
++        Assert.assertEquals( registry().get(sampleType11WithTags.getSymbolicName()), SAMPLE_TYPE2 );
++        
++    }
++
++}