You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:47:53 UTC
[17/51] [abbrv] [partial] brooklyn-library git commit: move subdir
from incubator up a level as it is promoted to its own repo (first
non-incubator commit!)
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java
deleted file mode 100644
index 79003c2..0000000
--- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java
+++ /dev/null
@@ -1,629 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.nosql.cassandra;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.api.policy.PolicySpec;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.api.sensor.SensorEvent;
-import org.apache.brooklyn.api.sensor.SensorEventListener;
-import org.apache.brooklyn.core.effector.EffectorBody;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.EntityPredicates;
-import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
-import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceNotUpLogic;
-import org.apache.brooklyn.core.location.Machines;
-import org.apache.brooklyn.enricher.stock.Enrichers;
-import org.apache.brooklyn.entity.group.AbstractMembershipTrackingPolicy;
-import org.apache.brooklyn.entity.group.DynamicClusterImpl;
-import org.apache.brooklyn.entity.group.DynamicGroup;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.util.collections.MutableList;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.collections.MutableSet;
-import org.apache.brooklyn.util.core.ResourceUtils;
-import org.apache.brooklyn.util.core.config.ConfigBag;
-import org.apache.brooklyn.util.text.Strings;
-import org.apache.brooklyn.util.time.Time;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.LinkedHashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import com.google.common.net.HostAndPort;
-
-/**
- * Implementation of {@link CassandraDatacenter}.
- * <p>
- * Several subtleties to note:
- * - a node may take some time after it is running and serving JMX to actually be contactable on its thrift port
- * (so we wait for thrift port to be contactable)
- * - sometimes new nodes take a while to peer, and/or take a while to get a consistent schema
- * (each up to 1m; often very close to the 1m)
- */
-public class CassandraDatacenterImpl extends DynamicClusterImpl implements CassandraDatacenter {
-
- /*
- * TODO Seed management is hard!
- * - The ServiceRestarter is not doing customize(), so is not refreshing the seeds in cassandra.yaml.
- * If we have two nodes that were seeds for each other and they both restart at the same time, we'll have a split brain.
- */
-
- private static final Logger log = LoggerFactory.getLogger(CassandraDatacenterImpl.class);
-
- // Mutex for synchronizing during re-size operations
- private final Object mutex = new Object[0];
-
- /**
- * Default seed-selection strategy, used by {@link #getSeedSupplier()} when no
- * {@link CassandraDatacenter#SEED_SUPPLIER} is configured.
- * <p>
- * Each call recomputes the seed set from the current members:
- * <ul>
- * <li>if seeds have never been published and fewer than {@link #getSeedQuorumSize()}
- * potential seeds exist, returns an empty set (still waiting for quorum);
- * <li>if seeds have been published, returns them unchanged while the cluster is
- * {@link Lifecycle#STARTING} or while there are no running seed candidates;
- * <li>otherwise returns at most quorum-size candidates, preferring entities that are
- * already seeds.
- * </ul>
- */
- private final Supplier<Set<Entity>> defaultSeedSupplier = new Supplier<Set<Entity>>() {
- // Mutex for (re)calculating our seeds
- // TODO is this very dangerous?! Calling out to SeedTracker, which calls out to alien getAttribute()/getConfig(). But I think that's ok.
- // TODO might not need mutex? previous race was being caused by something else, other than concurrent calls!
- private final Object seedMutex = new Object();
-
- @Override
- public Set<Entity> get() {
- synchronized (seedMutex) {
- boolean hasPublishedSeeds = Boolean.TRUE.equals(getAttribute(HAS_PUBLISHED_SEEDS));
- int quorumSize = getSeedQuorumSize();
- Set<Entity> potentialSeeds = gatherPotentialSeeds();
- Set<Entity> potentialRunningSeeds = gatherPotentialRunningSeeds();
- boolean stillWaitingForQuorum = (!hasPublishedSeeds) && (potentialSeeds.size() < quorumSize);
-
- if (stillWaitingForQuorum) {
- // Log the cluster instance (not the Class object) so the message identifies which cluster is waiting.
- if (log.isDebugEnabled()) log.debug("Not refreshed seeds of cluster {}, because still waiting for quorum (need {}; have {} potentials)", new Object[] {CassandraDatacenterImpl.this, quorumSize, potentialSeeds.size()});
- return ImmutableSet.of();
- } else if (hasPublishedSeeds) {
- Set<Entity> currentSeeds = getAttribute(CURRENT_SEEDS);
- if (getAttribute(SERVICE_STATE_ACTUAL) == Lifecycle.STARTING) {
- if (Sets.intersection(currentSeeds, potentialSeeds).isEmpty()) {
- log.warn("Cluster {} lost all its seeds while starting! Subsequent failure likely, but changing seeds during startup would risk split-brain: seeds={}", new Object[] {CassandraDatacenterImpl.this, currentSeeds});
- }
- return currentSeeds;
- } else if (potentialRunningSeeds.isEmpty()) {
- // TODO Could be race where nodes have only just returned from start() and are about to
- // transition to serviceUp; so don't just abandon all our seeds!
- log.warn("Cluster {} has no running seeds (yet?); leaving seeds as-is; but risks split-brain if these seeds come back up!", new Object[] {CassandraDatacenterImpl.this});
- return currentSeeds;
- } else {
- Set<Entity> result = trim(quorumSize, potentialRunningSeeds);
- log.debug("Cluster {} updating seeds: chosen={}; potentialRunning={}", new Object[] {CassandraDatacenterImpl.this, result, potentialRunningSeeds});
- return result;
- }
- } else {
- Set<Entity> result = trim(quorumSize, potentialSeeds);
- if (log.isDebugEnabled()) log.debug("Cluster {} has reached seed quorum: seeds={}", new Object[] {CassandraDatacenterImpl.this, result});
- return result;
- }
- }
- }
-
- /**
- * Picks at most {@code num} entities from {@code contenders}, listing contenders that are
- * already seeds first so that existing seeds survive the cut.
- */
- private Set<Entity> trim(int num, Set<Entity> contenders) {
- // Prefer existing seeds wherever possible; otherwise accept any other contenders.
- // The sensor is read once, so the null check and the use see the same value.
- Set<Entity> publishedSeeds = getAttribute(CURRENT_SEEDS);
- Set<Entity> currentSeeds = (publishedSeeds != null) ? publishedSeeds : ImmutableSet.<Entity>of();
- Set<Entity> result = Sets.newLinkedHashSet();
- result.addAll(Sets.intersection(currentSeeds, contenders));
- result.addAll(contenders);
- return ImmutableSet.copyOf(Iterables.limit(result, num));
- }
- };
-
- protected SeedTracker seedTracker = new SeedTracker();
- protected TokenGenerator tokenGenerator = null;
-
- public CassandraDatacenterImpl() {
- }
-
- /**
- * Wires up the subscriptions this datacenter relies on:
- * <ul>
- * <li>member hostname, service-up and lifecycle changes, plus member removal, are forwarded to
- * the {@link SeedTracker};
- * <li>member {@link CassandraNode#DATACENTER_NAME} changes and member removal maintain the
- * {@code DATACENTER_USAGE} and {@code DATACENTERS} sensors;
- * </ul>
- * and registers the {@code EXECUTE_SCRIPT} effector body.
- */
- @Override
- public void init() {
- super.init();
-
- /*
- * subscribe to hostname, and keep an accurate set of current seeds in a sensor;
- * then at nodes we set the initial seeds to be the current seeds when ready (non-empty)
- */
- subscriptions().subscribeToMembers(this, Attributes.HOSTNAME, new SensorEventListener<String>() {
- @Override
- public void onEvent(SensorEvent<String> event) {
- seedTracker.onHostnameChanged(event.getSource(), event.getValue());
- }
- });
- subscriptions().subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener<Entity>() {
- @Override public void onEvent(SensorEvent<Entity> event) {
- seedTracker.onMemberRemoved(event.getValue());
- }
- });
- subscriptions().subscribeToMembers(this, Attributes.SERVICE_UP, new SensorEventListener<Boolean>() {
- @Override
- public void onEvent(SensorEvent<Boolean> event) {
- seedTracker.onServiceUpChanged(event.getSource(), event.getValue());
- }
- });
- subscriptions().subscribeToMembers(this, Attributes.SERVICE_STATE_ACTUAL, new SensorEventListener<Lifecycle>() {
- @Override
- public void onEvent(SensorEvent<Lifecycle> event) {
- // trigger a recomputation also when lifecycle state changes,
- // because it might not have ruled a seed as inviable when service up went true
- // because service state was not yet running
- seedTracker.onServiceUpChanged(event.getSource(), Lifecycle.RUNNING==event.getValue());
- }
- });
-
- // Track the datacenters for this cluster
- subscriptions().subscribeToMembers(this, CassandraNode.DATACENTER_NAME, new SensorEventListener<String>() {
- @Override
- public void onEvent(SensorEvent<String> event) {
- Entity member = event.getSource();
- String dcName = event.getValue();
- if (dcName != null) {
- // Work on a copy: the published multimap is replaced wholesale rather than mutated in place.
- Multimap<String, Entity> datacenterUsage = getAttribute(DATACENTER_USAGE);
- Multimap<String, Entity> mutableDatacenterUsage = (datacenterUsage == null) ? LinkedHashMultimap.<String, Entity>create() : LinkedHashMultimap.create(datacenterUsage);
- Optional<String> oldDcName = getKeyOfVal(mutableDatacenterUsage, member);
- if (!(oldDcName.isPresent() && dcName.equals(oldDcName.get()))) {
- mutableDatacenterUsage.values().remove(member);
- mutableDatacenterUsage.put(dcName, member);
- sensors().set(DATACENTER_USAGE, mutableDatacenterUsage);
- sensors().set(DATACENTERS, Sets.newLinkedHashSet(mutableDatacenterUsage.keySet()));
- }
- }
- }
- // Returns the first key whose entries include val, or absent if val is not in the multimap.
- private <K,V> Optional<K> getKeyOfVal(Multimap<K,V> map, V val) {
- for (Map.Entry<K,V> entry : map.entries()) {
- if (Objects.equal(val, entry.getValue())) {
- return Optional.of(entry.getKey());
- }
- }
- return Optional.absent();
- }
- });
- subscriptions().subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener<Entity>() {
- @Override public void onEvent(SensorEvent<Entity> event) {
- // The removed member is carried as the event value (the seed-tracker subscription above
- // reads it the same way); the event source is the entity subscribed to, i.e. this group.
- Entity entity = event.getValue();
- Multimap<String, Entity> datacenterUsage = getAttribute(DATACENTER_USAGE);
- if (datacenterUsage != null && datacenterUsage.containsValue(entity)) {
- Multimap<String, Entity> mutableDatacenterUsage = LinkedHashMultimap.create(datacenterUsage);
- mutableDatacenterUsage.values().remove(entity);
- sensors().set(DATACENTER_USAGE, mutableDatacenterUsage);
- sensors().set(DATACENTERS, Sets.newLinkedHashSet(mutableDatacenterUsage.keySet()));
- }
- }
- });
-
- getMutableEntityType().addEffector(EXECUTE_SCRIPT, new EffectorBody<String>() {
- @Override
- public String call(ConfigBag parameters) {
- return executeScript((String)parameters.getStringKey("commands"));
- }
- });
- }
-
- /**
- * Returns the supplier configured under {@code SEED_SUPPLIER}, falling back to
- * {@code defaultSeedSupplier} when none is configured.
- */
- protected Supplier<Set<Entity>> getSeedSupplier() {
- Supplier<Set<Entity>> configured = getConfig(SEED_SUPPLIER);
- if (configured != null) {
- return configured;
- }
- return defaultSeedSupplier;
- }
-
- /** Returns {@code true} only when the {@code USE_VNODES} config value is {@link Boolean#TRUE}; unset counts as {@code false}. */
- protected boolean useVnodes() {
- boolean vnodesRequested = Boolean.TRUE.equals(getConfig(USE_VNODES));
- return vnodesRequested;
- }
-
- /**
- * Returns the token generator for this datacenter, creating it on first use.
- * <p>
- * The generator is instantiated from {@code TOKEN_GENERATOR_CLASS}; its origin is set to
- * {@code TOKEN_SHIFT} when configured, otherwise to a random offset within the generator's range.
- * The instance is cached only after it has been fully initialised, so a failure part-way through
- * cannot leave a generator without an origin for later callers.
- *
- * @throws RuntimeException (via {@link Throwables#propagate}) if the generator cannot be created or configured
- */
- protected synchronized TokenGenerator getTokenGenerator() {
- if (tokenGenerator != null) {
- return tokenGenerator;
- }
-
- try {
- TokenGenerator generator = getConfig(TOKEN_GENERATOR_CLASS).newInstance();
-
- BigInteger shift = getConfig(TOKEN_SHIFT);
- if (shift == null) {
- // No explicit shift configured: pick a random origin in [0, range).
- shift = BigDecimal.valueOf(Math.random()).multiply(
- new BigDecimal(generator.range())).toBigInteger();
- }
- generator.setOrigin(shift);
-
- tokenGenerator = generator;
- return generator;
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- /**
- * Returns the number of seeds to wait for and to publish.
- * <p>
- * A positive {@code INITIAL_QUORUM_SIZE} wins; otherwise the value is {@code INITIAL_SIZE}
- * clamped to the range {@code [1, DEFAULT_SEED_QUORUM]}.
- */
- protected int getSeedQuorumSize() {
- Integer configuredQuorum = getConfig(INITIAL_QUORUM_SIZE);
- boolean explicitlyConfigured = configuredQuorum != null && configuredQuorum > 0;
- if (explicitlyConfigured) {
- return configuredQuorum;
- }
- // fall back to the default quorum, unless the initial size is smaller (but never below 1)
- int atLeastOne = Math.max(getConfig(INITIAL_SIZE), 1);
- return Math.min(atLeastOne, DEFAULT_SEED_QUORUM);
- }
-
- /** Returns the members the {@link SeedTracker} currently regards as viable seeds. */
- @Override
- public Set<Entity> gatherPotentialSeeds() {
- Set<Entity> viableMembers = seedTracker.gatherPotentialSeeds();
- return viableMembers;
- }
-
- /** Returns the members the {@link SeedTracker} currently regards as viable seeds that are also running. */
- @Override
- public Set<Entity> gatherPotentialRunningSeeds() {
- Set<Entity> runningMembers = seedTracker.gatherPotentialRunningSeeds();
- return runningMembers;
- }
-
- /**
- * Returns the spec used to create members: the configured {@link #MEMBER_SPEC} if present,
- * otherwise a plain {@link CassandraNode} spec.
- */
- @Override
- protected EntitySpec<?> getMemberSpec() {
- EntitySpec<?> cassandraNodeSpec = EntitySpec.create(CassandraNode.class);
- return getConfig(MEMBER_SPEC, cassandraNodeSpec);
- }
-
- /** Returns the current value of the {@code CLUSTER_NAME} sensor. */
- @Override
- public String getClusterName() {
- String publishedName = getAttribute(CLUSTER_NAME);
- return publishedName;
- }
-
- /**
- * Grows the cluster by {@code delta} nodes. When vnodes are not in use and the cluster is
- * currently empty, the token generator is first told how many nodes are about to be added.
- */
- @Override
- public Collection<Entity> grow(int delta) {
- // With vnodes there is nothing to do for the token generator.
- if (!useVnodes() && getCurrentSize() == 0) {
- getTokenGenerator().growingCluster(delta);
- }
- return super.grow(delta);
- }
-
- /**
- * Creates a member node, filling in token-related flags the caller did not supply:
- * a generated token when vnodes are off, and the number of tokens per node
- * (the configured value with vnodes, otherwise 1).
- */
- @Override
- protected Entity createNode(@Nullable Location loc, Map<?,?> flags) {
- Map<Object, Object> nodeFlags = MutableMap.copyOf(flags);
-
- boolean usesRemovedTokenKey = flags.containsKey("token") || flags.containsKey("cassandra.token");
- if (usesRemovedTokenKey) {
- // TODO Delete in future version; was deprecated in 0.7.0; deleted config key in 0.9.0
- log.warn("Cassandra token no longer supported - use 'tokens' in "+CassandraDatacenterImpl.this);
- }
-
- // Caller-supplied tokens are left untouched; otherwise generate one unless vnodes are in use.
- boolean tokensSupplied = flags.containsKey(CassandraNode.TOKENS) || flags.containsKey("tokens") || flags.containsKey("cassandra.tokens");
- if (!tokensSupplied && !useVnodes()) {
- BigInteger generated = getTokenGenerator().newToken();
- if (generated != null) {
- nodeFlags.put(CassandraNode.TOKENS, ImmutableSet.of(generated));
- }
- }
-
- // Likewise a caller-supplied num_tokens is left untouched.
- boolean numTokensSupplied = flags.containsKey(CassandraNode.NUM_TOKENS_PER_NODE) || flags.containsKey("numTokensPerNode");
- if (!numTokensSupplied) {
- if (useVnodes()) {
- Integer configuredNumTokens = getConfig(NUM_TOKENS_PER_NODE);
- nodeFlags.put(CassandraNode.NUM_TOKENS_PER_NODE, configuredNumTokens);
- } else {
- nodeFlags.put(CassandraNode.NUM_TOKENS_PER_NODE, 1);
- }
- }
-
- return super.createNode(loc, nodeFlags);
- }
-
- /**
- * Replaces {@code member}, passing the replacement the tokens the token generator derives
- * from the old node's tokens (or {@code null} tokens if the old node had none).
- */
- @Override
- protected Entity replaceMember(Entity member, Location memberLoc, Map<?, ?> extraFlags) {
- Set<BigInteger> previousTokens = ((CassandraNode) member).getTokens();
- Set<BigInteger> replacementTokens = null;
- if (previousTokens != null && !previousTokens.isEmpty()) {
- replacementTokens = getTokenGenerator().getTokensForReplacementNode(previousTokens);
- }
- return super.replaceMember(member, memberLoc, MutableMap.copyOf(extraFlags).add(CassandraNode.TOKENS, replacementTokens));
- }
-
- /**
- * Starts the datacenter in the given locations.
- * <p>
- * In order: warns if a localhost location is used, publishes {@code CLUSTER_NAME}, starts the
- * members via the superclass, connects enrichers and the membership-tracking policy, sleeps for
- * {@code DELAY_BEFORE_ADVERTISING_CLUSTER}, runs the creation script (if a URL is configured)
- * and finally calls {@link #update()}.
- */
- @Override
- public void start(Collection<? extends Location> locations) {
- Machines.warnIfLocalhost(locations, "CassandraCluster does not support multiple nodes on localhost, " +
- "due to assumptions Cassandra makes about the use of the same port numbers used across the cluster.");
-
- // force this to be set - even if it is using the default
- sensors().set(CLUSTER_NAME, getConfig(CLUSTER_NAME));
-
- super.start(locations);
-
- connectSensors();
-
- // TODO wait until all nodes which we think are up are consistent
- // i.e. all known nodes use the same schema, as reported by
- // SshEffectorTasks.ssh("echo \"describe cluster;\" | /bin/cassandra-cli");
- // once we've done that we can revert to using 2 seed nodes.
- // see CassandraCluster.DEFAULT_SEED_QUORUM
- // (also ensure the cluster is ready if we are about to run a creation script)
- Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER));
-
- // The script text is loaded from the configured URL and run through executeScript.
- String scriptUrl = getConfig(CassandraNode.CREATION_SCRIPT_URL);
- if (Strings.isNonEmpty(scriptUrl)) {
- executeScript(new ResourceUtils(this).getResourceAsString(scriptUrl));
- }
-
- update();
- }
-
- /**
- * Connects the aggregating enrichers and adds a {@link MemberTrackingPolicy} on this group that
- * tracks the members' {@code SERVICE_UP}, {@code HOSTNAME} and {@code THRIFT_PORT} sensors.
- */
- protected void connectSensors() {
- connectEnrichers();
-
- policies().add(PolicySpec.create(MemberTrackingPolicy.class)
- .displayName("Cassandra Cluster Tracker")
- .configure("sensorsToTrack", ImmutableSet.of(Attributes.SERVICE_UP, Attributes.HOSTNAME, CassandraNode.THRIFT_PORT))
- .configure("group", this));
- }
-
- /**
- * Membership-tracking policy that calls {@link CassandraDatacenterImpl#update()} on the entity
- * it is attached to whenever a member is added, removed or changes.
- * <p>
- * The policy's {@code entity} must be a {@link CassandraDatacenterImpl}; it is cast unconditionally.
- */
- public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
- @Override
- protected void onEntityChange(Entity member) {
- // Log the owning cluster entity rather than this policy object.
- if (log.isDebugEnabled()) log.debug("Node {} updated in Cluster {}", member, entity);
- ((CassandraDatacenterImpl)entity).update();
- }
- @Override
- protected void onEntityAdded(Entity member) {
- if (log.isDebugEnabled()) log.debug("Node {} added to Cluster {}", member, entity);
- ((CassandraDatacenterImpl)entity).update();
- }
- @Override
- protected void onEntityRemoved(Entity member) {
- if (log.isDebugEnabled()) log.debug("Node {} removed from Cluster {}", member, entity);
- ((CassandraDatacenterImpl)entity).update();
- }
- }
-
- /**
- * Adds one aggregating enricher per sensor pair listed below. Each inner list holds the
- * member-level sensor to aggregate (index 0) and the sensor on this entity to publish to (index 1).
- * The first table is aggregated with a sum, the second with an average.
- */
- @SuppressWarnings("unchecked")
- protected void connectEnrichers() {
- // member sensor -> cluster sensor, aggregated by summing across members
- List<? extends List<? extends AttributeSensor<? extends Number>>> summingEnricherSetup = ImmutableList.of(
- ImmutableList.of(CassandraNode.READ_ACTIVE, READ_ACTIVE),
- ImmutableList.of(CassandraNode.READ_PENDING, READ_PENDING),
- ImmutableList.of(CassandraNode.WRITE_ACTIVE, WRITE_ACTIVE),
- ImmutableList.of(CassandraNode.WRITE_PENDING, WRITE_PENDING)
- );
-
- // member sensor -> cluster "per node" sensor, aggregated by averaging across members
- List<? extends List<? extends AttributeSensor<? extends Number>>> averagingEnricherSetup = ImmutableList.of(
- ImmutableList.of(CassandraNode.READS_PER_SECOND_LAST, READS_PER_SECOND_LAST_PER_NODE),
- ImmutableList.of(CassandraNode.WRITES_PER_SECOND_LAST, WRITES_PER_SECOND_LAST_PER_NODE),
- ImmutableList.of(CassandraNode.WRITES_PER_SECOND_IN_WINDOW, WRITES_PER_SECOND_IN_WINDOW_PER_NODE),
- ImmutableList.of(CassandraNode.READS_PER_SECOND_IN_WINDOW, READS_PER_SECOND_IN_WINDOW_PER_NODE),
- ImmutableList.of(CassandraNode.THRIFT_PORT_LATENCY, THRIFT_PORT_LATENCY_PER_NODE),
- ImmutableList.of(CassandraNode.THRIFT_PORT_LATENCY_IN_WINDOW, THRIFT_PORT_LATENCY_IN_WINDOW_PER_NODE),
- ImmutableList.of(CassandraNode.PROCESS_CPU_TIME_FRACTION_LAST, PROCESS_CPU_TIME_FRACTION_LAST_PER_NODE),
- ImmutableList.of(CassandraNode.PROCESS_CPU_TIME_FRACTION_IN_WINDOW, PROCESS_CPU_TIME_FRACTION_IN_WINDOW_PER_NODE)
- );
-
- for (List<? extends AttributeSensor<? extends Number>> es : summingEnricherSetup) {
- AttributeSensor<? extends Number> t = es.get(0);
- AttributeSensor<? extends Number> total = es.get(1);
- enrichers().add(Enrichers.builder()
- .aggregating(t)
- .publishing(total)
- .fromMembers()
- .computingSum()
- .defaultValueForUnreportedSensors(null)
- .valueToReportIfNoSensors(null)
- .build());
- }
-
- for (List<? extends AttributeSensor<? extends Number>> es : averagingEnricherSetup) {
- // Unchecked casts: the tables are wildcard-typed, and the target of an average is treated as a Double sensor.
- AttributeSensor<Number> t = (AttributeSensor<Number>) es.get(0);
- AttributeSensor<Double> average = (AttributeSensor<Double>) es.get(1);
- enrichers().add(Enrichers.builder()
- .aggregating(t)
- .publishing(average)
- .fromMembers()
- .computingAverage()
- .defaultValueForUnreportedSensors(null)
- .valueToReportIfNoSensors(null)
- .build());
-
- }
- }
-
- /** Stops the datacenter: calls {@link #disconnectSensors()} and then the superclass {@code stop()}. */
- @Override
- public void stop() {
- disconnectSensors();
-
- super.stop();
- }
-
- /** Hook called at the start of {@link #stop()}; this implementation does nothing. */
- protected void disconnectSensors() {
- }
-
- /**
- * Recomputes the cluster-level view of its members, under the resize mutex:
- * refreshes the seeds, republishes {@code HOSTNAME} and {@code THRIFT_PORT} from the first member
- * that is up, rebuilds {@code CASSANDRA_CLUSTER_NODES} (as {@code host:port} strings of up members)
- * and updates the not-up indicator that requires that list to be non-empty.
- */
- @Override
- public void update() {
- synchronized (mutex) {
- // Update our seeds, as necessary
- seedTracker.refreshSeeds();
-
- // Choose the first available cluster member to set host and port (and compute one-up)
- Optional<Entity> firstUpMember = Iterables.tryFind(getMembers(), EntityPredicates.attributeEqualTo(SERVICE_UP, Boolean.TRUE));
-
- if (!firstUpMember.isPresent()) {
- // Nothing is up: clear everything we advertise.
- sensors().set(HOSTNAME, null);
- sensors().set(THRIFT_PORT, null);
- sensors().set(CASSANDRA_CLUSTER_NODES, Collections.<String>emptyList());
- } else {
- sensors().set(HOSTNAME, firstUpMember.get().getAttribute(Attributes.HOSTNAME));
- sensors().set(THRIFT_PORT, firstUpMember.get().getAttribute(CassandraNode.THRIFT_PORT));
-
- List<String> publishedNodes = getAttribute(CASSANDRA_CLUSTER_NODES);
- Set<String> previousNodes = (publishedNodes == null) ? ImmutableSet.<String>of() : ImmutableSet.copyOf(publishedNodes);
- Set<String> liveNodes = MutableSet.<String>of();
- for (Entity member : getMembers()) {
- boolean isUpCassandraNode = member instanceof CassandraNode && Boolean.TRUE.equals(member.getAttribute(SERVICE_UP));
- if (!isUpCassandraNode) {
- continue;
- }
- String host = member.getAttribute(Attributes.HOSTNAME);
- Integer port = member.getAttribute(CassandraNode.THRIFT_PORT);
- if (host != null && port != null) {
- liveNodes.add(HostAndPort.fromParts(host, port).toString());
- }
- }
- // Only republish when the set of nodes actually changed.
- if (!Sets.symmetricDifference(previousNodes, liveNodes).isEmpty()) {
- sensors().set(CASSANDRA_CLUSTER_NODES, MutableList.copyOf(liveNodes));
- }
- }
-
- ServiceNotUpLogic.updateNotUpIndicatorRequiringNonEmptyList(this, CASSANDRA_CLUSTER_NODES);
- }
- }
-
- /**
- * For tracking our seeds. This gets fiddly! High-level logic is:
- * <ul>
- * <li>If we have never reached quorum (i.e. have never published seeds), then continue to wait for quorum;
- * because entity-startup may be blocking for this. This is handled by the seedSupplier.
- * <li>If we previously reached quorum (i.e. have previously published seeds), then always update;
- * we never want stale/dead entities listed in our seeds.
- * <li>If an existing seed looks unhealthy, then replace it.
- * <li>If a new potential seed becomes available (and we're in need of more), then add it.
- * </ul>
- *
- * Also note that {@link CassandraFabric} can take over, because it knows about multiple sub-clusters!
- * It will provide a different {@link CassandraDatacenter#SEED_SUPPLIER}. Each time we think that our seeds
- * need to change, we call that. The fabric will call into {@link CassandraDatacenterImpl#gatherPotentialSeeds()}
- * to find out what's available.
- *
- * @author aled
- */
- protected class SeedTracker {
- // Last service-up value reported for each member, in first-seen order.
- private final Map<Entity, Boolean> memberUpness = Maps.newLinkedHashMap();
-
- /** Forgets {@code member}; refreshes the seeds only if it was one of them. */
- public void onMemberRemoved(Entity member) {
- Set<Entity> seeds = getSeeds();
- boolean maybeRemove = seeds.contains(member);
- memberUpness.remove(member);
-
- if (maybeRemove) {
- refreshSeeds();
- } else {
- if (log.isTraceEnabled()) log.trace("Seeds considered stable for cluster {} (node {} removed)", new Object[] {CassandraDatacenterImpl.this, member});
- }
- }
-
- /**
- * Refreshes the seeds if {@code member} could now become a seed (viable, and fewer seeds than
- * quorum) or is a seed that is no longer viable.
- */
- public void onHostnameChanged(Entity member, String hostname) {
- Set<Entity> seeds = getSeeds();
- int quorum = getSeedQuorumSize();
- boolean isViable = isViableSeed(member);
- boolean maybeAdd = isViable && seeds.size() < quorum;
- boolean maybeRemove = seeds.contains(member) && !isViable;
-
- if (maybeAdd || maybeRemove) {
- refreshSeeds();
- } else {
- if (log.isTraceEnabled()) log.trace("Seeds considered stable for cluster {} (node {} changed hostname {})", new Object[] {CassandraDatacenterImpl.this, member, hostname});
- }
- }
-
- /**
- * Records the new service-up value for {@code member} and applies the same add/remove test
- * as {@link #onHostnameChanged(Entity, String)}.
- */
- public void onServiceUpChanged(Entity member, Boolean serviceUp) {
- Boolean oldVal = memberUpness.put(member, serviceUp);
- if (Objects.equal(oldVal, serviceUp)) {
- // Only logged: evaluation continues below even for a repeated value.
- // NOTE(review): the lifecycle subscription in init() calls this method to force a
- // recomputation, which an early return here would suppress - confirm before adding one.
- if (log.isTraceEnabled()) log.trace("Ignoring duplicate service-up in "+CassandraDatacenterImpl.this+" for "+member+", "+serviceUp);
- }
- Set<Entity> seeds = getSeeds();
- int quorum = getSeedQuorumSize();
- boolean isViable = isViableSeed(member);
- boolean maybeAdd = isViable && seeds.size() < quorum;
- boolean maybeRemove = seeds.contains(member) && !isViable;
-
- if (log.isDebugEnabled())
- log.debug("Considering refresh of seeds for "+CassandraDatacenterImpl.this+" because "+member+" is now "+serviceUp+" ("+isViable+" / "+maybeAdd+" / "+maybeRemove+")");
- if (maybeAdd || maybeRemove) {
- refreshSeeds();
- } else {
- if (log.isTraceEnabled()) log.trace("Seeds considered stable for cluster {} (node {} changed serviceUp {})", new Object[] {CassandraDatacenterImpl.this, member, serviceUp});
- }
- }
-
- /** Returns the published {@code CURRENT_SEEDS}, or an empty set if none have been published. */
- protected Set<Entity> getSeeds() {
- Set<Entity> result = getAttribute(CURRENT_SEEDS);
- return (result == null) ? ImmutableSet.<Entity>of() : result;
- }
-
- /**
- * Asks the seed supplier for the seed set and, if it differs from the published one, publishes it.
- * {@code HAS_PUBLISHED_SEEDS} is set to true the first time a non-empty set is published.
- */
- public void refreshSeeds() {
- Set<Entity> oldseeds = getAttribute(CURRENT_SEEDS);
- Set<Entity> newseeds = getSeedSupplier().get();
- if (Objects.equal(oldseeds, newseeds)) {
- if (log.isTraceEnabled()) log.trace("Seed refresh no-op for cluster {}: still={}", new Object[] {CassandraDatacenterImpl.this, oldseeds});
- } else {
- if (log.isDebugEnabled()) log.debug("Refreshing seeds of cluster {}: now={}; old={}", new Object[] {CassandraDatacenterImpl.this, newseeds, oldseeds});
- sensors().set(CURRENT_SEEDS, newseeds);
- if (newseeds != null && newseeds.size() > 0) {
- sensors().set(HAS_PUBLISHED_SEEDS, true);
- }
- }
- }
-
- /** Returns, in member order, every member for which {@link #isViableSeed(Entity)} holds. */
- public Set<Entity> gatherPotentialSeeds() {
- Set<Entity> result = Sets.newLinkedHashSet();
- for (Entity member : getMembers()) {
- if (isViableSeed(member)) {
- result.add(member);
- }
- }
- if (log.isTraceEnabled()) log.trace("Viable seeds in Cluster {}: {}", new Object[] {CassandraDatacenterImpl.this, result});
- return result;
- }
-
- /** Returns, in member order, every member for which {@link #isRunningSeed(Entity)} holds. */
- public Set<Entity> gatherPotentialRunningSeeds() {
- Set<Entity> result = Sets.newLinkedHashSet();
- for (Entity member : getMembers()) {
- if (isRunningSeed(member)) {
- result.add(member);
- }
- }
- if (log.isTraceEnabled()) log.trace("Viable running seeds in Cluster {}: {}", new Object[] {CassandraDatacenterImpl.this, result});
- return result;
- }
-
- /**
- * A member is a viable seed when it has a hostname and has not failed. It counts as failed when
- * it is unmanaged, {@link Lifecycle#ON_FIRE}, {@link Lifecycle#STOPPED}, or
- * {@link Lifecycle#RUNNING} without service-up.
- */
- public boolean isViableSeed(Entity member) {
- // TODO would be good to reuse the better logic in ServiceFailureDetector
- // (e.g. if that didn't just emit a notification but set a sensor as well?)
- boolean managed = Entities.isManaged(member);
- String hostname = member.getAttribute(Attributes.HOSTNAME);
- boolean serviceUp = Boolean.TRUE.equals(member.getAttribute(Attributes.SERVICE_UP));
- Lifecycle serviceState = member.getAttribute(Attributes.SERVICE_STATE_ACTUAL);
- boolean hasFailed = !managed || (serviceState == Lifecycle.ON_FIRE) || (serviceState == Lifecycle.RUNNING && !serviceUp) || (serviceState == Lifecycle.STOPPED);
- boolean result = (hostname != null && !hasFailed);
- if (log.isTraceEnabled()) log.trace("Node {} in Cluster {}: viableSeed={}; hostname={}; serviceUp={}; serviceState={}; hasFailed={}", new Object[] {member, CassandraDatacenterImpl.this, result, hostname, serviceUp, serviceState, hasFailed});
- return result;
- }
-
- /** A running seed is a viable seed that is service-up and in state {@link Lifecycle#RUNNING}. */
- public boolean isRunningSeed(Entity member) {
- boolean viableSeed = isViableSeed(member);
- boolean serviceUp = Boolean.TRUE.equals(member.getAttribute(Attributes.SERVICE_UP));
- Lifecycle serviceState = member.getAttribute(Attributes.SERVICE_STATE_ACTUAL);
- boolean result = viableSeed && serviceUp && serviceState == Lifecycle.RUNNING;
- if (log.isTraceEnabled()) log.trace("Node {} in Cluster {}: runningSeed={}; viableSeed={}; serviceUp={}; serviceState={}", new Object[] {member, CassandraDatacenterImpl.this, result, viableSeed, serviceUp, serviceState});
- return result;
- }
- }
-
- /**
- * Runs {@code commands} by invoking the {@code EXECUTE_SCRIPT} effector on the first member.
- *
- * @throws IllegalStateException if the datacenter has no members
- */
- @Override
- public String executeScript(String commands) {
- Entity target = Iterables.getFirst(getMembers(), null);
- if (target == null) {
- throw new IllegalStateException("No Cassandra nodes available");
- }
- // FIXME cross-entity method-style calls, i.e. ((CassandraNode)someChild).executeScript(commands),
- // do not set up a queueing context (DynamicSequentialTask); the effector is invoked explicitly here.
- return Entities.invokeEffector(this, target, CassandraNode.EXECUTE_SCRIPT, MutableMap.of("commands", commands)).getUnchecked();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabric.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabric.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabric.java
deleted file mode 100644
index 5d9a9ca..0000000
--- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabric.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.nosql.cassandra;
-
-import java.util.Set;
-
-import org.apache.brooklyn.api.catalog.Catalog;
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.ImplementedBy;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.annotation.Effector;
-import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.effector.MethodEffector;
-import org.apache.brooklyn.entity.group.DynamicFabric;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Multimap;
-import com.google.common.reflect.TypeToken;
-
-/**
- * A fabric of {@link CassandraNode}s, which forms a cluster spanning multiple locations.
- * <p>
- * Each {@link CassandraDatacenter} child instance is actually just a part of the whole cluster. It consists of the
- * nodes in that single location (which normally corresponds to a "datacenter" in Cassandra terminology).
- */
-@Catalog(name="Apache Cassandra Database Fabric", description="Cassandra is a highly scalable, eventually " +
- "consistent, distributed, structured key-value store which provides a ColumnFamily-based data model " +
- "richer than typical key/value systems", iconUrl="classpath:///cassandra-logo.jpeg")
-@ImplementedBy(CassandraFabricImpl.class)
-public interface CassandraFabric extends DynamicFabric {
-
- /** Config key {@code fabric.initial.quorumSize}; defaults to -1, which per its description means "derive from the clusters' INITIAL_SIZE". */
- ConfigKey<Integer> INITIAL_QUORUM_SIZE = ConfigKeys.newIntegerConfigKey(
- "fabric.initial.quorumSize",
- "Initial fabric quorum size - number of initial nodes that must have been successfully started " +
- "to report success (if less than 0, then use a value based on INITIAL_SIZE of clusters)",
- -1);
-
- /** Config key {@code cassandra.fabric.datacenter.namer}: maps a location to a datacenter name; declared without a default. */
- @SuppressWarnings("serial")
- ConfigKey<Function<Location, String>> DATA_CENTER_NAMER = ConfigKeys.newConfigKey(new TypeToken<Function<Location, String>>(){},
- "cassandra.fabric.datacenter.namer",
- "Function used to provide the cassandra.replication.datacenterName for a given location");
-
- /** Default seed quorum for a fabric. NOTE(review): where this constant is consumed is not visible here. */
- int DEFAULT_SEED_QUORUM = 5;
-
- // The sensors below are the same sensor objects that CassandraDatacenter declares, re-exported on the fabric.
- AttributeSensor<Multimap<String,Entity>> DATACENTER_USAGE = CassandraDatacenter.DATACENTER_USAGE;
-
- AttributeSensor<Set<String>> DATACENTERS = CassandraDatacenter.DATACENTERS;
-
- AttributeSensor<Set<Entity>> CURRENT_SEEDS = CassandraDatacenter.CURRENT_SEEDS;
-
- AttributeSensor<Boolean> HAS_PUBLISHED_SEEDS = CassandraDatacenter.HAS_PUBLISHED_SEEDS;
-
- AttributeSensor<String> HOSTNAME = CassandraDatacenter.HOSTNAME;
-
- AttributeSensor<Integer> THRIFT_PORT = CassandraDatacenter.THRIFT_PORT;
-
- /** Effector handle bound by name to the {@link #update()} method of this interface. */
- MethodEffector<Void> UPDATE = new MethodEffector<Void>(CassandraFabric.class, "update");
-
- /** Exposed as an effector; see the annotation's description. */
- @Effector(description="Updates the cluster members")
- void update();
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java
deleted file mode 100644
index 5aa108d..0000000
--- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.nosql.cassandra;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.api.policy.PolicySpec;
-import org.apache.brooklyn.api.sensor.SensorEvent;
-import org.apache.brooklyn.api.sensor.SensorEventListener;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.EntityInternal;
-import org.apache.brooklyn.core.entity.EntityPredicates;
-import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
-import org.apache.brooklyn.entity.group.AbstractMembershipTrackingPolicy;
-import org.apache.brooklyn.entity.group.DynamicFabricImpl;
-import org.apache.brooklyn.entity.group.DynamicGroup;
-import org.apache.brooklyn.util.collections.CollectionFunctionals;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.collections.MutableSet;
-import org.apache.brooklyn.util.time.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.LinkedHashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-
-/**
- * Implementation of {@link CassandraFabric}.
- * <p>
- * Several subtleties to note:
- * - a node may take some time after it is running and serving JMX to actually be contactable on its thrift port
- * (so we wait for thrift port to be contactable)
- * - sometimes new nodes take a while to peer, and/or take a while to get a consistent schema
- * (each up to 1m; often very close to the 1m)
- */
-public class CassandraFabricImpl extends DynamicFabricImpl implements CassandraFabric {
-
- private static final Logger log = LoggerFactory.getLogger(CassandraFabricImpl.class);
-
- // Mutex for synchronizing during re-size operations
- private final Object mutex = new Object[0];
-
- private final Supplier<Set<Entity>> defaultSeedSupplier = new Supplier<Set<Entity>>() {
- /**
-  * Returns the seed nodes for this fabric, recomputing them only when the current set is
-  * missing, smaller than the seed quorum size, or contains a non-viable seed.
-  * <p>
-  * Before seeds have been published, an empty set is chosen until there are at least
-  * {@code quorumSize} potential seeds and every member datacenter offers at least one.
-  * After publication, the existing seeds are kept while the fabric is STARTING, STOPPING or
-  * STOPPED, when there are no potential seeds at all, or when some datacenter has none;
-  * otherwise a fresh selection is made.
-  * <p>
-  * When the chosen set differs from the current one it is written to {@code CURRENT_SEEDS};
-  * if it is non-empty, {@code HAS_PUBLISHED_SEEDS} is set and every member datacenter is
-  * asked to update.
-  *
-  * @return the (possibly newly chosen) seeds; may be empty while waiting for quorum
-  */
- @Override public Set<Entity> get() {
-     // TODO Remove duplication from CassandraClusterImpl.defaultSeedSupplier
-     Set<Entity> seeds = getAttribute(CURRENT_SEEDS);
-     boolean hasPublishedSeeds = Boolean.TRUE.equals(getAttribute(HAS_PUBLISHED_SEEDS));
-     int quorumSize = getSeedQuorumSize();
-
-     // update seeds if we're not quorate; note this may not work for dynamically adding new datacenters
-     // as we do not take a new seed from the new datacenter
-     if (seeds == null || seeds.size() < quorumSize || containsDownEntity(seeds)) {
-         Set<Entity> newseeds;
-         Map<CassandraDatacenter,Set<Entity>> potentialSeeds = MutableMap.of();
-         int potentialSeedCount = 0;
-         for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
-             Set<Entity> dcPotentialSeeds = member.gatherPotentialSeeds();
-             potentialSeeds.put(member, dcPotentialSeeds);
-             potentialSeedCount += dcPotentialSeeds.size();
-         }
-
-         if (hasPublishedSeeds) {
-             Set<Entity> currentSeeds = getAttribute(CURRENT_SEEDS);
-             Lifecycle serviceState = getAttribute(SERVICE_STATE_ACTUAL);
-             if (serviceState == Lifecycle.STARTING) {
-                 if (Sets.intersection(currentSeeds, ImmutableSet.copyOf(Iterables.concat(potentialSeeds.values()))).isEmpty()) {
-                     log.warn("Fabric {} lost all its seeds while starting! Subsequent failure likely, but changing seeds during startup would risk split-brain: seeds={}", new Object[] {CassandraFabricImpl.this, currentSeeds});
-                 }
-                 newseeds = currentSeeds;
-             } else if (serviceState == Lifecycle.STOPPING || serviceState == Lifecycle.STOPPED) {
-                 if (log.isTraceEnabled()) log.trace("Fabric {} ignoring any potential seed-changes, because {}: seeds={}", new Object[] {CassandraFabricImpl.this, serviceState, currentSeeds});
-                 newseeds = currentSeeds;
-             } else if (potentialSeedCount == 0) {
-                 // TODO Could be race where nodes have only just returned from start() and are about to
-                 // transition to serviceUp; so don't just abandon all our seeds!
-                 log.warn("Fabric {} has no seeds (after startup); leaving seeds as-is; but risks split-brain if these seeds come back up!", new Object[] {CassandraFabricImpl.this});
-                 newseeds = currentSeeds;
-             } else if (!allNonEmpty(potentialSeeds.values())) {
-                 log.warn("Fabric {} has datacenter with no seeds (after startup); leaving seeds as-is; but risks split-brain if these seeds come back up!", new Object[] {CassandraFabricImpl.this});
-                 newseeds = currentSeeds;
-             } else {
-                 Set<Entity> result = selectSeeds(quorumSize, potentialSeeds);
-                 if (log.isDebugEnabled() && !Objects.equal(seeds, result)) {
-                     log.debug("Fabric {} updating seeds: chosen={}; potential={}", new Object[] {CassandraFabricImpl.this, result, potentialSeeds});
-                 }
-                 newseeds = result;
-             }
-         } else if (potentialSeedCount < quorumSize) {
-             if (log.isDebugEnabled()) log.debug("Not setting seeds of fabric {} yet, because still waiting for quorum (need {}; have {} potentials from {} members)", new Object[] {CassandraFabricImpl.this, quorumSize, potentialSeedCount, getMembers()});
-             newseeds = ImmutableSet.of();
-         } else if (!allNonEmpty(potentialSeeds.values())) {
-             if (log.isDebugEnabled()) {
-                 Map<CassandraDatacenter, Integer> datacenterCounts = Maps.transformValues(potentialSeeds, CollectionFunctionals.sizeFunction());
-                 log.debug("Not setting seeds of fabric {} yet, because not all datacenters have seeds (sizes are {})", new Object[] {CassandraFabricImpl.this, datacenterCounts});
-             }
-             newseeds = ImmutableSet.of();
-         } else {
-             // yay, we're quorate
-             Set<Entity> result = selectSeeds(quorumSize, potentialSeeds);
-             log.info("Fabric {} has reached seed quorum: seeds={}", new Object[] {CassandraFabricImpl.this, result});
-             newseeds = result;
-         }
-
-         if (!Objects.equal(seeds, newseeds)) {
-             sensors().set(CURRENT_SEEDS, newseeds);
-
-             if (newseeds != null && newseeds.size() > 0) {
-                 sensors().set(HAS_PUBLISHED_SEEDS, true);
-
-                 // Need to tell every datacenter that seeds are ready.
-                 // Otherwise a datacenter might get no more changes (e.g. to nodes' hostnames etc),
-                 // and not call seedSupplier.get() again.
-                 for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
-                     member.update();
-                 }
-             }
-             return newseeds;
-         } else {
-             return seeds;
-         }
-     } else {
-         // Log the fabric instance (not the Class object), matching every other message in this method.
-         if (log.isTraceEnabled()) log.trace("Not refresheed seeds of fabric {}, because have quorum {} (of {} members), and none are down: seeds={}",
-                 new Object[] {CassandraFabricImpl.this, quorumSize, getMembers().size(), seeds});
-         return seeds;
-     }
- }
- /** Returns true only if none of the given collections is empty (true for no collections at all). */
- private boolean allNonEmpty(Collection<? extends Collection<Entity>> contenders) {
-     for (Collection<Entity> candidateGroup : contenders) {
-         if (candidateGroup.isEmpty()) {
-             return false;
-         }
-     }
-     return true;
- }
- /**
-  * Chooses at most {@code num} seeds from the per-datacenter contenders.
-  * <p>
-  * Candidates are collected in priority order: current seeds that are still contenders,
-  * then one contender from each datacenter not yet represented, then all remaining contenders.
-  * The result is the first {@code num} entries of that ordering.
-  *
-  * @param num maximum number of seeds to return
-  * @param contenders potential seeds, keyed by the datacenter that offered them
-  * @return an immutable set of at most {@code num} seeds
-  */
- private Set<Entity> selectSeeds(int num, Map<CassandraDatacenter,? extends Collection<Entity>> contenders) {
-     // Prefer existing seeds wherever possible;
-     // otherwise prefer a seed from each sub-cluster;
-     // otherwise accept any other contenders
-     Set<Entity> publishedSeeds = getAttribute(CURRENT_SEEDS);
-     Set<Entity> currentSeeds = (publishedSeeds != null) ? publishedSeeds : ImmutableSet.<Entity>of();
-     // Flatten the per-datacenter collections before intersecting: a set of collections never
-     // shares an element with a set of entities, so without this no existing seed would be kept.
-     Set<Entity> allContenders = ImmutableSet.copyOf(Iterables.concat(contenders.values()));
-     MutableSet<Entity> result = MutableSet.of();
-     result.addAll(Sets.intersection(currentSeeds, allContenders));
-     for (CassandraDatacenter cluster : contenders.keySet()) {
-         Set<Entity> contendersInCluster = Sets.newLinkedHashSet(contenders.get(cluster));
-         if (contendersInCluster.size() > 0 && Sets.intersection(result, contendersInCluster).isEmpty()) {
-             result.add(Iterables.getFirst(contendersInCluster, null));
-         }
-     }
-     result.addAll(Iterables.concat(contenders.values()));
-     return ImmutableSet.copyOf(Iterables.limit(result, num));
- }
- /** Returns true as soon as one of the given seeds is not a viable seed; false if all are viable. */
- private boolean containsDownEntity(Set<Entity> seeds) {
-     for (Entity candidate : seeds) {
-         boolean viable = isViableSeed(candidate);
-         if (viable) {
-             continue;
-         }
-         return true;
-     }
-     return false;
- }
- /**
-  * Decides whether the given entity can serve as a seed.
-  * <p>
-  * An entity is viable when it has a hostname and has not failed. It counts as failed when it
-  * is unmanaged, ON_FIRE, STOPPED, or RUNNING without service-up being true.
-  *
-  * @param member the candidate seed
-  * @return true if the entity is a viable seed
-  */
- public boolean isViableSeed(Entity member) {
-     // TODO remove duplication from CassandraClusterImpl.SeedTracker.isViableSeed
-     final boolean managed = Entities.isManaged(member);
-     final String hostname = member.getAttribute(Attributes.HOSTNAME);
-     final boolean serviceUp = Boolean.TRUE.equals(member.getAttribute(Attributes.SERVICE_UP));
-     final Lifecycle serviceState = member.getAttribute(Attributes.SERVICE_STATE_ACTUAL);
-
-     final boolean onFire = (serviceState == Lifecycle.ON_FIRE);
-     final boolean runningButDown = (serviceState == Lifecycle.RUNNING && !serviceUp);
-     final boolean stopped = (serviceState == Lifecycle.STOPPED);
-     final boolean hasFailed = !managed || onFire || runningButDown || stopped;
-
-     final boolean result = (hostname != null && !hasFailed);
-     if (log.isTraceEnabled()) {
-         log.trace("Node {} in Fabric {}: viableSeed={}; hostname={}; serviceUp={}; serviceState={}; hasFailed={}", new Object[] {member, CassandraFabricImpl.this, result, hostname, serviceUp, serviceState, hasFailed});
-     }
-     return result;
- }
- };
-
- /** No-arg constructor; it performs no work of its own. */
- public CassandraFabricImpl() {
- }
-
- /**
-  * Initialises the fabric after the superclass has initialised.
-  * <p>
-  * Sets the seed supplier config if none is present, adds the member-tracking policy, and
-  * subscribes to member events that propagate the first node start time and recompute the
-  * datacenter usage sensors.
-  */
- @Override
- public void init() {
- super.init();
-
- // Only install the default seed supplier when the caller has not configured one
- if (!config().getRaw(CassandraDatacenter.SEED_SUPPLIER).isPresentAndNonNull())
- config().set(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier());
-
- // track members
- policies().add(PolicySpec.create(MemberTrackingPolicy.class)
- .displayName("Cassandra Fabric Tracker")
- .configure("group", this));
-
- // Track first node's startup
- subscriptions().subscribeToMembers(this, CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, new SensorEventListener<Long>() {
- @Override
- public void onEvent(SensorEvent<Long> event) {
- Long oldval = getAttribute(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC);
- Long newval = event.getValue();
- // Only the first non-null value is recorded; it is then copied onto every member datacenter
- if (oldval == null && newval != null) {
- sensors().set(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, newval);
- for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
- ((EntityInternal)member).sensors().set(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, newval);
- }
- }
- }
- });
-
- // Track the datacenters for this cluster
- subscriptions().subscribeToMembers(this, CassandraDatacenter.DATACENTER_USAGE, new SensorEventListener<Multimap<String,Entity>>() {
- @Override
- public void onEvent(SensorEvent<Multimap<String,Entity>> event) {
- Multimap<String, Entity> usage = calculateDatacenterUsage();
- sensors().set(DATACENTER_USAGE, usage);
- sensors().set(DATACENTERS, usage.keySet());
- }
- });
- // Recompute the same two sensors when a member leaves the fabric
- subscriptions().subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener<Entity>() {
- @Override public void onEvent(SensorEvent<Entity> event) {
- Multimap<String, Entity> usage = calculateDatacenterUsage();
- sensors().set(DATACENTER_USAGE, usage);
- sensors().set(DATACENTERS, usage.keySet());
- }
- });
- }
-
- /**
-  * Membership-tracking policy that calls {@code update()} on the owning fabric whenever a
-  * member changes, is added or is removed.
-  * <p>
-  * NOTE(review): the debug messages say "Location" although the argument is the member entity;
-  * presumably each member stands for one location - confirm before rewording.
-  */
- public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
- @Override
- protected void onEntityChange(Entity member) {
- if (log.isDebugEnabled()) log.debug("Location {} updated in Fabric {}", member, entity);
- ((CassandraFabricImpl)entity).update();
- }
- @Override
- protected void onEntityAdded(Entity member) {
- if (log.isDebugEnabled()) log.debug("Location {} added to Fabric {}", member, entity);
- ((CassandraFabricImpl)entity).update();
- }
- @Override
- protected void onEntityRemoved(Entity member) {
- if (log.isDebugEnabled()) log.debug("Location {} removed from Fabric {}", member, entity);
- ((CassandraFabricImpl)entity).update();
- }
- };
-
- /**
-  * Returns the number of seeds required before the fabric is considered quorate.
-  * <p>
-  * A positive {@code INITIAL_QUORUM_SIZE} config value is used as-is. Otherwise the value is
-  * derived from the sum of the member datacenters' {@code INITIAL_SIZE}: halved above 5,
-  * reduced by 2 above 3, reduced by 1 above 2, then clamped to the range
-  * 1..{@code DEFAULT_SEED_QUORUM}.
-  */
- protected int getSeedQuorumSize() {
-     Integer configured = getConfig(INITIAL_QUORUM_SIZE);
-     if (configured != null && configured > 0) {
-         return configured;
-     }
-
-     int total = 0;
-     for (CassandraDatacenter datacenter : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
-         total += datacenter.getConfig(CassandraDatacenter.INITIAL_SIZE);
-     }
-
-     int derived;
-     if (total > 5) {
-         derived = total / 2;
-     } else if (total > 3) {
-         derived = total - 2;
-     } else if (total > 2) {
-         derived = total - 1;
-     } else {
-         derived = total;
-     }
-
-     int atLeastOne = Math.max(derived, 1);
-     return Math.min(atLeastOne, CassandraFabric.DEFAULT_SEED_QUORUM);
- }
-
- /**
-  * Supplies the spec for the Cassandra sub-clusters, defaulting {@link #MEMBER_SPEC} when unset.
-  * <p>
-  * The fabric's seed supplier is configured on the spec unless the caller's spec already
-  * carries one, either as config or as the {@code seedSupplier} flag.
-  */
- @Override
- protected EntitySpec<?> getMemberSpec() {
-     // Need to set the seedSupplier, even if the caller has overridden the CassandraCluster config
-     // (unless they've explicitly overridden the seedSupplier as well!)
-     // TODO probably don't need to anymore, as it is set on the Fabric here -- just make sure there is a default!
-     EntitySpec<?> custom = getConfig(MEMBER_SPEC);
-     if (custom == null) {
-         return EntitySpec.create(CassandraDatacenter.class)
-                 .configure(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier());
-     }
-
-     boolean callerSetSupplier = custom.getConfig().containsKey(CassandraDatacenter.SEED_SUPPLIER)
-             || custom.getFlags().containsKey("seedSupplier");
-     if (callerSetSupplier) {
-         return custom;
-     }
-
-     return EntitySpec.create(custom)
-             .configure(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier());
- }
-
- /**
-  * Creates the sub-cluster for a location, adding a datacenter name to the flags when a
-  * {@code DATA_CENTER_NAMER} function is configured.
-  * <p>
-  * NOTE(review): the flags are copied through an immutable-map builder; presumably this rejects
-  * null values and a pre-existing {@code CassandraNode.DATACENTER_NAME} key - confirm against Guava.
-  */
- @Override
- protected Entity createCluster(Location location, Map flags) {
- Function<Location, String> dataCenterNamer = getConfig(DATA_CENTER_NAMER);
- if (dataCenterNamer != null) {
- // Build a new map rather than mutating the caller's flags
- flags = ImmutableMap.builder()
- .putAll(flags)
- .put(CassandraNode.DATACENTER_NAME, dataCenterNamer.apply(location))
- .build();
- }
- return super.createCluster(location, flags);
- }
-
- /**
-  * Returns this fabric's default seed supplier.
-  * <p>
-  * The supplier prefers one node per location, and then others from anywhere.
-  * Then trims result down to the "quorumSize".
-  */
- public Supplier<Set<Entity>> getSeedSupplier() {
- return defaultSeedSupplier;
- }
-
- /**
-  * Starts the fabric in the given locations, connects sensors, waits for the configured
-  * {@code DELAY_BEFORE_ADVERTISING_CLUSTER}, then calls {@link #update()}.
-  */
- @Override
- public void start(Collection<? extends Location> locations) {
- super.start(locations);
-
- connectSensors();
-
- // TODO wait until all nodes which we think are up are consistent
- // i.e. all known nodes use the same schema, as reported by
- // SshEffectorTasks.ssh("echo \"describe cluster;\" | /bin/cassandra-cli");
- // once we've done that we can revert to using 2 seed nodes.
- // see CassandraCluster.DEFAULT_SEED_QUORUM
- Time.sleep(getConfig(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER));
-
- update();
- }
-
- /** Connects the fabric's sensors; currently this only delegates to {@link #connectEnrichers()}. */
- protected void connectSensors() {
- connectEnrichers();
- }
-
- /** Subscribes to members' SERVICE_UP events and recomputes the fabric's own SERVICE_UP on each one. */
- protected void connectEnrichers() {
- // TODO Aggregate across sub-clusters
-
- subscriptions().subscribeToMembers(this, SERVICE_UP, new SensorEventListener<Boolean>() {
- @Override public void onEvent(SensorEvent<Boolean> event) {
- sensors().set(SERVICE_UP, calculateServiceUp());
- }
- });
- }
-
- /** Disconnects sensors and then stops the fabric via the superclass. */
- @Override
- public void stop() {
- disconnectSensors();
-
- super.stop();
- }
-
- /** Hook called at the start of {@link #stop()}; this implementation does nothing. */
- protected void disconnectSensors() {
- }
-
- /** Returns true if at least one member of the fabric reports SERVICE_UP as true. */
- protected boolean calculateServiceUp() {
-     return Iterables
-             .tryFind(getMembers(), EntityPredicates.attributeEqualTo(SERVICE_UP, Boolean.TRUE))
-             .isPresent();
- }
-
- /**
-  * Merges the DATACENTER_USAGE attribute of every member datacenter into one multimap.
-  * Members whose attribute is null are ignored.
-  */
- protected Multimap<String, Entity> calculateDatacenterUsage() {
-     Multimap<String, Entity> combined = LinkedHashMultimap.<String, Entity>create();
-     Iterable<CassandraDatacenter> datacenters = Iterables.filter(getMembers(), CassandraDatacenter.class);
-     for (CassandraDatacenter datacenter : datacenters) {
-         Multimap<String, Entity> usage = datacenter.getAttribute(CassandraDatacenter.DATACENTER_USAGE);
-         if (usage == null) {
-             continue;
-         }
-         combined.putAll(usage);
-     }
-     return combined;
- }
-
- /**
-  * Updates every member datacenter and then publishes HOSTNAME and THRIFT_PORT taken from the
-  * first member that reports SERVICE_UP. Runs while holding the fabric's mutex.
-  */
- @Override
- public void update() {
- synchronized (mutex) {
- for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
- member.update();
- }
-
- // NOTE(review): the result of this call is discarded; possibly it was meant to be written
- // to SERVICE_UP as the listener in connectEnrichers() does - confirm intent.
- calculateServiceUp();
-
- // Choose the first available location to set host and port (and compute one-up)
- Optional<Entity> upNode = Iterables.tryFind(getMembers(), EntityPredicates.attributeEqualTo(SERVICE_UP, Boolean.TRUE));
-
- // When no member is up, the previously published hostname and port are left unchanged
- if (upNode.isPresent()) {
- sensors().set(HOSTNAME, upNode.get().getAttribute(Attributes.HOSTNAME));
- sensors().set(THRIFT_PORT, upNode.get().getAttribute(CassandraNode.THRIFT_PORT));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java
deleted file mode 100644
index fb937ae..0000000
--- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.nosql.cassandra;
-
-import java.math.BigInteger;
-import java.util.Set;
-
-import org.apache.brooklyn.api.catalog.Catalog;
-import org.apache.brooklyn.api.effector.Effector;
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.ImplementedBy;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.BasicConfigKey;
-import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.entity.BrooklynConfigKeys;
-import org.apache.brooklyn.core.location.PortRanges;
-import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey;
-import org.apache.brooklyn.core.sensor.PortAttributeSensorAndConfigKey;
-import org.apache.brooklyn.core.sensor.Sensors;
-import org.apache.brooklyn.entity.database.DatastoreMixins;
-import org.apache.brooklyn.entity.java.UsesJavaMXBeans;
-import org.apache.brooklyn.entity.java.UsesJmx;
-import org.apache.brooklyn.entity.software.base.SoftwareProcess;
-import org.apache.brooklyn.util.core.flags.SetFromFlag;
-import org.apache.brooklyn.util.time.Duration;
-
-import com.google.common.reflect.TypeToken;
-
-/**
- * An {@link org.apache.brooklyn.api.entity.Entity} that represents a Cassandra node in a {@link CassandraDatacenter}.
- * <p>
- * Declares the node's config keys, sensors, the script effector, and the accessors that the
- * configuration templates call.
- */
-@Catalog(name="Apache Cassandra Node", description="Cassandra is a highly scalable, eventually " +
- "consistent, distributed, structured key-value store which provides a ColumnFamily-based data model " +
- "richer than typical key/value systems", iconUrl="classpath:///cassandra-logo.jpeg")
-@ImplementedBy(CassandraNodeImpl.class)
-public interface CassandraNode extends DatastoreMixins.DatastoreCommon, SoftwareProcess, UsesJmx, UsesJavaMXBeans, DatastoreMixins.HasDatastoreUrl, DatastoreMixins.CanExecuteScript {
-
- @SetFromFlag("version")
- ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "1.2.16");
- // when this changes remember to put a copy under releng2:/var/www/developer/brooklyn/repository/ !
- // TODO experiment with supporting 2.0.x
-
- // Download URL template; it refers to the driver's mirror URL and the configured version
- @SetFromFlag("downloadUrl")
- BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
- SoftwareProcess.DOWNLOAD_URL, "${driver.mirrorUrl}/${version}/apache-cassandra-${version}-bin.tar.gz");
-
- /** download mirror, if desired */
- @SetFromFlag("mirrorUrl")
- ConfigKey<String> MIRROR_URL = new BasicConfigKey<String>(String.class, "cassandra.install.mirror.url", "URL of mirror",
- "http://www.mirrorservice.org/sites/ftp.apache.org/cassandra"
- // for older versions, but slower:
-// "http://archive.apache.org/dist/cassandra/"
- );
-
- @SetFromFlag("tgzUrl")
- ConfigKey<String> TGZ_URL = new BasicConfigKey<String>(String.class, "cassandra.install.tgzUrl", "URL of TGZ download file");
-
- // The next two keys are shared with CassandraDatacenter rather than redefined here
- @SetFromFlag("clusterName")
- BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = CassandraDatacenter.CLUSTER_NAME;
-
- @SetFromFlag("snitchName")
- ConfigKey<String> ENDPOINT_SNITCH_NAME = CassandraDatacenter.ENDPOINT_SNITCH_NAME;
-
- @SetFromFlag("gossipPort")
- PortAttributeSensorAndConfigKey GOSSIP_PORT = new PortAttributeSensorAndConfigKey("cassandra.gossip.port", "Cassandra Gossip communications port", PortRanges.fromString("7000+"));
-
- // NOTE(review): the flag name "sslGgossipPort" looks like a typo for "sslGossipPort"; it is a
- // runtime flag name, so renaming it could break existing users - confirm before changing.
- @SetFromFlag("sslGgossipPort")
- PortAttributeSensorAndConfigKey SSL_GOSSIP_PORT = new PortAttributeSensorAndConfigKey("cassandra.ssl-gossip.port", "Cassandra Gossip SSL communications port", PortRanges.fromString("7001+"));
-
- @SetFromFlag("thriftPort")
- PortAttributeSensorAndConfigKey THRIFT_PORT = new PortAttributeSensorAndConfigKey("cassandra.thrift.port", "Cassandra Thrift RPC port", PortRanges.fromString("9160+"));
-
- @SetFromFlag("nativePort")
- PortAttributeSensorAndConfigKey NATIVE_TRANSPORT_PORT = new PortAttributeSensorAndConfigKey("cassandra.native.port", "Cassandra Native Transport port", PortRanges.fromString("9042+"));
-
- @SetFromFlag("rmiRegistryPort")
- // cassandra nodetool and others want 7199 - not required, but useful
- PortAttributeSensorAndConfigKey RMI_REGISTRY_PORT = new PortAttributeSensorAndConfigKey(UsesJmx.RMI_REGISTRY_PORT,
- PortRanges.fromInteger(7199));
-
- // some of the cassandra tooling (eg nodetool) use RMI, but we want JMXMP, so do both!
- ConfigKey<JmxAgentModes> JMX_AGENT_MODE = ConfigKeys.newConfigKeyWithDefault(UsesJmx.JMX_AGENT_MODE, JmxAgentModes.JMXMP_AND_RMI);
-
- // TODO the multicloud-snitch has to be built manually, it is available in the brooklyn sandbox
- @SetFromFlag("customSnitchJarUrl")
- ConfigKey<String> CUSTOM_SNITCH_JAR_URL = ConfigKeys.newStringConfigKey("cassandra.config.customSnitchUrl",
- "URL for a jar file to be uploaded (e.g. \"classpath://org/apache/brooklyn/entity/nosql/cassandra/cassandra-multicloud-snitch.jar\"); defaults to null which means nothing to upload",
- null);
-
- // The default template URL is itself a template: it selects a yaml file by the entity's major.minor version
- @SetFromFlag("cassandraConfigTemplateUrl")
- ConfigKey<String> CASSANDRA_CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey(
- "cassandra.config.templateUrl", "A URL (in freemarker format) for a cassandra.yaml config file (in freemarker format)",
- "classpath://org/apache/brooklyn/entity/nosql/cassandra/cassandra-${entity.majorMinorVersion}.yaml");
-
- @SetFromFlag("cassandraConfigFileName")
- ConfigKey<String> CASSANDRA_CONFIG_FILE_NAME = ConfigKeys.newStringConfigKey(
- "cassandra.config.fileName", "Name for the copied config file", "cassandra.yaml");
-
- @SetFromFlag("cassandraRackdcConfigTemplateUrl")
- ConfigKey<String> CASSANDRA_RACKDC_CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey(
- "cassandra.config.rackdc.templateUrl", "Template file (in freemarker format) for the cassandra-rackdc.properties config file",
- "classpath://org/apache/brooklyn/entity/nosql/cassandra/cassandra-rackdc.properties");
-
- @SetFromFlag("cassandraRackdcConfigFileName")
- ConfigKey<String> CASSANDRA_RACKDC_CONFIG_FILE_NAME = ConfigKeys.newStringConfigKey(
- "cassandra.config.rackdc.fileName", "Name for the copied rackdc config file (used for configuring replication, when a suitable snitch is used)", "cassandra-rackdc.properties");
-
- @SetFromFlag("datacenterName")
- BasicAttributeSensorAndConfigKey<String> DATACENTER_NAME = new BasicAttributeSensorAndConfigKey<String>(
- String.class, "cassandra.replication.datacenterName", "Datacenter name (used for configuring replication, when a suitable snitch is used)",
- null);
-
- @SetFromFlag("rackName")
- BasicAttributeSensorAndConfigKey<String> RACK_NAME = new BasicAttributeSensorAndConfigKey<String>(
- String.class, "cassandra.replication.rackName", "Rack name (used for configuring replication, when a suitable snitch is used)",
- null);
-
- ConfigKey<Integer> NUM_TOKENS_PER_NODE = ConfigKeys.newIntegerConfigKey("cassandra.numTokensPerNode",
- "Number of tokens per node; if using vnodes, should set this to a value like 256",
- 1);
-
- @SetFromFlag("tokens")
- @SuppressWarnings("serial")
- BasicAttributeSensorAndConfigKey<Set<BigInteger>> TOKENS = new BasicAttributeSensorAndConfigKey<Set<BigInteger>>(
- new TypeToken<Set<BigInteger>>() {}, "cassandra.tokens", "Cassandra Tokens");
-
- @SetFromFlag("useThriftMonitoring")
- ConfigKey<Boolean> USE_THRIFT_MONITORING = ConfigKeys.newConfigKey("thriftMonitoring.enabled", "Thrift-port monitoring enabled", Boolean.TRUE);
-
- AttributeSensor<Integer> PEERS = Sensors.newIntegerSensor( "cassandra.peers", "Number of peers in cluster");
-
- AttributeSensor<Integer> LIVE_NODE_COUNT = Sensors.newIntegerSensor( "cassandra.liveNodeCount", "Number of live nodes in cluster");
-
- /* Metrics for read/write performance. */
-
- AttributeSensor<Long> READ_PENDING = Sensors.newLongSensor("cassandra.read.pending", "Current pending ReadStage tasks");
- AttributeSensor<Integer> READ_ACTIVE = Sensors.newIntegerSensor("cassandra.read.active", "Current active ReadStage tasks");
- AttributeSensor<Long> READ_COMPLETED = Sensors.newLongSensor("cassandra.read.completed", "Total completed ReadStage tasks");
- AttributeSensor<Long> WRITE_PENDING = Sensors.newLongSensor("cassandra.write.pending", "Current pending MutationStage tasks");
- AttributeSensor<Integer> WRITE_ACTIVE = Sensors.newIntegerSensor("cassandra.write.active", "Current active MutationStage tasks");
- AttributeSensor<Long> WRITE_COMPLETED = Sensors.newLongSensor("cassandra.write.completed", "Total completed MutationStage tasks");
-
- AttributeSensor<Boolean> SERVICE_UP_JMX = Sensors.newBooleanSensor("cassandra.service.jmx.up", "Whether JMX is up for this service");
- AttributeSensor<Long> THRIFT_PORT_LATENCY = Sensors.newLongSensor("cassandra.thrift.latency", "Latency for thrift port connection (ms) or null if down");
-
- // NOTE(review): the writes sensor below is named "cassandra.write.perSec.last" whereas the other
- // rate sensors use "reads"/"writes"; the name is a runtime identifier, so confirm before aligning it.
- AttributeSensor<Double> READS_PER_SECOND_LAST = Sensors.newDoubleSensor("cassandra.reads.perSec.last", "Reads/sec (last datapoint)");
- AttributeSensor<Double> WRITES_PER_SECOND_LAST = Sensors.newDoubleSensor("cassandra.write.perSec.last", "Writes/sec (last datapoint)");
-
- AttributeSensor<Double> THRIFT_PORT_LATENCY_IN_WINDOW = Sensors.newDoubleSensor("cassandra.thrift.latency.windowed", "Latency for thrift port (ms, averaged over time window)");
- AttributeSensor<Double> READS_PER_SECOND_IN_WINDOW = Sensors.newDoubleSensor("cassandra.reads.perSec.windowed", "Reads/sec (over time window)");
- AttributeSensor<Double> WRITES_PER_SECOND_IN_WINDOW = Sensors.newDoubleSensor("cassandra.writes.perSec.windowed", "Writes/sec (over time window)");
-
- // The key is created with the raw Set type and cast, hence the suppressed warnings;
- // note the description says "List" although the declared type is a Set of entities.
- @SuppressWarnings({ "rawtypes", "unchecked" })
- ConfigKey<Set<Entity>> INITIAL_SEEDS = (ConfigKey)ConfigKeys.newConfigKey(Set.class, "cassandra.cluster.seeds.initial",
- "List of cluster nodes to seed this node");
-
- ConfigKey<Duration> START_TIMEOUT = ConfigKeys.newConfigKeyWithDefault(BrooklynConfigKeys.START_TIMEOUT, Duration.FIVE_MINUTES);
-
- ConfigKey<String> LISTEN_ADDRESS_SENSOR = ConfigKeys.newStringConfigKey("cassandra.listenAddressSensor", "sensor name from which to take the listen address; default (null) is a smart lookup");
- ConfigKey<String> BROADCAST_ADDRESS_SENSOR = ConfigKeys.newStringConfigKey("cassandra.broadcastAddressSensor", "sensor name from which to take the broadcast address; default (null) is a smart lookup");
- ConfigKey<String> RPC_ADDRESS_SENSOR = ConfigKeys.newStringConfigKey("cassandra.rpcAddressSensor", "sensor name from which to take the RPC address; default (null) is 0.0.0.0");
-
- Effector<String> EXECUTE_SCRIPT = CassandraDatacenter.EXECUTE_SCRIPT;
-
- /* Accessors used from template */
-
- String getMajorMinorVersion();
- Integer getGossipPort();
- Integer getSslGossipPort();
- Integer getThriftPort();
- Integer getNativeTransportPort();
- String getClusterName();
- String getListenAddress();
- String getBroadcastAddress();
- String getRpcAddress();
- String getSeeds();
-
- String getPrivateIp();
- String getPublicIp();
-
- int getNumTokensPerNode();
-
- /**
- * Returns the set of tokens.
- * Each is in the range 0 to (2^127)-1.
- * Returns null if there are no tokens.
- */
- Set<BigInteger> getTokens();
-
- /** string value of comma-separated tokens; or blank if none */
- String getTokensAsString();
-
- /* For configuration */
-
- void setToken(String token);
-
- /* Using Cassandra */
-
- String executeScript(String commands);
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java b/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java
deleted file mode 100644
index 3893373..0000000
--- a/brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.nosql.cassandra;
-
-import org.apache.brooklyn.entity.java.JavaSoftwareProcessDriver;
-import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
-
-/**
- * Driver contract for a Cassandra node process, extending {@link JavaSoftwareProcessDriver}.
- * <p>
- * Several accessors share their names with accessors on {@code CassandraNode};
- * presumably the entity delegates to its driver for them - confirm against the implementation.
- */
-public interface CassandraNodeDriver extends JavaSoftwareProcessDriver {
-
- Integer getGossipPort();
-
- Integer getSslGossipPort();
-
- Integer getThriftPort();
-
- Integer getNativeTransportPort();
-
- String getClusterName();
-
- String getCassandraConfigTemplateUrl();
-
- String getCassandraConfigFileName();
-
- boolean isClustered();
-
- /** Runs the given script commands, returning a task wrapper whose result is an Integer (presumably the exit code - confirm). */
- ProcessTaskWrapper<Integer> executeScriptAsync(String commands);
-
- /** returns the address that the given hostname resolves to at the target */
- String getResolvedAddress(String hostname);
-
-}