You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/18 13:01:01 UTC

[46/64] incubator-brooklyn git commit: brooklyn-software-messaging: add org.apache package prefix

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormSshDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormSshDriver.java
new file mode 100644
index 0000000..1f108a0
--- /dev/null
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormSshDriver.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.storm;
+
+import static java.lang.String.format;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.basic.EntityLocal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
+import org.apache.brooklyn.entity.zookeeper.ZooKeeperEnsemble;
+import brooklyn.event.basic.DependentConfiguration;
+import org.apache.brooklyn.location.basic.Machines;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.net.Networking;
+import brooklyn.util.os.Os;
+import brooklyn.util.ssh.BashCommands;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class StormSshDriver extends JavaSoftwareProcessSshDriver implements StormDriver {
+
+    private static final Logger log = LoggerFactory.getLogger(StormSshDriver.class);
+
+    public StormSshDriver(EntityLocal entity, SshMachineLocation machine) {
+        super(entity, machine);
+    }
+
+    public String getRoleName() {
+        return entity.getConfig(Storm.ROLE).name().toLowerCase();
+    }
+
+    public String getZeromqVersion() {
+        return entity.getConfig(Storm.ZEROMQ_VERSION);
+    }
+
+    public String getLocalDir() {
+        return Optional.fromNullable(entity.getConfig(Storm.LOCAL_DIR)).or(Os.mergePathsUnix(getRunDir(), "storm"));
+    }
+
+    public String getNimbusHostname() {
+        String result = entity.getConfig(Storm.NIMBUS_HOSTNAME);
+        if (result != null) return result;
+
+        Entity nimbus = entity.getConfig(Storm.NIMBUS_ENTITY);
+        if (nimbus == null) {
+            log.warn("No nimbus hostname available; using 'localhost'");
+            return "localhost";
+        }
+        return Entities.submit(entity, DependentConfiguration.attributeWhenReady(nimbus, Attributes.HOSTNAME)).getUnchecked();
+    }
+
+    public Integer getUiPort() {
+        return entity.getAttribute(Storm.UI_PORT);
+    }
+
+    public Map<String, Integer> getPortMap() {
+        return MutableMap.of("uiPort", getUiPort());
+    }
+
+    @Override
+    protected List<String> getCustomJavaConfigOptions() {
+        List<String> result = super.getCustomJavaConfigOptions();
+        if ("nimbus".equals(getRoleName()) || "supervisor".equals(getRoleName())) {
+            result.add("-verbose:gc");
+            result.add("-XX:+PrintGCTimeStamps");
+            result.add("-XX:+PrintGCDetails");
+        }
+
+        if ("ui".equals(getRoleName())) {
+            result.add("-Xmx768m");
+        }
+
+        return result;
+    }
+
+    public String getJvmOptsLine() {
+        return Optional.fromNullable(getShellEnvironment().get("JAVA_OPTS")).or("");
+    }
+    
+    public List<String> getZookeeperServers() {
+        ZooKeeperEnsemble zooKeeperEnsemble = entity.getConfig(Storm.ZOOKEEPER_ENSEMBLE);
+        Supplier<List<String>> supplier = Entities.attributeSupplierWhenReady(zooKeeperEnsemble, ZooKeeperEnsemble.ZOOKEEPER_SERVERS);
+        return supplier.get();
+    }
+
+    public String getStormConfigTemplateUrl() {
+        return entity.getConfig(Storm.STORM_CONFIG_TEMPLATE_URL);
+    }
+
+    @Override
+    public void preInstall() {
+        resolver = Entities.newDownloader(this);
+        setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("storm-%s", getVersion()))));
+    }
+
+    @Override
+    public void install() {
+        List<String> urls = resolver.getTargets();
+        String saveAs = resolver.getFilename();
+        
+        ImmutableList.Builder<String> commands= ImmutableList.<String> builder();
+        if (!getLocation().getOsDetails().isMac()) {
+            commands.add(BashCommands.installPackage(ImmutableMap.of(
+                        "yum", "libuuid-devel",
+                        "apt", "build-essential uuid-dev pkg-config libtool automake"), 
+                    "libuuid-devel"));
+            commands.add(BashCommands.ifExecutableElse0("yum", BashCommands.sudo("yum -y groupinstall 'Development Tools'")));
+        }
+        commands.add(BashCommands.installPackage(ImmutableMap.of("yum", "git"), "git"))
+                .add(BashCommands.INSTALL_UNZIP)
+                .addAll(installNativeDependencies())
+                .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs))
+                .add("unzip " + saveAs)
+                .add("mkdir -p " + getLocalDir())
+                .add("chmod 777 " + getLocalDir()); // FIXME
+        newScript(INSTALLING)
+                .body.append(commands.build())
+                .gatherOutput()
+                .execute();
+    }
+
+    public String getPidFile() {
+        return Os.mergePathsUnix(getRunDir(), format("%s.pid", getRoleName()));
+    }
+
+    @Override
+    protected String getLogFileLocation() {
+        return Os.mergePathsUnix(getRunDir(), "logs", format("%s.log", getRoleName()));
+    }
+
+    @Override
+    public void launch() {
+        boolean needsSleep = false;
+        if (getRoleName().equals("supervisor")) {
+            Entity nimbus = entity.getConfig(Storm.NIMBUS_ENTITY);
+            if (nimbus == null) {
+                log.warn("No nimbus entity available; not blocking before starting supervisors");
+            } else {
+                Entities.waitForServiceUp(nimbus, entity.getConfig(SoftwareProcess.START_TIMEOUT));
+                needsSleep = true;
+            }
+        }
+
+        String subnetHostname = Machines.findSubnetOrPublicHostname(entity).get();
+        log.info("Launching " + entity + " with role " + getRoleName() + " and " + "hostname (public) " 
+                + getEntity().getAttribute(Attributes.HOSTNAME) + ", " + "hostname (subnet) " + subnetHostname + ")");
+
+        // ensure only one node at a time tries to start
+        // attempting to eliminate the causes of:
+        // 2013-12-12 09:21:45 supervisor [ERROR] Error on initialization of server mk-supervisor
+        // org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /assignments
+        // TODO use SoftwareProcess#START_LATCH instead here?
+
+        Object startMutex = Optional.fromNullable(entity.getConfig(Storm.START_MUTEX)).or(new Object());
+        synchronized (startMutex) {
+            if (needsSleep) {
+                // give 10s extra to make sure nimbus is ready; we see weird zookeeper no /assignments node error otherwise
+                // (this could be optimized by recording nimbus service_up time)
+                Time.sleep(Duration.TEN_SECONDS);
+            }
+            newScript(MutableMap.of(USE_PID_FILE, getPidFile()), LAUNCHING)
+                    .body.append(format("nohup ./bin/storm %s > %s 2>&1 &", getRoleName(), getLogFileLocation()))
+                    .execute();
+        }
+    }
+
+    @Override
+    public boolean isRunning() {
+        return newScript(MutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0;
+    }
+
+    @Override
+    public void stop() {
+        newScript(MutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute();
+    }
+
+    @Override
+    public void customize() {
+        log.debug("Customizing {}", entity);
+        Networking.checkPortsValid(getPortMap());
+
+        newScript(CUSTOMIZING)
+                .body.append(format("cp -R %s/* .", getExpandedInstallDir()))
+                .execute();
+
+        String destinationConfigFile = Os.mergePathsUnix(getRunDir(), "conf/storm.yaml");
+        copyTemplate(getStormConfigTemplateUrl(), destinationConfigFile);
+    }
+
+    protected List<String> installNativeDependencies() {
+        String zeromqUrl = format("http://download.zeromq.org/zeromq-%s.tar.gz", getZeromqVersion());
+        String targz = format("zeromq-%s.tar.gz", getZeromqVersion());
+        String jzmq = "https://github.com/nathanmarz/jzmq.git";
+
+        ImmutableList.Builder<String> commands = ImmutableList.<String>builder();
+        if (getLocation().getOsDetails().isMac()) {
+            commands.add("export PATH=$PATH:/usr/local/bin")
+                   .add("export JAVA_HOME=$(/usr/libexec/java_home)")
+                   .add("cd " + getInstallDir())
+                   .add(BashCommands.installPackage(ImmutableMap.of("brew", "automake"), "make"))
+                   .add(BashCommands.installPackage(ImmutableMap.of("brew", "libtool"), "libtool"))
+                   .add(BashCommands.installPackage(ImmutableMap.of("brew", "pkg-config"), "pkg-config"))
+                   .add(BashCommands.installPackage(ImmutableMap.of("brew", "zeromq"), "zeromq"))
+                   .add("git clone https://github.com/asmaier/jzmq")
+                   .add("cd jzmq")
+                   .add("./autogen.sh")
+                   .add("./configure")
+                   .add("make")
+                   .add((BashCommands.sudo("make install")))
+                   .add("cd " + getInstallDir());
+        } else {
+            commands.add("export JAVA_HOME=$(dirname $(readlink -m `which java`))/../../ || export JAVA_HOME=/usr/lib/jvm/java")
+                   .add("cd " + getInstallDir())
+                   .add(BashCommands.commandToDownloadUrlAs(zeromqUrl, targz))
+                   .add("tar xzf " + targz)
+                   .add(format("cd zeromq-%s", getZeromqVersion()))
+                   .add("./configure")
+                   .add("make")
+                   .add((BashCommands.sudo("make install")))
+                   // install jzmq
+                   .add("cd " + getInstallDir())
+                   .add("git clone " + jzmq)
+                   .add("cd jzmq")
+                   .add("./autogen.sh")
+                   .add("./configure")
+                           
+                   // hack needed on ubuntu 12.04; ignore if it fails
+                   // see https://github.com/zeromq/jzmq/issues/114
+                   .add(BashCommands.ok(
+                       "pushd src ; touch classdist_noinst.stamp ; CLASSPATH=.:./.:$CLASSPATH "
+                       + "javac -d . org/zeromq/ZMQ.java org/zeromq/App.java org/zeromq/ZMQForwarder.java org/zeromq/EmbeddedLibraryTools.java org/zeromq/ZMQQueue.java org/zeromq/ZMQStreamer.java org/zeromq/ZMQException.java"))
+                   .add(BashCommands.ok("popd"))
+
+                   .add("make")
+                   .add((BashCommands.sudo("make install")))
+                   .add("cd " + getInstallDir());
+        }
+        return commands.build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java
new file mode 100644
index 0000000..6cb5ab0
--- /dev/null
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.zookeeper;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.management.ObjectName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.entity.java.JavaSoftwareProcessDriver;
+import brooklyn.event.feed.jmx.JmxAttributePollConfig;
+import brooklyn.event.feed.jmx.JmxFeed;
+import brooklyn.event.feed.jmx.JmxHelper;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Objects.ToStringHelper;
+
+/**
+ * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Apache ZooKeeper instance.
+ */
+public abstract class AbstractZooKeeperImpl extends SoftwareProcessImpl implements ZooKeeperNode {
+
+    @SuppressWarnings("unused")
+    private static final Logger log = LoggerFactory.getLogger(AbstractZooKeeperImpl.class);
+    private static final ObjectName ZOOKEEPER_MBEAN = JmxHelper.createObjectName("org.apache.ZooKeeperService:name0=StandaloneServer_port-1");
+
+    private volatile JmxFeed jmxFeed;
+
+    public AbstractZooKeeperImpl() {
+    }
+
+    @Override
+    public Integer getZookeeperPort() { return getAttribute(ZOOKEEPER_PORT); }
+
+    @Override
+    public String getHostname() { return getAttribute(HOSTNAME); }
+
+    @Override
+    public void waitForServiceUp(long duration, TimeUnit units) {
+        super.waitForServiceUp(duration, units);
+
+        if (((JavaSoftwareProcessDriver)getDriver()).isJmxEnabled()) {
+            // Wait for the MBean to exist
+            JmxHelper helper = new JmxHelper(this);
+            try {
+                helper.assertMBeanExistsEventually(ZOOKEEPER_MBEAN, units.toMillis(duration));
+            } finally {
+                helper.terminate();
+            }
+        }
+    }
+
+    @Override
+    protected void connectSensors() {
+        connectServiceUpIsRunning();
+
+        if (((JavaSoftwareProcessDriver)getDriver()).isJmxEnabled()) {
+            jmxFeed = JmxFeed.builder()
+                .entity(this)
+                .period(500, TimeUnit.MILLISECONDS)
+                .pollAttribute(new JmxAttributePollConfig<Long>(OUTSTANDING_REQUESTS)
+                        .objectName(ZOOKEEPER_MBEAN)
+                        .attributeName("OutstandingRequests")
+                        .onFailureOrException(Functions.constant(-1l)))
+                .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_RECEIVED)
+                        .objectName(ZOOKEEPER_MBEAN)
+                        .attributeName("PacketsReceived")
+                        .onFailureOrException(Functions.constant(-1l)))
+                .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_SENT)
+                        .objectName(ZOOKEEPER_MBEAN)
+                        .attributeName("PacketsSent")
+                        .onFailureOrException(Functions.constant(-1l)))
+                .build();
+        }
+    }
+
+    @Override
+    public void disconnectSensors() {
+        super.disconnectSensors();
+        disconnectServiceUpIsRunning();
+        if (jmxFeed != null) jmxFeed.stop();
+    }
+
+    @Override
+    protected ToStringHelper toStringHelper() {
+        return super.toStringHelper()
+                .add("zookeeperPort", getZookeeperPort());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperDriver.java
new file mode 100644
index 0000000..36388f0
--- /dev/null
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperDriver.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.zookeeper;
+
+import brooklyn.entity.java.JavaSoftwareProcessDriver;
+
+public interface ZooKeeperDriver extends JavaSoftwareProcessDriver {
+
+    Integer getZooKeeperPort();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java
new file mode 100644
index 0000000..d29bc91
--- /dev/null
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.zookeeper;
+
+import java.util.List;
+
+import org.apache.brooklyn.api.catalog.Catalog;
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+import org.apache.brooklyn.api.event.AttributeSensor;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.Sensors;
+
+import com.google.common.reflect.TypeToken;
+
+@Catalog(name="ZooKeeper ensemble", description="A cluster of ZooKeeper servers. "
+        + "Apache ZooKeeper enables highly reliable distributed coordination.")
+@ImplementedBy(ZooKeeperEnsembleImpl.class)
+public interface ZooKeeperEnsemble extends DynamicCluster {
+
+    @SetFromFlag("clusterName")
+    BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(String
+            .class, "zookeeper.cluster.name", "Name of the Zookeeper cluster", "BrooklynZookeeperCluster");
+
+    @SetFromFlag("initialSize")
+    public static final ConfigKey<Integer> INITIAL_SIZE = ConfigKeys.newConfigKeyWithDefault(DynamicCluster.INITIAL_SIZE, 3);
+
+    @SuppressWarnings("serial")
+    AttributeSensor<List<String>> ZOOKEEPER_SERVERS = Sensors.newSensor(new TypeToken<List<String>>() { },
+            "zookeeper.servers", "Hostnames to connect to cluster with");
+
+    String getClusterName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java
new file mode 100644
index 0000000..f9ce930
--- /dev/null
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.zookeeper;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
+import brooklyn.entity.group.DynamicClusterImpl;
+
+import com.google.common.collect.Lists;
+
+public class ZooKeeperEnsembleImpl extends DynamicClusterImpl implements ZooKeeperEnsemble {
+
+    private static final Logger log = LoggerFactory.getLogger(ZooKeeperEnsembleImpl.class);
+    private static final AtomicInteger myId = new AtomicInteger();
+    
+    private MemberTrackingPolicy policy;
+
+    public ZooKeeperEnsembleImpl() {}
+
+    /**
+     * Sets the default {@link #MEMBER_SPEC} to describe the ZooKeeper nodes.
+     */
+    @Override
+    protected EntitySpec<?> getMemberSpec() {
+        return getConfig(MEMBER_SPEC, EntitySpec.create(ZooKeeperNode.class));
+    }
+
+    @Override
+    public String getClusterName() {
+        return getAttribute(CLUSTER_NAME);
+    }
+
+    @Override
+    public void init() {
+        log.info("Initializing the ZooKeeper Ensemble");
+        super.init();
+
+        policy = addPolicy(PolicySpec.create(MemberTrackingPolicy.class)
+                .displayName("Members tracker")
+                .configure("group", this));
+    }
+
+    public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
+        @Override
+        protected void onEntityChange(Entity member) {
+        }
+
+        @Override
+        protected void onEntityAdded(Entity member) {
+            if (member.getAttribute(ZooKeeperNode.MY_ID) == null) {
+                ((EntityInternal) member).setAttribute(ZooKeeperNode.MY_ID, myId.incrementAndGet());
+            }
+        }
+
+        @Override
+        protected void onEntityRemoved(Entity member) {
+        }
+    };
+
+    @Override
+    protected void initEnrichers() {
+        super.initEnrichers();
+        
+    }
+    
+    @Override
+    public void start(Collection<? extends Location> locations) {
+        super.start(locations);
+        
+        List<String> zookeeperServers = Lists.newArrayList();
+        for (Entity zookeeper : getMembers()) {
+            zookeeperServers.add(zookeeper.getAttribute(Attributes.HOSTNAME));
+        }
+        setAttribute(ZOOKEEPER_SERVERS, zookeeperServers);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java
new file mode 100644
index 0000000..6a67394
--- /dev/null
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.zookeeper;
+
+import org.apache.brooklyn.api.catalog.Catalog;
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+import org.apache.brooklyn.api.event.AttributeSensor;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+
+/**
+ * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Apache ZooKeeper instance.
+ */
+@Catalog(name="ZooKeeper Node", description="Apache ZooKeeper is a server which enables "
+        + "highly reliable distributed coordination.")
+@ImplementedBy(ZooKeeperNodeImpl.class)
+public interface ZooKeeperNode extends SoftwareProcess {
+
+    @SetFromFlag("version")
+    ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "3.4.5");
+    @SetFromFlag("zookeeperPort")
+    PortAttributeSensorAndConfigKey ZOOKEEPER_PORT = new PortAttributeSensorAndConfigKey("zookeeper.port", "Zookeeper port", "2181+");
+    @SetFromFlag("zookeeperLeaderPort")
+    PortAttributeSensorAndConfigKey ZOOKEEPER_LEADER_PORT = new PortAttributeSensorAndConfigKey("zookeeper.leader.port", "Zookeeper leader ports", "2888+");
+    @SetFromFlag("zookeeperElectionPort")
+    PortAttributeSensorAndConfigKey ZOOKEEPER_ELECTION_PORT = new PortAttributeSensorAndConfigKey("zookeeper.election.port", "Zookeeper election ports", "3888+");
+    @SetFromFlag("downloadUrl")
+    BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
+            SoftwareProcess.DOWNLOAD_URL, "http://apache.fastbull.org/zookeeper/zookeeper-${version}/zookeeper-${version}.tar.gz");
+    /**
+     * Location of the ZK configuration file template to be copied to the server.
+     */
+    @SetFromFlag("zookeeperConfig")
+    ConfigKey<String> ZOOKEEPER_CONFIG_TEMPLATE = ConfigKeys.newStringConfigKey(
+            "zookeeper.configTemplate", "Zookeeper configuration template (in freemarker format)",
+            "classpath://org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg");
+    AttributeSensor<Long> OUTSTANDING_REQUESTS = new BasicAttributeSensor<Long>(Long.class, "zookeeper.outstandingRequests", "Outstanding request count");
+    AttributeSensor<Long> PACKETS_RECEIVED = new BasicAttributeSensor<Long>(Long.class, "zookeeper.packets.received", "Total packets received");
+    AttributeSensor<Long> PACKETS_SENT = new BasicAttributeSensor<Long>(Long.class, "zookeeper.packets.sent", "Total packets sent");
+    AttributeSensor<Integer> MY_ID = new BasicAttributeSensor<Integer>(Integer.class, "zookeeper.myid", "ZooKeeper node's myId");
+
+    Integer getZookeeperPort();
+
+    String getHostname();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java
new file mode 100644
index 0000000..275e101
--- /dev/null
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.zookeeper;
+
+/**
+ * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single standalone zookeeper instance.
+ */
+public class ZooKeeperNodeImpl extends AbstractZooKeeperImpl implements ZooKeeperNode {
+
+    public ZooKeeperNodeImpl() {}
+
+    @Override
+    public Class<?> getDriverInterface() {
+        return ZooKeeperDriver.class;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java
new file mode 100644
index 0000000..709e44c
--- /dev/null
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.zookeeper;
+
+import static java.lang.String.format;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.brooklyn.api.entity.Entity;
+
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.net.Networking;
+import brooklyn.util.os.Os;
+import brooklyn.util.ssh.BashCommands;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+public class ZooKeeperSshDriver extends JavaSoftwareProcessSshDriver implements ZooKeeperDriver {
+
+    public ZooKeeperSshDriver(ZooKeeperNodeImpl entity, SshMachineLocation machine) {
+        super(entity, machine);
+    }
+
+    @Override
+    protected String getLogFileLocation() { return Os.mergePathsUnix(getRunDir(), "console.out"); }
+
+    protected Map<String, Integer> getPortMap() {
+        return MutableMap.of("zookeeperPort", getZooKeeperPort());
+    }
+
+    protected String getConfigFileName() {
+        return entity.getConfig(ZooKeeperNode.ZOOKEEPER_CONFIG_TEMPLATE);
+    }
+
+    protected int getMyId() {
+        return entity.getAttribute(ZooKeeperNode.MY_ID);
+    }
+
+    // FIXME All for one, and one for all! If any node fails then we're stuck waiting for its hostname/port forever.
+    // Need a way to terminate the wait based on the entity going on-fire etc.
+    // FIXME Race in getMemebers. Should we change DynamicCluster.grow to create the members and only then call start on them all?
+    public List<ZooKeeperServerConfig> getZookeeperServers() throws ExecutionException, InterruptedException {
+        ZooKeeperEnsemble ensemble = (ZooKeeperEnsemble) entity.getParent();
+        List<ZooKeeperServerConfig> result = Lists.newArrayList();
+
+        for (Entity member : ensemble.getMembers()) {
+            Integer myid = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.MY_ID).get();
+            String hostname = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.HOSTNAME).get();
+            Integer port = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_PORT).get();
+            Integer leaderPort = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_LEADER_PORT).get();
+            Integer electionPort = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_ELECTION_PORT).get();
+            result.add(new ZooKeeperServerConfig(myid, hostname, port, leaderPort, electionPort));
+        }
+        return result;
+    }
+
+    @Override
+    public Integer getZooKeeperPort() {
+        return getEntity().getAttribute(ZooKeeperNode.ZOOKEEPER_PORT);
+    }
+
+    @Override
+    public boolean isRunning() {
+        return newScript(MutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0;
+    }
+
+    @Override
+    public void stop() {
+        newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute();     
+    }
+
+    @Override
+    public void preInstall() {
+        resolver = Entities.newDownloader(this);
+        setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("zookeeper-%s", getVersion()))));
+    }
+
+    @Override
+    public void install() {
+        List<String> urls = resolver.getTargets();
+        String saveAs = resolver.getFilename();
+
+        List<String> commands = ImmutableList.<String> builder()
+                .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs))
+                .add(BashCommands.INSTALL_TAR)
+                .add("tar xzfv " + saveAs)
+                .build();
+
+        newScript(INSTALLING)
+                .body.append(commands)
+                .execute();
+    }
+
+    @Override
+    public void customize() {
+        log.debug("Customizing {}", entity);
+        Networking.checkPortsValid(getPortMap());
+        newScript(CUSTOMIZING)
+                .body.append(
+                        format("cp -R %s/* .", getExpandedInstallDir()),
+                        format("mkdir %s/zookeeper", getRunDir()),
+                        format("echo %d > %s/zookeeper/myid", getMyId(), getRunDir())
+                    )
+                .execute();
+
+        String destinationConfigFile = Os.mergePathsUnix(getRunDir(), "conf/zoo.cfg");
+        copyTemplate(getConfigFileName(), destinationConfigFile);
+    }
+
+    public String getPidFile() { return Os.mergePathsUnix(getRunDir(), "zookeeper.pid"); }
+
+    @Override
+    public void launch() {
+        newScript(MutableMap.of(USE_PID_FILE, getPidFile()), LAUNCHING)
+                .body.append(format("nohup java $JAVA_OPTS -cp zookeeper-%s.jar:lib/*:conf org.apache.zookeeper.server.quorum.QuorumPeerMain conf/zoo.cfg > %s 2>&1 &", getVersion(), getLogFileLocation()))
+                .execute();
+    }
+
+    public static class ZooKeeperServerConfig {
+        private final Integer myid;
+        private final String hostname;
+        private final Integer port;
+        private final Integer leaderPort;
+        private final Integer electionPort;
+
+        public ZooKeeperServerConfig(Integer myid, String hostname, Integer port, Integer leaderPort, Integer electionPort) {
+            this.myid = myid;
+            this.hostname = hostname;
+            this.port = port;
+            this.leaderPort = leaderPort;
+            this.electionPort = electionPort;
+        }
+
+        public Integer getMyid() { return myid; }
+        public String getHostname() { return hostname; }
+        public Integer getPort() { return port; }
+        public Integer getLeaderPort() { return leaderPort; }
+        public Integer getElectionPort() { return electionPort; }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/activemq/activemq.xml
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/activemq/activemq.xml b/software/messaging/src/main/resources/brooklyn/entity/messaging/activemq/activemq.xml
deleted file mode 100644
index 52114d1..0000000
--- a/software/messaging/src/main/resources/brooklyn/entity/messaging/activemq/activemq.xml
+++ /dev/null
@@ -1,154 +0,0 @@
-[#ftl]
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- Based on standard file from ActiveMQ Version 5.7.0 -->
-<!-- START SNIPPET: example -->
-<beans
-  xmlns="http://www.springframework.org/schema/beans"
-  xmlns:amq="http://activemq.apache.org/schema/core"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
-  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
-    <!-- Allows us to use system properties as variables in this configuration file -->
-    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
-        <property name="locations">
-            <value>file:[#noparse]${activemq.conf}[/#noparse]/credentials.properties</value>
-        </property>
-    </bean>
-
-    <!--
-        The <broker> element is used to configure the ActiveMQ broker.
-    -->
-    <broker useJmx="true" xmlns="http://activemq.apache.org/schema/core" brokerName="${entity.brokerName}" dataDirectory="[#noparse]${activemq.data}[/#noparse]">
-
-        <!--
-            For better performances use VM cursor and small memory limit.
-            For more information, see:
-
-            http://activemq.apache.org/message-cursors.html
-
-            Also, if your producer is "hanging", it's probably due to producer flow control.
-            For more information, see:
-            http://activemq.apache.org/producer-flow-control.html
-        -->
-
-        <destinationPolicy>
-            <policyMap>
-              <policyEntries>
-                <policyEntry topic=">" producerFlowControl="true">
-                    <!-- The constantPendingMessageLimitStrategy is used to prevent
-                         slow topic consumers to block producers and affect other consumers
-                         by limiting the number of messages that are retained
-                         For more information, see:
-
-                         http://activemq.apache.org/slow-consumer-handling.html
-
-                    -->
-                  <pendingMessageLimitStrategy>
-                    <constantPendingMessageLimitStrategy limit="1000"/>
-                  </pendingMessageLimitStrategy>
-                </policyEntry>
-                <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
-                  <!-- Use VM cursor for better latency
-                       For more information, see:
-
-                       http://activemq.apache.org/message-cursors.html
-
-                  <pendingQueuePolicy>
-                    <vmQueueCursor/>
-                  </pendingQueuePolicy>
-                  -->
-                </policyEntry>
-              </policyEntries>
-            </policyMap>
-        </destinationPolicy>
-
-
-        <!--
-            The managementContext is used to configure how ActiveMQ is exposed in
-            JMX. By default, ActiveMQ uses the MBean server that is started by
-            the JVM. For more information, see:
-
-            http://activemq.apache.org/jmx.html
-        -->
-        <managementContext>
-            [#if entity.jmxPort > 0]
-            <managementContext connectorPort="${entity.jmxPort?c}"/>
-            [#else]
-            <managementContext createConnector="false"/>
-            [/#if]
-        </managementContext>
-
-        <!--
-            Configure message persistence for the broker. The default persistence
-            mechanism is the KahaDB store (identified by the kahaDB tag).
-            For more information, see:
-
-            http://activemq.apache.org/persistence.html
-        -->
-        <persistenceAdapter>
-            <kahaDB directory="[#noparse]${activemq.data}[/#noparse]/kahadb"/>
-        </persistenceAdapter>
-
-
-          <!--
-            The systemUsage controls the maximum amount of space the broker will
-            use before slowing down producers. For more information, see:
-            http://activemq.apache.org/producer-flow-control.html
-            If using ActiveMQ embedded - the following limits could safely be used:
-
-        <systemUsage>
-            <systemUsage>
-                <memoryUsage>
-                    <memoryUsage limit="20 mb"/>
-                </memoryUsage>
-                <storeUsage>
-                    <storeUsage limit="1 gb"/>
-                </storeUsage>
-                <tempUsage>
-                    <tempUsage limit="100 mb"/>
-                </tempUsage>
-            </systemUsage>
-        </systemUsage>
-        -->
-          <systemUsage>
-            <systemUsage>
-                <memoryUsage>
-                    <memoryUsage limit="64 mb"/>
-                </memoryUsage>
-                <storeUsage>
-                    <storeUsage limit="100 gb"/>
-                </storeUsage>
-                <tempUsage>
-                    <tempUsage limit="50 gb"/>
-                </tempUsage>
-            </systemUsage>
-        </systemUsage>
-
-        <!--
-            The transport connectors expose ActiveMQ over a given protocol to
-            clients and other brokers. For more information, see:
-
-            http://activemq.apache.org/configuring-transports.html
-        -->
-        <transportConnectors>
-            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
-            <transportConnector name="openwire" uri="tcp://0.0.0.0:${entity.openWirePort?c}?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
-        </transportConnectors>
-
-        <!-- destroy the spring context on shutdown to stop jetty -->
-        <shutdownHooks>
-            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
-        </shutdownHooks>
-
-    </broker>
-
-    <!--
-        Enable web consoles, REST and Ajax APIs and demos
-
-        Take a look at [#noparse]${ACTIVEMQ_HOME}[/#noparse]/conf/jetty.xml for more details
-    -->
-    <import resource="jetty.xml"/>
-
-</beans>
-<!-- END SNIPPET: example -->

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg
deleted file mode 100644
index d600ef5..0000000
Binary files a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
deleted file mode 100644
index feb871f..0000000
--- a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
+++ /dev/null
@@ -1,112 +0,0 @@
-[#ftl]
-#
-##
-# KafkaBroker configuration template for Brooklyn
-#
-# see kafka.server.KafkaConfig for additional details and defaults
-##
-
-############################# Server Basics #############################
-# The id of the broker. This must be set to a unique integer for each broker.
-broker.id=${entity.brokerId?c}
-
-############################# Socket Server Settings #############################
-
-# The port the socket server listens on
-port=${entity.kafkaPort?c}
-
-# Hostname the broker will bind to. If not set, the server will bind to all interfaces
-host.name=${driver.hostname}
-
-# Hostname the broker will advertise to producers and consumers. If not set, it uses the
-# value for "host.name" if configured.  Otherwise, it will use the value returned from
-# java.net.InetAddress.getCanonicalHostName().
-#advertised.host.name=<hostname routable by clients>
-
-# The port to publish to ZooKeeper for clients to use. If this is not set,
-# it will publish the same port that the broker binds to.
-#advertised.port=<port accessible by clients>
-
-# The number of threads handling network requests
-num.network.threads=3
- 
-# The number of threads doing disk I/O
-num.io.threads=8
-
-# The send buffer (SO_SNDBUF) used by the socket server
-socket.send.buffer.bytes=102400
-
-# The receive buffer (SO_RCVBUF) used by the socket server
-socket.receive.buffer.bytes=102400
-
-# The maximum size of a request that the socket server will accept (protection against OOM)
-max.socket.request.bytes=104857600
-
-
-############################# Log Basics #############################
-
-# The directory under which to store log files
-log.dir=${driver.runDir}/kafka-logs
-
-# The default number of log partitions per topic. More partitions allow greater
-# parallelism for consumption, but this will also result in more files across
-# the brokers.
-num.partitions=1
-
-# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
-# This value is recommended to be increased for installations with data dirs located in RAID array.
-num.recovery.threads.per.data.dir=1
-
-############################# Log Flush Policy #############################
-
-# Messages are immediately written to the filesystem but by default we only fsync() to sync
-# the OS cache lazily. The following configurations control the flush of data to disk. 
-# There are a few important trade-offs here:
-#    1. Durability: Unflushed data may be lost if you are not using replication.
-#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
-#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. 
-# The settings below allow one to configure the flush policy to flush data after a period of time or
-# every N messages (or both). This can be done globally and overridden on a per-topic basis.
-
-# The number of messages to accept before forcing a flush of data to disk
-log.flush.interval.messages=10000
-
-# The maximum amount of time a message can sit in a log before we force a flush
-log.flush.interval.ms=1000
-
-############################# Log Retention Policy #############################
-
-# The following configurations control the disposal of log segments. The policy can
-# be set to delete segments after a period of time, or after a given size has accumulated.
-# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
-# from the end of the log.
-
-# The minimum age of a log file to be eligible for deletion
-log.retention.hours=168
-
-# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
-# segments don't drop below log.retention.bytes.
-#log.retention.bytes=1073741824
-
-# The maximum size of a log segment file. When this size is reached a new log segment will be created.
-log.segment.bytes=1073741824
-
-# The interval at which log segments are checked to see if they can be deleted according 
-# to the retention policies
-log.retention.check.interval.ms=300000
-
-# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
-# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
-log.cleaner.enable=false
-
-############################# Zookeeper #############################
-
-# Zookeeper connection string (see zookeeper docs for details).
-# This is a comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
-# You can also append an optional chroot string to the urls to specify the
-# root directory for all kafka znodes.
-zookeeper.connect=${entity.zookeeper.hostname}:${entity.zookeeper.zookeeperPort?c}
-
-# Timeout in ms for connecting to zookeeper
-zookeeper.connection.timeout.ms=1000000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/zookeeper.properties
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/zookeeper.properties b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/zookeeper.properties
deleted file mode 100644
index 646d2f1..0000000
--- a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/zookeeper.properties
+++ /dev/null
@@ -1,13 +0,0 @@
-[#ftl]
-#
-
-##
-# KafkaZookeeper configuration template for Brooklyn
-##
-
-# the directory where the snapshot is stored.
-dataDir=${driver.runDir}/zookeeper
-# the port at which the clients will connect
-clientPort=${entity.zookeeperPort?c}
-# disable the per-ip limit on the number of connections since this is a non-production config
-maxClientCnxns=0

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/rabbit/rabbitmq.config
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/rabbit/rabbitmq.config b/software/messaging/src/main/resources/brooklyn/entity/messaging/rabbit/rabbitmq.config
deleted file mode 100644
index b4428f0..0000000
--- a/software/messaging/src/main/resources/brooklyn/entity/messaging/rabbit/rabbitmq.config
+++ /dev/null
@@ -1,5 +0,0 @@
-[
-<#if entity.enableManagementPlugin>
-    {rabbitmq_mochiweb, [{listeners, [{mgmt, [{port, ${entity.managementPort?c}}]}]}]}
-</#if>
-].
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/storm/storm.yaml
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/storm/storm.yaml b/software/messaging/src/main/resources/brooklyn/entity/messaging/storm/storm.yaml
deleted file mode 100644
index 99f0c71..0000000
--- a/software/messaging/src/main/resources/brooklyn/entity/messaging/storm/storm.yaml
+++ /dev/null
@@ -1,39 +0,0 @@
-[#ftl]
-#
-# Storm Configuration
-[#if driver.zookeeperServers?has_content]
- storm.zookeeper.servers:
-[#list driver.zookeeperServers as zkServer]
-   - "${zkServer}"
-[/#list]
-[/#if]
-
- storm.local.dir: "${driver.localDir}"
-
-### ui.* configs are for the master
- ui.port: ${driver.uiPort?c}
- ui.childopts: "-Xmx768m"
-
-[#if driver.roleName == "ui"]
- nimbus.host: "${driver.nimbusHostname}"
-[/#if]
-
- nimbus.childopts: " ${driver.jvmOptsLine}"
- worker.childopts: " ${driver.jvmOptsLine}"
- supervisor.childopts: " ${driver.jvmOptsLine}" 
-  
-# ##### These may optionally be filled in:
-#    
-## List of custom serializations
-# topology.kryo.register:
-#     - org.mycompany.MyType
-#     - org.mycompany.MyType2: org.mycompany.MyType2Serializer
-#
-## List of custom kryo decorators
-# topology.kryo.decorators:
-#     - org.mycompany.MyDecorator
-#
-## Locations of the drpc servers
-# drpc.servers:
-#     - "server1"
-#     - "server2"

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/zookeeper/zoo.cfg
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/zookeeper/zoo.cfg b/software/messaging/src/main/resources/brooklyn/entity/messaging/zookeeper/zoo.cfg
deleted file mode 100644
index 79721a6..0000000
--- a/software/messaging/src/main/resources/brooklyn/entity/messaging/zookeeper/zoo.cfg
+++ /dev/null
@@ -1,42 +0,0 @@
-[#ftl]
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#  http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-##
-# ZooKeeper configuration template for Brooklyn
-##
-
-# The number of milliseconds of each tick
-tickTime=2000
-# The number of ticks that the initial
-# synchronization phase can take
-initLimit=10
-# The number of ticks that can pass between
-# sending a request and getting an acknowledgement
-syncLimit=5
-# the directory where the snapshot is stored.
-dataDir=${driver.runDir}/zookeeper
-# the port at which the clients will connect
-clientPort=${entity.zookeeperPort?c}
-# disable the per-ip limit on the number of connections since this is a non-production config
-maxClientCnxns=0
-
-[#list driver.zookeeperServers as zkServer]
-server.${zkServer.myid?c}=${zkServer.hostname}:${zkServer.leaderPort?c}:${zkServer.electionPort?c}
-[/#list]

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/activemq/activemq.xml
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/activemq/activemq.xml b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/activemq/activemq.xml
new file mode 100644
index 0000000..52114d1
--- /dev/null
+++ b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/activemq/activemq.xml
@@ -0,0 +1,154 @@
+[#ftl]
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Based on standard file from ActiveMQ Version 5.7.0 -->
+<!-- START SNIPPET: example -->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+    <!-- Allows us to use system properties as variables in this configuration file -->
+    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+        <property name="locations">
+            <value>file:[#noparse]${activemq.conf}[/#noparse]/credentials.properties</value>
+        </property>
+    </bean>
+
+    <!--
+        The <broker> element is used to configure the ActiveMQ broker.
+    -->
+    <broker useJmx="true" xmlns="http://activemq.apache.org/schema/core" brokerName="${entity.brokerName}" dataDirectory="[#noparse]${activemq.data}[/#noparse]">
+
+        <!--
+            For better performances use VM cursor and small memory limit.
+            For more information, see:
+
+            http://activemq.apache.org/message-cursors.html
+
+            Also, if your producer is "hanging", it's probably due to producer flow control.
+            For more information, see:
+            http://activemq.apache.org/producer-flow-control.html
+        -->
+
+        <destinationPolicy>
+            <policyMap>
+              <policyEntries>
+                <policyEntry topic=">" producerFlowControl="true">
+                    <!-- The constantPendingMessageLimitStrategy is used to prevent
+                         slow topic consumers to block producers and affect other consumers
+                         by limiting the number of messages that are retained
+                         For more information, see:
+
+                         http://activemq.apache.org/slow-consumer-handling.html
+
+                    -->
+                  <pendingMessageLimitStrategy>
+                    <constantPendingMessageLimitStrategy limit="1000"/>
+                  </pendingMessageLimitStrategy>
+                </policyEntry>
+                <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
+                  <!-- Use VM cursor for better latency
+                       For more information, see:
+
+                       http://activemq.apache.org/message-cursors.html
+
+                  <pendingQueuePolicy>
+                    <vmQueueCursor/>
+                  </pendingQueuePolicy>
+                  -->
+                </policyEntry>
+              </policyEntries>
+            </policyMap>
+        </destinationPolicy>
+
+
+        <!--
+            The managementContext is used to configure how ActiveMQ is exposed in
+            JMX. By default, ActiveMQ uses the MBean server that is started by
+            the JVM. For more information, see:
+
+            http://activemq.apache.org/jmx.html
+        -->
+        <managementContext>
+            [#if entity.jmxPort > 0]
+            <managementContext connectorPort="${entity.jmxPort?c}"/>
+            [#else]
+            <managementContext createConnector="false"/>
+            [/#if]
+        </managementContext>
+
+        <!--
+            Configure message persistence for the broker. The default persistence
+            mechanism is the KahaDB store (identified by the kahaDB tag).
+            For more information, see:
+
+            http://activemq.apache.org/persistence.html
+        -->
+        <persistenceAdapter>
+            <kahaDB directory="[#noparse]${activemq.data}[/#noparse]/kahadb"/>
+        </persistenceAdapter>
+
+
+          <!--
+            The systemUsage controls the maximum amount of space the broker will
+            use before slowing down producers. For more information, see:
+            http://activemq.apache.org/producer-flow-control.html
+            If using ActiveMQ embedded - the following limits could safely be used:
+
+        <systemUsage>
+            <systemUsage>
+                <memoryUsage>
+                    <memoryUsage limit="20 mb"/>
+                </memoryUsage>
+                <storeUsage>
+                    <storeUsage limit="1 gb"/>
+                </storeUsage>
+                <tempUsage>
+                    <tempUsage limit="100 mb"/>
+                </tempUsage>
+            </systemUsage>
+        </systemUsage>
+        -->
+          <systemUsage>
+            <systemUsage>
+                <memoryUsage>
+                    <memoryUsage limit="64 mb"/>
+                </memoryUsage>
+                <storeUsage>
+                    <storeUsage limit="100 gb"/>
+                </storeUsage>
+                <tempUsage>
+                    <tempUsage limit="50 gb"/>
+                </tempUsage>
+            </systemUsage>
+        </systemUsage>
+
+        <!--
+            The transport connectors expose ActiveMQ over a given protocol to
+            clients and other brokers. For more information, see:
+
+            http://activemq.apache.org/configuring-transports.html
+        -->
+        <transportConnectors>
+            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
+            <transportConnector name="openwire" uri="tcp://0.0.0.0:${entity.openWirePort?c}?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
+        </transportConnectors>
+
+        <!-- destroy the spring context on shutdown to stop jetty -->
+        <shutdownHooks>
+            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
+        </shutdownHooks>
+
+    </broker>
+
+    <!--
+        Enable web consoles, REST and Ajax APIs and demos
+
+        Take a look at [#noparse]${ACTIVEMQ_HOME}[/#noparse]/conf/jetty.xml for more details
+    -->
+    <import resource="jetty.xml"/>
+
+</beans>
+<!-- END SNIPPET: example -->

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg
new file mode 100644
index 0000000..d600ef5
Binary files /dev/null and b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg differ

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/server.properties
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/server.properties b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/server.properties
new file mode 100644
index 0000000..feb871f
--- /dev/null
+++ b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/server.properties
@@ -0,0 +1,112 @@
+[#ftl]
+#
+##
+# KafkaBroker configuration template for Brooklyn
+#
+# see kafka.server.KafkaConfig for additional details and defaults
+##
+
+############################# Server Basics #############################
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=${entity.brokerId?c}
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=${entity.kafkaPort?c}
+
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+host.name=${driver.hostname}
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured.  Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
+# The number of threads handling network requests
+num.network.threads=3
+ 
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+max.socket.request.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=${driver.runDir}/kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk. 
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. 
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according 
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=${entity.zookeeper.hostname}:${entity.zookeeper.zookeeperPort?c}
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/zookeeper.properties
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/zookeeper.properties b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/zookeeper.properties
new file mode 100644
index 0000000..646d2f1
--- /dev/null
+++ b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/zookeeper.properties
@@ -0,0 +1,13 @@
+[#ftl]
+#
+
+##
+# KafkaZookeeper configuration template for Brooklyn
+##
+
+# the directory where the snapshot is stored.
+dataDir=${driver.runDir}/zookeeper
+# the port at which the clients will connect
+clientPort=${entity.zookeeperPort?c}
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/rabbit/rabbitmq.config
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/rabbit/rabbitmq.config b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/rabbit/rabbitmq.config
new file mode 100644
index 0000000..b4428f0
--- /dev/null
+++ b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/rabbit/rabbitmq.config
@@ -0,0 +1,5 @@
+[
+<#if entity.enableManagementPlugin>
+    {rabbitmq_mochiweb, [{listeners, [{mgmt, [{port, ${entity.managementPort?c}}]}]}]}
+</#if>
+].
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/storm/storm.yaml
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/storm/storm.yaml b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/storm/storm.yaml
new file mode 100644
index 0000000..99f0c71
--- /dev/null
+++ b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/storm/storm.yaml
@@ -0,0 +1,39 @@
+[#ftl]
+#
+# Storm Configuration
+[#if driver.zookeeperServers?has_content]
+ storm.zookeeper.servers:
+[#list driver.zookeeperServers as zkServer]
+   - "${zkServer}"
+[/#list]
+[/#if]
+
+ storm.local.dir: "${driver.localDir}"
+
+### ui.* configs are for the master
+ ui.port: ${driver.uiPort?c}
+ ui.childopts: "-Xmx768m"
+
+[#if driver.roleName == "ui"]
+ nimbus.host: "${driver.nimbusHostname}"
+[/#if]
+
+ nimbus.childopts: " ${driver.jvmOptsLine}"
+ worker.childopts: " ${driver.jvmOptsLine}"
+ supervisor.childopts: " ${driver.jvmOptsLine}" 
+  
+# ##### These may optionally be filled in:
+#    
+## List of custom serializations
+# topology.kryo.register:
+#     - org.mycompany.MyType
+#     - org.mycompany.MyType2: org.mycompany.MyType2Serializer
+#
+## List of custom kryo decorators
+# topology.kryo.decorators:
+#     - org.mycompany.MyDecorator
+#
+## Locations of the drpc servers
+# drpc.servers:
+#     - "server1"
+#     - "server2"

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg
new file mode 100644
index 0000000..79721a6
--- /dev/null
+++ b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg
@@ -0,0 +1,42 @@
+[#ftl]
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+##
+# ZooKeeper configuration template for Brooklyn
+##
+
+# The number of milliseconds of each tick
+tickTime=2000
+# The number of ticks that the initial
+# synchronization phase can take
+initLimit=10
+# The number of ticks that can pass between
+# sending a request and getting an acknowledgement
+syncLimit=5
+# the directory where the snapshot is stored.
+dataDir=${driver.runDir}/zookeeper
+# the port at which the clients will connect
+clientPort=${entity.zookeeperPort?c}
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
+
+[#list driver.zookeeperServers as zkServer]
+server.${zkServer.myid?c}=${zkServer.hostname}:${zkServer.leaderPort?c}:${zkServer.electionPort?c}
+[/#list]

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQEc2LiveTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQEc2LiveTest.java
deleted file mode 100644
index aaffda8..0000000
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQEc2LiveTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.activemq;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.brooklyn.api.entity.proxying.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.testng.annotations.Test;
-
-import brooklyn.entity.AbstractEc2LiveTest;
-import brooklyn.entity.trait.Startable;
-
-import com.google.common.collect.ImmutableList;
-
-public class ActiveMQEc2LiveTest extends AbstractEc2LiveTest {
-
-    /**
-     * Test that can install+start, and use, ActiveMQ.
-     */
-    @Override
-    protected void doTest(Location loc) throws Exception {
-        String queueName = "testQueue";
-        int number = 10;
-        String content = "01234567890123456789012345678901";
-
-        // Start broker with a configured queue
-        ActiveMQBroker activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class).configure("queue", queueName));
-        
-        app.start(ImmutableList.of(loc));
-        
-        EntityTestUtils.assertAttributeEqualsEventually(activeMQ, Startable.SERVICE_UP, true);
-
-        // Check queue created
-        assertEquals(ImmutableList.copyOf(activeMQ.getQueueNames()), ImmutableList.of(queueName));
-        assertEquals(activeMQ.getChildren().size(), 1);
-        assertEquals(activeMQ.getQueues().size(), 1);
-
-        // Get the named queue entity
-        ActiveMQQueue queue = activeMQ.getQueues().get(queueName);
-        assertNotNull(queue);
-
-        // Connect to broker using JMS and send messages
-        Connection connection = getActiveMQConnection(activeMQ);
-        clearQueue(connection, queueName);
-        EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0);
-        sendMessages(connection, number, queueName, content);
-
-        // Check messages arrived
-        EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, number);
-
-        connection.close();
-    }
-
-    private Connection getActiveMQConnection(ActiveMQBroker activeMQ) throws Exception {
-        int port = activeMQ.getAttribute(ActiveMQBroker.OPEN_WIRE_PORT);
-        String address = activeMQ.getAttribute(ActiveMQBroker.ADDRESS);
-        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(String.format("tcp://%s:%s", address, port));
-        Connection connection = factory.createConnection("admin", "activemq");
-        connection.start();
-        return connection;
-    }
-
-    private void sendMessages(Connection connection, int count, String queueName, String content) throws Exception {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        org.apache.activemq.command.ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue(queueName);
-        MessageProducer messageProducer = session.createProducer(destination);
-
-        for (int i = 0; i < count; i++) {
-            TextMessage message = session.createTextMessage(content);
-            messageProducer.send(message);
-        }
-
-        session.close();
-    }
-
-    private int clearQueue(Connection connection, String queueName) throws Exception {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        org.apache.activemq.command.ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue(queueName);
-        MessageConsumer messageConsumer = session.createConsumer(destination);
-
-        int received = 0;
-        while (messageConsumer.receive(500) != null) received++;
-
-        session.close();
-
-        return received;
-    }
-    
-    @Test(enabled=false)
-    public void testDummy() {} // Convince testng IDE integration that this really does have test methods  
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQGoogleComputeLiveTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQGoogleComputeLiveTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQGoogleComputeLiveTest.java
deleted file mode 100644
index e26dc2d..0000000
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQGoogleComputeLiveTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.activemq;
-
-import brooklyn.entity.AbstractGoogleComputeLiveTest;
-import brooklyn.entity.trait.Startable;
-
-import com.google.common.collect.ImmutableList;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.brooklyn.api.entity.proxying.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.testng.annotations.Test;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-public class ActiveMQGoogleComputeLiveTest extends AbstractGoogleComputeLiveTest {
-
-    /**
-     * Test that can install+start, and use, ActiveMQ.
-     */
-    @Override
-    protected void doTest(Location loc) throws Exception {
-        String queueName = "testQueue";
-        int number = 10;
-        String content = "01234567890123456789012345678901";
-
-        // Start broker with a configured queue
-        ActiveMQBroker activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class).configure("queue", queueName));
-        
-        app.start(ImmutableList.of(loc));
-        
-        EntityTestUtils.assertAttributeEqualsEventually(activeMQ, Startable.SERVICE_UP, true);
-
-        // Check queue created
-        assertEquals(ImmutableList.copyOf(activeMQ.getQueueNames()), ImmutableList.of(queueName));
-        assertEquals(activeMQ.getChildren().size(), 1);
-        assertEquals(activeMQ.getQueues().size(), 1);
-
-        // Get the named queue entity
-        ActiveMQQueue queue = activeMQ.getQueues().get(queueName);
-        assertNotNull(queue);
-
-        // Connect to broker using JMS and send messages
-        Connection connection = getActiveMQConnection(activeMQ);
-        clearQueue(connection, queueName);
-        EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0);
-        sendMessages(connection, number, queueName, content);
-
-        // Check messages arrived
-        EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, number);
-
-        connection.close();
-    }
-
-    private Connection getActiveMQConnection(ActiveMQBroker activeMQ) throws Exception {
-        int port = activeMQ.getAttribute(ActiveMQBroker.OPEN_WIRE_PORT);
-        String address = activeMQ.getAttribute(ActiveMQBroker.ADDRESS);
-        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(String.format("tcp://%s:%s", address, port));
-        Connection connection = factory.createConnection("admin", "activemq");
-        connection.start();
-        return connection;
-    }
-
-    private void sendMessages(Connection connection, int count, String queueName, String content) throws Exception {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        org.apache.activemq.command.ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue(queueName);
-        MessageProducer messageProducer = session.createProducer(destination);
-
-        for (int i = 0; i < count; i++) {
-            TextMessage message = session.createTextMessage(content);
-            messageProducer.send(message);
-        }
-
-        session.close();
-    }
-
-    private int clearQueue(Connection connection, String queueName) throws Exception {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        org.apache.activemq.command.ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue(queueName);
-        MessageConsumer messageConsumer = session.createConsumer(destination);
-
-        int received = 0;
-        while (messageConsumer.receive(500) != null) received++;
-
-        session.close();
-
-        return received;
-    }
-    
-    @Test(enabled=false)
-    public void testDummy() {} // Convince testng IDE integration that this really does have test methods  
-}