You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/08/06 18:32:31 UTC
[17/26] incubator-brooklyn git commit: [BROOKLYN-162] Renaming of the
NoSQL packages
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraClusterImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraClusterImpl.java
new file mode 100644
index 0000000..7036285
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraClusterImpl.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+/**
+ * @deprecated since 0.7.0; use {@link CassandraDatacenter} which is equivalent but has
+ * a less ambiguous name; <em>Cluster</em> in Cassandra corresponds to what Brooklyn terms a <em>Fabric</em>.
+ */
+@Deprecated
+public class CassandraClusterImpl extends CassandraDatacenterImpl implements CassandraCluster {
+    // Intentionally empty: retained only so existing blueprints/catalog references to the
+    // old class name keep resolving; all behaviour lives in CassandraDatacenterImpl.
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenter.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenter.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenter.java
new file mode 100644
index 0000000..7ef646f
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenter.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.brooklyn.catalog.Catalog;
+import org.apache.brooklyn.entity.nosql.cassandra.TokenGenerators.PosNeg63TokenGenerator;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Entity;
+import brooklyn.entity.annotation.Effector;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.MethodEffector;
+import brooklyn.entity.database.DatastoreMixins;
+import brooklyn.entity.effector.Effectors;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.Sensors;
+import brooklyn.util.flags.SetFromFlag;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Multimap;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * A group of {@link CassandraNode}s -- based on Brooklyn's {@link DynamicCluster}
+ * (though it is a "Datacenter" in Cassandra terms, where Cassandra's "cluster" corresponds
+ * to a Brooklyn Fabric, cf {@link CassandraFabric}).
+ * The Datacenter can be resized, manually or by policy if required.
+ * Tokens are selected intelligently.
+ * <p>
+ * Note that due to how Cassandra assumes ports are the same across a cluster,
+ * it is <em>NOT</em> possible to deploy a cluster of size larger than 1 to localhost.
+ * (Some exploratory work has been done to use different 127.0.0.x IP's for localhost,
+ * and there is evidence this could be made to work.)
+ */
+@Catalog(name="Apache Cassandra Datacenter Cluster", description="Cassandra is a highly scalable, eventually " +
+        "consistent, distributed, structured key-value store which provides a ColumnFamily-based data model " +
+        "richer than typical key/value systems", iconUrl="classpath:///cassandra-logo.jpeg")
+@ImplementedBy(CassandraDatacenterImpl.class)
+public interface CassandraDatacenter extends DynamicCluster, DatastoreMixins.HasDatastoreUrl, DatastoreMixins.CanExecuteScript {
+
+    // FIXME datacenter name -- also CASS_CLUSTER_NODES should be CASS_DC_NODES
+    @SetFromFlag("clusterName")
+    BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(String.class, "cassandra.cluster.name", "Name of the Cassandra cluster", "BrooklynCluster");
+
+    @SetFromFlag("snitchName")
+    ConfigKey<String> ENDPOINT_SNITCH_NAME = ConfigKeys.newStringConfigKey("cassandra.cluster.snitchName", "Type of the Cassandra snitch", "SimpleSnitch");
+
+    /** Optional override for the seed-selection strategy; the implementation supplies a default when unset. */
+    @SetFromFlag("seedSupplier")
+    @SuppressWarnings("serial")
+    ConfigKey<Supplier<Set<Entity>>> SEED_SUPPLIER = ConfigKeys.newConfigKey(new TypeToken<Supplier<Set<Entity>>>() { }, "cassandra.cluster.seedSupplier", "For determining the seed nodes", null);
+
+    @SuppressWarnings("serial")
+    @SetFromFlag("tokenGeneratorClass")
+    ConfigKey<Class<? extends TokenGenerator>> TOKEN_GENERATOR_CLASS = ConfigKeys.newConfigKey(
+            new TypeToken<Class<? extends TokenGenerator>>() {}, "cassandra.cluster.tokenGenerator.class", "For determining the tokens of nodes",
+            PosNeg63TokenGenerator.class);
+
+    @SetFromFlag("tokenShift")
+    ConfigKey<BigInteger> TOKEN_SHIFT = ConfigKeys.newConfigKey(BigInteger.class, "cassandra.cluster.tokenShift",
+            "Delta applied to all tokens generated for this Cassandra datacenter, "
+            + "useful when configuring multiple datacenters which should be shifted; "
+            + "if not set, a random shift is applied. (Pass 0 to prevent any shift.)", null);
+
+    ConfigKey<Boolean> USE_VNODES = ConfigKeys.newBooleanConfigKey(
+            "cassandra.cluster.useVnodes",
+            "Determines whether to use vnodes; if doing so, tokens will not be explicitly assigned to nodes in the cluster",
+            false);
+
+    /**
+     * num_tokens will automatically be reset to 1 for each node if {@link #USE_VNODES} is false.
+     */
+    ConfigKey<Integer> NUM_TOKENS_PER_NODE = ConfigKeys.newIntegerConfigKey("cassandra.numTokensPerNode",
+            "Number of tokens per node; if using vnodes, should set this to a value like 256; will be overridden to 1 if USE_VNODES==false",
+            256);
+
+    /**
+     * Additional time after the nodes in the cluster are up when starting
+     * before announcing the cluster as up.
+     * <p>
+     * Useful to ensure nodes have synchronized.
+     * <p>
+     * On 1.2.2 this could be as much as 120s when using 2 seed nodes,
+     * or just a few seconds with 1 seed node. On 1.2.9 it seems a few
+     * seconds is sufficient even with 2 seed nodes.
+     */
+    @SetFromFlag("delayBeforeAdvertisingCluster")
+    ConfigKey<Duration> DELAY_BEFORE_ADVERTISING_CLUSTER = ConfigKeys.newConfigKey(Duration.class, "cassandra.cluster.delayBeforeAdvertisingCluster", "Delay after cluster is started before checking and advertising its availability", Duration.TEN_SECONDS);
+
+    @SuppressWarnings("serial")
+    AttributeSensor<Multimap<String,Entity>> DATACENTER_USAGE = Sensors.newSensor(new TypeToken<Multimap<String,Entity>>() { }, "cassandra.cluster.datacenterUsages", "Current set of datacenters in use, with nodes in each");
+
+    @SuppressWarnings("serial")
+    AttributeSensor<Set<String>> DATACENTERS = Sensors.newSensor(new TypeToken<Set<String>>() { }, "cassandra.cluster.datacenters", "Current set of datacenters in use");
+
+    AttributeSensor<Boolean> HAS_PUBLISHED_SEEDS = Sensors.newBooleanSensor("cassandra.cluster.seeds.hasPublished", "Whether we have published any seeds");
+
+    @SuppressWarnings("serial")
+    AttributeSensor<Set<Entity>> CURRENT_SEEDS = Sensors.newSensor(new TypeToken<Set<Entity>>() { }, "cassandra.cluster.seeds.current", "Current set of seeds to use to bootstrap the cluster");
+
+    AttributeSensor<String> HOSTNAME = Sensors.newStringSensor("cassandra.cluster.hostname", "Hostname to connect to cluster with");
+
+    @SuppressWarnings("serial")
+    AttributeSensor<List<String>> CASSANDRA_CLUSTER_NODES = Sensors.newSensor(new TypeToken<List<String>>() {},
+            "cassandra.cluster.nodes", "List of host:port of all active nodes in the cluster (thrift port, and public hostname/IP)");
+
+    AttributeSensor<Integer> THRIFT_PORT = Sensors.newIntegerSensor("cassandra.cluster.thrift.port", "Cassandra Thrift RPC port to connect to cluster with");
+
+    AttributeSensor<Long> FIRST_NODE_STARTED_TIME_UTC = Sensors.newLongSensor("cassandra.cluster.first.node.started.utc", "Time (UTC) when the first node was started");
+
+    @SuppressWarnings("serial")
+    AttributeSensor<List<Entity>> QUEUED_START_NODES = Sensors.newSensor(new TypeToken<List<Entity>>() {}, "cassandra.cluster.start.nodes.queued",
+            "Nodes queued for starting (for sequential start)");
+
+    // typo fix: "indicats" -> "indicates" in the published description
+    AttributeSensor<Integer> SCHEMA_VERSION_COUNT = Sensors.newIntegerSensor("cassandra.cluster.schema.versions.count",
+            "Number of different schema versions in the cluster; should be 1 for a healthy cluster, 0 when off; " +
+            "2 and above indicates a Schema Disagreement Error (and keyspace access may fail)");
+
+    AttributeSensor<Long> READ_PENDING = Sensors.newLongSensor("cassandra.cluster.read.pending", "Current pending ReadStage tasks");
+    AttributeSensor<Integer> READ_ACTIVE = Sensors.newIntegerSensor("cassandra.cluster.read.active", "Current active ReadStage tasks");
+    AttributeSensor<Long> WRITE_PENDING = Sensors.newLongSensor("cassandra.cluster.write.pending", "Current pending MutationStage tasks");
+    AttributeSensor<Integer> WRITE_ACTIVE = Sensors.newIntegerSensor("cassandra.cluster.write.active", "Current active MutationStage tasks");
+
+    AttributeSensor<Long> THRIFT_PORT_LATENCY_PER_NODE = Sensors.newLongSensor("cassandra.cluster.thrift.latency.perNode", "Latency for thrift port connection averaged over all nodes (ms)");
+    AttributeSensor<Double> READS_PER_SECOND_LAST_PER_NODE = Sensors.newDoubleSensor("cassandra.reads.perSec.last.perNode", "Reads/sec (last datapoint) averaged over all nodes");
+    AttributeSensor<Double> WRITES_PER_SECOND_LAST_PER_NODE = Sensors.newDoubleSensor("cassandra.write.perSec.last.perNode", "Writes/sec (last datapoint) averaged over all nodes");
+    AttributeSensor<Double> PROCESS_CPU_TIME_FRACTION_LAST_PER_NODE = Sensors.newDoubleSensor("cassandra.cluster.metrics.processCpuTime.fraction.perNode", "Fraction of CPU time used (percentage reported by JMX), averaged over all nodes");
+
+    AttributeSensor<Double> READS_PER_SECOND_IN_WINDOW_PER_NODE = Sensors.newDoubleSensor("cassandra.reads.perSec.windowed.perNode", "Reads/sec (over time window) averaged over all nodes");
+    AttributeSensor<Double> WRITES_PER_SECOND_IN_WINDOW_PER_NODE = Sensors.newDoubleSensor("cassandra.writes.perSec.windowed.perNode", "Writes/sec (over time window) averaged over all nodes");
+    AttributeSensor<Double> THRIFT_PORT_LATENCY_IN_WINDOW_PER_NODE = Sensors.newDoubleSensor("cassandra.thrift.latency.windowed.perNode", "Latency for thrift port (ms, over time window) averaged over all nodes");
+    AttributeSensor<Double> PROCESS_CPU_TIME_FRACTION_IN_WINDOW_PER_NODE = Sensors.newDoubleSensor("cassandra.cluster.metrics.processCpuTime.fraction.windowed", "Fraction of CPU time used (percentage, over time window), averaged over all nodes");
+
+    MethodEffector<Void> UPDATE = new MethodEffector<Void>(CassandraDatacenter.class, "update");
+
+    // fully-qualified because brooklyn.entity.annotation.Effector is imported under the same simple name
+    brooklyn.entity.Effector<String> EXECUTE_SCRIPT = Effectors.effector(DatastoreMixins.EXECUTE_SCRIPT)
+            .description("executes the given script contents using cassandra-cli")
+            .buildAbstract();
+
+    /**
+     * The default number of nodes used to seed the cluster.
+     * <p>
+     * Version 1.2.2 is buggy and requires a big delay for 2 nodes both seeds to reconcile,
+     * with 1.2.9 this seems fine, with just a few seconds' delay after starting.
+     *
+     * @see <a href="http://stackoverflow.com/questions/6770894/schemadisagreementexception/18639005">SchemaDisagreementException discussion</a>
+     */
+    int DEFAULT_SEED_QUORUM = 2;
+
+    /**
+     * Can insert a delay after the first node comes up.
+     * <p>
+     * Reportedly not needed with 1.2.9, but we are still seeing some seed failures so re-introducing it.
+     * (This does not seem to help with the bug in 1.2.2.)
+     */
+    Duration DELAY_AFTER_FIRST = Duration.ONE_MINUTE;
+
+    /**
+     * If set (ie non-null), this waits the indicated time after a successful launch of one node
+     * before starting the next. (If it is null, all nodes start simultaneously,
+     * possibly after the DELAY_AFTER_FIRST.)
+     * <p>
+     * When subsequent nodes start simultaneously, we occasionally see schema disagreement problems;
+     * if nodes start sequentially, we occasionally get "no sources for (tokenRange]" problems.
+     * Either way the node stops. Ideally this can be solved at the Cassandra level,
+     * but if not, we will have to introduce some restarts at the Cassandra nodes (which does seem
+     * to resolve the problems.)
+     */
+    Duration DELAY_BETWEEN_STARTS = null;
+
+    /**
+     * Whether to wait for the first node to start up.
+     * <p>
+     * Not sure whether this is needed or not. Need to test in env where not all nodes are seed nodes,
+     * what happens if non-seed nodes start before the seed nodes?
+     */
+    boolean WAIT_FOR_FIRST = true;
+
+    @Effector(description="Updates the cluster members")
+    void update();
+
+    /**
+     * The name of the cluster.
+     */
+    String getClusterName();
+
+    /** Members that could serve as seeds (whether or not they are yet running). */
+    Set<Entity> gatherPotentialSeeds();
+
+    /** The subset of potential seeds that are currently running. */
+    Set<Entity> gatherPotentialRunningSeeds();
+
+    /** Executes the given script contents against the cluster, returning its output. */
+    String executeScript(String commands);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java
new file mode 100644
index 0000000..baa9a17
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.enricher.Enrichers;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.DynamicGroup;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.EntityPredicates;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic;
+import brooklyn.entity.effector.EffectorBody;
+import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
+import brooklyn.entity.group.DynamicClusterImpl;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
+import brooklyn.location.Location;
+import brooklyn.location.basic.Machines;
+import brooklyn.policy.PolicySpec;
+import brooklyn.util.ResourceUtils;
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.collections.MutableSet;
+import brooklyn.util.config.ConfigBag;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.common.net.HostAndPort;
+
+/**
+ * Implementation of {@link CassandraDatacenter}.
+ * <p>
+ * Several subtleties to note:
+ * - a node may take some time after it is running and serving JMX to actually be contactable on its thrift port
+ * (so we wait for thrift port to be contactable)
+ * - sometimes new nodes take a while to peer, and/or take a while to get a consistent schema
+ * (each up to 1m; often very close to the 1m)
+ */
+public class CassandraDatacenterImpl extends DynamicClusterImpl implements CassandraDatacenter {
+
+    /*
+     * TODO Seed management is hard!
+     *  - The ServiceRestarter is not doing customize(), so is not refreshing the seeds in cassandra.yaml.
+     *    If we have two nodes that were seeds for each other and they both restart at the same time, we'll have a split brain.
+     */
+
+    private static final Logger log = LoggerFactory.getLogger(CassandraDatacenterImpl.class);
+
+    // Mutex for synchronizing during re-size operations
+    private final Object mutex = new Object[0];
+
+    /**
+     * Default seed-selection strategy, used when no {@link CassandraDatacenter#SEED_SUPPLIER} is
+     * configured. Returns an empty set while the quorum of potential seeds has not yet been reached;
+     * once seeds have been published, it keeps the current seeds during startup (to avoid split-brain)
+     * and when no potential seed is running, otherwise it re-selects from the running candidates,
+     * preferring existing seeds and capping the result at the quorum size.
+     */
+    private final Supplier<Set<Entity>> defaultSeedSupplier = new Supplier<Set<Entity>>() {
+        // Mutex for (re)calculating our seeds
+        // TODO is this very dangerous?! Calling out to SeedTracker, which calls out to alien getAttribute()/getConfig(). But I think that's ok.
+        // TODO might not need mutex? previous race was being caused by something else, other than concurrent calls!
+        private final Object seedMutex = new Object();
+
+        @Override
+        public Set<Entity> get() {
+            synchronized (seedMutex) {
+                boolean hasPublishedSeeds = Boolean.TRUE.equals(getAttribute(HAS_PUBLISHED_SEEDS));
+                int quorumSize = getSeedQuorumSize();
+                Set<Entity> potentialSeeds = gatherPotentialSeeds();
+                Set<Entity> potentialRunningSeeds = gatherPotentialRunningSeeds();
+                boolean stillWaitingForQuorum = (!hasPublishedSeeds) && (potentialSeeds.size() < quorumSize);
+
+                if (stillWaitingForQuorum) {
+                    // NOTE(review): this logs CassandraDatacenterImpl.class where the other statements
+                    // log CassandraDatacenterImpl.this -- looks like an oversight; confirm before changing
+                    if (log.isDebugEnabled()) log.debug("Not refreshed seeds of cluster {}, because still waiting for quorum (need {}; have {} potentials)", new Object[] {CassandraDatacenterImpl.class, quorumSize, potentialSeeds.size()});
+                    return ImmutableSet.of();
+                } else if (hasPublishedSeeds) {
+                    Set<Entity> currentSeeds = getAttribute(CURRENT_SEEDS);
+                    if (getAttribute(SERVICE_STATE_ACTUAL) == Lifecycle.STARTING) {
+                        if (Sets.intersection(currentSeeds, potentialSeeds).isEmpty()) {
+                            log.warn("Cluster {} lost all its seeds while starting! Subsequent failure likely, but changing seeds during startup would risk split-brain: seeds={}", new Object[] {CassandraDatacenterImpl.this, currentSeeds});
+                        }
+                        return currentSeeds;
+                    } else if (potentialRunningSeeds.isEmpty()) {
+                        // TODO Could be race where nodes have only just returned from start() and are about to
+                        // transition to serviceUp; so don't just abandon all our seeds!
+                        log.warn("Cluster {} has no running seeds (yet?); leaving seeds as-is; but risks split-brain if these seeds come back up!", new Object[] {CassandraDatacenterImpl.this});
+                        return currentSeeds;
+                    } else {
+                        Set<Entity> result = trim(quorumSize, potentialRunningSeeds);
+                        log.debug("Cluster {} updating seeds: chosen={}; potentialRunning={}", new Object[] {CassandraDatacenterImpl.this, result, potentialRunningSeeds});
+                        return result;
+                    }
+                } else {
+                    Set<Entity> result = trim(quorumSize, potentialSeeds);
+                    if (log.isDebugEnabled()) log.debug("Cluster {} has reached seed quorum: seeds={}", new Object[] {CassandraDatacenterImpl.this, result});
+                    return result;
+                }
+            }
+        }
+        // Keeps at most num seeds, with current seeds (intersected with contenders) ranked first
+        // thanks to the insertion-ordered set.
+        private Set<Entity> trim(int num, Set<Entity> contenders) {
+            // Prefer existing seeds wherever possible; otherwise accept any other contenders
+            Set<Entity> currentSeeds = (getAttribute(CURRENT_SEEDS) != null) ? getAttribute(CURRENT_SEEDS) : ImmutableSet.<Entity>of();
+            Set<Entity> result = Sets.newLinkedHashSet();
+            result.addAll(Sets.intersection(currentSeeds, contenders));
+            result.addAll(contenders);
+            return ImmutableSet.copyOf(Iterables.limit(result, num));
+        }
+    };
+
+    // Decides seed viability from member hostname/service-up events fed to it by init()
+    protected SeedTracker seedTracker = new SeedTracker();
+    // Lazily created by getTokenGenerator(); guarded by that method's synchronization
+    protected TokenGenerator tokenGenerator = null;
+
+    public CassandraDatacenterImpl() {
+    }
+
+    /**
+     * Wires up the subscriptions that keep seed and datacenter bookkeeping current:
+     * <ul>
+     * <li>member hostname / service-up / lifecycle changes are fed to the {@link SeedTracker};
+     * <li>member removal is fed to the {@link SeedTracker} and pruned from {@link #DATACENTER_USAGE};
+     * <li>member {@link CassandraNode#DATACENTER_NAME} changes maintain the
+     *     {@link #DATACENTER_USAGE} and {@link #DATACENTERS} sensors;
+     * <li>the {@link #EXECUTE_SCRIPT} effector is attached, delegating to {@link #executeScript(String)}.
+     * </ul>
+     */
+    @Override
+    public void init() {
+        super.init();
+
+        /*
+         * subscribe to hostname, and keep an accurate set of current seeds in a sensor;
+         * then at nodes we set the initial seeds to be the current seeds when ready (non-empty)
+         */
+        subscribeToMembers(this, Attributes.HOSTNAME, new SensorEventListener<String>() {
+            @Override
+            public void onEvent(SensorEvent<String> event) {
+                seedTracker.onHostnameChanged(event.getSource(), event.getValue());
+            }
+        });
+        subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener<Entity>() {
+            @Override public void onEvent(SensorEvent<Entity> event) {
+                seedTracker.onMemberRemoved(event.getValue());
+            }
+        });
+        subscribeToMembers(this, Attributes.SERVICE_UP, new SensorEventListener<Boolean>() {
+            @Override
+            public void onEvent(SensorEvent<Boolean> event) {
+                seedTracker.onServiceUpChanged(event.getSource(), event.getValue());
+            }
+        });
+        subscribeToMembers(this, Attributes.SERVICE_STATE_ACTUAL, new SensorEventListener<Lifecycle>() {
+            @Override
+            public void onEvent(SensorEvent<Lifecycle> event) {
+                // trigger a recomputation also when lifecycle state changes,
+                // because it might not have ruled a seed as inviable when service up went true
+                // because service state was not yet running
+                seedTracker.onServiceUpChanged(event.getSource(), Lifecycle.RUNNING==event.getValue());
+            }
+        });
+
+        // Track the datacenters for this cluster
+        subscribeToMembers(this, CassandraNode.DATACENTER_NAME, new SensorEventListener<String>() {
+            @Override
+            public void onEvent(SensorEvent<String> event) {
+                Entity member = event.getSource();
+                String dcName = event.getValue();
+                if (dcName != null) {
+                    // copy-then-publish so sensor readers only ever see fully-built multimaps
+                    Multimap<String, Entity> datacenterUsage = getAttribute(DATACENTER_USAGE);
+                    Multimap<String, Entity> mutableDatacenterUsage = (datacenterUsage == null) ? LinkedHashMultimap.<String, Entity>create() : LinkedHashMultimap.create(datacenterUsage);
+                    Optional<String> oldDcName = getKeyOfVal(mutableDatacenterUsage, member);
+                    if (!(oldDcName.isPresent() && dcName.equals(oldDcName.get()))) {
+                        mutableDatacenterUsage.values().remove(member);
+                        mutableDatacenterUsage.put(dcName, member);
+                        setAttribute(DATACENTER_USAGE, mutableDatacenterUsage);
+                        setAttribute(DATACENTERS, Sets.newLinkedHashSet(mutableDatacenterUsage.keySet()));
+                    }
+                }
+            }
+            private <K,V> Optional<K> getKeyOfVal(Multimap<K,V> map, V val) {
+                for (Map.Entry<K,V> entry : map.entries()) {
+                    if (Objects.equal(val, entry.getValue())) {
+                        return Optional.of(entry.getKey());
+                    }
+                }
+                return Optional.absent();
+            }
+        });
+        subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener<Entity>() {
+            @Override public void onEvent(SensorEvent<Entity> event) {
+                // FIX: the removed member is the event *value* (the event source is this group),
+                // matching the seedTracker MEMBER_REMOVED handler above; previously
+                // event.getSource() was used, so departed members were never pruned
+                Entity entity = event.getValue();
+                Multimap<String, Entity> datacenterUsage = getAttribute(DATACENTER_USAGE);
+                if (datacenterUsage != null && datacenterUsage.containsValue(entity)) {
+                    Multimap<String, Entity> mutableDatacenterUsage = LinkedHashMultimap.create(datacenterUsage);
+                    mutableDatacenterUsage.values().remove(entity);
+                    setAttribute(DATACENTER_USAGE, mutableDatacenterUsage);
+                    setAttribute(DATACENTERS, Sets.newLinkedHashSet(mutableDatacenterUsage.keySet()));
+                }
+            }
+        });
+
+        // expose script execution as an effector, delegating to executeScript(String)
+        getMutableEntityType().addEffector(EXECUTE_SCRIPT, new EffectorBody<String>() {
+            @Override
+            public String call(ConfigBag parameters) {
+                return executeScript((String)parameters.getStringKey("commands"));
+            }
+        });
+    }
+
+    /**
+     * Returns the configured {@link #SEED_SUPPLIER}, falling back to the built-in
+     * {@code defaultSeedSupplier} when none has been configured.
+     */
+    protected Supplier<Set<Entity>> getSeedSupplier() {
+        Supplier<Set<Entity>> configured = getConfig(SEED_SUPPLIER);
+        if (configured != null) {
+            return configured;
+        }
+        return defaultSeedSupplier;
+    }
+
+    /** Whether this datacenter is configured to use vnodes (defaults to false when unset). */
+    protected boolean useVnodes() {
+        Boolean configured = getConfig(USE_VNODES);
+        return Boolean.TRUE.equals(configured);
+    }
+
+    /**
+     * Lazily instantiates the {@link TokenGenerator} from {@link #TOKEN_GENERATOR_CLASS},
+     * applying {@link #TOKEN_SHIFT} -- or a random shift within the generator's range when
+     * unset -- as its origin. Synchronized so concurrent callers share a single instance.
+     */
+    protected synchronized TokenGenerator getTokenGenerator() {
+        if (tokenGenerator != null) {
+            return tokenGenerator;
+        }
+        try {
+            tokenGenerator = getConfig(TOKEN_GENERATOR_CLASS).newInstance();
+
+            BigInteger origin = getConfig(TOKEN_SHIFT);
+            if (origin == null) {
+                // no shift configured: pick a random point within the generator's range
+                origin = BigDecimal.valueOf(Math.random())
+                        .multiply(new BigDecimal(tokenGenerator.range()))
+                        .toBigInteger();
+            }
+            tokenGenerator.setOrigin(origin);
+
+            return tokenGenerator;
+        } catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    /**
+     * Number of seeds required before seeds are published: the explicit
+     * {@code INITIAL_QUORUM_SIZE} when positive, otherwise the smaller of the configured
+     * initial size (floored at 1) and {@link #DEFAULT_SEED_QUORUM}.
+     */
+    protected int getSeedQuorumSize() {
+        Integer configured = getConfig(INITIAL_QUORUM_SIZE);
+        if (configured != null && configured > 0) {
+            return configured;
+        }
+        // default 2 is recommended, unless initial size is smaller
+        int initialSizeAtLeastOne = Math.max(getConfig(INITIAL_SIZE), 1);
+        return Math.min(initialSizeAtLeastOne, DEFAULT_SEED_QUORUM);
+    }
+
+    /** {@inheritDoc} Delegates to the {@link SeedTracker}. */
+    @Override
+    public Set<Entity> gatherPotentialSeeds() {
+        return seedTracker.gatherPotentialSeeds();
+    }
+
+    /** {@inheritDoc} Delegates to the {@link SeedTracker}. */
+    @Override
+    public Set<Entity> gatherPotentialRunningSeeds() {
+        return seedTracker.gatherPotentialRunningSeeds();
+    }
+
+    /**
+     * Sets the default {@link #MEMBER_SPEC} to describe the Cassandra nodes.
+     * Falls back to a plain {@link CassandraNode} spec when none is configured.
+     */
+    @Override
+    protected EntitySpec<?> getMemberSpec() {
+        return getConfig(MEMBER_SPEC, EntitySpec.create(CassandraNode.class));
+    }
+
+    /** Returns the {@link #CLUSTER_NAME} attribute (forced from config when the cluster starts). */
+    @Override
+    public String getClusterName() {
+        return getAttribute(CLUSTER_NAME);
+    }
+
+ @Override
+ public Collection<Entity> grow(int delta) {
+ if (useVnodes()) {
+ // nothing to do for token generator
+ } else {
+ if (getCurrentSize() == 0) {
+ getTokenGenerator().growingCluster(delta);
+ }
+ }
+ return super.grow(delta);
+ }
+
+    /**
+     * As super, but pre-populates token configuration for the new node:
+     * <ul>
+     * <li>token(s) passed explicitly in {@code flags} (under the config key or its flag alias)
+     *     are left untouched;
+     * <li>otherwise, when not using vnodes, a single new token is drawn from the token generator;
+     * <li>num_tokens defaults from {@link #NUM_TOKENS_PER_NODE} when vnodes are enabled,
+     *     and is forced to 1 when they are not.
+     * </ul>
+     */
+    // NOTE(review): deprecation suppression presumably covers the single-token key -- confirm
+    @SuppressWarnings("deprecation")
+    @Override
+    protected Entity createNode(@Nullable Location loc, Map<?,?> flags) {
+        Map<Object, Object> allflags = MutableMap.copyOf(flags);
+
+        if ((flags.containsKey(CassandraNode.TOKEN) || flags.containsKey("token")) || (flags.containsKey(CassandraNode.TOKENS) || flags.containsKey("tokens"))) {
+            // leave token config as-is
+        } else if (!useVnodes()) {
+            BigInteger token = getTokenGenerator().newToken();
+            allflags.put(CassandraNode.TOKEN, token);
+        }
+
+        if ((flags.containsKey(CassandraNode.NUM_TOKENS_PER_NODE) || flags.containsKey("numTokensPerNode"))) {
+            // leave num_tokens as-is
+        } else if (useVnodes()) {
+            Integer numTokensPerNode = getConfig(NUM_TOKENS_PER_NODE);
+            allflags.put(CassandraNode.NUM_TOKENS_PER_NODE, numTokensPerNode);
+        } else {
+            // vnodes disabled: exactly one token per node
+            allflags.put(CassandraNode.NUM_TOKENS_PER_NODE, 1);
+        }
+
+        return super.createNode(loc, allflags);
+    }
+
+    /**
+     * As super, but arranges for the replacement node to take over the tokens of the member
+     * being replaced. When the old member had no tokens (null or empty set), a null tokens
+     * flag is passed through and no token takeover is forced.
+     */
+    @Override
+    protected Entity replaceMember(Entity member, Location memberLoc, Map<?, ?> extraFlags) {
+        Set<BigInteger> oldTokens = ((CassandraNode) member).getTokens();
+        // isEmpty() rather than size()>0 comparison; behaviour unchanged
+        Set<BigInteger> newTokens = (oldTokens != null && !oldTokens.isEmpty())
+                ? getTokenGenerator().getTokensForReplacementNode(oldTokens)
+                : null;
+        return super.replaceMember(member, memberLoc, MutableMap.copyOf(extraFlags).add(CassandraNode.TOKENS, newTokens));
+    }
+
+    /**
+     * Starts the cluster: warns about localhost limitations, publishes {@link #CLUSTER_NAME}
+     * as a sensor, starts all members via super, connects sensors, waits
+     * {@link #DELAY_BEFORE_ADVERTISING_CLUSTER}, optionally runs the configured
+     * {@link CassandraNode#CREATION_SCRIPT_URL} against the cluster, and performs an
+     * initial {@link #update()}.
+     */
+    @Override
+    public void start(Collection<? extends Location> locations) {
+        Machines.warnIfLocalhost(locations, "CassandraCluster does not support multiple nodes on localhost, " +
+                "due to assumptions Cassandra makes about the use of the same port numbers used across the cluster.");
+
+        // force this to be set - even if it is using the default
+        setAttribute(CLUSTER_NAME, getConfig(CLUSTER_NAME));
+
+        super.start(locations);
+
+        connectSensors();
+
+        // TODO wait until all nodes which we think are up are consistent
+        // i.e. all known nodes use the same schema, as reported by
+        // SshEffectorTasks.ssh("echo \"describe cluster;\" | /bin/cassandra-cli");
+        // once we've done that we can revert to using 2 seed nodes.
+        // see CassandraCluster.DEFAULT_SEED_QUORUM
+        // (also ensure the cluster is ready if we are about to run a creation script)
+        Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER));
+
+        // run any user-supplied creation script (e.g. keyspace setup) once the cluster has settled
+        String scriptUrl = getConfig(CassandraNode.CREATION_SCRIPT_URL);
+        if (Strings.isNonEmpty(scriptUrl)) {
+            executeScript(new ResourceUtils(this).getResourceAsString(scriptUrl));
+        }
+
+        update();
+    }
+
+    /**
+     * Wires up cluster-level aggregation enrichers plus a membership-tracking policy that
+     * refreshes cluster state whenever a member's service-up, hostname or thrift port changes.
+     */
+    protected void connectSensors() {
+        connectEnrichers();
+
+        PolicySpec<MemberTrackingPolicy> trackerSpec = PolicySpec.create(MemberTrackingPolicy.class)
+                .displayName("Cassandra Cluster Tracker")
+                .configure("sensorsToTrack", ImmutableSet.of(Attributes.SERVICE_UP, Attributes.HOSTNAME, CassandraNode.THRIFT_PORT))
+                .configure("group", this);
+        addPolicy(trackerSpec);
+    }
+
+    /**
+     * Policy that triggers {@link CassandraDatacenterImpl#update()} whenever a member is
+     * added, removed, or changes one of its tracked sensors.
+     */
+    public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
+        @Override
+        protected void onEntityChange(Entity member) {
+            // FIX: log the cluster entity rather than the policy ("this"), as the message says "Cluster"
+            if (log.isDebugEnabled()) log.debug("Node {} updated in Cluster {}", member, entity);
+            ((CassandraDatacenterImpl)entity).update();
+        }
+        @Override
+        protected void onEntityAdded(Entity member) {
+            if (log.isDebugEnabled()) log.debug("Node {} added to Cluster {}", member, entity);
+            ((CassandraDatacenterImpl)entity).update();
+        }
+        @Override
+        protected void onEntityRemoved(Entity member) {
+            if (log.isDebugEnabled()) log.debug("Node {} removed from Cluster {}", member, entity);
+            ((CassandraDatacenterImpl)entity).update();
+        }
+    }
+
+    /**
+     * Adds cluster-level aggregation enrichers: ReadStage/MutationStage task counts are
+     * summed across members, while rates, latencies and CPU fractions are averaged to give
+     * per-node figures. Members that have not reported a sensor contribute nothing
+     * (null default / null when no sensors).
+     */
+    @SuppressWarnings("unchecked")
+    protected void connectEnrichers() {
+        // pairs of (member sensor, cluster sensor) summed over members
+        List<? extends List<? extends AttributeSensor<? extends Number>>> summingEnricherSetup = ImmutableList.of(
+                ImmutableList.of(CassandraNode.READ_ACTIVE, READ_ACTIVE),
+                ImmutableList.of(CassandraNode.READ_PENDING, READ_PENDING),
+                ImmutableList.of(CassandraNode.WRITE_ACTIVE, WRITE_ACTIVE),
+                ImmutableList.of(CassandraNode.WRITE_PENDING, WRITE_PENDING)
+        );
+
+        // pairs of (member sensor, cluster sensor) averaged over members
+        List<? extends List<? extends AttributeSensor<? extends Number>>> averagingEnricherSetup = ImmutableList.of(
+                ImmutableList.of(CassandraNode.READS_PER_SECOND_LAST, READS_PER_SECOND_LAST_PER_NODE),
+                ImmutableList.of(CassandraNode.WRITES_PER_SECOND_LAST, WRITES_PER_SECOND_LAST_PER_NODE),
+                ImmutableList.of(CassandraNode.WRITES_PER_SECOND_IN_WINDOW, WRITES_PER_SECOND_IN_WINDOW_PER_NODE),
+                ImmutableList.of(CassandraNode.READS_PER_SECOND_IN_WINDOW, READS_PER_SECOND_IN_WINDOW_PER_NODE),
+                ImmutableList.of(CassandraNode.THRIFT_PORT_LATENCY, THRIFT_PORT_LATENCY_PER_NODE),
+                ImmutableList.of(CassandraNode.THRIFT_PORT_LATENCY_IN_WINDOW, THRIFT_PORT_LATENCY_IN_WINDOW_PER_NODE),
+                ImmutableList.of(CassandraNode.PROCESS_CPU_TIME_FRACTION_LAST, PROCESS_CPU_TIME_FRACTION_LAST_PER_NODE),
+                ImmutableList.of(CassandraNode.PROCESS_CPU_TIME_FRACTION_IN_WINDOW, PROCESS_CPU_TIME_FRACTION_IN_WINDOW_PER_NODE)
+        );
+
+        for (List<? extends AttributeSensor<? extends Number>> es : summingEnricherSetup) {
+            AttributeSensor<? extends Number> t = es.get(0);
+            AttributeSensor<? extends Number> total = es.get(1);
+            addEnricher(Enrichers.builder()
+                    .aggregating(t)
+                    .publishing(total)
+                    .fromMembers()
+                    .computingSum()
+                    .defaultValueForUnreportedSensors(null)
+                    .valueToReportIfNoSensors(null)
+                    .build());
+        }
+
+        for (List<? extends AttributeSensor<? extends Number>> es : averagingEnricherSetup) {
+            // unchecked casts: each pair is (source Number sensor, Double average sensor) by construction
+            AttributeSensor<Number> t = (AttributeSensor<Number>) es.get(0);
+            AttributeSensor<Double> average = (AttributeSensor<Double>) es.get(1);
+            addEnricher(Enrichers.builder()
+                    .aggregating(t)
+                    .publishing(average)
+                    .fromMembers()
+                    .computingAverage()
+                    .defaultValueForUnreportedSensors(null)
+                    .valueToReportIfNoSensors(null)
+                    .build());
+
+        }
+    }
+
+ @Override
+ public void stop() {
+ // tear down sensor feeds before delegating the actual stop to the superclass
+ disconnectSensors();
+
+ super.stop();
+ }
+
+ /** No-op placeholder; subclasses may override to tear down anything created in {@link #connectSensors()}. */
+ protected void disconnectSensors() {
+ }
+
+ /**
+ * Recomputes cluster-level attributes from the current member set: refreshes the
+ * published seeds, then advertises the first service-up member's hostname and thrift
+ * port on the cluster, and republishes CASSANDRA_CLUSTER_NODES.
+ * Synchronized on {@code mutex} so concurrent membership events do not interleave.
+ */
+ @Override
+ public void update() {
+ synchronized (mutex) {
+ // Update our seeds, as necessary
+ seedTracker.refreshSeeds();
+
+ // Choose the first available cluster member to set host and port (and compute one-up)
+ Optional<Entity> upNode = Iterables.tryFind(getMembers(), EntityPredicates.attributeEqualTo(SERVICE_UP, Boolean.TRUE));
+
+ if (upNode.isPresent()) {
+ setAttribute(HOSTNAME, upNode.get().getAttribute(Attributes.HOSTNAME));
+ setAttribute(THRIFT_PORT, upNode.get().getAttribute(CassandraNode.THRIFT_PORT));
+
+ // collect "host:port" strings for every service-up Cassandra member
+ List<String> currentNodes = getAttribute(CASSANDRA_CLUSTER_NODES);
+ Set<String> oldNodes = (currentNodes != null) ? ImmutableSet.copyOf(currentNodes) : ImmutableSet.<String>of();
+ Set<String> newNodes = MutableSet.<String>of();
+ for (Entity member : getMembers()) {
+ if (member instanceof CassandraNode && Boolean.TRUE.equals(member.getAttribute(SERVICE_UP))) {
+ String hostname = member.getAttribute(Attributes.HOSTNAME);
+ Integer thriftPort = member.getAttribute(CassandraNode.THRIFT_PORT);
+ if (hostname != null && thriftPort != null) {
+ newNodes.add(HostAndPort.fromParts(hostname, thriftPort).toString());
+ }
+ }
+ }
+ // only republish when the set of live nodes actually changed, to avoid sensor churn
+ if (Sets.symmetricDifference(oldNodes, newNodes).size() > 0) {
+ setAttribute(CASSANDRA_CLUSTER_NODES, MutableList.copyOf(newNodes));
+ }
+ } else {
+ // no live members: clear the advertised endpoint and node list
+ setAttribute(HOSTNAME, null);
+ setAttribute(THRIFT_PORT, null);
+ setAttribute(CASSANDRA_CLUSTER_NODES, Collections.<String>emptyList());
+ }
+
+ ServiceNotUpLogic.updateNotUpIndicatorRequiringNonEmptyList(this, CASSANDRA_CLUSTER_NODES);
+ }
+ }
+
+ /**
+ * For tracking our seeds. This gets fiddly! High-level logic is:
+ * <ul>
+ * <li>If we have never reached quorum (i.e. have never published seeds), then continue to wait for quorum;
+ * because entity-startup may be blocking for this. This is handled by the seedSupplier.
+ * <li>If we previously reached quorum (i.e. have previously published seeds), then always update;
+ * we never want stale/dead entities listed in our seeds.
+ * <li>If an existing seed looks unhealthy, then replace it.
+ * <li>If a new potential seed becomes available (and we're in need of more), then add it.
+ * </ul>
+ *
+ * Also note that {@link CassandraFabric} can take over, because it knows about multiple sub-clusters!
+ * It will provide a different {@link CassandraDatacenter#SEED_SUPPLIER}. Each time we think that our seeds
+ * need to change, we call that. The fabric will call into {@link CassandraDatacenterImpl#gatherPotentialSeeds()}
+ * to find out what's available.
+ *
+ * @author aled
+ */
+ protected class SeedTracker {
+ private final Map<Entity, Boolean> memberUpness = Maps.newLinkedHashMap();
+
+ public void onMemberRemoved(Entity member) {
+ Set<Entity> seeds = getSeeds();
+ boolean maybeRemove = seeds.contains(member);
+ memberUpness.remove(member);
+
+ if (maybeRemove) {
+ refreshSeeds();
+ } else {
+ if (log.isTraceEnabled()) log.trace("Seeds considered stable for cluster {} (node {} removed)", new Object[] {CassandraDatacenterImpl.this, member});
+ return;
+ }
+ }
+ public void onHostnameChanged(Entity member, String hostname) {
+ Set<Entity> seeds = getSeeds();
+ int quorum = getSeedQuorumSize();
+ boolean isViable = isViableSeed(member);
+ boolean maybeAdd = isViable && seeds.size() < quorum;
+ boolean maybeRemove = seeds.contains(member) && !isViable;
+
+ if (maybeAdd || maybeRemove) {
+ refreshSeeds();
+ } else {
+ if (log.isTraceEnabled()) log.trace("Seeds considered stable for cluster {} (node {} changed hostname {})", new Object[] {CassandraDatacenterImpl.this, member, hostname});
+ return;
+ }
+ }
+ public void onServiceUpChanged(Entity member, Boolean serviceUp) {
+ Boolean oldVal = memberUpness.put(member, serviceUp);
+ if (Objects.equal(oldVal, serviceUp)) {
+ if (log.isTraceEnabled()) log.trace("Ignoring duplicate service-up in "+CassandraDatacenterImpl.this+" for "+member+", "+serviceUp);
+ }
+ Set<Entity> seeds = getSeeds();
+ int quorum = getSeedQuorumSize();
+ boolean isViable = isViableSeed(member);
+ boolean maybeAdd = isViable && seeds.size() < quorum;
+ boolean maybeRemove = seeds.contains(member) && !isViable;
+
+ if (log.isDebugEnabled())
+ log.debug("Considering refresh of seeds for "+CassandraDatacenterImpl.this+" because "+member+" is now "+serviceUp+" ("+isViable+" / "+maybeAdd+" / "+maybeRemove+")");
+ if (maybeAdd || maybeRemove) {
+ refreshSeeds();
+ } else {
+ if (log.isTraceEnabled()) log.trace("Seeds considered stable for cluster {} (node {} changed serviceUp {})", new Object[] {CassandraDatacenterImpl.this, member, serviceUp});
+ return;
+ }
+ }
+ protected Set<Entity> getSeeds() {
+ Set<Entity> result = getAttribute(CURRENT_SEEDS);
+ return (result == null) ? ImmutableSet.<Entity>of() : result;
+ }
+ public void refreshSeeds() {
+ Set<Entity> oldseeds = getAttribute(CURRENT_SEEDS);
+ Set<Entity> newseeds = getSeedSupplier().get();
+ if (Objects.equal(oldseeds, newseeds)) {
+ if (log.isTraceEnabled()) log.debug("Seed refresh no-op for cluster {}: still={}", new Object[] {CassandraDatacenterImpl.this, oldseeds});
+ } else {
+ if (log.isDebugEnabled()) log.debug("Refreshing seeds of cluster {}: now={}; old={}", new Object[] {this, newseeds, oldseeds});
+ setAttribute(CURRENT_SEEDS, newseeds);
+ if (newseeds != null && newseeds.size() > 0) {
+ setAttribute(HAS_PUBLISHED_SEEDS, true);
+ }
+ }
+ }
+ public Set<Entity> gatherPotentialSeeds() {
+ Set<Entity> result = Sets.newLinkedHashSet();
+ for (Entity member : getMembers()) {
+ if (isViableSeed(member)) {
+ result.add(member);
+ }
+ }
+ if (log.isTraceEnabled()) log.trace("Viable seeds in Cluster {}: {}", new Object[] {result});
+ return result;
+ }
+ public Set<Entity> gatherPotentialRunningSeeds() {
+ Set<Entity> result = Sets.newLinkedHashSet();
+ for (Entity member : getMembers()) {
+ if (isRunningSeed(member)) {
+ result.add(member);
+ }
+ }
+ if (log.isTraceEnabled()) log.trace("Viable running seeds in Cluster {}: {}", new Object[] {result});
+ return result;
+ }
+ public boolean isViableSeed(Entity member) {
+ // TODO would be good to reuse the better logic in ServiceFailureDetector
+ // (e.g. if that didn't just emit a notification but set a sensor as well?)
+ boolean managed = Entities.isManaged(member);
+ String hostname = member.getAttribute(Attributes.HOSTNAME);
+ boolean serviceUp = Boolean.TRUE.equals(member.getAttribute(Attributes.SERVICE_UP));
+ Lifecycle serviceState = member.getAttribute(Attributes.SERVICE_STATE_ACTUAL);
+ boolean hasFailed = !managed || (serviceState == Lifecycle.ON_FIRE) || (serviceState == Lifecycle.RUNNING && !serviceUp) || (serviceState == Lifecycle.STOPPED);
+ boolean result = (hostname != null && !hasFailed);
+ if (log.isTraceEnabled()) log.trace("Node {} in Cluster {}: viableSeed={}; hostname={}; serviceUp={}; serviceState={}; hasFailed={}", new Object[] {member, this, result, hostname, serviceUp, serviceState, hasFailed});
+ return result;
+ }
+ public boolean isRunningSeed(Entity member) {
+ boolean viableSeed = isViableSeed(member);
+ boolean serviceUp = Boolean.TRUE.equals(member.getAttribute(Attributes.SERVICE_UP));
+ Lifecycle serviceState = member.getAttribute(Attributes.SERVICE_STATE_ACTUAL);
+ boolean result = viableSeed && serviceUp && serviceState == Lifecycle.RUNNING;
+ if (log.isTraceEnabled()) log.trace("Node {} in Cluster {}: runningSeed={}; viableSeed={}; serviceUp={}; serviceState={}", new Object[] {member, this, result, viableSeed, serviceUp, serviceState});
+ return result;
+ }
+ }
+
+ /**
+ * Executes the given script commands by delegating to the EXECUTE_SCRIPT effector
+ * of the first available member.
+ *
+ * @throws IllegalStateException if the cluster has no members
+ */
+ @Override
+ public String executeScript(String commands) {
+ Entity someChild = Iterables.getFirst(getMembers(), null);
+ if (someChild==null)
+ throw new IllegalStateException("No Cassandra nodes available");
+ // FIXME cross-entity method-style calls such as below do not set up a queueing context (DynamicSequentialTask)
+// return ((CassandraNode)someChild).executeScript(commands);
+ return Entities.invokeEffector(this, someChild, CassandraNode.EXECUTE_SCRIPT, MutableMap.of("commands", commands)).getUnchecked();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabric.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabric.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabric.java
new file mode 100644
index 0000000..23db92c
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabric.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import java.util.Set;
+
+import org.apache.brooklyn.catalog.Catalog;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Entity;
+import brooklyn.entity.annotation.Effector;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.MethodEffector;
+import brooklyn.entity.group.DynamicFabric;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.location.Location;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Multimap;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * A fabric of {@link CassandraNode}s, which forms a cluster spanning multiple locations.
+ * <p>
+ * Each {@link CassandraDatacenter} child instance is actually just a part of the whole cluster. It consists of the
+ * nodes in that single location (which normally corresponds to a "datacenter" in Cassandra terminology).
+ */
+@Catalog(name="Apache Cassandra Database Fabric", description="Cassandra is a highly scalable, eventually " +
+ "consistent, distributed, structured key-value store which provides a ColumnFamily-based data model " +
+ "richer than typical key/value systems", iconUrl="classpath:///cassandra-logo.jpeg")
+@ImplementedBy(CassandraFabricImpl.class)
+public interface CassandraFabric extends DynamicFabric {
+
+ ConfigKey<Integer> INITIAL_QUORUM_SIZE = ConfigKeys.newIntegerConfigKey(
+ "fabric.initial.quorumSize",
+ "Initial fabric quorum size - number of initial nodes that must have been successfully started " +
+ "to report success (if less than 0, then use a value based on INITIAL_SIZE of clusters)",
+ -1);
+
+ @SuppressWarnings("serial")
+ ConfigKey<Function<Location, String>> DATA_CENTER_NAMER = ConfigKeys.newConfigKey(new TypeToken<Function<Location, String>>(){},
+ "cassandra.fabric.datacenter.namer",
+ "Function used to provide the cassandra.replication.datacenterName for a given location");
+
+ /** Upper bound on the computed seed quorum size for the fabric. */
+ int DEFAULT_SEED_QUORUM = 5;
+
+ // Aliases of the corresponding CassandraDatacenter sensors, re-published at fabric level.
+ AttributeSensor<Multimap<String,Entity>> DATACENTER_USAGE = CassandraDatacenter.DATACENTER_USAGE;
+
+ AttributeSensor<Set<String>> DATACENTERS = CassandraDatacenter.DATACENTERS;
+
+ AttributeSensor<Set<Entity>> CURRENT_SEEDS = CassandraDatacenter.CURRENT_SEEDS;
+
+ AttributeSensor<Boolean> HAS_PUBLISHED_SEEDS = CassandraDatacenter.HAS_PUBLISHED_SEEDS;
+
+ AttributeSensor<String> HOSTNAME = CassandraDatacenter.HOSTNAME;
+
+ AttributeSensor<Integer> THRIFT_PORT = CassandraDatacenter.THRIFT_PORT;
+
+ MethodEffector<Void> UPDATE = new MethodEffector<Void>(CassandraFabric.class, "update");
+
+ @Effector(description="Updates the cluster members")
+ void update();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java
new file mode 100644
index 0000000..bce7cac
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.DynamicGroup;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.EntityPredicates;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
+import brooklyn.entity.group.DynamicFabricImpl;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
+import brooklyn.location.Location;
+import brooklyn.policy.PolicySpec;
+import brooklyn.util.collections.CollectionFunctionals;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.collections.MutableSet;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+/**
+ * Implementation of {@link CassandraFabric}.
+ * <p>
+ * Several subtleties to note:
+ * - a node may take some time after it is running and serving JMX to actually be contactable on its thrift port
+ * (so we wait for thrift port to be contactable)
+ * - sometimes new nodes take a while to peer, and/or take a while to get a consistent schema
+ * (each up to 1m; often very close to the 1m)
+ */
+public class CassandraFabricImpl extends DynamicFabricImpl implements CassandraFabric {
+
+ private static final Logger log = LoggerFactory.getLogger(CassandraFabricImpl.class);
+
+ // Mutex for synchronizing during re-size operations
+ private final Object mutex = new Object[0];
+
+    /**
+     * Fabric-wide seed supplier. Prefers to keep existing seeds; otherwise picks one
+     * seed per datacenter before filling the remainder, and only publishes once every
+     * datacenter can contribute (or the fabric has already started).
+     */
+    private final Supplier<Set<Entity>> defaultSeedSupplier = new Supplier<Set<Entity>>() {
+        @Override public Set<Entity> get() {
+            // TODO Remove duplication from CassandraClusterImpl.defaultSeedSupplier
+            Set<Entity> seeds = getAttribute(CURRENT_SEEDS);
+            boolean hasPublishedSeeds = Boolean.TRUE.equals(getAttribute(HAS_PUBLISHED_SEEDS));
+            int quorumSize = getSeedQuorumSize();
+
+            // update seeds if we're not quorate; note this may not work for dynamically adding new datacenters
+            // as we do not take a new seed from the new datacenter
+            if (seeds == null || seeds.size() < quorumSize || containsDownEntity(seeds)) {
+                Set<Entity> newseeds;
+                Map<CassandraDatacenter,Set<Entity>> potentialSeeds = MutableMap.of();
+                int potentialSeedCount = 0;
+                for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
+                    Set<Entity> dcPotentialSeeds = member.gatherPotentialSeeds();
+                    potentialSeeds.put(member, dcPotentialSeeds);
+                    potentialSeedCount += dcPotentialSeeds.size();
+                }
+
+                if (hasPublishedSeeds) {
+                    Set<Entity> currentSeeds = getAttribute(CURRENT_SEEDS);
+                    Lifecycle serviceState = getAttribute(SERVICE_STATE_ACTUAL);
+                    if (serviceState == Lifecycle.STARTING) {
+                        if (Sets.intersection(currentSeeds, ImmutableSet.copyOf(Iterables.concat(potentialSeeds.values()))).isEmpty()) {
+                            log.warn("Fabric {} lost all its seeds while starting! Subsequent failure likely, but changing seeds during startup would risk split-brain: seeds={}", new Object[] {CassandraFabricImpl.this, currentSeeds});
+                        }
+                        newseeds = currentSeeds;
+                    } else if (serviceState == Lifecycle.STOPPING || serviceState == Lifecycle.STOPPED) {
+                        if (log.isTraceEnabled()) log.trace("Fabric {} ignoring any potential seed-changes, because {}: seeds={}", new Object[] {CassandraFabricImpl.this, serviceState, currentSeeds});
+                        newseeds = currentSeeds;
+                    } else if (potentialSeedCount == 0) {
+                        // TODO Could be race where nodes have only just returned from start() and are about to
+                        // transition to serviceUp; so don't just abandon all our seeds!
+                        log.warn("Fabric {} has no seeds (after startup); leaving seeds as-is; but risks split-brain if these seeds come back up!", new Object[] {CassandraFabricImpl.this});
+                        newseeds = currentSeeds;
+                    } else if (!allNonEmpty(potentialSeeds.values())) {
+                        log.warn("Fabric {} has datacenter with no seeds (after startup); leaving seeds as-is; but risks split-brain if these seeds come back up!", new Object[] {CassandraFabricImpl.this});
+                        newseeds = currentSeeds;
+                    } else {
+                        Set<Entity> result = selectSeeds(quorumSize, potentialSeeds);
+                        if (log.isDebugEnabled() && !Objects.equal(seeds, result)) {
+                            log.debug("Fabric {} updating seeds: chosen={}; potential={}", new Object[] {CassandraFabricImpl.this, result, potentialSeeds});
+                        }
+                        newseeds = result;
+                    }
+                } else if (potentialSeedCount < quorumSize) {
+                    if (log.isDebugEnabled()) log.debug("Not setting seeds of fabric {} yet, because still waiting for quorum (need {}; have {} potentials from {} members)", new Object[] {CassandraFabricImpl.this, quorumSize, potentialSeedCount, getMembers()});
+                    newseeds = ImmutableSet.of();
+                } else if (!allNonEmpty(potentialSeeds.values())) {
+                    if (log.isDebugEnabled()) {
+                        Map<CassandraDatacenter, Integer> datacenterCounts = Maps.transformValues(potentialSeeds, CollectionFunctionals.sizeFunction());
+                        log.debug("Not setting seeds of fabric {} yet, because not all datacenters have seeds (sizes are {})", new Object[] {CassandraFabricImpl.this, datacenterCounts});
+                    }
+                    newseeds = ImmutableSet.of();
+                } else {
+                    // yay, we're quorate
+                    Set<Entity> result = selectSeeds(quorumSize, potentialSeeds);
+                    log.info("Fabric {} has reached seed quorum: seeds={}", new Object[] {CassandraFabricImpl.this, result});
+                    newseeds = result;
+                }
+
+                if (!Objects.equal(seeds, newseeds)) {
+                    setAttribute(CURRENT_SEEDS, newseeds);
+
+                    if (newseeds != null && newseeds.size() > 0) {
+                        setAttribute(HAS_PUBLISHED_SEEDS, true);
+
+                        // Need to tell every datacenter that seeds are ready.
+                        // Otherwise a datacenter might get no more changes (e.g. to nodes' hostnames etc),
+                        // and not call seedSupplier.get() again.
+                        for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
+                            member.update();
+                        }
+                    }
+                    return newseeds;
+                } else {
+                    return seeds;
+                }
+            } else {
+                // fixed typo ("refresheed") and logged the instance (was CassandraFabricImpl.class)
+                if (log.isTraceEnabled()) log.trace("Not refreshing seeds of fabric {}, because have quorum {} (of {} members), and none are down: seeds={}",
+                    new Object[] {CassandraFabricImpl.this, quorumSize, getMembers().size(), seeds});
+                return seeds;
+            }
+        }
+        /** Returns true iff every datacenter contributed at least one potential seed. */
+        private boolean allNonEmpty(Collection<? extends Collection<Entity>> contenders) {
+            for (Collection<Entity> contender: contenders)
+                if (contender.isEmpty()) return false;
+            return true;
+        }
+        /**
+         * Picks {@code num} seeds: existing seeds first, then at least one per datacenter,
+         * then any remaining contenders.
+         */
+        private Set<Entity> selectSeeds(int num, Map<CassandraDatacenter,? extends Collection<Entity>> contenders) {
+            Set<Entity> currentSeeds = (getAttribute(CURRENT_SEEDS) != null) ? getAttribute(CURRENT_SEEDS) : ImmutableSet.<Entity>of();
+            MutableSet<Entity> result = MutableSet.of();
+            // BUG FIX: previously intersected with ImmutableSet.copyOf(contenders.values()) - a set
+            // of *collections*, which can never contain an Entity, so existing seeds were never
+            // preferred. Flatten the per-datacenter collections first (cf. the supplier above).
+            result.addAll(Sets.intersection(currentSeeds, ImmutableSet.copyOf(Iterables.concat(contenders.values()))));
+            for (CassandraDatacenter cluster : contenders.keySet()) {
+                Set<Entity> contendersInCluster = Sets.newLinkedHashSet(contenders.get(cluster));
+                if (contendersInCluster.size() > 0 && Sets.intersection(result, contendersInCluster).isEmpty()) {
+                    result.add(Iterables.getFirst(contendersInCluster, null));
+                }
+            }
+            result.addAll(Iterables.concat(contenders.values()));
+            return ImmutableSet.copyOf(Iterables.limit(result, num));
+        }
+        /** Returns true if any of the given seeds is no longer viable. */
+        private boolean containsDownEntity(Set<Entity> seeds) {
+            for (Entity seed : seeds) {
+                if (!isViableSeed(seed)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+        /** A member is a viable seed if it is managed, has a hostname, and has not failed. */
+        public boolean isViableSeed(Entity member) {
+            // TODO remove duplication from CassandraClusterImpl.SeedTracker.isViableSeed
+            boolean managed = Entities.isManaged(member);
+            String hostname = member.getAttribute(Attributes.HOSTNAME);
+            boolean serviceUp = Boolean.TRUE.equals(member.getAttribute(Attributes.SERVICE_UP));
+            Lifecycle serviceState = member.getAttribute(Attributes.SERVICE_STATE_ACTUAL);
+            boolean hasFailed = !managed || (serviceState == Lifecycle.ON_FIRE) || (serviceState == Lifecycle.RUNNING && !serviceUp) || (serviceState == Lifecycle.STOPPED);
+            boolean result = (hostname != null && !hasFailed);
+            if (log.isTraceEnabled()) log.trace("Node {} in Fabric {}: viableSeed={}; hostname={}; serviceUp={}; serviceState={}; hasFailed={}", new Object[] {member, CassandraFabricImpl.this, result, hostname, serviceUp, serviceState, hasFailed});
+            return result;
+        }
+    };
+
+ /** No-op constructor; all wiring happens in {@link #init()}. */
+ public CassandraFabricImpl() {
+ }
+
+ /**
+ * Defaults the datacenter seed-supplier to the fabric-wide one (unless explicitly
+ * configured), installs the membership tracker, and subscribes to member sensors
+ * so FIRST_NODE_STARTED_TIME_UTC and datacenter-usage attributes stay current.
+ */
+ @Override
+ public void init() {
+ super.init();
+
+ // only set the fabric-wide seed supplier if none was explicitly configured
+ if (!getConfigRaw(CassandraDatacenter.SEED_SUPPLIER, true).isPresentAndNonNull())
+ setConfig(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier());
+
+ // track members
+ addPolicy(PolicySpec.create(MemberTrackingPolicy.class)
+ .displayName("Cassandra Fabric Tracker")
+ .configure("group", this));
+
+ // Track first node's startup
+ subscribeToMembers(this, CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, new SensorEventListener<Long>() {
+ @Override
+ public void onEvent(SensorEvent<Long> event) {
+ Long oldval = getAttribute(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC);
+ Long newval = event.getValue();
+ // only the first-ever start time is recorded and propagated to all datacenters
+ if (oldval == null && newval != null) {
+ setAttribute(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, newval);
+ for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
+ ((EntityInternal)member).setAttribute(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, newval);
+ }
+ }
+ }
+ });
+
+ // Track the datacenters for this cluster
+ subscribeToMembers(this, CassandraDatacenter.DATACENTER_USAGE, new SensorEventListener<Multimap<String,Entity>>() {
+ @Override
+ public void onEvent(SensorEvent<Multimap<String,Entity>> event) {
+ Multimap<String, Entity> usage = calculateDatacenterUsage();
+ setAttribute(DATACENTER_USAGE, usage);
+ setAttribute(DATACENTERS, usage.keySet());
+ }
+ });
+ // also recompute usage when a member is removed entirely
+ subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener<Entity>() {
+ @Override public void onEvent(SensorEvent<Entity> event) {
+ Multimap<String, Entity> usage = calculateDatacenterUsage();
+ setAttribute(DATACENTER_USAGE, usage);
+ setAttribute(DATACENTERS, usage.keySet());
+ }
+ });
+ }
+
+ /**
+ * Tracks fabric membership; any member change, addition or removal triggers
+ * {@link CassandraFabricImpl#update()} on the owning fabric.
+ */
+ public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
+ @Override
+ protected void onEntityChange(Entity member) {
+ if (log.isDebugEnabled()) log.debug("Location {} updated in Fabric {}", member, entity);
+ ((CassandraFabricImpl)entity).update();
+ }
+ @Override
+ protected void onEntityAdded(Entity member) {
+ if (log.isDebugEnabled()) log.debug("Location {} added to Fabric {}", member, entity);
+ ((CassandraFabricImpl)entity).update();
+ }
+ @Override
+ protected void onEntityRemoved(Entity member) {
+ if (log.isDebugEnabled()) log.debug("Location {} removed from Fabric {}", member, entity);
+ ((CassandraFabricImpl)entity).update();
+ }
+ };
+
+    /**
+     * Number of seeds to aim for: the configured INITIAL_QUORUM_SIZE if positive,
+     * otherwise a heuristic derived from the members' INITIAL_SIZE, clamped to
+     * [1, {@link CassandraFabric#DEFAULT_SEED_QUORUM}].
+     */
+    protected int getSeedQuorumSize() {
+        Integer configured = getConfig(INITIAL_QUORUM_SIZE);
+        if (configured != null && configured > 0) {
+            return configured;
+        }
+
+        // no explicit quorum configured: derive one from the planned initial size of all datacenters
+        int sizeSum = 0;
+        for (CassandraDatacenter dc : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
+            sizeSum += dc.getConfig(CassandraDatacenter.INITIAL_SIZE);
+        }
+        // shrink the estimate for larger fabrics so we don't demand every node as a seed
+        if (sizeSum > 5) {
+            sizeSum /= 2;
+        } else if (sizeSum > 3) {
+            sizeSum -= 2;
+        } else if (sizeSum > 2) {
+            sizeSum -= 1;
+        }
+
+        return Math.min(Math.max(sizeSum, 1), CassandraFabric.DEFAULT_SEED_QUORUM);
+    }
+
+ /**
+ * Sets the default {@link #MEMBER_SPEC} to describe the Cassandra sub-clusters.
+ */
+ @Override
+ protected EntitySpec<?> getMemberSpec() {
+ // Need to set the seedSupplier, even if the caller has overridden the CassandraCluster config
+ // (unless they've explicitly overridden the seedSupplier as well!)
+ // TODO probably don't need to anymore, as it is set on the Fabric here -- just make sure there is a default!
+ EntitySpec<?> custom = getConfig(MEMBER_SPEC);
+ if (custom == null) {
+ return EntitySpec.create(CassandraDatacenter.class)
+ .configure(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier());
+ } else if (custom.getConfig().containsKey(CassandraDatacenter.SEED_SUPPLIER) || custom.getFlags().containsKey("seedSupplier")) {
+ // user-supplied spec already sets a seed supplier; use it untouched
+ return custom;
+ } else {
+ return EntitySpec.create(custom)
+ .configure(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier());
+ }
+ }
+
+    /**
+     * Creates the per-location datacenter; if a DATA_CENTER_NAMER function is
+     * configured, injects the computed datacenter name into the creation flags.
+     */
+    @Override
+    protected Entity createCluster(Location location, Map flags) {
+        Function<Location, String> namer = getConfig(DATA_CENTER_NAMER);
+        Map augmentedFlags = flags;
+        if (namer != null) {
+            augmentedFlags = ImmutableMap.builder()
+                    .putAll(flags)
+                    .put(CassandraNode.DATACENTER_NAME, namer.apply(location))
+                    .build();
+        }
+        return super.createCluster(location, augmentedFlags);
+    }
+
+ /**
+ * Prefers one node per location, and then others from anywhere.
+ * Then trims result down to the "quorumSize".
+ *
+ * @return the fabric-wide default seed supplier
+ */
+ public Supplier<Set<Entity>> getSeedSupplier() {
+ return defaultSeedSupplier;
+ }
+
+ /**
+ * Starts the fabric, connects sensors, then waits DELAY_BEFORE_ADVERTISING_CLUSTER
+ * before the first update, giving nodes time to peer and settle.
+ */
+ @Override
+ public void start(Collection<? extends Location> locations) {
+ super.start(locations);
+
+ connectSensors();
+
+ // TODO wait until all nodes which we think are up are consistent
+ // i.e. all known nodes use the same schema, as reported by
+ // SshEffectorTasks.ssh("echo \"describe cluster;\" | /bin/cassandra-cli");
+ // once we've done that we can revert to using 2 seed nodes.
+ // see CassandraCluster.DEFAULT_SEED_QUORUM
+ Time.sleep(getConfig(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER));
+
+ update();
+ }
+
+ /** Hook for wiring sensors after start; currently just installs the enrichers. */
+ protected void connectSensors() {
+ connectEnrichers();
+ }
+
+ /**
+ * Keeps the fabric's SERVICE_UP in sync with its members: whenever any member's
+ * SERVICE_UP changes, recompute whether at least one member is up.
+ */
+ protected void connectEnrichers() {
+ // TODO Aggregate across sub-clusters
+
+ subscribeToMembers(this, SERVICE_UP, new SensorEventListener<Boolean>() {
+ @Override public void onEvent(SensorEvent<Boolean> event) {
+ setAttribute(SERVICE_UP, calculateServiceUp());
+ }
+ });
+ }
+
+ @Override
+ public void stop() {
+ // tear down sensor subscriptions before delegating the actual stop to the superclass
+ disconnectSensors();
+
+ super.stop();
+ }
+
+ /** No-op placeholder; subclasses may override to tear down anything created in {@link #connectSensors()}. */
+ protected void disconnectSensors() {
+ }
+
+    /** Returns true iff at least one member currently reports SERVICE_UP. */
+    protected boolean calculateServiceUp() {
+        return Iterables.tryFind(getMembers(), EntityPredicates.attributeEqualTo(SERVICE_UP, Boolean.TRUE)).isPresent();
+    }
+
+ protected Multimap<String, Entity> calculateDatacenterUsage() {
+ Multimap<String, Entity> result = LinkedHashMultimap.<String, Entity>create();
+ for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
+ Multimap<String, Entity> memberUsage = member.getAttribute(CassandraDatacenter.DATACENTER_USAGE);
+ if (memberUsage != null) result.putAll(memberUsage);
+ }
+ return result;
+ }
+
+    /**
+     * Propagates an update to every datacenter member, republishes the fabric's
+     * SERVICE_UP, and advertises the hostname/thrift-port of the first up member.
+     * Synchronized on {@code mutex} so concurrent membership events do not interleave.
+     */
+    @Override
+    public void update() {
+        synchronized (mutex) {
+            for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
+                member.update();
+            }
+
+            // BUG FIX: previously the result of calculateServiceUp() was computed and
+            // discarded (a pure no-op); publish it, consistent with connectEnrichers()
+            setAttribute(SERVICE_UP, calculateServiceUp());
+
+            // Choose the first available location to set host and port (and compute one-up)
+            Optional<Entity> upNode = Iterables.tryFind(getMembers(), EntityPredicates.attributeEqualTo(SERVICE_UP, Boolean.TRUE));
+
+            if (upNode.isPresent()) {
+                setAttribute(HOSTNAME, upNode.get().getAttribute(Attributes.HOSTNAME));
+                setAttribute(THRIFT_PORT, upNode.get().getAttribute(CassandraNode.THRIFT_PORT));
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java
new file mode 100644
index 0000000..7d0a56d
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import java.math.BigInteger;
+import java.util.Set;
+
+import org.apache.brooklyn.catalog.Catalog;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Effector;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.BrooklynConfigKeys;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.database.DatastoreMixins;
+import brooklyn.entity.java.UsesJavaMXBeans;
+import brooklyn.entity.java.UsesJmx;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.event.basic.Sensors;
+import brooklyn.location.basic.PortRanges;
+import brooklyn.util.flags.SetFromFlag;
+import brooklyn.util.time.Duration;
+
+import com.google.common.reflect.TypeToken;
+
+/**
+ * An {@link brooklyn.entity.Entity} that represents a Cassandra node in a {@link CassandraDatacenter}.
+ */
+@Catalog(name="Apache Cassandra Node", description="Cassandra is a highly scalable, eventually " +
+ "consistent, distributed, structured key-value store which provides a ColumnFamily-based data model " +
+ "richer than typical key/value systems", iconUrl="classpath:///cassandra-logo.jpeg")
+@ImplementedBy(CassandraNodeImpl.class)
+public interface CassandraNode extends DatastoreMixins.DatastoreCommon, SoftwareProcess, UsesJmx, UsesJavaMXBeans, DatastoreMixins.HasDatastoreUrl, DatastoreMixins.CanExecuteScript {
+
+ @SetFromFlag("version")
+ ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "1.2.16");
+ // when this changes remember to put a copy under releng2:/var/www/developer/brooklyn/repository/ !
+ // TODO experiment with supporting 2.0.x
+
+ @SetFromFlag("downloadUrl")
+ BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
+ SoftwareProcess.DOWNLOAD_URL, "${driver.mirrorUrl}/${version}/apache-cassandra-${version}-bin.tar.gz");
+
+ /** download mirror, if desired */
+ @SetFromFlag("mirrorUrl")
+ ConfigKey<String> MIRROR_URL = new BasicConfigKey<String>(String.class, "cassandra.install.mirror.url", "URL of mirror",
+ "http://www.mirrorservice.org/sites/ftp.apache.org/cassandra"
+ // for older versions, but slower:
+// "http://archive.apache.org/dist/cassandra/"
+ );
+
+ @SetFromFlag("tgzUrl")
+ ConfigKey<String> TGZ_URL = new BasicConfigKey<String>(String.class, "cassandra.install.tgzUrl", "URL of TGZ download file");
+
+ @SetFromFlag("clusterName")
+ BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = CassandraDatacenter.CLUSTER_NAME;
+
+ @SetFromFlag("snitchName")
+ ConfigKey<String> ENDPOINT_SNITCH_NAME = CassandraDatacenter.ENDPOINT_SNITCH_NAME;
+
+ @SetFromFlag("gossipPort")
+ PortAttributeSensorAndConfigKey GOSSIP_PORT = new PortAttributeSensorAndConfigKey("cassandra.gossip.port", "Cassandra Gossip communications port", PortRanges.fromString("7000+"));
+
+ // NOTE(review): the flag name "sslGgossipPort" looks like a typo for "sslGossipPort";
+ // renaming it would break existing users of the flag, so confirm before changing.
+ @SetFromFlag("sslGgossipPort")
+ PortAttributeSensorAndConfigKey SSL_GOSSIP_PORT = new PortAttributeSensorAndConfigKey("cassandra.ssl-gossip.port", "Cassandra Gossip SSL communications port", PortRanges.fromString("7001+"));
+
+ @SetFromFlag("thriftPort")
+ PortAttributeSensorAndConfigKey THRIFT_PORT = new PortAttributeSensorAndConfigKey("cassandra.thrift.port", "Cassandra Thrift RPC port", PortRanges.fromString("9160+"));
+
+ @SetFromFlag("nativePort")
+ PortAttributeSensorAndConfigKey NATIVE_TRANSPORT_PORT = new PortAttributeSensorAndConfigKey("cassandra.native.port", "Cassandra Native Transport port", PortRanges.fromString("9042+"));
+
+ @SetFromFlag("rmiRegistryPort")
+ // cassandra nodetool and others want 7199 - not required, but useful
+ PortAttributeSensorAndConfigKey RMI_REGISTRY_PORT = new PortAttributeSensorAndConfigKey(UsesJmx.RMI_REGISTRY_PORT,
+ PortRanges.fromInteger(7199));
+
+ // some of the cassandra tooling (eg nodetool) use RMI, but we want JMXMP, so do both!
+ ConfigKey<JmxAgentModes> JMX_AGENT_MODE = ConfigKeys.newConfigKeyWithDefault(UsesJmx.JMX_AGENT_MODE, JmxAgentModes.JMXMP_AND_RMI);
+
+ @SetFromFlag("customSnitchJarUrl")
+ ConfigKey<String> CUSTOM_SNITCH_JAR_URL = ConfigKeys.newStringConfigKey("cassandra.config.customSnitchUrl",
+ "URL for a jar file to be uploaded (e.g. \"classpath://org/apache/brooklyn/entity/nosql/cassandra/cassandra-multicloud-snitch.jar\"); defaults to null which means nothing to upload",
+ null);
+
+ @SetFromFlag("cassandraConfigTemplateUrl")
+ ConfigKey<String> CASSANDRA_CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey(
+ "cassandra.config.templateUrl", "A URL (in freemarker format) for a cassandra.yaml config file (in freemarker format)",
+ "classpath://org/apache/brooklyn/entity/nosql/cassandra/cassandra-${entity.majorMinorVersion}.yaml");
+
+ @SetFromFlag("cassandraConfigFileName")
+ ConfigKey<String> CASSANDRA_CONFIG_FILE_NAME = ConfigKeys.newStringConfigKey(
+ "cassandra.config.fileName", "Name for the copied config file", "cassandra.yaml");
+
+ @SetFromFlag("cassandraRackdcConfigTemplateUrl")
+ ConfigKey<String> CASSANDRA_RACKDC_CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey(
+ "cassandra.config.rackdc.templateUrl", "Template file (in freemarker format) for the cassandra-rackdc.properties config file",
+ "classpath://org/apache/brooklyn/entity/nosql/cassandra/cassandra-rackdc.properties");
+
+ @SetFromFlag("cassandraRackdcConfigFileName")
+ ConfigKey<String> CASSANDRA_RACKDC_CONFIG_FILE_NAME = ConfigKeys.newStringConfigKey(
+ "cassandra.config.rackdc.fileName", "Name for the copied rackdc config file (used for configuring replication, when a suitable snitch is used)", "cassandra-rackdc.properties");
+
+ @SetFromFlag("datacenterName")
+ BasicAttributeSensorAndConfigKey<String> DATACENTER_NAME = new BasicAttributeSensorAndConfigKey<String>(
+ String.class, "cassandra.replication.datacenterName", "Datacenter name (used for configuring replication, when a suitable snitch is used)",
+ null);
+
+ @SetFromFlag("rackName")
+ BasicAttributeSensorAndConfigKey<String> RACK_NAME = new BasicAttributeSensorAndConfigKey<String>(
+ String.class, "cassandra.replication.rackName", "Rack name (used for configuring replication, when a suitable snitch is used)",
+ null);
+
+ ConfigKey<Integer> NUM_TOKENS_PER_NODE = ConfigKeys.newIntegerConfigKey("cassandra.numTokensPerNode",
+ "Number of tokens per node; if using vnodes, should set this to a value like 256",
+ 1);
+
+ /**
+ * @deprecated since 0.7; use {@link #TOKENS}
+ */
+ @SetFromFlag("token")
+ @Deprecated
+ BasicAttributeSensorAndConfigKey<BigInteger> TOKEN = new BasicAttributeSensorAndConfigKey<BigInteger>(
+ BigInteger.class, "cassandra.token", "Cassandra Token");
+
+ @SetFromFlag("tokens")
+ BasicAttributeSensorAndConfigKey<Set<BigInteger>> TOKENS = new BasicAttributeSensorAndConfigKey<Set<BigInteger>>(
+ new TypeToken<Set<BigInteger>>() {}, "cassandra.tokens", "Cassandra Tokens");
+
+ AttributeSensor<Integer> PEERS = Sensors.newIntegerSensor( "cassandra.peers", "Number of peers in cluster");
+
+ AttributeSensor<Integer> LIVE_NODE_COUNT = Sensors.newIntegerSensor( "cassandra.liveNodeCount", "Number of live nodes in cluster");
+
+ /* Metrics for read/write performance. */
+
+ AttributeSensor<Long> READ_PENDING = Sensors.newLongSensor("cassandra.read.pending", "Current pending ReadStage tasks");
+ AttributeSensor<Integer> READ_ACTIVE = Sensors.newIntegerSensor("cassandra.read.active", "Current active ReadStage tasks");
+ AttributeSensor<Long> READ_COMPLETED = Sensors.newLongSensor("cassandra.read.completed", "Total completed ReadStage tasks");
+ AttributeSensor<Long> WRITE_PENDING = Sensors.newLongSensor("cassandra.write.pending", "Current pending MutationStage tasks");
+ AttributeSensor<Integer> WRITE_ACTIVE = Sensors.newIntegerSensor("cassandra.write.active", "Current active MutationStage tasks");
+ AttributeSensor<Long> WRITE_COMPLETED = Sensors.newLongSensor("cassandra.write.completed", "Total completed MutationStage tasks");
+
+ AttributeSensor<Boolean> SERVICE_UP_JMX = Sensors.newBooleanSensor("cassandra.service.jmx.up", "Whether JMX is up for this service");
+ AttributeSensor<Long> THRIFT_PORT_LATENCY = Sensors.newLongSensor("cassandra.thrift.latency", "Latency for thrift port connection (ms) or null if down");
+
+ AttributeSensor<Double> READS_PER_SECOND_LAST = Sensors.newDoubleSensor("cassandra.reads.perSec.last", "Reads/sec (last datapoint)");
+ // NOTE(review): sensor name "cassandra.write.perSec.last" is singular while the windowed
+ // variant below uses "writes"; existing consumers may rely on this name, so left as-is.
+ AttributeSensor<Double> WRITES_PER_SECOND_LAST = Sensors.newDoubleSensor("cassandra.write.perSec.last", "Writes/sec (last datapoint)");
+
+ AttributeSensor<Double> THRIFT_PORT_LATENCY_IN_WINDOW = Sensors.newDoubleSensor("cassandra.thrift.latency.windowed", "Latency for thrift port (ms, averaged over time window)");
+ AttributeSensor<Double> READS_PER_SECOND_IN_WINDOW = Sensors.newDoubleSensor("cassandra.reads.perSec.windowed", "Reads/sec (over time window)");
+ AttributeSensor<Double> WRITES_PER_SECOND_IN_WINDOW = Sensors.newDoubleSensor("cassandra.writes.perSec.windowed", "Writes/sec (over time window)");
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ ConfigKey<Set<Entity>> INITIAL_SEEDS = (ConfigKey)ConfigKeys.newConfigKey(Set.class, "cassandra.cluster.seeds.initial",
+ "List of cluster nodes to seed this node");
+
+ ConfigKey<Duration> START_TIMEOUT = ConfigKeys.newConfigKeyWithDefault(BrooklynConfigKeys.START_TIMEOUT, Duration.FIVE_MINUTES);
+
+ ConfigKey<String> LISTEN_ADDRESS_SENSOR = ConfigKeys.newStringConfigKey("cassandra.listenAddressSensor", "sensor name from which to take the listen address; default (null) is a smart lookup");
+ ConfigKey<String> BROADCAST_ADDRESS_SENSOR = ConfigKeys.newStringConfigKey("cassandra.broadcastAddressSensor", "sensor name from which to take the broadcast address; default (null) is a smart lookup");
+ ConfigKey<String> RPC_ADDRESS_SENSOR = ConfigKeys.newStringConfigKey("cassandra.rpcAddressSensor", "sensor name from which to take the RPC address; default (null) is 0.0.0.0");
+
+ Effector<String> EXECUTE_SCRIPT = CassandraDatacenter.EXECUTE_SCRIPT;
+
+ /* Accessors used from template */
+
+ String getMajorMinorVersion();
+ Integer getGossipPort();
+ Integer getSslGossipPort();
+ Integer getThriftPort();
+ Integer getNativeTransportPort();
+ String getClusterName();
+ String getListenAddress();
+ String getBroadcastAddress();
+ String getRpcAddress();
+ String getSeeds();
+
+ String getPrivateIp();
+ String getPublicIp();
+
+ /**
+ * In range 0 to (2^127)-1; or null if not yet set or known.
+ * Returns the first token if more than one token.
+ * @deprecated since 0.7; see {@link #getTokens()}
+ */
+ @Deprecated
+ BigInteger getToken();
+
+ int getNumTokensPerNode();
+
+ Set<BigInteger> getTokens();
+
+ /**
+ * string value of token (with no commas, which freemarker introduces!) or blank if none
+ * @deprecated since 0.7; use {@link #getTokensAsString()}
+ */
+ @Deprecated
+ String getTokenAsString();
+
+ /** string value of comma-separated tokens; or blank if none */
+ String getTokensAsString();
+
+ /* For configuration */
+
+ void setToken(String token);
+
+ /* Using Cassandra */
+
+ String executeScript(String commands);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java
new file mode 100644
index 0000000..eab6672
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import brooklyn.entity.java.JavaSoftwareProcessDriver;
+import brooklyn.util.task.system.ProcessTaskWrapper;
+
+/**
+ * Driver interface for {@link CassandraNode}: exposes the values the entity's
+ * install/launch logic and config templates obtain from the driver.
+ */
+public interface CassandraNodeDriver extends JavaSoftwareProcessDriver {
+
+ /** Gossip (inter-node) communications port. */
+ Integer getGossipPort();
+
+ /** SSL gossip communications port. */
+ Integer getSslGossipPort();
+
+ /** Thrift RPC port. */
+ Integer getThriftPort();
+
+ /** Native (CQL) transport port. */
+ Integer getNativeTransportPort();
+
+ /** Name of the cluster this node belongs to. */
+ String getClusterName();
+
+ /** URL (freemarker template) for the cassandra.yaml config file. */
+ String getCassandraConfigTemplateUrl();
+
+ /** File name to use for the copied cassandra.yaml config file. */
+ String getCassandraConfigFileName();
+
+ // presumably true when the node runs as part of a multi-node cluster (vs standalone) — confirm against implementation
+ boolean isClustered();
+
+ /** Runs the given commands against the node without blocking; the task's result is the process exit code. */
+ ProcessTaskWrapper<Integer> executeScriptAsync(String commands);
+
+ /** returns the address that the given hostname resolves to at the target */
+ String getResolvedAddress(String hostname);
+
+}