You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2015/08/06 18:32:26 UTC

[12/26] incubator-brooklyn git commit: [BROOKLYN-162] Renaming of the NoSQL packages

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerSshDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerSshDriver.java
new file mode 100644
index 0000000..535bab6
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBConfigServerSshDriver.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb.sharding;
+
+import org.apache.brooklyn.entity.nosql.mongodb.AbstractMongoDBSshDriver;
+import org.apache.brooklyn.entity.nosql.mongodb.MongoDBDriver;
+
+import brooklyn.location.basic.SshMachineLocation;
+
+/** SSH driver for a MongoDB config server; launches with {@code --configsvr} on top of the default arguments. */
+public class MongoDBConfigServerSshDriver extends AbstractMongoDBSshDriver implements MongoDBDriver {
+
+    public MongoDBConfigServerSshDriver(MongoDBConfigServerImpl entity, SshMachineLocation machine) {
+        super(entity, machine);
+    }
+
+    /** Returns the driven entity, narrowed to {@link MongoDBConfigServerImpl}. */
+    @Override
+    public MongoDBConfigServerImpl getEntity() {
+        return (MongoDBConfigServerImpl) super.getEntity();
+    }
+
+    /** Calls the inherited launch with the default arguments, {@code --configsvr}, and {@code --dbpath} set to the data directory. */
+    @Override
+    public void launch() {
+        launch(getArgsBuilderWithDefaults(getEntity()).add("--configsvr").add("--dbpath", getDataDirectory()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouter.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouter.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouter.java
new file mode 100644
index 0000000..195b10f
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb.sharding;
+
+import org.apache.brooklyn.catalog.Catalog;
+import org.apache.brooklyn.entity.nosql.mongodb.AbstractMongoDBServer;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.Sensors;
+import brooklyn.util.time.Duration;
+
+import com.google.common.reflect.TypeToken;
+
+/** Entity interface for a MongoDB router: declares its config-server list plus shard-count and running sensors. */
+@Catalog(name="MongoDB Router",
+        description="MongoDB (from \"humongous\") is a scalable, high-performance, open source NoSQL database",
+        iconUrl="classpath:///mongodb-logo.png")
+@ImplementedBy(MongoDBRouterImpl.class)
+public interface MongoDBRouter extends AbstractMongoDBServer {
+
+    @SuppressWarnings("serial")
+    ConfigKey<Iterable<String>> CONFIG_SERVERS = ConfigKeys.newConfigKey(
+            new TypeToken<Iterable<String>>(){}, "mongodb.router.config.servers", "List of host names and ports of the config servers");
+
+    AttributeSensor<Integer> SHARD_COUNT = Sensors.newIntegerSensor("mongodb.router.config.shard.count", "Number of shards that have been added");
+
+    AttributeSensor<Boolean> RUNNING = Sensors.newBooleanSensor("mongodb.router.running", "Indicates that the router is running, "
+            + "and can be used to add shards, but is not necessarily available for CRUD operations (e.g. if no shards have been added)");
+
+    /** Waits for the service to be up, for at most {@code duration}.
+     * @throws IllegalStateException if times out. */
+    void waitForServiceUp(Duration duration);
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterCluster.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterCluster.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterCluster.java
new file mode 100644
index 0000000..333a1bd
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterCluster.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb.sharding;
+
+import java.util.Collection;
+
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.Sensors;
+
+/** Cluster of {@link MongoDBRouter} members that publishes a representative router through sensors. */
+@ImplementedBy(MongoDBRouterClusterImpl.class)
+public interface MongoDBRouterCluster extends DynamicCluster {
+
+    /** A router that is available for CRUD operations, once one exists. */
+    AttributeSensor<MongoDBRouter> ANY_ROUTER = Sensors.newSensor(
+            MongoDBRouter.class,
+            "mongodb.routercluster.any",
+            "When set, can be used to access one of the routers in the cluster (usually the first). "
+            + "This will only be set once at least one shard has been added, and the router is available for CRUD operations");
+
+    /** A router that is running and can take shards, though not necessarily available for CRUD operations. */
+    AttributeSensor<MongoDBRouter> ANY_RUNNING_ROUTER = Sensors.newSensor(
+            MongoDBRouter.class,
+            "mongodb.routercluster.any.running",
+            "When set, can be used to access one of the running routers in the cluster (usually the first). "
+            + "This should only be used to add shards as it does not guarantee that the router is available for CRUD operations");
+
+    /** @return one of the routers in the cluster if available, {@code null} otherwise */
+    MongoDBRouter getAnyRouter();
+
+    /** @return one of the running routers; use it only to add shards, as it is not guaranteed to be available for CRUD operations */
+    MongoDBRouter getAnyRunningRouter();
+
+    /** @return all routers in the cluster */
+    Collection<MongoDBRouter> getRouters();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterClusterImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterClusterImpl.java
new file mode 100644
index 0000000..b905c10
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterClusterImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb.sharding;
+
+import java.util.Collection;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.EntityPredicates;
+import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
+import brooklyn.entity.group.DynamicClusterImpl;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.trait.Startable;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
+import brooklyn.location.Location;
+import brooklyn.policy.PolicySpec;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+public class MongoDBRouterClusterImpl extends DynamicClusterImpl implements MongoDBRouterCluster {
+
+    @Override
+    public void init() {
+        super.init();
+        subscribeToChildren(this, MongoDBRouter.RUNNING, new SensorEventListener<Boolean>() {
+            @Override public void onEvent(SensorEvent<Boolean> event) {
+                setAnyRouter();
+            }
+        });
+    }
+    
+    @Override
+    public void start(Collection<? extends Location> locations) {
+        super.start(locations);
+        addPolicy(PolicySpec.create(MemberTrackingPolicy.class)
+                .displayName("Router cluster membership tracker")
+                .configure("group", this));
+    }
+    
+    public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
+        @Override protected void onEntityEvent(EventType type, Entity member) {
+            ((MongoDBRouterClusterImpl)super.entity).setAnyRouter();
+        }
+        @Override protected void onEntityRemoved(Entity member) {
+            ((MongoDBRouterClusterImpl)super.entity).setAnyRouter();
+        }
+        @Override protected void onEntityChange(Entity member) {
+            ((MongoDBRouterClusterImpl)super.entity).setAnyRouter();
+        }
+    }
+    
+    protected void setAnyRouter() {
+        setAttribute(MongoDBRouterCluster.ANY_ROUTER, Iterables.tryFind(getRouters(), 
+                EntityPredicates.attributeEqualTo(Startable.SERVICE_UP, true)).orNull());
+
+        setAttribute(
+                MongoDBRouterCluster.ANY_RUNNING_ROUTER, 
+                Iterables.tryFind(getRouters(), EntityPredicates.attributeEqualTo(MongoDBRouter.RUNNING, true))
+                .orNull());
+    }
+    
+    @Override
+    public Collection<MongoDBRouter> getRouters() {
+        return ImmutableList.copyOf(Iterables.filter(getMembers(), MongoDBRouter.class));
+    }
+    
+    @Override
+    protected EntitySpec<?> getMemberSpec() {
+        if (super.getMemberSpec() != null)
+            return super.getMemberSpec();
+        return EntitySpec.create(MongoDBRouter.class);
+    }
+
+    @Override
+    public MongoDBRouter getAnyRouter() {
+        return getAttribute(MongoDBRouterCluster.ANY_ROUTER);
+    }
+    
+    @Override
+    public MongoDBRouter getAnyRunningRouter() {
+        return getAttribute(MongoDBRouterCluster.ANY_RUNNING_ROUTER);
+    }
+ 
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterDriver.java
new file mode 100644
index 0000000..3c7a30c
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterDriver.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb.sharding;
+
+import brooklyn.entity.basic.SoftwareProcessDriver;
+
+/** Driver type returned by {@code MongoDBRouterImpl.getDriverInterface()}; declares nothing beyond {@link SoftwareProcessDriver}. */
+public interface MongoDBRouterDriver extends SoftwareProcessDriver {
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterImpl.java
new file mode 100644
index 0000000..cbbc6b8
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterImpl.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb.sharding;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.brooklyn.entity.nosql.mongodb.MongoDBClientSupport;
+
+import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.event.feed.function.FunctionFeed;
+import brooklyn.event.feed.function.FunctionPollConfig;
+
+import com.google.common.base.Functions;
+
+/** Router entity whose function feed polls {@code RUNNING}, {@code SERVICE_UP} and {@code SHARD_COUNT} every five seconds. */
+public class MongoDBRouterImpl extends SoftwareProcessImpl implements MongoDBRouter {
+    private volatile FunctionFeed functionFeed;
+
+    @Override
+    public Class<?> getDriverInterface() {
+        return MongoDBRouterDriver.class;
+    }
+
+    /** Builds the feed; a poll that throws reports {@code false}, or {@code -1} in the case of the shard count. */
+    @Override
+    protected void connectSensors() {
+        super.connectSensors();
+        functionFeed = FunctionFeed.builder().entity(this)
+                .poll(new FunctionPollConfig<Boolean, Boolean>(RUNNING)
+                        .period(5, TimeUnit.SECONDS)
+                        .callable(new Callable<Boolean>() {
+                            @Override
+                            public Boolean call() throws Exception {
+                                return MongoDBClientSupport.forServer(MongoDBRouterImpl.this).ping();
+                            }
+                        })
+                        .onException(Functions.<Boolean>constant(false)))
+                .poll(new FunctionPollConfig<Boolean, Boolean>(SERVICE_UP)
+                        .period(5, TimeUnit.SECONDS)
+                        .callable(new Callable<Boolean>() {
+                            @Override
+                            public Boolean call() throws Exception {
+                                // TODO: This is the same as in AbstractMongoDBSshDriver.isRunning. This feels like the right
+                                // place. But feels like can be more consistent with different MongoDB types using the FunctionFeed.
+                                MongoDBClientSupport clientSupport = MongoDBClientSupport.forServer(MongoDBRouterImpl.this);
+                                // SHARD_COUNT may still be null; check it instead of relying on an unboxing NullPointerException.
+                                Integer shardCount = MongoDBRouterImpl.this.getAttribute(SHARD_COUNT);
+                                return clientSupport.ping() && shardCount != null && shardCount > 0;
+                            }
+                        })
+                        .onException(Functions.<Boolean>constant(false)))
+                .poll(new FunctionPollConfig<Integer, Integer>(SHARD_COUNT)
+                        .period(5, TimeUnit.SECONDS)
+                        .callable(new Callable<Integer>() {
+                            @Override
+                            public Integer call() throws Exception {
+                                return (int) MongoDBClientSupport.forServer(MongoDBRouterImpl.this).getShardCount();
+                            }
+                        })
+                        .onException(Functions.<Integer>constant(-1)))
+                .build();
+    }
+
+    @Override
+    protected void disconnectSensors() {
+        super.disconnectSensors();
+        if (functionFeed != null) functionFeed.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterSshDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterSshDriver.java
new file mode 100644
index 0000000..422b9ac
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBRouterSshDriver.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb.sharding;
+
+import org.apache.brooklyn.entity.nosql.mongodb.AbstractMongoDBSshDriver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.location.basic.SshMachineLocation;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+
+/** SSH driver that starts the {@code mongos} binary for a router, pointed at its configured config servers. */
+public class MongoDBRouterSshDriver extends AbstractMongoDBSshDriver implements MongoDBRouterDriver {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDBRouterSshDriver.class);
+
+    public MongoDBRouterSshDriver(MongoDBRouterImpl entity, SshMachineLocation machine) {
+        super(entity, machine);
+    }
+
+    /**
+     * Runs {@code <install dir>/bin/mongos} with the default arguments plus {@code --configdb <comma-separated CONFIG_SERVERS>},
+     * redirecting output to {@code out.log} / {@code err.log}; the script is set to fail on a non-zero result code.
+     */
+    @Override
+    public void launch() {
+        String configServers = Joiner.on(",").join(getEntity().getConfig(MongoDBRouter.CONFIG_SERVERS));
+        MongoDBRouterImpl router = (MongoDBRouterImpl) getEntity();
+        String arguments = Joiner.on(" ").join(getArgsBuilderWithDefaults(router).add("--configdb", configServers).build());
+        String command = String.format("%s/bin/mongos %s > out.log 2> err.log < /dev/null", getExpandedInstallDir(), arguments);
+        LOG.info(command);
+        newScript(LAUNCHING).updateTaskAndFailOnNonZeroResultCode().body.append(command).execute();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardCluster.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardCluster.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardCluster.java
new file mode 100644
index 0000000..edf7d7a
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardCluster.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb.sharding;
+
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.ImplementedBy;
+
+/** Cluster interface implemented by {@code MongoDBShardClusterImpl}; declares nothing beyond {@link DynamicCluster}. */
+@ImplementedBy(MongoDBShardClusterImpl.class)
+public interface MongoDBShardCluster extends DynamicCluster {
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardClusterImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardClusterImpl.java
new file mode 100644
index 0000000..7eb2571
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardClusterImpl.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb.sharding;
+
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.entity.nosql.mongodb.MongoDBClientSupport;
+import org.apache.brooklyn.entity.nosql.mongodb.MongoDBReplicaSet;
+import org.apache.brooklyn.entity.nosql.mongodb.MongoDBServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.group.DynamicClusterImpl;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.trait.Startable;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
+import brooklyn.location.Location;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+
+public class MongoDBShardClusterImpl extends DynamicClusterImpl implements MongoDBShardCluster {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDBShardClusterImpl.class);
+    
+    // TODO: Need to use attributes for this in order to support brooklyn restart 
+    private Set<Entity> addedMembers = Sets.newConcurrentHashSet();
+
+    // TODO: Need to use attributes for this in order to support brooklyn restart 
+    private Set<Entity> addingMembers = Sets.newConcurrentHashSet();
+
+    /**
+     * For shard addition and removal.
+     * Used for retrying.
+     * 
+     * TODO Should use ExecutionManager.
+     */
+    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+
+    @Override
+    protected EntitySpec<?> getMemberSpec() {
+        EntitySpec<?> result = super.getMemberSpec();
+        if (result == null)
+            result = EntitySpec.create(MongoDBReplicaSet.class);
+        result.configure(DynamicClusterImpl.INITIAL_SIZE, getConfig(MongoDBShardedDeployment.SHARD_REPLICASET_SIZE));
+        return result;
+    }
+
+    @Override
+    public void start(Collection<? extends Location> locations) {
+        subscribeToMembers(this, Startable.SERVICE_UP, new SensorEventListener<Boolean>() {
+            public void onEvent(SensorEvent<Boolean> event) {
+                addShards();
+            }
+        });
+
+        super.start(locations);
+        
+        MongoDBRouterCluster routers = getParent().getAttribute(MongoDBShardedDeployment.ROUTER_CLUSTER);
+        subscribe(routers, MongoDBRouterCluster.ANY_RUNNING_ROUTER, new SensorEventListener<MongoDBRouter>() {
+            public void onEvent(SensorEvent<MongoDBRouter> event) {
+                if (event.getValue() != null)
+                    addShards();
+            }
+        });
+    }
+
+    @Override
+    public void stop() {
+        // TODO Note that after this the executor will not run if the set is restarted.
+        executor.shutdownNow();
+        super.stop();
+    }
+    
+    @Override
+    public void onManagementStopped() {
+        super.onManagementStopped();
+        executor.shutdownNow();
+    }
+
+    protected void addShards() {
+        MongoDBRouter router = getParent().getAttribute(MongoDBShardedDeployment.ROUTER_CLUSTER).getAttribute(MongoDBRouterCluster.ANY_RUNNING_ROUTER);
+        if (router == null) {
+            if (LOG.isTraceEnabled()) LOG.trace("Not adding shards because no running router in {}", this);
+            return;
+        }
+        
+        for (Entity member : this.getMembers()) {
+            if (member.getAttribute(Startable.SERVICE_UP) && !addingMembers.contains(member)) {
+                LOG.info("{} adding shard {}", new Object[] {MongoDBShardClusterImpl.this, member});
+                addingMembers.add(member);
+                addShardAsync(member);
+            }
+        }
+    }
+    
+    protected void addShardAsync(final Entity replicaSet) {
+        final Duration timeout = Duration.minutes(20);
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        final AtomicInteger attempts = new AtomicInteger();
+        
+        // TODO Don't use executor, use ExecutionManager; but following pattern in MongoDBReplicaSetImpl for now.
+        executor.submit(new Runnable() {
+            @Override
+            public void run() {
+                boolean reschedule;
+                MongoDBRouter router = getParent().getAttribute(MongoDBShardedDeployment.ROUTER_CLUSTER).getAttribute(MongoDBRouterCluster.ANY_RUNNING_ROUTER);
+                if (router == null) {
+                    LOG.debug("Rescheduling adding shard {} because no running router for cluster {}", replicaSet, this);
+                    reschedule = true;
+                } else {
+                    MongoDBClientSupport client;
+                    try {
+                        client = MongoDBClientSupport.forServer(router);
+                    } catch (UnknownHostException e) {
+                        throw Exceptions.propagate(e);
+                    }
+                    
+                    MongoDBServer primary = replicaSet.getAttribute(MongoDBReplicaSet.PRIMARY_ENTITY);
+                    if (primary != null) {
+                        String addr = String.format("%s:%d", primary.getAttribute(MongoDBServer.SUBNET_HOSTNAME), primary.getAttribute(MongoDBServer.PORT));
+                        String replicaSetURL = ((MongoDBReplicaSet) replicaSet).getName() + "/" + addr;
+                        boolean added = client.addShardToRouter(replicaSetURL);
+                        if (added) {
+                            LOG.info("{} added shard {} via {}", new Object[] {MongoDBShardClusterImpl.this, replicaSetURL, router});
+                            addedMembers.add(replicaSet);
+                            reschedule = false;
+                        } else {
+                            LOG.debug("Rescheduling addition of shard {} because add failed via router {}", replicaSetURL, router);
+                            reschedule = true;
+                        }
+                    } else {
+                        LOG.debug("Rescheduling addition of shard {} because primary is null", replicaSet);
+                        reschedule = true;
+                    }
+                }
+                
+                if (reschedule) {
+                    int numAttempts = attempts.incrementAndGet();
+                    if (numAttempts > 1 && timeout.toMilliseconds() > stopwatch.elapsed(TimeUnit.MILLISECONDS)) {
+                        executor.schedule(this, 3, TimeUnit.SECONDS);
+                    } else {
+                        LOG.warn("Timeout after {} attempts ({}) adding shard {}; aborting", 
+                                new Object[] {numAttempts, Time.makeTimeStringRounded(stopwatch), replicaSet});
+                        addingMembers.remove(replicaSet);
+                    }
+                }
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardedDeployment.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardedDeployment.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardedDeployment.java
new file mode 100644
index 0000000..3383887
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardedDeployment.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb.sharding;
+
+import org.apache.brooklyn.catalog.Catalog;
+import org.apache.brooklyn.entity.nosql.mongodb.MongoDBReplicaSet;
+import org.apache.brooklyn.entity.nosql.mongodb.MongoDBServer;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Entity;
+import brooklyn.entity.Group;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.entity.trait.Startable;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.Sensors;
+import brooklyn.util.flags.SetFromFlag;
+import brooklyn.util.time.Duration;
+
+import com.google.common.reflect.TypeToken;
+
+/**
+ * Entity describing a sharded MongoDB deployment made of a config server cluster, a router
+ * (mongos) cluster and a cluster of shards (replica sets).
+ * <p>
+ * Sizes and member specs of the three clusters are supplied through the config keys below; the
+ * clusters themselves are exposed both as sensors and through the accessor methods.
+ */
+@Catalog(
+        name="MongoDB Sharded Deployment",
+        description="MongoDB (from \"humongous\") is a scalable, high-performance, open source NoSQL database",
+        iconUrl="classpath:///mongodb-logo.png")
+@ImplementedBy(MongoDBShardedDeploymentImpl.class)
+public interface MongoDBShardedDeployment extends Entity, Startable {
+
+    /** Number of config servers; defaults to 3. */
+    @SetFromFlag("configClusterSize")
+    ConfigKey<Integer> CONFIG_CLUSTER_SIZE = ConfigKeys.newIntegerConfigKey(
+            "mongodb.config.cluster.size",
+            "Number of config servers",
+            3);
+
+    /** Initial number of routers (mongos); defaults to 0. */
+    @SetFromFlag("initialRouterClusterSize")
+    ConfigKey<Integer> INITIAL_ROUTER_CLUSTER_SIZE = ConfigKeys.newIntegerConfigKey(
+            "mongodb.router.cluster.initial.size",
+            "Initial number of routers (mongos)",
+            0);
+
+    /** Initial number of shards (replica sets); defaults to 2. */
+    @SetFromFlag("initialShardClusterSize")
+    ConfigKey<Integer> INITIAL_SHARD_CLUSTER_SIZE = ConfigKeys.newIntegerConfigKey(
+            "mongodb.shard.cluster.initial.size",
+            "Initial number of shards (replicasets)",
+            2);
+
+    /** Number of mongod servers in each shard's replica set; defaults to 3. */
+    @SetFromFlag("shardReplicaSetSize")
+    ConfigKey<Integer> SHARD_REPLICASET_SIZE = ConfigKeys.newIntegerConfigKey(
+            "mongodb.shard.replicaset.size",
+            "Number of servers (mongod) in each shard (replicaset)",
+            3);
+
+    /** Maximum time to wait for the routers before adding the shards; defaults to five minutes. */
+    @SetFromFlag("routerUpTimeout")
+    ConfigKey<Duration> ROUTER_UP_TIMEOUT = ConfigKeys.newConfigKey(
+            Duration.class,
+            "mongodb.router.up.timeout",
+            "Maximum time to wait for the routers to become available before adding the shards",
+            Duration.FIVE_MINUTES);
+
+    /** Optional group watched for new CoLocatedMongoDBRouter entities; no default value. */
+    @SetFromFlag("coLocatedRouterGroup")
+    ConfigKey<Group> CO_LOCATED_ROUTER_GROUP = ConfigKeys.newConfigKey(
+            Group.class,
+            "mongodb.colocated.router.group",
+            "Group to be monitored for the addition of new CoLocatedMongoDBRouter entities");
+
+    /** Spec for router instances; defaults to a plain {@link MongoDBRouter} spec. */
+    @SuppressWarnings("serial")
+    ConfigKey<EntitySpec<?>> MONGODB_ROUTER_SPEC = ConfigKeys.newConfigKey(
+            new TypeToken<EntitySpec<?>>() {},
+            "mongodb.router.spec",
+            "Spec for Router instances",
+            EntitySpec.create(MongoDBRouter.class));
+
+    /**
+     * Spec for each shard's replica set; defaults to a {@link MongoDBReplicaSet} whose member
+     * spec is a plain {@link MongoDBServer}.
+     */
+    @SuppressWarnings("serial")
+    ConfigKey<EntitySpec<?>> MONGODB_REPLICA_SET_SPEC = ConfigKeys.newConfigKey(
+            new TypeToken<EntitySpec<?>>() {},
+            "mongodb.replicaset.spec",
+            "Spec for Replica Set",
+            EntitySpec.create(MongoDBReplicaSet.class)
+                    .configure(MongoDBReplicaSet.MEMBER_SPEC, EntitySpec.create(MongoDBServer.class)));
+
+    /** Spec for config server instances; defaults to a plain {@link MongoDBConfigServer} spec. */
+    @SuppressWarnings("serial")
+    ConfigKey<EntitySpec<?>> MONGODB_CONFIG_SERVER_SPEC = ConfigKeys.newConfigKey(
+            new TypeToken<EntitySpec<?>>() {},
+            "mongodb.configserver.spec",
+            "Spec for Config Server instances",
+            EntitySpec.create(MongoDBConfigServer.class));
+
+    // Interface fields are implicitly public static final, so no modifiers are spelled out.
+
+    /** Sensor holding the config server cluster. */
+    AttributeSensor<MongoDBConfigServerCluster> CONFIG_SERVER_CLUSTER = Sensors.newSensor(
+            MongoDBConfigServerCluster.class,
+            "mongodbshardeddeployment.configservers",
+            "Config servers");
+
+    /** Sensor holding the router cluster. */
+    AttributeSensor<MongoDBRouterCluster> ROUTER_CLUSTER = Sensors.newSensor(
+            MongoDBRouterCluster.class,
+            "mongodbshardeddeployment.routers",
+            "Routers");
+
+    /** Sensor holding the shard cluster. */
+    AttributeSensor<MongoDBShardCluster> SHARD_CLUSTER = Sensors.newSensor(
+            MongoDBShardCluster.class,
+            "mongodbshardeddeployment.shards",
+            "Shards");
+
+    /** @return the config server cluster of this deployment */
+    MongoDBConfigServerCluster getConfigCluster();
+
+    /** @return the router cluster of this deployment */
+    MongoDBRouterCluster getRouterCluster();
+
+    /** @return the shard cluster of this deployment */
+    MongoDBShardCluster getShardCluster();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardedDeploymentImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardedDeploymentImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardedDeploymentImpl.java
new file mode 100644
index 0000000..74f0623
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardedDeploymentImpl.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.mongodb.sharding;
+
+import static brooklyn.event.basic.DependentConfiguration.attributeWhenReady;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.enricher.Enrichers;
+import brooklyn.entity.Entity;
+import brooklyn.entity.Group;
+import brooklyn.entity.basic.AbstractEntity;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.basic.ServiceStateLogic;
+import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic;
+import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.trait.Startable;
+import brooklyn.location.Location;
+import brooklyn.policy.PolicySpec;
+import brooklyn.util.exceptions.Exceptions;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class MongoDBShardedDeploymentImpl extends AbstractEntity implements MongoDBShardedDeployment {
+    
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDBShardedDeploymentImpl.class);
+    
+    @Override
+    public void init() {
+        super.init();
+        
+        setAttribute(CONFIG_SERVER_CLUSTER, addChild(EntitySpec.create(MongoDBConfigServerCluster.class)
+                .configure(MongoDBConfigServerCluster.MEMBER_SPEC, getConfig(MONGODB_CONFIG_SERVER_SPEC))
+                .configure(DynamicCluster.INITIAL_SIZE, getConfig(CONFIG_CLUSTER_SIZE))));
+        setAttribute(ROUTER_CLUSTER, addChild(EntitySpec.create(MongoDBRouterCluster.class)
+                .configure(MongoDBRouterCluster.MEMBER_SPEC, getConfig(MONGODB_ROUTER_SPEC))
+                .configure(DynamicCluster.INITIAL_SIZE, getConfig(INITIAL_ROUTER_CLUSTER_SIZE))
+                .configure(MongoDBRouter.CONFIG_SERVERS, attributeWhenReady(getAttribute(CONFIG_SERVER_CLUSTER), MongoDBConfigServerCluster.CONFIG_SERVER_ADDRESSES))));
+        setAttribute(SHARD_CLUSTER, addChild(EntitySpec.create(MongoDBShardCluster.class)
+                .configure(MongoDBShardCluster.MEMBER_SPEC, getConfig(MONGODB_REPLICA_SET_SPEC))
+                .configure(DynamicCluster.INITIAL_SIZE, getConfig(INITIAL_SHARD_CLUSTER_SIZE))));
+        addEnricher(Enrichers.builder()
+                .propagating(MongoDBConfigServerCluster.CONFIG_SERVER_ADDRESSES)
+                .from(getAttribute(CONFIG_SERVER_CLUSTER))
+                .build());
+        
+        ServiceNotUpLogic.updateNotUpIndicator(this, Attributes.SERVICE_STATE_ACTUAL, "stopped");
+    }
+
+    @Override
+    public void start(Collection<? extends Location> locations) {
+        ServiceStateLogic.setExpectedState(this, Lifecycle.STARTING);
+        try {
+            final MongoDBRouterCluster routers = getAttribute(ROUTER_CLUSTER);
+            final MongoDBShardCluster shards = getAttribute(SHARD_CLUSTER);
+            List<DynamicCluster> clusters = ImmutableList.of(getAttribute(CONFIG_SERVER_CLUSTER), routers, shards);
+            Entities.invokeEffectorList(this, clusters, Startable.START, ImmutableMap.of("locations", locations))
+                .get();
+
+            if (getConfigRaw(MongoDBShardedDeployment.CO_LOCATED_ROUTER_GROUP, true).isPresent()) {
+                addPolicy(PolicySpec.create(ColocatedRouterTrackingPolicy.class)
+                        .displayName("Co-located router tracker")
+                        .configure("group", (Group)getConfig(MongoDBShardedDeployment.CO_LOCATED_ROUTER_GROUP)));
+            }
+            ServiceNotUpLogic.clearNotUpIndicator(this, Attributes.SERVICE_STATE_ACTUAL);
+            ServiceStateLogic.setExpectedState(this, Lifecycle.RUNNING);
+        } catch (Exception e) {
+            ServiceStateLogic.setExpectedState(this, Lifecycle.ON_FIRE);
+            // no need to log here; the effector invocation should do that
+            throw Exceptions.propagate(e);
+        }
+    }
+
+    public static class ColocatedRouterTrackingPolicy extends AbstractMembershipTrackingPolicy {
+        @Override
+        protected void onEntityAdded(Entity member) {
+            MongoDBRouterCluster cluster = entity.getAttribute(ROUTER_CLUSTER);
+            cluster.addMember(member.getAttribute(CoLocatedMongoDBRouter.ROUTER));
+        }
+        @Override
+        protected void onEntityRemoved(Entity member) {
+            MongoDBRouterCluster cluster = entity.getAttribute(ROUTER_CLUSTER);
+            cluster.removeMember(member.getAttribute(CoLocatedMongoDBRouter.ROUTER));
+        }
+    };
+
+    @Override
+    public void stop() {
+        ServiceStateLogic.setExpectedState(this, Lifecycle.STOPPING);
+        try {
+            Entities.invokeEffectorList(this, ImmutableList.of(getAttribute(CONFIG_SERVER_CLUSTER), getAttribute(ROUTER_CLUSTER), 
+                    getAttribute(SHARD_CLUSTER)), Startable.STOP).get();
+        } catch (Exception e) {
+            ServiceStateLogic.setExpectedState(this, Lifecycle.ON_FIRE);
+            throw Exceptions.propagate(e);
+        }
+        ServiceStateLogic.setExpectedState(this, Lifecycle.STOPPED);
+        ServiceNotUpLogic.updateNotUpIndicator(this, Attributes.SERVICE_STATE_ACTUAL, "stopped");
+    }
+    
+    @Override
+    public void restart() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public MongoDBConfigServerCluster getConfigCluster() {
+        return getAttribute(CONFIG_SERVER_CLUSTER);
+    }
+
+    @Override
+    public MongoDBRouterCluster getRouterCluster() {
+        return getAttribute(ROUTER_CLUSTER);
+    }
+
+    @Override
+    public MongoDBShardCluster getShardCluster() {
+        return getAttribute(SHARD_CLUSTER);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisCluster.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisCluster.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisCluster.java
new file mode 100644
index 0000000..26f4f1c
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisCluster.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.redis;
+
+import org.apache.brooklyn.catalog.Catalog;
+import brooklyn.entity.Entity;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.entity.trait.Startable;
+
+/**
+ * A cluster of {@link RedisStore}s with one master and a group of slaves.
+ *
+ * The slaves are contained in a {@link DynamicCluster} which can be resized by a policy if required.
+ *
+ * TODO add sensors with aggregated Redis statistics from cluster
+ */
+@Catalog(name="Redis Cluster", description="Redis is an open-source, networked, in-memory, key-value data store with optional durability", iconUrl="classpath:///redis-logo.png")
+@ImplementedBy(RedisClusterImpl.class)
+public interface RedisCluster extends Entity, Startable {
+    
+    public RedisStore getMaster();
+    
+    public DynamicCluster getSlaves();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisClusterImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisClusterImpl.java
new file mode 100644
index 0000000..39c9dbe
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisClusterImpl.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.redis;
+
+import java.util.Collection;
+
+import brooklyn.enricher.Enrichers;
+import brooklyn.entity.basic.AbstractEntity;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.basic.ServiceStateLogic;
+import brooklyn.entity.basic.ServiceStateLogic.ComputeServiceIndicatorsFromChildrenAndMembers;
+import brooklyn.entity.basic.ServiceStateLogic.ServiceProblemsLogic;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.Sensors;
+import brooklyn.location.Location;
+import brooklyn.util.collections.QuorumCheck.QuorumChecks;
+import brooklyn.util.exceptions.Exceptions;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Implementation of {@link RedisCluster}: a {@link RedisStore} master child plus a
+ * {@link DynamicCluster} child whose members are {@link RedisSlave}s configured with that master.
+ */
+public class RedisClusterImpl extends AbstractEntity implements RedisCluster {
+
+    // Constants: declared final so they can never be reassigned.
+    private static final AttributeSensor<RedisStore> MASTER = Sensors.newSensor(RedisStore.class, "redis.master");
+    private static final AttributeSensor<DynamicCluster> SLAVES = Sensors.newSensor(DynamicCluster.class, "redis.slaves");
+
+    public RedisClusterImpl() {
+    }
+
+    /** @return the master store recorded under the {@code redis.master} sensor */
+    @Override
+    public RedisStore getMaster() {
+        return getAttribute(MASTER);
+    }
+
+    /** @return the slave cluster recorded under the {@code redis.slaves} sensor */
+    @Override
+    public DynamicCluster getSlaves() {
+        return getAttribute(SLAVES);
+    }
+
+    /**
+     * Creates the master and the slave cluster as children, records them under their sensors,
+     * and propagates the master's hostname, address, subnet hostname, subnet address and Redis
+     * port sensors onto this entity.
+     */
+    @Override
+    public void init() {
+        super.init();
+
+        RedisStore master = addChild(EntitySpec.create(RedisStore.class));
+        setAttribute(MASTER, master);
+
+        DynamicCluster slaves = addChild(EntitySpec.create(DynamicCluster.class)
+                .configure(DynamicCluster.MEMBER_SPEC, EntitySpec.create(RedisSlave.class).configure(RedisSlave.MASTER, master)));
+        setAttribute(SLAVES, slaves);
+
+        addEnricher(Enrichers.builder()
+                .propagating(RedisStore.HOSTNAME, RedisStore.ADDRESS, RedisStore.SUBNET_HOSTNAME, RedisStore.SUBNET_ADDRESS, RedisStore.REDIS_PORT)
+                .from(master)
+                .build());
+    }
+
+    /**
+     * Adds a service-state enricher that looks at children only, requires all of them to be up,
+     * and is configured with an empty set of service states to ignore.
+     */
+    @Override
+    protected void initEnrichers() {
+        super.initEnrichers();
+        ServiceStateLogic.newEnricherFromChildrenUp().
+            checkChildrenOnly().
+            requireUpChildren(QuorumChecks.all()).
+            configure(ComputeServiceIndicatorsFromChildrenAndMembers.IGNORE_ENTITIES_WITH_THESE_SERVICE_STATES, ImmutableSet.<Lifecycle>of()).
+            addTo(this);
+    }
+
+    /**
+     * Starts the master and then the slave cluster in the given locations.
+     * <p>
+     * Any START problem indicator left by an earlier attempt is cleared first. On failure a new
+     * START problem indicator is recorded, the expected state becomes ON_FIRE and the exception
+     * is propagated.
+     *
+     * @param locations locations passed to the master's and the slave cluster's start effectors
+     */
+    @Override
+    public void start(Collection<? extends Location> locations) {
+        ServiceStateLogic.setExpectedState(this, Lifecycle.STARTING);
+        ServiceProblemsLogic.clearProblemsIndicator(this, START);
+        try {
+            doStart(locations);
+            ServiceStateLogic.setExpectedState(this, Lifecycle.RUNNING);
+        } catch (Exception e) {
+            ServiceProblemsLogic.updateProblemsIndicator(this, START, "Start failed with error: "+e);
+            ServiceStateLogic.setExpectedState(this, Lifecycle.ON_FIRE);
+            throw Exceptions.propagate(e);
+        }
+    }
+
+    /** Invokes START on the master first and on the slave cluster second, with the same parameters. */
+    private void doStart(Collection<? extends Location> locations) {
+        // Snapshot the locations once and share the (immutable) parameter map between both calls.
+        ImmutableMap<String, Object> startParameters =
+                ImmutableMap.<String, Object>of("locations", ImmutableList.copyOf(locations));
+
+        RedisStore master = getMaster();
+        master.invoke(RedisStore.START, startParameters).getUnchecked();
+
+        DynamicCluster slaves = getSlaves();
+        slaves.invoke(DynamicCluster.START, startParameters).getUnchecked();
+    }
+
+    /**
+     * Stops the slave cluster and then the master.
+     * <p>
+     * Mirrors {@link #start(Collection)}: any STOP problem indicator left by an earlier failed
+     * attempt is cleared first, so a successful retry does not leave a stale "Stop failed"
+     * problem on the entity. On failure a new STOP problem indicator is recorded, the expected
+     * state becomes ON_FIRE and the exception is propagated.
+     */
+    @Override
+    public void stop() {
+        ServiceStateLogic.setExpectedState(this, Lifecycle.STOPPING);
+        ServiceProblemsLogic.clearProblemsIndicator(this, STOP);
+        try {
+            doStop();
+            ServiceStateLogic.setExpectedState(this, Lifecycle.STOPPED);
+        } catch (Exception e) {
+            ServiceProblemsLogic.updateProblemsIndicator(this, STOP, "Stop failed with error: "+e);
+            ServiceStateLogic.setExpectedState(this, Lifecycle.ON_FIRE);
+            throw Exceptions.propagate(e);
+        }
+    }
+
+    /** Invokes STOP on the slave cluster first and on the master second. */
+    private void doStop() {
+        getSlaves().invoke(DynamicCluster.STOP, ImmutableMap.<String, Object>of()).getUnchecked();
+        getMaster().invoke(RedisStore.STOP, ImmutableMap.<String, Object>of()).getUnchecked();
+    }
+
+    /**
+     * Restart is not supported for a Redis cluster.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void restart() {
+        throw new UnsupportedOperationException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisShard.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisShard.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisShard.java
new file mode 100644
index 0000000..09d71b3
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisShard.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.redis;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.proxying.ImplementedBy;
+
+/**
+ * Entity type for a Redis shard. It declares no members of its own beyond what it inherits
+ * from {@link Entity}.
+ */
+@ImplementedBy(RedisShardImpl.class)
+public interface RedisShard extends Entity {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisShardImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisShardImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisShardImpl.java
new file mode 100644
index 0000000..87396f5
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisShardImpl.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.redis;
+
+import brooklyn.entity.basic.AbstractEntity;
+
+/** Implementation of {@link RedisShard}; adds no members of its own to {@link AbstractEntity}. */
+public class RedisShardImpl extends AbstractEntity implements RedisShard {
+
+    /** Public no-argument constructor. */
+    public RedisShardImpl() {
+        super();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisSlave.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisSlave.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisSlave.java
new file mode 100644
index 0000000..af91beb
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisSlave.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.redis;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.util.flags.SetFromFlag;
+
+/**
+ * A {@link RedisStore} configured as a slave.
+ */
+@ImplementedBy(RedisSlaveImpl.class)
+public interface RedisSlave extends RedisStore {
+
+    /** The master store this slave is attached to; no default value. */
+    @SetFromFlag("master")
+    ConfigKey<RedisStore> MASTER = new BasicConfigKey<RedisStore>(
+            RedisStore.class,
+            "redis.master",
+            "Redis master");
+
+    /**
+     * Freemarker template for redis.conf. Uses the same key name as
+     * {@link RedisStore#REDIS_CONFIG_TEMPLATE_URL} but defaults to the slave template.
+     */
+    @SetFromFlag("redisConfigTemplateUrl")
+    ConfigKey<String> REDIS_CONFIG_TEMPLATE_URL = new BasicConfigKey<String>(
+            String.class,
+            "redis.config.templateUrl",
+            "Template file (in freemarker format) for the redis.conf config file",
+            "classpath://org/apache/brooklyn/entity/nosql/redis/slave.conf");
+
+    /** @return the master store of this slave */
+    RedisStore getMaster();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisSlaveImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisSlaveImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisSlaveImpl.java
new file mode 100644
index 0000000..b58ce7d
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisSlaveImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.redis;
+
+
+/**
+ * A {@link RedisStore} configured as a slave.
+ */
+public class RedisSlaveImpl extends RedisStoreImpl implements RedisSlave {
+
+    /** Public no-argument constructor. */
+    public RedisSlaveImpl() {
+        super();
+    }
+
+    /** @return the store configured under the {@link RedisSlave#MASTER} config key */
+    @Override
+    public RedisStore getMaster() {
+        final RedisStore configuredMaster = getConfig(MASTER);
+        return configuredMaster;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStore.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStore.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStore.java
new file mode 100644
index 0000000..8d2cef1
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStore.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.redis;
+
+import org.apache.brooklyn.catalog.Catalog;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.event.basic.Sensors;
+import brooklyn.util.flags.SetFromFlag;
+
+/**
+ * An entity that represents a Redis key-value store service.
+ */
+@Catalog(name="Redis Server", description="Redis is an open-source, networked, in-memory, key-value data store with optional durability", iconUrl="classpath:///redis-logo.png")
+@ImplementedBy(RedisStoreImpl.class)
+public interface RedisStore extends SoftwareProcess {
+
+    @SetFromFlag("version")
+    ConfigKey<String> SUGGESTED_VERSION =
+            ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "2.8.4");
+
+    @SetFromFlag("downloadUrl")
+    BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
+            SoftwareProcess.DOWNLOAD_URL, "http://download.redis.io/releases/redis-${version}.tar.gz");
+
+    @SetFromFlag("redisPort")
+    PortAttributeSensorAndConfigKey REDIS_PORT = new PortAttributeSensorAndConfigKey("redis.port", "Redis port number", "6379+");
+
+    @SetFromFlag("redisConfigTemplateUrl")
+    ConfigKey<String> REDIS_CONFIG_TEMPLATE_URL = ConfigKeys.newConfigKey(
+            "redis.config.templateUrl", "Template file (in freemarker format) for the redis.conf config file", 
+            "classpath://org/apache/brooklyn/entity/nosql/redis/redis.conf");
+
+    AttributeSensor<Integer> UPTIME = Sensors.newIntegerSensor("redis.uptime", "Redis uptime in seconds");
+
+    // See http://redis.io/commands/info for details of all information available
+    AttributeSensor<Integer> TOTAL_CONNECTIONS_RECEIVED = Sensors.newIntegerSensor("redis.connections.received.total", "Total number of connections accepted by the server");
+    AttributeSensor<Integer> TOTAL_COMMANDS_PROCESSED = Sensors.newIntegerSensor("redis.commands.processed.total", "Total number of commands processed by the server");
+    AttributeSensor<Integer> EXPIRED_KEYS = Sensors.newIntegerSensor("redis.keys.expired", "Total number of key expiration events");
+    AttributeSensor<Integer> EVICTED_KEYS = Sensors.newIntegerSensor("redis.keys.evicted", "Number of evicted keys due to maxmemory limit");
+    AttributeSensor<Integer> KEYSPACE_HITS = Sensors.newIntegerSensor("redis.keyspace.hits", "Number of successful lookup of keys in the main dictionary");
+    AttributeSensor<Integer> KEYSPACE_MISSES = Sensors.newIntegerSensor("redis.keyspace.misses", "Number of failed lookup of keys in the main dictionary");
+
+    String getAddress();
+
+    Integer getRedisPort();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreDriver.java
new file mode 100644
index 0000000..ba77cfd
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreDriver.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.redis;
+
+import brooklyn.entity.basic.SoftwareProcessDriver;
+/** Driver contract for Redis store entities; RedisStoreImpl names this type as its driver interface. */
+public interface RedisStoreDriver extends SoftwareProcessDriver {
+    /** Returns the run directory; code in this package installs Redis into it and invokes {@code bin/redis-cli} beneath it. */
+    String getRunDir();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreImpl.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreImpl.java
new file mode 100644
index 0000000..f556bcf
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreImpl.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.redis;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.event.feed.ssh.SshFeed;
+import brooklyn.event.feed.ssh.SshPollConfig;
+import brooklyn.event.feed.ssh.SshPollValue;
+import brooklyn.event.feed.ssh.SshValueFunctions;
+import brooklyn.location.Location;
+import brooklyn.location.MachineLocation;
+import brooklyn.location.basic.SshMachineLocation;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicates;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+/**
+ * An entity that represents a Redis key-value store service.
+ */
+public class RedisStoreImpl extends SoftwareProcessImpl implements RedisStore {
+    @SuppressWarnings("unused")
+    private static final Logger LOG = LoggerFactory.getLogger(RedisStore.class);
+
+    /** Feed polling {@code redis-cli info} over SSH; created in connectSensors(), stopped in disconnectSensors(). */
+    private transient SshFeed sshFeed;
+
+    public RedisStoreImpl() {}
+
+    /** Connects the service-up check and a feed that polls {@code redis-cli info} every 5 seconds (-1 on failure or exception). */
+    @Override
+    protected void connectSensors() {
+        super.connectSensors();
+        connectServiceUpIsRunning();
+        // Find an SshMachineLocation to run the polling commands on
+        Optional<Location> location = Iterables.tryFind(getLocations(), Predicates.instanceOf(SshMachineLocation.class));
+        if (!location.isPresent()) throw new IllegalStateException("Could not find SshMachineLocation in list of locations");
+        SshMachineLocation machine = (SshMachineLocation) location.get();
+        String redisCli = getDriver().getRunDir() + "/bin/redis-cli -p " + getRedisPort();
+        String statsCommand = redisCli + " info stats";
+
+        sshFeed = SshFeed.builder()
+                .entity(this)
+                .machine(machine)
+                .period(5, TimeUnit.SECONDS)
+                .poll(new SshPollConfig<Integer>(UPTIME)
+                        .command(redisCli + " info server")
+                        .onFailureOrException(Functions.constant(-1))
+                        .onSuccess(infoFunction("uptime_in_seconds")))
+                .poll(new SshPollConfig<Integer>(TOTAL_CONNECTIONS_RECEIVED)
+                        .command(statsCommand)
+                        .onFailureOrException(Functions.constant(-1))
+                        .onSuccess(infoFunction("total_connections_received")))
+                .poll(new SshPollConfig<Integer>(TOTAL_COMMANDS_PROCESSED)
+                        .command(statsCommand)
+                        .onFailureOrException(Functions.constant(-1))
+                        .onSuccess(infoFunction("total_commands_processed")))
+                .poll(new SshPollConfig<Integer>(EXPIRED_KEYS)
+                        .command(statsCommand)
+                        .onFailureOrException(Functions.constant(-1))
+                        .onSuccess(infoFunction("expired_keys")))
+                .poll(new SshPollConfig<Integer>(EVICTED_KEYS)
+                        .command(statsCommand)
+                        .onFailureOrException(Functions.constant(-1))
+                        .onSuccess(infoFunction("evicted_keys")))
+                .poll(new SshPollConfig<Integer>(KEYSPACE_HITS)
+                        .command(statsCommand)
+                        .onFailureOrException(Functions.constant(-1))
+                        .onSuccess(infoFunction("keyspace_hits")))
+                .poll(new SshPollConfig<Integer>(KEYSPACE_MISSES)
+                        .command(statsCommand)
+                        .onFailureOrException(Functions.constant(-1))
+                        .onSuccess(infoFunction("keyspace_misses")))
+                .build();
+    }
+
+    /**
+     * Create a {@link Function} to retrieve a particular field value from a {@code redis-cli info}
+     * command. The function throws {@link IllegalStateException} if the output is absent or has no {@code field:} line.
+     *
+     * @param field the info field to retrieve and convert; matched literally against the start of each trimmed line
+     * @return a new function that converts a {@link SshPollValue} to an {@link Integer}
+     */
+    private static Function<SshPollValue, Integer> infoFunction(final String field) {
+        final String prefix = field + ":";
+        return Functions.compose(new Function<String, Integer>() {
+            @Override
+            public Integer apply(@Nullable String input) {
+                if (input == null) throw new IllegalStateException("No output to read field "+field+" from");
+                for (String raw : Splitter.on('\n').split(input)) {
+                    String line = raw.trim();
+                    if (line.startsWith(prefix)) return Integer.parseInt(line.substring(prefix.length()));
+                }
+                throw new IllegalStateException("Data for field "+field+" not found: "+input);
+            }
+        }, SshValueFunctions.stdout());
+    }
+
+    /** Disconnects the service-up check and stops the statistics feed, if one was created, before delegating to the superclass. */
+    @Override
+    public void disconnectSensors() {
+        disconnectServiceUpIsRunning();
+        if (sshFeed != null) sshFeed.stop();
+        super.disconnectSensors();
+    }
+
+    @Override
+    public Class<?> getDriverInterface() {
+        return RedisStoreDriver.class;
+    }
+
+    @Override
+    public RedisStoreDriver getDriver() {
+        return (RedisStoreDriver) super.getDriver();
+    }
+
+    /** Returns the host address of the entity's machine, or {@code null} when no machine is available. */
+    @Override
+    public String getAddress() {
+        MachineLocation machine = getMachineOrNull();
+        return (machine != null) ? machine.getAddress().getHostAddress() : null;
+    }
+
+    @Override
+    public Integer getRedisPort() {
+        return getAttribute(RedisStore.REDIS_PORT);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreSshDriver.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreSshDriver.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreSshDriver.java
new file mode 100644
index 0000000..c362e4e
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/redis/RedisStoreSshDriver.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.redis;
+
+import static java.lang.String.format;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.AbstractSoftwareProcessSshDriver;
+import brooklyn.entity.basic.Entities;
+import brooklyn.location.Location;
+import brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.os.Os;
+import brooklyn.util.ssh.BashCommands;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Start a {@link RedisStore} in a {@link Location} accessible over ssh.
+ */
+public class RedisStoreSshDriver extends AbstractSoftwareProcessSshDriver implements RedisStoreDriver {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RedisStoreSshDriver.class);
+
+    public RedisStoreSshDriver(RedisStoreImpl entity, SshMachineLocation machine) {
+        super(entity, machine);
+    }
+
+    @Override
+    public void preInstall() {
+        resolver = Entities.newDownloader(this);
+        setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("redis-%s", getVersion()))));
+    }
+
+    @Override
+    public void install() {
+        List<String> urls = resolver.getTargets();
+        String saveAs = resolver.getFilename();
+
+        MutableMap<String, String> installGccPackageFlags = MutableMap.of(
+                "onlyifmissing", "gcc",
+                "yum", "gcc",
+                "apt", "gcc",
+                "port", null);
+        MutableMap<String, String> installMakePackageFlags = MutableMap.of(
+                "onlyifmissing", "make",
+                "yum", "make",
+                "apt", "make",
+                "port", null);
+
+        List<String> commands = ImmutableList.<String>builder()
+                .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs))
+                .add(BashCommands.INSTALL_TAR)
+                .add(BashCommands.INSTALL_CURL)
+                .add(BashCommands.installPackage(installGccPackageFlags, "redis-prerequisites-gcc"))
+                .add(BashCommands.installPackage(installMakePackageFlags, "redis-prerequisites-make"))
+                .add("tar xzfv " + saveAs)
+                .add(format("cd redis-%s", getVersion()))
+                .add("pushd deps")
+                .add("make lua hiredis linenoise")
+                .add("popd")
+                .add("make clean && make")
+                .build();
+
+        newScript(INSTALLING)
+                .failOnNonZeroResultCode()
+                .body.append(commands).execute();
+    }
+
+    @Override
+    public void customize() {
+        newScript(MutableMap.of("usePidFile", false), CUSTOMIZING)
+                .failOnNonZeroResultCode()
+                .body.append(
+                        format("cd %s", getExpandedInstallDir()),
+                        "make install PREFIX="+getRunDir())
+                .execute();
+
+        copyTemplate(getEntity().getConfig(RedisStore.REDIS_CONFIG_TEMPLATE_URL), "redis.conf");
+    }
+
+    @Override
+    public void launch() {
+        // TODO Should we redirect stdout/stderr: format(" >> %s/console 2>&1 </dev/null &", getRunDir())
+        newScript(MutableMap.of("usePidFile", false), LAUNCHING)
+                .failOnNonZeroResultCode()
+                .body.append("./bin/redis-server redis.conf")
+                .execute();
+    }
+
+    @Override
+    public boolean isRunning() {
+        return newScript(MutableMap.of("usePidFile", false), CHECK_RUNNING)
+                .body.append("./bin/redis-cli -p " + getEntity().getAttribute(RedisStore.REDIS_PORT) + " ping > /dev/null")
+                .execute() == 0;
+    }
+
+    /**
+     * Restarts redis with the current configuration.
+     */
+    @Override
+    public void stop() {
+        int exitCode = newScript(MutableMap.of("usePidFile", false), STOPPING)
+                .body.append("./bin/redis-cli -p " + getEntity().getAttribute(RedisStore.REDIS_PORT) + " shutdown")
+                .execute();
+        // TODO: Good enough? Will cause warnings when trying to stop a server that is already not running.
+        if (exitCode != 0) {
+            LOG.warn("Unexpected exit code when stopping {}: {}", entity, exitCode);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d5cf5285/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakCluster.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakCluster.java b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakCluster.java
new file mode 100644
index 0000000..99df1a2
--- /dev/null
+++ b/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/riak/RiakCluster.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.nosql.riak;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.brooklyn.catalog.Catalog;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.Sensors;
+import brooklyn.util.flags.SetFromFlag;
+import brooklyn.util.time.Duration;
+
+import com.google.common.reflect.TypeToken;
+/** A {@link DynamicCluster} of Riak nodes; declares the cluster's configuration key and sensors. */
+@Catalog(name="Riak Cluster", description="Riak is a distributed NoSQL key-value data store that offers "
+        + "extremely high availability, fault tolerance, operational simplicity and scalability.")
+@ImplementedBy(RiakClusterImpl.class)
+public interface RiakCluster extends DynamicCluster {
+    // Maps each entity to its Riak node name, per the sensor description below.
+    @SuppressWarnings("serial")
+    AttributeSensor<Map<Entity, String>> RIAK_CLUSTER_NODES = Sensors.newSensor(
+            new TypeToken<Map<Entity, String>>() {}, 
+            "riak.cluster.nodes", "Names of all active Riak nodes in the cluster <Entity,Riak Name>");
+    // The default delay is Duration.seconds(2 * 60), i.e. two minutes.
+    @SetFromFlag("delayBeforeAdvertisingCluster")
+    ConfigKey<Duration> DELAY_BEFORE_ADVERTISING_CLUSTER = ConfigKeys.newConfigKey(Duration.class, "riak.cluster.delayBeforeAdvertisingCluster", "Delay after cluster is started before checking and advertising its availability", Duration.seconds(2 * 60));
+
+    AttributeSensor<Boolean> IS_CLUSTER_INIT = Sensors.newBooleanSensor("riak.cluster.isClusterInit", "Flag to determine if the cluster was already initialized");
+
+    AttributeSensor<Boolean> IS_FIRST_NODE_SET = Sensors.newBooleanSensor("riak.cluster.isFirstNodeSet", "Flag to determine if the first node has been set");
+
+    AttributeSensor<String> NODE_LIST = Sensors.newStringSensor("riak.cluster.nodeList", "List of nodes (including ports), comma separated");
+
+    AttributeSensor<String> NODE_LIST_PB_PORT = Sensors.newStringSensor("riak.cluster.nodeListPbPort", "List of nodes (including ports for riak db clients), comma separated");
+    // Alias: the console URI is the shared Attributes.MAIN_URI sensor, not a separate one.
+    AttributeSensor<URI> RIAK_CONSOLE_URI = Attributes.MAIN_URI;
+    // One-minute get/put/op figures, described below as averaged across the cluster.
+    AttributeSensor<Integer> NODE_GETS_1MIN_PER_NODE = Sensors.newIntegerSensor("riak.node.gets.1m.perNode", "Gets in the last minute, averaged across cluster");
+    AttributeSensor<Integer> NODE_PUTS_1MIN_PER_NODE = Sensors.newIntegerSensor("riak.node.puts.1m.perNode", "Puts in the last minute, averaged across cluster");
+    AttributeSensor<Integer> NODE_OPS_1MIN_PER_NODE = Sensors.newIntegerSensor("riak.node.ops.1m.perNode", "Sum of node gets and puts in the last minute, averaged across cluster");
+
+}