You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/08/06 18:32:31 UTC

[17/26] incubator-brooklyn git commit: [BROOKLYN-162] Renaming of the NoSQL packages

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraClusterImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraClusterImpl.java
new file mode 100644
index 0000000..7036285
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraClusterImpl.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+/**
+ * Legacy implementation of {@link CassandraCluster}; it declares no members of its own
+ * and inherits everything from {@link CassandraDatacenterImpl}.
+ *
+ * @deprecated since 0.7.0; use {@link CassandraDatacenter} which is equivalent but has
+ * a less ambiguous name; <em>Cluster</em> in Cassandra corresponds to what Brooklyn terms a <em>Fabric</em>.
+ */
+@Deprecated
+public class CassandraClusterImpl extends CassandraDatacenterImpl implements CassandraCluster {
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenter.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenter.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenter.java
new file mode 100644
index 0000000..7ef646f
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenter.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.brooklyn.catalog.Catalog;
+import org.apache.brooklyn.entity.nosql.cassandra.TokenGenerators.PosNeg63TokenGenerator;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Entity;
+import brooklyn.entity.annotation.Effector;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.MethodEffector;
+import brooklyn.entity.database.DatastoreMixins;
+import brooklyn.entity.effector.Effectors;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.Sensors;
+import brooklyn.util.flags.SetFromFlag;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.Multimap;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * A group of {@link CassandraNode}s -- based on Brooklyn's {@link DynamicCluster}
+ * (though it is a "Datacenter" in Cassandra terms, where Cassandra's "cluster" corresponds
+ * to a Brooklyn Fabric, cf {@link CassandraFabric}).
+ * The Datacenter can be resized, manually or by policy if required.
+ * Tokens are selected intelligently.
+ * <p>
+ * Note that due to how Cassandra assumes ports are the same across a cluster,
+ * it is <em>NOT</em> possible to deploy a cluster of size larger than 1 to localhost.
+ * (Some exploratory work has been done to use different 127.0.0.x IP's for localhost,
+ * and there is evidence this could be made to work.)
+ */
+@Catalog(name="Apache Cassandra Datacenter Cluster", description="Cassandra is a highly scalable, eventually " +
+        "consistent, distributed, structured key-value store which provides a ColumnFamily-based data model " +
+        "richer than typical key/value systems", iconUrl="classpath:///cassandra-logo.jpeg")
+@ImplementedBy(CassandraDatacenterImpl.class)
+public interface CassandraDatacenter extends DynamicCluster, DatastoreMixins.HasDatastoreUrl, DatastoreMixins.CanExecuteScript {
+
+    // FIXME datacenter name -- also CASS_CLUSTER_NODES should be CASS_DC_NODES
+    @SetFromFlag("clusterName")
+    BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(String.class, "cassandra.cluster.name", "Name of the Cassandra cluster", "BrooklynCluster");
+
+    @SetFromFlag("snitchName")
+    ConfigKey<String> ENDPOINT_SNITCH_NAME = ConfigKeys.newStringConfigKey("cassandra.cluster.snitchName", "Type of the Cassandra snitch", "SimpleSnitch");
+
+    /** Optional custom strategy for choosing seed nodes; defaults to {@code null}. */
+    @SetFromFlag("seedSupplier")
+    @SuppressWarnings("serial")
+    ConfigKey<Supplier<Set<Entity>>> SEED_SUPPLIER = ConfigKeys.newConfigKey(new TypeToken<Supplier<Set<Entity>>>() { }, "cassandra.cluster.seedSupplier", "For determining the seed nodes", null);
+
+    @SuppressWarnings("serial")
+    @SetFromFlag("tokenGeneratorClass")
+    ConfigKey<Class<? extends TokenGenerator>> TOKEN_GENERATOR_CLASS = ConfigKeys.newConfigKey(
+        new TypeToken<Class<? extends TokenGenerator>>() {}, "cassandra.cluster.tokenGenerator.class", "For determining the tokens of nodes",
+        PosNeg63TokenGenerator.class);
+
+    @SetFromFlag("tokenShift")
+    ConfigKey<BigInteger> TOKEN_SHIFT = ConfigKeys.newConfigKey(BigInteger.class, "cassandra.cluster.tokenShift",
+        "Delta applied to all tokens generated for this Cassandra datacenter, "
+        + "useful when configuring multiple datacenters which should be shifted; "
+        + "if not set, a random shift is applied. (Pass 0 to prevent any shift.)", null);
+
+    ConfigKey<Boolean> USE_VNODES = ConfigKeys.newBooleanConfigKey(
+            "cassandra.cluster.useVnodes",
+            "Determines whether to use vnodes; if doing so, tokens will not be explicitly assigned to nodes in the cluster",
+            false);
+
+    /**
+     * num_tokens will automatically be reset to 1 for each node if {@link #USE_VNODES} is false.
+     */
+    ConfigKey<Integer> NUM_TOKENS_PER_NODE = ConfigKeys.newIntegerConfigKey("cassandra.numTokensPerNode",
+            "Number of tokens per node; if using vnodes, should set this to a value like 256; will be overridden to 1 if USE_VNODES==false",
+            256);
+
+    /**
+     * Additional time after the nodes in the cluster are up when starting
+     * before announcing the cluster as up.
+     * <p>
+     * Useful to ensure nodes have synchronized.
+     * <p>
+     * On 1.2.2 this could be as much as 120s when using 2 seed nodes,
+     * or just a few seconds with 1 seed node. On 1.2.9 it seems a few
+     * seconds is sufficient even with 2 seed nodes
+     */
+    @SetFromFlag("delayBeforeAdvertisingCluster")
+    ConfigKey<Duration> DELAY_BEFORE_ADVERTISING_CLUSTER = ConfigKeys.newConfigKey(Duration.class, "cassandra.cluster.delayBeforeAdvertisingCluster", "Delay after cluster is started before checking and advertising its availability", Duration.TEN_SECONDS);
+
+    @SuppressWarnings("serial")
+    AttributeSensor<Multimap<String,Entity>> DATACENTER_USAGE = Sensors.newSensor(new TypeToken<Multimap<String,Entity>>() { }, "cassandra.cluster.datacenterUsages", "Current set of datacenters in use, with nodes in each");
+
+    @SuppressWarnings("serial")
+    AttributeSensor<Set<String>> DATACENTERS = Sensors.newSensor(new TypeToken<Set<String>>() { }, "cassandra.cluster.datacenters", "Current set of datacenters in use");
+
+    AttributeSensor<Boolean> HAS_PUBLISHED_SEEDS = Sensors.newBooleanSensor("cassandra.cluster.seeds.hasPublished", "Whether we have published any seeds");
+
+    @SuppressWarnings("serial")
+    AttributeSensor<Set<Entity>> CURRENT_SEEDS = Sensors.newSensor(new TypeToken<Set<Entity>>() { }, "cassandra.cluster.seeds.current", "Current set of seeds to use to bootstrap the cluster");
+
+    AttributeSensor<String> HOSTNAME = Sensors.newStringSensor("cassandra.cluster.hostname", "Hostname to connect to cluster with");
+
+    @SuppressWarnings("serial")
+    AttributeSensor<List<String>> CASSANDRA_CLUSTER_NODES = Sensors.newSensor(new TypeToken<List<String>>() {},
+        "cassandra.cluster.nodes", "List of host:port of all active nodes in the cluster (thrift port, and public hostname/IP)");
+
+    AttributeSensor<Integer> THRIFT_PORT = Sensors.newIntegerSensor("cassandra.cluster.thrift.port", "Cassandra Thrift RPC port to connect to cluster with");
+
+    AttributeSensor<Long> FIRST_NODE_STARTED_TIME_UTC = Sensors.newLongSensor("cassandra.cluster.first.node.started.utc", "Time (UTC) when the first node was started");
+    @SuppressWarnings("serial")
+    AttributeSensor<List<Entity>> QUEUED_START_NODES = Sensors.newSensor(new TypeToken<List<Entity>>() {}, "cassandra.cluster.start.nodes.queued",
+        "Nodes queued for starting (for sequential start)");
+
+    AttributeSensor<Integer> SCHEMA_VERSION_COUNT = Sensors.newIntegerSensor("cassandra.cluster.schema.versions.count",
+            "Number of different schema versions in the cluster; should be 1 for a healthy cluster, 0 when off; " +
+            "2 and above indicates a Schema Disagreement Error (and keyspace access may fail)");
+
+    AttributeSensor<Long> READ_PENDING = Sensors.newLongSensor("cassandra.cluster.read.pending", "Current pending ReadStage tasks");
+    AttributeSensor<Integer> READ_ACTIVE = Sensors.newIntegerSensor("cassandra.cluster.read.active", "Current active ReadStage tasks");
+    AttributeSensor<Long> WRITE_PENDING = Sensors.newLongSensor("cassandra.cluster.write.pending", "Current pending MutationStage tasks");
+    AttributeSensor<Integer> WRITE_ACTIVE = Sensors.newIntegerSensor("cassandra.cluster.write.active", "Current active MutationStage tasks");
+
+    AttributeSensor<Long> THRIFT_PORT_LATENCY_PER_NODE = Sensors.newLongSensor("cassandra.cluster.thrift.latency.perNode", "Latency for thrift port connection  averaged over all nodes (ms)");
+    AttributeSensor<Double> READS_PER_SECOND_LAST_PER_NODE = Sensors.newDoubleSensor("cassandra.reads.perSec.last.perNode", "Reads/sec (last datapoint) averaged over all nodes");
+    // NOTE(review): this sensor name says "write" where the windowed sibling says "writes"
+    // ("cassandra.writes.perSec.windowed.perNode"); left as-is because renaming a published sensor
+    // would break anything already reading it by name.
+    AttributeSensor<Double> WRITES_PER_SECOND_LAST_PER_NODE = Sensors.newDoubleSensor("cassandra.write.perSec.last.perNode", "Writes/sec (last datapoint) averaged over all nodes");
+    AttributeSensor<Double> PROCESS_CPU_TIME_FRACTION_LAST_PER_NODE = Sensors.newDoubleSensor("cassandra.cluster.metrics.processCpuTime.fraction.perNode", "Fraction of CPU time used (percentage reported by JMX), averaged over all nodes");
+
+    AttributeSensor<Double> READS_PER_SECOND_IN_WINDOW_PER_NODE = Sensors.newDoubleSensor("cassandra.reads.perSec.windowed.perNode", "Reads/sec (over time window) averaged over all nodes");
+    AttributeSensor<Double> WRITES_PER_SECOND_IN_WINDOW_PER_NODE = Sensors.newDoubleSensor("cassandra.writes.perSec.windowed.perNode", "Writes/sec (over time window) averaged over all nodes");
+    AttributeSensor<Double> THRIFT_PORT_LATENCY_IN_WINDOW_PER_NODE = Sensors.newDoubleSensor("cassandra.thrift.latency.windowed.perNode", "Latency for thrift port (ms, over time window) averaged over all nodes");
+    // NOTE(review): unlike the other *_PER_NODE sensors, this name carries no ".perNode" suffix; left as-is
+    // for the same compatibility reason.
+    AttributeSensor<Double> PROCESS_CPU_TIME_FRACTION_IN_WINDOW_PER_NODE = Sensors.newDoubleSensor("cassandra.cluster.metrics.processCpuTime.fraction.windowed", "Fraction of CPU time used (percentage, over time window), averaged over all nodes");
+
+    /** Effector handle for {@link #update()}. */
+    MethodEffector<Void> UPDATE = new MethodEffector<Void>(CassandraDatacenter.class, "update");
+
+    brooklyn.entity.Effector<String> EXECUTE_SCRIPT = Effectors.effector(DatastoreMixins.EXECUTE_SCRIPT)
+        .description("executes the given script contents using cassandra-cli")
+        .buildAbstract();
+
+    /**
+     * Sets the number of nodes used to seed the cluster.
+     * <p>
+     * Version 1.2.2 is buggy and requires a big delay for 2 nodes both seeds to reconcile,
+     * with 1.2.9 this seems fine, with just a few seconds' delay after starting.
+     *
+     * @see <a href="http://stackoverflow.com/questions/6770894/schemadisagreementexception/18639005">Stack Overflow: SchemaDisagreementException</a>
+     */
+    int DEFAULT_SEED_QUORUM = 2;
+
+    /**
+     * Can insert a delay after the first node comes up.
+     * <p>
+     * Reportedly not needed with 1.2.9, but we are still seeing some seed failures so re-introducing it.
+     * (This does not seem to help with the bug in 1.2.2.)
+     */
+    Duration DELAY_AFTER_FIRST = Duration.ONE_MINUTE;
+
+    /**
+     * If set (ie non-null), this waits the indicated time after a successful launch of one node
+     * before starting the next.  (If it is null, all nodes start simultaneously,
+     * possibly after the DELAY_AFTER_FIRST.)
+     * <p>
+     * When subsequent nodes start simultaneously, we occasionally see schema disagreement problems;
+     * if nodes start sequentially, we occasionally get "no sources for (tokenRange]" problems.
+     * Either way the node stops. Ideally this can be solved at the Cassandra level,
+     * but if not, we will have to introduce some restarts at the Cassandra nodes (which does seem
+     * to resolve the problems.)
+     */
+    Duration DELAY_BETWEEN_STARTS = null;
+
+    /**
+     * Whether to wait for the first node to start up
+     * <p>
+     * not sure whether this is needed or not. Need to test in env where not all nodes are seed nodes,
+     * what happens if non-seed nodes start before the seed nodes?
+     */
+    boolean WAIT_FOR_FIRST = true;
+
+    @Effector(description="Updates the cluster members")
+    void update();
+
+    /**
+     * The name of the cluster.
+     */
+    String getClusterName();
+
+    Set<Entity> gatherPotentialSeeds();
+
+    Set<Entity> gatherPotentialRunningSeeds();
+
+    String executeScript(String commands);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java
new file mode 100644
index 0000000..baa9a17
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.enricher.Enrichers;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.DynamicGroup;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.EntityPredicates;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic;
+import brooklyn.entity.effector.EffectorBody;
+import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
+import brooklyn.entity.group.DynamicClusterImpl;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
+import brooklyn.location.Location;
+import brooklyn.location.basic.Machines;
+import brooklyn.policy.PolicySpec;
+import brooklyn.util.ResourceUtils;
+import brooklyn.util.collections.MutableList;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.collections.MutableSet;
+import brooklyn.util.config.ConfigBag;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.common.net.HostAndPort;
+
+/**
+ * Implementation of {@link CassandraDatacenter}.
+ * <p>
+ * Several subtleties to note:
+ * - a node may take some time after it is running and serving JMX to actually be contactable on its thrift port
+ *   (so we wait for thrift port to be contactable)
+ * - sometimes new nodes take a while to peer, and/or take a while to get a consistent schema
+ *   (each up to 1m; often very close to the 1m) 
+ */
+public class CassandraDatacenterImpl extends DynamicClusterImpl implements CassandraDatacenter {
+
+    /*
+     * TODO Seed management is hard!
+     *  - The ServiceRestarter is not doing customize(), so is not refreshing the seeds in cassandra.yaml.
+     *    If we have two nodes that were seeds for each other and they both restart at the same time, we'll have a split brain.
+     */
+    
+    private static final Logger log = LoggerFactory.getLogger(CassandraDatacenterImpl.class);
+
+    // Mutex for synchronizing during re-size operations.
+    // NOTE(review): the lock is a zero-length Object[] rather than a plain Object -- presumably so the
+    // lock object is serializable; its uses are not visible in this part of the file, so confirm before changing.
+    private final Object mutex = new Object[0];
+
+    /**
+     * Built-in seed-selection strategy; {@link #getSeedSupplier()} falls back to it when no
+     * {@link #SEED_SUPPLIER} is configured.
+     * <p>
+     * Each call to {@code get()} decides as follows:
+     * <ul>
+     * <li>seeds not yet published and fewer potential seeds than the quorum: an empty set;
+     * <li>seeds not yet published but quorum reached: the potential seeds, trimmed to quorum size;
+     * <li>seeds already published and the datacenter is still {@link Lifecycle#STARTING}: the current
+     *     seeds unchanged (with a warning if none of them is a potential seed any more);
+     * <li>seeds already published and no potential seed is running: the current seeds unchanged;
+     * <li>otherwise: the running potential seeds, trimmed to quorum size.
+     * </ul>
+     * Trimming prefers entities that are already seeds.
+     */
+    private final Supplier<Set<Entity>> defaultSeedSupplier = new Supplier<Set<Entity>>() {
+        // Mutex for (re)calculating our seeds
+        // TODO is this very dangerous?! Calling out to SeedTracker, which calls out to alien getAttribute()/getConfig(). But I think that's ok.
+        // TODO might not need mutex? previous race was being caused by something else, other than concurrent calls!
+        private final Object seedMutex = new Object();
+
+        @Override
+        public Set<Entity> get() {
+            synchronized (seedMutex) {
+                boolean hasPublishedSeeds = Boolean.TRUE.equals(getAttribute(HAS_PUBLISHED_SEEDS));
+                int quorumSize = getSeedQuorumSize();
+                Set<Entity> potentialSeeds = gatherPotentialSeeds();
+                Set<Entity> potentialRunningSeeds = gatherPotentialRunningSeeds();
+                boolean stillWaitingForQuorum = (!hasPublishedSeeds) && (potentialSeeds.size() < quorumSize);
+
+                if (stillWaitingForQuorum) {
+                    // Log the cluster instance (not the class literal), consistent with the other messages below.
+                    if (log.isDebugEnabled()) log.debug("Not refreshed seeds of cluster {}, because still waiting for quorum (need {}; have {} potentials)", new Object[] {CassandraDatacenterImpl.this, quorumSize, potentialSeeds.size()});
+                    return ImmutableSet.of();
+                } else if (hasPublishedSeeds) {
+                    Set<Entity> currentSeeds = getAttribute(CURRENT_SEEDS);
+                    if (getAttribute(SERVICE_STATE_ACTUAL) == Lifecycle.STARTING) {
+                        if (Sets.intersection(currentSeeds, potentialSeeds).isEmpty()) {
+                            log.warn("Cluster {} lost all its seeds while starting! Subsequent failure likely, but changing seeds during startup would risk split-brain: seeds={}", new Object[] {CassandraDatacenterImpl.this, currentSeeds});
+                        }
+                        return currentSeeds;
+                    } else if (potentialRunningSeeds.isEmpty()) {
+                        // TODO Could be race where nodes have only just returned from start() and are about to
+                        // transition to serviceUp; so don't just abandon all our seeds!
+                        log.warn("Cluster {} has no running seeds (yet?); leaving seeds as-is; but risks split-brain if these seeds come back up!", new Object[] {CassandraDatacenterImpl.this});
+                        return currentSeeds;
+                    } else {
+                        Set<Entity> result = trim(quorumSize, potentialRunningSeeds);
+                        log.debug("Cluster {} updating seeds: chosen={}; potentialRunning={}", new Object[] {CassandraDatacenterImpl.this, result, potentialRunningSeeds});
+                        return result;
+                    }
+                } else {
+                    Set<Entity> result = trim(quorumSize, potentialSeeds);
+                    if (log.isDebugEnabled()) log.debug("Cluster {} has reached seed quorum: seeds={}", new Object[] {CassandraDatacenterImpl.this, result});
+                    return result;
+                }
+            }
+        }
+
+        /**
+         * Picks at most {@code num} entities from {@code contenders}, listing those that are
+         * already seeds first so that they win when the set is cut down.
+         */
+        private Set<Entity> trim(int num, Set<Entity> contenders) {
+            // Prefer existing seeds wherever possible; otherwise accept any other contenders.
+            // The attribute is read once so the null check and the value used cannot disagree.
+            Set<Entity> publishedSeeds = getAttribute(CURRENT_SEEDS);
+            Set<Entity> currentSeeds = (publishedSeeds != null) ? publishedSeeds : ImmutableSet.<Entity>of();
+            Set<Entity> result = Sets.newLinkedHashSet();
+            result.addAll(Sets.intersection(currentSeeds, contenders));
+            result.addAll(contenders);
+            return ImmutableSet.copyOf(Iterables.limit(result, num));
+        }
+    };
+    
+    // Receives the member events subscribed to in init() and answers gatherPotentialSeeds()/gatherPotentialRunningSeeds().
+    protected SeedTracker seedTracker = new SeedTracker();
+    // Created on first call to getTokenGenerator(); null until then.
+    protected TokenGenerator tokenGenerator = null;
+
+    /** No-arg constructor; it does nothing itself -- subscriptions and effectors are registered in {@link #init()}. */
+    public CassandraDatacenterImpl() {
+    }
+
+    /**
+     * Registers the subscriptions and the effector this datacenter relies on:
+     * <ul>
+     * <li>member hostname, service-up and lifecycle changes, plus member removal, are forwarded to the
+     *     {@link SeedTracker};
+     * <li>member {@link CassandraNode#DATACENTER_NAME} changes and member removal keep
+     *     {@link #DATACENTER_USAGE} and {@link #DATACENTERS} up to date;
+     * <li>the {@link #EXECUTE_SCRIPT} effector is bound to {@link #executeScript(String)}.
+     * </ul>
+     */
+    @Override
+    public void init() {
+        super.init();
+
+        /*
+         * subscribe to hostname, and keep an accurate set of current seeds in a sensor;
+         * then at nodes we set the initial seeds to be the current seeds when ready (non-empty)
+         */
+        subscribeToMembers(this, Attributes.HOSTNAME, new SensorEventListener<String>() {
+            @Override
+            public void onEvent(SensorEvent<String> event) {
+                seedTracker.onHostnameChanged(event.getSource(), event.getValue());
+            }
+        });
+        subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener<Entity>() {
+            @Override public void onEvent(SensorEvent<Entity> event) {
+                seedTracker.onMemberRemoved(event.getValue());
+            }
+        });
+        subscribeToMembers(this, Attributes.SERVICE_UP, new SensorEventListener<Boolean>() {
+            @Override
+            public void onEvent(SensorEvent<Boolean> event) {
+                seedTracker.onServiceUpChanged(event.getSource(), event.getValue());
+            }
+        });
+        subscribeToMembers(this, Attributes.SERVICE_STATE_ACTUAL, new SensorEventListener<Lifecycle>() {
+            @Override
+            public void onEvent(SensorEvent<Lifecycle> event) {
+                // trigger a recomputation also when lifecycle state changes,
+                // because it might not have ruled a seed as inviable when service up went true
+                // because service state was not yet running
+                seedTracker.onServiceUpChanged(event.getSource(), Lifecycle.RUNNING==event.getValue());
+            }
+        });
+
+        // Track the datacenters for this cluster
+        subscribeToMembers(this, CassandraNode.DATACENTER_NAME, new SensorEventListener<String>() {
+            @Override
+            public void onEvent(SensorEvent<String> event) {
+                Entity member = event.getSource();
+                String dcName = event.getValue();
+                if (dcName != null) {
+                    Multimap<String, Entity> datacenterUsage = getAttribute(DATACENTER_USAGE);
+                    // Work on a copy so the published sensor value is never mutated in place.
+                    Multimap<String, Entity> mutableDatacenterUsage = (datacenterUsage == null) ? LinkedHashMultimap.<String, Entity>create() : LinkedHashMultimap.create(datacenterUsage);
+                    Optional<String> oldDcName = getKeyOfVal(mutableDatacenterUsage, member);
+                    if (!(oldDcName.isPresent() && dcName.equals(oldDcName.get()))) {
+                        mutableDatacenterUsage.values().remove(member);
+                        mutableDatacenterUsage.put(dcName, member);
+                        setAttribute(DATACENTER_USAGE, mutableDatacenterUsage);
+                        setAttribute(DATACENTERS, Sets.newLinkedHashSet(mutableDatacenterUsage.keySet()));
+                    }
+                }
+            }
+            /** Returns the key of the first entry whose value equals {@code val}, if any. */
+            private <K,V> Optional<K> getKeyOfVal(Multimap<K,V> map, V val) {
+                for (Map.Entry<K,V> entry : map.entries()) {
+                    if (Objects.equal(val, entry.getValue())) {
+                        return Optional.of(entry.getKey());
+                    }
+                }
+                return Optional.absent();
+            }
+        });
+        subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener<Entity>() {
+            @Override public void onEvent(SensorEvent<Entity> event) {
+                // The subscription is on this group, so the removed member is the event's value
+                // (as in the seed-tracker listener above), not its source.
+                Entity entity = event.getValue();
+                Multimap<String, Entity> datacenterUsage = getAttribute(DATACENTER_USAGE);
+                if (datacenterUsage != null && datacenterUsage.containsValue(entity)) {
+                    Multimap<String, Entity> mutableDatacenterUsage = LinkedHashMultimap.create(datacenterUsage);
+                    mutableDatacenterUsage.values().remove(entity);
+                    setAttribute(DATACENTER_USAGE, mutableDatacenterUsage);
+                    setAttribute(DATACENTERS, Sets.newLinkedHashSet(mutableDatacenterUsage.keySet()));
+                }
+            }
+        });
+
+        getMutableEntityType().addEffector(EXECUTE_SCRIPT, new EffectorBody<String>() {
+            @Override
+            public String call(ConfigBag parameters) {
+                return executeScript((String)parameters.getStringKey("commands"));
+            }
+        });
+    }
+    
+    /** Returns the supplier configured under {@link #SEED_SUPPLIER}, or the built-in default when none is set. */
+    protected Supplier<Set<Entity>> getSeedSupplier() {
+        Supplier<Set<Entity>> configured = getConfig(SEED_SUPPLIER);
+        if (configured != null) {
+            return configured;
+        }
+        return defaultSeedSupplier;
+    }
+    
+    /** Returns true only when {@link #USE_VNODES} is explicitly configured as true; an unset value counts as false. */
+    protected boolean useVnodes() {
+        Boolean vnodesFlag = getConfig(USE_VNODES);
+        return vnodesFlag != null && vnodesFlag.booleanValue();
+    }
+    
+    /**
+     * Returns the token generator for this datacenter, creating and configuring it on first use.
+     * <p>
+     * The generator is instantiated from {@link #TOKEN_GENERATOR_CLASS} and its origin is set to
+     * {@link #TOKEN_SHIFT}, or to a random value within the generator's range when no shift is configured.
+     * The instance is cached only after it has been fully configured, so a failure part-way through
+     * does not leave a generator without its origin to be handed out by later calls.
+     *
+     * @throws RuntimeException (via {@link Throwables#propagate}) if the generator cannot be created or configured
+     */
+    protected synchronized TokenGenerator getTokenGenerator() {
+        if (tokenGenerator != null) {
+            return tokenGenerator;
+        }
+
+        try {
+            TokenGenerator generator = getConfig(TOKEN_GENERATOR_CLASS).newInstance();
+
+            BigInteger shift = getConfig(TOKEN_SHIFT);
+            if (shift == null) {
+                // no explicit shift: pick a random one somewhere in the generator's range
+                shift = BigDecimal.valueOf(Math.random()).multiply(
+                    new BigDecimal(generator.range())).toBigInteger();
+            }
+            generator.setOrigin(shift);
+
+            // publish only once construction and configuration have both succeeded
+            tokenGenerator = generator;
+            return generator;
+        } catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+    
+    /**
+     * Returns how many seeds are wanted: the configured {@code INITIAL_QUORUM_SIZE} when it is positive,
+     * otherwise the initial size (at least 1) capped at {@link #DEFAULT_SEED_QUORUM}.
+     */
+    protected int getSeedQuorumSize() {
+        Integer configuredQuorum = getConfig(INITIAL_QUORUM_SIZE);
+        boolean explicitlyConfigured = configuredQuorum != null && configuredQuorum > 0;
+        if (!explicitlyConfigured) {
+            // default 2 is recommended, unless initial size is smaller
+            int atLeastOne = Math.max(getConfig(INITIAL_SIZE), 1);
+            return Math.min(atLeastOne, DEFAULT_SEED_QUORUM);
+        }
+        return configuredQuorum;
+    }
+
+    /** Returns whatever the {@link SeedTracker} currently reports as potential seeds. */
+    @Override
+    public Set<Entity> gatherPotentialSeeds() {
+        final Set<Entity> candidates = seedTracker.gatherPotentialSeeds();
+        return candidates;
+    }
+
+    /** Returns whatever the {@link SeedTracker} currently reports as potential running seeds. */
+    @Override
+    public Set<Entity> gatherPotentialRunningSeeds() {
+        final Set<Entity> runningCandidates = seedTracker.gatherPotentialRunningSeeds();
+        return runningCandidates;
+    }
+
+    /**
+     * Looks up {@link #MEMBER_SPEC}, passing a plain {@link CassandraNode} spec as the default
+     * so that members are Cassandra nodes unless configured otherwise.
+     */
+    @Override
+    protected EntitySpec<?> getMemberSpec() {
+        EntitySpec<?> cassandraNodeSpec = EntitySpec.create(CassandraNode.class);
+        return getConfig(MEMBER_SPEC, cassandraNodeSpec);
+    }
+
+    /** Reads the cluster name from the {@link #CLUSTER_NAME} attribute. */
+    @Override
+    public String getClusterName() {
+        final String name = getAttribute(CLUSTER_NAME);
+        return name;
+    }
+
+    @Override
+    public Collection<Entity> grow(int delta) {
+        if (useVnodes()) {
+            // nothing to do for token generator
+        } else {
+            if (getCurrentSize() == 0) {
+                getTokenGenerator().growingCluster(delta);
+            }
+        }
+        return super.grow(delta);
+    }
+    
+    /**
+     * Creates a member node, filling in token-related flags that the caller did not supply:
+     * a freshly generated token when vnodes are off, and the number of tokens per node
+     * (the configured {@link #NUM_TOKENS_PER_NODE} with vnodes, otherwise 1).
+     * Flags already present in {@code flags} are left untouched.
+     */
+    @SuppressWarnings("deprecation")
+    @Override
+    protected Entity createNode(@Nullable Location loc, Map<?,?> flags) {
+        Map<Object, Object> nodeFlags = MutableMap.copyOf(flags);
+
+        boolean tokenSupplied = flags.containsKey(CassandraNode.TOKEN) || flags.containsKey("token")
+                || flags.containsKey(CassandraNode.TOKENS) || flags.containsKey("tokens");
+        if (!tokenSupplied && !useVnodes()) {
+            // explicit token assignment is only needed when vnodes are off
+            nodeFlags.put(CassandraNode.TOKEN, getTokenGenerator().newToken());
+        }
+
+        boolean numTokensSupplied = flags.containsKey(CassandraNode.NUM_TOKENS_PER_NODE)
+                || flags.containsKey("numTokensPerNode");
+        if (!numTokensSupplied) {
+            if (useVnodes()) {
+                Integer configuredNumTokens = getConfig(NUM_TOKENS_PER_NODE);
+                nodeFlags.put(CassandraNode.NUM_TOKENS_PER_NODE, configuredNumTokens);
+            } else {
+                nodeFlags.put(CassandraNode.NUM_TOKENS_PER_NODE, 1);
+            }
+        }
+
+        return super.createNode(loc, nodeFlags);
+    }
+
+    /**
+     * Replaces a member, asking the token generator for replacement tokens when the old node
+     * had any, and passing them (or null) to the new node under {@link CassandraNode#TOKENS}.
+     */
+    @Override
+    protected Entity replaceMember(Entity member, Location memberLoc, Map<?, ?> extraFlags) {
+        Set<BigInteger> previousTokens = ((CassandraNode) member).getTokens();
+        Set<BigInteger> replacementTokens = null;
+        if (previousTokens != null && !previousTokens.isEmpty()) {
+            replacementTokens = getTokenGenerator().getTokensForReplacementNode(previousTokens);
+        }
+        return super.replaceMember(member, memberLoc,  MutableMap.copyOf(extraFlags).add(CassandraNode.TOKENS, replacementTokens));
+    }
+
+    /**
+     * Starts the cluster in the given locations.
+     * <p>
+     * Sequence: warn about localhost deployments, publish the cluster name, start the members via
+     * the superclass, wire up sensors, wait for {@code DELAY_BEFORE_ADVERTISING_CLUSTER}, run the
+     * optional creation script, and finally call {@link #update()}.
+     */
+    @Override
+    public void start(Collection<? extends Location> locations) {
+        String localhostWarning = "CassandraCluster does not support multiple nodes on localhost, " +
+                "due to assumptions Cassandra makes about the use of the same port numbers used across the cluster.";
+        Machines.warnIfLocalhost(locations, localhostWarning);
+
+        // Publish the cluster name unconditionally, even when it is only the default value.
+        setAttribute(CLUSTER_NAME, getConfig(CLUSTER_NAME));
+
+        super.start(locations);
+        connectSensors();
+
+        // TODO wait until all nodes which we think are up are consistent
+        // i.e. all known nodes use the same schema, as reported by
+        // SshEffectorTasks.ssh("echo \"describe cluster;\" | /bin/cassandra-cli");
+        // once we've done that we can revert to using 2 seed nodes.
+        // see CassandraCluster.DEFAULT_SEED_QUORUM
+        // (also ensure the cluster is ready if we are about to run a creation script)
+        Time.sleep(getConfig(DELAY_BEFORE_ADVERTISING_CLUSTER));
+
+        String creationScriptUrl = getConfig(CassandraNode.CREATION_SCRIPT_URL);
+        if (Strings.isNonEmpty(creationScriptUrl)) {
+            String creationScript = new ResourceUtils(this).getResourceAsString(creationScriptUrl);
+            executeScript(creationScript);
+        }
+
+        update();
+    }
+
+    /**
+     * Wires up the enrichers and adds a {@link MemberTrackingPolicy} that tracks the members'
+     * service-up, hostname and thrift-port sensors.
+     */
+    protected void connectSensors() {
+        connectEnrichers();
+
+        PolicySpec<MemberTrackingPolicy> trackerSpec = PolicySpec.create(MemberTrackingPolicy.class)
+                .displayName("Cassandra Cluster Tracker")
+                .configure("sensorsToTrack", ImmutableSet.of(Attributes.SERVICE_UP, Attributes.HOSTNAME, CassandraNode.THRIFT_PORT))
+                .configure("group", this);
+        addPolicy(trackerSpec);
+    }
+
+    /**
+     * Membership-tracking policy that calls {@link CassandraDatacenterImpl#update()} on the entity
+     * it is attached to whenever a member changes, is added, or is removed.
+     * <p>
+     * The debug messages log the attached {@code entity} (the cluster), not the policy instance itself.
+     */
+    public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
+        @Override
+        protected void onEntityChange(Entity member) {
+            if (log.isDebugEnabled()) log.debug("Node {} updated in Cluster {}", member, entity);
+            ((CassandraDatacenterImpl)entity).update();
+        }
+        @Override
+        protected void onEntityAdded(Entity member) {
+            if (log.isDebugEnabled()) log.debug("Node {} added to Cluster {}", member, entity);
+            ((CassandraDatacenterImpl)entity).update();
+        }
+        @Override
+        protected void onEntityRemoved(Entity member) {
+            if (log.isDebugEnabled()) log.debug("Node {} removed from Cluster {}", member, entity);
+            ((CassandraDatacenterImpl)entity).update();
+        }
+    }
+
+    /**
+     * Adds the cluster-level aggregating enrichers.
+     * <p>
+     * Each pair below is (member sensor, cluster sensor). The first table is aggregated by sum and
+     * the second by average. Both use {@code null} for unreported sensors and for the no-sensors case.
+     */
+    @SuppressWarnings("unchecked")
+    protected void connectEnrichers() {
+        // (source sensor on each node, target sensor on the cluster) -- summed across members
+        List<? extends List<? extends AttributeSensor<? extends Number>>> summedSensorPairs = ImmutableList.of(
+                ImmutableList.of(CassandraNode.READ_ACTIVE, READ_ACTIVE),
+                ImmutableList.of(CassandraNode.READ_PENDING, READ_PENDING),
+                ImmutableList.of(CassandraNode.WRITE_ACTIVE, WRITE_ACTIVE),
+                ImmutableList.of(CassandraNode.WRITE_PENDING, WRITE_PENDING)
+        );
+
+        // (source sensor on each node, target sensor on the cluster) -- averaged across members
+        List<? extends List<? extends AttributeSensor<? extends Number>>> averagedSensorPairs = ImmutableList.of(
+                ImmutableList.of(CassandraNode.READS_PER_SECOND_LAST, READS_PER_SECOND_LAST_PER_NODE),
+                ImmutableList.of(CassandraNode.WRITES_PER_SECOND_LAST, WRITES_PER_SECOND_LAST_PER_NODE),
+                ImmutableList.of(CassandraNode.WRITES_PER_SECOND_IN_WINDOW, WRITES_PER_SECOND_IN_WINDOW_PER_NODE),
+                ImmutableList.of(CassandraNode.READS_PER_SECOND_IN_WINDOW, READS_PER_SECOND_IN_WINDOW_PER_NODE),
+                ImmutableList.of(CassandraNode.THRIFT_PORT_LATENCY, THRIFT_PORT_LATENCY_PER_NODE),
+                ImmutableList.of(CassandraNode.THRIFT_PORT_LATENCY_IN_WINDOW, THRIFT_PORT_LATENCY_IN_WINDOW_PER_NODE),
+                ImmutableList.of(CassandraNode.PROCESS_CPU_TIME_FRACTION_LAST, PROCESS_CPU_TIME_FRACTION_LAST_PER_NODE),
+                ImmutableList.of(CassandraNode.PROCESS_CPU_TIME_FRACTION_IN_WINDOW, PROCESS_CPU_TIME_FRACTION_IN_WINDOW_PER_NODE)
+        );
+
+        for (List<? extends AttributeSensor<? extends Number>> pair : summedSensorPairs) {
+            AttributeSensor<? extends Number> memberSensor = pair.get(0);
+            AttributeSensor<? extends Number> clusterTotal = pair.get(1);
+            addEnricher(Enrichers.builder()
+                    .aggregating(memberSensor)
+                    .publishing(clusterTotal)
+                    .fromMembers()
+                    .computingSum()
+                    .defaultValueForUnreportedSensors(null)
+                    .valueToReportIfNoSensors(null)
+                    .build());
+        }
+
+        for (List<? extends AttributeSensor<? extends Number>> pair : averagedSensorPairs) {
+            AttributeSensor<Number> memberSensor = (AttributeSensor<Number>) pair.get(0);
+            AttributeSensor<Double> clusterAverage = (AttributeSensor<Double>) pair.get(1);
+            addEnricher(Enrichers.builder()
+                    .aggregating(memberSensor)
+                    .publishing(clusterAverage)
+                    .fromMembers()
+                    .computingAverage()
+                    .defaultValueForUnreportedSensors(null)
+                    .valueToReportIfNoSensors(null)
+                    .build());
+        }
+    }
+
+    /**
+     * Stops the cluster: calls {@link #disconnectSensors()} first and then delegates to the superclass.
+     */
+    @Override
+    public void stop() {
+        disconnectSensors();
+        
+        super.stop();
+    }
+    
+    /** Hook invoked at the beginning of {@link #stop()}; this implementation does nothing. */
+    protected void disconnectSensors() {
+    }
+
+    /**
+     * Recomputes the cluster-level sensors from the current members, under {@code mutex}.
+     * <p>
+     * Refreshes the seeds, then publishes the hostname and thrift port of the first member whose
+     * service is up, plus the list of "host:port" strings for all up {@link CassandraNode} members
+     * (only written when it differs from the previously published list). If no member is up, the
+     * hostname and port are cleared and an empty node list is published. Finally the not-up
+     * indicator is updated from the node list.
+     */
+    @Override
+    public void update() {
+        synchronized (mutex) {
+            // Update our seeds, as necessary
+            seedTracker.refreshSeeds();
+
+            // Choose the first available cluster member to set host and port (and compute one-up)
+            Optional<Entity> firstUp = Iterables.tryFind(getMembers(), EntityPredicates.attributeEqualTo(SERVICE_UP, Boolean.TRUE));
+
+            if (!firstUp.isPresent()) {
+                setAttribute(HOSTNAME, null);
+                setAttribute(THRIFT_PORT, null);
+                setAttribute(CASSANDRA_CLUSTER_NODES, Collections.<String>emptyList());
+            } else {
+                Entity advertised = firstUp.get();
+                setAttribute(HOSTNAME, advertised.getAttribute(Attributes.HOSTNAME));
+                setAttribute(THRIFT_PORT, advertised.getAttribute(CassandraNode.THRIFT_PORT));
+
+                List<String> published = getAttribute(CASSANDRA_CLUSTER_NODES);
+                Set<String> before = (published == null) ? ImmutableSet.<String>of() : ImmutableSet.copyOf(published);
+                Set<String> after = MutableSet.<String>of();
+                for (Entity node : getMembers()) {
+                    if (!(node instanceof CassandraNode)) continue;
+                    if (!Boolean.TRUE.equals(node.getAttribute(SERVICE_UP))) continue;
+
+                    String host = node.getAttribute(Attributes.HOSTNAME);
+                    Integer port = node.getAttribute(CassandraNode.THRIFT_PORT);
+                    if (host == null || port == null) continue;
+
+                    after.add(HostAndPort.fromParts(host, port).toString());
+                }
+                // Only republish when the set of nodes actually changed.
+                if (!Sets.symmetricDifference(before, after).isEmpty()) {
+                    setAttribute(CASSANDRA_CLUSTER_NODES, MutableList.copyOf(after));
+                }
+            }
+
+            ServiceNotUpLogic.updateNotUpIndicatorRequiringNonEmptyList(this, CASSANDRA_CLUSTER_NODES);
+        }
+    }
+    
+    /**
+     * For tracking our seeds. This gets fiddly! High-level logic is:
+     * <ul>
+     *   <li>If we have never reached quorum (i.e. have never published seeds), then continue to wait for quorum;
+     *       because entity-startup may be blocking for this. This is handled by the seedSupplier.
+     *   <li>If we previously reached quorum (i.e. have previously published seeds), then always update;
+     *       we never want stale/dead entities listed in our seeds.
+     *   <li>If an existing seed looks unhealthy, then replace it.
+     *   <li>If a new potential seed becomes available (and we're in need of more), then add it.
+     * </ul>
+     * 
+     * Also note that {@link CassandraFabric} can take over, because it knows about multiple sub-clusters!
+     * It will provide a different {@link CassandraDatacenter#SEED_SUPPLIER}. Each time we think that our seeds
+     * need to change, we call that. The fabric will call into {@link CassandraDatacenterImpl#gatherPotentialSeeds()}
+     * to find out what's available.
+     * 
+     * @author aled
+     */
+    protected class SeedTracker {
+        private final Map<Entity, Boolean> memberUpness = Maps.newLinkedHashMap();
+        
+        /**
+         * Handles removal of a member: forgets its recorded up-ness and refreshes the seeds
+         * only if the removed member was one of the current seeds.
+         */
+        public void onMemberRemoved(Entity member) {
+            boolean wasSeed = getSeeds().contains(member);
+            memberUpness.remove(member);
+
+            if (!wasSeed) {
+                if (log.isTraceEnabled()) log.trace("Seeds considered stable for cluster {} (node {} removed)", new Object[] {CassandraDatacenterImpl.this, member});
+                return;
+            }
+            refreshSeeds();
+        }
+        /**
+         * Handles a hostname change on a member. Seeds are refreshed when the member is viable and
+         * we have fewer seeds than the quorum size, or when it is a current seed that is no longer viable.
+         */
+        public void onHostnameChanged(Entity member, String hostname) {
+            Set<Entity> currentSeeds = getSeeds();
+            int quorumSize = getSeedQuorumSize();
+            boolean viable = isViableSeed(member);
+            boolean couldAdd = viable && currentSeeds.size() < quorumSize;
+            boolean couldRemove = currentSeeds.contains(member) && !viable;
+
+            if (!(couldAdd || couldRemove)) {
+                if (log.isTraceEnabled()) log.trace("Seeds considered stable for cluster {} (node {} changed hostname {})", new Object[] {CassandraDatacenterImpl.this, member, hostname});
+                return;
+            }
+            refreshSeeds();
+        }
+        /**
+         * Handles a service-up change on a member: records the new value and refreshes the seeds
+         * when the member could be added as a seed or should be dropped as one.
+         * <p>
+         * NOTE: a duplicate value is only logged at trace level; despite the "Ignoring" wording,
+         * processing continues and the seed check below still runs.
+         */
+        public void onServiceUpChanged(Entity member, Boolean serviceUp) {
+            Boolean previous = memberUpness.put(member, serviceUp);
+            boolean duplicate = Objects.equal(previous, serviceUp);
+            if (duplicate && log.isTraceEnabled()) {
+                log.trace("Ignoring duplicate service-up in "+CassandraDatacenterImpl.this+" for "+member+", "+serviceUp);
+            }
+
+            Set<Entity> currentSeeds = getSeeds();
+            int quorumSize = getSeedQuorumSize();
+            boolean viable = isViableSeed(member);
+            boolean couldAdd = viable && currentSeeds.size() < quorumSize;
+            boolean couldRemove = currentSeeds.contains(member) && !viable;
+
+            if (log.isDebugEnabled()) {
+                log.debug("Considering refresh of seeds for "+CassandraDatacenterImpl.this+" because "+member+" is now "+serviceUp+" ("+viable+" / "+couldAdd+" / "+couldRemove+")");
+            }
+            if (!(couldAdd || couldRemove)) {
+                if (log.isTraceEnabled()) log.trace("Seeds considered stable for cluster {} (node {} changed serviceUp {})", new Object[] {CassandraDatacenterImpl.this, member, serviceUp});
+                return;
+            }
+            refreshSeeds();
+        }
+        /** Returns the currently published seeds, or an empty set if none have been published. */
+        protected Set<Entity> getSeeds() {
+            Set<Entity> published = getAttribute(CURRENT_SEEDS);
+            if (published != null) {
+                return published;
+            }
+            return ImmutableSet.<Entity>of();
+        }
+        /**
+         * Asks the seed supplier for the seeds and, if they differ from the published
+         * {@code CURRENT_SEEDS}, publishes the new set. {@code HAS_PUBLISHED_SEEDS} is set to true
+         * the first time a non-empty set is published.
+         */
+        public void refreshSeeds() {
+            Set<Entity> oldseeds = getAttribute(CURRENT_SEEDS);
+            Set<Entity> newseeds = getSeedSupplier().get();
+            if (Objects.equal(oldseeds, newseeds)) {
+                // Log at the same level as the guard (the no-op case is trace-only noise).
+                if (log.isTraceEnabled()) log.trace("Seed refresh no-op for cluster {}: still={}", new Object[] {CassandraDatacenterImpl.this, oldseeds});
+            } else {
+                // Log the enclosing cluster, not this SeedTracker instance.
+                if (log.isDebugEnabled()) log.debug("Refreshing seeds of cluster {}: now={}; old={}", new Object[] {CassandraDatacenterImpl.this, newseeds, oldseeds});
+                setAttribute(CURRENT_SEEDS, newseeds);
+                if (newseeds != null && newseeds.size() > 0) {
+                    setAttribute(HAS_PUBLISHED_SEEDS, true);
+                }
+            }
+        }
+        /**
+         * Collects, in member iteration order, every member that {@link #isViableSeed(Entity)} accepts.
+         *
+         * @return a new mutable set of viable seed candidates (possibly empty)
+         */
+        public Set<Entity> gatherPotentialSeeds() {
+            Set<Entity> result = Sets.newLinkedHashSet();
+            for (Entity member : getMembers()) {
+                if (isViableSeed(member)) {
+                    result.add(member);
+                }
+            }
+            // The format has two placeholders: the cluster and the candidates.
+            if (log.isTraceEnabled()) log.trace("Viable seeds in Cluster {}: {}", new Object[] {CassandraDatacenterImpl.this, result});
+            return result;
+        }
+        /**
+         * Collects, in member iteration order, every member that {@link #isRunningSeed(Entity)} accepts.
+         *
+         * @return a new mutable set of running seed candidates (possibly empty)
+         */
+        public Set<Entity> gatherPotentialRunningSeeds() {
+            Set<Entity> result = Sets.newLinkedHashSet();
+            for (Entity member : getMembers()) {
+                if (isRunningSeed(member)) {
+                    result.add(member);
+                }
+            }
+            // The format has two placeholders: the cluster and the candidates.
+            if (log.isTraceEnabled()) log.trace("Viable running seeds in Cluster {}: {}", new Object[] {CassandraDatacenterImpl.this, result});
+            return result;
+        }
+        /**
+         * Decides whether a member may be used as a seed.
+         * <p>
+         * A member is viable when it has a hostname and has not failed. It counts as failed when it
+         * is unmanaged, ON_FIRE, STOPPED, or RUNNING without service-up.
+         *
+         * @param member the candidate entity
+         * @return true if the member is a viable seed
+         */
+        public boolean isViableSeed(Entity member) {
+            // TODO would be good to reuse the better logic in ServiceFailureDetector
+            // (e.g. if that didn't just emit a notification but set a sensor as well?)
+            boolean managed = Entities.isManaged(member);
+            String hostname = member.getAttribute(Attributes.HOSTNAME);
+            boolean serviceUp = Boolean.TRUE.equals(member.getAttribute(Attributes.SERVICE_UP));
+            Lifecycle serviceState = member.getAttribute(Attributes.SERVICE_STATE_ACTUAL);
+            boolean hasFailed = !managed || (serviceState == Lifecycle.ON_FIRE) || (serviceState == Lifecycle.RUNNING && !serviceUp) || (serviceState == Lifecycle.STOPPED);
+            boolean result = (hostname != null && !hasFailed);
+            // Log the enclosing cluster rather than this SeedTracker instance.
+            if (log.isTraceEnabled()) log.trace("Node {} in Cluster {}: viableSeed={}; hostname={}; serviceUp={}; serviceState={}; hasFailed={}", new Object[] {member, CassandraDatacenterImpl.this, result, hostname, serviceUp, serviceState, hasFailed});
+            return result;
+        }
+        /**
+         * Decides whether a member is a viable seed that is also up and in the RUNNING state.
+         *
+         * @param member the candidate entity
+         * @return true if the member is viable, service-up and RUNNING
+         */
+        public boolean isRunningSeed(Entity member) {
+            boolean viableSeed = isViableSeed(member);
+            boolean serviceUp = Boolean.TRUE.equals(member.getAttribute(Attributes.SERVICE_UP));
+            Lifecycle serviceState = member.getAttribute(Attributes.SERVICE_STATE_ACTUAL);
+            boolean result = viableSeed && serviceUp && serviceState == Lifecycle.RUNNING;
+            // Log the enclosing cluster rather than this SeedTracker instance.
+            if (log.isTraceEnabled()) log.trace("Node {} in Cluster {}: runningSeed={}; viableSeed={}; serviceUp={}; serviceState={}", new Object[] {member, CassandraDatacenterImpl.this, result, viableSeed, serviceUp, serviceState});
+            return result;
+        }
+    }
+    
+    /**
+     * Runs the given commands on the first member of the cluster by invoking its
+     * {@code EXECUTE_SCRIPT} effector, and returns that effector's result.
+     *
+     * @throws IllegalStateException if the cluster has no members
+     */
+    @Override
+    public String executeScript(String commands) {
+        Entity target = Iterables.getFirst(getMembers(), null);
+        if (target == null) {
+            throw new IllegalStateException("No Cassandra nodes available");
+        }
+        // FIXME cross-entity method-style calls do not set up a queueing context (DynamicSequentialTask),
+        // so the effector is invoked explicitly rather than calling
+        // ((CassandraNode)target).executeScript(commands) directly.
+        Map<String, String> parameters = MutableMap.of("commands", commands);
+        return Entities.invokeEffector(this, target, CassandraNode.EXECUTE_SCRIPT, parameters).getUnchecked();
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabric.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabric.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabric.java
new file mode 100644
index 0000000..23db92c
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabric.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import java.util.Set;
+
+import org.apache.brooklyn.catalog.Catalog;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Entity;
+import brooklyn.entity.annotation.Effector;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.MethodEffector;
+import brooklyn.entity.group.DynamicFabric;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.location.Location;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Multimap;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * A fabric of {@link CassandraNode}s, which forms a cluster spanning multiple locations.
+ * <p>
+ * Each {@link CassandraDatacenter} child instance is actually just a part of the whole cluster. It consists of the
+ * nodes in that single location (which normally corresponds to a "datacenter" in Cassandra terminology).
+ * <p>
+ * Most sensors declared here are aliases of the corresponding {@link CassandraDatacenter} sensors.
+ */
+@Catalog(name="Apache Cassandra Database Fabric", description="Cassandra is a highly scalable, eventually " +
+        "consistent, distributed, structured key-value store which provides a ColumnFamily-based data model " +
+        "richer than typical key/value systems", iconUrl="classpath:///cassandra-logo.jpeg")
+@ImplementedBy(CassandraFabricImpl.class)
+public interface CassandraFabric extends DynamicFabric {
+
+    /** Number of initial nodes that must start successfully for the fabric to report success; defaults to -1. */
+    ConfigKey<Integer> INITIAL_QUORUM_SIZE = ConfigKeys.newIntegerConfigKey(
+            "fabric.initial.quorumSize",
+            "Initial fabric quorum size - number of initial nodes that must have been successfully started " +
+            "to report success (if less than 0, then use a value based on INITIAL_SIZE of clusters)",
+            -1);
+    
+    /** Function mapping a location to the Cassandra datacenter name to use for it. */
+    @SuppressWarnings("serial")
+    ConfigKey<Function<Location, String>> DATA_CENTER_NAMER = ConfigKeys.newConfigKey(new TypeToken<Function<Location, String>>(){}, 
+            "cassandra.fabric.datacenter.namer",
+            "Function used to provide the cassandra.replication.datacenterName for a given location");
+
+    // NOTE(review): presumably the default number of seeds the fabric waits for; confirm against the implementation.
+    int DEFAULT_SEED_QUORUM = 5;
+    
+    /** Alias of {@link CassandraDatacenter#DATACENTER_USAGE}. */
+    AttributeSensor<Multimap<String,Entity>> DATACENTER_USAGE = CassandraDatacenter.DATACENTER_USAGE;
+
+    /** Alias of {@link CassandraDatacenter#DATACENTERS}. */
+    AttributeSensor<Set<String>> DATACENTERS = CassandraDatacenter.DATACENTERS;
+
+    /** Alias of {@link CassandraDatacenter#CURRENT_SEEDS}. */
+    AttributeSensor<Set<Entity>> CURRENT_SEEDS = CassandraDatacenter.CURRENT_SEEDS;
+
+    /** Alias of {@link CassandraDatacenter#HAS_PUBLISHED_SEEDS}. */
+    AttributeSensor<Boolean> HAS_PUBLISHED_SEEDS = CassandraDatacenter.HAS_PUBLISHED_SEEDS;
+
+    /** Alias of {@link CassandraDatacenter#HOSTNAME}. */
+    AttributeSensor<String> HOSTNAME = CassandraDatacenter.HOSTNAME;
+
+    /** Alias of {@link CassandraDatacenter#THRIFT_PORT}. */
+    AttributeSensor<Integer> THRIFT_PORT = CassandraDatacenter.THRIFT_PORT;
+
+    /** Effector handle for {@link #update()}. */
+    MethodEffector<Void> UPDATE = new MethodEffector<Void>(CassandraFabric.class, "update");
+
+    /** Effector that updates the cluster members. */
+    @Effector(description="Updates the cluster members")
+    void update();
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java
new file mode 100644
index 0000000..bce7cac
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraFabricImpl.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.DynamicGroup;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.EntityPredicates;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
+import brooklyn.entity.group.DynamicFabricImpl;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
+import brooklyn.location.Location;
+import brooklyn.policy.PolicySpec;
+import brooklyn.util.collections.CollectionFunctionals;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.collections.MutableSet;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+/**
+ * Implementation of {@link CassandraFabric}.
+ * <p>
+ * Several subtleties to note:
+ * - a node may take some time after it is running and serving JMX to actually be contactable on its thrift port
+ *   (so we wait for thrift port to be contactable)
+ * - sometimes new nodes take a while to peer, and/or take a while to get a consistent schema
+ *   (each up to 1m; often very close to the 1m) 
+ */
+public class CassandraFabricImpl extends DynamicFabricImpl implements CassandraFabric {
+
+    private static final Logger log = LoggerFactory.getLogger(CassandraFabricImpl.class);
+
+    // Mutex for synchronizing during re-size operations
+    private final Object mutex = new Object[0];
+
+    private final Supplier<Set<Entity>> defaultSeedSupplier = new Supplier<Set<Entity>>() {
+        /**
+         * Returns the seeds the fabric should currently use, recomputing them when there are no
+         * seeds yet, fewer seeds than the quorum size, or a current seed is no longer viable.
+         * <p>
+         * When seeds have already been published they are kept as-is while the fabric is starting,
+         * stopping or stopped, when no potential seeds exist, or when some datacenter has none.
+         * Otherwise new seeds are selected. Before the first publication an empty set is returned
+         * until quorum is reached and every datacenter has at least one potential seed.
+         * <p>
+         * Side effects: a changed result is written to {@code CURRENT_SEEDS}; if it is non-empty,
+         * {@code HAS_PUBLISHED_SEEDS} is set and every member datacenter's {@code update()} is called.
+         */
+        @Override public Set<Entity> get() {
+            // TODO Remove duplication from CassandraClusterImpl.defaultSeedSupplier
+            Set<Entity> seeds = getAttribute(CURRENT_SEEDS);
+            boolean hasPublishedSeeds = Boolean.TRUE.equals(getAttribute(HAS_PUBLISHED_SEEDS));
+            int quorumSize = getSeedQuorumSize();
+            
+            // update seeds if we're not quorate; note this may not work for dynamically adding new datacenters
+            // as we do not take a new seed from the new datacenter
+            if (seeds == null || seeds.size() < quorumSize || containsDownEntity(seeds)) {
+                Set<Entity> newseeds;
+                Map<CassandraDatacenter,Set<Entity>> potentialSeeds = MutableMap.of();
+                int potentialSeedCount = 0;
+                for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
+                    Set<Entity> dcPotentialSeeds = member.gatherPotentialSeeds();
+                    potentialSeeds.put(member, dcPotentialSeeds);
+                    potentialSeedCount += dcPotentialSeeds.size();
+                }
+                
+                if (hasPublishedSeeds) {
+                    Set<Entity> currentSeeds = getAttribute(CURRENT_SEEDS);
+                    Lifecycle serviceState = getAttribute(SERVICE_STATE_ACTUAL);
+                    if (serviceState == Lifecycle.STARTING) {
+                        if (Sets.intersection(currentSeeds, ImmutableSet.copyOf(Iterables.concat(potentialSeeds.values()))).isEmpty()) {
+                            log.warn("Fabric {} lost all its seeds while starting! Subsequent failure likely, but changing seeds during startup would risk split-brain: seeds={}", new Object[] {CassandraFabricImpl.this, currentSeeds});
+                        }
+                        newseeds = currentSeeds;
+                    } else if (serviceState == Lifecycle.STOPPING || serviceState == Lifecycle.STOPPED) {
+                        if (log.isTraceEnabled()) log.trace("Fabric {} ignoring any potential seed-changes, because {}: seeds={}", new Object[] {CassandraFabricImpl.this, serviceState, currentSeeds});
+                        newseeds = currentSeeds;
+                    } else if (potentialSeedCount == 0) {
+                        // TODO Could be race where nodes have only just returned from start() and are about to 
+                        // transition to serviceUp; so don't just abandon all our seeds!
+                        log.warn("Fabric {} has no seeds (after startup); leaving seeds as-is; but risks split-brain if these seeds come back up!", new Object[] {CassandraFabricImpl.this});
+                        newseeds = currentSeeds;
+                    } else if (!allNonEmpty(potentialSeeds.values())) {
+                        log.warn("Fabric {} has datacenter with no seeds (after startup); leaving seeds as-is; but risks split-brain if these seeds come back up!", new Object[] {CassandraFabricImpl.this});
+                        newseeds = currentSeeds;
+                    } else {
+                        Set<Entity> result = selectSeeds(quorumSize, potentialSeeds);
+                        if (log.isDebugEnabled() && !Objects.equal(seeds, result)) {
+                            log.debug("Fabric {} updating seeds: chosen={}; potential={}", new Object[] {CassandraFabricImpl.this, result, potentialSeeds});
+                        }
+                        newseeds = result;
+                    }
+                } else if (potentialSeedCount < quorumSize) {
+                    if (log.isDebugEnabled()) log.debug("Not setting seeds of fabric {} yet, because still waiting for quorum (need {}; have {} potentials from {} members)", new Object[] {CassandraFabricImpl.this, quorumSize, potentialSeedCount, getMembers()});
+                    newseeds = ImmutableSet.of();
+                } else if (!allNonEmpty(potentialSeeds.values())) {
+                    if (log.isDebugEnabled()) {
+                        Map<CassandraDatacenter, Integer> datacenterCounts = Maps.transformValues(potentialSeeds, CollectionFunctionals.sizeFunction());
+                        log.debug("Not setting seeds of fabric {} yet, because not all datacenters have seeds (sizes are {})", new Object[] {CassandraFabricImpl.this, datacenterCounts});
+                    }
+                    newseeds = ImmutableSet.of();
+                } else {
+                    // yay, we're quorate
+                    Set<Entity> result = selectSeeds(quorumSize, potentialSeeds);
+                    log.info("Fabric {} has reached seed quorum: seeds={}", new Object[] {CassandraFabricImpl.this, result});
+                    newseeds = result;
+                }
+                
+                if (!Objects.equal(seeds, newseeds)) {
+                    setAttribute(CURRENT_SEEDS, newseeds);
+                    
+                    if (newseeds != null && newseeds.size() > 0) {
+                        setAttribute(HAS_PUBLISHED_SEEDS, true);
+                        
+                        // Need to tell every datacenter that seeds are ready.
+                        // Otherwise a datacenter might get no more changes (e.g. to nodes' hostnames etc), 
+                        // and not call seedSupplier.get() again.
+                        for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
+                            member.update();
+                        }
+                    }
+                    return newseeds;
+                } else {
+                    return seeds;
+                }
+            } else {
+                // Log this fabric instance (not the Class object), as every other message here does.
+                if (log.isTraceEnabled()) log.trace("Not refresheed seeds of fabric {}, because have quorum {} (of {} members), and none are down: seeds={}", 
+                        new Object[] {CassandraFabricImpl.this, quorumSize, getMembers().size(), seeds});
+                return seeds;
+            }
+        }
+        /** Returns true if none of the given collections is empty (vacuously true for no collections). */
+        private boolean allNonEmpty(Collection<? extends Collection<Entity>> contenders) {
+            boolean sawEmpty = false;
+            for (Collection<Entity> candidateGroup : contenders) {
+                if (candidateGroup.isEmpty()) {
+                    sawEmpty = true;
+                    break;
+                }
+            }
+            return !sawEmpty;
+        }
+        /**
+         * Chooses up to {@code num} seeds from the per-datacenter contenders.
+         * <p>
+         * Preference order: existing seeds that are still contenders, then one contender from each
+         * datacenter not yet represented, then any remaining contenders.
+         *
+         * @param num maximum number of seeds to return
+         * @param contenders viable seed candidates, keyed by datacenter
+         * @return an immutable set of at most {@code num} seeds, in preference order
+         */
+        private Set<Entity> selectSeeds(int num, Map<CassandraDatacenter,? extends Collection<Entity>> contenders) {
+            // Prefer existing seeds wherever possible;
+            // otherwise prefer a seed from each sub-cluster;
+            // otherwise accept any other contenders
+            Set<Entity> publishedSeeds = getAttribute(CURRENT_SEEDS);
+            Set<Entity> currentSeeds = (publishedSeeds != null) ? publishedSeeds : ImmutableSet.<Entity>of();
+            MutableSet<Entity> result = MutableSet.of();
+            // Flatten the per-datacenter collections before intersecting: intersecting with the set of
+            // collections themselves would never match an Entity, so existing seeds would never be kept.
+            result.addAll(Sets.intersection(currentSeeds, ImmutableSet.copyOf(Iterables.concat(contenders.values()))));
+            for (CassandraDatacenter cluster : contenders.keySet()) {
+                Set<Entity> contendersInCluster = Sets.newLinkedHashSet(contenders.get(cluster));
+                if (contendersInCluster.size() > 0 && Sets.intersection(result, contendersInCluster).isEmpty()) {
+                    result.add(Iterables.getFirst(contendersInCluster, null));
+                }
+            }
+            result.addAll(Iterables.concat(contenders.values()));
+            return ImmutableSet.copyOf(Iterables.limit(result, num));
+        }
+        /** Returns true as soon as one of the given seeds is found not to be a viable seed. */
+        private boolean containsDownEntity(Set<Entity> seeds) {
+            boolean foundDown = false;
+            for (Entity candidate : seeds) {
+                if (!isViableSeed(candidate)) {
+                    foundDown = true;
+                    break;
+                }
+            }
+            return foundDown;
+        }
+        /**
+         * Decides whether a member may be used as a seed: it needs a hostname and must not have
+         * failed (unmanaged, ON_FIRE, STOPPED, or RUNNING without service-up).
+         */
+        public boolean isViableSeed(Entity member) {
+            // TODO remove duplication from CassandraClusterImpl.SeedTracker.isViableSeed
+            boolean managed = Entities.isManaged(member);
+            String hostname = member.getAttribute(Attributes.HOSTNAME);
+            boolean serviceUp = Boolean.TRUE.equals(member.getAttribute(Attributes.SERVICE_UP));
+            Lifecycle serviceState = member.getAttribute(Attributes.SERVICE_STATE_ACTUAL);
+
+            boolean hasFailed;
+            if (!managed) {
+                hasFailed = true;
+            } else if (serviceState == Lifecycle.ON_FIRE || serviceState == Lifecycle.STOPPED) {
+                hasFailed = true;
+            } else {
+                hasFailed = (serviceState == Lifecycle.RUNNING && !serviceUp);
+            }
+
+            boolean result = !hasFailed && hostname != null;
+            if (log.isTraceEnabled()) {
+                log.trace("Node {} in Fabric {}: viableSeed={}; hostname={}; serviceUp={}; serviceState={}; hasFailed={}", new Object[] {member, CassandraFabricImpl.this, result, hostname, serviceUp, serviceState, hasFailed});
+            }
+            return result;
+        }
+    };
+
+    /** No-arg constructor; performs no work itself. */
+    public CassandraFabricImpl() {
+    }
+
+    @Override
+    public void init() {
+        super.init();
+
+        if (!getConfigRaw(CassandraDatacenter.SEED_SUPPLIER, true).isPresentAndNonNull())
+            setConfig(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier());
+        
+        // track members
+        addPolicy(PolicySpec.create(MemberTrackingPolicy.class)
+                .displayName("Cassandra Fabric Tracker")
+                .configure("group", this));
+
+        // Track first node's startup
+        subscribeToMembers(this, CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, new SensorEventListener<Long>() {
+            @Override
+            public void onEvent(SensorEvent<Long> event) {
+                Long oldval = getAttribute(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC);
+                Long newval = event.getValue();
+                if (oldval == null && newval != null) {
+                    setAttribute(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, newval);
+                    for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
+                        ((EntityInternal)member).setAttribute(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, newval);
+                    }
+                }
+            }
+        });
+        
+        // Track the datacenters for this cluster
+        subscribeToMembers(this, CassandraDatacenter.DATACENTER_USAGE, new SensorEventListener<Multimap<String,Entity>>() {
+            @Override
+            public void onEvent(SensorEvent<Multimap<String,Entity>> event) {
+                Multimap<String, Entity> usage = calculateDatacenterUsage();
+                setAttribute(DATACENTER_USAGE, usage);
+                setAttribute(DATACENTERS, usage.keySet());
+            }
+        });
+        subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener<Entity>() {
+            @Override public void onEvent(SensorEvent<Entity> event) {
+                Multimap<String, Entity> usage = calculateDatacenterUsage();
+                setAttribute(DATACENTER_USAGE, usage);
+                setAttribute(DATACENTERS, usage.keySet());
+            }
+        });
+    }
+
+    /** Membership tracker that calls {@link CassandraFabricImpl#update()} on every member change, addition, or removal. */
+    public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
+        @Override protected void onEntityChange(Entity member) {
+            logAndUpdate("Location {} updated in Fabric {}", member);
+        }
+        @Override protected void onEntityAdded(Entity member) {
+            logAndUpdate("Location {} added to Fabric {}", member);
+        }
+        @Override protected void onEntityRemoved(Entity member) {
+            logAndUpdate("Location {} removed from Fabric {}", member);
+        }
+        /** Debug-logs the event with the given message template, then asks the owning fabric to update. */
+        private void logAndUpdate(String template, Entity member) {
+            if (log.isDebugEnabled()) log.debug(template, member, entity);
+            ((CassandraFabricImpl)entity).update();
+        }
+    };
+
+    /** Uses {@link #INITIAL_QUORUM_SIZE} when positive; otherwise derives a quorum from the members' initial sizes, between 1 and the default seed quorum. */
+    protected int getSeedQuorumSize() {
+        Integer configured = getConfig(INITIAL_QUORUM_SIZE);
+        if (configured != null && configured > 0) {
+            return configured;
+        }
+        int total = 0;
+        for (CassandraDatacenter datacenter : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
+            total += datacenter.getConfig(CassandraDatacenter.INITIAL_SIZE);
+        }
+        // shrink the summed size: halve it above 5, subtract 2 for 4 or 5, subtract 1 for exactly 3
+        int reduced = (total > 5) ? total / 2 : (total > 3) ? total - 2 : (total > 2) ? total - 1 : total;
+        int atLeastOne = Math.max(reduced, 1);
+        return Math.min(atLeastOne, CassandraFabric.DEFAULT_SEED_QUORUM);
+    }
+
+    /**
+     * Returns the {@link #MEMBER_SPEC} for the Cassandra sub-clusters (a plain {@link CassandraDatacenter} spec when unset), adding this fabric's seed supplier unless the spec already sets one.
+     */
+    @Override
+    protected EntitySpec<?> getMemberSpec() {
+        // Need to set the seedSupplier, even if the caller has overridden the CassandraCluster config
+        // (unless they've explicitly overridden the seedSupplier as well!)
+        // TODO probably don't need to anymore, as it is set on the Fabric here -- just make sure there is a default!
+        EntitySpec<?> configured = getConfig(MEMBER_SPEC);
+        if (configured == null) {
+            return EntitySpec.create(CassandraDatacenter.class).configure(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier());
+        }
+        boolean suppliesSeeds = configured.getConfig().containsKey(CassandraDatacenter.SEED_SUPPLIER)
+                || configured.getFlags().containsKey("seedSupplier");
+        if (suppliesSeeds) {
+            return configured;
+        }
+        return EntitySpec.create(configured).configure(CassandraDatacenter.SEED_SUPPLIER, getSeedSupplier());
+    }
+    
+    /** Delegates to the parent, first adding a datacenter name derived from {@code location} when a {@link #DATA_CENTER_NAMER} is configured. */
+    @Override
+    protected Entity createCluster(Location location, Map flags) {
+        Function<Location, String> namer = getConfig(DATA_CENTER_NAMER);
+        if (namer == null) {
+            return super.createCluster(location, flags);
+        }
+        // NOTE(review): Guava's ImmutableMap builder rejects nulls and duplicate keys, so flags presumably must not already hold DATACENTER_NAME -- confirm
+        Map flagsWithName = ImmutableMap.builder().putAll(flags).put(CassandraNode.DATACENTER_NAME, namer.apply(location)).build();
+        return super.createCluster(location, flagsWithName);
+    }
+
+    /**
+     * Returns this fabric's default seed supplier, which prefers one node per location, then others
+     * from anywhere, and finally trims the result down to the "quorumSize".
+     */
+    public Supplier<Set<Entity>> getSeedSupplier() {
+        return defaultSeedSupplier;
+    }
+
+    @Override
+    public void start(Collection<? extends Location> locations) {
+        super.start(locations);
+        // after super.start(...), subscribe to the members' SERVICE_UP (see connectEnrichers)
+        connectSensors();
+
+        // TODO wait until all nodes which we think are up are consistent 
+        // i.e. all known nodes use the same schema, as reported by
+        // SshEffectorTasks.ssh("echo \"describe cluster;\" | /bin/cassandra-cli");
+        // once we've done that we can revert to using 2 seed nodes.
+        // see CassandraCluster.DEFAULT_SEED_QUORUM
+        Time.sleep(getConfig(CassandraDatacenter.DELAY_BEFORE_ADVERTISING_CLUSTER));
+        // after the configured delay, update the members and the advertised hostname / thrift port
+        update();
+    }
+
+    protected void connectSensors() { // invoked by start() right after super.start(locations)
+        connectEnrichers();
+    }
+    
+    /** Recomputes this fabric's SERVICE_UP via {@link #calculateServiceUp()} each time a member publishes SERVICE_UP. */
+    protected void connectEnrichers() {
+        // TODO Aggregate across sub-clusters
+        SensorEventListener<Boolean> recomputeServiceUp = new SensorEventListener<Boolean>() {
+            @Override public void onEvent(SensorEvent<Boolean> event) { setAttribute(SERVICE_UP, calculateServiceUp()); }
+        };
+
+        subscribeToMembers(this, SERVICE_UP, recomputeServiceUp);
+    }
+
+    @Override
+    public void stop() {
+        disconnectSensors();
+        // fabric-level sensors are disconnected before delegating to the parent's stop()
+        super.stop();
+    }
+    
+    protected void disconnectSensors() { // currently a no-op; stop() invokes it before super.stop()
+    }
+
+    /** Returns true when at least one member currently has SERVICE_UP equal to {@code Boolean.TRUE}. */
+    protected boolean calculateServiceUp() {
+        return Iterables.any(getMembers(), EntityPredicates.attributeEqualTo(SERVICE_UP, Boolean.TRUE));
+    }
+
+    protected Multimap<String, Entity> calculateDatacenterUsage() { // union of the members' DATACENTER_USAGE multimaps; null values are skipped
+        Multimap<String, Entity> merged = LinkedHashMultimap.create();
+        for (CassandraDatacenter datacenter : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
+            Multimap<String, Entity> usage = datacenter.getAttribute(CassandraDatacenter.DATACENTER_USAGE);
+            if (usage != null) { merged.putAll(usage); }
+        }
+        return merged;
+    }
+
+    /** Updates every member datacenter, publishes the recomputed SERVICE_UP, then advertises the hostname and Thrift port of the first member that is up. */
+    @Override
+    public void update() {
+        synchronized (mutex) {
+            for (CassandraDatacenter member : Iterables.filter(getMembers(), CassandraDatacenter.class)) {
+                member.update();
+            }
+
+            // calculateServiceUp() only returns a value, so publish it here (same as the member SERVICE_UP listener)
+            setAttribute(SERVICE_UP, calculateServiceUp());
+            // Choose the first available location to set host and port; both are left untouched when no member is up
+            Optional<Entity> upNode = Iterables.tryFind(getMembers(), EntityPredicates.attributeEqualTo(SERVICE_UP, Boolean.TRUE));
+
+            if (upNode.isPresent()) {
+                setAttribute(HOSTNAME, upNode.get().getAttribute(Attributes.HOSTNAME));
+                setAttribute(THRIFT_PORT, upNode.get().getAttribute(CassandraNode.THRIFT_PORT));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java
new file mode 100644
index 0000000..7d0a56d
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNode.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import java.math.BigInteger;
+import java.util.Set;
+
+import org.apache.brooklyn.catalog.Catalog;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Effector;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.BrooklynConfigKeys;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.database.DatastoreMixins;
+import brooklyn.entity.java.UsesJavaMXBeans;
+import brooklyn.entity.java.UsesJmx;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.event.basic.Sensors;
+import brooklyn.location.basic.PortRanges;
+import brooklyn.util.flags.SetFromFlag;
+import brooklyn.util.time.Duration;
+
+import com.google.common.reflect.TypeToken;
+
+/**
+ * An {@link brooklyn.entity.Entity} that represents a Cassandra node in a {@link CassandraDatacenter}; declares the node's config keys, sensors, and the accessors used from the config template.
+ */
+@Catalog(name="Apache Cassandra Node", description="Cassandra is a highly scalable, eventually " +
+        "consistent, distributed, structured key-value store which provides a ColumnFamily-based data model " +
+        "richer than typical key/value systems", iconUrl="classpath:///cassandra-logo.jpeg")
+@ImplementedBy(CassandraNodeImpl.class)
+public interface CassandraNode extends DatastoreMixins.DatastoreCommon, SoftwareProcess, UsesJmx, UsesJavaMXBeans, DatastoreMixins.HasDatastoreUrl, DatastoreMixins.CanExecuteScript {
+
+    @SetFromFlag("version")
+    ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "1.2.16");
+    // when this changes remember to put a copy under releng2:/var/www/developer/brooklyn/repository/ !
+    // TODO experiment with supporting 2.0.x
+
+    @SetFromFlag("downloadUrl")
+    BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
+            SoftwareProcess.DOWNLOAD_URL, "${driver.mirrorUrl}/${version}/apache-cassandra-${version}-bin.tar.gz");
+
+    /** download mirror, if desired; presumably what ${driver.mirrorUrl} in DOWNLOAD_URL resolves to -- TODO confirm */
+    @SetFromFlag("mirrorUrl")
+    ConfigKey<String> MIRROR_URL = new BasicConfigKey<String>(String.class, "cassandra.install.mirror.url", "URL of mirror", 
+        "http://www.mirrorservice.org/sites/ftp.apache.org/cassandra"
+        // for older versions, but slower:
+//        "http://archive.apache.org/dist/cassandra/"
+        );
+
+    @SetFromFlag("tgzUrl")
+    ConfigKey<String> TGZ_URL = new BasicConfigKey<String>(String.class, "cassandra.install.tgzUrl", "URL of TGZ download file");
+
+    @SetFromFlag("clusterName")
+    BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = CassandraDatacenter.CLUSTER_NAME;
+
+    @SetFromFlag("snitchName")
+    ConfigKey<String> ENDPOINT_SNITCH_NAME = CassandraDatacenter.ENDPOINT_SNITCH_NAME;
+
+    @SetFromFlag("gossipPort")
+    PortAttributeSensorAndConfigKey GOSSIP_PORT = new PortAttributeSensorAndConfigKey("cassandra.gossip.port", "Cassandra Gossip communications port", PortRanges.fromString("7000+"));
+
+    @SetFromFlag("sslGgossipPort") // NOTE(review): flag name has a doubled 'g' (looks like a typo for "sslGossipPort"); renaming would change the accepted flag -- confirm
+    PortAttributeSensorAndConfigKey SSL_GOSSIP_PORT = new PortAttributeSensorAndConfigKey("cassandra.ssl-gossip.port", "Cassandra Gossip SSL communications port", PortRanges.fromString("7001+"));
+
+    @SetFromFlag("thriftPort")
+    PortAttributeSensorAndConfigKey THRIFT_PORT = new PortAttributeSensorAndConfigKey("cassandra.thrift.port", "Cassandra Thrift RPC port", PortRanges.fromString("9160+"));
+
+    @SetFromFlag("nativePort")
+    PortAttributeSensorAndConfigKey NATIVE_TRANSPORT_PORT = new PortAttributeSensorAndConfigKey("cassandra.native.port", "Cassandra Native Transport port", PortRanges.fromString("9042+"));
+
+    @SetFromFlag("rmiRegistryPort")
+    // cassandra nodetool and others want 7199 - not required, but useful
+    PortAttributeSensorAndConfigKey RMI_REGISTRY_PORT = new PortAttributeSensorAndConfigKey(UsesJmx.RMI_REGISTRY_PORT, 
+        PortRanges.fromInteger(7199));
+
+    // some of the cassandra tooling (eg nodetool) uses RMI, but we want JMXMP, so do both!
+    ConfigKey<JmxAgentModes> JMX_AGENT_MODE = ConfigKeys.newConfigKeyWithDefault(UsesJmx.JMX_AGENT_MODE, JmxAgentModes.JMXMP_AND_RMI);
+    
+    @SetFromFlag("customSnitchJarUrl")
+    ConfigKey<String> CUSTOM_SNITCH_JAR_URL = ConfigKeys.newStringConfigKey("cassandra.config.customSnitchUrl", 
+            "URL for a jar file to be uploaded (e.g. \"classpath://org/apache/brooklyn/entity/nosql/cassandra/cassandra-multicloud-snitch.jar\"); defaults to null which means nothing to upload", 
+            null);
+
+    @SetFromFlag("cassandraConfigTemplateUrl")
+    ConfigKey<String> CASSANDRA_CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey(
+            "cassandra.config.templateUrl", "A URL (in freemarker format) for a cassandra.yaml config file (in freemarker format)", 
+            "classpath://org/apache/brooklyn/entity/nosql/cassandra/cassandra-${entity.majorMinorVersion}.yaml");
+
+    @SetFromFlag("cassandraConfigFileName")
+    ConfigKey<String> CASSANDRA_CONFIG_FILE_NAME = ConfigKeys.newStringConfigKey(
+            "cassandra.config.fileName", "Name for the copied config file", "cassandra.yaml");
+
+    @SetFromFlag("cassandraRackdcConfigTemplateUrl")
+    ConfigKey<String> CASSANDRA_RACKDC_CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey(
+            "cassandra.config.rackdc.templateUrl", "Template file (in freemarker format) for the cassandra-rackdc.properties config file", 
+            "classpath://org/apache/brooklyn/entity/nosql/cassandra/cassandra-rackdc.properties");
+
+    @SetFromFlag("cassandraRackdcConfigFileName")
+    ConfigKey<String> CASSANDRA_RACKDC_CONFIG_FILE_NAME = ConfigKeys.newStringConfigKey(
+            "cassandra.config.rackdc.fileName", "Name for the copied rackdc config file (used for configuring replication, when a suitable snitch is used)", "cassandra-rackdc.properties");
+    
+    @SetFromFlag("datacenterName")
+    BasicAttributeSensorAndConfigKey<String> DATACENTER_NAME = new BasicAttributeSensorAndConfigKey<String>(
+            String.class, "cassandra.replication.datacenterName", "Datacenter name (used for configuring replication, when a suitable snitch is used)", 
+            null);
+
+    @SetFromFlag("rackName")
+    BasicAttributeSensorAndConfigKey<String> RACK_NAME = new BasicAttributeSensorAndConfigKey<String>(
+            String.class, "cassandra.replication.rackName", "Rack name (used for configuring replication, when a suitable snitch is used)", 
+            null);
+
+    ConfigKey<Integer> NUM_TOKENS_PER_NODE = ConfigKeys.newIntegerConfigKey("cassandra.numTokensPerNode",
+            "Number of tokens per node; if using vnodes, should set this to a value like 256",
+            1);
+    
+    /**
+     * @deprecated since 0.7; use {@link #TOKENS}
+     */
+    @SetFromFlag("token")
+    @Deprecated
+    BasicAttributeSensorAndConfigKey<BigInteger> TOKEN = new BasicAttributeSensorAndConfigKey<BigInteger>(
+            BigInteger.class, "cassandra.token", "Cassandra Token");
+
+    @SetFromFlag("tokens")
+    BasicAttributeSensorAndConfigKey<Set<BigInteger>> TOKENS = new BasicAttributeSensorAndConfigKey<Set<BigInteger>>(
+            new TypeToken<Set<BigInteger>>() {}, "cassandra.tokens", "Cassandra Tokens");
+
+    AttributeSensor<Integer> PEERS = Sensors.newIntegerSensor( "cassandra.peers", "Number of peers in cluster");
+
+    AttributeSensor<Integer> LIVE_NODE_COUNT = Sensors.newIntegerSensor( "cassandra.liveNodeCount", "Number of live nodes in cluster");
+
+    /* Metrics for read/write performance. */
+
+    AttributeSensor<Long> READ_PENDING = Sensors.newLongSensor("cassandra.read.pending", "Current pending ReadStage tasks");
+    AttributeSensor<Integer> READ_ACTIVE = Sensors.newIntegerSensor("cassandra.read.active", "Current active ReadStage tasks");
+    AttributeSensor<Long> READ_COMPLETED = Sensors.newLongSensor("cassandra.read.completed", "Total completed ReadStage tasks");
+    AttributeSensor<Long> WRITE_PENDING = Sensors.newLongSensor("cassandra.write.pending", "Current pending MutationStage tasks");
+    AttributeSensor<Integer> WRITE_ACTIVE = Sensors.newIntegerSensor("cassandra.write.active", "Current active MutationStage tasks");
+    AttributeSensor<Long> WRITE_COMPLETED = Sensors.newLongSensor("cassandra.write.completed", "Total completed MutationStage tasks");
+    
+    AttributeSensor<Boolean> SERVICE_UP_JMX = Sensors.newBooleanSensor("cassandra.service.jmx.up", "Whether JMX is up for this service");
+    AttributeSensor<Long> THRIFT_PORT_LATENCY = Sensors.newLongSensor("cassandra.thrift.latency", "Latency for thrift port connection (ms) or null if down");
+
+    AttributeSensor<Double> READS_PER_SECOND_LAST = Sensors.newDoubleSensor("cassandra.reads.perSec.last", "Reads/sec (last datapoint)");
+    AttributeSensor<Double> WRITES_PER_SECOND_LAST = Sensors.newDoubleSensor("cassandra.write.perSec.last", "Writes/sec (last datapoint)"); // NOTE(review): name uses "write" where the windowed sensor uses "writes" -- confirm before renaming
+
+    AttributeSensor<Double> THRIFT_PORT_LATENCY_IN_WINDOW = Sensors.newDoubleSensor("cassandra.thrift.latency.windowed", "Latency for thrift port (ms, averaged over time window)");
+    AttributeSensor<Double> READS_PER_SECOND_IN_WINDOW = Sensors.newDoubleSensor("cassandra.reads.perSec.windowed", "Reads/sec (over time window)");
+    AttributeSensor<Double> WRITES_PER_SECOND_IN_WINDOW = Sensors.newDoubleSensor("cassandra.writes.perSec.windowed", "Writes/sec (over time window)");
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    ConfigKey<Set<Entity>> INITIAL_SEEDS = (ConfigKey)ConfigKeys.newConfigKey(Set.class, "cassandra.cluster.seeds.initial", 
+            "List of cluster nodes to seed this node");
+
+    ConfigKey<Duration> START_TIMEOUT = ConfigKeys.newConfigKeyWithDefault(BrooklynConfigKeys.START_TIMEOUT, Duration.FIVE_MINUTES);
+    
+    ConfigKey<String> LISTEN_ADDRESS_SENSOR = ConfigKeys.newStringConfigKey("cassandra.listenAddressSensor", "sensor name from which to take the listen address; default (null) is a smart lookup");
+    ConfigKey<String> BROADCAST_ADDRESS_SENSOR = ConfigKeys.newStringConfigKey("cassandra.broadcastAddressSensor", "sensor name from which to take the broadcast address; default (null) is a smart lookup");
+    ConfigKey<String> RPC_ADDRESS_SENSOR = ConfigKeys.newStringConfigKey("cassandra.rpcAddressSensor", "sensor name from which to take the RPC address; default (null) is 0.0.0.0");
+
+    Effector<String> EXECUTE_SCRIPT = CassandraDatacenter.EXECUTE_SCRIPT;
+
+    /* Accessors used from template */
+    
+    String getMajorMinorVersion();
+    Integer getGossipPort();
+    Integer getSslGossipPort();
+    Integer getThriftPort();
+    Integer getNativeTransportPort();
+    String getClusterName();
+    String getListenAddress();
+    String getBroadcastAddress();
+    String getRpcAddress();
+    String getSeeds();
+    
+    String getPrivateIp();
+    String getPublicIp();
+    
+    /**
+     * In range 0 to (2^127)-1; or null if not yet set or known.
+     * Returns the first token if more than one token.
+     * @deprecated since 0.7; see {@link #getTokens()}
+     */
+    @Deprecated
+    BigInteger getToken();
+
+    int getNumTokensPerNode();
+
+    Set<BigInteger> getTokens();
+
+    /**
+     * string value of token (with no commas, which freemarker introduces!) or blank if none
+     * @deprecated since 0.7; use {@link #getTokensAsString()}
+     */
+    @Deprecated
+    String getTokenAsString();
+
+    /** string value of comma-separated tokens; or blank if none */
+    String getTokensAsString();
+
+    /* For configuration */
+    
+    void setToken(String token);
+    
+    /* Using Cassandra */
+    
+    String executeScript(String commands);
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java
new file mode 100644
index 0000000..eab6672
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeDriver.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.cassandra;
+
+import brooklyn.entity.java.JavaSoftwareProcessDriver;
+import brooklyn.util.task.system.ProcessTaskWrapper;
+
+public interface CassandraNodeDriver extends JavaSoftwareProcessDriver {
+    // Driver-side contract for a Cassandra node: port, cluster-name and config-template accessors, script execution and address resolution.
+    Integer getGossipPort();
+
+    Integer getSslGossipPort();
+
+    Integer getThriftPort();
+
+    Integer getNativeTransportPort();
+
+    String getClusterName();
+
+    String getCassandraConfigTemplateUrl();
+
+    String getCassandraConfigFileName();
+
+    boolean isClustered();
+    // NOTE(review): presumably the asynchronous counterpart of CassandraNode#executeScript(String) -- confirm
+    ProcessTaskWrapper<Integer> executeScriptAsync(String commands);
+
+    /** returns the address that the given hostname resolves to at the target */
+    String getResolvedAddress(String hostname);
+
+}