You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/18 13:00:47 UTC
[32/64] incubator-brooklyn git commit: [BROOKLYN-162] Refactor
package in ./core/util
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/osgi/Osgis.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/osgi/Osgis.java b/core/src/main/java/brooklyn/util/osgi/Osgis.java
deleted file mode 100644
index 3bf972e..0000000
--- a/core/src/main/java/brooklyn/util/osgi/Osgis.java
+++ /dev/null
@@ -1,719 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.osgi;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.jar.Attributes;
-import java.util.jar.JarInputStream;
-import java.util.jar.JarOutputStream;
-import java.util.jar.Manifest;
-
-import javax.annotation.Nullable;
-
-import org.apache.felix.framework.FrameworkFactory;
-import org.apache.felix.framework.util.StringMap;
-import org.apache.felix.framework.util.manifestparser.ManifestParser;
-import org.osgi.framework.Bundle;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.BundleException;
-import org.osgi.framework.Constants;
-import org.osgi.framework.Version;
-import org.osgi.framework.launch.Framework;
-import org.osgi.framework.namespace.PackageNamespace;
-import org.osgi.framework.wiring.BundleCapability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.api.catalog.CatalogItem.CatalogBundle;
-
-import brooklyn.util.ResourceUtils;
-import brooklyn.util.collections.MutableList;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.collections.MutableSet;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.exceptions.ReferenceWithError;
-import brooklyn.util.guava.Maybe;
-import brooklyn.util.net.Urls;
-import brooklyn.util.os.Os;
-import brooklyn.util.stream.Streams;
-import brooklyn.util.text.Strings;
-import brooklyn.util.time.Duration;
-import brooklyn.util.time.Time;
-
-import com.google.common.annotations.Beta;
-import com.google.common.base.Joiner;
-import com.google.common.base.Objects;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.base.Stopwatch;
-
-/**
- * utilities for working with osgi.
- * osgi support is in early days (June 2014) so this class is beta, subject to change,
- * particularly in how framework is started and bundles installed.
- *
- * @since 0.7.0 */
-@Beta
-public class Osgis {
- private static final Logger LOG = LoggerFactory.getLogger(Osgis.class);
-
- private static final String EXTENSION_PROTOCOL = "system";
- private static final String MANIFEST_PATH = "META-INF/MANIFEST.MF";
- private static final Set<String> SYSTEM_BUNDLES = MutableSet.of();
-
- public static class VersionedName {
- private final String symbolicName;
- private final Version version;
- public VersionedName(Bundle b) {
- this.symbolicName = b.getSymbolicName();
- this.version = b.getVersion();
- }
- public VersionedName(String symbolicName, Version version) {
- this.symbolicName = symbolicName;
- this.version = version;
- }
- @Override public String toString() {
- return symbolicName + ":" + Strings.toString(version);
- }
- public boolean equals(String sn, String v) {
- return symbolicName.equals(sn) && (version == null && v == null || version != null && version.toString().equals(v));
- }
- public boolean equals(String sn, Version v) {
- return symbolicName.equals(sn) && (version == null && v == null || version != null && version.equals(v));
- }
- public String getSymbolicName() {
- return symbolicName;
- }
- public Version getVersion() {
- return version;
- }
- @Override
- public int hashCode() {
- return Objects.hashCode(symbolicName, version);
- }
- @Override
- public boolean equals(Object other) {
- if (!(other instanceof VersionedName)) return false;
- VersionedName o = (VersionedName) other;
- return Objects.equal(symbolicName, o.symbolicName) && Objects.equal(version, o.version);
- }
- }
-
- public static class BundleFinder {
- protected final Framework framework;
- protected String symbolicName;
- protected String version;
- protected String url;
- protected boolean urlMandatory = false;
- protected final List<Predicate<? super Bundle>> predicates = MutableList.of();
-
- protected BundleFinder(Framework framework) {
- this.framework = framework;
- }
-
- public BundleFinder symbolicName(String symbolicName) {
- this.symbolicName = symbolicName;
- return this;
- }
-
- public BundleFinder version(String version) {
- this.version = version;
- return this;
- }
-
- public BundleFinder id(String symbolicNameOptionallyWithVersion) {
- if (Strings.isBlank(symbolicNameOptionallyWithVersion))
- return this;
-
- Maybe<VersionedName> nv = parseOsgiIdentifier(symbolicNameOptionallyWithVersion);
- if (nv.isAbsent())
- throw new IllegalArgumentException("Cannot parse symbolic-name:version string '"+symbolicNameOptionallyWithVersion+"'");
-
- return id(nv.get());
- }
-
- private BundleFinder id(VersionedName nv) {
- symbolicName(nv.getSymbolicName());
- if (nv.getVersion() != null) {
- version(nv.getVersion().toString());
- }
- return this;
- }
-
- public BundleFinder bundle(CatalogBundle bundle) {
- if (bundle.isNamed()) {
- symbolicName(bundle.getSymbolicName());
- version(bundle.getVersion());
- }
- if (bundle.getUrl() != null) {
- requiringFromUrl(bundle.getUrl());
- }
- return this;
- }
-
- /** Looks for a bundle matching the given URL;
- * unlike {@link #requiringFromUrl(String)} however, if the URL does not match any bundles
- * it will return other matching bundles <i>if</if> a {@link #symbolicName(String)} is specified.
- */
- public BundleFinder preferringFromUrl(String url) {
- this.url = url;
- urlMandatory = false;
- return this;
- }
-
- /** Requires the bundle to have the given URL set as its location. */
- public BundleFinder requiringFromUrl(String url) {
- this.url = url;
- urlMandatory = true;
- return this;
- }
-
- /** Finds the best matching bundle. */
- public Maybe<Bundle> find() {
- return findOne(false);
- }
-
- /** Finds the matching bundle, requiring it to be unique. */
- public Maybe<Bundle> findUnique() {
- return findOne(true);
- }
-
- protected Maybe<Bundle> findOne(boolean requireExactlyOne) {
- if (symbolicName==null && url==null)
- throw new IllegalStateException(this+" must be given either a symbolic name or a URL");
-
- List<Bundle> result = findAll();
- if (result.isEmpty())
- return Maybe.absent("No bundle matching "+getConstraintsDescription());
- if (requireExactlyOne && result.size()>1)
- return Maybe.absent("Multiple bundles ("+result.size()+") matching "+getConstraintsDescription());
-
- return Maybe.of(result.get(0));
- }
-
- /** Finds all matching bundles, in decreasing version order. */
- public List<Bundle> findAll() {
- boolean urlMatched = false;
- List<Bundle> result = MutableList.of();
- for (Bundle b: framework.getBundleContext().getBundles()) {
- if (symbolicName!=null && !symbolicName.equals(b.getSymbolicName())) continue;
- if (version!=null && !Version.parseVersion(version).equals(b.getVersion())) continue;
- for (Predicate<? super Bundle> predicate: predicates) {
- if (!predicate.apply(b)) continue;
- }
-
- // check url last, because if it isn't mandatory we should only clear if we find a url
- // for which the other items also match
- if (url!=null) {
- boolean matches = url.equals(b.getLocation());
- if (urlMandatory) {
- if (!matches) continue;
- else urlMatched = true;
- } else {
- if (matches) {
- if (!urlMatched) {
- result.clear();
- urlMatched = true;
- }
- } else {
- if (urlMatched) {
- // can't use this bundle as we have previously found a preferred bundle, with a matching url
- continue;
- }
- }
- }
- }
-
- result.add(b);
- }
-
- if (symbolicName==null && url!=null && !urlMatched) {
- // if we only "preferred" the url, and we did not match it, and we did not have a symbolic name,
- // then clear the results list!
- result.clear();
- }
-
- Collections.sort(result, new Comparator<Bundle>() {
- @Override
- public int compare(Bundle o1, Bundle o2) {
- return o2.getVersion().compareTo(o1.getVersion());
- }
- });
-
- return result;
- }
-
- public String getConstraintsDescription() {
- List<String> parts = MutableList.of();
- if (symbolicName!=null) parts.add("symbolicName="+symbolicName);
- if (version!=null) parts.add("version="+version);
- if (url!=null)
- parts.add("url["+(urlMandatory ? "required" : "preferred")+"]="+url);
- if (!predicates.isEmpty())
- parts.add("predicates="+predicates);
- return Joiner.on(";").join(parts);
- }
-
- public String toString() {
- return getClass().getCanonicalName()+"["+getConstraintsDescription()+"]";
- }
-
- public BundleFinder version(final Predicate<Version> versionPredicate) {
- return satisfying(new Predicate<Bundle>() {
- @Override
- public boolean apply(Bundle input) {
- return versionPredicate.apply(input.getVersion());
- }
- });
- }
-
- public BundleFinder satisfying(Predicate<? super Bundle> predicate) {
- predicates.add(predicate);
- return this;
- }
- }
-
- public static BundleFinder bundleFinder(Framework framework) {
- return new BundleFinder(framework);
- }
-
- /** @deprecated since 0.7.0 use {@link #bundleFinder(Framework)} */ @Deprecated
- public static List<Bundle> getBundlesByName(Framework framework, String symbolicName, Predicate<Version> versionMatcher) {
- return bundleFinder(framework).symbolicName(symbolicName).version(versionMatcher).findAll();
- }
-
- /** @deprecated since 0.7.0 use {@link #bundleFinder(Framework)} */ @Deprecated
- public static List<Bundle> getBundlesByName(Framework framework, String symbolicName) {
- return bundleFinder(framework).symbolicName(symbolicName).findAll();
- }
-
- /**
- * Tries to find a bundle in the given framework with name matching either `name' or `name:version'.
- * @deprecated since 0.7.0 use {@link #bundleFinder(Framework)} */ @Deprecated
- public static Maybe<Bundle> getBundle(Framework framework, String symbolicNameOptionallyWithVersion) {
- return bundleFinder(framework).id(symbolicNameOptionallyWithVersion).find();
- }
-
- /** @deprecated since 0.7.0 use {@link #bundleFinder(Framework)} */ @Deprecated
- public static Maybe<Bundle> getBundle(Framework framework, String symbolicName, String version) {
- return bundleFinder(framework).symbolicName(symbolicName).version(version).find();
- }
-
- /** @deprecated since 0.7.0 use {@link #bundleFinder(Framework)} */ @Deprecated
- public static Maybe<Bundle> getBundle(Framework framework, String symbolicName, Version version) {
- return bundleFinder(framework).symbolicName(symbolicName).version(Predicates.equalTo(version)).findUnique();
- }
-
- // -------- creating
-
- /*
- * loading framework factory and starting framework based on:
- * http://felix.apache.org/documentation/subprojects/apache-felix-framework/apache-felix-framework-launching-and-embedding.html
- */
-
- public static FrameworkFactory newFrameworkFactory() {
- URL url = Osgis.class.getClassLoader().getResource(
- "META-INF/services/org.osgi.framework.launch.FrameworkFactory");
- if (url != null) {
- try {
- BufferedReader br = new BufferedReader(new InputStreamReader(url.openStream()));
- try {
- for (String s = br.readLine(); s != null; s = br.readLine()) {
- s = s.trim();
- // load the first non-empty, non-commented line
- if ((s.length() > 0) && (s.charAt(0) != '#')) {
- return (FrameworkFactory) Class.forName(s).newInstance();
- }
- }
- } finally {
- if (br != null) br.close();
- }
- } catch (Exception e) {
- // class creation exceptions are not interesting to caller...
- throw Exceptions.propagate(e);
- }
- }
- throw new IllegalStateException("Could not find framework factory.");
- }
-
- public static Framework newFrameworkStarted(String felixCacheDir, boolean clean, Map<?,?> extraStartupConfig) {
- Map<Object,Object> cfg = MutableMap.copyOf(extraStartupConfig);
- if (clean) cfg.put(Constants.FRAMEWORK_STORAGE_CLEAN, "onFirstInit");
- if (felixCacheDir!=null) cfg.put(Constants.FRAMEWORK_STORAGE, felixCacheDir);
- cfg.put(Constants.FRAMEWORK_BSNVERSION, Constants.FRAMEWORK_BSNVERSION_MULTIPLE);
- FrameworkFactory factory = newFrameworkFactory();
-
- Stopwatch timer = Stopwatch.createStarted();
- Framework framework = factory.newFramework(cfg);
- try {
- framework.init();
- installBootBundles(framework);
- framework.start();
- } catch (Exception e) {
- // framework bundle start exceptions are not interesting to caller...
- throw Exceptions.propagate(e);
- }
- LOG.debug("System bundles are: "+SYSTEM_BUNDLES);
- LOG.debug("OSGi framework started in " + Duration.of(timer));
- return framework;
- }
-
- private static void installBootBundles(Framework framework) {
- Stopwatch timer = Stopwatch.createStarted();
- LOG.debug("Installing OSGi boot bundles from "+Osgis.class.getClassLoader()+"...");
- Enumeration<URL> resources;
- try {
- resources = Osgis.class.getClassLoader().getResources(MANIFEST_PATH);
- } catch (IOException e) {
- throw Exceptions.propagate(e);
- }
- BundleContext bundleContext = framework.getBundleContext();
- Map<String, Bundle> installedBundles = getInstalledBundlesById(bundleContext);
- while(resources.hasMoreElements()) {
- URL url = resources.nextElement();
- ReferenceWithError<?> installResult = installExtensionBundle(bundleContext, url, installedBundles, getVersionedId(framework));
- if (installResult.hasError() && !installResult.masksErrorIfPresent()) {
- // it's reported as a critical error, so warn here
- LOG.warn("Unable to install manifest from "+url+": "+installResult.getError(), installResult.getError());
- } else {
- Object result = installResult.getWithoutError();
- if (result instanceof Bundle) {
- String v = getVersionedId( (Bundle)result );
- SYSTEM_BUNDLES.add(v);
- if (installResult.hasError()) {
- LOG.debug(installResult.getError().getMessage()+(result!=null ? " ("+result+"/"+v+")" : ""));
- } else {
- LOG.debug("Installed "+v+" from "+url);
- }
- } else if (installResult.hasError()) {
- LOG.debug(installResult.getError().getMessage());
- }
- }
- }
- LOG.debug("Installed OSGi boot bundles in "+Time.makeTimeStringRounded(timer)+": "+Arrays.asList(framework.getBundleContext().getBundles()));
- }
-
- private static Map<String, Bundle> getInstalledBundlesById(BundleContext bundleContext) {
- Map<String, Bundle> installedBundles = new HashMap<String, Bundle>();
- Bundle[] bundles = bundleContext.getBundles();
- for (Bundle b : bundles) {
- installedBundles.put(getVersionedId(b), b);
- }
- return installedBundles;
- }
-
- /** Wraps the bundle if successful or already installed, wraps TRUE if it's the system entry,
- * wraps null if the bundle is already installed from somewhere else;
- * in all these cases <i>masking</i> an explanatory error if already installed or it's the system entry.
- * <p>
- * Returns an instance wrapping null and <i>throwing</i> an error if the bundle could not be installed.
- */
- private static ReferenceWithError<?> installExtensionBundle(BundleContext bundleContext, URL manifestUrl, Map<String, Bundle> installedBundles, String frameworkVersionedId) {
- //ignore http://felix.extensions:9/ system entry
- if("felix.extensions".equals(manifestUrl.getHost()))
- return ReferenceWithError.newInstanceMaskingError(null, new IllegalArgumentException("Skipping install of internal extension bundle from "+manifestUrl));
-
- try {
- Manifest manifest = readManifest(manifestUrl);
- if (!isValidBundle(manifest))
- return ReferenceWithError.newInstanceMaskingError(null, new IllegalArgumentException("Resource at "+manifestUrl+" is not an OSGi bundle: no valid manifest"));
-
- String versionedId = getVersionedId(manifest);
- URL bundleUrl = ResourceUtils.getContainerUrl(manifestUrl, MANIFEST_PATH);
-
- Bundle existingBundle = installedBundles.get(versionedId);
- if (existingBundle != null) {
- if (!bundleUrl.equals(existingBundle.getLocation()) &&
- //the framework bundle is always pre-installed, don't display duplicate info
- !versionedId.equals(frameworkVersionedId)) {
- return ReferenceWithError.newInstanceMaskingError(null, new IllegalArgumentException("Bundle "+versionedId+" (from manifest " + manifestUrl + ") is already installed, from " + existingBundle.getLocation()));
- }
- return ReferenceWithError.newInstanceMaskingError(existingBundle, new IllegalArgumentException("Bundle "+versionedId+" from manifest " + manifestUrl + " is already installed"));
- }
-
- byte[] jar = buildExtensionBundle(manifest);
- LOG.debug("Installing boot bundle " + bundleUrl);
- //mark the bundle as extension so we can detect it later using the "system:" protocol
- //(since we cannot access BundleImpl.isExtension)
- Bundle newBundle = bundleContext.installBundle(EXTENSION_PROTOCOL + ":" + bundleUrl.toString(), new ByteArrayInputStream(jar));
- installedBundles.put(versionedId, newBundle);
- return ReferenceWithError.newInstanceWithoutError(newBundle);
- } catch (Exception e) {
- Exceptions.propagateIfFatal(e);
- return ReferenceWithError.newInstanceThrowingError(null,
- new IllegalStateException("Problem installing extension bundle " + manifestUrl + ": "+e, e));
- }
- }
-
- private static Manifest readManifest(URL manifestUrl) throws IOException {
- Manifest manifest;
- InputStream in = null;
- try {
- in = manifestUrl.openStream();
- manifest = new Manifest(in);
- } finally {
- if (in != null) {
- try {in.close();}
- catch (Exception e) {};
- }
- }
- return manifest;
- }
-
- private static byte[] buildExtensionBundle(Manifest manifest) throws IOException {
- Attributes atts = manifest.getMainAttributes();
-
- //the following properties are invalid in extension bundles
- atts.remove(new Attributes.Name(Constants.IMPORT_PACKAGE));
- atts.remove(new Attributes.Name(Constants.REQUIRE_BUNDLE));
- atts.remove(new Attributes.Name(Constants.BUNDLE_NATIVECODE));
- atts.remove(new Attributes.Name(Constants.DYNAMICIMPORT_PACKAGE));
- atts.remove(new Attributes.Name(Constants.BUNDLE_ACTIVATOR));
-
- //mark as extension bundle
- atts.putValue(Constants.FRAGMENT_HOST, "system.bundle; extension:=framework");
-
- //create the jar containing the manifest
- ByteArrayOutputStream jar = new ByteArrayOutputStream();
- JarOutputStream out = new JarOutputStream(jar, manifest);
- out.close();
- return jar.toByteArray();
- }
-
- private static boolean isValidBundle(Manifest manifest) {
- Attributes atts = manifest.getMainAttributes();
- return atts.containsKey(new Attributes.Name(Constants.BUNDLE_MANIFESTVERSION));
- }
-
- private static String getVersionedId(Bundle b) {
- return b.getSymbolicName() + ":" + b.getVersion();
- }
-
- private static String getVersionedId(Manifest manifest) {
- Attributes atts = manifest.getMainAttributes();
- return atts.getValue(Constants.BUNDLE_SYMBOLICNAME) + ":" +
- atts.getValue(Constants.BUNDLE_VERSION);
- }
-
- /**
- * Installs a bundle from the given URL, doing a check if already installed, and
- * using the {@link ResourceUtils} loader for this project (brooklyn core)
- */
- public static Bundle install(Framework framework, String url) throws BundleException {
- boolean isLocal = isLocalUrl(url);
- String localUrl = url;
- if (!isLocal) {
- localUrl = cacheFile(url);
- }
-
- try {
- Bundle bundle = getInstalledBundle(framework, localUrl);
- if (bundle != null) {
- return bundle;
- }
-
- // use our URL resolution so we get classpath items
- LOG.debug("Installing bundle into {} from url: {}", framework, url);
- InputStream stream = getUrlStream(localUrl);
- Bundle installedBundle = framework.getBundleContext().installBundle(url, stream);
-
- return installedBundle;
- } finally {
- if (!isLocal) {
- try {
- new File(new URI(localUrl)).delete();
- } catch (URISyntaxException e) {
- throw Exceptions.propagate(e);
- }
- }
- }
- }
-
- private static String cacheFile(String url) {
- InputStream in = getUrlStream(url);
- File cache = Os.writeToTempFile(in, "bundle-cache", "jar");
- return cache.toURI().toString();
- }
-
- private static boolean isLocalUrl(String url) {
- String protocol = Urls.getProtocol(url);
- return "file".equals(protocol) ||
- "classpath".equals(protocol) ||
- "jar".equals(protocol);
- }
-
- private static Bundle getInstalledBundle(Framework framework, String url) {
- Bundle bundle = framework.getBundleContext().getBundle(url);
- if (bundle != null) {
- return bundle;
- }
-
- // We now support same version installed multiple times (avail since OSGi 4.3+).
- // However we do not support overriding *system* bundles, ie anything already on the classpath.
- // If we wanted to disable multiple versions, see comments below, and reference to FRAMEWORK_BSNVERSION_MULTIPLE above.
-
- // Felix already assumes the stream is pointing to a JAR
- JarInputStream stream;
- try {
- stream = new JarInputStream(getUrlStream(url));
- } catch (IOException e) {
- throw Exceptions.propagate(e);
- }
- Manifest manifest = stream.getManifest();
- Streams.closeQuietly(stream);
- if (manifest == null) {
- throw new IllegalStateException("Missing manifest file in bundle or not a jar file.");
- }
- String versionedId = getVersionedId(manifest);
- for (Bundle installedBundle : framework.getBundleContext().getBundles()) {
- if (versionedId.equals(getVersionedId(installedBundle))) {
- if (SYSTEM_BUNDLES.contains(versionedId)) {
- LOG.debug("Already have system bundle "+versionedId+" from "+installedBundle+"/"+installedBundle.getLocation()+" when requested "+url+"; not installing");
- // "System bundles" (ie things on the classpath) cannot be overridden
- return installedBundle;
- } else {
- LOG.debug("Already have bundle "+versionedId+" from "+installedBundle+"/"+installedBundle.getLocation()+" when requested "+url+"; but it is not a system bundle so proceeding");
- // Other bundles can be installed multiple times. To ignore multiples and continue to use the old one,
- // just return the installedBundle as done just above for system bundles.
- }
- }
- }
- return null;
- }
-
- private static InputStream getUrlStream(String url) {
- return ResourceUtils.create(Osgis.class).getResourceFromUrl(url);
- }
-
- public static boolean isExtensionBundle(Bundle bundle) {
- String location = bundle.getLocation();
- return location != null &&
- EXTENSION_PROTOCOL.equals(Urls.getProtocol(location));
- }
-
- /** Takes a string which might be of the form "symbolic-name" or "symbolic-name:version" (or something else entirely)
- * and returns a VersionedName. The versionedName.getVersion() will be null if if there was no version in the input
- * (or returning {@link Maybe#absent()} if not valid, with a suitable error message). */
- public static Maybe<VersionedName> parseOsgiIdentifier(String symbolicNameOptionalWithVersion) {
- if (Strings.isBlank(symbolicNameOptionalWithVersion))
- return Maybe.absent("OSGi identifier is blank");
-
- String[] parts = symbolicNameOptionalWithVersion.split(":");
- if (parts.length>2)
- return Maybe.absent("OSGi identifier has too many parts; max one ':' symbol");
-
- Version v = null;
- if (parts.length == 2) {
- try {
- v = Version.parseVersion(parts[1]);
- } catch (IllegalArgumentException e) {
- return Maybe.absent("OSGi identifier has invalid version string ("+e.getMessage()+")");
- }
- }
-
- return Maybe.of(new VersionedName(parts[0], v));
- }
-
- /**
- * The class is not used, staying for future reference.
- * Remove after OSGi transition is completed.
- */
- public static class ManifestHelper {
-
- private static ManifestParser parse;
- private Manifest manifest;
- private String source;
-
- private static final String WIRING_PACKAGE = PackageNamespace.PACKAGE_NAMESPACE;
-
- public static ManifestHelper forManifestContents(String contents) throws IOException, BundleException {
- ManifestHelper result = forManifest(Streams.newInputStreamWithContents(contents));
- result.source = contents;
- return result;
- }
-
- public static ManifestHelper forManifest(URL url) throws IOException, BundleException {
- InputStream in = null;
- try {
- in = url.openStream();
- return forManifest(in);
- } finally {
- if (in != null) in.close();
- }
- }
-
- public static ManifestHelper forManifest(InputStream in) throws IOException, BundleException {
- return forManifest(new Manifest(in));
- }
-
- public static ManifestHelper forManifest(Manifest manifest) throws BundleException {
- ManifestHelper result = new ManifestHelper();
- result.manifest = manifest;
- parse = new ManifestParser(null, null, null, new StringMap(manifest.getMainAttributes()));
- return result;
- }
-
- public String getSymbolicName() {
- return parse.getSymbolicName();
- }
-
- public Version getVersion() {
- return parse.getBundleVersion();
- }
-
- public String getSymbolicNameVersion() {
- return getSymbolicName()+":"+getVersion();
- }
-
- public List<String> getExportedPackages() {
- MutableList<String> result = MutableList.of();
- for (BundleCapability c: parse.getCapabilities()) {
- if (WIRING_PACKAGE.equals(c.getNamespace())) {
- result.add((String)c.getAttributes().get(WIRING_PACKAGE));
- }
- }
- return result;
- }
-
- @Nullable public String getSource() {
- return source;
- }
-
- public Manifest getManifest() {
- return manifest;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/AbstractExecutionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/AbstractExecutionContext.java b/core/src/main/java/brooklyn/util/task/AbstractExecutionContext.java
deleted file mode 100644
index f3511d7..0000000
--- a/core/src/main/java/brooklyn/util/task/AbstractExecutionContext.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-import org.apache.brooklyn.api.management.ExecutionContext;
-import org.apache.brooklyn.api.management.ExecutionManager;
-import org.apache.brooklyn.api.management.Task;
-import org.apache.brooklyn.api.management.TaskAdaptable;
-
-import com.google.common.collect.Maps;
-
-public abstract class AbstractExecutionContext implements ExecutionContext {
-
- /**
- * Submits the given runnable/callable/task for execution (in a separate thread);
- * supported keys in the map include: tags (add'l tags to put on the resulting task),
- * description (string), and others as described in the reference below
- *
- * @see ExecutionManager#submit(Map, Task)
- */
- @Override
- public Task<?> submit(Map<?,?> properties, Runnable runnable) { return submitInternal(properties, runnable); }
-
- /** @see #submit(Map, Runnable) */
- @Override
- public Task<?> submit(Runnable runnable) { return submitInternal(Maps.newLinkedHashMap(), runnable); }
-
- /** @see #submit(Map, Runnable) */
- @Override
- public <T> Task<T> submit(Callable<T> callable) { return submitInternal(Maps.newLinkedHashMap(), callable); }
-
- /** @see #submit(Map, Runnable) */
- @Override
- public <T> Task<T> submit(Map<?,?> properties, Callable<T> callable) { return submitInternal(properties, callable); }
-
- /** @see #submit(Map, Runnable) */
- @Override
- public <T> Task<T> submit(TaskAdaptable<T> task) { return submitInternal(Maps.newLinkedHashMap(), task.asTask()); }
-
- /** @see #submit(Map, Runnable) */
- @Override
- public <T> Task<T> submit(Map<?,?> properties, TaskAdaptable<T> task) { return submitInternal(properties, task.asTask()); }
-
- /**
- * Provided for compatibility
- *
- * Submit is preferred if a handle on the resulting Task is desired (although a task can be passed in so this is not always necessary)
- *
- * @see #submit(Map, Runnable)
- */
- public void execute(Runnable r) { submit(r); }
-
- /** does the work internally of submitting the task; note that the return value may be a wrapper task even if a task is passed in,
- * if the execution context where the target should run is different (e.g. submitting an effector task cross-context) */
- protected abstract <T> Task<T> submitInternal(Map<?,?> properties, Object task);
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/BasicExecutionContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/BasicExecutionContext.java b/core/src/main/java/brooklyn/util/task/BasicExecutionContext.java
deleted file mode 100644
index 8942a18..0000000
--- a/core/src/main/java/brooklyn/util/task/BasicExecutionContext.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import java.lang.reflect.Proxy;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.management.ExecutionContext;
-import org.apache.brooklyn.api.management.ExecutionManager;
-import org.apache.brooklyn.api.management.HasTaskChildren;
-import org.apache.brooklyn.api.management.Task;
-import org.apache.brooklyn.api.management.TaskAdaptable;
-import org.apache.brooklyn.api.management.entitlement.EntitlementContext;
-import org.apache.brooklyn.core.management.entitlement.Entitlements;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.entity.basic.BrooklynTaskTags;
-import brooklyn.entity.basic.BrooklynTaskTags.WrappedEntity;
-import brooklyn.entity.basic.EntityInternal;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-
-/**
- * A means of executing tasks against an ExecutionManager with a given bucket/set of tags pre-defined
- * (so that it can look like an {@link Executor} and also supply {@link ExecutorService#submit(Callable)}
- */
-public class BasicExecutionContext extends AbstractExecutionContext {
-
- private static final Logger log = LoggerFactory.getLogger(BasicExecutionContext.class);
-
- static final ThreadLocal<BasicExecutionContext> perThreadExecutionContext = new ThreadLocal<BasicExecutionContext>();
-
- public static BasicExecutionContext getCurrentExecutionContext() { return perThreadExecutionContext.get(); }
-
- final ExecutionManager executionManager;
- final Set<Object> tags = new LinkedHashSet<Object>();
-
- public BasicExecutionContext(ExecutionManager executionManager) {
- this(Collections.emptyMap(), executionManager);
- }
-
- /**
- * Supported flags are {@code tag} and {@code tags}
- *
- * @see ExecutionManager#submit(Map, Task)
- */
- public BasicExecutionContext(Map<?, ?> flags, ExecutionManager executionManager) {
- this.executionManager = executionManager;
-
- if (flags.get("tag") != null) tags.add(flags.remove("tag"));
- if (flags.containsKey("tags")) tags.addAll((Collection<?>)flags.remove("tags"));
-
- // FIXME brooklyn-specific check, just for sanity
- // the context tag should always be a non-proxy entity, because that is what is passed to effector tasks
- // which may require access to internal methods
- for (Object tag: tags) {
- if (tag instanceof BrooklynTaskTags.WrappedEntity) {
- if (Proxy.isProxyClass(((WrappedEntity)tag).entity.getClass())) {
- log.warn(""+this+" has entity proxy in "+tag);
- }
- }
- }
- }
-
- public ExecutionManager getExecutionManager() {
- return executionManager;
- }
-
- /** returns tasks started by this context (or tasks which have all the tags on this object) */
- public Set<Task<?>> getTasks() { return executionManager.getTasksWithAllTags((Set<?>)tags); }
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- @Override
- protected <T> Task<T> submitInternal(Map<?,?> propertiesQ, final Object task) {
- if (task instanceof TaskAdaptable<?> && !(task instanceof Task<?>))
- return submitInternal(propertiesQ, ((TaskAdaptable<?>)task).asTask());
-
- Map properties = propertiesQ;
- if (properties.get("tags")==null) properties.put("tags", new ArrayList());
- Collection taskTags = (Collection)properties.get("tags");
-
- // FIXME some of this is brooklyn-specific logic, should be moved to a BrooklynExecContext subclass;
- // the issue is that we want to ensure that cross-entity calls switch execution contexts;
- // previously it was all very messy how that was handled (and it didn't really handle it in many cases)
- if (task instanceof Task<?>) taskTags.addAll( ((Task<?>)task).getTags() );
- Entity target = BrooklynTaskTags.getWrappedEntityOfType(taskTags, BrooklynTaskTags.TARGET_ENTITY);
-
- if (target!=null && !tags.contains(BrooklynTaskTags.tagForContextEntity(target))) {
- // task is switching execution context boundaries
- /*
- * longer notes:
- * you fall in to this block if the caller requests a target entity different to the current context
- * (e.g. where entity X is invoking an effector on Y, it will start in X's context,
- * but the effector should run in Y's context).
- *
- * if X is invoking an effector on himself in his own context, or a sensor or other task, it will not come in to this block.
- */
- final ExecutionContext tc = ((EntityInternal)target).getExecutionContext();
- if (log.isDebugEnabled())
- log.debug("Switching task context on execution of "+task+": from "+this+" to "+target+" (in "+Tasks.current()+")");
-
- if (task instanceof Task<?>) {
- final Task<T> t = (Task<T>)task;
- if (!Tasks.isQueuedOrSubmitted(t) && (!(Tasks.current() instanceof HasTaskChildren) ||
- !Iterables.contains( ((HasTaskChildren)Tasks.current()).getChildren(), t ))) {
- // this task is switching execution context boundaries _and_ it is not a child and not yet queued,
- // so wrap it in a task running in this context to keep a reference to the child
- // (this matters when we are navigating in the GUI; without it we lose the reference to the child
- // when browsing in the context of the parent)
- return submit(Tasks.<T>builder().name("Cross-context execution: "+t.getDescription()).dynamic(true).body(new Callable<T>() {
- public T call() {
- return DynamicTasks.get(t);
- }
- }).build());
- } else {
- // if we are already tracked by parent, just submit it
- return tc.submit(t);
- }
- } else {
- // as above, but here we are definitely not a child (what we are submitting isn't even a task)
- // (will only come here if properties defines tags including a target entity, which probably never happens)
- submit(Tasks.<T>builder().name("Cross-context execution").dynamic(true).body(new Callable<T>() {
- public T call() {
- if (task instanceof Callable) {
- return DynamicTasks.queue( Tasks.<T>builder().dynamic(false).body((Callable<T>)task).build() ).getUnchecked();
- } else if (task instanceof Runnable) {
- return DynamicTasks.queue( Tasks.<T>builder().dynamic(false).body((Runnable)task).build() ).getUnchecked();
- } else {
- throw new IllegalArgumentException("Unhandled task type: "+task+"; type="+(task!=null ? task.getClass() : "null"));
- }
- }
- }).build());
- }
- }
-
- EntitlementContext entitlementContext = BrooklynTaskTags.getEntitlement(taskTags);
- if (entitlementContext==null)
- entitlementContext = Entitlements.getEntitlementContext();
- if (entitlementContext!=null) {
- taskTags.add(BrooklynTaskTags.tagForEntitlement(entitlementContext));
- }
-
- taskTags.addAll(tags);
-
- if (Tasks.current()!=null && BrooklynTaskTags.isTransient(Tasks.current())
- && !taskTags.contains(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG) && !taskTags.contains(BrooklynTaskTags.TRANSIENT_TASK_TAG)) {
- // tag as transient if submitter is transient, unless explicitly tagged as non-transient
- taskTags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG);
- }
-
- final Object startCallback = properties.get("newTaskStartCallback");
- properties.put("newTaskStartCallback", new Function<Object,Void>() {
- public Void apply(Object it) {
- registerPerThreadExecutionContext();
- if (startCallback!=null) ExecutionUtils.invoke(startCallback, it);
- return null;
- }});
-
- final Object endCallback = properties.get("newTaskEndCallback");
- properties.put("newTaskEndCallback", new Function<Object,Void>() {
- public Void apply(Object it) {
- try {
- if (endCallback!=null) ExecutionUtils.invoke(endCallback, it);
- } finally {
- clearPerThreadExecutionContext();
- }
- return null;
- }});
-
- if (task instanceof Task) {
- return executionManager.submit(properties, (Task)task);
- } else if (task instanceof Callable) {
- return executionManager.submit(properties, (Callable)task);
- } else if (task instanceof Runnable) {
- return (Task<T>) executionManager.submit(properties, (Runnable)task);
- } else {
- throw new IllegalArgumentException("Unhandled task type: task="+task+"; type="+(task!=null ? task.getClass() : "null"));
- }
- }
-
- private void registerPerThreadExecutionContext() { perThreadExecutionContext.set(this); }
-
- private void clearPerThreadExecutionContext() { perThreadExecutionContext.remove(); }
-
- @Override
- public boolean isShutdown() {
- return getExecutionManager().isShutdown();
- }
-
- @Override
- public String toString() {
- return super.toString()+"("+tags+")";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java b/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java
deleted file mode 100644
index 13d035b..0000000
--- a/core/src/main/java/brooklyn/util/task/BasicExecutionManager.java
+++ /dev/null
@@ -1,755 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.util.task;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.brooklyn.api.management.ExecutionManager;
-import org.apache.brooklyn.api.management.HasTaskChildren;
-import org.apache.brooklyn.api.management.Task;
-import org.apache.brooklyn.api.management.TaskAdaptable;
-import org.apache.brooklyn.core.internal.BrooklynFeatureEnablement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.util.collections.MutableList;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.text.Identifiers;
-
-import com.google.common.annotations.Beta;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.CaseFormat;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ExecutionList;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * Manages the execution of atomic tasks and scheduled (recurring) tasks,
- * including setting tags and invoking callbacks.
- */
-public class BasicExecutionManager implements ExecutionManager {
- private static final Logger log = LoggerFactory.getLogger(BasicExecutionManager.class);
-
- private static final boolean RENAME_THREADS = BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_RENAME_THREADS);
-
- private static class PerThreadCurrentTaskHolder {
- public static final ThreadLocal<Task<?>> perThreadCurrentTask = new ThreadLocal<Task<?>>();
- }
-
- public static ThreadLocal<Task<?>> getPerThreadCurrentTask() {
- return PerThreadCurrentTaskHolder.perThreadCurrentTask;
- }
-
- private final ThreadFactory threadFactory;
-
- private final ThreadFactory daemonThreadFactory;
-
- private final ExecutorService runner;
-
- private final ScheduledExecutorService delayedRunner;
-
- // TODO Could have a set of all knownTasks; but instead we're having a separate set per tag,
- // so the same task could be listed multiple times if it has multiple tags...
-
- //access to this field AND to members in this field is synchronized,
- //to allow us to preserve order while guaranteeing thread-safe
- //(but more testing is needed before we are completely sure it is thread-safe!)
- //synch blocks are as finely grained as possible for efficiency;
- //NB CopyOnWriteArraySet is a perf bottleneck, and the simple map makes it easier to remove when a tag is empty
- private Map<Object,Set<Task<?>>> tasksByTag = new HashMap<Object,Set<Task<?>>>();
-
- private ConcurrentMap<String,Task<?>> tasksById = new ConcurrentHashMap<String,Task<?>>();
-
- private ConcurrentMap<Object, TaskScheduler> schedulerByTag = new ConcurrentHashMap<Object, TaskScheduler>();
-
- /** count of all tasks submitted, including finished */
- private final AtomicLong totalTaskCount = new AtomicLong();
-
- /** tasks submitted but not yet done (or in cases of interruption/cancelled not yet GC'd) */
- private Map<String,String> incompleteTaskIds = new ConcurrentHashMap<String,String>();
-
- /** tasks started but not yet finished */
- private final AtomicInteger activeTaskCount = new AtomicInteger();
-
- private final List<ExecutionListener> listeners = new CopyOnWriteArrayList<ExecutionListener>();
-
- private final static ThreadLocal<String> threadOriginalName = new ThreadLocal<String>() {
- protected String initialValue() {
- // should not happen, as only access is in _afterEnd with a check that _beforeStart was invoked
- log.warn("No original name recorded for thread "+Thread.currentThread().getName()+"; task "+Tasks.current());
- return "brooklyn-thread-pool-"+Identifiers.makeRandomId(8);
- }
- };
-
- public BasicExecutionManager(String contextid) {
- threadFactory = newThreadFactory(contextid);
- daemonThreadFactory = new ThreadFactoryBuilder()
- .setThreadFactory(threadFactory)
- .setDaemon(true)
- .build();
-
- // use Executors.newCachedThreadPool(daemonThreadFactory), but timeout of 1s rather than 60s for better shutdown!
- runner = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
- daemonThreadFactory);
-
- delayedRunner = new ScheduledThreadPoolExecutor(1, daemonThreadFactory);
- }
-
- private final static class UncaughtExceptionHandlerImplementation implements Thread.UncaughtExceptionHandler {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- log.error("Uncaught exception in thread "+t.getName(), e);
- }
- }
-
- /**
- * For use by overriders to use custom thread factory.
- * But be extremely careful: called by constructor, so before sub-class' constructor will
- * have been invoked!
- */
- protected ThreadFactory newThreadFactory(String contextid) {
- return new ThreadFactoryBuilder()
- .setNameFormat("brooklyn-execmanager-"+contextid+"-%d")
- .setUncaughtExceptionHandler(new UncaughtExceptionHandlerImplementation())
- .build();
- }
-
- public void shutdownNow() {
- runner.shutdownNow();
- delayedRunner.shutdownNow();
- }
-
- public void addListener(ExecutionListener listener) {
- listeners.add(listener);
- }
-
- public void removeListener(ExecutionListener listener) {
- listeners.remove(listener);
- }
-
- /**
- * Deletes the given tag, including all tasks using this tag.
- *
- * Useful, for example, if an entity is being expunged so that we don't keep holding
- * a reference to it as a tag.
- */
- public void deleteTag(Object tag) {
- Set<Task<?>> tasks;
- synchronized (tasksByTag) {
- tasks = tasksByTag.remove(tag);
- }
- if (tasks != null) {
- for (Task<?> task : tasks) {
- deleteTask(task);
- }
- }
- }
-
- public void deleteTask(Task<?> task) {
- boolean removed = deleteTaskNonRecursive(task);
- if (!removed) return;
-
- if (task instanceof HasTaskChildren) {
- List<Task<?>> children = ImmutableList.copyOf(((HasTaskChildren)task).getChildren());
- for (Task<?> child : children) {
- deleteTask(child);
- }
- }
- }
-
- protected boolean deleteTaskNonRecursive(Task<?> task) {
- Set<?> tags = checkNotNull(task, "task").getTags();
- for (Object tag : tags) {
- synchronized (tasksByTag) {
- Set<Task<?>> tasks = tasksWithTagLiveOrNull(tag);
- if (tasks != null) {
- tasks.remove(task);
- if (tasks.isEmpty()) {
- tasksByTag.remove(tag);
- }
- }
- }
- }
- Task<?> removed = tasksById.remove(task.getId());
- incompleteTaskIds.remove(task.getId());
- if (removed!=null && removed.isSubmitted() && !removed.isDone()) {
- log.warn("Deleting submitted task before completion: "+removed+"; this task will continue to run in the background outwith "+this+", but perhaps it should have been cancelled?");
- }
- return removed != null;
- }
-
- public boolean isShutdown() {
- return runner.isShutdown();
- }
-
- /** count of all tasks submitted */
- public long getTotalTasksSubmitted() {
- return totalTaskCount.get();
- }
-
- /** count of tasks submitted but not ended */
- public long getNumIncompleteTasks() {
- return incompleteTaskIds.size();
- }
-
- /** count of tasks started but not ended */
- public long getNumActiveTasks() {
- return activeTaskCount.get();
- }
-
- /** count of tasks kept in memory, often including ended tasks */
- public long getNumInMemoryTasks() {
- return tasksById.size();
- }
-
- private Set<Task<?>> tasksWithTagCreating(Object tag) {
- Preconditions.checkNotNull(tag);
- synchronized (tasksByTag) {
- Set<Task<?>> result = tasksWithTagLiveOrNull(tag);
- if (result==null) {
- result = Collections.synchronizedSet(new LinkedHashSet<Task<?>>());
- tasksByTag.put(tag, result);
- }
- return result;
- }
- }
-
- /** exposes live view, for internal use only */
- @Beta
- public Set<Task<?>> tasksWithTagLiveOrNull(Object tag) {
- synchronized (tasksByTag) {
- return tasksByTag.get(tag);
- }
- }
-
- @Override
- public Task<?> getTask(String id) {
- return tasksById.get(id);
- }
-
- /** not on interface because potentially expensive */
- public List<Task<?>> getAllTasks() {
- // not sure if synching makes any difference; have not observed CME's yet
- // (and so far this is only called when a CME was caught on a previous operation)
- synchronized (tasksById) {
- return MutableList.copyOf(tasksById.values());
- }
- }
-
- @Override
- public Set<Task<?>> getTasksWithTag(Object tag) {
- Set<Task<?>> result = tasksWithTagLiveOrNull(tag);
- if (result==null) return Collections.emptySet();
- synchronized (result) {
- return (Set<Task<?>>)Collections.unmodifiableSet(new LinkedHashSet<Task<?>>(result));
- }
- }
-
- @Override
- public Set<Task<?>> getTasksWithAnyTag(Iterable<?> tags) {
- Set<Task<?>> result = new LinkedHashSet<Task<?>>();
- Iterator<?> ti = tags.iterator();
- while (ti.hasNext()) {
- Set<Task<?>> tasksForTag = tasksWithTagLiveOrNull(ti.next());
- if (tasksForTag!=null) {
- synchronized (tasksForTag) {
- result.addAll(tasksForTag);
- }
- }
- }
- return Collections.unmodifiableSet(result);
- }
-
- /** only works with at least one tag; returns empty if no tags */
- @Override
- public Set<Task<?>> getTasksWithAllTags(Iterable<?> tags) {
- //NB: for this method retrieval for multiple tags could be made (much) more efficient (if/when it is used with multiple tags!)
- //by first looking for the least-used tag, getting those tasks, and then for each of those tasks
- //checking whether it contains the other tags (looking for second-least used, then third-least used, etc)
- Set<Task<?>> result = new LinkedHashSet<Task<?>>();
- boolean first = true;
- Iterator<?> ti = tags.iterator();
- while (ti.hasNext()) {
- Object tag = ti.next();
- if (first) {
- first = false;
- result.addAll(getTasksWithTag(tag));
- } else {
- result.retainAll(getTasksWithTag(tag));
- }
- }
- return Collections.unmodifiableSet(result);
- }
-
- /** live view of all tasks, for internal use only */
- @Beta
- public Collection<Task<?>> allTasksLive() { return tasksById.values(); }
-
- public Set<Object> getTaskTags() {
- synchronized (tasksByTag) {
- return Collections.unmodifiableSet(Sets.newLinkedHashSet(tasksByTag.keySet()));
- }
- }
-
- public Task<?> submit(Runnable r) { return submit(new LinkedHashMap<Object,Object>(1), r); }
- public Task<?> submit(Map<?,?> flags, Runnable r) { return submit(flags, new BasicTask<Void>(flags, r)); }
-
- public <T> Task<T> submit(Callable<T> c) { return submit(new LinkedHashMap<Object,Object>(1), c); }
- public <T> Task<T> submit(Map<?,?> flags, Callable<T> c) { return submit(flags, new BasicTask<T>(flags, c)); }
-
- public <T> Task<T> submit(TaskAdaptable<T> t) { return submit(new LinkedHashMap<Object,Object>(1), t); }
- public <T> Task<T> submit(Map<?,?> flags, TaskAdaptable<T> task) {
- if (!(task instanceof Task))
- task = task.asTask();
- synchronized (task) {
- if (((TaskInternal<?>)task).getInternalFuture()!=null) return (Task<T>)task;
- return submitNewTask(flags, (Task<T>) task);
- }
- }
-
- public <T> Task<T> scheduleWith(Task<T> task) { return scheduleWith(Collections.emptyMap(), task); }
- public <T> Task<T> scheduleWith(Map<?,?> flags, Task<T> task) {
- synchronized (task) {
- if (((TaskInternal<?>)task).getInternalFuture()!=null) return task;
- return submitNewTask(flags, task);
- }
- }
-
- protected Task<?> submitNewScheduledTask(final Map<?,?> flags, final ScheduledTask task) {
- tasksById.put(task.getId(), task);
- totalTaskCount.incrementAndGet();
-
- beforeSubmitScheduledTaskAllIterations(flags, task);
-
- return submitSubsequentScheduledTask(flags, task);
- }
-
- @SuppressWarnings("unchecked")
- protected Task<?> submitSubsequentScheduledTask(final Map<?,?> flags, final ScheduledTask task) {
- if (!task.isDone()) {
- task.internalFuture = delayedRunner.schedule(new ScheduledTaskCallable(task, flags),
- task.delay.toNanoseconds(), TimeUnit.NANOSECONDS);
- } else {
- afterEndScheduledTaskAllIterations(flags, task);
- }
- return task;
- }
-
- protected class ScheduledTaskCallable implements Callable<Object> {
- public ScheduledTask task;
- public Map<?,?> flags;
-
- public ScheduledTaskCallable(ScheduledTask task, Map<?, ?> flags) {
- this.task = task;
- this.flags = flags;
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public Object call() {
- if (task.startTimeUtc==-1) task.startTimeUtc = System.currentTimeMillis();
- TaskInternal<?> taskScheduled = null;
- try {
- beforeStartScheduledTaskSubmissionIteration(flags, task);
- taskScheduled = (TaskInternal<?>) task.newTask();
- taskScheduled.setSubmittedByTask(task);
- final Callable<?> oldJob = taskScheduled.getJob();
- final TaskInternal<?> taskScheduledF = taskScheduled;
- taskScheduled.setJob(new Callable() { public Object call() {
- boolean resubmitted = false;
- task.recentRun = taskScheduledF;
- try {
- synchronized (task) {
- task.notifyAll();
- }
- Object result;
- try {
- result = oldJob.call();
- } catch (Exception e) {
- if (!Tasks.isInterrupted()) {
- log.warn("Error executing "+oldJob+" (scheduled job of "+task+" - "+task.getDescription()+"); cancelling scheduled execution", e);
- } else {
- log.debug("Interrupted executing "+oldJob+" (scheduled job of "+task+" - "+task.getDescription()+"); cancelling scheduled execution: "+e);
- }
- throw Exceptions.propagate(e);
- }
- task.runCount++;
- if (task.period!=null && !task.isCancelled()) {
- task.delay = task.period;
- submitSubsequentScheduledTask(flags, task);
- resubmitted = true;
- }
- return result;
- } finally {
- // do in finally block in case we were interrupted
- if (!resubmitted)
- afterEndScheduledTaskAllIterations(flags, task);
- }
- }});
- task.nextRun = taskScheduled;
- BasicExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext();
- if (ec!=null) return ec.submit(taskScheduled);
- else return submit(taskScheduled);
- } finally {
- afterEndScheduledTaskSubmissionIteration(flags, task, taskScheduled);
- }
- }
-
- @Override
- public String toString() {
- return "ScheduledTaskCallable["+task+","+flags+"]";
- }
- }
-
- private final class SubmissionCallable<T> implements Callable<T> {
- private final Map<?, ?> flags;
- private final Task<T> task;
-
- private SubmissionCallable(Map<?, ?> flags, Task<T> task) {
- this.flags = flags;
- this.task = task;
- }
-
- public T call() {
- try {
- T result = null;
- Throwable error = null;
- String oldThreadName = Thread.currentThread().getName();
- try {
- if (RENAME_THREADS) {
- String newThreadName = oldThreadName+"-"+task.getDisplayName()+
- "["+task.getId().substring(0, 8)+"]";
- Thread.currentThread().setName(newThreadName);
- }
- beforeStartAtomicTask(flags, task);
- if (!task.isCancelled()) {
- result = ((TaskInternal<T>)task).getJob().call();
- } else throw new CancellationException();
- } catch(Throwable e) {
- error = e;
- } finally {
- if (RENAME_THREADS) {
- Thread.currentThread().setName(oldThreadName);
- }
- afterEndAtomicTask(flags, task);
- }
- if (error!=null) {
- /* we throw, after logging debug.
- * the throw means the error is available for task submitters to monitor.
- * however it is possible no one is monitoring it, in which case we will have debug logging only for errors.
- * (the alternative, of warn-level logging in lots of places where we don't want it, seems worse!)
- */
- if (log.isDebugEnabled()) {
- // debug only here, because most submitters will handle failures
- log.debug("Exception running task "+task+" (rethrowing): "+error.getMessage(), error);
- if (log.isTraceEnabled())
- log.trace("Trace for exception running task "+task+" (rethrowing): "+error.getMessage(), error);
- }
- throw Exceptions.propagate(error);
- }
- return result;
- } finally {
- ((TaskInternal<?>)task).runListeners();
- }
- }
-
- @Override
- public String toString() {
- return "BEM.call("+task+","+flags+")";
- }
- }
-
- private final static class ListenableForwardingFutureForTask<T> extends ListenableForwardingFuture<T> {
- private final Task<T> task;
-
- private ListenableForwardingFutureForTask(Future<T> delegate, ExecutionList list, Task<T> task) {
- super(delegate, list);
- this.task = task;
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- boolean result = false;
- if (!task.isCancelled()) result |= task.cancel(mayInterruptIfRunning);
- result |= super.cancel(mayInterruptIfRunning);
- ((TaskInternal<?>)task).runListeners();
- return result;
- }
- }
-
- private final class SubmissionListenerToCallOtherListeners<T> implements Runnable {
- private final Task<T> task;
-
- private SubmissionListenerToCallOtherListeners(Task<T> task) {
- this.task = task;
- }
-
- @Override
- public void run() {
- try {
- ((TaskInternal<?>)task).runListeners();
- } catch (Exception e) {
- log.warn("Error running task listeners for task "+task+" done", e);
- }
-
- for (ExecutionListener listener : listeners) {
- try {
- listener.onTaskDone(task);
- } catch (Exception e) {
- log.warn("Error running execution listener "+listener+" of task "+task+" done", e);
- }
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- protected <T> Task<T> submitNewTask(final Map<?,?> flags, final Task<T> task) {
- if (task instanceof ScheduledTask)
- return (Task<T>) submitNewScheduledTask(flags, (ScheduledTask)task);
-
- tasksById.put(task.getId(), task);
- totalTaskCount.incrementAndGet();
-
- beforeSubmitAtomicTask(flags, task);
-
- if (((TaskInternal<T>)task).getJob() == null)
- throw new NullPointerException("Task "+task+" submitted with with null job: job must be supplied.");
-
- Callable<T> job = new SubmissionCallable<T>(flags, task);
-
- // If there's a scheduler then use that; otherwise execute it directly
- Set<TaskScheduler> schedulers = null;
- for (Object tago: task.getTags()) {
- TaskScheduler scheduler = getTaskSchedulerForTag(tago);
- if (scheduler!=null) {
- if (schedulers==null) schedulers = new LinkedHashSet<TaskScheduler>(2);
- schedulers.add(scheduler);
- }
- }
- Future<T> future;
- if (schedulers!=null && !schedulers.isEmpty()) {
- if (schedulers.size()>1) log.warn("multiple schedulers detected, using only the first, for "+task+": "+schedulers);
- future = schedulers.iterator().next().submit(job);
- } else {
- future = runner.submit(job);
- }
- // on completion, listeners get triggered above; here, below we ensure they get triggered on cancel
- // (and we make sure the same ExecutionList is used in the future as in the task)
- ListenableFuture<T> listenableFuture = new ListenableForwardingFutureForTask<T>(future, ((TaskInternal<T>)task).getListeners(), task);
- // doesn't matter whether the listener is added to the listenableFuture or the task,
- // except that for the task we can more easily wrap it so that it only logs debug if the executor is shutdown
- // (avoid a bunch of ugly warnings in tests which start and stop things a lot!)
- // [probably even nicer to run this in the same thread, it doesn't do much; but that is messier to implement]
- ((TaskInternal<T>)task).addListener(new SubmissionListenerToCallOtherListeners<T>(task), runner);
-
- ((TaskInternal<T>)task).initInternalFuture(listenableFuture);
-
- return task;
- }
-
- protected void beforeSubmitScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) {
- internalBeforeSubmit(flags, task);
- }
- protected void beforeSubmitAtomicTask(Map<?,?> flags, Task<?> task) {
- internalBeforeSubmit(flags, task);
- }
- /** invoked when a task is submitted */
- protected void internalBeforeSubmit(Map<?,?> flags, Task<?> task) {
- incompleteTaskIds.put(task.getId(), task.getId());
-
- Task<?> currentTask = Tasks.current();
- if (currentTask!=null) ((TaskInternal<?>)task).setSubmittedByTask(currentTask);
- ((TaskInternal<?>)task).setSubmitTimeUtc(System.currentTimeMillis());
-
- if (flags.get("tag")!=null) ((TaskInternal<?>)task).getMutableTags().add(flags.remove("tag"));
- if (flags.get("tags")!=null) ((TaskInternal<?>)task).getMutableTags().addAll((Collection<?>)flags.remove("tags"));
-
- for (Object tag: ((TaskInternal<?>)task).getTags()) {
- tasksWithTagCreating(tag).add(task);
- }
- }
-
- protected void beforeStartScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> task) {
- internalBeforeStart(flags, task);
- }
- protected void beforeStartAtomicTask(Map<?,?> flags, Task<?> task) {
- internalBeforeStart(flags, task);
- }
-
- /** invoked in a task's thread when a task is starting to run (may be some time after submitted),
- * but before doing any of the task's work, so that we can update bookkeeping and notify callbacks */
- protected void internalBeforeStart(Map<?,?> flags, Task<?> task) {
- activeTaskCount.incrementAndGet();
-
- //set thread _before_ start time, so we won't get a null thread when there is a start-time
- if (log.isTraceEnabled()) log.trace(""+this+" beforeStart, task: "+task);
- if (!task.isCancelled()) {
- Thread thread = Thread.currentThread();
- ((TaskInternal<?>)task).setThread(thread);
- if (RENAME_THREADS) {
- threadOriginalName.set(thread.getName());
- String newThreadName = "brooklyn-" + CaseFormat.LOWER_HYPHEN.to(CaseFormat.LOWER_CAMEL, task.getDisplayName().replace(" ", "")) + "-" + task.getId().substring(0, 8);
- thread.setName(newThreadName);
- }
- PerThreadCurrentTaskHolder.perThreadCurrentTask.set(task);
- ((TaskInternal<?>)task).setStartTimeUtc(System.currentTimeMillis());
- }
- ExecutionUtils.invoke(flags.get("newTaskStartCallback"), task);
- }
-
- /** normally (if not interrupted) called once for each call to {@link #beforeSubmitScheduledTaskAllIterations(Map, Task)} */
- protected void afterEndScheduledTaskAllIterations(Map<?,?> flags, Task<?> task) {
- internalAfterEnd(flags, task, false, true);
- }
- /** called once for each call to {@link #beforeStartScheduledTaskSubmissionIteration(Map, Task)},
- * with a per-iteration task generated by the surrounding scheduled task */
- protected void afterEndScheduledTaskSubmissionIteration(Map<?,?> flags, Task<?> scheduledTask, Task<?> taskIteration) {
- internalAfterEnd(flags, scheduledTask, true, false);
- }
- /** called once for each task on which {@link #beforeStartAtomicTask(Map, Task)} is invoked,
- * and normally (if not interrupted prior to start)
- * called once for each task on which {@link #beforeSubmitAtomicTask(Map, Task)} */
- protected void afterEndAtomicTask(Map<?,?> flags, Task<?> task) {
- internalAfterEnd(flags, task, true, true);
- }
- /** normally (if not interrupted) called once for each call to {@link #internalBeforeSubmit(Map, Task)},
- * and, for atomic tasks and scheduled-task submission iterations where
- * always called once if {@link #internalBeforeStart(Map, Task)} is invoked and in the same thread as that method */
- protected void internalAfterEnd(Map<?,?> flags, Task<?> task, boolean startedInThisThread, boolean isEndingAllIterations) {
- if (log.isTraceEnabled()) log.trace(this+" afterEnd, task: "+task);
- if (startedInThisThread) {
- activeTaskCount.decrementAndGet();
- }
- if (isEndingAllIterations) {
- incompleteTaskIds.remove(task.getId());
- ExecutionUtils.invoke(flags.get("newTaskEndCallback"), task);
- ((TaskInternal<?>)task).setEndTimeUtc(System.currentTimeMillis());
- }
-
- if (startedInThisThread) {
- PerThreadCurrentTaskHolder.perThreadCurrentTask.remove();
- //clear thread _after_ endTime set, so we won't get a null thread when there is no end-time
- if (RENAME_THREADS && startedInThisThread) {
- Thread thread = task.getThread();
- if (thread==null) {
- log.warn("BasicTask.afterEnd invoked without corresponding beforeStart");
- } else {
- thread.setName(threadOriginalName.get());
- threadOriginalName.remove();
- }
- }
- ((TaskInternal<?>)task).setThread(null);
- }
- synchronized (task) { task.notifyAll(); }
- }
-
- public TaskScheduler getTaskSchedulerForTag(Object tag) {
- return schedulerByTag.get(tag);
- }
-
- public void setTaskSchedulerForTag(Object tag, Class<? extends TaskScheduler> scheduler) {
- synchronized (schedulerByTag) {
- TaskScheduler old = getTaskSchedulerForTag(tag);
- if (old!=null) {
- if (scheduler.isAssignableFrom(old.getClass())) {
- /* already have such an instance */
- return;
- }
- //might support multiple in future...
- throw new IllegalStateException("Not allowed to set multiple TaskSchedulers on ExecutionManager tag (tag "+tag+", has "+old+", setting new "+scheduler+")");
- }
- try {
- TaskScheduler schedulerI = scheduler.newInstance();
- // allow scheduler to have a nice name, for logging etc
- if (schedulerI instanceof CanSetName) ((CanSetName)schedulerI).setName(""+tag);
- setTaskSchedulerForTag(tag, schedulerI);
- } catch (InstantiationException e) {
- throw Exceptions.propagate(e);
- } catch (IllegalAccessException e) {
- throw Exceptions.propagate(e);
- }
- }
- }
-
- /**
- * Defines a {@link TaskScheduler} to run on all subsequently submitted jobs with the given tag.
- *
- * Maximum of one allowed currently. Resubmissions of the same scheduler (or scheduler class)
- * allowed. If changing, you must call {@link #clearTaskSchedulerForTag(Object)} between the two.
- *
- * @see #setTaskSchedulerForTag(Object, Class)
- */
- public void setTaskSchedulerForTag(Object tag, TaskScheduler scheduler) {
- synchronized (schedulerByTag) {
- scheduler.injectExecutor(runner);
-
- Object old = schedulerByTag.put(tag, scheduler);
- if (old!=null && old!=scheduler) {
- //might support multiple in future...
- throw new IllegalStateException("Not allowed to set multiple TaskSchedulers on ExecutionManager tag (tag "+tag+")");
- }
- }
- }
-
- /**
- * Forgets that any scheduler was associated with a tag.
- *
- * @see #setTaskSchedulerForTag(Object, TaskScheduler)
- * @see #setTaskSchedulerForTag(Object, Class)
- */
- public boolean clearTaskSchedulerForTag(Object tag) {
- synchronized (schedulerByTag) {
- Object old = schedulerByTag.remove(tag);
- return (old!=null);
- }
- }
-
- @VisibleForTesting
- public ConcurrentMap<Object, TaskScheduler> getSchedulerByTag() {
- return schedulerByTag;
- }
-
-}