You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:50:46 UTC
[04/51] [abbrv] [partial] brooklyn-server git commit: move subdir
from incubator up a level as it is promoted to its own repo (first
non-incubator commit!)
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
deleted file mode 100644
index 57d4712..0000000
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.core.mgmt.internal;
-
-import static org.apache.brooklyn.util.JavaGroovyEquivalents.mapOf;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.Group;
-import org.apache.brooklyn.api.mgmt.SubscriptionContext;
-import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
-import org.apache.brooklyn.api.mgmt.SubscriptionManager;
-import org.apache.brooklyn.api.sensor.Sensor;
-import org.apache.brooklyn.api.sensor.SensorEvent;
-import org.apache.brooklyn.api.sensor.SensorEventListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
-
-import groovy.lang.Closure;
-
-/**
- * A {@link SubscriptionContext} for an entity or other user of a {@link SubscriptionManager}.
- */
-public class BasicSubscriptionContext implements SubscriptionContext {
-
- private static final Logger LOG = LoggerFactory.getLogger(BasicSubscriptionContext.class);
-
- private final SubscriptionManager manager;
- private final Object subscriber;
- private final Map<String,Object> flags;
-
- public BasicSubscriptionContext(SubscriptionManager manager, Object subscriber) {
- this(Collections.<String,Object>emptyMap(), manager, subscriber);
- }
-
- public BasicSubscriptionContext(Map<String, ?> flags, SubscriptionManager manager, Object subscriber) {
- this.manager = manager;
- this.subscriber = subscriber;
- this.flags = mapOf("subscriber", subscriber);
- if (flags!=null) this.flags.putAll(flags);
- }
-
- @SuppressWarnings("rawtypes")
- public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, Closure c) {
- return subscribe(Collections.<String,Object>emptyMap(), producer, sensor, c);
- }
-
- @SuppressWarnings("rawtypes")
- public <T> SubscriptionHandle subscribe(Map<String, ?> newFlags, Entity producer, Sensor<T> sensor, Closure c) {
- return subscribe(newFlags, producer, sensor, toSensorEventListener(c));
- }
-
- @Override
- public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
- return subscribe(Collections.<String,Object>emptyMap(), producer, sensor, listener);
- }
-
- @Override
- public <T> SubscriptionHandle subscribe(Map<String, ?> newFlags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
- Map<String,Object> subscriptionFlags = Maps.newLinkedHashMap(flags);
- if (newFlags != null) subscriptionFlags.putAll(newFlags);
- return manager.subscribe(subscriptionFlags, producer, sensor, listener);
- }
-
- @SuppressWarnings("rawtypes")
- public <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, Closure c) {
- return subscribeToChildren(Collections.<String,Object>emptyMap(), parent, sensor, c);
- }
-
- @SuppressWarnings("rawtypes")
- public <T> SubscriptionHandle subscribeToChildren(Map<String, Object> newFlags, Entity parent, Sensor<T> sensor, Closure c) {
- return subscribeToChildren(newFlags, parent, sensor, toSensorEventListener(c));
- }
-
- @Override
- public <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
- return subscribeToChildren(Collections.<String,Object>emptyMap(), parent, sensor, listener);
- }
-
- @Override
- public <T> SubscriptionHandle subscribeToChildren(Map<String, Object> newFlags, Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
- Map<String,Object> subscriptionFlags = Maps.newLinkedHashMap(flags);
- if (newFlags != null) subscriptionFlags.putAll(newFlags);
- return manager.subscribeToChildren(subscriptionFlags, parent, sensor, listener);
- }
-
- @SuppressWarnings("rawtypes")
- public <T> SubscriptionHandle subscribeToMembers(Group parent, Sensor<T> sensor, Closure c) {
- return subscribeToMembers(Collections.<String,Object>emptyMap(), parent, sensor, c);
- }
-
- @SuppressWarnings("rawtypes")
- public <T> SubscriptionHandle subscribeToMembers(Map<String, Object> newFlags, Group parent, Sensor<T> sensor, Closure c) {
- return subscribeToMembers(newFlags, parent, sensor, toSensorEventListener(c));
- }
-
- @Override
- public <T> SubscriptionHandle subscribeToMembers(Group parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
- return subscribeToMembers(Collections.<String,Object>emptyMap(), parent, sensor, listener);
- }
-
- @Override
- public <T> SubscriptionHandle subscribeToMembers(Map<String, Object> newFlags, Group parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
- Map<String,Object> subscriptionFlags = Maps.newLinkedHashMap(flags);
- if (newFlags != null) subscriptionFlags.putAll(newFlags);
- return manager.subscribeToMembers(subscriptionFlags, parent, sensor, listener);
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public boolean unsubscribe(SubscriptionHandle subscriptionId) {
- Preconditions.checkNotNull(subscriptionId, "subscriptionId must not be null");
- Preconditions.checkArgument(Objects.equal(subscriber, ((Subscription) subscriptionId).subscriber), "The subscriptionId is for a different "+subscriber+"; expected "+((Subscription) subscriptionId).subscriber);
- return manager.unsubscribe(subscriptionId);
- }
-
- /** @see SubscriptionManager#publish(SensorEvent) */
- @Override
- public <T> void publish(SensorEvent<T> event) {
- manager.publish(event);
- }
-
- /** Return the subscriptions associated with this context */
- @Override
- public Set<SubscriptionHandle> getSubscriptions() {
- return manager.getSubscriptionsForSubscriber(subscriber);
- }
-
- @Override
- public int unsubscribeAll() {
- int count = 0;
-
- // To avoid ConcurrentModificationException when copying subscriptions, need to synchronize on it
- Set<SubscriptionHandle> subscriptions = getSubscriptions();
- Collection<SubscriptionHandle> subscriptionsCopy;
- synchronized (subscriptions) {
- subscriptionsCopy = ImmutableList.copyOf(subscriptions);
- }
-
- for (SubscriptionHandle s : subscriptionsCopy) {
- count++;
- boolean result = unsubscribe(s);
- if (!result) LOG.warn("When unsubscribing from all of {}, unsubscribe of {} return false", subscriber, s);
- }
- return count;
- }
-
- @SuppressWarnings("rawtypes")
- private <T> SensorEventListener<T> toSensorEventListener(final Closure c) {
- return new SensorEventListener<T>() {
- @Override public void onEvent(SensorEvent<T> event) {
- c.call(event);
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
deleted file mode 100644
index c8ef0e6..0000000
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynGarbageCollector.java
+++ /dev/null
@@ -1,625 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.core.mgmt.internal;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.ConcurrentModificationException;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.api.mgmt.HasTaskChildren;
-import org.apache.brooklyn.api.mgmt.Task;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.internal.BrooklynProperties;
-import org.apache.brooklyn.core.internal.storage.BrooklynStorage;
-import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
-import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity;
-import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedStream;
-import org.apache.brooklyn.util.collections.MutableList;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.collections.MutableSet;
-import org.apache.brooklyn.util.core.task.BasicExecutionManager;
-import org.apache.brooklyn.util.core.task.ExecutionListener;
-import org.apache.brooklyn.util.core.task.Tasks;
-import org.apache.brooklyn.util.exceptions.Exceptions;
-import org.apache.brooklyn.util.javalang.MemoryUsageTracker;
-import org.apache.brooklyn.util.text.Strings;
-import org.apache.brooklyn.util.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Objects;
-import com.google.common.annotations.Beta;
-import com.google.common.collect.Iterables;
-
-/**
- * Deletes record of old tasks, to prevent space leaks and the eating up of more and more memory.
- *
- * The deletion policy is configurable:
- * <ul>
- * <li>Period - how frequently to look at the existing tasks to delete some, if required
- * <li>Max task age - the time after which a completed task will be automatically deleted
- * (i.e. any root task completed more than maxTaskAge ago will be deleted)
- * <li>Max tasks per &lt;various categories&gt; - the maximum number of tasks to be kept for a given tag,
- * split into categories based on what is seeming to be useful
- * </ul>
- *
- * The default is to check with a period of one minute, deleting tasks after 30 days,
- * and keeping at most 100000 tasks in the system,
- * max 1000 tasks per entity, 50 per effector within that entity, and 50 per other non-effector tag
- * within that entity (or global if not attached to an entity).
- *
- * @author aled
- */
-public class BrooklynGarbageCollector {
-
- private static final Logger LOG = LoggerFactory.getLogger(BrooklynGarbageCollector.class);
-
- public static final ConfigKey<Duration> GC_PERIOD = ConfigKeys.newDurationConfigKey(
- "brooklyn.gc.period", "the period for checking if any tasks need to be deleted",
- Duration.minutes(1));
-
- public static final ConfigKey<Boolean> DO_SYSTEM_GC = ConfigKeys.newBooleanConfigKey(
- "brooklyn.gc.doSystemGc", "whether to periodically call System.gc()", false);
-
- /**
- * Whether to check for tasks which are submitted by another task but backgrounded, i.e. not a child of that task.
- * Defaults to yes, even though it can mean some extra loops, to make sure we GC them promptly.
- * @since 0.7.0 */
- // worst offender is the {@link DynamicSequentialTask} internal job tracker, but it is marked
- // transient so it is destroyed promptly; there may be others, however;
- // but OTOH it might be expensive to check for these all the time!
- // TODO probably we can set this false (remove this and related code),
- // and just rely on usual GC to pick up background tasks; the lifecycle of background task
- // should normally be independent of the submitter. (DST was the exception, and marking
- // transient there fixes the main problem, which is when the submitter is GC'd but the submitted is not,
- // and we don't want the submitted to show up at the root in the GUI, which it will if its
- // submitter has been GC'd)
- @Beta
- public static final ConfigKey<Boolean> CHECK_SUBTASK_SUBMITTERS = ConfigKeys.newBooleanConfigKey(
- "brooklyn.gc.checkSubtaskSubmitters", "whether for subtasks to check the submitters", true);
-
- public static final ConfigKey<Integer> MAX_TASKS_PER_TAG = ConfigKeys.newIntegerConfigKey(
- "brooklyn.gc.maxTasksPerTag",
- "the maximum number of tasks to be kept for a given tag "
- + "within an execution context (e.g. entity); "
- + "some broad-brush tags are excluded, and if an entity has multiple tags all tag counts must be full",
- 50);
-
- public static final ConfigKey<Integer> MAX_TASKS_PER_ENTITY = ConfigKeys.newIntegerConfigKey(
- "brooklyn.gc.maxTasksPerEntity",
- "the maximum number of tasks to be kept for a given entity",
- 1000);
-
- public static final ConfigKey<Integer> MAX_TASKS_GLOBAL = ConfigKeys.newIntegerConfigKey(
- "brooklyn.gc.maxTasksGlobal",
- "the maximum number of tasks to be kept across the entire system",
- 100000);
-
- public static final ConfigKey<Duration> MAX_TASK_AGE = ConfigKeys.newDurationConfigKey(
- "brooklyn.gc.maxTaskAge",
- "the duration after which a completed task will be automatically deleted",
- Duration.days(30));
-
- protected final static Comparator<Task<?>> TASKS_OLDEST_FIRST_COMPARATOR = new Comparator<Task<?>>() {
- @Override public int compare(Task<?> t1, Task<?> t2) {
- long end1 = t1.getEndTimeUtc();
- long end2 = t2.getEndTimeUtc();
- return (end1 < end2) ? -1 : ((end1 == end2) ? 0 : 1);
- }
- };
-
- private final BasicExecutionManager executionManager;
- private final BrooklynStorage storage;
- private final BrooklynProperties brooklynProperties;
- private final ScheduledExecutorService executor;
- private ScheduledFuture<?> activeCollector;
- private Map<Entity,Task<?>> unmanagedEntitiesNeedingGc = new LinkedHashMap<Entity, Task<?>>();
-
- private Duration gcPeriod;
- private final boolean doSystemGc;
- private volatile boolean running = true;
-
- public BrooklynGarbageCollector(BrooklynProperties brooklynProperties, BasicExecutionManager executionManager, BrooklynStorage storage) {
- this.executionManager = executionManager;
- this.storage = storage;
- this.brooklynProperties = brooklynProperties;
-
- doSystemGc = brooklynProperties.getConfig(DO_SYSTEM_GC);
-
- executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
- @Override public Thread newThread(Runnable r) {
- return new Thread(r, "brooklyn-gc");
- }});
-
- executionManager.addListener(new ExecutionListener() {
- @Override public void onTaskDone(Task<?> task) {
- BrooklynGarbageCollector.this.onTaskDone(task);
- }});
-
- scheduleCollector(true);
- }
-
- protected synchronized void scheduleCollector(boolean canInterruptCurrent) {
- if (activeCollector != null) activeCollector.cancel(canInterruptCurrent);
-
- gcPeriod = brooklynProperties.getConfig(GC_PERIOD);
- if (gcPeriod!=null) {
- activeCollector = executor.scheduleWithFixedDelay(
- new Runnable() {
- @Override public void run() {
- gcIteration();
- }
- },
- gcPeriod.toMillisecondsRoundingUp(),
- gcPeriod.toMillisecondsRoundingUp(),
- TimeUnit.MILLISECONDS);
- }
- }
-
- /** force a round of Brooklyn garbage collection */
- public void gcIteration() {
- try {
- logUsage("brooklyn gc (before)");
- gcTasks();
- logUsage("brooklyn gc (after)");
-
- if (doSystemGc) {
- // Can be very useful when tracking down OOMEs etc, where a lot of tasks are executing
- // Empirically observed that (on OS X jvm at least) calling twice blocks - logs a significant
- // amount of memory having been released, as though a full-gc had been run. But this is highly
- // dependent on the JVM implementation.
- System.gc(); System.gc();
- logUsage("brooklyn gc (after system gc)");
- }
- } catch (Throwable t) {
- Exceptions.propagateIfFatal(t);
- LOG.warn("Error during management-context GC: "+t, t);
- // previously we bailed on all errors, but I don't think we should do that -Alex
- }
- }
-
- public void logUsage(String prefix) {
- if (LOG.isDebugEnabled())
- LOG.debug(prefix+" - using "+getUsageString());
- }
-
- public static String makeBasicUsageString() {
- return Strings.makeSizeString(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory())+" / "+
- Strings.makeSizeString(Runtime.getRuntime().totalMemory()) + " memory" +
- " ("+Strings.makeSizeString(MemoryUsageTracker.SOFT_REFERENCES.getBytesUsed()) + " soft); "+
- Thread.activeCount()+" threads";
- }
-
- public String getUsageString() {
- return makeBasicUsageString()+"; "+
- "storage: " + storage.getStorageMetrics() + "; " +
- "tasks: " +
- executionManager.getNumActiveTasks()+" active, "+
- executionManager.getNumIncompleteTasks()+" unfinished; "+
- executionManager.getNumInMemoryTasks()+" remembered, "+
- executionManager.getTotalTasksSubmitted()+" total submitted)";
- }
-
- public void shutdownNow() {
- running = false;
- if (activeCollector != null) activeCollector.cancel(true);
- if (executor != null) executor.shutdownNow();
- }
-
- public void onUnmanaged(Entity entity) {
- // defer task deletions until the entity is completely unmanaged
- // (this is usually invoked during the stop sequence)
- synchronized (unmanagedEntitiesNeedingGc) {
- unmanagedEntitiesNeedingGc.put(entity, Tasks.current());
- }
- }
-
- public void deleteTasksForEntity(Entity entity) {
- // remove all references to this entity from tasks
- executionManager.deleteTag(entity);
- executionManager.deleteTag(BrooklynTaskTags.tagForContextEntity(entity));
- executionManager.deleteTag(BrooklynTaskTags.tagForCallerEntity(entity));
- executionManager.deleteTag(BrooklynTaskTags.tagForTargetEntity(entity));
- }
-
- public void onUnmanaged(Location loc) {
- // No-op currently; no tasks are tracked through their location
- }
-
- public void onTaskDone(Task<?> task) {
- if (shouldDeleteTaskImmediately(task)) {
- executionManager.deleteTask(task);
- }
- }
-
- /** @deprecated since 0.7.0, method moved internal until semantics are clarified; see also {@link #shouldDeleteTaskImmediately(Task)} */
- @Deprecated
- public boolean shouldDeleteTask(Task<?> task) {
- return shouldDeleteTaskImmediately(task);
- }
- /** whether this task should be deleted on completion,
- * because it is transient, or because it is submitted background without much context information */
- protected boolean shouldDeleteTaskImmediately(Task<?> task) {
- if (!task.isDone()) return false;
-
- Set<Object> tags = task.getTags();
- if (tags.contains(ManagementContextInternal.TRANSIENT_TASK_TAG))
- return true;
- if (tags.contains(ManagementContextInternal.EFFECTOR_TAG) || tags.contains(ManagementContextInternal.NON_TRANSIENT_TASK_TAG))
- return false;
-
- if (task.getSubmittedByTask()!=null) {
- Task<?> parent = task.getSubmittedByTask();
- if (executionManager.getTask(parent.getId())==null) {
- // parent is already cleaned up
- return true;
- }
- if (parent instanceof HasTaskChildren && Iterables.contains(((HasTaskChildren)parent).getChildren(), task)) {
- // it is a child, let the parent manage this task's death
- return false;
- }
- Entity associatedEntity = BrooklynTaskTags.getTargetOrContextEntity(task);
- if (associatedEntity!=null) {
- // this is associated to an entity; destroy only if the entity is unmanaged
- return !Entities.isManaged(associatedEntity);
- }
- // if not associated to an entity, then delete immediately
- return true;
- }
-
- // e.g. scheduled tasks, sensor events, etc
- // TODO (in future may keep some of these with another limit, based on a new TagCategory)
- // there may also be a server association for server-side tasks which should be kept
- // (but be careful not to keep too many subscriptions!)
-
- return true;
- }
-
- /**
- * Deletes old tasks. The age/number of tasks to keep is controlled by config keys like
- * {@link #MAX_TASKS_PER_TAG} and {@link #MAX_TASK_AGE}.
- */
- protected synchronized int gcTasks() {
- // TODO Must be careful with memory usage here: have seen OOME if we get crazy lots of tasks.
- // hopefully the use of new limits, filters, and live lists in some places (added Sep 2014) will help.
- //
- // An option is for getTasksWithTag(tag) to return an ArrayList rather than a LinkedHashSet. That
- // is a far more memory efficient data structure (e.g. 4 bytes overhead per object rather than
- // 32 bytes overhead per object for HashSet).
- //
- // More notes on optimization are in the history of this file.
-
- if (!running) return 0;
-
- Duration newPeriod = brooklynProperties.getConfig(GC_PERIOD);
- if (!Objects.equal(gcPeriod, newPeriod)) {
- // caller has changed period, reschedule on next run
- scheduleCollector(false);
- }
-
- expireUnmanagedEntityTasks();
- expireAgedTasks();
- expireTransientTasks();
-
- // now look at overcapacity tags, non-entity tags first
-
- Set<Object> taskTags = executionManager.getTaskTags();
-
- int maxTasksPerEntity = brooklynProperties.getConfig(MAX_TASKS_PER_ENTITY);
- int maxTasksPerTag = brooklynProperties.getConfig(MAX_TASKS_PER_TAG);
-
- Map<Object,AtomicInteger> taskNonEntityTagsOverCapacity = MutableMap.of();
- Map<Object,AtomicInteger> taskEntityTagsOverCapacity = MutableMap.of();
-
- Map<Object,AtomicInteger> taskAllTagsOverCapacity = MutableMap.of();
-
- for (Object tag : taskTags) {
- if (isTagIgnoredForGc(tag)) continue;
-
- Set<Task<?>> tasksWithTag = executionManager.tasksWithTagLiveOrNull(tag);
- if (tasksWithTag==null) continue;
- AtomicInteger overA = null;
- if (tag instanceof WrappedEntity) {
- int over = tasksWithTag.size() - maxTasksPerEntity;
- if (over>0) {
- overA = new AtomicInteger(over);
- taskEntityTagsOverCapacity.put(tag, overA);
- }
- } else {
- int over = tasksWithTag.size() - maxTasksPerTag;
- if (over>0) {
- overA = new AtomicInteger(over);
- taskNonEntityTagsOverCapacity.put(tag, overA);
- }
- }
- if (overA!=null) {
- taskAllTagsOverCapacity.put(tag, overA);
- }
- }
-
- int deletedCount = 0;
- deletedCount += expireOverCapacityTagsInCategory(taskNonEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.NON_ENTITY_NORMAL, false);
- deletedCount += expireOverCapacityTagsInCategory(taskEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.ENTITY, true);
- deletedCount += expireSubTasksWhoseSubmitterIsExpired();
-
- int deletedGlobally = expireIfOverCapacityGlobally();
- deletedCount += deletedGlobally;
- if (deletedGlobally>0) deletedCount += expireSubTasksWhoseSubmitterIsExpired();
-
- return deletedCount;
- }
-
- protected static boolean isTagIgnoredForGc(Object tag) {
- if (tag == null) return true;
- if (tag.equals(ManagementContextInternal.EFFECTOR_TAG)) return true;
- if (tag.equals(ManagementContextInternal.SUB_TASK_TAG)) return true;
- if (tag.equals(ManagementContextInternal.NON_TRANSIENT_TASK_TAG)) return true;
- if (tag.equals(ManagementContextInternal.TRANSIENT_TASK_TAG)) return true;
- if (tag instanceof WrappedStream) {
- return true;
- }
-
- return false;
- }
-
- protected void expireUnmanagedEntityTasks() {
- Iterator<Entry<Entity, Task<?>>> ei;
- synchronized (unmanagedEntitiesNeedingGc) {
- ei = MutableSet.copyOf(unmanagedEntitiesNeedingGc.entrySet()).iterator();
- }
- while (ei.hasNext()) {
- Entry<Entity, Task<?>> ee = ei.next();
- if (Entities.isManaged(ee.getKey())) continue;
- if (ee.getValue()!=null && !ee.getValue().isDone()) continue;
- deleteTasksForEntity(ee.getKey());
- synchronized (unmanagedEntitiesNeedingGc) {
- unmanagedEntitiesNeedingGc.remove(ee.getKey());
- }
- }
- }
-
- protected void expireAgedTasks() {
- Duration maxTaskAge = brooklynProperties.getConfig(MAX_TASK_AGE);
-
- Collection<Task<?>> allTasks = executionManager.allTasksLive();
- Collection<Task<?>> tasksToDelete = MutableList.of();
-
- try {
- for (Task<?> task: allTasks) {
- if (!task.isDone()) continue;
- if (BrooklynTaskTags.isSubTask(task)) continue;
-
- if (maxTaskAge.isShorterThan(Duration.sinceUtc(task.getEndTimeUtc())))
- tasksToDelete.add(task);
- }
-
- } catch (ConcurrentModificationException e) {
- // delete what we've found so far
- LOG.debug("Got CME inspecting aged tasks, with "+tasksToDelete.size()+" found for deletion: "+e);
- }
-
- for (Task<?> task: tasksToDelete) {
- executionManager.deleteTask(task);
- }
- }
-
- protected void expireTransientTasks() {
- Set<Task<?>> transientTasks = executionManager.getTasksWithTag(BrooklynTaskTags.TRANSIENT_TASK_TAG);
- for (Task<?> t: transientTasks) {
- if (!t.isDone()) continue;
- executionManager.deleteTask(t);
- }
- }
-
- protected int expireSubTasksWhoseSubmitterIsExpired() {
- // ideally we wouldn't have this; see comments on CHECK_SUBTASK_SUBMITTERS
- if (!brooklynProperties.getConfig(CHECK_SUBTASK_SUBMITTERS))
- return 0;
-
- Collection<Task<?>> allTasks = executionManager.allTasksLive();
- Collection<Task<?>> tasksToDelete = MutableList.of();
- try {
- for (Task<?> task: allTasks) {
- if (!task.isDone()) continue;
- Task<?> submitter = task.getSubmittedByTask();
- // if we've leaked, ie a subtask which is not a child task,
- // and the submitter is GC'd, then delete this also
- if (submitter!=null && submitter.isDone() && executionManager.getTask(submitter.getId())==null) {
- tasksToDelete.add(task);
- }
- }
-
- } catch (ConcurrentModificationException e) {
- // delete what we've found so far
- LOG.debug("Got CME inspecting aged tasks, with "+tasksToDelete.size()+" found for deletion: "+e);
- }
-
- for (Task<?> task: tasksToDelete) {
- executionManager.deleteTask(task);
- }
- return tasksToDelete.size();
- }
-
- protected enum TagCategory {
- ENTITY, NON_ENTITY_NORMAL;
-
- public boolean acceptsTag(Object tag) {
- if (isTagIgnoredForGc(tag)) return false;
- if (tag instanceof WrappedEntity) return this==ENTITY;
- if (this==ENTITY) return false;
- return true;
- }
- }
-
-
- /** expires completed tasks which are over-capacity in all of their tags within the given category; returns the number of tasks deleted */
- protected int expireOverCapacityTagsInCategory(Map<Object, AtomicInteger> taskTagsInCategoryOverCapacity, Map<Object, AtomicInteger> taskAllTagsOverCapacity, TagCategory category, boolean emptyFilterNeeded) {
- if (emptyFilterNeeded) {
- // previous run may have decremented counts
- MutableList<Object> nowOkayTags = MutableList.of();
- for (Map.Entry<Object,AtomicInteger> entry: taskTagsInCategoryOverCapacity.entrySet()) {
- if (entry.getValue().get()<=0) nowOkayTags.add(entry.getKey());
- }
- for (Object tag: nowOkayTags) taskTagsInCategoryOverCapacity.remove(tag);
- }
-
- if (taskTagsInCategoryOverCapacity.isEmpty())
- return 0;
-
- Collection<Task<?>> tasks = executionManager.allTasksLive();
- List<Task<?>> tasksToConsiderDeleting = MutableList.of();
- try {
- for (Task<?> task: tasks) {
- if (!task.isDone()) continue;
-
- Set<Object> tags = task.getTags();
-
- int categoryTags = 0, tooFullCategoryTags = 0;
- for (Object tag: tags) {
- if (category.acceptsTag(tag)) {
- categoryTags++;
- if (taskTagsInCategoryOverCapacity.containsKey(tag))
- tooFullCategoryTags++;
- }
- }
- if (tooFullCategoryTags>0) {
- if (categoryTags==tooFullCategoryTags) {
- // all buckets are full, delete this one
- tasksToConsiderDeleting.add(task);
- } else {
- // if any bucket is under capacity, then give grace to the other buckets in this category
- for (Object tag: tags) {
- if (category.acceptsTag(tag)) {
- AtomicInteger over = taskTagsInCategoryOverCapacity.get(tag);
- if (over!=null) {
- if (over.decrementAndGet()<=0) {
- // and remove it from over-capacity if so
- taskTagsInCategoryOverCapacity.remove(tag);
- if (taskTagsInCategoryOverCapacity.isEmpty())
- return 0;
- }
- }
- }
- }
- }
- }
- }
-
- } catch (ConcurrentModificationException e) {
- // do CME's happen with these data structures?
- // if so, let's just delete what we've found so far
- LOG.debug("Got CME inspecting tasks, with "+tasksToConsiderDeleting.size()+" found for deletion: "+e);
- }
-
- if (LOG.isDebugEnabled())
- LOG.debug("brooklyn-gc detected "+taskTagsInCategoryOverCapacity.size()+" "+category+" "
- + "tags over capacity, expiring old tasks; "
- + tasksToConsiderDeleting.size()+" tasks under consideration; categories are: "
- + taskTagsInCategoryOverCapacity);
-
- Collections.sort(tasksToConsiderDeleting, TASKS_OLDEST_FIRST_COMPARATOR);
- // now try deleting tasks which are overcapacity for each (non-entity) tag
- int deleted = 0;
- for (Task<?> task: tasksToConsiderDeleting) {
- boolean delete = true;
- for (Object tag: task.getTags()) {
- if (!category.acceptsTag(tag))
- continue;
- if (taskTagsInCategoryOverCapacity.get(tag)==null) {
- // no longer over capacity in this tag
- delete = false;
- break;
- }
- }
- if (delete) {
- // delete this and update overcapacity info
- deleted++;
- executionManager.deleteTask(task);
- for (Object tag: task.getTags()) {
- AtomicInteger counter = taskAllTagsOverCapacity.get(tag);
- if (counter!=null && counter.decrementAndGet()<=0)
- taskTagsInCategoryOverCapacity.remove(tag);
- }
- if (LOG.isTraceEnabled())
- LOG.trace("brooklyn-gc deleted "+task+", buckets now "+taskTagsInCategoryOverCapacity);
- if (taskTagsInCategoryOverCapacity.isEmpty())
- break;
- }
- }
-
- if (LOG.isDebugEnabled())
- LOG.debug("brooklyn-gc deleted "+deleted+" tasks in over-capacity " + category+" tag categories; "
- + "capacities now: " + taskTagsInCategoryOverCapacity);
- return deleted;
- }
-
- protected int expireIfOverCapacityGlobally() {
- Collection<Task<?>> tasksLive = executionManager.allTasksLive();
- if (tasksLive.size() <= brooklynProperties.getConfig(MAX_TASKS_GLOBAL))
- return 0;
- LOG.debug("brooklyn-gc detected "+tasksLive.size()+" tasks in memory, over global limit, looking at deleting some");
-
- try {
- tasksLive = MutableList.copyOf(tasksLive);
- } catch (ConcurrentModificationException e) {
- tasksLive = executionManager.getTasksWithAllTags(MutableList.of());
- }
-
- MutableList<Task<?>> tasks = MutableList.of();
- for (Task<?> task: tasksLive) {
- if (task.isDone()) {
- tasks.add(task);
- }
- }
-
- int numToDelete = tasks.size() - brooklynProperties.getConfig(MAX_TASKS_GLOBAL);
- if (numToDelete <= 0) {
- LOG.debug("brooklyn-gc detected only "+tasks.size()+" completed tasks in memory, not over global limit, so not deleting any");
- return 0;
- }
-
- Collections.sort(tasks, TASKS_OLDEST_FIRST_COMPARATOR);
-
- int numDeleted = 0;
- while (numDeleted < numToDelete && tasks.size()>numDeleted) {
- executionManager.deleteTask( tasks.get(numDeleted++) );
- }
- if (LOG.isDebugEnabled())
- LOG.debug("brooklyn-gc deleted "+numDeleted+" tasks as was over global limit, now have "+executionManager.allTasksLive().size());
- return numDeleted;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynObjectManagementMode.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynObjectManagementMode.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynObjectManagementMode.java
deleted file mode 100644
index 4c56515..0000000
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynObjectManagementMode.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.core.mgmt.internal;
-
-/** Indicates how an entity/location/adjunct is treated at a given {@link org.apache.brooklyn.api.mgmt.ManagementContext}. */
-public enum BrooklynObjectManagementMode {
- /** item does not exist, not in memory, nor persisted (e.g. creating for first time, or finally destroying) */
- NONEXISTENT,
- /** item exists or existed elsewhere, i.e. there is persisted state, but is not loaded here */
- UNMANAGED_PERSISTED,
- /** item is loaded but read-only (i.e. not actively managed here) */
- LOADED_READ_ONLY,
- /** item is actively managed here */
- MANAGED_PRIMARY
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynObjectManagerInternal.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynObjectManagerInternal.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynObjectManagerInternal.java
deleted file mode 100644
index db93b60..0000000
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynObjectManagerInternal.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.core.mgmt.internal;
-
-import org.apache.brooklyn.api.objs.BrooklynObject;
-
-/**
- * Internal management operations over one kind of {@link BrooklynObject}.
- * NOTE(review): the per-method notes below are inferred from the signatures only; confirm against implementations.
- */
-public interface BrooklynObjectManagerInternal<T extends BrooklynObject> {
-
- /** Looks up a {@link ManagementTransitionMode} by item id; presumably the one most recently recorded for that item -- TODO confirm. */
- ManagementTransitionMode getLastManagementTransitionMode(String itemId);
- /** Associates {@code mode} with {@code item}; presumably what {@link #getLastManagementTransitionMode(String)} later reports -- TODO confirm. */
- void setManagementTransitionMode(T item, ManagementTransitionMode mode);
-
- /**
- * Begins management for the given rebinded root, recursively;
- * if rebinding as a read-only copy, the item should be marked read-only prior to this.
- * NOTE(review): this comment previously linked to {@code setReadOnly(T, boolean)}, which is not
- * declared on this interface; confirm which call is intended.
- */
- void manageRebindedRoot(T item);
-
- /** Ends management of {@code e}; {@code info} is the accompanying transition mode -- TODO confirm how implementations use it. */
- void unmanage(final T e, final ManagementTransitionMode info);
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynShutdownHooks.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynShutdownHooks.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynShutdownHooks.java
deleted file mode 100644
index 91ca5dc..0000000
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BrooklynShutdownHooks.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.core.mgmt.internal;
-
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.mgmt.ManagementContext;
-import org.apache.brooklyn.api.mgmt.Task;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.EntityInternal;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.util.collections.MutableList;
-import org.apache.brooklyn.util.collections.MutableSet;
-import org.apache.brooklyn.util.core.task.Tasks;
-import org.apache.brooklyn.util.exceptions.Exceptions;
-import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
-import org.apache.brooklyn.util.javalang.Threads;
-import org.apache.brooklyn.util.time.CountdownTimer;
-import org.apache.brooklyn.util.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-
-public class BrooklynShutdownHooks {
-
- private static final Logger log = LoggerFactory.getLogger(BrooklynShutdownHooks.class);
-
- private static final Duration DEFAULT_SHUTDOWN_TIMEOUT = Duration.TWO_MINUTES;
-
- private static final AtomicBoolean isShutdownHookRegistered = new AtomicBoolean();
- private static final List<Entity> entitiesToStopOnShutdown = Lists.newArrayList();
- private static final List<ManagementContext> managementContextsToStopAppsOnShutdown = Lists.newArrayList();
- private static final List<ManagementContext> managementContextsToTerminateOnShutdown = Lists.newArrayList();
- private static final AtomicBoolean isShutDown = new AtomicBoolean(false);
-
-// private static final Object mutex = new Object();
- private static final Semaphore semaphore = new Semaphore(1);
-
- /**
- * Max time to wait for shutdown to complete, when stopping the entities from {@link #invokeStopOnShutdown(Entity)}.
- * Default is two minutes - deliberately long because stopping cloud VMs can often take a minute.
- */
- private static volatile Duration shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
-
- /**
- * Replaces the timeout that is passed to {@link #destroyAndWait(Iterable, Duration)} when
- * entities are stopped at shutdown. The value is stored as-is; no validation is performed here.
- */
- public static void setShutdownTimeout(Duration val) {
- shutdownTimeout = val;
- }
-
- public static void invokeStopOnShutdown(Entity entity) {
- if (!(entity instanceof Startable)) {
- log.warn("Not adding entity {} for stop-on-shutdown as not an instance of {}", entity, Startable.class.getSimpleName());
- return;
- }
- try {
- semaphore.acquire();
- if (isShutDown.get()) {
- semaphore.release();
- try {
- log.warn("Call to invokeStopOnShutdown for "+entity+" while system already shutting down; invoking stop now and throwing exception");
- Entities.destroy(entity);
- throw new IllegalStateException("Call to invokeStopOnShutdown for "+entity+" while system already shutting down");
- } catch (Exception e) {
- throw new IllegalStateException("Call to invokeStopOnShutdown for "+entity+" while system already shutting down, had error: "+e, e);
- }
- }
-
- try {
- // TODO should be a weak reference in case it is destroyed before shutdown
- // (only applied to certain entities started via launcher so not a big leak)
- entitiesToStopOnShutdown.add(entity);
- } finally {
- semaphore.release();
- }
- addShutdownHookIfNotAlready();
-
- } catch (Exception e) {
- throw Exceptions.propagate(e);
- }
- }
-
- public static void invokeStopAppsOnShutdown(ManagementContext managementContext) {
- try {
- semaphore.acquire();
- if (isShutDown.get()) {
- semaphore.release();
- try {
- log.warn("Call to invokeStopAppsOnShutdown for "+managementContext+" while system already shutting down; invoking stop now and throwing exception");
- destroyAndWait(managementContext.getApplications(), shutdownTimeout);
-
- throw new IllegalStateException("Call to invokeStopAppsOnShutdown for "+managementContext+" while system already shutting down");
- } catch (Exception e) {
- throw new IllegalStateException("Call to invokeStopAppsOnShutdown for "+managementContext+" while system already shutting down, had error: "+e, e);
- }
- }
-
- // TODO weak reference, as per above
- managementContextsToStopAppsOnShutdown.add(managementContext);
- semaphore.release();
- addShutdownHookIfNotAlready();
-
- } catch (Exception e) {
- throw Exceptions.propagate(e);
- }
- }
-
- public static void invokeTerminateOnShutdown(ManagementContext managementContext) {
- try {
- semaphore.acquire();
- if (isShutDown.get()) {
- semaphore.release();
- try {
- log.warn("Call to invokeStopOnShutdown for "+managementContext+" while system already shutting down; invoking stop now and throwing exception");
- ((ManagementContextInternal)managementContext).terminate();
- throw new IllegalStateException("Call to invokeTerminateOnShutdown for "+managementContext+" while system already shutting down");
- } catch (Exception e) {
- throw new IllegalStateException("Call to invokeTerminateOnShutdown for "+managementContext+" while system already shutting down, had error: "+e, e);
- }
- }
-
- // TODO weak reference, as per above
- managementContextsToTerminateOnShutdown.add(managementContext);
- semaphore.release();
- addShutdownHookIfNotAlready();
-
- } catch (Exception e) {
- throw Exceptions.propagate(e);
- }
- }
-
- /** Installs the real shutdown-hook job the first time it is called; later calls do nothing. */
- private static void addShutdownHookIfNotAlready() {
-     boolean firstRegistration = isShutdownHookRegistered.compareAndSet(false, true);
-     if (!firstRegistration) {
-         return;
-     }
-     Threads.addShutdownHook(BrooklynShutdownHookJob.newInstanceForReal());
- }
-
- @VisibleForTesting
- public static class BrooklynShutdownHookJob implements Runnable {
-
- final boolean setStaticShutDownFlag;
-
- private BrooklynShutdownHookJob(boolean setStaticShutDownFlag) {
- this.setStaticShutDownFlag = setStaticShutDownFlag;
- }
-
- public static BrooklynShutdownHookJob newInstanceForReal() {
- return new BrooklynShutdownHookJob(true);
- }
-
- /** testing instance does not actually set the `isShutDown` bit */
- public static BrooklynShutdownHookJob newInstanceForTesting() {
- return new BrooklynShutdownHookJob(false);
- }
-
- @Override
- public void run() {
- // First stop entities; on interrupt, abort waiting for tasks - but let shutdown hook continue
- Set<Entity> entitiesToStop = MutableSet.of();
- try {
- semaphore.acquire();
- if (setStaticShutDownFlag)
- isShutDown.set(true);
- semaphore.release();
- } catch (Exception e) {
- throw Exceptions.propagate(e);
- }
- entitiesToStop.addAll(entitiesToStopOnShutdown);
- for (ManagementContext mgmt: managementContextsToStopAppsOnShutdown) {
- if (mgmt.isRunning()) {
- entitiesToStop.addAll(mgmt.getApplications());
- }
- }
-
- if (entitiesToStop.isEmpty()) {
- log.debug("Brooklyn shutdown: no entities to stop");
- } else {
- log.info("Brooklyn shutdown: stopping entities "+entitiesToStop);
- destroyAndWait(entitiesToStop, shutdownTimeout);
- }
-
- // Then terminate management contexts
- log.debug("Brooklyn terminateOnShutdown shutdown-hook invoked: terminating management contexts: "+managementContextsToTerminateOnShutdown);
- for (ManagementContext managementContext: managementContextsToTerminateOnShutdown) {
- try {
- if (!managementContext.isRunning())
- continue;
- ((ManagementContextInternal)managementContext).terminate();
- } catch (RuntimeException e) {
- log.info("terminateOnShutdown of "+managementContext+" returned error (continuing): "+e, e);
- }
- }
- }
- }
-
- protected static void destroyAndWait(Iterable<? extends Entity> entitiesToStop, Duration timeout) {
- MutableList<Task<?>> stops = MutableList.of();
- for (Entity entityToStop: entitiesToStop) {
- final Entity entity = entityToStop;
- if (!Entities.isManaged(entity)) continue;
- Task<Object> t = Tasks.builder().dynamic(false).displayName("destroying "+entity).body(new Runnable() {
- @Override public void run() { Entities.destroy(entity); }
- }).build();
- stops.add( ((EntityInternal)entity).getExecutionContext().submit(t) );
- }
- CountdownTimer timer = CountdownTimer.newInstanceStarted(timeout);
- for (Task<?> t: stops) {
- try {
- Duration durationRemaining = timer.getDurationRemaining();
- Object result = t.getUnchecked(durationRemaining.isPositive() ? durationRemaining : Duration.ONE_MILLISECOND);
- if (log.isDebugEnabled()) log.debug("stopOnShutdown of {} completed: {}", t, result);
- } catch (RuntimeInterruptedException e) {
- Thread.currentThread().interrupt();
- if (log.isDebugEnabled()) log.debug("stopOnShutdown of "+t+" interrupted: "+e);
- break;
- } catch (RuntimeException e) {
- Exceptions.propagateIfFatal(e);
- log.warn("Shutdown hook "+t+" returned error (continuing): "+e);
- if (log.isDebugEnabled()) log.debug("stopOnShutdown of "+t+" returned error (continuing to stop others): "+e, e);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/CampYamlParser.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/CampYamlParser.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/CampYamlParser.java
deleted file mode 100644
index 35841be..0000000
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/CampYamlParser.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.core.mgmt.internal;
-
-import java.util.Map;
-
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.ConfigKeys;
-
-/**
- * A parser that can be stored in configuration under {@link #YAML_PARSER_KEY}.
- * NOTE(review): presumably parses camp-yaml DSL expressions such as {@code $brooklyn:...} -- confirm against the implementation.
- */
-public interface CampYamlParser {
-
- /** Config key named {@code brooklyn.camp.yamlParser} whose value is a {@link CampYamlParser}. */
- ConfigKey<CampYamlParser> YAML_PARSER_KEY = ConfigKeys.newConfigKey(CampYamlParser.class, "brooklyn.camp.yamlParser");
-
- /** Parses a map of values and returns a map; NOTE(review): presumably the same keys with parsed values -- confirm. */
- Map<String, Object> parse(Map<String, Object> map);
-
- /** Parses a single string value and returns the resulting object. */
- Object parse(String val);
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/CollectionChangeListener.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/CollectionChangeListener.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/CollectionChangeListener.java
deleted file mode 100644
index 7aa700f..0000000
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/CollectionChangeListener.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.core.mgmt.internal;
-
-/** Callback for being told about items added to, or removed from, a collection. */
-public interface CollectionChangeListener<Item> {
- /** Invoked to report that {@code item} was added. */
- void onItemAdded(Item item);
- /** Invoked to report that {@code item} was removed. */
- void onItemRemoved(Item item);
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/DeferredBrooklynProperties.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/DeferredBrooklynProperties.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/DeferredBrooklynProperties.java
deleted file mode 100644
index ae0c7a5..0000000
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/DeferredBrooklynProperties.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.core.mgmt.internal;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.File;
-import java.io.InputStream;
-import java.net.URL;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.brooklyn.api.mgmt.ExecutionContext;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.config.ConfigKey.HasConfigKey;
-import org.apache.brooklyn.core.internal.BrooklynProperties;
-import org.apache.brooklyn.util.core.config.ConfigBag;
-import org.apache.brooklyn.util.core.flags.TypeCoercions;
-import org.apache.brooklyn.util.core.task.DeferredSupplier;
-import org.apache.brooklyn.util.core.task.Tasks;
-import org.apache.brooklyn.util.exceptions.Exceptions;
-import org.apache.brooklyn.util.guava.Maybe;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Maps;
-
-/**
- * Delegates to another {@link BrooklynProperties} implementation, but intercepts all calls to get.
- * The results are transformed: if they are in the external-config format then they are
- * automatically converted to {@link DeferredSupplier}.
- *
- * The external-config format is the same as that for camp-yaml blueprints (i.e.
- * {@code $brooklyn:external("myprovider", "mykey")}).
- */
-public class DeferredBrooklynProperties implements BrooklynProperties {
-
- private static final Logger LOG = LoggerFactory.getLogger(DeferredBrooklynProperties.class);
-
- private static final String BROOKLYN_YAML_PREFIX = "$brooklyn:";
-
- private final BrooklynProperties delegate;
- private final ManagementContextInternal mgmt;
-
- /**
- * @param delegate the properties that calls are forwarded to; must not be null
- * @param mgmt management context used to look up the camp-yaml parser and the server execution context; must not be null
- * @throws NullPointerException if either argument is null
- */
- public DeferredBrooklynProperties(BrooklynProperties delegate, ManagementContextInternal mgmt) {
- this.delegate = checkNotNull(delegate, "delegate");
- this.mgmt = checkNotNull(mgmt, "mgmt");
- }
-
- /**
-  * Passes {@code value} through the configured {@link CampYamlParser} when it is character
-  * data starting with {@code $brooklyn:}; any other value is returned unchanged.
-  * If no parser is configured the value is also returned unchanged, with a debug log entry.
-  */
- private Object transform(ConfigKey<?> key, Object value) {
-     if (!(value instanceof CharSequence)) {
-         return value;
-     }
-     String raw = value.toString();
-     if (!raw.startsWith(BROOKLYN_YAML_PREFIX)) {
-         return value;
-     }
-
-     CampYamlParser parser = mgmt.getConfig().getConfig(CampYamlParser.YAML_PARSER_KEY);
-     if (parser != null) {
-         return parser.parse(raw);
-     }
-
-     // TODO Should we fail or return the untransformed value?
-     // Problem is this gets called during initialisation, e.g. by BrooklynFeatureEnablement calling asMapWithStringKeys()
-     // throw new IllegalStateException("Cannot parse external-config for "+key+" because no camp-yaml parser available");
-     LOG.debug("Not transforming external-config {}, as no camp-yaml parser available", key);
-     return value;
- }
-
- /**
-  * Transforms {@code value}, resolves it through the server execution context when the
-  * transformed value is a {@link DeferredSupplier}, and coerces the outcome to the key's type.
-  */
- private <T> T resolve(ConfigKey<T> key, Object value) {
-     Object resolved = transform(key, value);
-
-     if (resolved instanceof DeferredSupplier) {
-         ExecutionContext exec = mgmt.getServerExecutionContext();
-         try {
-             resolved = Tasks.resolveValue(resolved, key.getType(), exec);
-         } catch (ExecutionException | InterruptedException e) {
-             throw Exceptions.propagate(e);
-         }
-     }
-
-     return TypeCoercions.coerce(resolved, key.getTypeToken());
- }
-
- /** Reads the value from the delegate and resolves it (external-config transformation plus coercion). */
- @Override
- public <T> T getConfig(ConfigKey<T> key) {
-     return resolve(key, delegate.getConfig(key));
- }
-
- /** Reads the value from the delegate and resolves it against the key wrapped by {@code key}. */
- @Override
- public <T> T getConfig(HasConfigKey<T> key) {
-     return resolve(key.getConfigKey(), delegate.getConfig(key));
- }
-
- /** As the single-argument form, passing {@code defaultValue} through to the delegate lookup. */
- @Override
- public <T> T getConfig(HasConfigKey<T> key, T defaultValue) {
-     return resolve(key.getConfigKey(), delegate.getConfig(key, defaultValue));
- }
-
- /** As the single-argument form, passing {@code defaultValue} through to the delegate lookup. */
- @Override
- public <T> T getConfig(ConfigKey<T> key, T defaultValue) {
-     return resolve(key, delegate.getConfig(key, defaultValue));
- }
-
- @Deprecated
- @Override
- public Object getRawConfig(ConfigKey<?> key) {
- return transform(key, delegate.getRawConfig(key));
- }
-
- /** Returns the delegate's raw value, transformed when present; an absent result stays absent. */
- @Override
- public Maybe<Object> getConfigRaw(ConfigKey<?> key, boolean includeInherited) {
-     Maybe<Object> raw = delegate.getConfigRaw(key, includeInherited);
-     if (!raw.isPresent()) {
-         return Maybe.absent();
-     }
-     return Maybe.of(transform(key, raw.get()));
- }
-
- @Override
- public Map<ConfigKey<?>, Object> getAllConfig() {
- Map<ConfigKey<?>, Object> raw = delegate.getAllConfig();
- Map<ConfigKey<?>, Object> result = Maps.newLinkedHashMap();
- for (Map.Entry<ConfigKey<?>, Object> entry : raw.entrySet()) {
- result.put(entry.getKey(), transform(entry.getKey(), entry.getValue()));
- }
- return result;
- }
-
- @Override
- public Map<String, Object> asMapWithStringKeys() {
- Map<ConfigKey<?>, Object> raw = delegate.getAllConfig();
- Map<String, Object> result = Maps.newLinkedHashMap();
- for (Map.Entry<ConfigKey<?>, Object> entry : raw.entrySet()) {
- result.put(entry.getKey().getName(), transform(entry.getKey(), entry.getValue()));
- }
- return result;
- }
-
- /**
- * Discouraged; returns the String so if it is external config, it will be the
- * {@code $brooklyn:external(...)} format.
- */
- @Override
- @SuppressWarnings("rawtypes")
- @Deprecated
- public String get(Map flags, String key) {
- return delegate.get(flags, key);
- }
-
- /**
- * Discouraged; returns the String so if it is external config, it will be the
- * {@code $brooklyn:external(...)} format.
- */
- @Override
- public String getFirst(String ...keys) {
- return delegate.getFirst(keys);
- }
-
- /**
- * Discouraged; returns the String so if it is external config, it will be the
- * {@code $brooklyn:external(...)} format.
- */
- @Override
- @SuppressWarnings("rawtypes")
- public String getFirst(Map flags, String ...keys) {
- return delegate.getFirst(flags, keys);
- }
-
- /** Filters the delegate and wraps the result, so the returned submap still applies the external-config handling. */
- @Override
- public BrooklynProperties submap(Predicate<ConfigKey<?>> filter) {
-     return new DeferredBrooklynProperties(delegate.submap(filter), mgmt);
- }
-
- @Override
- public BrooklynProperties addEnvironmentVars() {
- delegate.addEnvironmentVars();
- return this;
- }
-
- @Override
- public BrooklynProperties addSystemProperties() {
- delegate.addSystemProperties();
- return this;
- }
-
- @Override
- public BrooklynProperties addFrom(ConfigBag cfg) {
- delegate.addFrom(cfg);
- return this;
- }
-
- @Override
- @SuppressWarnings("rawtypes")
- public BrooklynProperties addFrom(Map map) {
- delegate.addFrom(map);
- return this;
- }
-
- @Override
- public BrooklynProperties addFrom(InputStream i) {
- delegate.addFrom(i);
- return this;
- }
-
- @Override
- public BrooklynProperties addFrom(File f) {
- delegate.addFrom(f);
- return this;
- }
-
- @Override
- public BrooklynProperties addFrom(URL u) {
- delegate.addFrom(u);
- return this;
- }
-
- @Override
- public BrooklynProperties addFromUrl(String url) {
- delegate.addFromUrl(url);
- return this;
- }
-
- @Override
- public BrooklynProperties addFromUrlProperty(String urlProperty) {
- delegate.addFromUrlProperty(urlProperty);
- return this;
- }
-
- @Override
- @SuppressWarnings("rawtypes")
- public BrooklynProperties addFromMap(Map properties) {
- delegate.addFromMap(properties);
- return this;
- }
-
- @Override
- public boolean putIfAbsent(String key, Object value) {
- return delegate.putIfAbsent(key, value);
- }
-
- @Override
- public String toString() {
- return delegate.toString();
- }
-
- @Override
- public Object put(Object key, Object value) {
- return delegate.put(key, value);
- }
-
- @Override
- @SuppressWarnings("rawtypes")
- public void putAll(Map vals) {
- delegate.putAll(vals);
- }
-
- @Override
- public <T> Object put(HasConfigKey<T> key, T value) {
- return delegate.put(key, value);
- }
-
- @Override
- public <T> Object put(ConfigKey<T> key, T value) {
- return delegate.put(key, value);
- }
-
- @Override
- public <T> boolean putIfAbsent(ConfigKey<T> key, T value) {
- return delegate.putIfAbsent(key, value);
- }
-
-
- //////////////////////////////////////////////////////////////////////////////////
- // Methods below from java.util.LinkedHashMap, which BrooklynProperties extends //
- //////////////////////////////////////////////////////////////////////////////////
-
- @Override
- public int size() {
- return delegate.size();
- }
-
- @Override
- public boolean isEmpty() {
- return delegate.isEmpty();
- }
-
- @Override
- public boolean containsKey(Object key) {
- return delegate.containsKey(key);
- }
-
- @Override
- public boolean containsValue(Object value) {
- return delegate.containsValue(value);
- }
-
- @Override
- public Object get(Object key) {
- return delegate.get(key);
- }
-
- @Override
- public Object remove(Object key) {
- return delegate.remove(key);
- }
-
- @Override
- public void clear() {
- delegate.clear();
- }
-
- @Override
- @SuppressWarnings("rawtypes")
- public Set keySet() {
- return delegate.keySet();
- }
-
- @Override
- @SuppressWarnings("rawtypes")
- public Collection values() {
- return delegate.values();
- }
-
- @Override
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public Set<Map.Entry> entrySet() {
- return delegate.entrySet();
- }
-
- @Override
- public boolean equals(Object o) {
- return delegate.equals(o);
- }
-
- @Override
- public int hashCode() {
- return delegate.hashCode();
- }
-
- // put(Object, Object) already overridden
- //@Override
- //public Object put(Object key, Object value) {
-
- // putAll(Map) already overridden
- //@Override
- //public void putAll(Map m) {
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d03f254b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EffectorUtils.java
----------------------------------------------------------------------
diff --git a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EffectorUtils.java b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EffectorUtils.java
deleted file mode 100644
index f8bb7cb..0000000
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/EffectorUtils.java
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.core.mgmt.internal;
-
-import static org.apache.brooklyn.util.groovy.GroovyJavaMethods.truth;
-
-import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-import org.apache.brooklyn.api.effector.Effector;
-import org.apache.brooklyn.api.effector.ParameterType;
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.mgmt.Task;
-import org.apache.brooklyn.core.effector.BasicParameterType;
-import org.apache.brooklyn.core.entity.EntityInternal;
-import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
-import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
-import org.apache.brooklyn.util.collections.MutableList;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.core.config.ConfigBag;
-import org.apache.brooklyn.util.core.flags.TypeCoercions;
-import org.apache.brooklyn.util.exceptions.Exceptions;
-import org.apache.brooklyn.util.exceptions.PropagatedRuntimeException;
-import org.apache.brooklyn.util.guava.Maybe;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Utility methods for invoking effectors.
- */
-public class EffectorUtils {
-
- private static final Logger log = LoggerFactory.getLogger(EffectorUtils.class);
-
- /** prepares arguments for an effector either accepting:
- * an array, which should contain the arguments in order, optionally omitting those which have defaults defined;
- * or a map, which should contain the arguments by name, again optionally omitting those which have defaults defined,
- * and in this case also performing type coercion.
- */
- public static Object[] prepareArgsForEffector(Effector<?> eff, Object args) {
- if (args != null && args.getClass().isArray()) {
- return prepareArgsForEffectorFromArray(eff, (Object[]) args);
- }
- if (args instanceof Map) {
- return prepareArgsForEffectorFromMap(eff, (Map) args);
- }
- log.warn("Deprecated effector invocation style for call to "+eff+", expecting a map or an array, got: "+args);
- if (log.isDebugEnabled()) {
- log.debug("Deprecated effector invocation style for call to "+eff+", expecting a map or an array, got: "+args,
- new Throwable("Trace for deprecated effector invocation style"));
- }
- return oldPrepareArgsForEffector(eff, args);
- }
-
- /** method used for calls such as entity.effector(arg1, arg2)
- * get routed here from AbstractEntity.invokeMethod */
- private static Object[] prepareArgsForEffectorFromArray(Effector<?> eff, Object args[]) {
- int newArgsNeeded = eff.getParameters().size();
- if (args.length==1 && args[0] instanceof Map) {
- if (newArgsNeeded!=1 || !eff.getParameters().get(0).getParameterClass().isAssignableFrom(args[0].getClass())) {
- // treat a map in an array as a map passed directly (unless the method takes a single-arg map)
- // this is to support effector(param1: val1)
- return prepareArgsForEffectorFromMap(eff, (Map) args[0]);
- }
- }
- return prepareArgsForEffectorAsMapFromArray(eff, args).values().toArray(new Object[0]);
- }
-
- /**
-  * Matches positional arguments against the effector's declared parameters and returns them
-  * keyed by parameter name, in declaration order.
-  * <p>
-  * Each parameter is filled from the first rule that applies:
-  * <ol>
-  * <li>if at least as many arguments remain as parameters still to fill, the next argument is taken;
-  * <li>otherwise, if the next argument is an instance of the parameter's class, it is taken;
-  * <li>otherwise, if the parameter is a {@link BasicParameterType} with a default value, the default is used.
-  * </ol>
-  *
-  * @param eff the effector whose parameters are being populated
-  * @param args the positional arguments supplied by the caller
-  * @return an insertion-ordered map from parameter name to the value chosen for it
-  * @throws IllegalArgumentException if a parameter cannot be filled by any rule above,
-  *         or if supplied arguments are left over after all parameters have been filled
-  */
- public static Map prepareArgsForEffectorAsMapFromArray(Effector<?> eff, Object args[]) {
-     int newArgsNeeded = eff.getParameters().size();
-     List<Object> remaining = Lists.newArrayList();
-     remaining.addAll(Arrays.asList(args));
-     Map<String,Object> newArgs = new LinkedHashMap<String,Object>();
-
-     for (ParameterType<?> it : eff.getParameters()) {
-         if (remaining.size() >= newArgsNeeded) {
-             //all supplied (unnamed) arguments must be used
-             newArgs.put(it.getName(), remaining.remove(0));
-             // TODO do we ignore arguments in the same order that groovy does?
-         } else if (!remaining.isEmpty() && it.getParameterClass().isInstance(remaining.get(0))) {
-             //if there are parameters supplied, and type is correct, they get applied before default values
-             //(this is akin to groovy)
-             newArgs.put(it.getName(), remaining.remove(0));
-         } else if (it instanceof BasicParameterType && ((BasicParameterType<?>)it).hasDefaultValue()) {
-             //finally, default values are used to make up for missing parameters
-             newArgs.put(it.getName(), ((BasicParameterType<?>)it).getDefaultValue());
-         } else {
-             // Arrays.toString is needed because arrays do not override toString()
-             throw new IllegalArgumentException("Invalid arguments (count mismatch) for effector "+eff+": "+Arrays.toString(args));
-         }
-
-         newArgsNeeded--;
-     }
-     if (newArgsNeeded > 0) {
-         // defensive: the counter is decremented once per declared parameter in the loop above
-         throw new IllegalArgumentException("Invalid arguments (missing "+newArgsNeeded+") for effector "+eff+": "+Arrays.toString(args));
-     }
-     if (!remaining.isEmpty()) {
-         throw new IllegalArgumentException("Invalid arguments ("+remaining.size()+" extra) for effector "+eff+": "+Arrays.toString(args));
-     }
-     return newArgs;
- }
-
- private static Object[] prepareArgsForEffectorFromMap(Effector<?> eff, Map m) {
- m = Maps.newLinkedHashMap(m); //make editable copy
- List newArgs = Lists.newArrayList();
- int newArgsNeeded = eff.getParameters().size();
-
- for (int index = 0; index < eff.getParameters().size(); index++) {
- ParameterType<?> it = eff.getParameters().get(index);
- Object v;
- if (truth(it.getName()) && m.containsKey(it.getName())) {
- // argument is in the map
- v = m.remove(it.getName());
- } else if (it instanceof BasicParameterType && ((BasicParameterType)it).hasDefaultValue()) {
- //finally, default values are used to make up for missing parameters
- v = ((BasicParameterType)it).getDefaultValue();
- } else {
- throw new IllegalArgumentException("Invalid arguments (missing argument "+it+") for effector "+eff+": "+m);
- }
-
- newArgs.add(TypeCoercions.coerce(v, it.getParameterClass()));
- newArgsNeeded--;
- }
- if (newArgsNeeded>0)
- throw new IllegalArgumentException("Invalid arguments (missing "+newArgsNeeded+") for effector "+eff+": "+m);
- if (!m.isEmpty()) {
- log.warn("Unsupported parameter to "+eff+" (ignoring): "+m);
- }
- return newArgs.toArray(new Object[newArgs.size()]);
- }
-
- /**
- * Takes arguments, and returns an array of arguments suitable for use by the Effector
- * according to the ParameterTypes it exposes.
- * <p>
- * The args can be:
- * <ol>
- * <li>an array of ordered arguments
- * <li>a collection (which will be automatically converted to an array)
- * <li>a single argument (which will then be wrapped in an array)
- * <li>a map containing the (named) arguments
- * <li>an array or collection whose single entry is a map (treated the same as 4 above)
- * <li>a semi-populated array or collection that also contains a map as first arg -
- * uses ordered args in array, but uses named values from map in preference.
- * <li>semi-populated array or collection, where default values will otherwise be used.
- * </ol>
- * <p>
- * A {@code null} args is treated as an empty argument array.
- *
- * @param eff the effector whose declared parameters drive the matching
- * @param args the supplied arguments, in any of the forms listed above
- * @return one value per declared parameter, in declaration order
- * @throws IllegalArgumentException if a parameter cannot be populated, or if supplied
- * arguments remain unused once every parameter has been handled
- */
- public static Object[] oldPrepareArgsForEffector(Effector<?> eff, Object args) {
- //attempt to coerce unexpected types
- Object[] argsArray;
- if (args==null) {
- argsArray = new Object[0];
- } else if (args.getClass().isArray()) {
- argsArray = (Object[]) args;
- } else {
- if (args instanceof Collection) {
- argsArray = ((Collection) args).toArray(new Object[((Collection) args).size()]);
- } else {
- argsArray = new Object[] { args };
- }
- }
-
- //if args starts with a map, assume it contains the named arguments
- //(but only use it when we have insufficient supplied arguments)
- List l = Lists.newArrayList();
- l.addAll(Arrays.asList(argsArray));
- // a leading map is taken off the positional list and copied; m stays null when there is none
- Map m = (argsArray.length > 0 && argsArray[0] instanceof Map ? Maps.newLinkedHashMap((Map) l.remove(0)) : null);
- List newArgs = Lists.newArrayList();
- int newArgsNeeded = eff.getParameters().size();
- // set once the map itself has been handed over as the first argument
- boolean mapUsed = false;
-
- // the branches below are tried in order for each declared parameter; the first match wins
- for (int index = 0; index < eff.getParameters().size(); index++) {
- ParameterType<?> it = eff.getParameters().get(index);
-
- if (l.size() >= newArgsNeeded) {
- //all supplied (unnamed) arguments must be used; ignore map
- newArgs.add(l.remove(0));
- } else if (truth(m) && truth(it.getName()) && m.containsKey(it.getName())) {
- //some arguments were not supplied, and this one is in the map
- newArgs.add(m.remove(it.getName()));
- } else if (index == 0 && Map.class.isAssignableFrom(it.getParameterClass())) {
- //if first arg is a map it takes the supplied map
- //(m may be null here if no leading map was supplied)
- newArgs.add(m);
- mapUsed = true;
- } else if (!l.isEmpty() && it.getParameterClass().isInstance(l.get(0))) {
- //if there are parameters supplied, and type is correct, they get applied before default values
- //(this is akin to groovy)
- newArgs.add(l.remove(0));
- } else if (it instanceof BasicParameterType && ((BasicParameterType)it).hasDefaultValue()) {
- //finally, default values are used to make up for missing parameters
- newArgs.add(((BasicParameterType)it).getDefaultValue());
- } else {
- throw new IllegalArgumentException("Invalid arguments (count mismatch) for effector "+eff+": "+args);
- }
-
- newArgsNeeded--;
- }
- if (newArgsNeeded > 0) {
- throw new IllegalArgumentException("Invalid arguments (missing "+newArgsNeeded+") for effector "+eff+": "+args);
- }
- // positional arguments that no parameter consumed
- if (!l.isEmpty()) {
- throw new IllegalArgumentException("Invalid arguments ("+l.size()+" extra) for effector "+eff+": "+args);
- }
- // named arguments still left in the map, unless the map itself was passed as the first argument
- if (truth(m) && !mapUsed) {
- throw new IllegalArgumentException("Invalid arguments ("+m.size()+" extra named) for effector "+eff+": "+args);
- }
- return newArgs.toArray(new Object[newArgs.size()]);
- }
-
- /**
-  * Invokes a method effector so that its progress is tracked.
-  * For internal use only, when we know the effector is backed by a method which is local.
-  * <p>
-  * The entity's change listener is told when the effector starts and, whatever the outcome,
-  * when it completes. Any {@link Exception} is passed to
-  * {@link #handleEffectorException(Entity, Effector, Throwable)}, which rethrows it.
-  */
- public static <T> T invokeMethodEffector(Entity entity, Effector<T> eff, Object[] args) {
-     final String effectorName = eff.getName();
-
-     try {
-         if (log.isDebugEnabled()) {
-             log.debug("Invoking effector {} on {}", new Object[] {effectorName, entity});
-         }
-         if (log.isTraceEnabled()) {
-             log.trace("Invoking effector {} on {} with args {}", new Object[] {effectorName, entity, args});
-         }
-
-         final EntityInternal internal = (EntityInternal) entity;
-         final EntityManagementSupport support = internal.getManagementSupport();
-         if (!support.isDeployed()) {
-             support.attemptLegacyAutodeployment(effectorName);
-         }
-         final ManagementContextInternal context = (ManagementContextInternal) internal.getManagementContext();
-
-         support.getEntityChangeListener().onEffectorStarting(eff, args);
-         try {
-             return context.invokeEffectorMethodSync(entity, eff, args);
-         } finally {
-             support.getEntityChangeListener().onEffectorCompleted(eff);
-         }
-     } catch (Exception e) {
-         handleEffectorException(entity, eff, e);
-         // handleEffectorException always throws; this return only satisfies the compiler
-         return null;
-     }
- }
-
- public static void handleEffectorException(Entity entity, Effector<?> effector, Throwable throwable) {
- String message = "Error invoking " + effector.getName() + " at " + entity;
- // Avoid throwing a PropagatedRuntimeException that just repeats the last PropagatedRuntimeException.
- if (throwable instanceof PropagatedRuntimeException &&
- throwable.getMessage() != null &&
- throwable.getMessage().startsWith(message)) {
- throw PropagatedRuntimeException.class.cast(throwable);
- } else {
- log.warn(message + ": " + Exceptions.collapseText(throwable));
- throw new PropagatedRuntimeException(message, throwable);
- }
- }
-
- /**
-  * Hands an effector invocation to the entity's management context and returns the resulting task.
-  * <p>
-  * The entity's change listener is notified before the hand-over, and again once
-  * {@code invokeEffector} has returned or thrown.
-  */
- public static <T> Task<T> invokeEffectorAsync(Entity entity, Effector<T> eff, Map<String,?> parameters) {
-     final String effectorName = eff.getName();
-
-     if (log.isDebugEnabled()) {
-         log.debug("Invoking-async effector {} on {}", new Object[] { effectorName, entity });
-     }
-     if (log.isTraceEnabled()) {
-         log.trace("Invoking-async effector {} on {} with args {}", new Object[] { effectorName, entity, parameters });
-     }
-
-     final EntityInternal internal = (EntityInternal) entity;
-     final EntityManagementSupport support = internal.getManagementSupport();
-     if (!support.isDeployed()) {
-         support.attemptLegacyAutodeployment(effectorName);
-     }
-     final ManagementContextInternal context = (ManagementContextInternal) internal.getManagementContext();
-
-     // FIXME seems brittle to have the listeners in the Utils method; better to move into the context.invokeEff
-     // (or whatever the last mile before invoking the effector is - though currently there is not such a canonical place!)
-     support.getEntityChangeListener().onEffectorStarting(eff, parameters);
-     try {
-         return context.invokeEffector(entity, eff, parameters);
-     } finally {
-         // FIXME this is really Effector submitted
-         support.getEntityChangeListener().onEffectorCompleted(eff);
-     }
- }
-
- /**
-  * Finds the effector declared on the entity's type which has the same name as the given
-  * method and exactly the same parameter classes, in the same order.
-  *
-  * @param entity the entity whose type's effectors are searched
-  * @param method the method to match by name and parameter types
-  * @return the first matching effector, or {@code null} if there is none
-  * @deprecated since 0.7.0, not used
-  */
- @Deprecated
- public static Effector<?> findEffectorMatching(Entity entity, Method method) {
-     // getParameterTypes() is read once up front rather than on every comparison
-     final Class<?>[] methodParameterTypes = method.getParameterTypes();
-     outer: for (Effector<?> effector : entity.getEntityType().getEffectors()) {
-         // candidates are filtered on the method's name first, then on arity
-         if (!effector.getName().equals(method.getName())) continue;
-         if (effector.getParameters().size() != methodParameterTypes.length) continue;
-         for (int i = 0; i < methodParameterTypes.length; i++) {
-             // parameter classes must be identical, not merely assignable
-             if (effector.getParameters().get(i).getParameterClass() != methodParameterTypes[i]) continue outer;
-         }
-         return effector;
-     }
-     return null;
- }
-
- /** @deprecated since 0.7.0, expects parameters but does not use them! */
- @Deprecated
- public static Effector<?> findEffectorMatching(Set<Effector<?>> effectors, String effectorName, Map<String, ?> parameters) {
- // TODO Support overloading: check parameters as well
- for (Effector<?> effector : effectors) {
- if (effector.getName().equals(effectorName)) {
- return effector;
- }
- }
- return null;
- }
-
- /**
-  * Matches effectors by name only (not parameters).
-  *
-  * @return the first effector with the given name, or an absent {@link Maybe} carrying a
-  *         {@link NoSuchElementException} which lists the contenders
-  */
- public static Maybe<Effector<?>> findEffector(Collection<? extends Effector<?>> effectors, String effectorName) {
-     for (Effector<?> candidate : effectors) {
-         final boolean nameMatches = candidate.getName().equals(effectorName);
-         if (!nameMatches) {
-             continue;
-         }
-         return Maybe.<Effector<?>>of(candidate);
-     }
-     final NoSuchElementException notFound =
-             new NoSuchElementException("No effector with name "+effectorName+" (contenders "+effectors+")");
-     return Maybe.absent(notFound);
- }
-
- /**
-  * Matches effectors by name only (not parameters), based on what is declared on the entity static type.
-  * The lookup itself is done by {@link #findEffector(Collection, String)}.
-  */
- public static Maybe<Effector<?>> findEffectorDeclared(Entity entity, String effectorName) {
-     final Collection<? extends Effector<?>> declared = entity.getEntityType().getEffectors();
-     return findEffector(declared, effectorName);
- }
-
- /**
-  * Returns the standard task flags for an effector invocation, with no parameters recorded.
-  * Equivalent to calling the three-argument overload with {@code null} parameters.
-  *
-  * @deprecated since 0.7.0 use {@link #getTaskFlagsForEffectorInvocation(Entity, Effector, ConfigBag)}
-  */
- @Deprecated
- public static Map<Object,Object> getTaskFlagsForEffectorInvocation(Entity entity, Effector<?> effector) {
-     return getTaskFlagsForEffectorInvocation(entity, effector, null);
- }
-
- /**
-  * Returns a (mutable) map of the standard flags which should be placed on an effector task:
-  * a {@code description}, a {@code displayName} and a list of {@code tags}.
-  * <p>
-  * The tags always include the effector tag, a tag for this effector call and a tag for the
-  * target entity. A tag for the entitlement context is added when one is present.
-  *
-  * @param parameters the invocation parameters; may be {@code null}, in which case they are
-  *        left out of the description
-  */
- public static Map<Object,Object> getTaskFlagsForEffectorInvocation(Entity entity, Effector<?> effector, ConfigBag parameters) {
-     List<Object> tags = MutableList.of(
-             BrooklynTaskTags.EFFECTOR_TAG,
-             BrooklynTaskTags.tagForEffectorCall(entity, effector.getName(), parameters),
-             BrooklynTaskTags.tagForTargetEntity(entity));
-     if (Entitlements.getEntitlementContext() != null) {
-         tags.add(BrooklynTaskTags.tagForEntitlement(Entitlements.getEntitlementContext()));
-     }
-
-     final StringBuilder description = new StringBuilder("Invoking effector ")
-             .append(effector.getName())
-             .append(" on ")
-             .append(entity.getDisplayName());
-     if (parameters != null) {
-         description.append(" with parameters ").append(parameters.getAllConfig());
-     }
-
-     return MutableMap.builder()
-             .put("description", description.toString())
-             .put("displayName", effector.getName())
-             .put("tags", tags)
-             .build();
- }
-
-}