You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/12/23 12:06:36 UTC
[13/71] [abbrv] incubator-brooklyn git commit: Merge commit 'e430723'
into reorg2
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/RegisteredTypePredicates.java
----------------------------------------------------------------------
diff --cc brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/RegisteredTypePredicates.java
index 0000000,ebeccd7..2c02874
mode 000000,100644..100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/RegisteredTypePredicates.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/RegisteredTypePredicates.java
@@@ -1,0 -1,180 +1,257 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.brooklyn.core.typereg;
+
+ import javax.annotation.Nullable;
+
-import org.apache.brooklyn.api.catalog.BrooklynCatalog;
+ import org.apache.brooklyn.api.entity.Application;
+ import org.apache.brooklyn.api.entity.Entity;
+ import org.apache.brooklyn.api.location.Location;
+ import org.apache.brooklyn.api.mgmt.ManagementContext;
+ import org.apache.brooklyn.api.policy.Policy;
+ import org.apache.brooklyn.api.typereg.RegisteredType;
++import org.apache.brooklyn.api.typereg.RegisteredTypeLoadingContext;
+ import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
++import org.apache.brooklyn.util.collections.CollectionFunctionals;
+
+ import com.google.common.base.Function;
+ import com.google.common.base.Predicate;
+ import com.google.common.base.Predicates;
+
+ public class RegisteredTypePredicates {
+
+ public static Predicate<RegisteredType> deprecated(final boolean deprecated) {
+ return new DeprecatedEqualTo(deprecated);
+ }
+
+ private static class DeprecatedEqualTo implements Predicate<RegisteredType> {
+ private final boolean deprecated;
+
+ public DeprecatedEqualTo(boolean deprecated) {
+ this.deprecated = deprecated;
+ }
+ @Override
+ public boolean apply(@Nullable RegisteredType item) {
+ return (item != null) && item.isDeprecated() == deprecated;
+ }
+ }
+
+ public static Predicate<RegisteredType> disabled(boolean disabled) {
+ return new DisabledEqualTo(disabled);
+ }
+
+ private static class DisabledEqualTo implements Predicate<RegisteredType> {
+ private final boolean disabled;
+
+ public DisabledEqualTo(boolean disabled) {
+ this.disabled = disabled;
+ }
+ @Override
+ public boolean apply(@Nullable RegisteredType item) {
+ return (item != null) && item.isDisabled() == disabled;
+ }
+ }
+
+ public static final Function<RegisteredType,String> ID_OF_ITEM_TRANSFORMER = new IdOfItemTransformer();
+
+ private static class IdOfItemTransformer implements Function<RegisteredType,String> {
+ @Override @Nullable
+ public String apply(@Nullable RegisteredType input) {
+ if (input==null) return null;
+ return input.getId();
+ }
+ };
+
+ public static Predicate<RegisteredType> displayName(final Predicate<? super String> filter) {
+ return new DisplayNameMatches(filter);
+ }
+
+ private static class DisplayNameMatches implements Predicate<RegisteredType> {
+ private final Predicate<? super String> filter;
+
+ public DisplayNameMatches(Predicate<? super String> filter) {
+ this.filter = filter;
+ }
+ @Override
+ public boolean apply(@Nullable RegisteredType item) {
+ return (item != null) && filter.apply(item.getDisplayName());
+ }
+ }
+
++ public static Predicate<RegisteredType> symbolicName(final String name) {
++ return symbolicName(Predicates.equalTo(name));
++ }
+ public static Predicate<RegisteredType> symbolicName(final Predicate<? super String> filter) {
+ return new SymbolicNameMatches(filter);
+ }
+
+ private static class SymbolicNameMatches implements Predicate<RegisteredType> {
+ private final Predicate<? super String> filter;
+
+ public SymbolicNameMatches(Predicate<? super String> filter) {
+ this.filter = filter;
+ }
+ @Override
+ public boolean apply(@Nullable RegisteredType item) {
+ return (item != null) && filter.apply(item.getSymbolicName());
+ }
+ }
+
++ public static Predicate<RegisteredType> version(final String name) {
++ return version(Predicates.equalTo(name));
++ }
++ public static Predicate<RegisteredType> version(final Predicate<? super String> filter) {
++ return new VersionMatches(filter);
++ }
++
++ private static class VersionMatches implements Predicate<RegisteredType> {
++ private final Predicate<? super String> filter;
++
++ public VersionMatches(Predicate<? super String> filter) {
++ this.filter = filter;
++ }
++ @Override
++ public boolean apply(@Nullable RegisteredType item) {
++ return (item != null) && filter.apply(item.getVersion());
++ }
++ }
++
++ public static Predicate<RegisteredType> alias(final String alias) {
++ return aliases(CollectionFunctionals.any(Predicates.equalTo(alias)));
++ }
++ public static Predicate<RegisteredType> aliases(final Predicate<? super Iterable<String>> filter) {
++ return new AliasesMatch(filter);
++ }
++
++ private static class AliasesMatch implements Predicate<RegisteredType> {
++ private final Predicate<? super Iterable<String>> filter;
++
++ public AliasesMatch(Predicate<? super Iterable<String>> filter) {
++ this.filter = filter;
++ }
++ @Override
++ public boolean apply(@Nullable RegisteredType item) {
++ return (item != null) && filter.apply(item.getAliases());
++ }
++ }
++
++ public static Predicate<RegisteredType> tag(final Object tag) {
++ return tags(CollectionFunctionals.any(Predicates.equalTo(tag)));
++ }
++ public static Predicate<RegisteredType> tags(final Predicate<? super Iterable<Object>> filter) {
++ return new TagsMatch(filter);
++ }
++
++ private static class TagsMatch implements Predicate<RegisteredType> {
++ private final Predicate<? super Iterable<Object>> filter;
++
++ public TagsMatch(Predicate<? super Iterable<Object>> filter) {
++ this.filter = filter;
++ }
++ @Override
++ public boolean apply(@Nullable RegisteredType item) {
++ return (item != null) && filter.apply(item.getTags());
++ }
++ }
++
+ public static <T> Predicate<RegisteredType> anySuperType(final Predicate<Class<T>> filter) {
+ return new AnySuperTypeMatches(filter);
+ }
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public static Predicate<RegisteredType> subtypeOf(final Class<?> filter) {
+ // the assignableFrom predicate checks if this class is assignable from the subsequent *input*.
+ // in other words, we're checking if any input is a subtype of this class
+ return anySuperType((Predicate)Predicates.assignableFrom(filter));
+ }
+
+ private static class AnySuperTypeMatches implements Predicate<RegisteredType> {
+ private final Predicate<Class<?>> filter;
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private AnySuperTypeMatches(Predicate filter) {
+ this.filter = filter;
+ }
+ @Override
+ public boolean apply(@Nullable RegisteredType item) {
+ if (item==null) return false;
+ return RegisteredTypes.isAnyTypeOrSuperSatisfying(item.getSuperTypes(), filter);
+ }
+ }
+
+ public static final Predicate<RegisteredType> IS_APPLICATION = subtypeOf(Application.class);
+ public static final Predicate<RegisteredType> IS_ENTITY = subtypeOf(Entity.class);
+ public static final Predicate<RegisteredType> IS_LOCATION = subtypeOf(Location.class);
+ public static final Predicate<RegisteredType> IS_POLICY = subtypeOf(Policy.class);
+
+ public static Predicate<RegisteredType> entitledToSee(final ManagementContext mgmt) {
+ return new EntitledToSee(mgmt);
+ }
+
+ private static class EntitledToSee implements Predicate<RegisteredType> {
+ private final ManagementContext mgmt;
+
+ public EntitledToSee(ManagementContext mgmt) {
+ this.mgmt = mgmt;
+ }
+ @Override
+ public boolean apply(@Nullable RegisteredType item) {
+ return (item != null) &&
+ Entitlements.isEntitled(mgmt.getEntitlementManager(), Entitlements.SEE_CATALOG_ITEM, item.getId());
+ }
+ }
+
+ public static Predicate<RegisteredType> isBestVersion(final ManagementContext mgmt) {
+ return new IsBestVersion(mgmt);
+ }
-
+ private static class IsBestVersion implements Predicate<RegisteredType> {
+ private final ManagementContext mgmt;
+
+ public IsBestVersion(ManagementContext mgmt) {
+ this.mgmt = mgmt;
+ }
+ @Override
+ public boolean apply(@Nullable RegisteredType item) {
+ return isBestVersion(mgmt, item);
+ }
+ }
-
+ public static boolean isBestVersion(ManagementContext mgmt, RegisteredType item) {
- RegisteredType bestVersion = mgmt.getTypeRegistry().get(item.getSymbolicName(), BrooklynCatalog.DEFAULT_VERSION);
- if (bestVersion==null) return false;
- return (bestVersion.getVersion().equals(item.getVersion()));
++ if (item==null) return false;
++ Iterable<RegisteredType> matches = mgmt.getTypeRegistry().getMatching(
++ RegisteredTypePredicates.symbolicName(item.getSymbolicName()) );
++ if (!matches.iterator().hasNext()) return false;
++ RegisteredType best = RegisteredTypes.getBestVersion(matches);
++ return (best.getVersion().equals(item.getVersion()));
++ }
++
++ public static Predicate<RegisteredType> satisfies(RegisteredTypeLoadingContext context) {
++ return new SatisfiesContext(context);
++ }
++ private static class SatisfiesContext implements Predicate<RegisteredType> {
++ private final RegisteredTypeLoadingContext context;
++
++ public SatisfiesContext(RegisteredTypeLoadingContext context) {
++ this.context = context;
++ }
++ @Override
++ public boolean apply(@Nullable RegisteredType item) {
++ return RegisteredTypes.tryValidate(item, context).isPresent();
++ }
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/RegisteredTypes.java
----------------------------------------------------------------------
diff --cc brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/RegisteredTypes.java
index 0000000,18f8f43..0c7b09b
mode 000000,100644..100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/RegisteredTypes.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/core/typereg/RegisteredTypes.java
@@@ -1,0 -1,342 +1,426 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.brooklyn.core.typereg;
+
+ import java.lang.reflect.Method;
++import java.util.Comparator;
+ import java.util.Iterator;
+ import java.util.Map;
+ import java.util.Set;
+
+ import javax.annotation.Nullable;
+
+ import org.apache.brooklyn.api.catalog.CatalogItem;
+ import org.apache.brooklyn.api.internal.AbstractBrooklynObjectSpec;
+ import org.apache.brooklyn.api.mgmt.ManagementContext;
+ import org.apache.brooklyn.api.objs.BrooklynObject;
+ import org.apache.brooklyn.api.typereg.BrooklynTypeRegistry.RegisteredTypeKind;
-import org.apache.brooklyn.api.typereg.OsgiBundleWithUrl;
+ import org.apache.brooklyn.api.typereg.RegisteredType;
+ import org.apache.brooklyn.api.typereg.RegisteredType.TypeImplementationPlan;
+ import org.apache.brooklyn.api.typereg.RegisteredTypeLoadingContext;
+ import org.apache.brooklyn.config.ConfigKey;
+ import org.apache.brooklyn.core.catalog.internal.CatalogUtils;
+ import org.apache.brooklyn.core.config.ConfigKeys;
+ import org.apache.brooklyn.core.objs.BrooklynObjectInternal;
+ import org.apache.brooklyn.core.typereg.JavaClassNameTypePlanTransformer.JavaClassNameTypeImplementationPlan;
+ import org.apache.brooklyn.util.exceptions.Exceptions;
+ import org.apache.brooklyn.util.guava.Maybe;
++import org.apache.brooklyn.util.guava.Maybe.Absent;
++import org.apache.brooklyn.util.text.NaturalOrderComparator;
++import org.apache.brooklyn.util.text.VersionComparator;
+ import org.apache.brooklyn.util.yaml.Yamls;
+
+ import com.google.common.annotations.Beta;
+ import com.google.common.base.Function;
+ import com.google.common.base.Predicate;
+ import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableList;
++import com.google.common.collect.ComparisonChain;
++import com.google.common.collect.Ordering;
+ import com.google.common.reflect.TypeToken;
+
+ /**
+ * Utility and preferred creation mechanisms for working with {@link RegisteredType} instances.
+ * <p>
+ * Use {@link #bean(String, String, TypeImplementationPlan, Class)} and {@link #spec(String, String, TypeImplementationPlan, Class)}
+ * to create {@link RegisteredType} instances.
+ * <p>
+ * See {@link #isSubtypeOf(RegisteredType, Class)} or {@link #isSubtypeOf(RegisteredType, RegisteredType)} to
+ * inspect the type hierarchy.
+ */
+ public class RegisteredTypes {
+
+ @SuppressWarnings("serial")
+ static ConfigKey<Class<?>> ACTUAL_JAVA_TYPE = ConfigKeys.newConfigKey(new TypeToken<Class<?>>() {}, "java.type.actual",
+ "The actual Java type which will be instantiated (bean) or pointed at (spec)");
+
+ /** @deprecated since it was introduced in 0.9.0; for backwards compatibility only, may be removed at any point */
+ @Deprecated
+ static final Function<CatalogItem<?,?>,RegisteredType> CI_TO_RT = new Function<CatalogItem<?,?>, RegisteredType>() {
+ @Override
+ public RegisteredType apply(CatalogItem<?, ?> item) {
+ return of(item);
+ }
+ };
+
+ /** @deprecated since it was introduced in 0.9.0; for backwards compatibility only, may be removed at any point */
+ @Deprecated
+ public static RegisteredType of(CatalogItem<?, ?> item) {
+ if (item==null) return null;
+ TypeImplementationPlan impl = null;
+ if (item.getPlanYaml()!=null) {
+ impl = new BasicTypeImplementationPlan(null, item.getPlanYaml());
+ } else if (item.getJavaType()!=null) {
+ impl = new JavaClassNameTypeImplementationPlan(item.getJavaType());
+ } else {
+ throw new IllegalStateException("Unsupported catalog item "+item+" when trying to create RegisteredType");
+ }
+
+ BasicRegisteredType type = (BasicRegisteredType) spec(item.getSymbolicName(), item.getVersion(), impl, item.getCatalogItemJavaType());
- type.bundles = item.getLibraries()==null ? ImmutableList.<OsgiBundleWithUrl>of() : ImmutableList.<OsgiBundleWithUrl>copyOf(item.getLibraries());
+ type.displayName = item.getDisplayName();
+ type.description = item.getDescription();
+ type.iconUrl = item.getIconUrl();
++
+ type.disabled = item.isDisabled();
+ type.deprecated = item.isDeprecated();
++ if (item.getLibraries()!=null) type.bundles.addAll(item.getLibraries());
++ // aliases aren't on item
++ if (item.tags()!=null) type.tags.addAll(item.tags().getTags());
+
- // TODO
- // probably not: javaType, specType, registeredTypeName ...
- // maybe: tags ?
++ // these things from item we ignore: javaType, specType, registeredTypeName ...
+ return type;
+ }
+
+ /** Preferred mechanism for defining a bean {@link RegisteredType}. */
+ public static RegisteredType bean(String symbolicName, String version, TypeImplementationPlan plan, @Nullable Class<?> superType) {
+ return addSuperType(new BasicRegisteredType(RegisteredTypeKind.BEAN, symbolicName, version, plan), superType);
+ }
+
+ /** Preferred mechanism for defining a spec {@link RegisteredType}. */
+ // TODO we currently allow symbolicName and version to be null for the purposes of creation, internal only in BasicBrooklynTypeRegistry.createSpec
+ // (ideally the API in TypePlanTransformer can be changed so even that is not needed)
+ public static RegisteredType spec(String symbolicName, String version, TypeImplementationPlan plan, @Nullable Class<?> superType) {
+ return addSuperType(new BasicRegisteredType(RegisteredTypeKind.SPEC, symbolicName, version, plan), superType);
+ }
+
+ /** returns the {@link Class} object corresponding to the given java type name,
+ * using the cache on the type and the loader defined on the type
+ * @param mgmt */
+ @Beta
+ // TODO should this be on the AbstractTypePlanTransformer ?
+ public static Class<?> loadActualJavaType(String javaTypeName, ManagementContext mgmt, RegisteredType type, RegisteredTypeLoadingContext context) {
+ Class<?> result = ((BasicRegisteredType)type).getCache().get(ACTUAL_JAVA_TYPE);
+ if (result!=null) return result;
+
+ result = CatalogUtils.newClassLoadingContext(mgmt, type, context==null ? null : context.getLoader()).loadClass( javaTypeName );
+
+ ((BasicRegisteredType)type).getCache().put(ACTUAL_JAVA_TYPE, result);
+ return result;
+ }
+
+ @Beta
+ public static RegisteredType addSuperType(RegisteredType type, @Nullable Class<?> superType) {
+ if (superType!=null) {
+ ((BasicRegisteredType)type).superTypes.add(superType);
+ }
+ return type;
+ }
-
+ @Beta
+ public static RegisteredType addSuperType(RegisteredType type, @Nullable RegisteredType superType) {
+ if (superType!=null) {
+ if (isSubtypeOf(superType, type)) {
+ throw new IllegalStateException(superType+" declares "+type+" as a supertype; cannot set "+superType+" as a supertype of "+type);
+ }
+ ((BasicRegisteredType)type).superTypes.add(superType);
+ }
+ return type;
+ }
++ @Beta
++ public static RegisteredType addSuperTypes(RegisteredType type, Iterable<Object> superTypesAsClassOrRegisteredType) {
++ if (superTypesAsClassOrRegisteredType!=null) {
++ for (Object superType: superTypesAsClassOrRegisteredType) {
++ if (superType==null) {
++ // nothing
++ } else if (superType instanceof Class) {
++ addSuperType(type, (Class<?>)superType);
++ } else if (superType instanceof RegisteredType) {
++ addSuperType(type, (RegisteredType)superType);
++ } else {
++ throw new IllegalStateException(superType+" supplied as a supertype of "+type+" but it is not a supported supertype");
++ }
++ }
++ }
++ return type;
++ }
++
++ @Beta
++ public static RegisteredType addAlias(RegisteredType type, String alias) {
++ if (alias!=null) {
++ ((BasicRegisteredType)type).aliases.add( alias );
++ }
++ return type;
++ }
++ @Beta
++ public static RegisteredType addAliases(RegisteredType type, Iterable<String> aliases) {
++ if (aliases!=null) {
++ for (String alias: aliases) addAlias(type, alias);
++ }
++ return type;
++ }
++
++ @Beta
++ public static RegisteredType addTag(RegisteredType type, Object tag) {
++ if (tag!=null) {
++ ((BasicRegisteredType)type).tags.add( tag );
++ }
++ return type;
++ }
++ @Beta
++ public static RegisteredType addTags(RegisteredType type, Iterable<?> tags) {
++ if (tags!=null) {
++ for (Object tag: tags) addTag(type, tag);
++ }
++ return type;
++ }
+
+ /** returns the implementation data for a spec if it is a string (e.g. plan yaml or java class name); else throws */
+ @Beta
+ public static String getImplementationDataStringForSpec(RegisteredType item) {
+ if (item==null || item.getPlan()==null) return null;
+ Object data = item.getPlan().getPlanData();
+ if (!(data instanceof String)) throw new IllegalStateException("Expected plan data for "+item+" to be a string");
+ return (String)data;
+ }
+
+ /** returns an implementation of the spec class corresponding to the given target type;
+ * for use in {@link BrooklynTypePlanTransformer#create(RegisteredType, RegisteredTypeLoadingContext)}
+ * implementations when dealing with a spec; returns null if none found
+ * @param mgmt */
+ @Beta
+ public static AbstractBrooklynObjectSpec<?,?> newSpecInstance(ManagementContext mgmt, Class<? extends BrooklynObject> targetType) throws Exception {
+ Class<? extends AbstractBrooklynObjectSpec<?, ?>> specType = RegisteredTypeLoadingContexts.lookupSpecTypeForTarget(targetType);
+ if (specType==null) return null;
+ Method createMethod = specType.getMethod("create", Class.class);
+ return (AbstractBrooklynObjectSpec<?, ?>) createMethod.invoke(null, targetType);
+ }
+
+ /** Returns a wrapped map, if the object is YAML which parses as a map;
+ * otherwise returns absent capable of throwing an error with more details */
+ @SuppressWarnings("unchecked")
+ public static Maybe<Map<?,?>> getAsYamlMap(Object planData) {
+ if (!(planData instanceof String)) return Maybe.absent("not a string");
+ Iterable<Object> result;
+ try {
+ result = Yamls.parseAll((String)planData);
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ return Maybe.absent(e);
+ }
+ Iterator<Object> ri = result.iterator();
+ if (!ri.hasNext()) return Maybe.absent("YAML has no elements in it");
+ Object r1 = ri.next();
+ if (ri.hasNext()) return Maybe.absent("YAML has multiple elements in it");
+ if (r1 instanceof Map) return (Maybe<Map<?,?>>)(Maybe<?>) Maybe.of(r1);
+ return Maybe.absent("YAML does not contain a map");
+ }
+
+ /**
+ * Queries recursively the supertypes of {@link RegisteredType} to see whether it
+ * inherits from the given {@link RegisteredType} */
+ public static boolean isSubtypeOf(RegisteredType type, RegisteredType superType) {
+ if (type.equals(superType)) return true;
+ for (Object st: type.getSuperTypes()) {
+ if (st instanceof RegisteredType) {
+ if (isSubtypeOf((RegisteredType)st, superType)) return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Queries recursively the supertypes of {@link RegisteredType} to see whether it
+ * inherits from the given {@link Class} */
+ public static boolean isSubtypeOf(RegisteredType type, Class<?> superType) {
+ return isAnyTypeSubtypeOf(type.getSuperTypes(), superType);
+ }
+
+ /**
+ * Queries recursively the given types (either {@link Class} or {@link RegisteredType})
+ * to see whether any inherit from the given {@link Class} */
+ public static boolean isAnyTypeSubtypeOf(Set<Object> candidateTypes, Class<?> superType) {
+ return isAnyTypeOrSuperSatisfying(candidateTypes, Predicates.assignableFrom(superType));
+ }
+
+ /**
+ * Queries recursively the given types (either {@link Class} or {@link RegisteredType})
+ * to see whether any java superclasses satisfy the given {@link Predicate} */
+ public static boolean isAnyTypeOrSuperSatisfying(Set<Object> candidateTypes, Predicate<Class<?>> filter) {
+ for (Object st: candidateTypes) {
+ if (st instanceof Class) {
+ if (filter.apply((Class<?>)st)) return true;
+ }
+ }
+ for (Object st: candidateTypes) {
+ if (st instanceof RegisteredType) {
+ if (isAnyTypeOrSuperSatisfying(((RegisteredType)st).getSuperTypes(), filter)) return true;
+ }
+ }
+ return false;
+ }
+
- public static RegisteredType validate(RegisteredType item, final RegisteredTypeLoadingContext constraint) {
- if (item==null || constraint==null) return item;
++ /** Validates that the given type matches the context (if supplied);
++ * if not satisfied. returns an {@link Absent} if failed with details of the error,
++ * with {@link Absent#isNull()} true if the object is null. */
++ public static Maybe<RegisteredType> tryValidate(RegisteredType item, final RegisteredTypeLoadingContext constraint) {
++ // kept as a Maybe in case someone wants a wrapper around item validity;
++ // unclear what the contract should be, as this can return Maybe.Present(null)
++ // which is suprising, but it is more natural to callers otherwise they'll likely do a separate null check on the item
++ // (often handling null different to errors) so the Maybe.get() is redundant as they have an object for the input anyway.
++
++ if (item==null || constraint==null) return Maybe.ofDisallowingNull(item);
+ if (constraint.getExpectedKind()!=null && !constraint.getExpectedKind().equals(item.getKind()))
- throw new IllegalStateException(item+" is not the expected kind "+constraint.getExpectedKind());
++ return Maybe.absent(item+" is not the expected kind "+constraint.getExpectedKind());
+ if (constraint.getExpectedJavaSuperType()!=null) {
+ if (!isSubtypeOf(item, constraint.getExpectedJavaSuperType())) {
- throw new IllegalStateException(item+" is not for the expected type "+constraint.getExpectedJavaSuperType());
++ return Maybe.absent(item+" is not for the expected type "+constraint.getExpectedJavaSuperType());
+ }
+ }
- return item;
++ return Maybe.of(item);
+ }
+
+ /**
+ * Checks whether the given object appears to be an instance of the given registered type */
+ private static boolean isSubtypeOf(Class<?> candidate, RegisteredType type) {
+ for (Object st: type.getSuperTypes()) {
+ if (st instanceof RegisteredType) {
+ if (!isSubtypeOf(candidate, (RegisteredType)st)) return false;
+ }
+ if (st instanceof Class) {
+ if (!((Class<?>)st).isAssignableFrom(candidate)) return false;
+ }
+ }
+ return true;
+ }
+
- public static <T> T validate(final T object, final RegisteredType type, final RegisteredTypeLoadingContext constraint) {
- RegisteredTypeKind kind = type!=null ? type.getKind() : constraint!=null ? constraint.getExpectedKind() : null;
++ public static RegisteredType getBestVersion(Iterable<RegisteredType> types) {
++ if (types==null || !types.iterator().hasNext()) return null;
++ return Ordering.from(RegisteredTypeComparator.INSTANCE).max(types);
++ }
++
++ public static class RegisteredTypeComparator implements Comparator<RegisteredType> {
++ public static Comparator<RegisteredType> INSTANCE = new RegisteredTypeComparator();
++ private RegisteredTypeComparator() {}
++ @Override
++ public int compare(RegisteredType o1, RegisteredType o2) {
++ return ComparisonChain.start()
++ .compareTrueFirst(o1.isDisabled(), o2.isDisabled())
++ .compareTrueFirst(o1.isDeprecated(), o2.isDeprecated())
++ .compare(o1.getSymbolicName(), o2.getSymbolicName(), NaturalOrderComparator.INSTANCE)
++ .compare(o1.getVersion(), o2.getVersion(), VersionComparator.INSTANCE)
++ .result();
++ }
++ }
++
++ /** validates that the given object (required) satisfies the constraints implied by the given
++ * type and context object, using {@link Maybe} as the result set absent containing the error(s)
++ * if not satisfied. returns an {@link Absent} if failed with details of the error,
++ * with {@link Absent#isNull()} true if the object is null. */
++ public static <T> Maybe<T> tryValidate(final T object, @Nullable final RegisteredType type, @Nullable final RegisteredTypeLoadingContext context) {
++ if (object==null) return Maybe.absentNull("object is null");
++
++ RegisteredTypeKind kind = type!=null ? type.getKind() : context!=null ? context.getExpectedKind() : null;
+ if (kind==null) {
+ if (object instanceof AbstractBrooklynObjectSpec) kind=RegisteredTypeKind.SPEC;
+ else kind=RegisteredTypeKind.BEAN;
+ }
- return new RegisteredTypeKindVisitor<T>() {
++ return new RegisteredTypeKindVisitor<Maybe<T>>() {
+ @Override
- protected T visitSpec() {
- return validateSpec(object, type, constraint);
++ protected Maybe<T> visitSpec() {
++ return tryValidateSpec(object, type, context);
+ }
+
+ @Override
- protected T visitBean() {
- return validateBean(object, type, constraint);
++ protected Maybe<T> visitBean() {
++ return tryValidateBean(object, type, context);
+ }
+ }.visit(kind);
+ }
+
- private static <T> T validateBean(T object, RegisteredType type, final RegisteredTypeLoadingContext constraint) {
- if (object==null) return null;
++ private static <T> Maybe<T> tryValidateBean(T object, RegisteredType type, final RegisteredTypeLoadingContext context) {
++ if (object==null) return Maybe.absentNull("object is null");
+
+ if (type!=null) {
+ if (type.getKind()!=RegisteredTypeKind.BEAN)
- throw new IllegalStateException("Validating a bean when type is "+type.getKind()+" "+type);
++ return Maybe.absent("Validating a bean when type is "+type.getKind()+" "+type);
+ if (!isSubtypeOf(object.getClass(), type))
- throw new IllegalStateException(object+" does not have all the java supertypes of "+type);
++ return Maybe.absent(object+" does not have all the java supertypes of "+type);
+ }
+
- if (constraint!=null) {
- if (constraint.getExpectedKind()!=RegisteredTypeKind.BEAN)
- throw new IllegalStateException("Validating a bean when constraint expected "+constraint.getExpectedKind());
- if (constraint.getExpectedJavaSuperType()!=null && !constraint.getExpectedJavaSuperType().isInstance(object))
- throw new IllegalStateException(object+" is not of the expected java supertype "+constraint.getExpectedJavaSuperType());
++ if (context!=null) {
++ if (context.getExpectedKind()!=RegisteredTypeKind.BEAN)
++ return Maybe.absent("Validating a bean when constraint expected "+context.getExpectedKind());
++ if (context.getExpectedJavaSuperType()!=null && !context.getExpectedJavaSuperType().isInstance(object))
++ return Maybe.absent(object+" is not of the expected java supertype "+context.getExpectedJavaSuperType());
+ }
+
- return object;
++ return Maybe.of(object);
+ }
+
- private static <T> T validateSpec(T object, RegisteredType rType, final RegisteredTypeLoadingContext constraint) {
- if (object==null) return null;
++ private static <T> Maybe<T> tryValidateSpec(T object, RegisteredType rType, final RegisteredTypeLoadingContext constraint) {
++ if (object==null) return Maybe.absentNull("object is null");
+
+ if (!(object instanceof AbstractBrooklynObjectSpec)) {
- throw new IllegalStateException("Found "+object+" when expecting a spec");
++ Maybe.absent("Found "+object+" when expecting a spec");
+ }
+ Class<?> targetType = ((AbstractBrooklynObjectSpec<?,?>)object).getType();
+
+ if (targetType==null) {
- throw new IllegalStateException("Spec "+object+" does not have a target type");
++ Maybe.absent("Spec "+object+" does not have a target type");
+ }
+
+ if (rType!=null) {
+ if (rType.getKind()!=RegisteredTypeKind.SPEC)
- throw new IllegalStateException("Validating a spec when type is "+rType.getKind()+" "+rType);
++ Maybe.absent("Validating a spec when type is "+rType.getKind()+" "+rType);
+ if (!isSubtypeOf(targetType, rType))
- throw new IllegalStateException(object+" does not have all the java supertypes of "+rType);
++ Maybe.absent(object+" does not have all the java supertypes of "+rType);
+ }
+
+ if (constraint!=null) {
+ if (constraint.getExpectedJavaSuperType()!=null) {
+ if (!constraint.getExpectedJavaSuperType().isAssignableFrom(targetType)) {
- throw new IllegalStateException(object+" does not target the expected java supertype "+constraint.getExpectedJavaSuperType());
++ Maybe.absent(object+" does not target the expected java supertype "+constraint.getExpectedJavaSuperType());
+ }
+ if (constraint.getExpectedJavaSuperType().isAssignableFrom(BrooklynObjectInternal.class)) {
+ // don't check spec type; any spec is acceptable
+ } else {
+ @SuppressWarnings("unchecked")
+ Class<? extends AbstractBrooklynObjectSpec<?, ?>> specType = RegisteredTypeLoadingContexts.lookupSpecTypeForTarget( (Class<? extends BrooklynObject>) constraint.getExpectedJavaSuperType());
+ if (specType==null) {
+ // means a problem in our classification of spec types!
- throw new IllegalStateException(object+" is returned as spec for unexpected java supertype "+constraint.getExpectedJavaSuperType());
++ Maybe.absent(object+" is returned as spec for unexpected java supertype "+constraint.getExpectedJavaSuperType());
+ }
+ if (!specType.isAssignableFrom(object.getClass())) {
- throw new IllegalStateException(object+" is not a spec of the expected java supertype "+constraint.getExpectedJavaSuperType());
++ Maybe.absent(object+" is not a spec of the expected java supertype "+constraint.getExpectedJavaSuperType());
+ }
+ }
+ }
+ }
- return object;
++ return Maybe.of(object);
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java
----------------------------------------------------------------------
diff --cc brooklyn-server/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java
index 0000000,db3a72d..558d50a
mode 000000,100644..100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/enricher/stock/reducer/Reducer.java
@@@ -1,0 -1,138 +1,138 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.brooklyn.enricher.stock.reducer;
+
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Objects;
+
+ import org.apache.brooklyn.api.entity.Entity;
+ import org.apache.brooklyn.api.entity.EntityLocal;
+ import org.apache.brooklyn.api.sensor.AttributeSensor;
+ import org.apache.brooklyn.api.sensor.Sensor;
+ import org.apache.brooklyn.api.sensor.SensorEvent;
+ import org.apache.brooklyn.api.sensor.SensorEventListener;
+ import org.apache.brooklyn.config.ConfigKey;
+ import org.apache.brooklyn.core.config.ConfigKeys;
+ import org.apache.brooklyn.core.enricher.AbstractEnricher;
+ import org.apache.brooklyn.util.core.flags.SetFromFlag;
+ import org.apache.brooklyn.util.core.sensor.SensorPredicates;
+ import org.apache.brooklyn.util.core.task.Tasks;
+ import org.apache.brooklyn.util.core.task.ValueResolver;
+ import org.apache.brooklyn.util.text.StringFunctions;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
-import com.google.api.client.util.Lists;
+ import com.google.common.base.Function;
+ import com.google.common.base.Optional;
+ import com.google.common.base.Preconditions;
+ import com.google.common.collect.ImmutableList;
+ import com.google.common.collect.Iterables;
++import com.google.common.collect.Lists;
+ import com.google.common.reflect.TypeToken;
+
+ @SuppressWarnings("serial")
+ public class Reducer extends AbstractEnricher implements SensorEventListener<Object> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Reducer.class);
+
+ @SetFromFlag("producer")
+ public static ConfigKey<Entity> PRODUCER = ConfigKeys.newConfigKey(Entity.class, "enricher.producer");
+ public static ConfigKey<Sensor<?>> TARGET_SENSOR = ConfigKeys.newConfigKey(new TypeToken<Sensor<?>>() {}, "enricher.targetSensor");
+ public static ConfigKey<List<? extends AttributeSensor<?>>> SOURCE_SENSORS = ConfigKeys.newConfigKey(new TypeToken<List<? extends AttributeSensor<?>>>() {}, "enricher.sourceSensors");
+ public static ConfigKey<Function<List<?>,?>> REDUCER_FUNCTION = ConfigKeys.newConfigKey(new TypeToken<Function<List<?>, ?>>() {}, "enricher.reducerFunction");
+ @SetFromFlag("transformation")
+ public static final ConfigKey<String> REDUCER_FUNCTION_TRANSFORMATION = ConfigKeys.newStringConfigKey("enricher.reducerFunction.transformation",
+ "A string matching a pre-defined named reducer function, such as joiner");
+ public static final ConfigKey<Map<String, Object>> PARAMETERS = ConfigKeys.newConfigKey(new TypeToken<Map<String, Object>>() {}, "enricher.reducerFunction.parameters",
+ "A map of parameters to pass into the reducer function");
+
+ protected Entity producer;
+ protected List<AttributeSensor<?>> subscribedSensors;
+ protected Sensor<?> targetSensor;
+ protected Function<Iterable<?>, ?> reducerFunction;
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Override
+ public void setEntity(EntityLocal entity) {
+ super.setEntity(entity);
+ Preconditions.checkNotNull(getConfig(SOURCE_SENSORS), "source sensors");
+
+ this.producer = getConfig(PRODUCER) == null ? entity : getConfig(PRODUCER);
+ List<AttributeSensor<?>> sensorListTemp = Lists.newArrayList();
+
+ for (Object sensorO : getConfig(SOURCE_SENSORS)) {
+ AttributeSensor<?> sensor = Tasks.resolving(sensorO).as(AttributeSensor.class).timeout(ValueResolver.REAL_QUICK_WAIT).context(producer).get();
+ Optional<? extends Sensor<?>> foundSensor = Iterables.tryFind(sensorListTemp,
+ SensorPredicates.nameEqualTo(sensor.getName()));
+
+ if(!foundSensor.isPresent()) {
+ sensorListTemp.add(sensor);
+ }
+ }
+
+ String reducerName = config().get(REDUCER_FUNCTION_TRANSFORMATION);
+ Function<Iterable<?>, ?> reducerFunction = (Function) config().get(REDUCER_FUNCTION);
+ if(reducerFunction == null){
+ Map<String, ?> parameters = config().get(PARAMETERS);
+ reducerFunction = createReducerFunction(reducerName, parameters);
+ }
+
+ this.reducerFunction = reducerFunction;
+ Preconditions.checkState(sensorListTemp.size() > 0, "Nothing to reduce");
+
+ for (Sensor<?> sensor : sensorListTemp) {
+ subscribe(producer, sensor, this);
+ }
+
+ subscribedSensors = ImmutableList.copyOf(sensorListTemp);
+ }
+
+ // Default implementation, subclasses should override
+ protected Function<Iterable<?>, ?> createReducerFunction(String reducerName, Map<String, ?> parameters){
+ if(Objects.equals(reducerName, "joiner")){
+ String separator = (String) parameters.get("separator");
+ return StringFunctions.joiner(separator == null ? ", " : separator);
+ }
+
+ if (Objects.equals(reducerName, "formatString")){
+ String format = Preconditions.checkNotNull((String)parameters.get("format"), "format");
+ return StringFunctions.formatterForIterable(format);
+ }
+ throw new IllegalStateException("unknown function: " + reducerName);
+ }
+
+ @Override
+ public void onEvent(SensorEvent<Object> event) {
+ Sensor<?> destinationSensor = getConfig(TARGET_SENSOR);
+
+ List<Object> values = Lists.newArrayList();
+
+ for (AttributeSensor<?> sourceSensor : subscribedSensors) {
+ Object resolvedSensorValue = entity.sensors().get(sourceSensor);
+ values.add(resolvedSensorValue);
+ }
+ Object result = reducerFunction.apply(values);
+
+ if (LOG.isTraceEnabled()) LOG.trace("enricher {} got {}, propagating via {} as {}",
+ new Object[] {this, event, entity, reducerFunction, destinationSensor});
+
+ emit(destinationSensor, result);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/018a0e15/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
----------------------------------------------------------------------
diff --cc brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
index 0000000,756f665..b8e5c63
mode 000000,100644..100644
--- a/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
+++ b/brooklyn-server/core/src/main/java/org/apache/brooklyn/entity/group/DynamicClusterImpl.java
@@@ -1,0 -1,971 +1,972 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.brooklyn.entity.group;
+
+ import static com.google.common.base.Preconditions.checkArgument;
+ import static com.google.common.base.Preconditions.checkNotNull;
+
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.NoSuchElementException;
+ import java.util.Set;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.atomic.AtomicInteger;
+
+ import javax.annotation.Nullable;
+
+ import org.apache.brooklyn.api.entity.Entity;
+ import org.apache.brooklyn.api.entity.EntitySpec;
+ import org.apache.brooklyn.api.entity.Group;
+ import org.apache.brooklyn.api.location.Location;
+ import org.apache.brooklyn.api.location.MachineProvisioningLocation;
+ import org.apache.brooklyn.api.mgmt.Task;
+ import org.apache.brooklyn.api.policy.Policy;
+ import org.apache.brooklyn.api.sensor.AttributeSensor;
++import org.apache.brooklyn.core.config.Sanitizer;
+ import org.apache.brooklyn.core.config.render.RendererHints;
+ import org.apache.brooklyn.core.effector.Effectors;
+ import org.apache.brooklyn.core.entity.Entities;
+ import org.apache.brooklyn.core.entity.factory.EntityFactory;
+ import org.apache.brooklyn.core.entity.factory.EntityFactoryForLocation;
+ import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
+ import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
+ import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceProblemsLogic;
+ import org.apache.brooklyn.core.entity.trait.Startable;
+ import org.apache.brooklyn.core.entity.trait.StartableMethods;
+ import org.apache.brooklyn.core.location.Locations;
+ import org.apache.brooklyn.core.location.cloud.AvailabilityZoneExtension;
+ import org.apache.brooklyn.core.sensor.Sensors;
+ import org.apache.brooklyn.entity.stock.DelegateEntity;
+ import org.apache.brooklyn.feed.function.FunctionFeed;
+ import org.apache.brooklyn.feed.function.FunctionPollConfig;
+ import org.apache.brooklyn.util.collections.MutableList;
+ import org.apache.brooklyn.util.collections.MutableMap;
+ import org.apache.brooklyn.util.collections.QuorumCheck.QuorumChecks;
+ import org.apache.brooklyn.util.core.flags.TypeCoercions;
+ import org.apache.brooklyn.util.core.task.DynamicTasks;
+ import org.apache.brooklyn.util.core.task.TaskTags;
+ import org.apache.brooklyn.util.core.task.Tasks;
+ import org.apache.brooklyn.util.exceptions.Exceptions;
+ import org.apache.brooklyn.util.exceptions.ReferenceWithError;
+ import org.apache.brooklyn.util.guava.Maybe;
+ import org.apache.brooklyn.util.javalang.JavaClassNames;
+ import org.apache.brooklyn.util.javalang.Reflections;
+ import org.apache.brooklyn.util.text.StringPredicates;
+ import org.apache.brooklyn.util.text.Strings;
+ import org.apache.brooklyn.util.time.Duration;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import com.google.common.base.Function;
+ import com.google.common.base.Functions;
+ import com.google.common.base.Optional;
+ import com.google.common.base.Preconditions;
+ import com.google.common.base.Predicates;
+ import com.google.common.base.Supplier;
+ import com.google.common.collect.ImmutableList;
+ import com.google.common.collect.ImmutableMap;
+ import com.google.common.collect.ImmutableSet;
+ import com.google.common.collect.Iterables;
+ import com.google.common.collect.LinkedHashMultimap;
+ import com.google.common.collect.Lists;
+ import com.google.common.collect.Maps;
+ import com.google.common.collect.Multimap;
+ import com.google.common.collect.Sets;
+ import com.google.common.reflect.TypeToken;
+
+ /**
+ * A cluster of entities that can dynamically increase or decrease the number of entities.
+ */
+ public class DynamicClusterImpl extends AbstractGroupImpl implements DynamicCluster {
+
+ @SuppressWarnings("serial")
+ private static final AttributeSensor<Supplier<Integer>> NEXT_CLUSTER_MEMBER_ID = Sensors.newSensor(new TypeToken<Supplier<Integer>>() {},
+ "next.cluster.member.id", "Returns the ID number of the next member to be added");
+
+ private volatile FunctionFeed clusterOneAndAllMembersUp;
+
+ // TODO better mechanism for arbitrary class name to instance type coercion
+ static {
+ TypeCoercions.registerAdapter(String.class, NodePlacementStrategy.class, new Function<String, NodePlacementStrategy>() {
+ @Override
+ public NodePlacementStrategy apply(final String input) {
+ ClassLoader classLoader = NodePlacementStrategy.class.getClassLoader();
+ Optional<NodePlacementStrategy> strategy = Reflections.<NodePlacementStrategy>invokeConstructorWithArgs(classLoader, input);
+ if (strategy.isPresent()) {
+ return strategy.get();
+ } else {
+ throw new IllegalStateException("Failed to create NodePlacementStrategy "+input);
+ }
+ }
+ });
+ TypeCoercions.registerAdapter(String.class, ZoneFailureDetector.class, new Function<String, ZoneFailureDetector>() {
+ @Override
+ public ZoneFailureDetector apply(final String input) {
+ ClassLoader classLoader = ZoneFailureDetector.class.getClassLoader();
+ Optional<ZoneFailureDetector> detector = Reflections.<ZoneFailureDetector>invokeConstructorWithArgs(classLoader, input);
+ if (detector.isPresent()) {
+ return detector.get();
+ } else {
+ throw new IllegalStateException("Failed to create ZoneFailureDetector "+input);
+ }
+ }
+ });
+ }
+
+ static {
+ RendererHints.register(FIRST, RendererHints.namedActionWithUrl("Open", DelegateEntity.EntityUrl.entityUrl()));
+ RendererHints.register(CLUSTER, RendererHints.namedActionWithUrl("Open", DelegateEntity.EntityUrl.entityUrl()));
+ }
+
+
+ private static final Logger LOG = LoggerFactory.getLogger(DynamicClusterImpl.class);
+
+ /**
+ * Mutex for synchronizing during re-size operations.
+ * Sub-classes should use with great caution, to not introduce deadlocks!
+ */
+ protected final Object mutex = new Object[0];
+
+ private static final Function<Collection<Entity>, Entity> defaultRemovalStrategy = new Function<Collection<Entity>, Entity>() {
+ @Override public Entity apply(Collection<Entity> contenders) {
+ /*
+ * Choose the newest entity (largest cluster member ID or latest timestamp) that is stoppable.
+ * If none are stoppable, take the newest non-stoppable.
+ *
+ * Both cluster member ID and timestamp must be taken into consideration to account for legacy
+ * clusters that were created before the addition of the cluster member ID config value.
+ */
+ int largestClusterMemberId = -1;
+ long newestTime = 0L;
+ Entity newest = null;
+
+ for (Entity contender : contenders) {
+ Integer contenderClusterMemberId = contender.config().get(CLUSTER_MEMBER_ID);
+ long contenderCreationTime = contender.getCreationTime();
+
+ boolean newer = (contenderClusterMemberId != null && contenderClusterMemberId > largestClusterMemberId) ||
+ contenderCreationTime > newestTime;
+
+ if ((contender instanceof Startable && newer) ||
+ (!(newest instanceof Startable) && ((contender instanceof Startable) || newer))) {
+ newest = contender;
+
+ if (contenderClusterMemberId != null) largestClusterMemberId = contenderClusterMemberId;
+ newestTime = contenderCreationTime;
+ }
+ }
+
+ return newest;
+ }
+ };
+
+ private static class NextClusterMemberIdSupplier implements Supplier<Integer> {
+ private AtomicInteger nextId = new AtomicInteger(0);
+
+ @Override
+ public Integer get() {
+ return nextId.getAndIncrement();
+ }
+ }
+
+ public DynamicClusterImpl() {
+ }
+
+ @Override
+ public void init() {
+ super.init();
+ initialiseMemberId();
+ connectAllMembersUp();
+ }
+
+ private void initialiseMemberId() {
+ synchronized (mutex) {
+ if (sensors().get(NEXT_CLUSTER_MEMBER_ID) == null) {
+ sensors().set(NEXT_CLUSTER_MEMBER_ID, new NextClusterMemberIdSupplier());
+ }
+ }
+ }
+
+ private void connectAllMembersUp() {
+ clusterOneAndAllMembersUp = FunctionFeed.builder()
+ .entity(this)
+ .period(Duration.FIVE_SECONDS)
+ .poll(new FunctionPollConfig<Boolean, Boolean>(CLUSTER_ONE_AND_ALL_MEMBERS_UP)
+ .onException(Functions.constant(Boolean.FALSE))
+ .callable(new ClusterOneAndAllMembersUpCallable(this)))
+ .build();
+ }
+
+ private static class ClusterOneAndAllMembersUpCallable implements Callable<Boolean> {
+
+ private final Group cluster;
+
+ public ClusterOneAndAllMembersUpCallable(Group cluster) {
+ this.cluster = cluster;
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ if (cluster.getMembers().isEmpty())
+ return false;
+
+ if (Lifecycle.RUNNING != cluster.sensors().get(SERVICE_STATE_ACTUAL))
+ return false;
+
+ for (Entity member : cluster.getMembers())
+ if (!Boolean.TRUE.equals(member.sensors().get(SERVICE_UP)))
+ return false;
+
+ return true;
+ }
+ }
+
+ @Override
+ protected void initEnrichers() {
+ if (config().getRaw(UP_QUORUM_CHECK).isAbsent() && getConfig(INITIAL_SIZE)==0) {
+ // if initial size is 0 then override up check to allow zero if empty
+ config().set(UP_QUORUM_CHECK, QuorumChecks.atLeastOneUnlessEmpty());
+ sensors().set(SERVICE_UP, true);
+ } else {
+ sensors().set(SERVICE_UP, false);
+ }
+ super.initEnrichers();
+ // override previous enricher so that only members are checked
+ ServiceStateLogic.newEnricherFromChildrenUp().checkMembersOnly().requireUpChildren(getConfig(UP_QUORUM_CHECK)).addTo(this);
+ }
+
+ @Override
+ public void setRemovalStrategy(Function<Collection<Entity>, Entity> val) {
+ config().set(REMOVAL_STRATEGY, checkNotNull(val, "removalStrategy"));
+ }
+
+ protected Function<Collection<Entity>, Entity> getRemovalStrategy() {
+ Function<Collection<Entity>, Entity> result = getConfig(REMOVAL_STRATEGY);
+ return (result != null) ? result : defaultRemovalStrategy;
+ }
+
+ @Override
+ public void setZonePlacementStrategy(NodePlacementStrategy val) {
+ config().set(ZONE_PLACEMENT_STRATEGY, checkNotNull(val, "zonePlacementStrategy"));
+ }
+
+ protected NodePlacementStrategy getZonePlacementStrategy() {
+ return checkNotNull(getConfig(ZONE_PLACEMENT_STRATEGY), "zonePlacementStrategy config");
+ }
+
+ @Override
+ public void setZoneFailureDetector(ZoneFailureDetector val) {
+ config().set(ZONE_FAILURE_DETECTOR, checkNotNull(val, "zoneFailureDetector"));
+ }
+
+ protected ZoneFailureDetector getZoneFailureDetector() {
+ return checkNotNull(getConfig(ZONE_FAILURE_DETECTOR), "zoneFailureDetector config");
+ }
+
+ protected EntitySpec<?> getFirstMemberSpec() {
+ return getConfig(FIRST_MEMBER_SPEC);
+ }
+
+ protected EntitySpec<?> getMemberSpec() {
+ return getConfig(MEMBER_SPEC);
+ }
+
+ /** @deprecated since 0.7.0; use {@link #getMemberSpec()} */
+ @Deprecated
+ protected EntityFactory<?> getFactory() {
+ return getConfig(FACTORY);
+ }
+
+ @Override
+ public void setMemberSpec(EntitySpec<?> memberSpec) {
+ setConfigEvenIfOwned(MEMBER_SPEC, memberSpec);
+ }
+
+ /** @deprecated since 0.7.0; use {@link #setMemberSpec(EntitySpec)} */
+ @Deprecated
+ @Override
+ public void setFactory(EntityFactory<?> factory) {
+ setConfigEvenIfOwned(FACTORY, factory);
+ }
+
+ private Location getLocation() {
+ Collection<? extends Location> ll = Locations.getLocationsCheckingAncestors(getLocations(), this);
+ try {
+ return Iterables.getOnlyElement(ll);
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ if (ll.isEmpty()) throw new IllegalStateException("No location available for "+this);
+ else throw new IllegalStateException("Ambiguous location for "+this+"; expected one but had "+ll);
+ }
+ }
+
+ protected boolean isAvailabilityZoneEnabled() {
+ return getConfig(ENABLE_AVAILABILITY_ZONES);
+ }
+
+ protected boolean isQuarantineEnabled() {
+ return getConfig(QUARANTINE_FAILED_ENTITIES);
+ }
+
+ protected QuarantineGroup getQuarantineGroup() {
+ return getAttribute(QUARANTINE_GROUP);
+ }
+
+ protected int getInitialQuorumSize() {
+ int initialSize = getConfig(INITIAL_SIZE).intValue();
+ int initialQuorumSize = getConfig(INITIAL_QUORUM_SIZE).intValue();
+ if (initialQuorumSize < 0) initialQuorumSize = initialSize;
+ if (initialQuorumSize > initialSize) {
+ LOG.warn("On start of cluster {}, misconfigured initial quorum size {} greater than initial size{}; using {}", new Object[] {initialQuorumSize, initialSize, initialSize});
+ initialQuorumSize = initialSize;
+ }
+ return initialQuorumSize;
+ }
+
+ @Override
+ public void start(Collection<? extends Location> locsO) {
+ if (locsO!=null) {
+ checkArgument(locsO.size() <= 1, "Wrong number of locations supplied to start %s: %s", this, locsO);
+ addLocations(locsO);
+ }
+ Location loc = getLocation();
+
+ EntitySpec<?> spec = getConfig(MEMBER_SPEC);
+ if (spec!=null) {
+ setDefaultDisplayName("Cluster of "+JavaClassNames.simpleClassName(spec.getType()) +" ("+loc+")");
+ }
+
+ if (isAvailabilityZoneEnabled()) {
+ sensors().set(SUB_LOCATIONS, findSubLocations(loc));
+ }
+
+ ServiceStateLogic.setExpectedState(this, Lifecycle.STARTING);
+ ServiceProblemsLogic.clearProblemsIndicator(this, START);
+ try {
+ doStart();
+ DynamicTasks.waitForLast();
+
+ } catch (Exception e) {
+ ServiceProblemsLogic.updateProblemsIndicator(this, START, "start failed with error: "+e);
+ throw Exceptions.propagate(e);
+ } finally {
+ ServiceStateLogic.setExpectedState(this, Lifecycle.RUNNING);
+ }
+ }
+
+ protected void doStart() {
+ if (isQuarantineEnabled()) {
+ QuarantineGroup quarantineGroup = getAttribute(QUARANTINE_GROUP);
+ if (quarantineGroup==null || !Entities.isManaged(quarantineGroup)) {
+ quarantineGroup = addChild(EntitySpec.create(QuarantineGroup.class).displayName("quarantine"));
+ sensors().set(QUARANTINE_GROUP, quarantineGroup);
+ }
+ }
+
+ int initialSize = getConfig(INITIAL_SIZE).intValue();
+ int initialQuorumSize = getInitialQuorumSize();
+ Exception internalError = null;
+
+ try {
+ resize(initialSize);
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ // Apart from logging, ignore problems here; we extract them below.
+ // But if it was this thread that threw the exception (rather than a sub-task), then need
+ // to record that failure here.
+ LOG.debug("Error resizing "+this+" to size "+initialSize+" (collecting and handling): "+e, e);
+ internalError = e;
+ }
+
+ Iterable<Task<?>> failed = Tasks.failed(Tasks.children(Tasks.current()));
+ boolean noFailed = Iterables.isEmpty(failed);
+ boolean severalFailed = Iterables.size(failed) > 1;
+
+ int currentSize = getCurrentSize().intValue();
+ if (currentSize < initialQuorumSize) {
+ String message;
+ if (currentSize == 0 && !noFailed) {
+ if (severalFailed)
+ message = "All nodes in cluster "+this+" failed";
+ else
+ message = "Node in cluster "+this+" failed";
+ } else {
+ message = "On start of cluster " + this + ", failed to get to initial size of " + initialSize
+ + "; size is " + getCurrentSize()
+ + (initialQuorumSize != initialSize ? " (initial quorum size is " + initialQuorumSize + ")" : "");
+ }
+ Throwable firstError = Tasks.getError(Maybe.next(failed.iterator()).orNull());
+ if (firstError==null && internalError!=null) {
+ // only use the internal error if there were no nested task failures
+ // (otherwise the internal error should be a wrapper around the nested failures)
+ firstError = internalError;
+ }
+ if (firstError!=null) {
+ if (severalFailed) {
+ message += "; first failure is: "+Exceptions.collapseText(firstError);
+ } else {
+ message += ": "+Exceptions.collapseText(firstError);
+ }
+ }
+ throw new IllegalStateException(message, firstError);
+
+ } else if (currentSize < initialSize) {
+ LOG.warn(
+ "On start of cluster {}, size {} reached initial minimum quorum size of {} but did not reach desired size {}; continuing",
+ new Object[] { this, currentSize, initialQuorumSize, initialSize });
+ }
+
+ for (Policy it : policies()) {
+ it.resume();
+ }
+ }
+
+ protected List<Location> findSubLocations(Location loc) {
+ if (!loc.hasExtension(AvailabilityZoneExtension.class)) {
+ throw new IllegalStateException("Availability zone extension not supported for location " + loc);
+ }
+
+ AvailabilityZoneExtension zoneExtension = loc.getExtension(AvailabilityZoneExtension.class);
+
+ Collection<String> zoneNames = getConfig(AVAILABILITY_ZONE_NAMES);
+ Integer numZones = getConfig(NUM_AVAILABILITY_ZONES);
+
+ List<Location> subLocations;
+ if (zoneNames == null || zoneNames.isEmpty()) {
+ if (numZones != null) {
+ subLocations = zoneExtension.getSubLocations(numZones);
+
+ checkArgument(numZones > 0, "numZones must be greater than zero: %s", numZones);
+ if (numZones > subLocations.size()) {
+ throw new IllegalStateException("Number of required zones (" + numZones + ") not satisfied in " + loc
+ + "; only " + subLocations.size() + " available: " + subLocations);
+ }
+ } else {
+ subLocations = zoneExtension.getAllSubLocations();
+ }
+ } else {
+ // TODO check that these are valid region / availabilityZones?
+ subLocations = zoneExtension.getSubLocationsByName(StringPredicates.equalToAny(zoneNames), zoneNames.size());
+
+ if (zoneNames.size() > subLocations.size()) {
+ throw new IllegalStateException("Number of required zones (" + zoneNames.size() + " - " + zoneNames
+ + ") not satisfied in " + loc + "; only " + subLocations.size() + " available: " + subLocations);
+ }
+ }
+
+ LOG.info("Returning {} sub-locations: {}", subLocations.size(), Iterables.toString(subLocations));
+ return subLocations;
+ }
+
+ @Override
+ public void stop() {
+ ServiceStateLogic.setExpectedState(this, Lifecycle.STOPPING);
+ try {
+ for (Policy it : policies()) { it.suspend(); }
+
+ // run shrink without mutex to make things stop even if starting,
+ int size = getCurrentSize();
+ if (size > 0) { shrink(-size); }
+
+ // run resize with mutex to prevent others from starting things
+ resize(0);
+
+ // also stop any remaining stoppable children -- eg those on fire
+ // (this ignores the quarantine node which is not stoppable)
+ StartableMethods.stop(this);
+
+ ServiceStateLogic.setExpectedState(this, Lifecycle.STOPPED);
+ } catch (Exception e) {
+ ServiceStateLogic.setExpectedState(this, Lifecycle.ON_FIRE);
+ throw Exceptions.propagate(e);
+ } finally {
+ if (clusterOneAndAllMembersUp != null) clusterOneAndAllMembersUp.stop();
+ }
+ }
+
+ @Override
+ public void restart() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Integer resize(Integer desiredSize) {
+ synchronized (mutex) {
+ int originalSize = getCurrentSize();
+ int delta = desiredSize - originalSize;
+ if (delta != 0) {
+ LOG.info("Resize {} from {} to {}", new Object[] {this, originalSize, desiredSize});
+ } else {
+ if (LOG.isDebugEnabled()) LOG.debug("Resize no-op {} from {} to {}", new Object[] {this, originalSize, desiredSize});
+ }
+ resizeByDelta(delta);
+ }
+ return getCurrentSize();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <strong>Note</strong> for sub-classes; this method can be called while synchronized on {@link #mutex}.
+ */
+ @Override
+ public String replaceMember(String memberId) {
+ Entity member = getEntityManager().getEntity(memberId);
+ LOG.info("In {}, replacing member {} ({})", new Object[] {this, memberId, member});
+
+ if (member == null) {
+ throw new NoSuchElementException("In "+this+", entity "+memberId+" cannot be resolved, so not replacing");
+ }
+
+ synchronized (mutex) {
+ if (!getMembers().contains(member)) {
+ throw new NoSuchElementException("In "+this+", entity "+member+" is not a member so not replacing");
+ }
+
+ Location memberLoc = null;
+ if (isAvailabilityZoneEnabled()) {
+ // this member's location could be a machine provisioned by a sub-location, or the actual sub-location
+ List<Location> subLocations = findSubLocations(getLocation());
+ Collection<Location> actualMemberLocs = member.getLocations();
+ boolean foundMatch = false;
+ for (Iterator<Location> iter = actualMemberLocs.iterator(); !foundMatch && iter.hasNext();) {
+ Location actualMemberLoc = iter.next();
+ Location contenderMemberLoc = actualMemberLoc;
+ do {
+ if (subLocations.contains(contenderMemberLoc)) {
+ memberLoc = contenderMemberLoc;
+ foundMatch = true;
+ LOG.debug("In {} replacing member {} ({}), inferred its sub-location is {}", new Object[] {this, memberId, member, memberLoc});
+ }
+ contenderMemberLoc = contenderMemberLoc.getParent();
+ } while (!foundMatch && contenderMemberLoc != null);
+ }
+ if (!foundMatch) {
+ if (actualMemberLocs.isEmpty()) {
+ memberLoc = subLocations.get(0);
+ LOG.warn("In {} replacing member {} ({}), has no locations; falling back to first availability zone: {}", new Object[] {this, memberId, member, memberLoc});
+ } else {
+ memberLoc = Iterables.tryFind(actualMemberLocs, Predicates.instanceOf(MachineProvisioningLocation.class)).or(Iterables.getFirst(actualMemberLocs, null));
+ LOG.warn("In {} replacing member {} ({}), could not find matching sub-location; falling back to its actual location: {}", new Object[] {this, memberId, member, memberLoc});
+ }
+ } else if (memberLoc == null) {
+ // impossible to get here, based on logic above!
+ throw new IllegalStateException("Unexpected condition! cluster="+this+"; member="+member+"; actualMemberLocs="+actualMemberLocs);
+ }
+ } else {
+ // Replacing member, so new member should be in the same location as that being replaced.
+ // Expect this to agree with `getMemberSpec().getLocations()` (if set). If not, then
+ // presumably there was a reason this specific member was started somewhere else!
+ memberLoc = getLocation();
+ }
+
+ Entity replacement = replaceMember(member, memberLoc, ImmutableMap.of());
+ return replacement.getId();
+ }
+ }
+
+ /**
+ * @throws StopFailedRuntimeException If stop failed, after successfully starting replacement
+ */
+ protected Entity replaceMember(Entity member, Location memberLoc, Map<?, ?> extraFlags) {
+ synchronized (mutex) {
+ ReferenceWithError<Optional<Entity>> added = addInSingleLocation(memberLoc, extraFlags);
+
+ if (!added.getWithoutError().isPresent()) {
+ String msg = String.format("In %s, failed to grow, to replace %s; not removing", this, member);
+ if (added.hasError())
+ throw new IllegalStateException(msg, added.getError());
+ throw new IllegalStateException(msg);
+ }
+
+ try {
+ stopAndRemoveNode(member);
+ } catch (Exception e) {
+ Exceptions.propagateIfFatal(e);
+ throw new StopFailedRuntimeException("replaceMember failed to stop and remove old member "+member.getId(), e);
+ }
+
+ return added.getWithError().get();
+ }
+ }
+
+ protected Multimap<Location, Entity> getMembersByLocation() {
+ Multimap<Location, Entity> result = LinkedHashMultimap.create();
+ for (Entity member : getMembers()) {
+ Collection<Location> memberLocs = member.getLocations();
+ Location memberLoc = Iterables.getFirst(memberLocs, null);
+ if (memberLoc != null) {
+ result.put(memberLoc, member);
+ }
+ }
+ return result;
+ }
+
+ protected List<Location> getNonFailedSubLocations() {
+ List<Location> result = Lists.newArrayList();
+ Set<Location> failed = Sets.newLinkedHashSet();
+ List<Location> subLocations = findSubLocations(getLocation());
+ Set<Location> oldFailedSubLocations = getAttribute(FAILED_SUB_LOCATIONS);
+ if (oldFailedSubLocations == null)
+ oldFailedSubLocations = ImmutableSet.<Location> of();
+
+ for (Location subLocation : subLocations) {
+ if (getZoneFailureDetector().hasFailed(subLocation)) {
+ failed.add(subLocation);
+ } else {
+ result.add(subLocation);
+ }
+ }
+
+ Set<Location> newlyFailed = Sets.difference(failed, oldFailedSubLocations);
+ Set<Location> newlyRecovered = Sets.difference(oldFailedSubLocations, failed);
+ sensors().set(FAILED_SUB_LOCATIONS, failed);
+ sensors().set(SUB_LOCATIONS, result);
+ if (newlyFailed.size() > 0) {
+ LOG.warn("Detected probably zone failures for {}: {}", this, newlyFailed);
+ }
+ if (newlyRecovered.size() > 0) {
+ LOG.warn("Detected probably zone recoveries for {}: {}", this, newlyRecovered);
+ }
+
+ return result;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <strong>Note</strong> for sub-classes; this method can be called while synchronized on {@link #mutex}.
+ */
+ @Override
+ public Collection<Entity> resizeByDelta(int delta) {
+ synchronized (mutex) {
+ if (delta > 0) {
+ return grow(delta);
+ } else if (delta < 0) {
+ return shrink(delta);
+ } else {
+ return ImmutableList.<Entity>of();
+ }
+ }
+ }
+
+ /** <strong>Note</strong> for sub-clases; this method can be called while synchronized on {@link #mutex}. */
+ protected Collection<Entity> grow(int delta) {
+ Preconditions.checkArgument(delta > 0, "Must call grow with positive delta.");
+
+ // choose locations to be deployed to
+ List<Location> chosenLocations;
+ List<Location> memberLocations = getMemberSpec() == null ? null : getMemberSpec().getLocations();
+ if (memberLocations != null && memberLocations.size() > 0) {
+ // The memberSpec overrides the location passed to cluster.start(); use
+ // the location defined on the member.
+ if (isAvailabilityZoneEnabled()) {
+ LOG.warn("Cluster {} has availability-zone enabled, but memberSpec overrides location with {}; using "
+ + "memberSpec's location; availability-zone behaviour will not apply", this, memberLocations);
+ }
+ chosenLocations = Collections.nCopies(delta, memberLocations.get(0));
+ } else if (isAvailabilityZoneEnabled()) {
+ List<Location> subLocations = getNonFailedSubLocations();
+ Multimap<Location, Entity> membersByLocation = getMembersByLocation();
+ chosenLocations = getZonePlacementStrategy().locationsForAdditions(membersByLocation, subLocations, delta);
+ if (chosenLocations.size() != delta) {
+ throw new IllegalStateException("Node placement strategy chose " + Iterables.size(chosenLocations)
+ + ", when expected delta " + delta + " in " + this);
+ }
+ } else {
+ chosenLocations = Collections.nCopies(delta, getLocation());
+ }
+
+ // create and start the entities
+ return addInEachLocation(chosenLocations, ImmutableMap.of()).getWithError();
+ }
+
+ /** <strong>Note</strong> for sub-clases; this method can be called while synchronized on {@link #mutex}. */
+ @SuppressWarnings("unchecked")
+ protected Collection<Entity> shrink(int delta) {
+ Preconditions.checkArgument(delta < 0, "Must call shrink with negative delta.");
+ int size = getCurrentSize();
+ if (-delta > size) {
+ // some subclasses (esp in tests) use custom sizes without the members set always being accurate, so put a limit on the size
+ LOG.warn("Call to shrink "+this+" by "+delta+" when size is "+size+"; amending");
+ delta = -size;
+ }
+ if (delta==0) return ImmutableList.<Entity>of();
+
+ Collection<Entity> removedEntities = pickAndRemoveMembers(delta * -1);
+
+ // FIXME symmetry in order of added as child, managed, started, and added to group
+ Task<?> invoke = Entities.invokeEffector(this, (Iterable<Entity>)(Iterable<?>)Iterables.filter(removedEntities, Startable.class), Startable.STOP, Collections.<String,Object>emptyMap());
+ try {
+ invoke.get();
+ return removedEntities;
+ } catch (Exception e) {
+ throw Exceptions.propagate(e);
+ } finally {
+ for (Entity removedEntity : removedEntities) {
+ discardNode(removedEntity);
+ }
+ }
+ }
+
+ protected ReferenceWithError<Optional<Entity>> addInSingleLocation(Location location, Map<?,?> flags) {
+ ReferenceWithError<Collection<Entity>> added = addInEachLocation(ImmutableList.of(location), flags);
+
+ Optional<Entity> result = Iterables.isEmpty(added.getWithoutError()) ? Optional.<Entity>absent() : Optional.of(Iterables.getOnlyElement(added.get()));
+ if (!added.hasError()) {
+ return ReferenceWithError.newInstanceWithoutError( result );
+ } else {
+ if (added.masksErrorIfPresent()) {
+ return ReferenceWithError.newInstanceMaskingError( result, added.getError() );
+ } else {
+ return ReferenceWithError.newInstanceThrowingError( result, added.getError() );
+ }
+ }
+ }
+
+ protected ReferenceWithError<Collection<Entity>> addInEachLocation(Iterable<Location> locations, Map<?,?> flags) {
+ List<Entity> addedEntities = Lists.newArrayList();
+ Map<Entity, Location> addedEntityLocations = Maps.newLinkedHashMap();
+ Map<Entity, Task<?>> tasks = Maps.newLinkedHashMap();
+
+ for (Location loc : locations) {
+ Entity entity = addNode(loc, flags);
+ addedEntities.add(entity);
+ addedEntityLocations.put(entity, loc);
+ if (entity instanceof Startable) {
+ Map<String, ?> args = ImmutableMap.of("locations", ImmutableList.of(loc));
+ Task<Void> task = Effectors.invocation(entity, Startable.START, args).asTask();
+ tasks.put(entity, task);
+ }
+ }
+
+ Task<List<?>> parallel = Tasks.parallel("starting "+tasks.size()+" node"+Strings.s(tasks.size())+" (parallel)", tasks.values());
+ TaskTags.markInessential(parallel);
+ DynamicTasks.queueIfPossible(parallel).orSubmitAsync(this);
+ Map<Entity, Throwable> errors = waitForTasksOnEntityStart(tasks);
+
+ // if tracking, then report success/fail to the ZoneFailureDetector
+ if (isAvailabilityZoneEnabled()) {
+ for (Map.Entry<Entity, Location> entry : addedEntityLocations.entrySet()) {
+ Entity entity = entry.getKey();
+ Location loc = entry.getValue();
+ Throwable err = errors.get(entity);
+ if (err == null) {
+ getZoneFailureDetector().onStartupSuccess(loc, entity);
+ } else {
+ getZoneFailureDetector().onStartupFailure(loc, entity, err);
+ }
+ }
+ }
+
+ Collection<Entity> result = MutableList.<Entity> builder()
+ .addAll(addedEntities)
+ .removeAll(errors.keySet())
+ .build();
+
+ // quarantine/cleanup as necessary
+ if (!errors.isEmpty()) {
+ if (isQuarantineEnabled()) {
+ quarantineFailedNodes(errors.keySet());
+ } else {
+ cleanupFailedNodes(errors.keySet());
+ }
+ return ReferenceWithError.newInstanceMaskingError(result, Exceptions.create(errors.values()));
+ }
+
+ return ReferenceWithError.newInstanceWithoutError(result);
+ }
+
+ protected void quarantineFailedNodes(Collection<Entity> failedEntities) {
+ for (Entity entity : failedEntities) {
+ sensors().emit(ENTITY_QUARANTINED, entity);
+ getQuarantineGroup().addMember(entity);
+ removeMember(entity);
+ }
+ }
+
+ protected void cleanupFailedNodes(Collection<Entity> failedEntities) {
+ // TODO Could also call stop on them?
+ for (Entity entity : failedEntities) {
+ discardNode(entity);
+ }
+ }
+
+ protected Map<Entity, Throwable> waitForTasksOnEntityStart(Map<? extends Entity,? extends Task<?>> tasks) {
+ // TODO Could have CompoundException, rather than propagating first
+ Map<Entity, Throwable> errors = Maps.newLinkedHashMap();
+
+ for (Map.Entry<? extends Entity,? extends Task<?>> entry : tasks.entrySet()) {
+ Entity entity = entry.getKey();
+ Task<?> task = entry.getValue();
+ try {
+ task.get();
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ } catch (Throwable t) {
+ Throwable interesting = Exceptions.getFirstInteresting(t);
+ LOG.error("Cluster "+this+" failed to start entity "+entity+" (removing): "+interesting, interesting);
+ LOG.debug("Trace for: Cluster "+this+" failed to start entity "+entity+" (removing): "+t, t);
+ // previously we unwrapped but now there is no need I think
+ errors.put(entity, t);
+ }
+ }
+ return errors;
+ }
+
+ @Override
+ public boolean removeChild(Entity child) {
+ boolean changed = super.removeChild(child);
+ if (changed) {
+ removeMember(child);
+ }
+ return changed;
+ }
+
+ protected Map<?,?> getCustomChildFlags() {
+ return getConfig(CUSTOM_CHILD_FLAGS);
+ }
+
+ @Override
+ public Entity addNode(Location loc, Map<?, ?> extraFlags) {
+ // In case subclasses are foolish and do not call super.init() when overriding.
+ initialiseMemberId();
+ Map<?, ?> createFlags = MutableMap.builder()
+ .putAll(getCustomChildFlags())
+ .putAll(extraFlags)
+ .put(CLUSTER_MEMBER_ID, sensors().get(NEXT_CLUSTER_MEMBER_ID).get())
+ .build();
+ if (LOG.isDebugEnabled()) {
- LOG.debug("Creating and adding a node to cluster {}({}) with properties {}", new Object[] { this, getId(), createFlags });
++ LOG.debug("Creating and adding a node to cluster {}({}) with properties {}", new Object[] { this, getId(), Sanitizer.sanitize(createFlags) });
+ }
+
+ // TODO should refactor to have a createNodeSpec; and spec should support initial sensor values
+ Entity entity = createNode(loc, createFlags);
+
+ entity.sensors().set(CLUSTER_MEMBER, true);
+ entity.sensors().set(CLUSTER, this);
+
+ // Continue to call manage(), because some uses of NodeFactory (in tests) still instantiate the
+ // entity via its constructor
+ Entities.manage(entity);
+
+ addMember(entity);
+ return entity;
+ }
+
+ protected Entity createNode(@Nullable Location loc, Map<?,?> flags) {
+ EntitySpec<?> memberSpec = null;
+ if (getMembers().isEmpty()) memberSpec = getFirstMemberSpec();
+ if (memberSpec == null) memberSpec = getMemberSpec();
+
+ if (memberSpec != null) {
+ return addChild(EntitySpec.create(memberSpec).configure(flags).location(loc));
+ }
+
+ EntityFactory<?> factory = getFactory();
+ if (factory == null) {
+ throw new IllegalStateException("No member spec nor entity factory supplied for dynamic cluster "+this);
+ }
+ EntityFactory<?> factoryToUse = (factory instanceof EntityFactoryForLocation) ? ((EntityFactoryForLocation<?>) factory).newFactoryForLocation(loc) : factory;
+ Entity entity = factoryToUse.newEntity(flags, this);
+ if (entity==null) {
+ throw new IllegalStateException("EntityFactory factory routine returned null entity, in "+this);
+ }
+ if (entity.getParent()==null) entity.setParent(this);
+
+ return entity;
+ }
+
+ protected List<Entity> pickAndRemoveMembers(int delta) {
+ if (delta==0)
+ return Lists.newArrayList();
+
+ if (delta == 1 && !isAvailabilityZoneEnabled()) {
+ Maybe<Entity> member = tryPickAndRemoveMember();
+ return (member.isPresent()) ? ImmutableList.of(member.get()) : ImmutableList.<Entity>of();
+ }
+
+ // TODO inefficient impl
+ Preconditions.checkState(getMembers().size() > 0, "Attempt to remove a node (delta "+delta+") when members is empty, from cluster " + this);
+ if (LOG.isDebugEnabled()) LOG.debug("Removing a node from {}", this);
+
+ if (isAvailabilityZoneEnabled()) {
+ Multimap<Location, Entity> membersByLocation = getMembersByLocation();
+ List<Entity> entities = getZonePlacementStrategy().entitiesToRemove(membersByLocation, delta);
+
+ Preconditions.checkState(entities.size() == delta, "Incorrect num entity chosen for removal from %s (%s when expected %s)",
+ getId(), entities.size(), delta);
+
+ for (Entity entity : entities) {
+ removeMember(entity);
+ }
+ return entities;
+ } else {
+ List<Entity> entities = Lists.newArrayList();
+ for (int i = 0; i < delta; i++) {
+ // don't assume we have enough members; e.g. if shrinking to zero and someone else concurrently stops a member,
+ // then just return what we were able to remove.
+ Maybe<Entity> member = tryPickAndRemoveMember();
+ if (member.isPresent()) entities.add(member.get());
+ }
+ return entities;
+ }
+ }
+
+ private Maybe<Entity> tryPickAndRemoveMember() {
+ assert !isAvailabilityZoneEnabled() : "should instead call pickAndRemoveMembers(int) if using availability zones";
+
+ // TODO inefficient impl
+ Collection<Entity> members = getMembers();
+ if (members.isEmpty()) return Maybe.absent();
+
+ if (LOG.isDebugEnabled()) LOG.debug("Removing a node from {}", this);
+ Entity entity = getRemovalStrategy().apply(members);
+ Preconditions.checkNotNull(entity, "No entity chosen for removal from "+getId());
+
+ removeMember(entity);
+ return Maybe.of(entity);
+ }
+
+ protected void discardNode(Entity entity) {
+ removeMember(entity);
+ try {
+ Entities.unmanage(entity);
+ } catch (IllegalStateException e) {
+ //probably already unmanaged
+ LOG.debug("Exception during removing member of cluster " + this + ", unmanaging node " + entity + ". The node is probably already unmanaged.", e);
+ }
+ }
+
+ protected void stopAndRemoveNode(Entity member) {
+ removeMember(member);
+
+ try {
+ if (member instanceof Startable) {
+ Task<?> task = member.invoke(Startable.STOP, Collections.<String,Object>emptyMap());
+ task.getUnchecked();
+ }
+ } finally {
+ Entities.unmanage(member);
+ }
+ }
+ }