You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by ha...@apache.org on 2015/08/15 15:33:16 UTC
[14/33] incubator-brooklyn git commit: [BROOKLYN-162] Refactor
package in ./core/management
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/ha/OsgiManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/management/ha/OsgiManager.java b/core/src/main/java/org/apache/brooklyn/core/management/ha/OsgiManager.java
new file mode 100644
index 0000000..6a133fc
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/management/ha/OsgiManager.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.ha;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleException;
+import org.osgi.framework.launch.Framework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.BrooklynVersion;
+
+import org.apache.brooklyn.api.catalog.CatalogItem.CatalogBundle;
+import org.apache.brooklyn.api.management.ManagementContext;
+
+import brooklyn.config.BrooklynServerConfig;
+import brooklyn.config.BrooklynServerPaths;
+import brooklyn.config.ConfigKey;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.collections.MutableSet;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.guava.Maybe;
+import brooklyn.util.os.Os;
+import brooklyn.util.os.Os.DeletionResult;
+import brooklyn.util.osgi.Osgis;
+import brooklyn.util.osgi.Osgis.BundleFinder;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+public class OsgiManager {
+
+ private static final Logger log = LoggerFactory.getLogger(OsgiManager.class);
+
+ public static final ConfigKey<Boolean> USE_OSGI = BrooklynServerConfig.USE_OSGI;
+
+ /* see Osgis for info on starting framework etc */
+
+ protected ManagementContext mgmt;
+ protected Framework framework;
+ protected File osgiCacheDir;
+
+ /**
+  * Creates a manager bound to the given management context.
+  * The OSGi framework is not created here; that happens in {@link #start()}.
+  */
+ public OsgiManager(ManagementContext mgmt) {
+ this.mgmt = mgmt;
+ }
+
+ /**
+  * Resolves the OSGi cache directory for the management context and starts a new
+  * framework rooted at that directory. Any failure is rethrown through
+  * {@code Exceptions.propagate}.
+  */
+ public void start() {
+     try {
+         osgiCacheDir = BrooklynServerPaths.getOsgiCacheDirCleanedIfNeeded(mgmt);
+         final String cachePath = osgiCacheDir.getAbsolutePath();
+
+         // any extra OSGi startup args could go here
+         framework = Osgis.newFrameworkStarted(cachePath, false, MutableMap.of());
+     } catch (Exception startFailure) {
+         throw Exceptions.propagate(startFailure);
+     }
+ }
+
+ public void stop() {
+ try {
+ if (framework!=null) {
+ framework.stop();
+ framework.waitForStop(0); // 0 means indefinite
+ }
+ } catch (BundleException e) {
+ throw Exceptions.propagate(e);
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ if (BrooklynServerPaths.isOsgiCacheForCleaning(mgmt, osgiCacheDir)) {
+ // See exception reported in https://issues.apache.org/jira/browse/BROOKLYN-72
+ // We almost always fail to delete he OSGi temp directory due to a concurrent modification.
+ // Therefore keep trying.
+ final AtomicReference<DeletionResult> deletionResult = new AtomicReference<DeletionResult>();
+ Repeater.create("Delete OSGi cache dir")
+ .until(new Callable<Boolean>() {
+ public Boolean call() {
+ deletionResult.set(Os.deleteRecursively(osgiCacheDir));
+ return deletionResult.get().wasSuccessful();
+ }})
+ .limitTimeTo(Duration.ONE_SECOND)
+ .backoffTo(Duration.millis(50))
+ .run();
+ if (deletionResult.get().getThrowable()!=null) {
+ log.debug("Unable to delete "+osgiCacheDir+" (possibly being modified concurrently?): "+deletionResult.get().getThrowable());
+ }
+ }
+ osgiCacheDir = null;
+ framework = null;
+ }
+
+ /**
+  * Installs the given catalog bundle into the framework unless the pre-check reports it
+  * as already installed, then verifies the installed bundle against the request.
+  *
+  * @throws IllegalStateException wrapping any non-fatal failure raised while checking or installing
+  */
+ public synchronized void registerBundle(CatalogBundle bundle) {
+     try {
+         final boolean alreadyInstalled = checkBundleInstalledThrowIfInconsistent(bundle);
+         if (!alreadyInstalled) {
+             final Bundle installed = Osgis.install(framework, bundle.getUrl());
+             checkCorrectlyInstalled(bundle, installed);
+         }
+     } catch (Exception e) {
+         Exceptions.propagateIfFatal(e);
+         throw new IllegalStateException("Bundle from "+bundle.getUrl()+" failed to install: " + e.getMessage(), e);
+     }
+ }
+
+ private void checkCorrectlyInstalled(CatalogBundle bundle, Bundle b) {
+ String nv = b.getSymbolicName()+":"+b.getVersion().toString();
+
+ if (!isBundleNameEqualOrAbsent(bundle, b)) {
+ throw new IllegalStateException("Bundle already installed as "+nv+" but user explicitly requested "+bundle);
+ }
+
+ List<Bundle> matches = Osgis.bundleFinder(framework)
+ .symbolicName(b.getSymbolicName())
+ .version(b.getVersion().toString())
+ .findAll();
+ if (matches.isEmpty()) {
+ log.error("OSGi could not find bundle "+nv+" in search after installing it from "+bundle.getUrl());
+ } else if (matches.size()==1) {
+ log.debug("Bundle from "+bundle.getUrl()+" successfully installed as " + nv + " ("+b+")");
+ } else {
+ log.warn("OSGi has multiple bundles matching "+nv+", when just installed from "+bundle.getUrl()+": "+matches+"; "
+ + "brooklyn will prefer the URL-based bundle for top-level references but any dependencies or "
+ + "import-packages will be at the mercy of OSGi. "
+ + "It is recommended to use distinct versions for different bundles, and the same URL for the same bundles.");
+ }
+ }
+
+ private boolean checkBundleInstalledThrowIfInconsistent(CatalogBundle bundle) {
+ String bundleUrl = bundle.getUrl();
+ if (bundleUrl != null) {
+ Maybe<Bundle> installedBundle = Osgis.bundleFinder(framework).requiringFromUrl(bundleUrl).find();
+ if (installedBundle.isPresent()) {
+ Bundle b = installedBundle.get();
+ String nv = b.getSymbolicName()+":"+b.getVersion().toString();
+ if (!isBundleNameEqualOrAbsent(bundle, b)) {
+ throw new IllegalStateException("User requested bundle " + bundle + " but already installed as "+nv);
+ } else {
+ log.trace("Bundle from "+bundleUrl+" already installed as "+nv+"; not re-registering");
+ }
+ return true;
+ }
+ } else {
+ Maybe<Bundle> installedBundle = Osgis.bundleFinder(framework).symbolicName(bundle.getSymbolicName()).version(bundle.getVersion()).find();
+ if (installedBundle.isPresent()) {
+ log.trace("Bundle "+bundle+" installed from "+installedBundle.get().getLocation());
+ } else {
+ throw new IllegalStateException("Bundle "+bundle+" not previously registered, but URL is empty.");
+ }
+ return true;
+ }
+ return false;
+ }
+
+ /**
+  * @return true if the catalog bundle is not named, or if its symbolic name and version
+  *         both equal those reported by the OSGi bundle (version compared as a string)
+  */
+ public static boolean isBundleNameEqualOrAbsent(CatalogBundle bundle, Bundle b) {
+     if (!bundle.isNamed()) {
+         return true;
+     }
+     if (!bundle.getSymbolicName().equals(b.getSymbolicName())) {
+         return false;
+     }
+     return bundle.getVersion().equals(b.getVersion().toString());
+ }
+
+ /** Varargs convenience form; delegates to the {@code Iterable} overload. */
+ public <T> Maybe<Class<T>> tryResolveClass(String type, CatalogBundle... catalogBundles) {
+     final List<CatalogBundle> bundleList = Arrays.asList(catalogBundles);
+     return tryResolveClass(type, bundleList);
+ }
+ public <T> Maybe<Class<T>> tryResolveClass(String type, Iterable<CatalogBundle> catalogBundles) {
+ Map<CatalogBundle,Throwable> bundleProblems = MutableMap.of();
+ Set<String> extraMessages = MutableSet.of();
+ for (CatalogBundle catalogBundle: catalogBundles) {
+ try {
+ Maybe<Bundle> bundle = findBundle(catalogBundle);
+ if (bundle.isPresent()) {
+ Bundle b = bundle.get();
+ Class<T> clazz;
+ //Extension bundles don't support loadClass.
+ //Instead load from the app classpath.
+ if (Osgis.isExtensionBundle(b)) {
+ @SuppressWarnings("unchecked")
+ Class<T> c = (Class<T>)Class.forName(type);
+ clazz = c;
+ } else {
+ @SuppressWarnings("unchecked")
+ Class<T> c = (Class<T>)b.loadClass(type);
+ clazz = c;
+ }
+ return Maybe.of(clazz);
+ } else {
+ bundleProblems.put(catalogBundle, ((Maybe.Absent<?>)bundle).getException());
+ }
+
+ } catch (Exception e) {
+ // should come from classloading now; name formatting or missing bundle errors will be caught above
+ Exceptions.propagateIfFatal(e);
+ bundleProblems.put(catalogBundle, e);
+
+ Throwable cause = e.getCause();
+ if (cause != null && cause.getMessage().contains("Unresolved constraint in bundle")) {
+ if (BrooklynVersion.INSTANCE.getVersionFromOsgiManifest()==null) {
+ extraMessages.add("No brooklyn-core OSGi manifest available. OSGi will not work.");
+ }
+ if (BrooklynVersion.isDevelopmentEnvironment()) {
+ extraMessages.add("Your development environment may not have created necessary files. Doing a maven build then retrying may fix the issue.");
+ }
+ if (!extraMessages.isEmpty()) log.warn(Strings.join(extraMessages, " "));
+ log.warn("Unresolved constraint resolving OSGi bundle "+catalogBundle+" to load "+type+": "+cause.getMessage());
+ if (log.isDebugEnabled()) log.debug("Trace for OSGi resolution failure", e);
+ }
+ }
+ }
+ if (bundleProblems.size()==1) {
+ Throwable error = Iterables.getOnlyElement(bundleProblems.values());
+ if (error instanceof ClassNotFoundException && error.getCause()!=null && error.getCause().getMessage()!=null) {
+ error = Exceptions.collapseIncludingAllCausalMessages(error);
+ }
+ return Maybe.absent("Unable to resolve class "+type+" in "+Iterables.getOnlyElement(bundleProblems.keySet())
+ + (extraMessages.isEmpty() ? "" : " ("+Strings.join(extraMessages, " ")+")"), error);
+ } else {
+ return Maybe.absent(Exceptions.create("Unable to resolve class "+type+": "+bundleProblems
+ + (extraMessages.isEmpty() ? "" : " ("+Strings.join(extraMessages, " ")+")"), bundleProblems.values()));
+ }
+ }
+
+ /**
+  * Looks up the installed bundle corresponding to the given catalog bundle.
+  * <p>
+  * When a URL is supplied the lookup is by URL and any user-supplied name:version is
+  * ignored, so the bundle is still found if it reports a different version. A version
+  * discrepancy is only warned about at install time rather than treated as a failure,
+  * hence the URL is preferred here. Without a URL the lookup is by symbolic name and version.
+  */
+ public Maybe<Bundle> findBundle(CatalogBundle catalogBundle) {
+     final BundleFinder finder = Osgis.bundleFinder(framework);
+     final String url = catalogBundle.getUrl();
+     if (url == null) {
+         finder.symbolicName(catalogBundle.getSymbolicName()).version(catalogBundle.getVersion());
+     } else {
+         finder.requiringFromUrl(url);
+     }
+     return finder.find();
+ }
+
+ /**
+ * Iterates through catalogBundles until one contains a resource with the given name.
+ */
+ public URL getResource(String name, Iterable<CatalogBundle> catalogBundles) {
+ for (CatalogBundle catalogBundle: catalogBundles) {
+ try {
+ Maybe<Bundle> bundle = findBundle(catalogBundle);
+ if (bundle.isPresent()) {
+ URL result = bundle.get().getResource(name);
+ if (result!=null) return result;
+ }
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * @return An iterable of all resources matching name in catalogBundles.
+ */
+ public Iterable<URL> getResources(String name, Iterable<CatalogBundle> catalogBundles) {
+ List<URL> resources = Lists.newArrayList();
+ for (CatalogBundle catalogBundle : catalogBundles) {
+ try {
+ Maybe<Bundle> bundle = findBundle(catalogBundle);
+ if (bundle.isPresent()) {
+ Enumeration<URL> result = bundle.get().getResources(name);
+ resources.addAll(Collections.list(result));
+ }
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ }
+ }
+ return resources;
+ }
+
+ /** Returns the current framework; null before {@link #start()} has assigned one and after {@link #stop()} completes. */
+ public Framework getFramework() {
+ return framework;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractManagementContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractManagementContext.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractManagementContext.java
new file mode 100644
index 0000000..1758846
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractManagementContext.java
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.internal;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
+
+import java.net.URI;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.api.basic.BrooklynObject;
+import org.apache.brooklyn.api.catalog.BrooklynCatalog;
+import org.apache.brooklyn.api.catalog.CatalogItem;
+import org.apache.brooklyn.api.entity.Effector;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.drivers.EntityDriverManager;
+import org.apache.brooklyn.api.entity.drivers.downloads.DownloadResolverManager;
+import org.apache.brooklyn.api.entity.rebind.RebindManager;
+import org.apache.brooklyn.api.location.LocationRegistry;
+import org.apache.brooklyn.api.management.ExecutionContext;
+import org.apache.brooklyn.api.management.ManagementContext;
+import org.apache.brooklyn.api.management.SubscriptionContext;
+import org.apache.brooklyn.api.management.Task;
+import org.apache.brooklyn.api.management.classloading.BrooklynClassLoadingContext;
+import org.apache.brooklyn.api.management.entitlement.EntitlementManager;
+import org.apache.brooklyn.api.management.ha.HighAvailabilityManager;
+import org.apache.brooklyn.core.management.classloading.JavaBrooklynClassLoadingContext;
+import org.apache.brooklyn.core.management.entitlement.Entitlements;
+import org.apache.brooklyn.core.management.ha.HighAvailabilityManagerImpl;
+
+import brooklyn.catalog.internal.BasicBrooklynCatalog;
+import brooklyn.catalog.internal.CatalogInitialization;
+import brooklyn.catalog.internal.CatalogUtils;
+import brooklyn.config.BrooklynProperties;
+import brooklyn.config.StringConfigMap;
+import brooklyn.entity.basic.AbstractEntity;
+import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.drivers.BasicEntityDriverManager;
+import brooklyn.entity.drivers.downloads.BasicDownloadsManager;
+import brooklyn.entity.rebind.RebindManagerImpl;
+import brooklyn.internal.storage.BrooklynStorage;
+import brooklyn.internal.storage.DataGrid;
+import brooklyn.internal.storage.DataGridFactory;
+import brooklyn.internal.storage.impl.BrooklynStorageImpl;
+import brooklyn.internal.storage.impl.inmemory.InMemoryDataGridFactory;
+
+import org.apache.brooklyn.location.basic.BasicLocationRegistry;
+
+import brooklyn.util.GroovyJavaMethods;
+import brooklyn.util.ResourceUtils;
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.config.ConfigBag;
+import brooklyn.util.guava.Maybe;
+import brooklyn.util.task.BasicExecutionContext;
+import brooklyn.util.task.Tasks;
+
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+public abstract class AbstractManagementContext implements ManagementContextInternal {
+ private static final Logger log = LoggerFactory.getLogger(AbstractManagementContext.class);
+
+ /**
+  * Instantiates the {@link DataGridFactory} named in the given properties under the key
+  * {@code DataGridFactory.class.getName()}, falling back to {@link InMemoryDataGridFactory}
+  * when no value is configured. The class is instantiated through its no-arg constructor.
+  *
+  * @throws IllegalStateException if the class cannot be loaded or instantiated,
+  *         or if the resulting object is not a {@link DataGridFactory}
+  */
+ private static DataGridFactory loadDataGridFactory(BrooklynProperties properties) {
+     String clazzName = properties.getFirst(DataGridFactory.class.getName());
+     if (clazzName == null) {
+         clazzName = InMemoryDataGridFactory.class.getName();
+     }
+
+     Class<?> clazz;
+     try {
+         //todo: which classloader should we use?
+         clazz = LocalManagementContext.class.getClassLoader().loadClass(clazzName);
+     } catch (ClassNotFoundException e) {
+         throw new IllegalStateException(format("Could not load class [%s]",clazzName),e);
+     }
+
+     Object instance;
+     try {
+         instance = clazz.newInstance();
+     } catch (InstantiationException e) {
+         throw new IllegalStateException(format("Could not instantiate class [%s]",clazzName),e);
+     } catch (IllegalAccessException e) {
+         throw new IllegalStateException(format("Could not instantiate class [%s]",clazzName),e);
+     }
+
+     if (!(instance instanceof DataGridFactory)) {
+         // message previously read "not an instantiate of"
+         throw new IllegalStateException(format("Class [%s] is not an instance of class [%s]",clazzName, DataGridFactory.class.getName()));
+     }
+
+     return (DataGridFactory)instance;
+ }
+
+ static {
+ ResourceUtils.addClassLoaderProvider(new Function<Object, BrooklynClassLoadingContext>() {
+ @Override
+ public BrooklynClassLoadingContext apply(@Nullable Object input) {
+ if (input instanceof EntityInternal) {
+ EntityInternal internal = (EntityInternal)input;
+ if (internal.getCatalogItemId() != null) {
+ CatalogItem<?, ?> item = CatalogUtils.getCatalogItemOptionalVersion(internal.getManagementContext(), internal.getCatalogItemId());
+ if (item != null) {
+ return CatalogUtils.newClassLoadingContext(internal.getManagementContext(), item);
+ } else {
+ log.error("Can't find catalog item " + internal.getCatalogItemId() +
+ " used for instantiating entity " + internal +
+ ". Falling back to application classpath.");
+ }
+ }
+ return apply(internal.getManagementSupport());
+ }
+
+ if (input instanceof EntityManagementSupport)
+ return apply(((EntityManagementSupport)input).getManagementContext());
+ if (input instanceof ManagementContext)
+ return JavaBrooklynClassLoadingContext.create((ManagementContext) input);
+ return null;
+ }
+ });
+ }
+
+ private final AtomicLong totalEffectorInvocationCount = new AtomicLong();
+
+ protected BrooklynProperties configMap;
+ protected BasicLocationRegistry locationRegistry;
+ protected final BasicBrooklynCatalog catalog;
+ protected ClassLoader baseClassLoader;
+ protected Iterable<URL> baseClassPathForScanning;
+
+ private final RebindManager rebindManager;
+ private final HighAvailabilityManager highAvailabilityManager;
+
+ protected volatile BrooklynGarbageCollector gc;
+
+ private final EntityDriverManager entityDriverManager;
+ protected DownloadResolverManager downloadsManager;
+
+ protected EntitlementManager entitlementManager;
+
+ private final BrooklynStorage storage;
+
+ private volatile boolean running = true;
+ protected boolean startupComplete = false;
+ protected final List<Throwable> errors = Collections.synchronizedList(MutableList.<Throwable>of());
+
+ protected Maybe<URI> uri = Maybe.absent();
+ protected CatalogInitialization catalogInitialization;
+
+ /** Creates a context with no explicit data grid factory; the two-argument constructor then loads one from the properties. */
+ public AbstractManagementContext(BrooklynProperties brooklynProperties){
+ this(brooklynProperties, null);
+ }
+
+ /**
+  * Creates the context and its core collaborators: driver and downloads managers, data grid,
+  * catalog, storage, rebind manager, high-availability manager and entitlement manager.
+  * <p>
+  * Several collaborators are handed {@code this} before construction has finished, so they
+  * must not rely on fields assigned later in this constructor or in subclasses.
+  *
+  * @param brooklynProperties configuration, stored as {@link #configMap}
+  * @param datagridFactory factory for the data grid; if null one is loaded from the properties
+  */
+ public AbstractManagementContext(BrooklynProperties brooklynProperties, DataGridFactory datagridFactory) {
+ this.configMap = brooklynProperties;
+ this.entityDriverManager = new BasicEntityDriverManager();
+ this.downloadsManager = BasicDownloadsManager.newDefault(configMap);
+ if (datagridFactory == null) {
+ datagridFactory = loadDataGridFactory(brooklynProperties);
+ }
+ DataGrid datagrid = datagridFactory.newDataGrid(this);
+
+ this.catalog = new BasicBrooklynCatalog(this);
+
+ this.storage = new BrooklynStorageImpl(datagrid);
+ this.rebindManager = new RebindManagerImpl(this); // TODO leaking "this" reference; yuck
+ this.highAvailabilityManager = new HighAvailabilityManagerImpl(this); // TODO leaking "this" reference; yuck
+
+ this.entitlementManager = Entitlements.newManager(this, brooklynProperties);
+ }
+
+ /**
+  * Shuts this context down: stops the high-availability manager, marks the context as
+  * not running, stops the rebind manager, then terminates storage. Entities are
+  * deliberately not unmanaged (see the comment in the body).
+  */
+ @Override
+ public void terminate() {
+ highAvailabilityManager.stop();
+ running = false;
+ rebindManager.stop();
+ storage.terminate();
+ // Don't unmanage everything; different entities get given their events at different times
+ // so can cause problems (e.g. a group finds out that a member is unmanaged, before the
+ // group itself has been told that it is unmanaged).
+ }
+
+ /** True from construction until {@link #terminate()} clears the flag. */
+ @Override
+ public boolean isRunning() {
+ return running;
+ }
+
+ /** Returns the {@code startupComplete} flag; it starts false and is not set anywhere in this class (presumably subclasses set it). */
+ @Override
+ public boolean isStartupComplete() {
+ return startupComplete;
+ }
+
+ /** Returns the storage created in the constructor from the data grid. */
+ @Override
+ public BrooklynStorage getStorage() {
+ return storage;
+ }
+
+ /** Returns the rebind manager created in the constructor. */
+ @Override
+ public RebindManager getRebindManager() {
+ return rebindManager;
+ }
+
+ /** Returns the high-availability manager created in the constructor. */
+ @Override
+ public HighAvailabilityManager getHighAvailabilityManager() {
+ return highAvailabilityManager;
+ }
+
+ /** Returns the counter that is incremented by each call to {@code invokeEffectorMethodLocal}. */
+ @Override
+ public long getTotalEffectorInvocations() {
+ return totalEffectorInvocationCount.get();
+ }
+
+ /**
+  * Returns an execution context for the given entity: a new context tagged with the
+  * entity when it is an {@link AbstractEntity}, otherwise the context supplied by the
+  * entity's management support.
+  */
+ @Override
+ public ExecutionContext getExecutionContext(Entity e) {
+     if (!(e instanceof AbstractEntity)) {
+         // not the real entity, so ask its management support for the context
+         return ((EntityInternal)e).getManagementSupport().getExecutionContext();
+     }
+     // BEC is a thin wrapper around EM so fine to create a new one here; but make sure it gets the real entity
+     return new BasicExecutionContext(MutableMap.of("tag", BrooklynTaskTags.tagForContextEntity(e)), getExecutionManager());
+ }
+
+ /** Returns a new execution context tagged with the server task tag, backed by this context's execution manager. */
+ @Override
+ public ExecutionContext getServerExecutionContext() {
+ // BEC is a thin wrapper around EM so fine to create a new one here
+ return new BasicExecutionContext(MutableMap.of("tag", BrooklynTaskTags.BROOKLYN_SERVER_TASK_TAG), getExecutionManager());
+ }
+
+ /** Returns a new subscription context for the given entity, backed by this context's subscription manager. */
+ @Override
+ public SubscriptionContext getSubscriptionContext(Entity e) {
+ // BSC is a thin wrapper around SM so fine to create a new one here
+ return new BasicSubscriptionContext(getSubscriptionManager(), e);
+ }
+
+ /** Returns the entity driver manager created in the constructor. */
+ @Override
+ public EntityDriverManager getEntityDriverManager() {
+ return entityDriverManager;
+ }
+
+ /** Returns the downloads manager (by default created from the config map in the constructor). */
+ @Override
+ public DownloadResolverManager getEntityDownloadsManager() {
+ return downloadsManager;
+ }
+
+ /** Returns the entitlement manager (by default created in the constructor from the properties). */
+ @Override
+ public EntitlementManager getEntitlementManager() {
+ return entitlementManager;
+ }
+
+ /**
+  * Hook called by {@code invokeEffectorMethodSync} before it wraps an invocation in a task,
+  * passing the effector name as the context. Presumably implementations start managing
+  * the entity if it is not yet managed -- confirm against subclasses.
+  */
+ protected abstract void manageIfNecessary(Entity entity, Object context);
+
+ /** Invokes the effector on the entity by delegating to {@code runAtEntity(Entity, Effector, Map)}. */
+ @Override
+ public <T> Task<T> invokeEffector(final Entity entity, final Effector<T> eff, @SuppressWarnings("rawtypes") final Map parameters) {
+ return runAtEntity(entity, eff, parameters);
+ }
+
+ /**
+  * Invokes the effector's method directly on the entity in the calling thread, after
+  * incrementing the total invocation counter and preparing the arguments for the effector.
+  * Asserts (when assertions are enabled) that the entity is managed locally.
+  */
+ protected <T> T invokeEffectorMethodLocal(Entity entity, Effector<T> eff, Object args) {
+ assert isManagedLocally(entity) : "cannot invoke effector method at "+this+" because it is not managed here";
+ totalEffectorInvocationCount.incrementAndGet();
+ Object[] transformedArgs = EffectorUtils.prepareArgsForEffector(eff, args);
+ return GroovyJavaMethods.invokeMethodOnMetaClass(entity, eff.getName(), transformedArgs);
+ }
+
+ /**
+  * Method for entity to make effector happen with correct semantics (right place, right task context),
+  * when a method is called on that entity.
+  * <p>
+  * If the calling thread is not already in a task whose context entity is this entity, or the
+  * entity is not managed locally, the invocation is wrapped in a task and this method blocks
+  * for its result. Otherwise the effector method is invoked directly in the calling thread.
+  * <p>
+  * If the wait is interrupted, the thread's interrupt status is restored before the
+  * exception is wrapped, so callers further up can still observe the interruption.
+  *
+  * @throws ExecutionException wrapping any exception raised while submitting, waiting for or running the effector
+  */
+ @Override
+ public <T> T invokeEffectorMethodSync(final Entity entity, final Effector<T> eff, final Object args) throws ExecutionException {
+     try {
+         Task<?> current = Tasks.current();
+         if (current == null || !entity.equals(BrooklynTaskTags.getContextEntity(current)) || !isManagedLocally(entity)) {
+             manageIfNecessary(entity, eff.getName());
+             // Wrap in a task if we aren't already in a task that is tagged with this entity
+             Task<T> task = runAtEntity( EffectorUtils.getTaskFlagsForEffectorInvocation(entity, eff,
+                         ConfigBag.newInstance().configureStringKey("args", args)),
+                     entity,
+                     new Callable<T>() {
+                         public T call() {
+                             return invokeEffectorMethodLocal(entity, eff, args);
+                         }});
+             return task.get();
+         } else {
+             return invokeEffectorMethodLocal(entity, eff, args);
+         }
+     } catch (Exception e) {
+         if (e instanceof InterruptedException) {
+             // wrapping alone would lose the interruption; put the flag back for the caller
+             Thread.currentThread().interrupt();
+         }
+         // don't need to attach any message or warning because the Effector impl hierarchy does that (see calls to EffectorUtils.handleException)
+         throw new ExecutionException(e);
+     }
+ }
+
+ /**
+  * Whether the master entity record is local, and sensors and effectors can be properly accessed locally.
+  * Checked before an effector method is invoked directly in the calling thread.
+  */
+ public abstract boolean isManagedLocally(Entity e);
+
+ /**
+  * Causes the indicated runnable to be run at the right location for the given entity.
+  *
+  * Returns the actual task (if it is local) or a proxy task (if it is remote);
+  * if management for the entity has not yet started this may start it.
+  * <p>
+  * Still called from within this class by {@code invokeEffectorMethodSync}.
+  *
+  * @deprecated since 0.6.0 use effectors (or support {@code runAtEntity(Entity, Effector, Map)} if something else is needed);
+  * (Callable with Map flags is too open-ended, bothersome to support, and not used much)
+  */
+ @Deprecated
+ public abstract <T> Task<T> runAtEntity(@SuppressWarnings("rawtypes") Map flags, Entity entity, Callable<T> c);
+
+ /** Runs the given effector in the right place for the given entity.
+  * The task is immediately submitted in the background, but also recorded in the queueing context (if present)
+  * so it appears as a child, but marked inessential so it does not fail the parent task, who will ordinarily
+  * call {@link Task#get()} on the object and may do their own failure handling.
+  * <p>
+  * This is what {@code invokeEffector} delegates to.
+  */
+ protected abstract <T> Task<T> runAtEntity(final Entity entity, final Effector<T> eff, @SuppressWarnings("rawtypes") final Map parameters);
+
+ /** Returns the properties this context was constructed with, viewed as a {@link StringConfigMap}. */
+ @Override
+ public StringConfigMap getConfig() {
+ return configMap;
+ }
+
+ /** Returns the same object as {@link #getConfig()}, typed as {@link BrooklynProperties}. */
+ @Override
+ public BrooklynProperties getBrooklynProperties() {
+ return configMap;
+ }
+
+ /** Returns the location registry, creating it on first access; synchronized so only one instance is created. */
+ @Override
+ public synchronized LocationRegistry getLocationRegistry() {
+     if (locationRegistry == null) {
+         locationRegistry = new BasicLocationRegistry(this);
+     }
+     return locationRegistry;
+ }
+
+ /**
+  * Returns the catalog, first triggering an "unofficial" population of it if no catalog
+  * initialization has run yet.
+  */
+ @Override
+ public BrooklynCatalog getCatalog() {
+     if (getCatalogInitialization().hasRunAnyInitialization()) {
+         return catalog;
+     }
+     // catalog init is needed; normally this will be done from start sequence,
+     // but if accessed early -- and in tests -- we will load it here
+     getCatalogInitialization().injectManagementContext(this);
+     getCatalogInitialization().populateUnofficial(catalog);
+     return catalog;
+ }
+
+ /** Returns the catalog's root class loader; unlike {@link #getCatalog()} this does not trigger catalog initialization. */
+ @Override
+ public ClassLoader getCatalogClassLoader() {
+ // catalog does not have to be initialized
+ return catalog.getRootClassLoader();
+ }
+
+ /**
+ * Optional class-loader that this management context should use as its base,
+ * as the first-resort in the catalog, and for scanning (if scanning the default in the catalog).
+ * In most instances the default classloader (ManagementContext.class.getClassLoader(), assuming
+ * this was in the JARs used at boot time) is fine, and in those cases this method normally returns null.
+ * (Surefire does some weird stuff, but the default classloader is fine for loading;
+ * however it requires a custom base classpath to be set for scanning.)
+ */
+ @Override
+ public ClassLoader getBaseClassLoader() {
+ return baseClassLoader;
+ }
+
+ /** See {@link #getBaseClassLoader()}. Only settable once and must be invoked before catalog is loaded.
+  * <p>
+  * Setting the loader that is already set is a no-op; changing a loader that was already set throws.
+  * <p>
+  * NOTE(review): {@code catalog} is a final field assigned in the constructor, so the
+  * {@code catalog!=null} check below looks like it is always true on a constructed instance.
+  * If so, every call that gets past the first two checks throws. Confirm what condition
+  * was meant to represent "catalog has been loaded".
+  *
+  * @throws IllegalStateException if a different loader is already set, or on the catalog check described above
+  */
+ public void setBaseClassLoader(ClassLoader cl) {
+ if (baseClassLoader==cl) return;
+ if (baseClassLoader!=null) throw new IllegalStateException("Cannot change base class loader (in "+this+")");
+ if (catalog!=null) throw new IllegalStateException("Cannot set base class after catalog has been loaded (in "+this+")");
+ this.baseClassLoader = cl;
+ }
+
+ /** Optional mechanism for setting the classpath which should be scanned by the catalog, if the catalog
+  * is scanning the default classpath. Usually it infers the right thing, but some classloaders
+  * (e.g. surefire) do funny things which the underlying org.reflections.Reflections library can't see in to.
+  * <p>
+  * This should normally be invoked early in the server startup. Setting it after the catalog is loaded will not
+  * take effect without an explicit internal call to do so. Once set, it can be changed prior to catalog loading
+  * but it cannot be <i>changed</i> once the catalog is loaded.
+  * <p>
+  * ClasspathHelper.forJavaClassPath() is often a good argument to pass, and is used internally in some places
+  * when no items are found on the catalog.
+  * <p>
+  * NOTE(review): {@code catalog} is a final field assigned in the constructor, so the
+  * {@code catalog==null} branch below (warn and allow the change) looks unreachable on a
+  * constructed instance. If so, any attempt to change a previously set classpath throws.
+  * Confirm what condition was meant to represent "catalog has been loaded". */
+ @Override
+ public void setBaseClassPathForScanning(Iterable<URL> urls) {
+ // setting an equal value is a no-op
+ if (Objects.equal(baseClassPathForScanning, urls)) return;
+ if (baseClassPathForScanning != null) {
+ if (catalog==null)
+ log.warn("Changing scan classpath to "+urls+" from "+baseClassPathForScanning);
+ else
+ throw new IllegalStateException("Cannot change base class path for scanning (in "+this+")");
+ }
+ this.baseClassPathForScanning = urls;
+ }
+ /**
+  * Returns the scan classpath most recently set, or null if none has been set.
+  *
+  * @see #setBaseClassPathForScanning(Iterable)
+  */
+ @Override
+ public Iterable<URL> getBaseClassPathForScanning() {
+ return baseClassPathForScanning;
+ }
+
+ /** Returns the garbage collector field; it is not assigned anywhere in this class (presumably subclasses set it), so this may be null. */
+ public BrooklynGarbageCollector getGarbageCollector() {
+ return gc;
+ }
+
+ /** Records the URI of this management node; the argument must not be null. */
+ @Override
+ public void setManagementNodeUri(URI uri) {
+ this.uri = Maybe.of(checkNotNull(uri, "uri"));
+ }
+
+ /** Returns the management node URI, absent until {@link #setManagementNodeUri(URI)} has been called. */
+ @Override
+ public Maybe<URI> getManagementNodeUri() {
+ return uri;
+ }
+
+ /** Lock guarding reads and writes of {@code catalogInitialization}; final so the lock identity can never change. */
+ private final Object catalogInitMutex = new Object();
+
+ /**
+  * Returns the catalog initialization for this context. If none has been set yet, a default
+  * {@link CatalogInitialization} is created and registered through
+  * {@code setCatalogInitialization} before being returned.
+  */
+ @Override
+ public CatalogInitialization getCatalogInitialization() {
+     synchronized (catalogInitMutex) {
+         if (catalogInitialization!=null) return catalogInitialization;
+         CatalogInitialization ci = new CatalogInitialization();
+         setCatalogInitialization(ci);
+         return ci;
+     }
+ }
+
+ /**
+  * Sets the catalog initialization and injects this management context into it.
+  * Setting the same instance again is allowed; replacing it with a different instance is not.
+  *
+  * @throws NullPointerException if the argument is null
+  * @throws IllegalStateException if a different initialization has already been set
+  */
+ @Override
+ public void setCatalogInitialization(CatalogInitialization catalogInitialization) {
+     synchronized (catalogInitMutex) {
+         Preconditions.checkNotNull(catalogInitialization, "initialization must not be null");
+         final CatalogInitialization existing = this.catalogInitialization;
+         final boolean replacingDifferentInstance = existing != null && existing != catalogInitialization;
+         if (replacingDifferentInstance) {
+             throw new IllegalStateException("Changing catalog init from "+existing+" to "+catalogInitialization+"; changes not permitted");
+         }
+         catalogInitialization.injectManagementContext(this);
+         this.catalogInitialization = catalogInitialization;
+     }
+ }
+
+ /** Looks up an object by id without restricting its type; see {@link #lookup(String, Class)}. */
+ public BrooklynObject lookup(String id) {
+     return lookup(id, BrooklynObject.class);
+ }
+
+ /**
+  * Looks up an object by id, trying entities first and then locations.
+  *
+  * @return the first match that is an instance of {@code type}, or null if there is none
+  */
+ @SuppressWarnings("unchecked")
+ public <T extends BrooklynObject> T lookup(String id, Class<T> type) {
+     // Class.isInstance is false for null, so no separate null check is needed
+     final Object entity = getEntityManager().getEntity(id);
+     if (type.isInstance(entity)) {
+         return (T) entity;
+     }
+
+     final Object location = getLocationManager().getLocation(id);
+     if (type.isInstance(location)) {
+         return (T) location;
+     }
+
+     // TODO policies, enrichers, feeds
+     return null;
+ }
+
+ /** Returns the live, synchronized list of errors recorded on this context (not a copy). */
+ @Override
+ public List<Throwable> errors() {
+ return errors;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractSubscriptionManager.java
new file mode 100644
index 0000000..bec041d
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/AbstractSubscriptionManager.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.internal;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.api.management.SubscriptionHandle;
+import org.apache.brooklyn.api.management.SubscriptionManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Predicate;
+
+/**
+ * Base class for {@link SubscriptionManager} implementations. It supplies the convenience
+ * overloads and the child/member event filters, and leaves the actual subscription
+ * bookkeeping and event publication to subclasses.
+ */
+public abstract class AbstractSubscriptionManager implements SubscriptionManager {
+
+    // TODO Perhaps could use guava's SynchronizedSetMultimap? But need to check its synchronization guarantees.
+    //      That would replace the utils used for subscriptionsBySubscriber etc.
+
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscriptionManager.class);
+
+    /** performs the actual subscription; should return the subscription parameter as the handle */
+    protected abstract <T> SubscriptionHandle subscribe(Map<String, Object> flags, Subscription<T> s);
+    /** performs the actual publishing -- ie distribution to subscriptions */
+    public abstract <T> void publish(final SensorEvent<T> event);
+
+    /**
+     * Key made of an entity and a sensor; equality and hashing use the entity and the
+     * sensor's name. Either part may be null, which {@link #toString()} renders as "*".
+     */
+    public static class EntitySensorToken {
+        Entity e;
+        Sensor<?> s;
+        String sName;
+        /** @throws NullPointerException if {@code s} is non-null but has a null name */
+        public EntitySensorToken(Entity e, Sensor<?> s) {
+            this.e = e;
+            this.s = s;
+            this.sName = (s == null) ? null : checkNotNull(s.getName(), "sensor must have non-null name: %s", s);
+        }
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(e, sName);
+        }
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) return true;
+            if (!(obj instanceof EntitySensorToken)) return false;
+            if (!Objects.equal(e, ((EntitySensorToken)obj).e)) return false;
+            if (!Objects.equal(sName, ((EntitySensorToken)obj).sName)) return false;
+            return true;
+        }
+        @Override
+        public String toString() {
+            return (e != null ? e.getId() : "*")+":"+(s != null ? sName : "*");
+        }
+    }
+    static Object makeEntitySensorToken(Entity e, Sensor<?> s) {
+        return new EntitySensorToken(e, s);
+    }
+    static Object makeEntitySensorToken(SensorEvent<?> se) {
+        return makeEntitySensorToken(se.getSource(), se.getSensor());
+    }
+
+    /** @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener) */
+    public final <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        return subscribe(Collections.<String,Object>emptyMap(), producer, sensor, listener);
+    }
+
+    /**
+     * This implementation handles the following flags, in addition to those described in the {@link SubscriptionManager}
+     * interface:
+     * <ul>
+     * <li>subscriberExecutionManagerTag - a tag to pass to execution manager (without setting any execution semantics / TaskPreprocessor);
+     *      if not supplied and there is a subscriber, this will be inferred from the subscriber and set up with SingleThreadedScheduler
+     * <li>eventFilter - a Predicate&lt;SensorEvent&gt; instance to filter what events are delivered
+     * </ul>
+     *
+     * @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener)
+     */
+    public final <T> SubscriptionHandle subscribe(Map<String, Object> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        return subscribe(flags, new Subscription<T>(producer, sensor, listener));
+    }
+
+    /** @see SubscriptionManager#subscribeToChildren(Map, Entity, Sensor, SensorEventListener) */
+    public final <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        return subscribeToChildren(Collections.<String,Object>emptyMap(), parent, sensor, listener);
+    }
+
+    /**
+     * Subscribes with no fixed producer and an "eventFilter" that accepts only events whose
+     * source has {@code parent} as its parent. The supplied flags are copied, not modified.
+     *
+     * @see SubscriptionManager#subscribeToChildren(Map, Entity, Sensor, SensorEventListener)
+     */
+    public final <T> SubscriptionHandle subscribeToChildren(Map<String, Object> flags, final Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        Predicate<SensorEvent<T>> eventFilter = new Predicate<SensorEvent<T>>() {
+            public boolean apply(SensorEvent<T> input) {
+                return parent != null && input.getSource() != null && parent.equals(input.getSource().getParent());
+            }
+        };
+        return subscribe(flagsWithEventFilter(flags, eventFilter), null, sensor, listener);
+    }
+
+    /** @see SubscriptionManager#subscribeToMembers(Map, Group, Sensor, SensorEventListener) */
+    public final <T> SubscriptionHandle subscribeToMembers(Group parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        return subscribeToMembers(Collections.<String,Object>emptyMap(), parent, sensor, listener);
+    }
+
+    /**
+     * Subscribes with no fixed producer and an "eventFilter" that accepts only events whose
+     * source is currently a member of {@code parent}. The supplied flags are copied, not modified.
+     *
+     * @see SubscriptionManager#subscribeToMembers(Map, Group, Sensor, SensorEventListener)
+     */
+    public final <T> SubscriptionHandle subscribeToMembers(Map<String, Object> flags, final Group parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        Predicate<SensorEvent<T>> eventFilter = new Predicate<SensorEvent<T>>() {
+            public boolean apply(SensorEvent<T> input) {
+                return parent.getMembers().contains(input.getSource());
+            }
+        };
+        return subscribe(flagsWithEventFilter(flags, eventFilter), null, sensor, listener);
+    }
+
+    /**
+     * Picks the subscriber for a subscription: the one already on the subscription, else the
+     * "subscriber" flag (which is removed from {@code flags}), else the listener itself.
+     */
+    protected <T> Object getSubscriber(Map<String, Object> flags, Subscription<T> s) {
+        return s.subscriber!=null ? s.subscriber : flags.containsKey("subscriber") ? flags.remove("subscriber") : s.listener;
+    }
+
+    /**
+     * Returns a fresh mutable copy of {@code flags} (null is treated as empty) with
+     * "eventFilter" set. Copying matters because the no-flags overloads pass
+     * {@link Collections#emptyMap()}, which rejects {@code put}.
+     */
+    private static Map<String, Object> flagsWithEventFilter(Map<String, Object> flags, Predicate<?> eventFilter) {
+        Map<String, Object> result = new LinkedHashMap<String, Object>();
+        if (flags != null) result.putAll(flags);
+        result.put("eventFilter", eventFilter);
+        return result;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/AccessManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/AccessManager.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/AccessManager.java
new file mode 100644
index 0000000..5d6a057
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/AccessManager.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.internal;
+
+import org.apache.brooklyn.api.management.AccessController;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Exposes an {@link AccessController} together with three boolean permissions
+ * (location provisioning, location management and entity management), each of
+ * which can be queried and changed.
+ */
+@Beta
+public interface AccessManager {
+
+    AccessController getAccessController();
+
+    // location provisioning
+    boolean isLocationProvisioningAllowed();
+
+    void setLocationProvisioningAllowed(boolean allowed);
+
+    // location management
+    boolean isLocationManagementAllowed();
+
+    void setLocationManagementAllowed(boolean allowed);
+
+    // entity management
+    boolean isEntityManagementAllowed();
+
+    void setEntityManagementAllowed(boolean allowed);
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/AsyncCollectionChangeAdapter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/AsyncCollectionChangeAdapter.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/AsyncCollectionChangeAdapter.java
new file mode 100644
index 0000000..038ec90
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/AsyncCollectionChangeAdapter.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.internal;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.brooklyn.api.management.ExecutionManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.task.BasicExecutionManager;
+import brooklyn.util.task.SingleThreadedScheduler;
+
+/**
+ * A {@link CollectionChangeListener} that forwards every notification to a delegate
+ * asynchronously, by submitting a task tagged with the delegate to the supplied executor.
+ * The constructor registers {@link SingleThreadedScheduler} for that tag.
+ * Two adapters are equal when their delegates are equal.
+ */
+public class AsyncCollectionChangeAdapter<Item> implements CollectionChangeListener<Item> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectionChangeAdapter.class);
+
+    private final ExecutionManager executor;
+    private final CollectionChangeListener<Item> delegate;
+
+    /**
+     * @throws NullPointerException if either argument is null
+     * @throws ClassCastException if {@code executor} is not a {@link BasicExecutionManager}
+     */
+    public AsyncCollectionChangeAdapter(ExecutionManager executor, CollectionChangeListener<Item> delegate) {
+        this.executor = checkNotNull(executor, "executor");
+        this.delegate = checkNotNull(delegate, "delegate");
+        ((BasicExecutionManager) executor).setTaskSchedulerForTag(delegate, SingleThreadedScheduler.class);
+    }
+
+    /** Submits a task that calls {@code delegate.onItemAdded(item)}; failures are logged, then rethrown. */
+    @Override
+    public void onItemAdded(final Item item) {
+        executor.submit(MutableMap.of("tag", delegate), new Runnable() {
+            public void run() {
+                try {
+                    delegate.onItemAdded(item);
+                } catch (Throwable t) {
+                    LOG.warn("Error notifying listener of itemAdded("+item+")", t);
+                    Exceptions.propagate(t);
+                }
+            }
+        });
+    }
+
+    /** Submits a task that calls {@code delegate.onItemRemoved(item)}; failures are logged, then rethrown. */
+    @Override
+    public void onItemRemoved(final Item item) {
+        executor.submit(MutableMap.of("tag", delegate), new Runnable() {
+            public void run() {
+                try {
+                    delegate.onItemRemoved(item);
+                } catch (Throwable t) {
+                    // Name the callback that actually failed (this used to say itemAdded).
+                    LOG.warn("Error notifying listener of itemRemoved("+item+")", t);
+                    Exceptions.propagate(t);
+                }
+            }
+        });
+    }
+
+    @Override
+    public int hashCode() {
+        return delegate.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        return (other instanceof AsyncCollectionChangeAdapter) &&
+                delegate.equals(((AsyncCollectionChangeAdapter<?>) other).delegate);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/BasicSubscriptionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/BasicSubscriptionContext.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/BasicSubscriptionContext.java
new file mode 100644
index 0000000..04aa18f
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/BasicSubscriptionContext.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.internal;
+
+import static brooklyn.util.JavaGroovyEquivalents.mapOf;
+import groovy.lang.Closure;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.api.management.SubscriptionContext;
+import org.apache.brooklyn.api.management.SubscriptionHandle;
+import org.apache.brooklyn.api.management.SubscriptionManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+
+/**
+ * A {@link SubscriptionContext} for an entity or other user of a {@link SubscriptionManager}.
+ * <p>
+ * Every subscription made through this context carries the context's own flags (which
+ * always include "subscriber"), overlaid with any flags given on the individual call.
+ */
+public class BasicSubscriptionContext implements SubscriptionContext {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BasicSubscriptionContext.class);
+
+    private final SubscriptionManager manager;
+    private final Object subscriber;
+    private final Map<String,Object> flags;
+
+    public BasicSubscriptionContext(SubscriptionManager manager, Object subscriber) {
+        this(Collections.<String,Object>emptyMap(), manager, subscriber);
+    }
+
+    /**
+     * @param flags extra flags applied to every subscription; may be null. A "subscriber"
+     *        entry here overrides the one derived from {@code subscriber}.
+     */
+    public BasicSubscriptionContext(Map<String, ?> flags, SubscriptionManager manager, Object subscriber) {
+        this.manager = manager;
+        this.subscriber = subscriber;
+        this.flags = mapOf("subscriber", subscriber);
+        if (flags!=null) this.flags.putAll(flags);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, Closure c) {
+        return subscribe(Collections.<String,Object>emptyMap(), producer, sensor, c);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public <T> SubscriptionHandle subscribe(Map<String, Object> newFlags, Entity producer, Sensor<T> sensor, Closure c) {
+        return subscribe(newFlags, producer, sensor, toSensorEventListener(c));
+    }
+
+    @Override
+    public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        return subscribe(Collections.<String,Object>emptyMap(), producer, sensor, listener);
+    }
+
+    @Override
+    public <T> SubscriptionHandle subscribe(Map<String, Object> newFlags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        return manager.subscribe(mergedFlags(newFlags), producer, sensor, listener);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, Closure c) {
+        return subscribeToChildren(Collections.<String,Object>emptyMap(), parent, sensor, c);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public <T> SubscriptionHandle subscribeToChildren(Map<String, Object> newFlags, Entity parent, Sensor<T> sensor, Closure c) {
+        return subscribeToChildren(newFlags, parent, sensor, toSensorEventListener(c));
+    }
+
+    @Override
+    public <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        return subscribeToChildren(Collections.<String,Object>emptyMap(), parent, sensor, listener);
+    }
+
+    @Override
+    public <T> SubscriptionHandle subscribeToChildren(Map<String, Object> newFlags, Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        return manager.subscribeToChildren(mergedFlags(newFlags), parent, sensor, listener);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public <T> SubscriptionHandle subscribeToMembers(Group parent, Sensor<T> sensor, Closure c) {
+        return subscribeToMembers(Collections.<String,Object>emptyMap(), parent, sensor, c);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public <T> SubscriptionHandle subscribeToMembers(Map<String, Object> newFlags, Group parent, Sensor<T> sensor, Closure c) {
+        return subscribeToMembers(newFlags, parent, sensor, toSensorEventListener(c));
+    }
+
+    @Override
+    public <T> SubscriptionHandle subscribeToMembers(Group parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        return subscribeToMembers(Collections.<String,Object>emptyMap(), parent, sensor, listener);
+    }
+
+    @Override
+    public <T> SubscriptionHandle subscribeToMembers(Map<String, Object> newFlags, Group parent, Sensor<T> sensor, SensorEventListener<? super T> listener) {
+        return manager.subscribeToMembers(mergedFlags(newFlags), parent, sensor, listener);
+    }
+
+    /**
+     * Unsubscribes the given handle, which must belong to this context's subscriber.
+     *
+     * @throws NullPointerException if {@code subscriptionId} is null
+     * @throws IllegalArgumentException if the handle was created for a different subscriber
+     * @throws ClassCastException if the handle is not a {@link Subscription}
+     */
+    @Override
+    public boolean unsubscribe(SubscriptionHandle subscriptionId) {
+        Preconditions.checkNotNull(subscriptionId, "subscriptionId must not be null");
+        Object handleSubscriber = ((Subscription<?>) subscriptionId).subscriber;
+        // Report the handle's subscriber as the offender and our own as the expected one
+        // (the two were previously swapped); the template is only formatted on failure.
+        Preconditions.checkArgument(Objects.equal(subscriber, handleSubscriber),
+                "The subscriptionId is for a different subscriber %s; expected %s", handleSubscriber, subscriber);
+        return manager.unsubscribe(subscriptionId);
+    }
+
+    /** @see SubscriptionManager#publish(SensorEvent) */
+    @Override
+    public <T> void publish(SensorEvent<T> event) {
+        manager.publish(event);
+    }
+
+    /** Return the subscriptions associated with this context */
+    @Override
+    public Set<SubscriptionHandle> getSubscriptions() {
+        return manager.getSubscriptionsForSubscriber(subscriber);
+    }
+
+    /**
+     * Unsubscribes every subscription of this context's subscriber.
+     *
+     * @return the number of subscriptions for which an unsubscribe was attempted,
+     *         including any for which the manager returned false
+     */
+    @Override
+    public int unsubscribeAll() {
+        int count = 0;
+
+        // To avoid ConcurrentModificationException when copying subscriptions, need to synchronize on it
+        Set<SubscriptionHandle> subscriptions = getSubscriptions();
+        Collection<SubscriptionHandle> subscriptionsCopy;
+        synchronized (subscriptions) {
+            subscriptionsCopy = ImmutableList.copyOf(subscriptions);
+        }
+
+        for (SubscriptionHandle s : subscriptionsCopy) {
+            count++;
+            boolean result = unsubscribe(s);
+            if (!result) LOG.warn("When unsubscribing from all of {}, unsubscribe of {} return false", subscriber, s);
+        }
+        return count;
+    }
+
+    /** Returns a fresh map holding this context's flags overlaid with {@code newFlags} (which may be null). */
+    private Map<String,Object> mergedFlags(Map<String, Object> newFlags) {
+        Map<String,Object> subscriptionFlags = Maps.newLinkedHashMap(flags);
+        if (newFlags != null) subscriptionFlags.putAll(newFlags);
+        return subscriptionFlags;
+    }
+
+    /** Wraps a Groovy closure as a listener that calls the closure with each event. */
+    @SuppressWarnings("rawtypes")
+    private <T> SensorEventListener<T> toSensorEventListener(final Closure c) {
+        return new SensorEventListener<T>() {
+            @Override public void onEvent(SensorEvent<T> event) {
+                c.call(event);
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynGarbageCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynGarbageCollector.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynGarbageCollector.java
new file mode 100644
index 0000000..c02ff81
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynGarbageCollector.java
@@ -0,0 +1,626 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.internal;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.management.HasTaskChildren;
+import org.apache.brooklyn.api.management.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.BrooklynProperties;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.entity.basic.BrooklynTaskTags.WrappedEntity;
+import brooklyn.entity.basic.BrooklynTaskTags.WrappedStream;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.Entities;
+import brooklyn.internal.storage.BrooklynStorage;
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.collections.MutableSet;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.javalang.MemoryUsageTracker;
+import brooklyn.util.task.BasicExecutionManager;
+import brooklyn.util.task.ExecutionListener;
+import brooklyn.util.task.Tasks;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Objects;
+import com.google.common.annotations.Beta;
+import com.google.common.collect.Iterables;
+
+/**
+ * Deletes record of old tasks, to prevent space leaks and the eating up of more and more memory.
+ *
+ * The deletion policy is configurable:
+ * <ul>
+ * <li>Period - how frequently to look at the existing tasks to delete some, if required
+ * <li>Max task age - the time after which a completed task will be automatically deleted
+ * (i.e. any root task completed more than maxTaskAge ago will be deleted)
+ * <li>Max tasks per <various categories> - the maximum number of tasks to be kept for a given tag,
+ * split into categories based on what is seeming to be useful
+ * </ul>
+ *
+ * The default is to check with a period of one minute, deleting tasks after 30 days,
+ * and keeping at most 100000 tasks in the system,
+ * max 1000 tasks per entity, 50 per effector within that entity, and 50 per other non-effector tag
+ * within that entity (or global if not attached to an entity).
+ *
+ * @author aled
+ */
+public class BrooklynGarbageCollector {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BrooklynGarbageCollector.class);
+
+ /** Config key {@code brooklyn.gc.period}; declared default is one minute. */
+ public static final ConfigKey<Duration> GC_PERIOD = ConfigKeys.newDurationConfigKey(
+ "brooklyn.gc.period", "the period for checking if any tasks need to be deleted",
+ Duration.minutes(1));
+
+ /** Config key {@code brooklyn.gc.doSystemGc}; declared default is false. */
+ public static final ConfigKey<Boolean> DO_SYSTEM_GC = ConfigKeys.newBooleanConfigKey(
+ "brooklyn.gc.doSystemGc", "whether to periodically call System.gc()", false);
+
+ /**
+ * should we check for tasks which are submitted by another but backgrounded, i.e. not a child of that task?
+ * default to yes, despite it can be some extra loops, to make sure we GC them promptly.
+ * @since 0.7.0 */
+ // worst offender is {@link DynamicSequentialTask} internal job tracker, but it is marked
+ // transient so it is destroyed promptly; there may be others, however;
+ // but OTOH it might be expensive to check for these all the time!
+ // TODO probably we can set this false (remove this and related code),
+ // and just rely on usual GC to pick up background tasks; the lifecycle of background task
+ // should normally be independent of the submitter. (DST was the exception, and marking
+ // transient there fixes the main problem, which is when the submitter is GC'd but the submitted is not,
+ // and we don't want the submitted to show up at the root in the GUI, which it will if its
+ // submitter has been GC'd)
+ @Beta
+ public static final ConfigKey<Boolean> CHECK_SUBTASK_SUBMITTERS = ConfigKeys.newBooleanConfigKey(
+ "brooklyn.gc.checkSubtaskSubmitters", "whether for subtasks to check the submitters", true);
+
+ /** Config key {@code brooklyn.gc.maxTasksPerTag}; declared default is 50. */
+ public static final ConfigKey<Integer> MAX_TASKS_PER_TAG = ConfigKeys.newIntegerConfigKey(
+ "brooklyn.gc.maxTasksPerTag",
+ "the maximum number of tasks to be kept for a given tag "
+ + "within an execution context (e.g. entity); "
+ + "some broad-brush tags are excluded, and if an entity has multiple tags all tag counts must be full",
+ 50);
+
+ /** Config key {@code brooklyn.gc.maxTasksPerEntity}; declared default is 1000. */
+ public static final ConfigKey<Integer> MAX_TASKS_PER_ENTITY = ConfigKeys.newIntegerConfigKey(
+ "brooklyn.gc.maxTasksPerEntity",
+ "the maximum number of tasks to be kept for a given entity",
+ 1000);
+
+ /** Config key {@code brooklyn.gc.maxTasksGlobal}; declared default is 100000. */
+ public static final ConfigKey<Integer> MAX_TASKS_GLOBAL = ConfigKeys.newIntegerConfigKey(
+ "brooklyn.gc.maxTasksGlobal",
+ "the maximum number of tasks to be kept across the entire system",
+ 100000);
+
+ /** Config key {@code brooklyn.gc.maxTaskAge}; declared default is 30 days. */
+ public static final ConfigKey<Duration> MAX_TASK_AGE = ConfigKeys.newDurationConfigKey(
+ "brooklyn.gc.maxTaskAge",
+ "the duration after which a completed task will be automatically deleted",
+ Duration.days(30));
+
+ /** Orders tasks by ascending {@code getEndTimeUtc()}, so the task that ended earliest sorts first. */
+ protected final static Comparator<Task<?>> TASKS_OLDEST_FIRST_COMPARATOR = new Comparator<Task<?>>() {
+     @Override public int compare(Task<?> t1, Task<?> t2) {
+         long firstEnd = t1.getEndTimeUtc();
+         long secondEnd = t2.getEndTimeUtc();
+         if (firstEnd < secondEnd) return -1;
+         if (firstEnd > secondEnd) return 1;
+         return 0;
+     }
+ };
+
+ // Execution manager whose task-done events are listened to and whose tasks/tags get deleted.
+ private final BasicExecutionManager executionManager;
+ // Read by getUsageString() for its storage metrics.
+ private final BrooklynStorage storage;
+ // Source of the GC config values (GC_PERIOD is re-read on every scheduleCollector call).
+ private final BrooklynProperties brooklynProperties;
+ // Single-threaded scheduler (thread name "brooklyn-gc") that runs gcIteration periodically.
+ private final ScheduledExecutorService executor;
+ // Handle of the currently scheduled periodic collection; null until one has been scheduled.
+ private ScheduledFuture<?> activeCollector;
+ // Entities reported via onUnmanaged(Entity), mapped to the task that was current at that time;
+ // guarded by synchronizing on the map itself.
+ private Map<Entity,Task<?>> unmanagedEntitiesNeedingGc = new LinkedHashMap<Entity, Task<?>>();
+
+ // Period most recently read from GC_PERIOD by scheduleCollector.
+ private Duration gcPeriod;
+ // Value of DO_SYSTEM_GC captured at construction.
+ private final boolean doSystemGc;
+ // Cleared by shutdownNow(); gcTasks() returns immediately once this is false.
+ private volatile boolean running = true;
+
+ /**
+  * Creates the collector, registers a task-done listener on the execution manager and
+  * schedules the periodic collection.
+  */
+ public BrooklynGarbageCollector(BrooklynProperties brooklynProperties, BasicExecutionManager executionManager, BrooklynStorage storage) {
+     this.executionManager = executionManager;
+     this.storage = storage;
+     this.brooklynProperties = brooklynProperties;
+
+     this.doSystemGc = brooklynProperties.getConfig(DO_SYSTEM_GC);
+
+     ThreadFactory gcThreadFactory = new ThreadFactory() {
+         @Override public Thread newThread(Runnable r) {
+             return new Thread(r, "brooklyn-gc");
+         }
+     };
+     this.executor = Executors.newSingleThreadScheduledExecutor(gcThreadFactory);
+
+     ExecutionListener taskDoneForwarder = new ExecutionListener() {
+         @Override public void onTaskDone(Task<?> task) {
+             BrooklynGarbageCollector.this.onTaskDone(task);
+         }
+     };
+     executionManager.addListener(taskDoneForwarder);
+
+     scheduleCollector(true);
+ }
+
+ /**
+  * Cancels any currently scheduled collection, re-reads {@link #GC_PERIOD} and, if it is
+  * non-null, schedules {@link #gcIteration()} with that value as initial delay and fixed delay.
+  *
+  * @param canInterruptCurrent passed to {@code cancel} on the previously scheduled collection
+  */
+ protected synchronized void scheduleCollector(boolean canInterruptCurrent) {
+     if (activeCollector != null) {
+         activeCollector.cancel(canInterruptCurrent);
+     }
+
+     gcPeriod = brooklynProperties.getConfig(GC_PERIOD);
+     if (gcPeriod == null) {
+         // no period configured: nothing gets scheduled
+         return;
+     }
+
+     Runnable gcRunner = new Runnable() {
+         @Override public void run() {
+             gcIteration();
+         }
+     };
+     activeCollector = executor.scheduleWithFixedDelay(
+             gcRunner,
+             gcPeriod.toMillisecondsRoundingUp(),
+             gcPeriod.toMillisecondsRoundingUp(),
+             TimeUnit.MILLISECONDS);
+ }
+
+ /**
+  * Forces one round of Brooklyn garbage collection, logging usage before and after.
+  * Non-fatal errors are logged and swallowed.
+  */
+ public void gcIteration() {
+     try {
+         logUsage("brooklyn gc (before)");
+         gcTasks();
+         logUsage("brooklyn gc (after)");
+
+         if (!doSystemGc) {
+             return;
+         }
+         // Can be very useful when tracking down OOMEs etc, where a lot of tasks are executing.
+         // Empirically observed that (on OS X jvm at least) calling twice blocks - logs a significant
+         // amount of memory having been released, as though a full-gc had been run. But this is highly
+         // dependent on the JVM implementation.
+         System.gc();
+         System.gc();
+         logUsage("brooklyn gc (after system gc)");
+     } catch (Throwable t) {
+         Exceptions.propagateIfFatal(t);
+         LOG.warn("Error during management-context GC: "+t, t);
+         // previously we bailed on all errors, but I don't think we should do that -Alex
+     }
+ }
+
+ /** Logs the current usage string at debug level, prefixed with {@code prefix}; does nothing if debug is off. */
+ public void logUsage(String prefix) {
+     if (!LOG.isDebugEnabled()) {
+         return;
+     }
+     LOG.debug(prefix+" - using "+getUsageString());
+ }
+
+ public static String makeBasicUsageString() {
+ return Strings.makeSizeString(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory())+" / "+
+ Strings.makeSizeString(Runtime.getRuntime().totalMemory()) + " memory" +
+ " ("+Strings.makeSizeString(MemoryUsageTracker.SOFT_REFERENCES.getBytesUsed()) + " soft); "+
+ Thread.activeCount()+" threads";
+ }
+
+ public String getUsageString() {
+ return makeBasicUsageString()+"; "+
+ "storage: " + storage.getStorageMetrics() + "; " +
+ "tasks: " +
+ executionManager.getNumActiveTasks()+" active, "+
+ executionManager.getNumIncompleteTasks()+" unfinished; "+
+ executionManager.getNumInMemoryTasks()+" remembered, "+
+ executionManager.getTotalTasksSubmitted()+" total submitted)";
+ }
+
+ /** Marks the collector as stopped, cancels the scheduled collection and shuts down the executor. */
+ public void shutdownNow() {
+     running = false;
+     if (activeCollector != null) {
+         activeCollector.cancel(true);
+     }
+     if (executor != null) {
+         executor.shutdownNow();
+     }
+ }
+
+ /**
+  * Records that the entity has been unmanaged, together with the task current at the time,
+  * so that its task deletions can happen later.
+  */
+ public void onUnmanaged(Entity entity) {
+     // Task deletion is deferred until the entity is completely unmanaged
+     // (this callback usually arrives during the stop sequence).
+     synchronized (unmanagedEntitiesNeedingGc) {
+         unmanagedEntitiesNeedingGc.put(entity, Tasks.current());
+     }
+ }
+
+ /**
+ * Asks the execution manager to delete four tags relating to the entity: the entity itself
+ * and its context-entity, caller-entity and target-entity tags.
+ */
+ public void deleteTasksForEntity(Entity entity) {
+ // remove all references to this entity from tasks
+ executionManager.deleteTag(entity);
+ executionManager.deleteTag(BrooklynTaskTags.tagForContextEntity(entity));
+ executionManager.deleteTag(BrooklynTaskTags.tagForCallerEntity(entity));
+ executionManager.deleteTag(BrooklynTaskTags.tagForTargetEntity(entity));
+ }
+
+ /** Callback for an unmanaged location; intentionally does nothing. */
+ public void onUnmanaged(Location loc) {
+ // No-op currently; no tasks are tracked through their location
+ }
+
+ /** Deletes the task from the execution manager if {@link #shouldDeleteTaskImmediately(Task)} says so. */
+ public void onTaskDone(Task<?> task) {
+     if (!shouldDeleteTaskImmediately(task)) {
+         return;
+     }
+     executionManager.deleteTask(task);
+ }
+
+ /** @deprecated since 0.7.0, method moved internal until semantics are clarified; see also {@link #shouldDeleteTaskImmediately(Task)} */
+ @Deprecated
+ public boolean shouldDeleteTask(Task<?> task) {
+ return shouldDeleteTaskImmediately(task);
+ }
+ /**
+  * Whether this task should be deleted on completion, because it is transient,
+  * or because it was submitted in the background without much context information.
+  *
+  * @return false for tasks that are not done, carry the effector or non-transient tag,
+  *         are children of a still-known submitter, or are tied to a managed entity;
+  *         true otherwise
+  */
+ protected boolean shouldDeleteTaskImmediately(Task<?> task) {
+     if (!task.isDone()) return false;
+
+     Set<Object> tags = task.getTags();
+     if (tags.contains(ManagementContextInternal.TRANSIENT_TASK_TAG)) {
+         return true;
+     }
+     boolean explicitlyKept = tags.contains(ManagementContextInternal.EFFECTOR_TAG)
+             || tags.contains(ManagementContextInternal.NON_TRANSIENT_TASK_TAG);
+     if (explicitlyKept) {
+         return false;
+     }
+
+     Task<?> submitter = task.getSubmittedByTask();
+     if (submitter == null) {
+         // e.g. scheduled tasks, sensor events, etc
+         // TODO (in future may keep some of these with another limit, based on a new TagCategory)
+         // there may also be a server association for server-side tasks which should be kept
+         // (but be careful not to keep too many subscriptions!)
+         return true;
+     }
+
+     if (executionManager.getTask(submitter.getId()) == null) {
+         // the submitter has already been cleaned up
+         return true;
+     }
+     if (submitter instanceof HasTaskChildren && Iterables.contains(((HasTaskChildren) submitter).getChildren(), task)) {
+         // it is a child, so the submitter manages this task's death
+         return false;
+     }
+
+     Entity associatedEntity = BrooklynTaskTags.getTargetOrContextEntity(task);
+     if (associatedEntity == null) {
+         // not associated to an entity: delete immediately
+         return true;
+     }
+     // associated to an entity: destroy only if that entity is unmanaged
+     return !Entities.isManaged(associatedEntity);
+ }
+
+ /**
+ * Deletes old tasks. The age/number of tasks to keep is controlled by fields like
+ * {@link #maxTasksPerTag} and {@link #maxTaskAge}.
+ * <p>
+ * Does nothing and returns 0 when this collector is not running.
+ *
+ * @return the number of tasks deleted by the over-capacity (per-tag and global) and
+ * expired-submitter passes; tasks removed by the unmanaged-entity, aged-task and
+ * transient-task passes are not included in this count
+ */
+ protected synchronized int gcTasks() {
+ // TODO Must be careful with memory usage here: have seen OOME if we get crazy lots of tasks.
+ // Hopefully the new limits, filters, and use of live lists in some places (added Sep 2014) will help.
+ //
+ // An option is for getTasksWithTag(tag) to return an ArrayList rather than a LinkedHashSet. That
+ // is a far more memory efficient data structure (e.g. 4 bytes overhead per object rather than
+ // 32 bytes overhead per object for HashSet).
+ //
+ // More notes on optimization are in the history of this file.
+
+ if (!running) return 0;
+
+ Duration newPeriod = brooklynProperties.getConfig(GC_PERIOD);
+ if (!Objects.equal(gcPeriod, newPeriod)) {
+ // caller has changed period, reschedule on next run
+ scheduleCollector(false);
+ }
+
+ // these passes do not report how many tasks they removed
+ expireUnmanagedEntityTasks();
+ expireAgedTasks();
+ expireTransientTasks();
+
+ // now look at overcapacity tags, non-entity tags first
+
+ Set<Object> taskTags = executionManager.getTaskTags();
+
+ int maxTasksPerEntity = brooklynProperties.getConfig(MAX_TASKS_PER_ENTITY);
+ int maxTasksPerTag = brooklynProperties.getConfig(MAX_TASKS_PER_TAG);
+
+ // tag -> number of tasks by which that tag exceeds its limit, split by kind of tag
+ Map<Object,AtomicInteger> taskNonEntityTagsOverCapacity = MutableMap.of();
+ Map<Object,AtomicInteger> taskEntityTagsOverCapacity = MutableMap.of();
+
+ // union of the two maps above; it holds the very same counter instances
+ Map<Object,AtomicInteger> taskAllTagsOverCapacity = MutableMap.of();
+
+ for (Object tag : taskTags) {
+ if (isTagIgnoredForGc(tag)) continue;
+
+ Set<Task<?>> tasksWithTag = executionManager.tasksWithTagLiveOrNull(tag);
+ if (tasksWithTag==null) continue;
+ AtomicInteger overA = null;
+ if (tag instanceof WrappedEntity) {
+ int over = tasksWithTag.size() - maxTasksPerEntity;
+ if (over>0) {
+ overA = new AtomicInteger(over);
+ taskEntityTagsOverCapacity.put(tag, overA);
+ }
+ } else {
+ int over = tasksWithTag.size() - maxTasksPerTag;
+ if (over>0) {
+ overA = new AtomicInteger(over);
+ taskNonEntityTagsOverCapacity.put(tag, overA);
+ }
+ }
+ if (overA!=null) {
+ taskAllTagsOverCapacity.put(tag, overA);
+ }
+ }
+
+ int deletedCount = 0;
+ deletedCount += expireOverCapacityTagsInCategory(taskNonEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.NON_ENTITY_NORMAL, false);
+ // the non-entity pass may already have decremented shared counters, hence the filter flag here
+ deletedCount += expireOverCapacityTagsInCategory(taskEntityTagsOverCapacity, taskAllTagsOverCapacity, TagCategory.ENTITY, true);
+ deletedCount += expireSubTasksWhoseSubmitterIsExpired();
+
+ int deletedGlobally = expireIfOverCapacityGlobally();
+ deletedCount += deletedGlobally;
+ // a global purge may have removed further submitters, so re-check their subtasks
+ if (deletedGlobally>0) deletedCount += expireSubTasksWhoseSubmitterIsExpired();
+
+ return deletedCount;
+ }
+
+ /**
+  * Whether the given tag is skipped when looking for over-capacity tags:
+  * {@code null}, the effector / sub-task / non-transient / transient marker tags,
+  * and any {@code WrappedStream} are all ignored.
+  */
+ protected static boolean isTagIgnoredForGc(Object tag) {
+     if (tag == null) {
+         return true;
+     }
+     boolean markerTag = tag.equals(ManagementContextInternal.EFFECTOR_TAG)
+             || tag.equals(ManagementContextInternal.SUB_TASK_TAG)
+             || tag.equals(ManagementContextInternal.NON_TRANSIENT_TASK_TAG)
+             || tag.equals(ManagementContextInternal.TRANSIENT_TASK_TAG);
+     return markerTag || tag instanceof WrappedStream;
+ }
+
+ /**
+  * Deletes the tasks of every recorded entity which is no longer managed and whose
+  * recorded task (if any) is done, then forgets that entity.
+  */
+ protected void expireUnmanagedEntityTasks() {
+     // work on a snapshot taken under the lock, so the loop itself runs unlocked
+     Set<Entry<Entity, Task<?>>> pending;
+     synchronized (unmanagedEntitiesNeedingGc) {
+         pending = MutableSet.copyOf(unmanagedEntitiesNeedingGc.entrySet());
+     }
+
+     for (Entry<Entity, Task<?>> candidate : pending) {
+         Entity entity = candidate.getKey();
+         if (Entities.isManaged(entity)) {
+             continue;
+         }
+         Task<?> recordedTask = candidate.getValue();
+         boolean stillRunning = recordedTask != null && !recordedTask.isDone();
+         if (stillRunning) {
+             continue;
+         }
+
+         deleteTasksForEntity(entity);
+         synchronized (unmanagedEntitiesNeedingGc) {
+             unmanagedEntitiesNeedingGc.remove(entity);
+         }
+     }
+ }
+
+ /**
+  * Deletes completed tasks, other than sub-tasks, which ended longer ago than the
+  * configured {@code MAX_TASK_AGE}.
+  */
+ protected void expireAgedTasks() {
+     Duration maxTaskAge = brooklynProperties.getConfig(MAX_TASK_AGE);
+
+     Collection<Task<?>> liveTasks = executionManager.allTasksLive();
+     Collection<Task<?>> expired = MutableList.of();
+
+     try {
+         for (Task<?> candidate : liveTasks) {
+             boolean eligible = candidate.isDone() && !BrooklynTaskTags.isSubTask(candidate);
+             if (eligible && maxTaskAge.isShorterThan(Duration.sinceUtc(candidate.getEndTimeUtc()))) {
+                 expired.add(candidate);
+             }
+         }
+     } catch (ConcurrentModificationException e) {
+         // the live collection changed under us; delete what we've found so far
+         LOG.debug("Got CME inspecting aged tasks, with "+expired.size()+" found for deletion: "+e);
+     }
+
+     for (Task<?> oldTask : expired) {
+         executionManager.deleteTask(oldTask);
+     }
+ }
+
+ /** Deletes every completed task which carries the transient-task tag. */
+ protected void expireTransientTasks() {
+     Set<Task<?>> tagged = executionManager.getTasksWithTag(BrooklynTaskTags.TRANSIENT_TASK_TAG);
+     for (Task<?> candidate : tagged) {
+         if (candidate.isDone()) {
+             executionManager.deleteTask(candidate);
+         }
+     }
+ }
+
+ /**
+  * Deletes completed tasks whose submitting task is done and no longer known to the
+  * execution manager. Does nothing when {@code CHECK_SUBTASK_SUBMITTERS} is disabled.
+  *
+  * @return the number of tasks selected for deletion, or 0 when the check is disabled
+  */
+ protected int expireSubTasksWhoseSubmitterIsExpired() {
+     // ideally we wouldn't have this; see comments on CHECK_SUBTASK_SUBMITTERS
+     if (!brooklynProperties.getConfig(CHECK_SUBTASK_SUBMITTERS))
+         return 0;
+
+     Collection<Task<?>> allTasks = executionManager.allTasksLive();
+     Collection<Task<?>> tasksToDelete = MutableList.of();
+     try {
+         for (Task<?> task: allTasks) {
+             if (!task.isDone()) continue;
+             Task<?> submitter = task.getSubmittedByTask();
+             // if we've leaked, ie a subtask which is not a child task,
+             // and the submitter is GC'd, then delete this also
+             if (submitter!=null && submitter.isDone() && executionManager.getTask(submitter.getId())==null) {
+                 tasksToDelete.add(task);
+             }
+         }
+
+     } catch (ConcurrentModificationException e) {
+         // delete what we've found so far
+         // (the message names this pass, so it can be told apart from expireAgedTasks in the logs)
+         LOG.debug("Got CME inspecting subtasks whose submitter is expired, with "+tasksToDelete.size()+" found for deletion: "+e);
+     }
+
+     for (Task<?> task: tasksToDelete) {
+         executionManager.deleteTask(task);
+     }
+     return tasksToDelete.size();
+ }
+
+ protected enum TagCategory {
+ ENTITY, NON_ENTITY_NORMAL;
+
+ public boolean acceptsTag(Object tag) {
+ if (isTagIgnoredForGc(tag)) return false;
+ if (tag instanceof WrappedEntity) return this==ENTITY;
+ if (this==ENTITY) return false;
+ return true;
+ }
+ }
+
+
+ /**
+ * Expires completed tasks all of whose tags in the given category are over capacity,
+ * in the order given by {@code TASKS_OLDEST_FIRST_COMPARATOR}, until no tag in the
+ * category is over capacity any more.
+ *
+ * @param taskTagsInCategoryOverCapacity tags of this category which are over capacity, each with a
+ * counter of how far over it is; entries are removed from this map as their counters reach zero
+ * @param taskAllTagsOverCapacity the over-capacity counters for tags of every category; the counter
+ * of each tag of a deleted task is decremented
+ * @param category the category of tags being considered
+ * @param emptyFilterNeeded whether to first drop entries whose counter is already zero or less
+ * @return the number of tasks deleted
+ */
+ protected int expireOverCapacityTagsInCategory(Map<Object, AtomicInteger> taskTagsInCategoryOverCapacity, Map<Object, AtomicInteger> taskAllTagsOverCapacity, TagCategory category, boolean emptyFilterNeeded) {
+ if (emptyFilterNeeded) {
+ // previous run may have decremented counts
+ MutableList<Object> nowOkayTags = MutableList.of();
+ for (Map.Entry<Object,AtomicInteger> entry: taskTagsInCategoryOverCapacity.entrySet()) {
+ if (entry.getValue().get()<=0) nowOkayTags.add(entry.getKey());
+ }
+ for (Object tag: nowOkayTags) taskTagsInCategoryOverCapacity.remove(tag);
+ }
+
+ if (taskTagsInCategoryOverCapacity.isEmpty())
+ return 0;
+
+ Collection<Task<?>> tasks = executionManager.allTasksLive();
+ List<Task<?>> tasksToConsiderDeleting = MutableList.of();
+ try {
+ for (Task<?> task: tasks) {
+ if (!task.isDone()) continue;
+
+ Set<Object> tags = task.getTags();
+
+ // count this task's tags in the category, and how many of those are over capacity
+ int categoryTags = 0, tooFullCategoryTags = 0;
+ for (Object tag: tags) {
+ if (category.acceptsTag(tag)) {
+ categoryTags++;
+ if (taskTagsInCategoryOverCapacity.containsKey(tag))
+ tooFullCategoryTags++;
+ }
+ }
+ if (tooFullCategoryTags>0) {
+ if (categoryTags==tooFullCategoryTags) {
+ // all buckets are full, delete this one
+ tasksToConsiderDeleting.add(task);
+ } else {
+ // if any bucket is under capacity, then give grace to the other buckets in this category
+ for (Object tag: tags) {
+ if (category.acceptsTag(tag)) {
+ AtomicInteger over = taskTagsInCategoryOverCapacity.get(tag);
+ if (over!=null) {
+ if (over.decrementAndGet()<=0) {
+ // and remove it from over-capacity if so
+ taskTagsInCategoryOverCapacity.remove(tag);
+ // nothing is over capacity any more, so nothing needs deleting
+ if (taskTagsInCategoryOverCapacity.isEmpty())
+ return 0;
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ } catch (ConcurrentModificationException e) {
+ // do CME's happen with these data structures?
+ // if so, let's just delete what we've found so far
+ LOG.debug("Got CME inspecting tasks, with "+tasksToConsiderDeleting.size()+" found for deletion: "+e);
+ }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("brooklyn-gc detected "+taskTagsInCategoryOverCapacity.size()+" "+category+" "
+ + "tags over capacity, expiring old tasks; "
+ + tasksToConsiderDeleting.size()+" tasks under consideration; categories are: "
+ + taskTagsInCategoryOverCapacity);
+
+ Collections.sort(tasksToConsiderDeleting, TASKS_OLDEST_FIRST_COMPARATOR);
+ // now try deleting tasks which are overcapacity for each tag in this category
+ int deleted = 0;
+ for (Task<?> task: tasksToConsiderDeleting) {
+ boolean delete = true;
+ for (Object tag: task.getTags()) {
+ if (!category.acceptsTag(tag))
+ continue;
+ if (taskTagsInCategoryOverCapacity.get(tag)==null) {
+ // no longer over capacity in this tag
+ delete = false;
+ break;
+ }
+ }
+ if (delete) {
+ // delete this and update overcapacity info
+ deleted++;
+ executionManager.deleteTask(task);
+ // counters are looked up across all categories, not just the one being processed
+ for (Object tag: task.getTags()) {
+ AtomicInteger counter = taskAllTagsOverCapacity.get(tag);
+ if (counter!=null && counter.decrementAndGet()<=0)
+ taskTagsInCategoryOverCapacity.remove(tag);
+ }
+ if (LOG.isTraceEnabled())
+ LOG.trace("brooklyn-gc deleted "+task+", buckets now "+taskTagsInCategoryOverCapacity);
+ if (taskTagsInCategoryOverCapacity.isEmpty())
+ break;
+ }
+ }
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("brooklyn-gc deleted "+deleted+" tasks in over-capacity " + category+" tag categories; "
+ + "capacities now: " + taskTagsInCategoryOverCapacity);
+ return deleted;
+ }
+
+ /**
+  * Deletes completed tasks, in the order given by {@code TASKS_OLDEST_FIRST_COMPARATOR},
+  * when the number of tasks in memory exceeds {@code MAX_TASKS_GLOBAL}. Only completed
+  * tasks are candidates, and nothing is deleted unless the completed tasks alone
+  * exceed the limit.
+  *
+  * @return the number of tasks deleted
+  */
+ protected int expireIfOverCapacityGlobally() {
+     // read the limit once, so the threshold check and the deletion count agree
+     // even if the property is changed while this runs
+     int maxTasksGlobal = brooklynProperties.getConfig(MAX_TASKS_GLOBAL);
+
+     Collection<Task<?>> tasksLive = executionManager.allTasksLive();
+     if (tasksLive.size() <= maxTasksGlobal)
+         return 0;
+     if (LOG.isDebugEnabled())
+         LOG.debug("brooklyn-gc detected "+tasksLive.size()+" tasks in memory, over global limit, looking at deleting some");
+
+     try {
+         tasksLive = MutableList.copyOf(tasksLive);
+     } catch (ConcurrentModificationException e) {
+         // copying the live view failed; ask the execution manager for the tasks instead
+         tasksLive = executionManager.getTasksWithAllTags(MutableList.of());
+     }
+
+     // only completed tasks may be deleted
+     MutableList<Task<?>> tasks = MutableList.of();
+     for (Task<?> task: tasksLive) {
+         if (task.isDone()) {
+             tasks.add(task);
+         }
+     }
+
+     int numToDelete = tasks.size() - maxTasksGlobal;
+     if (numToDelete <= 0) {
+         if (LOG.isDebugEnabled())
+             LOG.debug("brooklyn-gc detected only "+tasks.size()+" completed tasks in memory, not over global limit, so not deleting any");
+         return 0;
+     }
+
+     Collections.sort(tasks, TASKS_OLDEST_FIRST_COMPARATOR);
+
+     // never index past the end of the list, whatever the configured limit is
+     int numDeleted = 0;
+     while (numDeleted < numToDelete && tasks.size()>numDeleted) {
+         executionManager.deleteTask( tasks.get(numDeleted++) );
+     }
+     if (LOG.isDebugEnabled())
+         LOG.debug("brooklyn-gc deleted "+numDeleted+" tasks as was over global limit, now have "+executionManager.allTasksLive().size());
+     return numDeleted;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6602f694/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynObjectManagementMode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynObjectManagementMode.java b/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynObjectManagementMode.java
new file mode 100644
index 0000000..49afe27
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/management/internal/BrooklynObjectManagementMode.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.management.internal;
+
+/** Indicates how an entity/location/adjunct is treated at a given {@link ManagementContext}. */
+public enum BrooklynObjectManagementMode {
+ /** The item does not exist: it is neither in memory nor persisted (e.g. when creating it for the first time, or finally destroying it). */
+ NONEXISTENT,
+ /** The item exists or existed elsewhere, i.e. there is persisted state, but it is not loaded here. */
+ UNMANAGED_PERSISTED,
+ /** The item is loaded but read-only (i.e. not actively managed here). */
+ LOADED_READ_ONLY,
+ /** The item is actively managed here. */
+ MANAGED_PRIMARY
+}
\ No newline at end of file