You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/18 13:01:01 UTC
[46/64] incubator-brooklyn git commit: brooklyn-software-messaging:
add org.apache package prefix
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormSshDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormSshDriver.java
new file mode 100644
index 0000000..1f108a0
--- /dev/null
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormSshDriver.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.messaging.storm;
+
+import static java.lang.String.format;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.basic.EntityLocal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
+import org.apache.brooklyn.entity.zookeeper.ZooKeeperEnsemble;
+import brooklyn.event.basic.DependentConfiguration;
+import org.apache.brooklyn.location.basic.Machines;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.net.Networking;
+import brooklyn.util.os.Os;
+import brooklyn.util.ssh.BashCommands;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class StormSshDriver extends JavaSoftwareProcessSshDriver implements StormDriver {
+
+ private static final Logger log = LoggerFactory.getLogger(StormSshDriver.class);
+
+ public StormSshDriver(EntityLocal entity, SshMachineLocation machine) { // passes both arguments straight to the superclass constructor
+ super(entity, machine);
+ }
+
+ public String getRoleName() { // lower-cased name of the configured Storm.ROLE; callers compare it with "nimbus", "supervisor" and "ui"
+ return entity.getConfig(Storm.ROLE).name().toLowerCase(java.util.Locale.ROOT); // ROOT locale: default-locale lower-casing maps 'I' to dotless 'ı' under Turkish
+ }
+
+ public String getZeromqVersion() { // value of the Storm.ZEROMQ_VERSION config key; installNativeDependencies() builds the zeromq download URL from it
+ return entity.getConfig(Storm.ZEROMQ_VERSION);
+ }
+
+ public String getLocalDir() { // Storm.LOCAL_DIR when configured, otherwise the "storm" directory under the run dir
+ return Optional.fromNullable(entity.getConfig(Storm.LOCAL_DIR)).or(Os.mergePathsUnix(getRunDir(), "storm"));
+ }
+
+ public String getNimbusHostname() {
+ String configuredHostname = entity.getConfig(Storm.NIMBUS_HOSTNAME); // an explicitly configured hostname wins
+ if (configuredHostname != null) return configuredHostname;
+ Entity nimbusEntity = entity.getConfig(Storm.NIMBUS_ENTITY);
+ if (nimbusEntity != null) {
+ // Obtain the nimbus entity's HOSTNAME attribute through a submitted attributeWhenReady task.
+ return Entities.submit(entity, DependentConfiguration.attributeWhenReady(nimbusEntity, Attributes.HOSTNAME)).getUnchecked();
+ }
+ log.warn("No nimbus hostname available; using 'localhost'");
+ return "localhost";
+ }
+
+ public Integer getUiPort() { // current value of the Storm.UI_PORT attribute (presumably null until the sensor is set — TODO confirm)
+ return entity.getAttribute(Storm.UI_PORT);
+ }
+
+ public Map<String, Integer> getPortMap() { // single-entry map "uiPort" -> getUiPort(); customize() passes it to Networking.checkPortsValid
+ return MutableMap.of("uiPort", getUiPort());
+ }
+
+ @Override
+ protected List<String> getCustomJavaConfigOptions() {
+ // Start from the inherited options and append the JVM flags specific to this node's role.
+ List<String> options = super.getCustomJavaConfigOptions();
+ String role = getRoleName();
+ boolean gcLoggingRole = "nimbus".equals(role) || "supervisor".equals(role);
+ if (gcLoggingRole) {
+ options.add("-verbose:gc");
+ options.add("-XX:+PrintGCTimeStamps");
+ options.add("-XX:+PrintGCDetails");
+ } else if ("ui".equals(role)) {
+ options.add("-Xmx768m");
+ }
+ return options;
+ }
+
+ public String getJvmOptsLine() { // the JAVA_OPTS entry of the shell environment, or "" when there is none
+ return Optional.fromNullable(getShellEnvironment().get("JAVA_OPTS")).or("");
+ }
+
+ public List<String> getZookeeperServers() {
+ // Read the ensemble from config, then resolve its ZOOKEEPER_SERVERS sensor through an attribute supplier.
+ ZooKeeperEnsemble ensemble = entity.getConfig(Storm.ZOOKEEPER_ENSEMBLE);
+ return Entities.attributeSupplierWhenReady(ensemble, ZooKeeperEnsemble.ZOOKEEPER_SERVERS).get();
+ }
+
+ public String getStormConfigTemplateUrl() { // value of Storm.STORM_CONFIG_TEMPLATE_URL; customize() copies this template to conf/storm.yaml
+ return entity.getConfig(Storm.STORM_CONFIG_TEMPLATE_URL);
+ }
+
+ @Override
+ public void preInstall() { // sets up the download resolver and the expanded install dir that install() and customize() rely on
+ resolver = Entities.newDownloader(this);
+ setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("storm-%s", getVersion()))));
+ }
+
+ @Override
+ public void install() { // installs OS packages and native dependencies, downloads and unzips Storm, then creates the local dir
+ List<String> urls = resolver.getTargets();
+ String saveAs = resolver.getFilename();
+ // Commands are accumulated in order and executed as one INSTALLING script.
+ ImmutableList.Builder<String> commands= ImmutableList.<String> builder();
+ if (!getLocation().getOsDetails().isMac()) { // non-Mac only: build prerequisites through yum/apt
+ commands.add(BashCommands.installPackage(ImmutableMap.of(
+ "yum", "libuuid-devel",
+ "apt", "build-essential uuid-dev pkg-config libtool automake"),
+ "libuuid-devel"));
+ commands.add(BashCommands.ifExecutableElse0("yum", BashCommands.sudo("yum -y groupinstall 'Development Tools'")));
+ }
+ commands.add(BashCommands.installPackage(ImmutableMap.of("yum", "git"), "git"))
+ .add(BashCommands.INSTALL_UNZIP)
+ .addAll(installNativeDependencies())
+ .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs))
+ .add("unzip " + saveAs)
+ .add("mkdir -p " + getLocalDir())
+ .add("chmod 777 " + getLocalDir()); // FIXME world-writable local dir
+ newScript(INSTALLING)
+ .body.append(commands.build())
+ .gatherOutput()
+ .execute();
+ }
+
+ public String getPidFile() { // "<role>.pid" under the run dir; passed as USE_PID_FILE to the launch, check-running and stop scripts
+ return Os.mergePathsUnix(getRunDir(), format("%s.pid", getRoleName()));
+ }
+
+ @Override
+ protected String getLogFileLocation() { // "logs/<role>.log" under the run dir; launch() redirects the process output here
+ return Os.mergePathsUnix(getRunDir(), "logs", format("%s.log", getRoleName()));
+ }
+
+ @Override
+ public void launch() { // starts "./bin/storm <role>" in the background; supervisors first wait for the nimbus entity when one is configured
+ boolean needsSleep = false;
+ if (getRoleName().equals("supervisor")) {
+ Entity nimbus = entity.getConfig(Storm.NIMBUS_ENTITY);
+ if (nimbus == null) {
+ log.warn("No nimbus entity available; not blocking before starting supervisors");
+ } else {
+ Entities.waitForServiceUp(nimbus, entity.getConfig(SoftwareProcess.START_TIMEOUT));
+ needsSleep = true;
+ }
+ }
+ // Log both hostnames this entity is known by (parameterized, as in customize()).
+ String subnetHostname = Machines.findSubnetOrPublicHostname(entity).get();
+ log.info("Launching {} with role {} and hostname (public) {}, hostname (subnet) {}",
+ new Object[] { entity, getRoleName(), getEntity().getAttribute(Attributes.HOSTNAME), subnetHostname });
+
+ // ensure only one node at a time tries to start
+ // attempting to eliminate the causes of:
+ // 2013-12-12 09:21:45 supervisor [ERROR] Error on initialization of server mk-supervisor
+ // org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /assignments
+ // TODO use SoftwareProcess#START_LATCH instead here?
+ // Without a configured START_MUTEX a fresh lock object is used, so this launch is not serialized with others.
+ Object startMutex = Optional.fromNullable(entity.getConfig(Storm.START_MUTEX)).or(new Object());
+ synchronized (startMutex) {
+ if (needsSleep) {
+ // give 10s extra to make sure nimbus is ready; we see weird zookeeper no /assignments node error otherwise
+ // (this could be optimized by recording nimbus service_up time)
+ Time.sleep(Duration.TEN_SECONDS);
+ }
+ newScript(MutableMap.of(USE_PID_FILE, getPidFile()), LAUNCHING)
+ .body.append(format("nohup ./bin/storm %s > %s 2>&1 &", getRoleName(), getLogFileLocation()))
+ .execute();
+ }
+ }
+
+ @Override
+ public boolean isRunning() { // true when the CHECK_RUNNING script, run against the role's pid file, exits with 0
+ return newScript(MutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0;
+ }
+
+ @Override
+ public void stop() { // runs the STOPPING script against the role's pid file; the exit code is ignored
+ newScript(MutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute();
+ }
+
+ @Override
+ public void customize() {
+ log.debug("Customizing {}", entity);
+ Networking.checkPortsValid(getPortMap());
+ // Copy the unpacked distribution into the script's working directory (presumably the run dir — TODO confirm).
+ newScript(CUSTOMIZING)
+ .body.append(format("cp -R %s/* .", getExpandedInstallDir()))
+ .execute();
+ // Copy the configured template to conf/storm.yaml under the run dir.
+ String destinationConfigFile = Os.mergePathsUnix(getRunDir(), "conf/storm.yaml");
+ copyTemplate(getStormConfigTemplateUrl(), destinationConfigFile);
+ }
+
+ protected List<String> installNativeDependencies() { // shell commands installing ZeroMQ and the jzmq Java binding; both branches end back in the install dir
+ String zeromqUrl = format("http://download.zeromq.org/zeromq-%s.tar.gz", getZeromqVersion());
+ String targz = format("zeromq-%s.tar.gz", getZeromqVersion());
+ String jzmq = "https://github.com/nathanmarz/jzmq.git";
+ // Mac: zeromq comes from brew and jzmq is cloned from the asmaier repository; otherwise zeromq is built from the tarball and jzmq from the URL above.
+ ImmutableList.Builder<String> commands = ImmutableList.<String>builder();
+ if (getLocation().getOsDetails().isMac()) {
+ commands.add("export PATH=$PATH:/usr/local/bin")
+ .add("export JAVA_HOME=$(/usr/libexec/java_home)")
+ .add("cd " + getInstallDir())
+ .add(BashCommands.installPackage(ImmutableMap.of("brew", "automake"), "make")) // NOTE(review): second argument is "make" while the brew package is "automake" — confirm intent
+ .add(BashCommands.installPackage(ImmutableMap.of("brew", "libtool"), "libtool"))
+ .add(BashCommands.installPackage(ImmutableMap.of("brew", "pkg-config"), "pkg-config"))
+ .add(BashCommands.installPackage(ImmutableMap.of("brew", "zeromq"), "zeromq"))
+ .add("git clone https://github.com/asmaier/jzmq")
+ .add("cd jzmq")
+ .add("./autogen.sh")
+ .add("./configure")
+ .add("make")
+ .add((BashCommands.sudo("make install")))
+ .add("cd " + getInstallDir());
+ } else {
+ commands.add("export JAVA_HOME=$(dirname $(readlink -m `which java`))/../../ || export JAVA_HOME=/usr/lib/jvm/java")
+ .add("cd " + getInstallDir())
+ .add(BashCommands.commandToDownloadUrlAs(zeromqUrl, targz))
+ .add("tar xzf " + targz)
+ .add(format("cd zeromq-%s", getZeromqVersion()))
+ .add("./configure")
+ .add("make")
+ .add((BashCommands.sudo("make install")))
+ // install jzmq
+ .add("cd " + getInstallDir())
+ .add("git clone " + jzmq)
+ .add("cd jzmq")
+ .add("./autogen.sh")
+ .add("./configure")
+
+ // hack needed on ubuntu 12.04; ignore if it fails
+ // see https://github.com/zeromq/jzmq/issues/114
+ .add(BashCommands.ok(
+ "pushd src ; touch classdist_noinst.stamp ; CLASSPATH=.:./.:$CLASSPATH "
+ + "javac -d . org/zeromq/ZMQ.java org/zeromq/App.java org/zeromq/ZMQForwarder.java org/zeromq/EmbeddedLibraryTools.java org/zeromq/ZMQQueue.java org/zeromq/ZMQStreamer.java org/zeromq/ZMQException.java"))
+ .add(BashCommands.ok("popd"))
+
+ .add("make")
+ .add((BashCommands.sudo("make install")))
+ .add("cd " + getInstallDir());
+ }
+ return commands.build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java
new file mode 100644
index 0000000..6cb5ab0
--- /dev/null
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.zookeeper;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.management.ObjectName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.entity.java.JavaSoftwareProcessDriver;
+import brooklyn.event.feed.jmx.JmxAttributePollConfig;
+import brooklyn.event.feed.jmx.JmxFeed;
+import brooklyn.event.feed.jmx.JmxHelper;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Objects.ToStringHelper;
+
+/**
+ * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Apache ZooKeeper instance.
+ */
+public abstract class AbstractZooKeeperImpl extends SoftwareProcessImpl implements ZooKeeperNode {
+
+ @SuppressWarnings("unused")
+ private static final Logger log = LoggerFactory.getLogger(AbstractZooKeeperImpl.class);
+ private static final ObjectName ZOOKEEPER_MBEAN = JmxHelper.createObjectName("org.apache.ZooKeeperService:name0=StandaloneServer_port-1");
+
+ private volatile JmxFeed jmxFeed;
+
+ public AbstractZooKeeperImpl() {
+ }
+
+ @Override
+ public Integer getZookeeperPort() { return getAttribute(ZOOKEEPER_PORT); } // current value of the ZOOKEEPER_PORT attribute
+
+ @Override
+ public String getHostname() { return getAttribute(HOSTNAME); } // current value of the HOSTNAME attribute
+
+ @Override
+ public void waitForServiceUp(long duration, TimeUnit units) {
+ super.waitForServiceUp(duration, units);
+ // When the driver reports JMX as enabled, also wait for the ZooKeeper MBean, passing the same duration converted to milliseconds.
+ if (((JavaSoftwareProcessDriver)getDriver()).isJmxEnabled()) {
+ // Wait for the MBean to exist
+ JmxHelper helper = new JmxHelper(this);
+ try {
+ helper.assertMBeanExistsEventually(ZOOKEEPER_MBEAN, units.toMillis(duration));
+ } finally {
+ helper.terminate(); // always release the helper, even when the assertion throws
+ }
+ }
+ }
+
+ @Override
+ protected void connectSensors() {
+ connectServiceUpIsRunning();
+ // The JMX feed is only created when the driver reports JMX as enabled; each poll falls back to -1 on failure or exception.
+ if (((JavaSoftwareProcessDriver)getDriver()).isJmxEnabled()) {
+ jmxFeed = JmxFeed.builder()
+ .entity(this)
+ .period(500, TimeUnit.MILLISECONDS)
+ .pollAttribute(new JmxAttributePollConfig<Long>(OUTSTANDING_REQUESTS)
+ .objectName(ZOOKEEPER_MBEAN)
+ .attributeName("OutstandingRequests")
+ .onFailureOrException(Functions.constant(-1L)))
+ .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_RECEIVED)
+ .objectName(ZOOKEEPER_MBEAN)
+ .attributeName("PacketsReceived")
+ .onFailureOrException(Functions.constant(-1L)))
+ .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_SENT)
+ .objectName(ZOOKEEPER_MBEAN)
+ .attributeName("PacketsSent")
+ .onFailureOrException(Functions.constant(-1L)))
+ .build();
+ }
+ }
+
+ @Override
+ public void disconnectSensors() { // undoes connectSensors(): service-up tracking is disconnected and the JMX feed stopped
+ super.disconnectSensors();
+ disconnectServiceUpIsRunning();
+ if (jmxFeed != null) jmxFeed.stop(); // jmxFeed is only assigned when JMX was enabled in connectSensors()
+ }
+
+ @Override
+ protected ToStringHelper toStringHelper() { // extends the inherited helper with the "zookeeperPort" field
+ return super.toStringHelper()
+ .add("zookeeperPort", getZookeeperPort());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperDriver.java
new file mode 100644
index 0000000..36388f0
--- /dev/null
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperDriver.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.zookeeper;
+
+import brooklyn.entity.java.JavaSoftwareProcessDriver;
+
+public interface ZooKeeperDriver extends JavaSoftwareProcessDriver {
+ /** Returns the ZooKeeper port as seen by this driver. NOTE(review): which port value implementations return is not visible here — confirm. */
+ Integer getZooKeeperPort();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java
new file mode 100644
index 0000000..d29bc91
--- /dev/null
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.zookeeper;
+
+import java.util.List;
+
+import org.apache.brooklyn.api.catalog.Catalog;
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+import org.apache.brooklyn.api.event.AttributeSensor;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.Sensors;
+
+import com.google.common.reflect.TypeToken;
+
+@Catalog(name="ZooKeeper ensemble", description="A cluster of ZooKeeper servers. "
+ + "Apache ZooKeeper enables highly reliable distributed coordination.")
+@ImplementedBy(ZooKeeperEnsembleImpl.class)
+public interface ZooKeeperEnsemble extends DynamicCluster {
+ /** Cluster name, declared as a combined sensor and config key with default "BrooklynZookeeperCluster". */
+ @SetFromFlag("clusterName")
+ BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(String
+ .class, "zookeeper.cluster.name", "Name of the Zookeeper cluster", "BrooklynZookeeperCluster");
+ /** Redeclares {@link DynamicCluster#INITIAL_SIZE} with a default of 3. */
+ @SetFromFlag("initialSize")
+ public static final ConfigKey<Integer> INITIAL_SIZE = ConfigKeys.newConfigKeyWithDefault(DynamicCluster.INITIAL_SIZE, 3);
+ /** Sensor listing the hostnames to connect to the cluster with. */
+ @SuppressWarnings("serial")
+ AttributeSensor<List<String>> ZOOKEEPER_SERVERS = Sensors.newSensor(new TypeToken<List<String>>() { },
+ "zookeeper.servers", "Hostnames to connect to cluster with");
+ /** Returns the name of this cluster. */
+ String getClusterName();
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java
new file mode 100644
index 0000000..f9ce930
--- /dev/null
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.zookeeper;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.proxying.EntitySpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.basic.Attributes;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.group.AbstractMembershipTrackingPolicy;
+import brooklyn.entity.group.DynamicClusterImpl;
+
+import com.google.common.collect.Lists;
+
+public class ZooKeeperEnsembleImpl extends DynamicClusterImpl implements ZooKeeperEnsemble {
+
+ private static final Logger log = LoggerFactory.getLogger(ZooKeeperEnsembleImpl.class);
+ private static final AtomicInteger myId = new AtomicInteger();
+
+ private MemberTrackingPolicy policy;
+
+ public ZooKeeperEnsembleImpl() {}
+
+ /**
+ * Looks up {@link #MEMBER_SPEC}, passing a spec for {@link ZooKeeperNode} as the default value.
+ */
+ @Override
+ protected EntitySpec<?> getMemberSpec() {
+ return getConfig(MEMBER_SPEC, EntitySpec.create(ZooKeeperNode.class));
+ }
+
+ @Override
+ public String getClusterName() { // current value of the CLUSTER_NAME attribute
+ return getAttribute(CLUSTER_NAME);
+ }
+
+ @Override
+ public void init() {
+ log.info("Initializing the ZooKeeper Ensemble");
+ super.init();
+ // Attach a MemberTrackingPolicy configured with this ensemble as its "group".
+ policy = addPolicy(PolicySpec.create(MemberTrackingPolicy.class)
+ .displayName("Members tracker")
+ .configure("group", this));
+ }
+
+ public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { // gives each added member a MY_ID if it has none
+ @Override
+ protected void onEntityChange(Entity member) {
+ }
+
+ @Override
+ protected void onEntityAdded(Entity member) {
+ if (member.getAttribute(ZooKeeperNode.MY_ID) == null) { // ids come from the static myId counter of the enclosing class
+ ((EntityInternal) member).setAttribute(ZooKeeperNode.MY_ID, myId.incrementAndGet());
+ }
+ }
+
+ @Override
+ protected void onEntityRemoved(Entity member) {
+ }
+ }
+
+ @Override
+ protected void initEnrichers() {
+ super.initEnrichers();
+ // No enrichers beyond the inherited ones are added here.
+ }
+
+ @Override
+ public void start(Collection<? extends Location> locations) {
+ super.start(locations);
+ // Once the cluster has started, publish every member's HOSTNAME attribute as ZOOKEEPER_SERVERS.
+ List<String> memberHostnames = Lists.newArrayList();
+ for (Entity member : getMembers()) {
+ memberHostnames.add(member.getAttribute(Attributes.HOSTNAME));
+ }
+ setAttribute(ZOOKEEPER_SERVERS, memberHostnames);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java
new file mode 100644
index 0000000..6a67394
--- /dev/null
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.zookeeper;
+
+import org.apache.brooklyn.api.catalog.Catalog;
+import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
+import org.apache.brooklyn.api.event.AttributeSensor;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensorAndConfigKey;
+import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+
+/**
+ * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Apache ZooKeeper instance.
+ */
+@Catalog(name="ZooKeeper Node", description="Apache ZooKeeper is a server which enables "
+ + "highly reliable distributed coordination.")
+@ImplementedBy(ZooKeeperNodeImpl.class)
+public interface ZooKeeperNode extends SoftwareProcess {
+ // Config keys and port sensors; the "N+" port defaults are presumably "N or the next free port" — TODO confirm.
+ @SetFromFlag("version")
+ ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "3.4.5");
+ @SetFromFlag("zookeeperPort")
+ PortAttributeSensorAndConfigKey ZOOKEEPER_PORT = new PortAttributeSensorAndConfigKey("zookeeper.port", "Zookeeper port", "2181+");
+ @SetFromFlag("zookeeperLeaderPort")
+ PortAttributeSensorAndConfigKey ZOOKEEPER_LEADER_PORT = new PortAttributeSensorAndConfigKey("zookeeper.leader.port", "Zookeeper leader ports", "2888+");
+ @SetFromFlag("zookeeperElectionPort")
+ PortAttributeSensorAndConfigKey ZOOKEEPER_ELECTION_PORT = new PortAttributeSensorAndConfigKey("zookeeper.election.port", "Zookeeper election ports", "3888+");
+ @SetFromFlag("downloadUrl")
+ BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
+ SoftwareProcess.DOWNLOAD_URL, "http://apache.fastbull.org/zookeeper/zookeeper-${version}/zookeeper-${version}.tar.gz");
+ /**
+ * Location of the ZK configuration file template to be copied to the server.
+ */
+ @SetFromFlag("zookeeperConfig")
+ ConfigKey<String> ZOOKEEPER_CONFIG_TEMPLATE = ConfigKeys.newStringConfigKey(
+ "zookeeper.configTemplate", "Zookeeper configuration template (in freemarker format)",
+ "classpath://org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg"); // NOTE(review): path says "messaging/zookeeper" but this package is "entity.zookeeper" — confirm the resource location
+ AttributeSensor<Long> OUTSTANDING_REQUESTS = new BasicAttributeSensor<Long>(Long.class, "zookeeper.outstandingRequests", "Outstanding request count");
+ AttributeSensor<Long> PACKETS_RECEIVED = new BasicAttributeSensor<Long>(Long.class, "zookeeper.packets.received", "Total packets received");
+ AttributeSensor<Long> PACKETS_SENT = new BasicAttributeSensor<Long>(Long.class, "zookeeper.packets.sent", "Total packets sent");
+ AttributeSensor<Integer> MY_ID = new BasicAttributeSensor<Integer>(Integer.class, "zookeeper.myid", "ZooKeeper node's myId");
+ /** Returns the ZooKeeper port of this node. */
+ Integer getZookeeperPort();
+ /** Returns the hostname of this node. */
+ String getHostname();
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java
new file mode 100644
index 0000000..275e101
--- /dev/null
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.zookeeper;
+
+/**
+ * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single standalone zookeeper instance.
+ */
+public class ZooKeeperNodeImpl extends AbstractZooKeeperImpl implements ZooKeeperNode {
+
+ public ZooKeeperNodeImpl() {}
+ /** Declares {@link ZooKeeperDriver} as the driver interface for this entity. */
+ @Override
+ public Class<?> getDriverInterface() {
+ return ZooKeeperDriver.class;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java
new file mode 100644
index 0000000..709e44c
--- /dev/null
+++ b/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.zookeeper;
+
+import static java.lang.String.format;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.brooklyn.api.entity.Entity;
+
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.java.JavaSoftwareProcessSshDriver;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.net.Networking;
+import brooklyn.util.os.Os;
+import brooklyn.util.ssh.BashCommands;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+/** SSH-based {@link ZooKeeperDriver}: installs, customizes, launches, checks and stops a ZooKeeper node through scripts created with {@code newScript}. */
+public class ZooKeeperSshDriver extends JavaSoftwareProcessSshDriver implements ZooKeeperDriver {
+ /** Creates a driver for the given ZooKeeper node entity on the given SSH machine. */
+ public ZooKeeperSshDriver(ZooKeeperNodeImpl entity, SshMachineLocation machine) {
+ super(entity, machine);
+ }
+ /** Returns the path of {@code console.out} in the run directory; {@link #launch()} redirects the process output there. */
+ @Override
+ protected String getLogFileLocation() { return Os.mergePathsUnix(getRunDir(), "console.out"); }
+ /** Returns the ports checked by {@link #customize()}: a single entry keyed {@code zookeeperPort}. */
+ protected Map<String, Integer> getPortMap() {
+ return MutableMap.of("zookeeperPort", getZooKeeperPort());
+ }
+ /** Returns the value of the {@code ZOOKEEPER_CONFIG_TEMPLATE} config key; {@link #customize()} copies that template to {@code conf/zoo.cfg}. */
+ protected String getConfigFileName() {
+ return entity.getConfig(ZooKeeperNode.ZOOKEEPER_CONFIG_TEMPLATE);
+ }
+ /** Returns the {@code MY_ID} attribute, which {@link #customize()} writes to the {@code myid} file. NOTE(review): the unboxing to int throws a NullPointerException if the attribute is null. */
+ protected int getMyId() {
+ return entity.getAttribute(ZooKeeperNode.MY_ID);
+ }
+ /** Builds one {@link ZooKeeperServerConfig} per member of the parent ensemble, from each member's id, hostname and port attributes. */
+ // FIXME All for one, and one for all! If any member fails, we wait forever for its hostname/port.
+ // Need a way to terminate the wait, e.g. when the entity goes on-fire.
+ // FIXME Race in getMembers(). Should DynamicCluster.grow create all the members first and only then call start on them?
+ public List<ZooKeeperServerConfig> getZookeeperServers() throws ExecutionException, InterruptedException {
+ ZooKeeperEnsemble ensemble = (ZooKeeperEnsemble) entity.getParent(); // NOTE(review): unchecked cast; fails with ClassCastException if the parent is not a ZooKeeperEnsemble
+ List<ZooKeeperServerConfig> result = Lists.newArrayList();
+ // Per the FIXME above, each get() below waits until the member has published the attribute.
+ for (Entity member : ensemble.getMembers()) {
+ Integer myid = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.MY_ID).get();
+ String hostname = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.HOSTNAME).get();
+ Integer port = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_PORT).get();
+ Integer leaderPort = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_LEADER_PORT).get();
+ Integer electionPort = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_ELECTION_PORT).get();
+ result.add(new ZooKeeperServerConfig(myid, hostname, port, leaderPort, electionPort));
+ }
+ return result;
+ }
+ /** Returns this entity's {@code ZOOKEEPER_PORT} attribute. */
+ @Override
+ public Integer getZooKeeperPort() {
+ return getEntity().getAttribute(ZooKeeperNode.ZOOKEEPER_PORT);
+ }
+ /** Runs the {@code CHECK_RUNNING} script with the pid file; an exit code of 0 is reported as running. */
+ @Override
+ public boolean isRunning() {
+ return newScript(MutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0;
+ }
+ /** Runs the {@code STOPPING} script with the pid file; the script's exit code is not inspected here. */
+ @Override
+ public void stop() {
+ newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute();
+ }
+ /** Creates the download resolver and sets the expanded install directory from the resolver's unpacked directory name for {@code zookeeper-%s} (formatted with the version). */
+ @Override
+ public void preInstall() {
+ resolver = Entities.newDownloader(this);
+ setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("zookeeper-%s", getVersion()))));
+ }
+ /** Runs the {@code INSTALLING} script: download commands for the resolver's target URLs, then {@code BashCommands.INSTALL_TAR}, then {@code tar xzfv} on the saved file. */
+ @Override
+ public void install() {
+ List<String> urls = resolver.getTargets();
+ String saveAs = resolver.getFilename();
+
+ List<String> commands = ImmutableList.<String> builder()
+ .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs))
+ .add(BashCommands.INSTALL_TAR)
+ .add("tar xzfv " + saveAs)
+ .build();
+
+ newScript(INSTALLING)
+ .body.append(commands)
+ .execute();
+ }
+ /** Checks the port map, then runs the {@code CUSTOMIZING} script (copy the expanded install into the script's current directory, create the {@code zookeeper} directory, write {@code myid}), and finally copies the config template to {@code conf/zoo.cfg}. */
+ @Override
+ public void customize() {
+ log.debug("Customizing {}", entity);
+ Networking.checkPortsValid(getPortMap());
+ newScript(CUSTOMIZING)
+ .body.append(
+ format("cp -R %s/* .", getExpandedInstallDir()),
+ format("mkdir %s/zookeeper", getRunDir()),
+ format("echo %d > %s/zookeeper/myid", getMyId(), getRunDir())
+ )
+ .execute();
+
+ String destinationConfigFile = Os.mergePathsUnix(getRunDir(), "conf/zoo.cfg");
+ copyTemplate(getConfigFileName(), destinationConfigFile);
+ }
+ /** Returns the path of {@code zookeeper.pid} in the run directory; passed as {@code USE_PID_FILE} by launch, isRunning and stop. */
+ public String getPidFile() { return Os.mergePathsUnix(getRunDir(), "zookeeper.pid"); }
+ /** Runs the {@code LAUNCHING} script: starts {@code QuorumPeerMain} with {@code nohup} in the background using {@code conf/zoo.cfg}, redirecting stdout and stderr to the log file. */
+ @Override
+ public void launch() {
+ newScript(MutableMap.of(USE_PID_FILE, getPidFile()), LAUNCHING)
+ .body.append(format("nohup java $JAVA_OPTS -cp zookeeper-%s.jar:lib/*:conf org.apache.zookeeper.server.quorum.QuorumPeerMain conf/zoo.cfg > %s 2>&1 &", getVersion(), getLogFileLocation()))
+ .execute();
+ }
+ /** Read-only holder for one server's id, hostname, port, leader port and election port; all fields are final and set by the constructor. */
+ public static class ZooKeeperServerConfig {
+ private final Integer myid;
+ private final String hostname;
+ private final Integer port;
+ private final Integer leaderPort;
+ private final Integer electionPort;
+
+ public ZooKeeperServerConfig(Integer myid, String hostname, Integer port, Integer leaderPort, Integer electionPort) {
+ this.myid = myid;
+ this.hostname = hostname;
+ this.port = port;
+ this.leaderPort = leaderPort;
+ this.electionPort = electionPort;
+ }
+
+ public Integer getMyid() { return myid; }
+ public String getHostname() { return hostname; }
+ public Integer getPort() { return port; }
+ public Integer getLeaderPort() { return leaderPort; }
+ public Integer getElectionPort() { return electionPort; }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/activemq/activemq.xml
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/activemq/activemq.xml b/software/messaging/src/main/resources/brooklyn/entity/messaging/activemq/activemq.xml
deleted file mode 100644
index 52114d1..0000000
--- a/software/messaging/src/main/resources/brooklyn/entity/messaging/activemq/activemq.xml
+++ /dev/null
@@ -1,154 +0,0 @@
-[#ftl]
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- Based on standard file from ActiveMQ Version 5.7.0 -->
-<!-- START SNIPPET: example -->
-<beans
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:amq="http://activemq.apache.org/schema/core"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
- http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
- <!-- Allows us to use system properties as variables in this configuration file -->
- <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
- <property name="locations">
- <value>file:[#noparse]${activemq.conf}[/#noparse]/credentials.properties</value>
- </property>
- </bean>
-
- <!--
- The <broker> element is used to configure the ActiveMQ broker.
- -->
- <broker useJmx="true" xmlns="http://activemq.apache.org/schema/core" brokerName="${entity.brokerName}" dataDirectory="[#noparse]${activemq.data}[/#noparse]">
-
- <!--
- For better performances use VM cursor and small memory limit.
- For more information, see:
-
- http://activemq.apache.org/message-cursors.html
-
- Also, if your producer is "hanging", it's probably due to producer flow control.
- For more information, see:
- http://activemq.apache.org/producer-flow-control.html
- -->
-
- <destinationPolicy>
- <policyMap>
- <policyEntries>
- <policyEntry topic=">" producerFlowControl="true">
- <!-- The constantPendingMessageLimitStrategy is used to prevent
- slow topic consumers to block producers and affect other consumers
- by limiting the number of messages that are retained
- For more information, see:
-
- http://activemq.apache.org/slow-consumer-handling.html
-
- -->
- <pendingMessageLimitStrategy>
- <constantPendingMessageLimitStrategy limit="1000"/>
- </pendingMessageLimitStrategy>
- </policyEntry>
- <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
- <!-- Use VM cursor for better latency
- For more information, see:
-
- http://activemq.apache.org/message-cursors.html
-
- <pendingQueuePolicy>
- <vmQueueCursor/>
- </pendingQueuePolicy>
- -->
- </policyEntry>
- </policyEntries>
- </policyMap>
- </destinationPolicy>
-
-
- <!--
- The managementContext is used to configure how ActiveMQ is exposed in
- JMX. By default, ActiveMQ uses the MBean server that is started by
- the JVM. For more information, see:
-
- http://activemq.apache.org/jmx.html
- -->
- <managementContext>
- [#if entity.jmxPort > 0]
- <managementContext connectorPort="${entity.jmxPort?c}"/>
- [#else]
- <managementContext createConnector="false"/>
- [/#if]
- </managementContext>
-
- <!--
- Configure message persistence for the broker. The default persistence
- mechanism is the KahaDB store (identified by the kahaDB tag).
- For more information, see:
-
- http://activemq.apache.org/persistence.html
- -->
- <persistenceAdapter>
- <kahaDB directory="[#noparse]${activemq.data}[/#noparse]/kahadb"/>
- </persistenceAdapter>
-
-
- <!--
- The systemUsage controls the maximum amount of space the broker will
- use before slowing down producers. For more information, see:
- http://activemq.apache.org/producer-flow-control.html
- If using ActiveMQ embedded - the following limits could safely be used:
-
- <systemUsage>
- <systemUsage>
- <memoryUsage>
- <memoryUsage limit="20 mb"/>
- </memoryUsage>
- <storeUsage>
- <storeUsage limit="1 gb"/>
- </storeUsage>
- <tempUsage>
- <tempUsage limit="100 mb"/>
- </tempUsage>
- </systemUsage>
- </systemUsage>
- -->
- <systemUsage>
- <systemUsage>
- <memoryUsage>
- <memoryUsage limit="64 mb"/>
- </memoryUsage>
- <storeUsage>
- <storeUsage limit="100 gb"/>
- </storeUsage>
- <tempUsage>
- <tempUsage limit="50 gb"/>
- </tempUsage>
- </systemUsage>
- </systemUsage>
-
- <!--
- The transport connectors expose ActiveMQ over a given protocol to
- clients and other brokers. For more information, see:
-
- http://activemq.apache.org/configuring-transports.html
- -->
- <transportConnectors>
- <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
- <transportConnector name="openwire" uri="tcp://0.0.0.0:${entity.openWirePort?c}?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
- </transportConnectors>
-
- <!-- destroy the spring context on shutdown to stop jetty -->
- <shutdownHooks>
- <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
- </shutdownHooks>
-
- </broker>
-
- <!--
- Enable web consoles, REST and Ajax APIs and demos
-
- Take a look at [#noparse]${ACTIVEMQ_HOME}[/#noparse]/conf/jetty.xml for more details
- -->
- <import resource="jetty.xml"/>
-
-</beans>
-<!-- END SNIPPET: example -->
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg
deleted file mode 100644
index d600ef5..0000000
Binary files a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
deleted file mode 100644
index feb871f..0000000
--- a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/server.properties
+++ /dev/null
@@ -1,112 +0,0 @@
-[#ftl]
-#
-##
-# KafkaBroker configuration template for Brooklyn
-#
-# see kafka.server.KafkaConfig for additional details and defaults
-##
-
-############################# Server Basics #############################
-# The id of the broker. This must be set to a unique integer for each broker.
-broker.id=${entity.brokerId?c}
-
-############################# Socket Server Settings #############################
-
-# The port the socket server listens on
-port=${entity.kafkaPort?c}
-
-# Hostname the broker will bind to. If not set, the server will bind to all interfaces
-host.name=${driver.hostname}
-
-# Hostname the broker will advertise to producers and consumers. If not set, it uses the
-# value for "host.name" if configured. Otherwise, it will use the value returned from
-# java.net.InetAddress.getCanonicalHostName().
-#advertised.host.name=<hostname routable by clients>
-
-# The port to publish to ZooKeeper for clients to use. If this is not set,
-# it will publish the same port that the broker binds to.
-#advertised.port=<port accessible by clients>
-
-# The number of threads handling network requests
-num.network.threads=3
-
-# The number of threads doing disk I/O
-num.io.threads=8
-
-# The send buffer (SO_SNDBUF) used by the socket server
-socket.send.buffer.bytes=102400
-
-# The receive buffer (SO_RCVBUF) used by the socket server
-socket.receive.buffer.bytes=102400
-
-# The maximum size of a request that the socket server will accept (protection against OOM)
-max.socket.request.bytes=104857600
-
-
-############################# Log Basics #############################
-
-# The directory under which to store log files
-log.dir=${driver.runDir}/kafka-logs
-
-# The default number of log partitions per topic. More partitions allow greater
-# parallelism for consumption, but this will also result in more files across
-# the brokers.
-num.partitions=1
-
-# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
-# This value is recommended to be increased for installations with data dirs located in RAID array.
-num.recovery.threads.per.data.dir=1
-
-############################# Log Flush Policy #############################
-
-# Messages are immediately written to the filesystem but by default we only fsync() to sync
-# the OS cache lazily. The following configurations control the flush of data to disk.
-# There are a few important trade-offs here:
-# 1. Durability: Unflushed data may be lost if you are not using replication.
-# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
-# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
-# The settings below allow one to configure the flush policy to flush data after a period of time or
-# every N messages (or both). This can be done globally and overridden on a per-topic basis.
-
-# The number of messages to accept before forcing a flush of data to disk
-log.flush.interval.messages=10000
-
-# The maximum amount of time a message can sit in a log before we force a flush
-log.flush.interval.ms=1000
-
-############################# Log Retention Policy #############################
-
-# The following configurations control the disposal of log segments. The policy can
-# be set to delete segments after a period of time, or after a given size has accumulated.
-# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
-# from the end of the log.
-
-# The minimum age of a log file to be eligible for deletion
-log.retention.hours=168
-
-# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
-# segments don't drop below log.retention.bytes.
-#log.retention.bytes=1073741824
-
-# The maximum size of a log segment file. When this size is reached a new log segment will be created.
-log.segment.bytes=1073741824
-
-# The interval at which log segments are checked to see if they can be deleted according
-# to the retention policies
-log.retention.check.interval.ms=300000
-
-# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
-# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
-log.cleaner.enable=false
-
-############################# Zookeeper #############################
-
-# Zookeeper connection string (see zookeeper docs for details).
-# This is a comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
-# You can also append an optional chroot string to the urls to specify the
-# root directory for all kafka znodes.
-zookeeper.connect=${entity.zookeeper.hostname}:${entity.zookeeper.zookeeperPort?c}
-
-# Timeout in ms for connecting to zookeeper
-zookeeper.connection.timeout.ms=1000000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/zookeeper.properties
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/zookeeper.properties b/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/zookeeper.properties
deleted file mode 100644
index 646d2f1..0000000
--- a/software/messaging/src/main/resources/brooklyn/entity/messaging/kafka/zookeeper.properties
+++ /dev/null
@@ -1,13 +0,0 @@
-[#ftl]
-#
-
-##
-# KafkaZookeeper configuration template for Brooklyn
-##
-
-# the directory where the snapshot is stored.
-dataDir=${driver.runDir}/zookeeper
-# the port at which the clients will connect
-clientPort=${entity.zookeeperPort?c}
-# disable the per-ip limit on the number of connections since this is a non-production config
-maxClientCnxns=0
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/rabbit/rabbitmq.config
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/rabbit/rabbitmq.config b/software/messaging/src/main/resources/brooklyn/entity/messaging/rabbit/rabbitmq.config
deleted file mode 100644
index b4428f0..0000000
--- a/software/messaging/src/main/resources/brooklyn/entity/messaging/rabbit/rabbitmq.config
+++ /dev/null
@@ -1,5 +0,0 @@
-[
-<#if entity.enableManagementPlugin>
- {rabbitmq_mochiweb, [{listeners, [{mgmt, [{port, ${entity.managementPort?c}}]}]}]}
-</#if>
-].
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/storm/storm.yaml
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/storm/storm.yaml b/software/messaging/src/main/resources/brooklyn/entity/messaging/storm/storm.yaml
deleted file mode 100644
index 99f0c71..0000000
--- a/software/messaging/src/main/resources/brooklyn/entity/messaging/storm/storm.yaml
+++ /dev/null
@@ -1,39 +0,0 @@
-[#ftl]
-#
-# Storm Configuration
-[#if driver.zookeeperServers?has_content]
- storm.zookeeper.servers:
-[#list driver.zookeeperServers as zkServer]
- - "${zkServer}"
-[/#list]
-[/#if]
-
- storm.local.dir: "${driver.localDir}"
-
-### ui.* configs are for the master
- ui.port: ${driver.uiPort?c}
- ui.childopts: "-Xmx768m"
-
-[#if driver.roleName == "ui"]
- nimbus.host: "${driver.nimbusHostname}"
-[/#if]
-
- nimbus.childopts: " ${driver.jvmOptsLine}"
- worker.childopts: " ${driver.jvmOptsLine}"
- supervisor.childopts: " ${driver.jvmOptsLine}"
-
-# ##### These may optionally be filled in:
-#
-## List of custom serializations
-# topology.kryo.register:
-# - org.mycompany.MyType
-# - org.mycompany.MyType2: org.mycompany.MyType2Serializer
-#
-## List of custom kryo decorators
-# topology.kryo.decorators:
-# - org.mycompany.MyDecorator
-#
-## Locations of the drpc servers
-# drpc.servers:
-# - "server1"
-# - "server2"
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/brooklyn/entity/messaging/zookeeper/zoo.cfg
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/brooklyn/entity/messaging/zookeeper/zoo.cfg b/software/messaging/src/main/resources/brooklyn/entity/messaging/zookeeper/zoo.cfg
deleted file mode 100644
index 79721a6..0000000
--- a/software/messaging/src/main/resources/brooklyn/entity/messaging/zookeeper/zoo.cfg
+++ /dev/null
@@ -1,42 +0,0 @@
-[#ftl]
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-##
-# ZooKeeper configuration template for Brooklyn
-##
-
-# The number of milliseconds of each tick
-tickTime=2000
-# The number of ticks that the initial
-# synchronization phase can take
-initLimit=10
-# The number of ticks that can pass between
-# sending a request and getting an acknowledgement
-syncLimit=5
-# the directory where the snapshot is stored.
-dataDir=${driver.runDir}/zookeeper
-# the port at which the clients will connect
-clientPort=${entity.zookeeperPort?c}
-# disable the per-ip limit on the number of connections since this is a non-production config
-maxClientCnxns=0
-
-[#list driver.zookeeperServers as zkServer]
-server.${zkServer.myid?c}=${zkServer.hostname}:${zkServer.leaderPort?c}:${zkServer.electionPort?c}
-[/#list]
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/activemq/activemq.xml
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/activemq/activemq.xml b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/activemq/activemq.xml
new file mode 100644
index 0000000..52114d1
--- /dev/null
+++ b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/activemq/activemq.xml
@@ -0,0 +1,154 @@
+[#ftl]
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Based on standard file from ActiveMQ Version 5.7.0 -->
+<!-- START SNIPPET: example -->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:amq="http://activemq.apache.org/schema/core"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <!-- Allows us to use system properties as variables in this configuration file -->
+ <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+ <property name="locations">
+ <value>file:[#noparse]${activemq.conf}[/#noparse]/credentials.properties</value>
+ </property>
+ </bean>
+
+ <!--
+ The <broker> element is used to configure the ActiveMQ broker.
+ -->
+ <broker useJmx="true" xmlns="http://activemq.apache.org/schema/core" brokerName="${entity.brokerName}" dataDirectory="[#noparse]${activemq.data}[/#noparse]">
+
+ <!--
+ For better performances use VM cursor and small memory limit.
+ For more information, see:
+
+ http://activemq.apache.org/message-cursors.html
+
+ Also, if your producer is "hanging", it's probably due to producer flow control.
+ For more information, see:
+ http://activemq.apache.org/producer-flow-control.html
+ -->
+
+ <destinationPolicy>
+ <policyMap>
+ <policyEntries>
+ <policyEntry topic=">" producerFlowControl="true">
+ <!-- The constantPendingMessageLimitStrategy is used to prevent
+ slow topic consumers to block producers and affect other consumers
+ by limiting the number of messages that are retained
+ For more information, see:
+
+ http://activemq.apache.org/slow-consumer-handling.html
+
+ -->
+ <pendingMessageLimitStrategy>
+ <constantPendingMessageLimitStrategy limit="1000"/>
+ </pendingMessageLimitStrategy>
+ </policyEntry>
+ <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
+ <!-- Use VM cursor for better latency
+ For more information, see:
+
+ http://activemq.apache.org/message-cursors.html
+
+ <pendingQueuePolicy>
+ <vmQueueCursor/>
+ </pendingQueuePolicy>
+ -->
+ </policyEntry>
+ </policyEntries>
+ </policyMap>
+ </destinationPolicy>
+
+
+ <!--
+ The managementContext is used to configure how ActiveMQ is exposed in
+ JMX. By default, ActiveMQ uses the MBean server that is started by
+ the JVM. For more information, see:
+
+ http://activemq.apache.org/jmx.html
+ -->
+ <managementContext>
+ [#if entity.jmxPort > 0]
+ <managementContext connectorPort="${entity.jmxPort?c}"/>
+ [#else]
+ <managementContext createConnector="false"/>
+ [/#if]
+ </managementContext>
+
+ <!--
+ Configure message persistence for the broker. The default persistence
+ mechanism is the KahaDB store (identified by the kahaDB tag).
+ For more information, see:
+
+ http://activemq.apache.org/persistence.html
+ -->
+ <persistenceAdapter>
+ <kahaDB directory="[#noparse]${activemq.data}[/#noparse]/kahadb"/>
+ </persistenceAdapter>
+
+
+ <!--
+ The systemUsage controls the maximum amount of space the broker will
+ use before slowing down producers. For more information, see:
+ http://activemq.apache.org/producer-flow-control.html
+ If using ActiveMQ embedded - the following limits could safely be used:
+
+ <systemUsage>
+ <systemUsage>
+ <memoryUsage>
+ <memoryUsage limit="20 mb"/>
+ </memoryUsage>
+ <storeUsage>
+ <storeUsage limit="1 gb"/>
+ </storeUsage>
+ <tempUsage>
+ <tempUsage limit="100 mb"/>
+ </tempUsage>
+ </systemUsage>
+ </systemUsage>
+ -->
+ <systemUsage>
+ <systemUsage>
+ <memoryUsage>
+ <memoryUsage limit="64 mb"/>
+ </memoryUsage>
+ <storeUsage>
+ <storeUsage limit="100 gb"/>
+ </storeUsage>
+ <tempUsage>
+ <tempUsage limit="50 gb"/>
+ </tempUsage>
+ </systemUsage>
+ </systemUsage>
+
+ <!--
+ The transport connectors expose ActiveMQ over a given protocol to
+ clients and other brokers. For more information, see:
+
+ http://activemq.apache.org/configuring-transports.html
+ -->
+ <transportConnectors>
+ <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
+ <transportConnector name="openwire" uri="tcp://0.0.0.0:${entity.openWirePort?c}?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
+ </transportConnectors>
+
+ <!-- destroy the spring context on shutdown to stop jetty -->
+ <shutdownHooks>
+ <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
+ </shutdownHooks>
+
+ </broker>
+
+ <!--
+ Enable web consoles, REST and Ajax APIs and demos
+
+ Take a look at [#noparse]${ACTIVEMQ_HOME}[/#noparse]/conf/jetty.xml for more details
+ -->
+ <import resource="jetty.xml"/>
+
+</beans>
+<!-- END SNIPPET: example -->
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg
new file mode 100644
index 0000000..d600ef5
Binary files /dev/null and b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg differ
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/server.properties
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/server.properties b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/server.properties
new file mode 100644
index 0000000..feb871f
--- /dev/null
+++ b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/server.properties
@@ -0,0 +1,112 @@
+[#ftl]
+#
+##
+# KafkaBroker configuration template for Brooklyn
+#
+# see kafka.server.KafkaConfig for additional details and defaults
+##
+
+############################# Server Basics #############################
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=${entity.brokerId?c}
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=${entity.kafkaPort?c}
+
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+host.name=${driver.hostname}
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured. Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
+# The number of threads handling network requests
+num.network.threads=3
+
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+max.socket.request.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=${driver.runDir}/kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data may be lost if you are not using replication.
+# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=${entity.zookeeper.hostname}:${entity.zookeeper.zookeeperPort?c}
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/zookeeper.properties
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/zookeeper.properties b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/zookeeper.properties
new file mode 100644
index 0000000..646d2f1
--- /dev/null
+++ b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/zookeeper.properties
@@ -0,0 +1,13 @@
+[#ftl]
+#
+
+##
+# KafkaZookeeper configuration template for Brooklyn
+##
+
+# the directory where the snapshot is stored.
+dataDir=${driver.runDir}/zookeeper
+# the port at which the clients will connect
+clientPort=${entity.zookeeperPort?c}
+# disable the per-IP limit on the number of connections, since this is a non-production config
+maxClientCnxns=0
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/rabbit/rabbitmq.config
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/rabbit/rabbitmq.config b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/rabbit/rabbitmq.config
new file mode 100644
index 0000000..b4428f0
--- /dev/null
+++ b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/rabbit/rabbitmq.config
@@ -0,0 +1,5 @@
+[
+<#if entity.enableManagementPlugin>
+ {rabbitmq_mochiweb, [{listeners, [{mgmt, [{port, ${entity.managementPort?c}}]}]}]}
+</#if>
+].
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/storm/storm.yaml
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/storm/storm.yaml b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/storm/storm.yaml
new file mode 100644
index 0000000..99f0c71
--- /dev/null
+++ b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/storm/storm.yaml
@@ -0,0 +1,39 @@
+[#ftl]
+#
+# Storm Configuration
+[#if driver.zookeeperServers?has_content]
+ storm.zookeeper.servers:
+[#list driver.zookeeperServers as zkServer]
+ - "${zkServer}"
+[/#list]
+[/#if]
+
+ storm.local.dir: "${driver.localDir}"
+
+### ui.* configs are for the master
+ ui.port: ${driver.uiPort?c}
+ ui.childopts: "-Xmx768m"
+
+[#if driver.roleName == "ui"]
+ nimbus.host: "${driver.nimbusHostname}"
+[/#if]
+
+ nimbus.childopts: " ${driver.jvmOptsLine}"
+ worker.childopts: " ${driver.jvmOptsLine}"
+ supervisor.childopts: " ${driver.jvmOptsLine}"
+
+# ##### These may optionally be filled in:
+#
+## List of custom serializations
+# topology.kryo.register:
+# - org.mycompany.MyType
+# - org.mycompany.MyType2: org.mycompany.MyType2Serializer
+#
+## List of custom kryo decorators
+# topology.kryo.decorators:
+# - org.mycompany.MyDecorator
+#
+## Locations of the drpc servers
+# drpc.servers:
+# - "server1"
+# - "server2"
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg
new file mode 100644
index 0000000..79721a6
--- /dev/null
+++ b/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg
@@ -0,0 +1,42 @@
+[#ftl]
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+##
+# ZooKeeper configuration template for Brooklyn
+##
+
+# The number of milliseconds of each tick
+tickTime=2000
+# The number of ticks that the initial
+# synchronization phase can take
+initLimit=10
+# The number of ticks that can pass between
+# sending a request and getting an acknowledgement
+syncLimit=5
+# the directory where the snapshot is stored.
+dataDir=${driver.runDir}/zookeeper
+# the port at which the clients will connect
+clientPort=${entity.zookeeperPort?c}
+# disable the per-IP limit on the number of connections, since this is a non-production config
+maxClientCnxns=0
+
+[#list driver.zookeeperServers as zkServer]
+server.${zkServer.myid?c}=${zkServer.hostname}:${zkServer.leaderPort?c}:${zkServer.electionPort?c}
+[/#list]
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQEc2LiveTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQEc2LiveTest.java
deleted file mode 100644
index aaffda8..0000000
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQEc2LiveTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.activemq;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.brooklyn.api.entity.proxying.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.testng.annotations.Test;
-
-import brooklyn.entity.AbstractEc2LiveTest;
-import brooklyn.entity.trait.Startable;
-
-import com.google.common.collect.ImmutableList;
-
-public class ActiveMQEc2LiveTest extends AbstractEc2LiveTest {
-
- /**
- * Test that can install+start, and use, ActiveMQ.
- */
- @Override
- protected void doTest(Location loc) throws Exception {
- String queueName = "testQueue";
- int number = 10;
- String content = "01234567890123456789012345678901";
-
- // Start broker with a configured queue
- ActiveMQBroker activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class).configure("queue", queueName));
-
- app.start(ImmutableList.of(loc));
-
- EntityTestUtils.assertAttributeEqualsEventually(activeMQ, Startable.SERVICE_UP, true);
-
- // Check queue created
- assertEquals(ImmutableList.copyOf(activeMQ.getQueueNames()), ImmutableList.of(queueName));
- assertEquals(activeMQ.getChildren().size(), 1);
- assertEquals(activeMQ.getQueues().size(), 1);
-
- // Get the named queue entity
- ActiveMQQueue queue = activeMQ.getQueues().get(queueName);
- assertNotNull(queue);
-
- // Connect to broker using JMS and send messages
- Connection connection = getActiveMQConnection(activeMQ);
- clearQueue(connection, queueName);
- EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0);
- sendMessages(connection, number, queueName, content);
-
- // Check messages arrived
- EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, number);
-
- connection.close();
- }
-
- private Connection getActiveMQConnection(ActiveMQBroker activeMQ) throws Exception {
- int port = activeMQ.getAttribute(ActiveMQBroker.OPEN_WIRE_PORT);
- String address = activeMQ.getAttribute(ActiveMQBroker.ADDRESS);
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(String.format("tcp://%s:%s", address, port));
- Connection connection = factory.createConnection("admin", "activemq");
- connection.start();
- return connection;
- }
-
- private void sendMessages(Connection connection, int count, String queueName, String content) throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- org.apache.activemq.command.ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue(queueName);
- MessageProducer messageProducer = session.createProducer(destination);
-
- for (int i = 0; i < count; i++) {
- TextMessage message = session.createTextMessage(content);
- messageProducer.send(message);
- }
-
- session.close();
- }
-
- private int clearQueue(Connection connection, String queueName) throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- org.apache.activemq.command.ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue(queueName);
- MessageConsumer messageConsumer = session.createConsumer(destination);
-
- int received = 0;
- while (messageConsumer.receive(500) != null) received++;
-
- session.close();
-
- return received;
- }
-
- @Test(enabled=false)
- public void testDummy() {} // Convince testng IDE integration that this really does have test methods
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c14fef53/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQGoogleComputeLiveTest.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQGoogleComputeLiveTest.java b/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQGoogleComputeLiveTest.java
deleted file mode 100644
index e26dc2d..0000000
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/activemq/ActiveMQGoogleComputeLiveTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.messaging.activemq;
-
-import brooklyn.entity.AbstractGoogleComputeLiveTest;
-import brooklyn.entity.trait.Startable;
-
-import com.google.common.collect.ImmutableList;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.brooklyn.api.entity.proxying.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.testng.annotations.Test;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-public class ActiveMQGoogleComputeLiveTest extends AbstractGoogleComputeLiveTest {
-
- /**
- * Test that can install+start, and use, ActiveMQ.
- */
- @Override
- protected void doTest(Location loc) throws Exception {
- String queueName = "testQueue";
- int number = 10;
- String content = "01234567890123456789012345678901";
-
- // Start broker with a configured queue
- ActiveMQBroker activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class).configure("queue", queueName));
-
- app.start(ImmutableList.of(loc));
-
- EntityTestUtils.assertAttributeEqualsEventually(activeMQ, Startable.SERVICE_UP, true);
-
- // Check queue created
- assertEquals(ImmutableList.copyOf(activeMQ.getQueueNames()), ImmutableList.of(queueName));
- assertEquals(activeMQ.getChildren().size(), 1);
- assertEquals(activeMQ.getQueues().size(), 1);
-
- // Get the named queue entity
- ActiveMQQueue queue = activeMQ.getQueues().get(queueName);
- assertNotNull(queue);
-
- // Connect to broker using JMS and send messages
- Connection connection = getActiveMQConnection(activeMQ);
- clearQueue(connection, queueName);
- EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0);
- sendMessages(connection, number, queueName, content);
-
- // Check messages arrived
- EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, number);
-
- connection.close();
- }
-
- private Connection getActiveMQConnection(ActiveMQBroker activeMQ) throws Exception {
- int port = activeMQ.getAttribute(ActiveMQBroker.OPEN_WIRE_PORT);
- String address = activeMQ.getAttribute(ActiveMQBroker.ADDRESS);
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(String.format("tcp://%s:%s", address, port));
- Connection connection = factory.createConnection("admin", "activemq");
- connection.start();
- return connection;
- }
-
- private void sendMessages(Connection connection, int count, String queueName, String content) throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- org.apache.activemq.command.ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue(queueName);
- MessageProducer messageProducer = session.createProducer(destination);
-
- for (int i = 0; i < count; i++) {
- TextMessage message = session.createTextMessage(content);
- messageProducer.send(message);
- }
-
- session.close();
- }
-
- private int clearQueue(Connection connection, String queueName) throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- org.apache.activemq.command.ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue(queueName);
- MessageConsumer messageConsumer = session.createConsumer(destination);
-
- int received = 0;
- while (messageConsumer.receive(500) != null) received++;
-
- session.close();
-
- return received;
- }
-
- @Test(enabled=false)
- public void testDummy() {} // Convince testng IDE integration that this really does have test methods
-}