You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/12/23 12:06:34 UTC
[11/71] [abbrv] incubator-brooklyn git commit: Merge commit 'e430723'
into reorg2
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
----------------------------------------------------------------------
diff --cc brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index 0000000,02277a1..d90b1a1
mode 000000,100644..100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@@ -1,0 -1,782 +1,783 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.brooklyn.util.core.task;
+
+ import static com.google.common.base.Preconditions.checkNotNull;
+
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.Iterator;
+ import java.util.LinkedHashMap;
+ import java.util.LinkedHashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.CancellationException;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.CopyOnWriteArrayList;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.ScheduledExecutorService;
+ import java.util.concurrent.ScheduledThreadPoolExecutor;
+ import java.util.concurrent.SynchronousQueue;
+ import java.util.concurrent.ThreadFactory;
+ import java.util.concurrent.ThreadPoolExecutor;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.concurrent.atomic.AtomicLong;
+
+ import org.apache.brooklyn.api.mgmt.ExecutionManager;
+ import org.apache.brooklyn.api.mgmt.HasTaskChildren;
+ import org.apache.brooklyn.api.mgmt.Task;
+ import org.apache.brooklyn.api.mgmt.TaskAdaptable;
+ import org.apache.brooklyn.core.BrooklynFeatureEnablement;
++import org.apache.brooklyn.core.config.Sanitizer;
+ import org.apache.brooklyn.util.collections.MutableList;
+ import org.apache.brooklyn.util.exceptions.Exceptions;
+ import org.apache.brooklyn.util.text.Identifiers;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import com.google.common.annotations.Beta;
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.base.CaseFormat;
+ import com.google.common.base.Preconditions;
+ import com.google.common.collect.ImmutableList;
+ import com.google.common.collect.Sets;
+ import com.google.common.util.concurrent.ExecutionList;
+ import com.google.common.util.concurrent.ListenableFuture;
+ import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+ /**
+ * Manages the execution of atomic tasks and scheduled (recurring) tasks,
+ * including setting tags and invoking callbacks.
+ */
+ public class BasicExecutionManager implements ExecutionManager {
+ private static final Logger log = LoggerFactory.getLogger(BasicExecutionManager.class);
+
+ private static final boolean RENAME_THREADS = BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_RENAME_THREADS);
+
+ private static class PerThreadCurrentTaskHolder {
+ public static final ThreadLocal<Task<?>> perThreadCurrentTask = new ThreadLocal<Task<?>>();
+ }
+
+ public static ThreadLocal<Task<?>> getPerThreadCurrentTask() {
+ return PerThreadCurrentTaskHolder.perThreadCurrentTask;
+ }
+
+ private final ThreadFactory threadFactory;
+
+ private final ThreadFactory daemonThreadFactory;
+
+ private final ExecutorService runner;
+
+ private final ScheduledExecutorService delayedRunner;
+
+ // TODO Could have a set of all knownTasks; but instead we're having a separate set per tag,
+ // so the same task could be listed multiple times if it has multiple tags...
+
+ //access to this field AND to members in this field is synchronized,
+ //to allow us to preserve order while guaranteeing thread-safe
+ //(but more testing is needed before we are completely sure it is thread-safe!)
+ //synch blocks are as finely grained as possible for efficiency;
+ //NB CopyOnWriteArraySet is a perf bottleneck, and the simple map makes it easier to remove when a tag is empty
+ private Map<Object,Set<Task<?>>> tasksByTag = new HashMap<Object,Set<Task<?>>>();
+
+ private ConcurrentMap<String,Task<?>> tasksById = new ConcurrentHashMap<String,Task<?>>();
+
+ private ConcurrentMap<Object, TaskScheduler> schedulerByTag = new ConcurrentHashMap<Object, TaskScheduler>();
+
+ /** count of all tasks submitted, including finished */
+ private final AtomicLong totalTaskCount = new AtomicLong();
+
+ /** tasks submitted but not yet done (or in cases of interruption/cancelled not yet GC'd) */
+ private Map<String,String> incompleteTaskIds = new ConcurrentHashMap<String,String>();
+
+ /** tasks started but not yet finished */
+ private final AtomicInteger activeTaskCount = new AtomicInteger();
+
+ private final List<ExecutionListener> listeners = new CopyOnWriteArrayList<ExecutionListener>();
+
+ private final static ThreadLocal<String> threadOriginalName = new ThreadLocal<String>() {
+ protected String initialValue() {
+ // should not happen, as only access is in _afterEnd with a check that _beforeStart was invoked
+ log.warn("No original name recorded for thread "+Thread.currentThread().getName()+"; task "+Tasks.current());
+ return "brooklyn-thread-pool-"+Identifiers.makeRandomId(8);
+ }
+ };
+
+ public BasicExecutionManager(String contextid) {
+ threadFactory = newThreadFactory(contextid);
+ daemonThreadFactory = new ThreadFactoryBuilder()
+ .setThreadFactory(threadFactory)
+ .setDaemon(true)
+ .build();
+
+ // use Executors.newCachedThreadPool(daemonThreadFactory), but timeout of 1s rather than 60s for better shutdown!
+ runner = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ daemonThreadFactory);
+
+ delayedRunner = new ScheduledThreadPoolExecutor(1, daemonThreadFactory);
+ }
+
+ private final static class UncaughtExceptionHandlerImplementation implements Thread.UncaughtExceptionHandler {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ log.error("Uncaught exception in thread "+t.getName(), e);
+ }
+ }
+
+ /**
+ * For use by overriders to use custom thread factory.
+ * But be extremely careful: called by constructor, so before sub-class' constructor will
+ * have been invoked!
+ */
+ protected ThreadFactory newThreadFactory(String contextid) {
+ return new ThreadFactoryBuilder()
+ .setNameFormat("brooklyn-execmanager-"+contextid+"-%d")
+ .setUncaughtExceptionHandler(new UncaughtExceptionHandlerImplementation())
+ .build();
+ }
+
+ public void shutdownNow() {
+ runner.shutdownNow();
+ delayedRunner.shutdownNow();
+ }
+
+ public void addListener(ExecutionListener listener) {
+ listeners.add(listener);
+ }
+
+ public void removeListener(ExecutionListener listener) {
+ listeners.remove(listener);
+ }
+
+ /**
+ * Deletes the given tag, including all tasks using this tag.
+ *
+ * Useful, for example, if an entity is being expunged so that we don't keep holding
+ * a reference to it as a tag.
+ */
+ public void deleteTag(Object tag) {
+ Set<Task<?>> tasks;
+ synchronized (tasksByTag) {
+ tasks = tasksByTag.remove(tag);
+ }
+ if (tasks != null) {
+ for (Task<?> task : tasks) {
+ deleteTask(task);
+ }
+ }
+ }
+
+ public void deleteTask(Task<?> task) {
+ boolean removed = deleteTaskNonRecursive(task);
+ if (!removed) return;
+
+ if (task instanceof HasTaskChildren) {
+ List<Task<?>> children = ImmutableList.copyOf(((HasTaskChildren)task).getChildren());
+ for (Task<?> child : children) {
+ deleteTask(child);
+ }
+ }
+ }
+
+ protected boolean deleteTaskNonRecursive(Task<?> task) {
+ Set<?> tags = checkNotNull(task, "task").getTags();
+ for (Object tag : tags) {
+ synchronized (tasksByTag) {
+ Set<Task<?>> tasks = tasksWithTagLiveOrNull(tag);
+ if (tasks != null) {
+ tasks.remove(task);
+ if (tasks.isEmpty()) {
+ tasksByTag.remove(tag);
+ }
+ }
+ }
+ }
+ Task<?> removed = tasksById.remove(task.getId());
+ incompleteTaskIds.remove(task.getId());
+ if (removed!=null && removed.isSubmitted() && !removed.isDone()) {
+ log.warn("Deleting submitted task before completion: "+removed+"; this task will continue to run in the background outwith "+this+", but perhaps it should have been cancelled?");
+ }
+ return removed != null;
+ }
+
+ public boolean isShutdown() {
+ return runner.isShutdown();
+ }
+
+ /** count of all tasks submitted */
+ public long getTotalTasksSubmitted() {
+ return totalTaskCount.get();
+ }
+
+ /** count of tasks submitted but not ended */
+ public long getNumIncompleteTasks() {
+ return incompleteTaskIds.size();
+ }
+
+ /** count of tasks started but not ended */
+ public long getNumActiveTasks() {
+ return activeTaskCount.get();
+ }
+
+ /** count of tasks kept in memory, often including ended tasks */
+ public long getNumInMemoryTasks() {
+ return tasksById.size();
+ }
+
+ private Set<Task<?>> tasksWithTagCreating(Object tag) {
+ Preconditions.checkNotNull(tag);
+ synchronized (tasksByTag) {
+ Set<Task<?>> result = tasksWithTagLiveOrNull(tag);
+ if (result==null) {
+ result = Collections.synchronizedSet(new LinkedHashSet<Task<?>>());
+ tasksByTag.put(tag, result);
+ }
+ return result;
+ }
+ }
+
+ /** exposes live view, for internal use only */
+ @Beta
+ public Set<Task<?>> tasksWithTagLiveOrNull(Object tag) {
+ synchronized (tasksByTag) {
+ return tasksByTag.get(tag);
+ }
+ }
+
+ @Override
+ public Task<?> getTask(String id) {
+ return tasksById.get(id);
+ }
+
+ /** not on interface because potentially expensive */
+ public List<Task<?>> getAllTasks() {
+ // not sure if synching makes any difference; have not observed CME's yet
+ // (and so far this is only called when a CME was caught on a previous operation)
+ synchronized (tasksById) {
+ return MutableList.copyOf(tasksById.values());
+ }
+ }
+
+ @Override
+ public Set<Task<?>> getTasksWithTag(Object tag) {
+ Set<Task<?>> result = tasksWithTagLiveOrNull(tag);
+ if (result==null) return Collections.emptySet();
+ synchronized (result) {
+ return (Set<Task<?>>)Collections.unmodifiableSet(new LinkedHashSet<Task<?>>(result));
+ }
+ }
+
+ @Override
+ public Set<Task<?>> getTasksWithAnyTag(Iterable<?> tags) {
+ Set<Task<?>> result = new LinkedHashSet<Task<?>>();
+ Iterator<?> ti = tags.iterator();
+ while (ti.hasNext()) {
+ Set<Task<?>> tasksForTag = tasksWithTagLiveOrNull(ti.next());
+ if (tasksForTag!=null) {
+ synchronized (tasksForTag) {
+ result.addAll(tasksForTag);
+ }
+ }
+ }
+ return Collections.unmodifiableSet(result);
+ }
+
+ /** only works with at least one tag; returns empty if no tags */
+ @Override
+ public Set<Task<?>> getTasksWithAllTags(Iterable<?> tags) {
+ //NB: for this method retrieval for multiple tags could be made (much) more efficient (if/when it is used with multiple tags!)
+ //by first looking for the least-used tag, getting those tasks, and then for each of those tasks
+ //checking whether it contains the other tags (looking for second-least used, then third-least used, etc)
+ Set<Task<?>> result = new LinkedHashSet<Task<?>>();
+ boolean first = true;
+ Iterator<?> ti = tags.iterator();
+ while (ti.hasNext()) {
+ Object tag = ti.next();
+ if (first) {
+ first = false;
+ result.addAll(getTasksWithTag(tag));
+ } else {
+ result.retainAll(getTasksWithTag(tag));
+ }
+ }
+ return Collections.unmodifiableSet(result);
+ }
+
+ /** live view of all tasks, for internal use only */
+ @Beta
+ public Collection<Task<?>> allTasksLive() { return tasksById.values(); }
+
+ public Set<Object> getTaskTags() {
+ synchronized (tasksByTag) {
+ return Collections.unmodifiableSet(Sets.newLinkedHashSet(tasksByTag.keySet()));
+ }
+ }
+
+ public Task<?> submit(Runnable r) { return submit(new LinkedHashMap<Object,Object>(1), r); }
+ public Task<?> submit(Map<?,?> flags, Runnable r) { return submit(flags, new BasicTask<Void>(flags, r)); }
+
+ public <T> Task<T> submit(Callable<T> c) { return submit(new LinkedHashMap<Object,Object>(1), c); }
+ public <T> Task<T> submit(Map<?,?> flags, Callable<T> c) { return submit(flags, new BasicTask<T>(flags, c)); }
+
+ public <T> Task<T> submit(TaskAdaptable<T> t) { return submit(new LinkedHashMap<Object,Object>(1), t); }
+ public <T> Task<T> submit(Map<?,?> flags, TaskAdaptable<T> task) {
+ if (!(task instanceof Task))
+ task = task.asTask();
+ synchronized (task) {
+ if (((TaskInternal<?>)task).getInternalFuture()!=null) return (Task<T>)task;
+ return submitNewTask(flags, (Task<T>) task);
+ }
+ }
+
+ public <T> Task<T> scheduleWith(Task<T> task) { return scheduleWith(Collections.emptyMap(), task); }
+ public <T> Task<T> scheduleWith(Map<?,?> flags, Task<T> task) {
+ synchronized (task) {
+ if (((TaskInternal<?>)task).getInternalFuture()!=null) return task;
+ return submitNewTask(flags, task);
+ }
+ }
+
+ protected Task<?> submitNewScheduledTask(final Map<?,?> flags, final ScheduledTask task) {
+ tasksById.put(task.getId(), task);
+ totalTaskCount.incrementAndGet();
+
+ beforeSubmitScheduledTaskAllIterations(flags, task);
+
+ return submitSubsequentScheduledTask(flags, task);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected Task<?> submitSubsequentScheduledTask(final Map<?,?> flags, final ScheduledTask task) {
+ if (!task.isDone()) {
+ task.internalFuture = delayedRunner.schedule(new ScheduledTaskCallable(task, flags),
+ task.delay.toNanoseconds(), TimeUnit.NANOSECONDS);
+ } else {
+ afterEndScheduledTaskAllIterations(flags, task);
+ }
+ return task;
+ }
+
+ protected class ScheduledTaskCallable implements Callable<Object> {
+ public ScheduledTask task;
+ public Map<?,?> flags;
+
+ public ScheduledTaskCallable(ScheduledTask task, Map<?, ?> flags) {
+ this.task = task;
+ this.flags = flags;
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public Object call() {
+ if (task.startTimeUtc==-1) task.startTimeUtc = System.currentTimeMillis();
+ TaskInternal<?> taskScheduled = null;
+ try {
+ beforeStartScheduledTaskSubmissionIteration(flags, task);
+ taskScheduled = (TaskInternal<?>) task.newTask();
+ taskScheduled.setSubmittedByTask(task);
+ final Callable<?> oldJob = taskScheduled.getJob();
+ final TaskInternal<?> taskScheduledF = taskScheduled;
+ taskScheduled.setJob(new Callable() { public Object call() {
+ boolean shouldResubmit = true;
+ task.recentRun = taskScheduledF;
+ try {
+ synchronized (task) {
+ task.notifyAll();
+ }
+ Object result;
+ try {
+ result = oldJob.call();
+ task.lastThrownType = null;
+ } catch (Exception e) {
+ shouldResubmit = shouldResubmitOnException(oldJob, e);
+ throw Exceptions.propagate(e);
+ }
+ return result;
+ } finally {
+ // do in finally block in case we were interrupted
+ if (shouldResubmit) {
+ resubmit();
+ } else {
+ afterEndScheduledTaskAllIterations(flags, task);
+ }
+ }
+ }});
+ task.nextRun = taskScheduled;
+ BasicExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext();
+ if (ec!=null) return ec.submit(taskScheduled);
+ else return submit(taskScheduled);
+ } finally {
+ afterEndScheduledTaskSubmissionIteration(flags, task, taskScheduled);
+ }
+ }
+
+ private void resubmit() {
+ task.runCount++;
+ if (task.period!=null && !task.isCancelled()) {
+ task.delay = task.period;
+ submitSubsequentScheduledTask(flags, task);
+ }
+ }
+
+ private boolean shouldResubmitOnException(Callable<?> oldJob, Exception e) {
+ String message = "Error executing " + oldJob + " (scheduled job of " + task + " - " + task.getDescription() + ")";
+ if (Tasks.isInterrupted()) {
+ log.debug(message + "; cancelling scheduled execution: " + e);
+ return false;
+ } else if (task.cancelOnException) {
+ log.warn(message + "; cancelling scheduled execution.", e);
+ return false;
+ } else {
+ message += "; resubmitting task and throwing: " + e;
+ if (!e.getClass().equals(task.lastThrownType)) {
+ task.lastThrownType = e.getClass();
+ message += " (logging subsequent exceptions at trace)";
+ log.debug(message);
+ } else {
+ message += " (repeat exception)";
+ log.trace(message);
+ }
+ return true;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "ScheduledTaskCallable["+task+","+flags+"]";
+ }
+ }
+
+ private final class SubmissionCallable<T> implements Callable<T> {
+ private final Map<?, ?> flags;
+ private final Task<T> task;
+
+ private SubmissionCallable(Map<?, ?> flags, Task<T> task) {
+ this.flags = flags;
+ this.task = task;
+ }
+
+ public T call() {
+ try {
+ T result = null;
+ Throwable error = null;
+ String oldThreadName = Thread.currentThread().getName();
+ try {
+ if (RENAME_THREADS) {
+ String newThreadName = oldThreadName+"-"+task.getDisplayName()+
+ "["+task.getId().substring(0, 8)+"]";
+ Thread.currentThread().setName(newThreadName);
+ }
+ beforeStartAtomicTask(flags, task);
+ if (!task.isCancelled()) {
+ result = ((TaskInternal<T>)task).getJob().call();
+ } else throw new CancellationException();
+ } catch(Throwable e) {
+ error = e;
+ } finally {
+ if (RENAME_THREADS) {
+ Thread.currentThread().setName(oldThreadName);
+ }
+ afterEndAtomicTask(flags, task);
+ }
+ if (error!=null) {
+ /* we throw, after logging debug.
+ * the throw means the error is available for task submitters to monitor.
+ * however it is possible no one is monitoring it, in which case we will have debug logging only for errors.
+ * (the alternative, of warn-level logging in lots of places where we don't want it, seems worse!)
+ */
+ if (log.isDebugEnabled()) {
+ // debug only here, because most submitters will handle failures
+ log.debug("Exception running task "+task+" (rethrowing): "+error.getMessage(), error);
+ if (log.isTraceEnabled())
+ log.trace("Trace for exception running task "+task+" (rethrowing): "+error.getMessage(), error);
+ }
+ throw Exceptions.propagate(error);
+ }
+ return result;
+ } finally {
+ ((TaskInternal<?>)task).runListeners();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "BEM.call("+task+","+flags+")";
+ }
+ }
+
+ private final static class ListenableForwardingFutureForTask<T> extends ListenableForwardingFuture<T> {
+ private final Task<T> task;
+
+ private ListenableForwardingFutureForTask(Future<T> delegate, ExecutionList list, Task<T> task) {
+ super(delegate, list);
+ this.task = task;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ boolean result = false;
+ if (!task.isCancelled()) result |= task.cancel(mayInterruptIfRunning);
+ result |= super.cancel(mayInterruptIfRunning);
+ ((TaskInternal<?>)task).runListeners();
+ return result;
+ }
+ }
+
+ private final class SubmissionListenerToCallOtherListeners<T> implements Runnable {
+ private final Task<T> task;
+
+ private SubmissionListenerToCallOtherListeners(Task<T> task) {
+ this.task = task;
+ }
+
+ @Override
+ public void run() {
+ try {
+ ((TaskInternal<?>)task).runListeners();
+ } catch (Exception e) {
+ log.warn("Error running task listeners for task "+task+" done", e);
+ }
+
+ for (ExecutionListener listener : listeners) {
+ try {
+ listener.onTaskDone(task);
+ } catch (Exception e) {
+ log.warn("Error running execution listener "+listener+" of task "+task+" done", e);
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ protected <T> Task<T> submitNewTask(final Map<?,?> flags, final Task<T> task) {
+ if (log.isTraceEnabled()) log.trace("Submitting task {} ({}), with flags {}, and tags {}, job {}",
- new Object[] {task.getId(), task, flags, task.getTags(),
++ new Object[] {task.getId(), task, Sanitizer.sanitize(flags), task.getTags(),
+ (task instanceof TaskInternal ? ((TaskInternal<T>)task).getJob() : "<unavailable>")});
+
+ if (task instanceof ScheduledTask)
+ return (Task<T>) submitNewScheduledTask(flags, (ScheduledTask)task);
+
+ tasksById.put(task.getId(), task);
+ totalTaskCount.incrementAndGet();
+
+ beforeSubmitAtomicTask(flags, task);
+
+ if (((TaskInternal<T>)task).getJob() == null)
+ throw new NullPointerException("Task "+task+" submitted with with null job: job must be supplied.");
+
+ Callable<T> job = new SubmissionCallable<T>(flags, task);
+
+ // If there's a scheduler then use that; otherwise execute it directly
+ Set<TaskScheduler> schedulers = null;
+ for (Object tago: task.getTags()) {
+ TaskScheduler scheduler = getTaskSchedulerForTag(tago);
+ if (scheduler!=null) {
+ if (schedulers==null) schedulers = new LinkedHashSet<TaskScheduler>(2);
+ schedulers.add(scheduler);
+ }
+ }
+ Future<T> future;
+ if (schedulers!=null && !schedulers.isEmpty()) {
+ if (schedulers.size()>1) log.warn("multiple schedulers detected, using only the first, for "+task+": "+schedulers);
+ future = schedulers.iterator().next().submit(job);
+ } else {
+ future = runner.submit(job);
+ }
+ // on completion, listeners get triggered above; here, below we ensure they get triggered on cancel
+ // (and we make sure the same ExecutionList is used in the future as in the task)
+ ListenableFuture<T> listenableFuture = new ListenableForwardingFutureForTask<T>(future, ((TaskInternal<T>)task).getListeners(), task);
+ // doesn't matter whether the listener is added to the listenableFuture or the task,
+ // except that for the task we can more easily wrap it so that it only logs debug if the executor is shutdown
+ // (avoid a bunch of ugly warnings in tests which start and stop things a lot!)
+ // [probably even nicer to run this in the same thread, it doesn't do much; but that is messier to implement]
+ ((TaskInternal<T>)task).addListener(new SubmissionListenerToCallOtherListeners<T>(task), runner);
+
+ ((TaskInternal<T>)task).initInternalFuture(listenableFuture);
+
+ return task;
+ }
+
+ protected void beforeSubmitScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) {
+ internalBeforeSubmit(flags, task);
+ }
+ protected void beforeSubmitAtomicTask(Map<?,?> flags, Task<?> task) {
+ internalBeforeSubmit(flags, task);
+ }
+ /** invoked when a task is submitted */
+ protected void internalBeforeSubmit(Map<?,?> flags, Task<?> task) {
+ incompleteTaskIds.put(task.getId(), task.getId());
+
+ Task<?> currentTask = Tasks.current();
+ if (currentTask!=null) ((TaskInternal<?>)task).setSubmittedByTask(currentTask);
+ ((TaskInternal<?>)task).setSubmitTimeUtc(System.currentTimeMillis());
+
+ if (flags.get("tag")!=null) ((TaskInternal<?>)task).getMutableTags().add(flags.remove("tag"));
+ if (flags.get("tags")!=null) ((TaskInternal<?>)task).getMutableTags().addAll((Collection<?>)flags.remove("tags"));
+
+ for (Object tag: ((TaskInternal<?>)task).getTags()) {
+ tasksWithTagCreating(tag).add(task);
+ }
+ }
+
+ protected void beforeStartScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> task) {
+ internalBeforeStart(flags, task);
+ }
+ protected void beforeStartAtomicTask(Map<?,?> flags, Task<?> task) {
+ internalBeforeStart(flags, task);
+ }
+
+ /** invoked in a task's thread when a task is starting to run (may be some time after submitted),
+ * but before doing any of the task's work, so that we can update bookkeeping and notify callbacks */
+ protected void internalBeforeStart(Map<?,?> flags, Task<?> task) {
+ activeTaskCount.incrementAndGet();
+
+ //set thread _before_ start time, so we won't get a null thread when there is a start-time
+ if (log.isTraceEnabled()) log.trace(""+this+" beforeStart, task: "+task);
+ if (!task.isCancelled()) {
+ Thread thread = Thread.currentThread();
+ ((TaskInternal<?>)task).setThread(thread);
+ if (RENAME_THREADS) {
+ threadOriginalName.set(thread.getName());
+ String newThreadName = "brooklyn-" + CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, task.getDisplayName().replace(" ", "")) + "-" + task.getId().substring(0, 8);
+ thread.setName(newThreadName);
+ }
+ PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task);
+ ((TaskInternal<?>)task).setStartTimeUtc(System.currentTimeMillis());
+ }
+ ExecutionUtils.invoke(flags.get("newTaskStartCallback"), task);
+ }
+
+ /** normally (if not interrupted) called once for each call to {@link #beforeSubmitScheduledTaskAllIterations(Map, Task)} */
+ protected void afterEndScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) {
+ internalAfterEnd(flags, task, false, true);
+ }
+ /** called once for each call to {@link #beforeStartScheduledTaskSubmissionIteration(Map, Task)},
+ * with a per-iteration task generated by the surrounding scheduled task */
+ protected void afterEndScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> scheduledTask, Task<?> taskIteration) {
+ internalAfterEnd(flags, scheduledTask, true, false);
+ }
+ /** called once for each task on which {@link #beforeStartAtomicTask(Map, Task)} is invoked,
+ * and normally (if not interrupted prior to start)
+ * called once for each task on which {@link #beforeSubmitAtomicTask(Map, Task)} */
+ protected void afterEndAtomicTask(Map<?,?> flags, Task<?> task) {
+ internalAfterEnd(flags, task, true, true);
+ }
+ /** normally (if not interrupted) called once for each call to {@link #internalBeforeSubmit(Map, Task)},
+ * and, for atomic tasks and scheduled-task submission iterations where
+ * always called once if {@link #internalBeforeStart(Map, Task)} is invoked and in the same thread as that method */
+ protected void internalAfterEnd(Map<?,?> flags, Task<?> task, boolean startedInThisThread, boolean isEndingAllIterations) {
+ if (log.isTraceEnabled()) log.trace(this+" afterEnd, task: "+task);
+ if (startedInThisThread) {
+ activeTaskCount.decrementAndGet();
+ }
+ if (isEndingAllIterations) {
+ incompleteTaskIds.remove(task.getId());
+ ExecutionUtils.invoke(flags.get("newTaskEndCallback"), task);
+ ((TaskInternal<?>)task).setEndTimeUtc(System.currentTimeMillis());
+ }
+
+ if (startedInThisThread) {
+ PerThreadCurrentTaskHolder.perThreadCurrentTask.remove();
+ //clear thread _after_ endTime set, so we won't get a null thread when there is no end-time
+ if (RENAME_THREADS && startedInThisThread) {
+ Thread thread = task.getThread();
+ if (thread==null) {
+ log.warn("BasicTask.afterEnd invoked without corresponding beforeStart");
+ } else {
+ thread.setName(threadOriginalName.get());
+ threadOriginalName.remove();
+ }
+ }
+ ((TaskInternal<?>)task).setThread(null);
+ }
+ synchronized (task) { task.notifyAll(); }
+ }
+
+ public TaskScheduler getTaskSchedulerForTag(Object tag) {
+ return schedulerByTag.get(tag);
+ }
+
+ public void setTaskSchedulerForTag(Object tag, Class<? extends TaskScheduler> scheduler) {
+ synchronized (schedulerByTag) {
+ TaskScheduler old = getTaskSchedulerForTag(tag);
+ if (old!=null) {
+ if (scheduler.isAssignableFrom(old.getClass())) {
+ /* already have such an instance */
+ return;
+ }
+ //might support multiple in future...
+ throw new IllegalStateException("Not allowed to set multiple TaskSchedulers on ExecutionManager tag (tag "+tag+", has "+old+", setting new "+scheduler+")");
+ }
+ try {
+ TaskScheduler schedulerI = scheduler.newInstance();
+ // allow scheduler to have a nice name, for logging etc
+ if (schedulerI instanceof CanSetName) ((CanSetName)schedulerI).setName(""+tag);
+ setTaskSchedulerForTag(tag, schedulerI);
+ } catch (InstantiationException e) {
+ throw Exceptions.propagate(e);
+ } catch (IllegalAccessException e) {
+ throw Exceptions.propagate(e);
+ }
+ }
+ }
+
+ /**
+ * Defines a {@link TaskScheduler} to run on all subsequently submitted jobs with the given tag.
+ *
+ * Maximum of one allowed currently. Resubmissions of the same scheduler (or scheduler class)
+ * allowed. If changing, you must call {@link #clearTaskSchedulerForTag(Object)} between the two.
+ *
+ * @see #setTaskSchedulerForTag(Object, Class)
+ */
+ public void setTaskSchedulerForTag(Object tag, TaskScheduler scheduler) {
+ synchronized (schedulerByTag) {
+ scheduler.injectExecutor(runner);
+
+ Object old = schedulerByTag.put(tag, scheduler);
+ if (old!=null && old!=scheduler) {
+ //might support multiple in future...
+ throw new IllegalStateException("Not allowed to set multiple TaskSchedulers on ExecutionManager tag (tag "+tag+")");
+ }
+ }
+ }
+
+ /**
+ * Forgets that any scheduler was associated with a tag.
+ *
+ * @see #setTaskSchedulerForTag(Object, TaskScheduler)
+ * @see #setTaskSchedulerForTag(Object, Class)
+ */
+ public boolean clearTaskSchedulerForTag(Object tag) {
+ synchronized (schedulerByTag) {
+ Object old = schedulerByTag.remove(tag);
+ return (old!=null);
+ }
+ }
+
+ @VisibleForTesting
+ public ConcurrentMap<Object, TaskScheduler> getSchedulerByTag() {
+ return schedulerByTag;
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/system/internal/SystemProcessTaskFactory.java
----------------------------------------------------------------------
diff --cc brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/system/internal/SystemProcessTaskFactory.java
index 0000000,ab87480..20e6180
mode 000000,100644..100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/system/internal/SystemProcessTaskFactory.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/util/core/task/system/internal/SystemProcessTaskFactory.java
@@@ -1,0 -1,130 +1,131 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.brooklyn.util.core.task.system.internal;
+
+ import java.io.File;
+
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
++import org.apache.brooklyn.core.config.Sanitizer;
+ import org.apache.brooklyn.location.ssh.SshMachineLocation;
+ import org.apache.brooklyn.util.collections.MutableMap;
+ import org.apache.brooklyn.util.core.config.ConfigBag;
+ import org.apache.brooklyn.util.core.internal.ssh.ShellTool;
+ import org.apache.brooklyn.util.core.internal.ssh.process.ProcessTool;
+ import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
+
+ import com.google.common.base.Function;
+
+ public class SystemProcessTaskFactory<T extends SystemProcessTaskFactory<T,RET>,RET> extends AbstractProcessTaskFactory<T, RET> {
+
+ private static final Logger log = LoggerFactory.getLogger(SystemProcessTaskFactory.class);
+
+ // FIXME Plum this through?!
+ private File directory;
+ private Boolean loginShell;
+
+ public SystemProcessTaskFactory(String ...commands) {
+ super(commands);
+ }
+
+ public T directory(File directory) {
+ markDirty();
+ this.directory = directory;
+ return self();
+ }
+
+ public T loginShell(boolean loginShell) {
+ markDirty();
+ this.loginShell = loginShell;
+ return self();
+ }
+
+ @Override
+ public T machine(SshMachineLocation machine) {
+ log.warn("Not permitted to set machines on "+this+" (ignoring - "+machine+")");
+ if (log.isDebugEnabled())
+ log.debug("Source of attempt to set machines on "+this+" ("+machine+")",
+ new Throwable("Source of attempt to set machines on "+this+" ("+machine+")"));
+ return self();
+ }
+
+ @Override
+ public ProcessTaskWrapper<RET> newTask() {
+ return new SystemProcessTaskWrapper();
+ }
+
+ protected class SystemProcessTaskWrapper extends ProcessTaskWrapper<RET> {
+ protected final String taskTypeShortName;
+
+ public SystemProcessTaskWrapper() {
+ this("Process");
+ }
+ public SystemProcessTaskWrapper(String taskTypeShortName) {
+ super(SystemProcessTaskFactory.this);
+ this.taskTypeShortName = taskTypeShortName;
+ }
+ @Override
+ protected ConfigBag getConfigForRunning() {
+ ConfigBag result = super.getConfigForRunning();
+ if (directory != null) config.put(ProcessTool.PROP_DIRECTORY, directory.getAbsolutePath());
+ if (loginShell != null) config.put(ProcessTool.PROP_LOGIN_SHELL, loginShell);
+ return result;
+ }
+ @Override
+ protected void run(ConfigBag config) {
+ if (Boolean.FALSE.equals(this.runAsScript)) {
+ this.exitCode = newExecWithLoggingHelpers().execCommands(config.getAllConfig(), getSummary(), getCommands(), getShellEnvironment());
+ } else { // runScript = null or TRUE
+ this.exitCode = newExecWithLoggingHelpers().execScript(config.getAllConfig(), getSummary(), getCommands(), getShellEnvironment());
+ }
+ }
+ @Override
+ protected String taskTypeShortName() { return taskTypeShortName; }
+ }
+
+ protected ExecWithLoggingHelpers newExecWithLoggingHelpers() {
+ return new ExecWithLoggingHelpers("Process") {
+ @Override
+ protected <U> U execWithTool(MutableMap<String, Object> props, Function<ShellTool, U> task) {
+ // properties typically passed to both
+ if (log.isDebugEnabled() && props!=null && !props.isEmpty())
- log.debug("Ignoring flags "+props+" when running "+this);
++ log.debug("Ignoring flags "+Sanitizer.sanitize(props)+" when running "+this);
+ return task.apply(new ProcessTool());
+ }
+ @Override
+ protected void preExecChecks() {}
+ @Override
+ protected String constructDefaultLoggingPrefix(ConfigBag execFlags) {
+ return "system.exec";
+ }
+ @Override
+ protected String getTargetName() {
+ return "local host";
+ }
+ }.logger(log);
+ }
+
+ /** concrete instance (for generics) */
+ public static class ConcreteSystemProcessTaskFactory<RET> extends SystemProcessTaskFactory<ConcreteSystemProcessTaskFactory<RET>, RET> {
+ public ConcreteSystemProcessTaskFactory(String ...commands) {
+ super(commands);
+ }
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/catalog/internal/CatalogItemComparatorTest.java
----------------------------------------------------------------------
diff --cc brooklyn-server/core/src/test/java/org/apache/brooklyn/core/catalog/internal/CatalogItemComparatorTest.java
index 0000000,e7d43cd..3c8ce83
mode 000000,100644..100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/catalog/internal/CatalogItemComparatorTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/catalog/internal/CatalogItemComparatorTest.java
@@@ -1,0 -1,82 +1,86 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.brooklyn.core.catalog.internal;
+
+ import static org.testng.Assert.assertEquals;
+ import static org.testng.Assert.assertTrue;
+
-import org.testng.annotations.Test;
+ import org.apache.brooklyn.api.catalog.CatalogItem;
-import org.apache.brooklyn.core.catalog.internal.CatalogItemBuilder;
-import org.apache.brooklyn.core.catalog.internal.CatalogItemComparator;
++import org.testng.annotations.Test;
+
+ public class CatalogItemComparatorTest {
+ private static final String RC2 = "10.5.8-rc2";
+ private static final String STABLE = "10.5.8";
+
++ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test
+ public void testComparison() {
+ compare("0.0.1", "0.0.2", 1);
+ compare("0.0.2", "0.0.1", -1);
+ compare("0.0.1-qual", "0.0.2", 1);
+ compare("0.0.1.qual", "0.0.2", 1);
- compare("0.0.1-qual", "0.0.1_qual", 0);
++
++ // NB: semantics of this changed in 090-SNAPSHOT not to be 0 unless identical
++ // (remove when we're used to this)
++// compare("0.0.1-qual", "0.0.1_qual", 0);
++
+ compare("0.0.1.qual", "0.0.1.qual", 0);
+ compare("0.0.1", "0.0.2-SNAPSHOT", -1);
+ compare("0.0.1", "0.0.2.SNAPSHOT", -1);
+ compare("0.0.0_SNAPSHOT", "0.0.1-SNAPSHOT-20141111114709760", 1);
+ compare("0.0.0.SNAPSHOT", "0.0.1.SNAPSHOT-20141111114709760", 1);
+ compare("2.0", "2.0.1-BUILD", 1);
+ compare("2.0", "2.0.1.BUILD", 1);
+ compare("2.0.1", "2.0-BUILD", -1);
+ compare("2.0.1", "2.0.0.BUILD", -1);
+ compare("2.0", "2.0-BUILD", -1);
+ // Note not true for .qualifier: compare("2.0", "2.0.0.BUILD", -1);
+ compare("2.1", "2.0-BUILD", -1);
+ compare("2.1", "2.0.0.BUILD", -1);
+ compare("1", "1.3", 1);
+ compare("1-beta", "1-rc2", 1);
+ // Note not true for .qualifier: compare("1.0.0.beta", "1.0.0.rc2", 1);
+ compare("1-beta1", "1-beta10", 1);
+
+ compare(STABLE, "10.5", -1);
+ compare(STABLE, STABLE, 0);
+
+ compare(STABLE, "10.6", 1);
+ compare(STABLE, "10.5.8.1", 1);
+
+ compare("10.5.8-rc2", "10.5.8-rc3", 1) ;
+ compare("10.5.8-rc2", "10.5.8-rc1", -1);
+
+ compare(STABLE, RC2, -1);
+
+ CatalogItemComparator cmp = CatalogItemComparator.INSTANCE;
+ assertTrue(cmp.compare(v(RC2), v("10.5.8-beta1")) == cmp.compare(v(RC2), v("10.5.8-beta3")));
+ }
+
++ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private void compare(String v1, String v2, int expected) {
+ CatalogItemComparator cmp = CatalogItemComparator.INSTANCE;
+ assertEquals(cmp.compare(v(v1), v(v2)), expected);
+ assertEquals(cmp.compare(v(v2), v(v1)), -expected);
+ }
+
+ private CatalogItem<?, ?> v(String version) {
+ return CatalogItemBuilder.newEntity("xxx", version).build();
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/AttributeMapTest.java
----------------------------------------------------------------------
diff --cc brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/AttributeMapTest.java
index 0000000,c1ae306..77ba9c6
mode 000000,100644..100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/AttributeMapTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/AttributeMapTest.java
@@@ -1,0 -1,228 +1,248 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.brooklyn.core.entity;
+
+ import static org.testng.Assert.assertEquals;
+ import static org.testng.Assert.assertTrue;
+
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.List;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.Future;
+
+ import org.apache.brooklyn.api.entity.Application;
+ import org.apache.brooklyn.api.entity.EntitySpec;
+ import org.apache.brooklyn.api.sensor.AttributeSensor;
+ import org.apache.brooklyn.core.sensor.AttributeMap;
+ import org.apache.brooklyn.core.sensor.Sensors;
+ import org.apache.brooklyn.core.test.entity.TestApplication;
+ import org.apache.brooklyn.core.test.entity.TestEntity;
+ import org.apache.brooklyn.core.test.entity.TestEntityImpl;
+ import org.apache.brooklyn.test.Asserts;
+ import org.apache.brooklyn.util.collections.MutableMap;
+ import org.apache.brooklyn.util.guava.Maybe;
+ import org.testng.annotations.AfterMethod;
+ import org.testng.annotations.BeforeMethod;
+ import org.testng.annotations.Test;
+
+ import com.google.common.base.Function;
+ import com.google.common.base.Functions;
+ import com.google.common.collect.ImmutableList;
+ import com.google.common.collect.Iterables;
+ import com.google.common.collect.Lists;
+
+ public class AttributeMapTest {
+ final int NUM_TASKS = Math.min(500 * Runtime.getRuntime().availableProcessors(), 1000);
+
+ Application app;
+ TestEntity entity;
+ TestEntityImpl entityImpl;
+ AttributeMap map;
+ ExecutorService executor;
+
+ @BeforeMethod(alwaysRun=true)
+ public void setUp() {
+ app = TestApplication.Factory.newManagedInstanceForTests();
+ TestEntity entity = app.addChild(EntitySpec.create(TestEntity.class));
+ entityImpl = (TestEntityImpl) Entities.deproxy(entity);
+ map = new AttributeMap(entityImpl, Collections.synchronizedMap(MutableMap.<Collection<String>,Object>of()));
+ executor = Executors.newCachedThreadPool();
+ }
+
+ @AfterMethod(alwaysRun=true)
+ public void tearDown() {
+ if (executor != null) executor.shutdownNow();
+ if (app != null) Entities.destroyAll(app.getManagementContext());
+ }
+
+ // See ENGR-2111
+ @Test
+ public void testConcurrentUpdatesDoNotCauseConcurrentModificationException() throws Exception {
+ List<Future<?>> futures = Lists.newArrayList();
+
+ for (int i = 0; i < NUM_TASKS; i++) {
+ final AttributeSensor<String> nextSensor = Sensors.newStringSensor("attributeMapTest.exampleSensor"+i, "");
+ Future<?> future = executor.submit(newUpdateMapRunnable(map, nextSensor, "a"));
+ futures.add(future);
+ }
+
+ for (Future<?> future : futures) {
+ future.get();
+ }
+ }
+
+ @Test
+ public void testConcurrentUpdatesAndGetsDoNotCauseConcurrentModificationException() throws Exception {
+ List<Future<?>> futures = Lists.newArrayList();
+
+ for (int i = 0; i < NUM_TASKS; i++) {
+ final AttributeSensor<String> nextSensor = Sensors.newStringSensor("attributeMapTest.exampleSensor"+i, "");
+ Future<?> future = executor.submit(newUpdateMapRunnable(map, nextSensor, "a"));
+ Future<?> future2 = executor.submit(newGetAttributeCallable(map, nextSensor));
+ futures.add(future);
+ futures.add(future2);
+ }
+
+ for (Future<?> future : futures) {
+ future.get();
+ }
+ }
+
+ @Test
++ public void testConcurrentUpdatesAllApplied() throws Exception {
++ List<Future<?>> futures = Lists.newArrayList();
++
++ for (int i = 0; i < NUM_TASKS; i++) {
++ AttributeSensor<Integer> nextSensor = Sensors.newIntegerSensor("attributeMapTest.exampleSensor"+i);
++ Future<?> future = executor.submit(newUpdateMapRunnable(map, nextSensor, i));
++ futures.add(future);
++ }
++
++ for (Future<?> future : futures) {
++ future.get();
++ }
++
++ for (int i = 0; i < NUM_TASKS; i++) {
++ AttributeSensor<Integer> nextSensor = Sensors.newIntegerSensor("attributeMapTest.exampleSensor"+i);
++ assertEquals(map.getValue(nextSensor), (Integer)i);
++ }
++ }
++
++ @Test
+ public void testStoredSensorsCanBeRetrieved() throws Exception {
+ AttributeSensor<String> sensor1 = Sensors.newStringSensor("a", "");
+ AttributeSensor<String> sensor2 = Sensors.newStringSensor("b.c", "");
+
+ map.update(sensor1, "1val");
+ map.update(sensor2, "2val");
+
+ assertEquals(map.getValue(sensor1), "1val");
+ assertEquals(map.getValue(sensor2), "2val");
+
+ assertEquals(map.getValue(ImmutableList.of("a")), "1val");
+ assertEquals(map.getValue(ImmutableList.of("b","c")), "2val");
+ }
+
+ @Test
+ public void testStoredByPathCanBeRetrieved() throws Exception {
+ AttributeSensor<String> sensor1 = Sensors.newStringSensor("a", "");
+ AttributeSensor<String> sensor2 = Sensors.newStringSensor("b.c", "");
+
+ map.update(ImmutableList.of("a"), "1val");
+ map.update(ImmutableList.of("b", "c"), "2val");
+
+ assertEquals(map.getValue(sensor1), "1val");
+ assertEquals(map.getValue(sensor2), "2val");
+
+ assertEquals(map.getValue(ImmutableList.of("a")), "1val");
+ assertEquals(map.getValue(ImmutableList.of("b","c")), "2val");
+ }
+
+ @Test
+ public void testCanStoreSensorThenChildSensor() throws Exception {
+ AttributeSensor<String> sensor = Sensors.newStringSensor("a", "");
+ AttributeSensor<String> childSensor = Sensors.newStringSensor("a.b", "");
+
+ map.update(sensor, "parentValue");
+ map.update(childSensor, "childValue");
+
+ assertEquals(map.getValue(childSensor), "childValue");
+ assertEquals(map.getValue(sensor), "parentValue");
+ }
+
+ @Test
+ public void testCanStoreChildThenParentSensor() throws Exception {
+ AttributeSensor<String> sensor = Sensors.newStringSensor("a", "");
+ AttributeSensor<String> childSensor = Sensors.newStringSensor("a.b", "");
+
+ map.update(childSensor, "childValue");
+ map.update(sensor, "parentValue");
+
+ assertEquals(map.getValue(childSensor), "childValue");
+ assertEquals(map.getValue(sensor), "parentValue");
+ }
+
+ @Test
+ public void testConcurrentModifyAttributeCalls() throws Exception {
+ AttributeSensor<Integer> sensor = Sensors.newIntegerSensor("a", "");
+
+ Function<Integer, Maybe<Integer>> modifier = new Function<Integer, Maybe<Integer>>() {
+ @Override public Maybe<Integer> apply(Integer input) {
+ return Maybe.of((input == null) ? 1 : input + 1);
+ }
+ };
+
+ List<Future<?>> futures = Lists.newArrayList();
+
+ for (int i = 0; i < NUM_TASKS; i++) {
+ Future<?> future = executor.submit(newModifyAttributeCallable(map, sensor, modifier));
+ futures.add(future);
+ }
+
+ for (Future<?> future : futures) {
+ future.get();
+ }
+
+ assertEquals(map.getValue(sensor), Integer.valueOf(NUM_TASKS));
+ }
+
+ @Test
+ public void testModifyAttributeReturningAbsentDoesNotEmit() throws Exception {
+ AttributeSensor<Integer> sensor = Sensors.newIntegerSensor("a", "");
+ AttributeSensor<Integer> childSensor = Sensors.newIntegerSensor("a.b", "");
+
+ final RecordingSensorEventListener<Object> listener = new RecordingSensorEventListener<>();
+ entityImpl.subscriptions().subscribe(entityImpl, sensor, listener);
+
+ map.modify(childSensor, Functions.constant(Maybe.<Integer>absent()));
+
+ Asserts.succeedsContinually(new Runnable() {
+ @Override public void run() {
+ assertTrue(Iterables.isEmpty(listener.getEvents()), "events="+listener.getEvents());
+ }});
+ }
+
+ protected <T> Runnable newUpdateMapRunnable(final AttributeMap map, final AttributeSensor<T> attribute, final T val) {
+ return new Runnable() {
+ @Override public void run() {
+ map.update(attribute, val);
+ }
+ };
+ }
+
+ protected <T> Callable<T> newGetAttributeCallable(final AttributeMap map, final AttributeSensor<T> attribute) {
+ return new Callable<T>() {
+ @Override public T call() {
+ return map.getValue(attribute);
+ }
+ };
+ }
+
+ protected <T> Callable<T> newModifyAttributeCallable(final AttributeMap map, final AttributeSensor<T> attribute, final Function<? super T, Maybe<T>> modifier) {
+ return new Callable<T>() {
+ @Override public T call() {
+ return map.modify(attribute, modifier);
+ }
+ };
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/EntityConcurrencyTest.java
----------------------------------------------------------------------
diff --cc brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/EntityConcurrencyTest.java
index 0000000,0000000..f606226
new file mode 100644
--- /dev/null
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/EntityConcurrencyTest.java
@@@ -1,0 -1,0 +1,275 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package org.apache.brooklyn.core.entity;
++
++import static org.testng.Assert.assertEquals;
++
++import java.util.List;
++import java.util.concurrent.Executors;
++
++import org.apache.brooklyn.api.entity.EntitySpec;
++import org.apache.brooklyn.api.location.Location;
++import org.apache.brooklyn.api.location.LocationSpec;
++import org.apache.brooklyn.api.policy.PolicySpec;
++import org.apache.brooklyn.api.sensor.AttributeSensor;
++import org.apache.brooklyn.api.sensor.EnricherSpec;
++import org.apache.brooklyn.config.ConfigKey;
++import org.apache.brooklyn.core.config.ConfigKeys;
++import org.apache.brooklyn.core.enricher.BasicEnricherTest;
++import org.apache.brooklyn.core.feed.AbstractFeed;
++import org.apache.brooklyn.core.location.SimulatedLocation;
++import org.apache.brooklyn.core.policy.basic.BasicPolicyTest;
++import org.apache.brooklyn.core.sensor.Sensors;
++import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
++import org.apache.brooklyn.core.test.entity.TestEntity;
++import org.apache.brooklyn.entity.group.BasicGroup;
++import org.apache.brooklyn.entity.stock.BasicEntity;
++import org.apache.brooklyn.test.Asserts;
++import org.testng.annotations.AfterMethod;
++import org.testng.annotations.BeforeMethod;
++import org.testng.annotations.Test;
++
++import com.google.common.base.Predicates;
++import com.google.common.collect.ImmutableList;
++import com.google.common.collect.Lists;
++import com.google.common.util.concurrent.Futures;
++import com.google.common.util.concurrent.ListenableFuture;
++import com.google.common.util.concurrent.ListeningExecutorService;
++import com.google.common.util.concurrent.MoreExecutors;
++
++public class EntityConcurrencyTest extends BrooklynAppUnitTestSupport {
++ TestEntity entity;
++ ListeningExecutorService executor;
++
++ @BeforeMethod(alwaysRun=true)
++ @Override
++ public void setUp() throws Exception {
++ super.setUp();
++ entity = app.addChild(EntitySpec.create(TestEntity.class));
++ executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
++ }
++
++ @AfterMethod(alwaysRun=true)
++ @Override
++ public void tearDown() throws Exception {
++ super.tearDown();
++ if (executor != null) executor.shutdownNow();
++ }
++
++ @Test
++ public void testConcurrentSetAttribute() throws Exception {
++ final int NUM_TASKS = Math.min(500 * Runtime.getRuntime().availableProcessors(), 1000);
++
++ List<ListenableFuture<?>> futures = Lists.newArrayList();
++
++ for (int i = 0; i < NUM_TASKS; i++) {
++ final AttributeSensor<Integer> nextSensor = Sensors.newIntegerSensor("EntityConcurrencyTest.exampleSensor"+i);
++ final int val = i;
++ ListenableFuture<?> future = executor.submit(new Runnable() {
++ @Override public void run() {
++ entity.sensors().set(nextSensor, val);
++ }});
++ futures.add(future);
++ }
++
++ Futures.allAsList(futures).get();
++
++ for (int i = 0; i < NUM_TASKS; i++) {
++ AttributeSensor<Integer> nextSensor = Sensors.newIntegerSensor("EntityConcurrencyTest.exampleSensor"+i);
++ assertEquals(entity.sensors().get(nextSensor), (Integer)i, "i="+i);
++ }
++ }
++
++ @Test
++ public void testConcurrentSetConfig() throws Exception {
++ final int NUM_TASKS = Math.min(500 * Runtime.getRuntime().availableProcessors(), 1000);
++
++ List<ListenableFuture<?>> futures = Lists.newArrayList();
++
++ for (int i = 0; i < NUM_TASKS; i++) {
++ final ConfigKey<Integer> nextKey = ConfigKeys.newIntegerConfigKey("EntityConcurrencyTest.exampleConfig"+i);
++ final int val = i;
++ ListenableFuture<?> future = executor.submit(new Runnable() {
++ @Override public void run() {
++ entity.config().set(nextKey, val);
++ }});
++ futures.add(future);
++ }
++
++ Futures.allAsList(futures).get();
++
++ for (int i = 0; i < NUM_TASKS; i++) {
++ final ConfigKey<Integer> nextKey = ConfigKeys.newIntegerConfigKey("EntityConcurrencyTest.exampleConfig"+i);
++ assertEquals(entity.config().get(nextKey), (Integer)i, "i="+i);
++ }
++ }
++
++ @Test
++ public void testConcurrentAddTag() throws Exception {
++ final int NUM_TASKS = Math.min(500 * Runtime.getRuntime().availableProcessors(), 1000);
++
++ List<ListenableFuture<?>> futures = Lists.newArrayList();
++ List<Integer> tags = Lists.newArrayList();
++
++ for (int i = 0; i < NUM_TASKS; i++) {
++ final int val = i;
++ ListenableFuture<?> future = executor.submit(new Runnable() {
++ @Override public void run() {
++ entity.tags().addTag(val);
++ }});
++ futures.add(future);
++ tags.add(val);
++ }
++
++ Futures.allAsList(futures).get();
++
++ Asserts.assertEqualsIgnoringOrder(entity.tags().getTags(), tags);
++ }
++
++ @Test
++ public void testConcurrentAddGroup() throws Exception {
++ final int NUM_TASKS = 100;
++
++ List<BasicGroup> groups = Lists.newArrayList();
++ for (int i = 0; i < NUM_TASKS; i++) {
++ groups.add(app.addChild(EntitySpec.create(BasicGroup.class)));
++ }
++
++ List<ListenableFuture<?>> futures = Lists.newArrayList();
++
++ for (final BasicGroup group : groups) {
++ ListenableFuture<?> future = executor.submit(new Runnable() {
++ @Override public void run() {
++ group.addMember(entity);
++ }});
++ futures.add(future);
++ }
++
++ Futures.allAsList(futures).get();
++
++ Asserts.assertEqualsIgnoringOrder(entity.groups(), groups);
++ }
++
++ @Test
++ public void testConcurrentAddChild() throws Exception {
++ final int NUM_TASKS = 100;
++
++ List<ListenableFuture<?>> futures = Lists.newArrayList();
++
++ for (int i = 0; i < NUM_TASKS; i++) {
++ ListenableFuture<?> future = executor.submit(new Runnable() {
++ @Override public void run() {
++ entity.addChild(EntitySpec.create(BasicEntity.class));
++ }});
++ futures.add(future);
++ }
++
++ Futures.allAsList(futures).get();
++
++ assertEquals(entity.getChildren().size(), NUM_TASKS);
++ Asserts.assertEqualsIgnoringOrder(entity.getChildren(), mgmt.getEntityManager().findEntities(Predicates.instanceOf(BasicEntity.class)));
++ }
++
++ @Test
++ public void testConcurrentAddLocation() throws Exception {
++ final int NUM_TASKS = 100;
++
++ List<Location> locs = Lists.newArrayList();
++ for (int i = 0; i < NUM_TASKS; i++) {
++ locs.add(mgmt.getLocationManager().createLocation(LocationSpec.create(SimulatedLocation.class)));
++ }
++
++ List<ListenableFuture<?>> futures = Lists.newArrayList();
++
++ for (final Location loc : locs) {
++ ListenableFuture<?> future = executor.submit(new Runnable() {
++ @Override public void run() {
++ entity.addLocations(ImmutableList.of(loc));
++ }});
++ futures.add(future);
++ }
++
++ Futures.allAsList(futures).get();
++
++ Asserts.assertEqualsIgnoringOrder(entity.getLocations(), locs);
++ }
++
++ @Test
++ public void testConcurrentAddPolicy() throws Exception {
++ final int NUM_TASKS = 100;
++
++ int numPrePolicies = entity.policies().size();
++
++ List<ListenableFuture<?>> futures = Lists.newArrayList();
++
++ for (int i = 0; i < NUM_TASKS; i++) {
++ ListenableFuture<?> future = executor.submit(new Runnable() {
++ @Override public void run() {
++ entity.policies().add(PolicySpec.create(BasicPolicyTest.MyPolicy.class));
++ }});
++ futures.add(future);
++ }
++
++ Futures.allAsList(futures).get();
++
++ assertEquals(entity.policies().size(), NUM_TASKS+numPrePolicies);
++ }
++
++ @Test
++ public void testConcurrentAddEnricher() throws Exception {
++ final int NUM_TASKS = 100;
++
++ int numPreEnrichers = entity.enrichers().size();
++
++ List<ListenableFuture<?>> futures = Lists.newArrayList();
++
++ for (int i = 0; i < NUM_TASKS; i++) {
++ ListenableFuture<?> future = executor.submit(new Runnable() {
++ @Override public void run() {
++ entity.enrichers().add(EnricherSpec.create(BasicEnricherTest.MyEnricher.class));
++ }});
++ futures.add(future);
++ }
++
++ Futures.allAsList(futures).get();
++
++ assertEquals(entity.enrichers().size(), NUM_TASKS+numPreEnrichers);
++ }
++
++ @Test
++ public void testConcurrentAddFeed() throws Exception {
++ final int NUM_TASKS = 100;
++
++ List<ListenableFuture<?>> futures = Lists.newArrayList();
++
++ for (int i = 0; i < NUM_TASKS; i++) {
++ ListenableFuture<?> future = executor.submit(new Runnable() {
++ @Override public void run() {
++ entity.feeds().addFeed(new MyFeed());
++ }});
++ futures.add(future);
++ }
++
++ Futures.allAsList(futures).get();
++
++ assertEquals(entity.feeds().getFeeds().size(), NUM_TASKS);
++ }
++ private static class MyFeed extends AbstractFeed {
++ }
++}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/EntityFunctionsTest.java
----------------------------------------------------------------------
diff --cc brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/EntityFunctionsTest.java
index 0000000,7b68d28..3b68738
mode 000000,100644..100644
--- a/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/EntityFunctionsTest.java
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/entity/EntityFunctionsTest.java
@@@ -1,0 -1,77 +1,83 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.brooklyn.core.entity;
+
+ import static org.testng.Assert.assertEquals;
+ import static org.testng.Assert.assertNull;
+
+ import org.apache.brooklyn.api.entity.EntitySpec;
+ import org.apache.brooklyn.api.location.Location;
+ import org.apache.brooklyn.core.entity.EntityFunctions;
+ import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+ import org.apache.brooklyn.core.test.entity.TestEntity;
+ import org.testng.annotations.BeforeMethod;
+ import org.testng.annotations.Test;
+
+ import com.google.common.base.Predicates;
+ import com.google.common.collect.ImmutableList;
+
+ public class EntityFunctionsTest extends BrooklynAppUnitTestSupport {
+
+ private TestEntity entity;
+ private Location loc;
+
+ @BeforeMethod(alwaysRun=true)
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ entity = app.createAndManageChild(EntitySpec.create(TestEntity.class).displayName("mydisplayname"));
+ loc = app.getManagementContext().getLocationRegistry().resolve("localhost");
+ }
+
+ @Test
+ public void testAttribute() throws Exception {
+ entity.sensors().set(TestEntity.NAME, "myname");
+ assertEquals(EntityFunctions.attribute(TestEntity.NAME).apply(entity), "myname");
+ assertNull(EntityFunctions.attribute(TestEntity.SEQUENCE).apply(entity));
+ }
++
++ @Test
++ public void testEntityAttributeTest() {
++ entity.sensors().set(TestEntity.NAME, "myname");
++ assertEquals(EntityFunctions.attribute(entity, TestEntity.NAME).apply(new Object()), "myname");
++ }
+
+ @Test
+ public void testConfig() throws Exception {
+ entity.config().set(TestEntity.CONF_NAME, "myname");
+ assertEquals(EntityFunctions.config(TestEntity.CONF_NAME).apply(entity), "myname");
+ assertNull(EntityFunctions.config(TestEntity.CONF_OBJECT).apply(entity));
+ }
+
+ @Test
+ public void testDisplayName() throws Exception {
+ assertEquals(EntityFunctions.displayName().apply(entity), "mydisplayname");
+ }
+
+ @Test
+ public void testId() throws Exception {
+ assertEquals(EntityFunctions.id().apply(entity), entity.getId());
+ }
+
+ @Test
+ public void testLocationMatching() throws Exception {
+ entity.addLocations(ImmutableList.of(loc));
+ assertEquals(EntityFunctions.locationMatching(Predicates.alwaysTrue()).apply(entity), loc);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/typereg/BasicBrooklynTypeRegistryTest.java
----------------------------------------------------------------------
diff --cc brooklyn-server/core/src/test/java/org/apache/brooklyn/core/typereg/BasicBrooklynTypeRegistryTest.java
index 0000000,0000000..6f2f573
new file mode 100644
--- /dev/null
+++ b/brooklyn-server/core/src/test/java/org/apache/brooklyn/core/typereg/BasicBrooklynTypeRegistryTest.java
@@@ -1,0 -1,0 +1,186 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package org.apache.brooklyn.core.typereg;
++
++import org.apache.brooklyn.api.typereg.RegisteredType;
++import org.apache.brooklyn.core.test.BrooklynMgmtUnitTestSupport;
++import org.apache.brooklyn.test.Asserts;
++import org.apache.brooklyn.util.collections.MutableList;
++import org.apache.brooklyn.util.collections.MutableSet;
++import org.testng.Assert;
++import org.testng.annotations.Test;
++
++import com.google.common.base.Predicates;
++import com.google.common.collect.Iterables;
++
++public class BasicBrooklynTypeRegistryTest extends BrooklynMgmtUnitTestSupport {
++
++ private BasicBrooklynTypeRegistry registry() {
++ return (BasicBrooklynTypeRegistry) mgmt.getTypeRegistry();
++ }
++
++ private void add(RegisteredType type) {
++ add(type, false);
++ }
++ private void add(RegisteredType type, boolean canForce) {
++ registry().addToLocalUnpersistedTypeRegistry(type, canForce);
++ }
++
++ private final static RegisteredType SAMPLE_TYPE = RegisteredTypes.bean("item.A", "1", new BasicTypeImplementationPlan("ignore", null), String.class);
++ private final static RegisteredType SAMPLE_TYPE2 = RegisteredTypes.bean("item.A", "2", new BasicTypeImplementationPlan("ignore", null), String.class);
++
++ @Test
++ public void testAddAndGet() {
++ Assert.assertFalse( Iterables.contains(registry().getAll(), SAMPLE_TYPE) );
++ Assert.assertNull( registry().get(SAMPLE_TYPE.getSymbolicName(), SAMPLE_TYPE.getVersion()) );
++ Assert.assertNull( registry().get(SAMPLE_TYPE.getId()) );
++ add(SAMPLE_TYPE);
++
++ Assert.assertTrue( Iterables.contains(registry().getAll(), SAMPLE_TYPE) );
++ Assert.assertEquals( registry().get(SAMPLE_TYPE.getSymbolicName(), SAMPLE_TYPE.getVersion()), SAMPLE_TYPE );
++ Assert.assertEquals( registry().get(SAMPLE_TYPE.getId()), SAMPLE_TYPE );
++
++ Assert.assertTrue( Iterables.contains(registry().getMatching(
++ RegisteredTypePredicates.symbolicName(SAMPLE_TYPE.getSymbolicName())), SAMPLE_TYPE) );
++ }
++
++ @Test
++ public void testCantAddSameIdUnlessSameInstanceOrForced() {
++ add(SAMPLE_TYPE);
++ RegisteredType sampleTypeClone = RegisteredTypes.bean("item.A", "1", new BasicTypeImplementationPlan("ignore", null), String.class);
++ add(sampleTypeClone, true);
++ Assert.assertNotEquals( registry().get(SAMPLE_TYPE.getId()), SAMPLE_TYPE );
++
++ add(SAMPLE_TYPE, true);
++ Assert.assertEquals( registry().get(SAMPLE_TYPE.getId()), SAMPLE_TYPE );
++
++ try {
++ add(sampleTypeClone);
++ Asserts.shouldHaveFailedPreviously();
++ } catch (Exception e) {
++ Asserts.expectedFailureContains(e, SAMPLE_TYPE.getSymbolicName());
++ }
++
++ // only one entry
++ Assert.assertEquals( Iterables.size(registry().getMatching(
++ RegisteredTypePredicates.symbolicName(SAMPLE_TYPE.getSymbolicName()))), 1);
++ // unversioned request returns sample
++ Assert.assertEquals( registry().get(SAMPLE_TYPE.getSymbolicName()), SAMPLE_TYPE );
++ }
++
++ @Test
++ public void testGettingBestVersion() {
++ add(SAMPLE_TYPE);
++ add(SAMPLE_TYPE2);
++
++ Assert.assertTrue( Iterables.contains(registry().getAll(), SAMPLE_TYPE) );
++ Assert.assertTrue( Iterables.contains(registry().getAll(), SAMPLE_TYPE2) );
++ Assert.assertEquals( registry().get(SAMPLE_TYPE.getId()), SAMPLE_TYPE );
++ Assert.assertEquals( registry().get(SAMPLE_TYPE2.getId()), SAMPLE_TYPE2 );
++ Assert.assertNotEquals( registry().get(SAMPLE_TYPE2.getId()), SAMPLE_TYPE );
++
++ Assert.assertEquals( Iterables.size(registry().getMatching(
++ RegisteredTypePredicates.symbolicName(SAMPLE_TYPE.getSymbolicName()))), 2);
++
++ // unversioned request returns latest
++ Assert.assertEquals( registry().get(SAMPLE_TYPE.getSymbolicName()), SAMPLE_TYPE2 );
++ }
++
++ @Test
++ public void testGetWithFilter() {
++ add(SAMPLE_TYPE);
++
++ Assert.assertEquals( Iterables.size(registry().getMatching(Predicates.and(
++ RegisteredTypePredicates.symbolicName(SAMPLE_TYPE.getSymbolicName()),
++ RegisteredTypePredicates.subtypeOf(String.class)
++ ))), 1 );
++ Assert.assertTrue( Iterables.isEmpty(registry().getMatching(Predicates.and(
++ RegisteredTypePredicates.symbolicName(SAMPLE_TYPE.getSymbolicName()),
++ RegisteredTypePredicates.subtypeOf(Integer.class)
++ ))) );
++ }
++
++ @Test
++ public void testGetWithContext() {
++ add(SAMPLE_TYPE);
++ Assert.assertEquals( registry().get(SAMPLE_TYPE.getId(),
++ RegisteredTypeLoadingContexts.bean(String.class)), SAMPLE_TYPE );
++ Assert.assertEquals( registry().get(SAMPLE_TYPE.getId(),
++ RegisteredTypeLoadingContexts.bean(Integer.class)), null );
++ }
++
++ @Test
++ public void testAlias() {
++ add(SAMPLE_TYPE);
++ add(SAMPLE_TYPE2);
++
++ RegisteredType sampleType15WithAliases = RegisteredTypes.addAliases(
++ RegisteredTypes.bean("item.A", "1.1", new BasicTypeImplementationPlan("ignore", null), String.class),
++ MutableList.of("my_a", "the_a"));
++ add(sampleType15WithAliases);
++ Assert.assertEquals(sampleType15WithAliases.getAliases(), MutableSet.of("my_a", "the_a"));
++
++ Assert.assertEquals( Iterables.size(registry().getMatching(
++ RegisteredTypePredicates.symbolicName(SAMPLE_TYPE.getSymbolicName()))), 3);
++
++ Assert.assertEquals( registry().get("my_a"), sampleType15WithAliases );
++ Assert.assertEquals( registry().get("the_a"), sampleType15WithAliases );
++ Assert.assertEquals( registry().get(sampleType15WithAliases.getId()), sampleType15WithAliases );
++
++ // but unadorned type still returns v2
++ Assert.assertEquals( registry().get(sampleType15WithAliases.getSymbolicName()), SAMPLE_TYPE2 );
++
++ // and filters work
++ Assert.assertEquals( registry().getMatching(RegisteredTypePredicates.alias("the_a")),
++ MutableList.of(sampleType15WithAliases) );
++ Assert.assertEquals( registry().get("my_a",
++ RegisteredTypeLoadingContexts.bean(String.class)), sampleType15WithAliases );
++ Assert.assertEquals( registry().get("my_a",
++ RegisteredTypeLoadingContexts.bean(Integer.class)), null );
++ }
++
++ @Test
++ public void testTags() {
++ add(SAMPLE_TYPE);
++ add(SAMPLE_TYPE2);
++
++ RegisteredType sampleType15WithTags = RegisteredTypes.addTags(
++ RegisteredTypes.bean("item.A", "1.1", new BasicTypeImplementationPlan("ignore", null), String.class),
++ MutableList.of("my_a", "the_a"));
++ add(sampleType15WithTags);
++ Assert.assertEquals(sampleType15WithTags.getTags(), MutableSet.of("my_a", "the_a"));
++
++ Assert.assertEquals( Iterables.size(registry().getMatching(
++ RegisteredTypePredicates.symbolicName(SAMPLE_TYPE.getSymbolicName()))), 3);
++
++ Assert.assertEquals( registry().get(sampleType15WithTags.getId()), sampleType15WithTags );
++
++ // and filters work
++ Assert.assertEquals( registry().getMatching(RegisteredTypePredicates.tag("the_a")),
++ MutableList.of(sampleType15WithTags) );
++
++ // but can't lookup by tag as a get
++ Assert.assertEquals( registry().get("my_a"), null );
++
++ // and unadorned type still returns v2
++ Assert.assertEquals( registry().get(sampleType15WithTags.getSymbolicName()), SAMPLE_TYPE2 );
++
++ }
++
++}