You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:47:57 UTC
[21/51] [abbrv] [partial] brooklyn-library git commit: move subdir
from incubator up a level as it is promoted to its own repo (first
non-incubator commit!)
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormImpl.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormImpl.java b/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormImpl.java
deleted file mode 100644
index 71f145c..0000000
--- a/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormImpl.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.storm;
-
-import javax.management.ObjectName;
-
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.entity.java.JavaAppUtils;
-import org.apache.brooklyn.entity.java.JavaSoftwareProcessDriver;
-import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl;
-import org.apache.brooklyn.feed.jmx.JmxFeed;
-import org.apache.brooklyn.feed.jmx.JmxHelper;
-import org.apache.brooklyn.util.time.Duration;
-import org.apache.brooklyn.util.time.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StormImpl extends SoftwareProcessImpl implements Storm { // entity implementation for one Storm node; its behaviour is selected by the ROLE config key
-
- private static final Logger log = LoggerFactory.getLogger(StormImpl.class);
-
- public static final ObjectName STORM_MBEAN = JmxHelper.createObjectName("backtype.storm.daemon.nimbus:type=*"); // within this class only the commented-out feed in connectSensors() refers to it
-
- private JmxHelper jmxHelper; // created in connectSensors() when JMX is enabled, terminated in disconnectSensors()
- private volatile JmxFeed jmxFeed; // MXBean sensor feed; stays null when JMX is disabled
-
- public StormImpl() {}
-
- @Override
- public String getHostname() { return getAttribute(HOSTNAME); } // value of the HOSTNAME sensor
-
- @Override
- public Role getRole() { return getConfig(ROLE); } // value of the ROLE config key
-
- @Override
- public String getStormConfigTemplateUrl() { return getConfig(STORM_CONFIG_TEMPLATE_URL); } // value of the STORM_CONFIG_TEMPLATE_URL config key
-
- @Override
- public Class<?> getDriverInterface() {
- return StormDriver.class;
- }
-
- public String getRoleName() { return getRole().name().toLowerCase(java.util.Locale.ROOT); } // Locale.ROOT keeps the result locale-independent: the default locale (e.g. Turkish) would turn "UI" into "u" + dotless i
-
- @Override
- protected void preStart() { // names the entity after its role, then defers to the superclass
- setDefaultDisplayName("Storm Node ("+ getRoleName()+")");
- super.preStart();
- }
-
- @Override
- protected void connectSensors() { // publishes the UI URL (UI role only), connects MXBean sensors when JMX is on, and always derives service-up from the is-running check
- super.connectSensors();
-
- // give it plenty of time to start before we advertise ourselves
- Time.sleep(Duration.TEN_SECONDS);
-
- if (getRole() == Role.UI) {
- sensors().set(STORM_UI_URL, "http://"+getAttribute(Attributes.HOSTNAME)+":"+getAttribute(UI_PORT)+"/");
- }
-
- if (((JavaSoftwareProcessDriver)getDriver()).isJmxEnabled()) {
- jmxHelper = new JmxHelper(this); // not handed to any feed while the builder below stays commented out; only terminated in disconnectSensors()
-// jmxFeed = JmxFeed.builder()
-// .entity(this)
-// .period(3000, TimeUnit.MILLISECONDS)
-// .helper(jmxHelper)
-// .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP_JMX)
-// .objectName(STORM_MBEAN)
-// .attributeName("Initialized")
-// .onSuccess(Functions.forPredicate(Predicates.notNull()))
-// .onException(Functions.constant(false)))
-// // TODO SERVICE_UP should really be a combo of JMX plus is running
-// .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP)
-// .objectName(STORM_MBEAN)
-// .attributeName("Initialized")
-// .onSuccess(Functions.forPredicate(Predicates.notNull()))
-// .onException(Functions.constant(false)))
-// .build();
- jmxFeed = JavaAppUtils.connectMXBeanSensors(this);
-
- // FIXME for now we do service up based on pid check -- we get a warning that:
- // JMX object backtype.storm.daemon.nimbus:type=* not found at service:jmx:jmxmp://108.59.82.105:31001
- // (JMX is up fine, but no such object there)
- connectServiceUpIsRunning();
- } else {
- // if not using JMX
- log.warn("Storm running without JMX monitoring; limited visibility of service available");
- connectServiceUpIsRunning();
- }
- }
-
- @Override
- public void disconnectSensors() { // undoes connectSensors(): stops the is-running poll, the JMX feed and the JMX helper (each only if it was created)
- super.disconnectSensors();
- disconnectServiceUpIsRunning();
- if (jmxFeed != null) jmxFeed.stop();
- if (jmxHelper !=null) jmxHelper.terminate();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormSshDriver.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormSshDriver.java b/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormSshDriver.java
deleted file mode 100644
index 66578e6..0000000
--- a/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/messaging/storm/StormSshDriver.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.storm;
-
-import static java.lang.String.format;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.EntityLocal;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.location.Machines;
-import org.apache.brooklyn.core.sensor.DependentConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.entity.java.JavaSoftwareProcessSshDriver;
-import org.apache.brooklyn.entity.software.base.SoftwareProcess;
-import org.apache.brooklyn.entity.zookeeper.ZooKeeperEnsemble;
-import org.apache.brooklyn.location.ssh.SshMachineLocation;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.net.Networking;
-import org.apache.brooklyn.util.os.Os;
-import org.apache.brooklyn.util.ssh.BashCommands;
-import org.apache.brooklyn.util.time.Duration;
-import org.apache.brooklyn.util.time.Time;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-public class StormSshDriver extends JavaSoftwareProcessSshDriver implements StormDriver { // SSH driver that installs, configures, launches and stops one Storm daemon on a machine
-
- private static final Logger log = LoggerFactory.getLogger(StormSshDriver.class);
-
- public StormSshDriver(EntityLocal entity, SshMachineLocation machine) {
- super(entity, machine);
- }
-
- public String getRoleName() { // lower-cased role; used for comparisons, the storm sub-command, and the pid/log file names
- return entity.getConfig(Storm.ROLE).name().toLowerCase(java.util.Locale.ROOT); // Locale.ROOT: the default locale (e.g. Turkish) would map 'I' to a dotless i and break the "ui"/"nimbus"/"supervisor" checks below
- }
-
- public String getZeromqVersion() { // value of the ZEROMQ_VERSION config key
- return entity.getConfig(Storm.ZEROMQ_VERSION);
- }
-
- public String getLocalDir() { // configured LOCAL_DIR, or <runDir>/storm when that is null
- return Optional.fromNullable(entity.getConfig(Storm.LOCAL_DIR)).or(Os.mergePathsUnix(getRunDir(), "storm"));
- }
-
- public String getNimbusHostname() { // resolution order: NIMBUS_HOSTNAME config, then the NIMBUS_ENTITY's HOSTNAME sensor, then "localhost"
- String result = entity.getConfig(Storm.NIMBUS_HOSTNAME);
- if (result != null) return result;
-
- Entity nimbus = entity.getConfig(Storm.NIMBUS_ENTITY);
- if (nimbus == null) {
- log.warn("No nimbus hostname available; using 'localhost'");
- return "localhost";
- }
- return Entities.submit(entity, DependentConfiguration.attributeWhenReady(nimbus, Attributes.HOSTNAME)).getUnchecked(); // presumably blocks until the nimbus entity publishes HOSTNAME -- TODO confirm
- }
-
- public Integer getUiPort() { // value of the UI_PORT sensor
- return entity.getAttribute(Storm.UI_PORT);
- }
-
- public Map<String, Integer> getPortMap() { // single-entry map "uiPort" -> UI port; validated in customize()
- return MutableMap.of("uiPort", getUiPort());
- }
-
- @Override
- protected List<String> getCustomJavaConfigOptions() { // adds GC logging flags for nimbus/supervisor and a 768m heap cap for the UI
- List<String> result = super.getCustomJavaConfigOptions(); // assumes the list returned by super is mutable -- TODO confirm
- if ("nimbus".equals(getRoleName()) || "supervisor".equals(getRoleName())) {
- result.add("-verbose:gc");
- result.add("-XX:+PrintGCTimeStamps");
- result.add("-XX:+PrintGCDetails");
- }
-
- if ("ui".equals(getRoleName())) {
- result.add("-Xmx768m");
- }
-
- return result;
- }
-
- public String getJvmOptsLine() { // JAVA_OPTS entry of the shell environment, or "" when absent
- return Optional.fromNullable(getShellEnvironment().get("JAVA_OPTS")).or("");
- }
-
- public List<String> getZookeeperServers() { // reads ZOOKEEPER_SERVERS from the configured ensemble
- ZooKeeperEnsemble zooKeeperEnsemble = entity.getConfig(Storm.ZOOKEEPER_ENSEMBLE); // NOTE(review): no null check here -- confirm the config is always set
- Supplier<List<String>> supplier = Entities.attributeSupplierWhenReady(zooKeeperEnsemble, ZooKeeperEnsemble.ZOOKEEPER_SERVERS);
- return supplier.get();
- }
-
- public String getStormConfigTemplateUrl() { // value of the STORM_CONFIG_TEMPLATE_URL config key
- return entity.getConfig(Storm.STORM_CONFIG_TEMPLATE_URL);
- }
-
- @Override
- public void preInstall() { // creates the download resolver and sets the expanded install dir from the "storm-<version>" name
- resolver = Entities.newDownloader(this);
- setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("storm-%s", getVersion()))));
- }
-
- @Override
- public void install() { // builds one INSTALLING script: OS build packages (non-Mac), git, unzip, native zeromq/jzmq, then download and unpack Storm
- List<String> urls = resolver.getTargets();
- String saveAs = resolver.getFilename();
-
- ImmutableList.Builder<String> commands= ImmutableList.<String> builder();
- if (!getLocation().getOsDetails().isMac()) {
- commands.add(BashCommands.installPackage(ImmutableMap.of(
- "yum", "libuuid-devel",
- "apt", "build-essential uuid-dev pkg-config libtool automake"),
- "libuuid-devel"));
- commands.add(BashCommands.ifExecutableElse0("yum", BashCommands.sudo("yum -y groupinstall 'Development Tools'")));
- }
- commands.add(BashCommands.installPackage(ImmutableMap.of("yum", "git"), "git"))
- .add(BashCommands.INSTALL_UNZIP)
- .addAll(installNativeDependencies())
- .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs))
- .add("unzip " + saveAs)
- .add("mkdir -p " + getLocalDir())
- .add("chmod 777 " + getLocalDir()); // FIXME: leaves the local dir world-writable
- newScript(INSTALLING)
- .body.append(commands.build())
- .gatherOutput()
- .execute();
- }
-
- public String getPidFile() { // <runDir>/<role>.pid
- return Os.mergePathsUnix(getRunDir(), format("%s.pid", getRoleName()));
- }
-
- @Override
- protected String getLogFileLocation() { // <runDir>/logs/<role>.log
- return Os.mergePathsUnix(getRunDir(), "logs", format("%s.log", getRoleName()));
- }
-
- @Override
- public void launch() { // supervisors first wait for the nimbus entity (when configured); the daemon is then started under the start mutex
- boolean needsSleep = false;
- if (getRoleName().equals("supervisor")) {
- Entity nimbus = entity.getConfig(Storm.NIMBUS_ENTITY);
- if (nimbus == null) {
- log.warn("No nimbus entity available; not blocking before starting supervisors");
- } else {
- Entities.waitForServiceUp(nimbus, entity.getConfig(SoftwareProcess.START_TIMEOUT));
- needsSleep = true;
- }
- }
-
- String subnetHostname = Machines.findSubnetOrPublicHostname(entity).get();
- log.info("Launching " + entity + " with role " + getRoleName() + " and " + "hostname (public) "
- + getEntity().getAttribute(Attributes.HOSTNAME) + ", " + "hostname (subnet) " + subnetHostname + ")");
-
- // ensure only one node at a time tries to start
- // attempting to eliminate the causes of:
- // 2013-12-12 09:21:45 supervisor [ERROR] Error on initialization of server mk-supervisor
- // org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /assignments
- // TODO use SoftwareProcess#START_LATCH instead here?
-
- Object startMutex = Optional.fromNullable(entity.getConfig(Storm.START_MUTEX)).or(new Object()); // with no START_MUTEX configured the lock is a fresh private object, so nothing is actually serialised
- synchronized (startMutex) {
- if (needsSleep) {
- // give 10s extra to make sure nimbus is ready; we see weird zookeeper no /assignments node error otherwise
- // (this could be optimized by recording nimbus service_up time)
- Time.sleep(Duration.TEN_SECONDS);
- }
- newScript(MutableMap.of(USE_PID_FILE, getPidFile()), LAUNCHING)
- .body.append(format("nohup ./bin/storm %s > %s 2>&1 &", getRoleName(), getLogFileLocation()))
- .execute();
- }
- }
-
- @Override
- public boolean isRunning() { // true when the pid-file based CHECK_RUNNING script exits with 0
- return newScript(MutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0;
- }
-
- @Override
- public void stop() { // runs the pid-file based STOPPING script
- newScript(MutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute();
- }
-
- @Override
- public void customize() { // validates ports, copies the expanded install into the current dir, then renders conf/storm.yaml from the template
- log.debug("Customizing {}", entity);
- Networking.checkPortsValid(getPortMap());
-
- newScript(CUSTOMIZING)
- .body.append(format("cp -R %s/* .", getExpandedInstallDir()))
- .execute();
-
- String destinationConfigFile = Os.mergePathsUnix(getRunDir(), "conf/storm.yaml");
- copyTemplate(getStormConfigTemplateUrl(), destinationConfigFile);
- }
-
- protected List<String> installNativeDependencies() { // shell commands that build and install zeromq and jzmq; Mac uses brew packages, other OSes build zeromq from the tarball
- String zeromqUrl = format("http://download.zeromq.org/zeromq-%s.tar.gz", getZeromqVersion());
- String targz = format("zeromq-%s.tar.gz", getZeromqVersion());
- String jzmq = "https://github.com/nathanmarz/jzmq.git"; // only used on the non-Mac branch; the Mac branch clones a different jzmq repository
-
- ImmutableList.Builder<String> commands = ImmutableList.<String>builder();
- if (getLocation().getOsDetails().isMac()) {
- commands.add("export PATH=$PATH:/usr/local/bin")
- .add("export JAVA_HOME=$(/usr/libexec/java_home)")
- .add("cd " + getInstallDir())
- .add(BashCommands.installPackage(ImmutableMap.of("brew", "automake"), "make"))
- .add(BashCommands.installPackage(ImmutableMap.of("brew", "libtool"), "libtool"))
- .add(BashCommands.installPackage(ImmutableMap.of("brew", "pkg-config"), "pkg-config"))
- .add(BashCommands.installPackage(ImmutableMap.of("brew", "zeromq"), "zeromq"))
- .add("git clone https://github.com/asmaier/jzmq")
- .add("cd jzmq")
- .add("./autogen.sh")
- .add("./configure")
- .add("make")
- .add((BashCommands.sudo("make install")))
- .add("cd " + getInstallDir());
- } else {
- commands.add("export JAVA_HOME=$(dirname $(readlink -m `which java`))/../../ || export JAVA_HOME=/usr/lib/jvm/java")
- .add("cd " + getInstallDir())
- .add(BashCommands.commandToDownloadUrlAs(zeromqUrl, targz))
- .add("tar xzf " + targz)
- .add(format("cd zeromq-%s", getZeromqVersion()))
- .add("./configure")
- .add("make")
- .add((BashCommands.sudo("make install")))
- // install jzmq
- .add("cd " + getInstallDir())
- .add("git clone " + jzmq)
- .add("cd jzmq")
- .add("./autogen.sh")
- .add("./configure")
-
- // hack needed on ubuntu 12.04; ignore if it fails
- // see https://github.com/zeromq/jzmq/issues/114
- .add(BashCommands.ok(
- "pushd src ; touch classdist_noinst.stamp ; CLASSPATH=.:./.:$CLASSPATH "
- + "javac -d . org/zeromq/ZMQ.java org/zeromq/App.java org/zeromq/ZMQForwarder.java org/zeromq/EmbeddedLibraryTools.java org/zeromq/ZMQQueue.java org/zeromq/ZMQStreamer.java org/zeromq/ZMQException.java"))
- .add(BashCommands.ok("popd"))
-
- .add("make")
- .add((BashCommands.sudo("make install")))
- .add("cd " + getInstallDir());
- }
- return commands.build();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java b/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java
deleted file mode 100644
index 60175c9..0000000
--- a/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/AbstractZooKeeperImpl.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.zookeeper;
-
-import java.util.concurrent.TimeUnit;
-
-import javax.management.ObjectName;
-
-import org.apache.brooklyn.entity.java.JavaSoftwareProcessDriver;
-import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl;
-import org.apache.brooklyn.feed.jmx.JmxAttributePollConfig;
-import org.apache.brooklyn.feed.jmx.JmxFeed;
-import org.apache.brooklyn.feed.jmx.JmxHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Functions;
-import com.google.common.base.Objects.ToStringHelper;
-
-/**
- * Base {@link org.apache.brooklyn.api.entity.Entity} implementation for a single Apache ZooKeeper instance; wires the is-running check and, when JMX is enabled, three request/packet counters.
- */
-public abstract class AbstractZooKeeperImpl extends SoftwareProcessImpl implements ZooKeeperNode {
-
- @SuppressWarnings("unused")
- private static final Logger log = LoggerFactory.getLogger(AbstractZooKeeperImpl.class);
- private static final ObjectName ZOOKEEPER_MBEAN = JmxHelper.createObjectName("org.apache.ZooKeeperService:name0=StandaloneServer_port-1"); // MBean polled for every JMX sensor below
-
- private volatile JmxFeed jmxFeed; // stays null when JMX is disabled
-
- public AbstractZooKeeperImpl() {
- }
-
- @Override
- public Integer getZookeeperPort() { return getAttribute(ZOOKEEPER_PORT); } // value of the ZOOKEEPER_PORT sensor
-
- @Override
- public String getHostname() { return getAttribute(HOSTNAME); } // value of the HOSTNAME sensor
-
- @Override
- public void waitForServiceUp(long duration, TimeUnit units) { // superclass wait first, then (JMX only) a wait for the ZooKeeper MBean
- super.waitForServiceUp(duration, units);
-
- if (((JavaSoftwareProcessDriver)getDriver()).isJmxEnabled()) {
- // Wait for the MBean to exist
- JmxHelper helper = new JmxHelper(this); // short-lived helper, always terminated in the finally block
- try {
- helper.assertMBeanExistsEventually(ZOOKEEPER_MBEAN, units.toMillis(duration)); // the full duration is passed again, so the two waits together may take up to twice the requested time
- } finally {
- helper.terminate();
- }
- }
- }
-
- @Override
- protected void connectSensors() { // NOTE(review): does not call super.connectSensors(), although disconnectSensors() calls its super -- confirm this is intended
- connectServiceUpIsRunning();
-
- if (((JavaSoftwareProcessDriver)getDriver()).isJmxEnabled()) {
- jmxFeed = JmxFeed.builder()
- .entity(this)
- .period(500, TimeUnit.MILLISECONDS)
- .pollAttribute(new JmxAttributePollConfig<Long>(OUTSTANDING_REQUESTS)
- .objectName(ZOOKEEPER_MBEAN)
- .attributeName("OutstandingRequests")
- .onFailureOrException(Functions.constant(-1l))) // -1l is the long literal -1 (lower-case L suffix), published when the poll fails
- .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_RECEIVED)
- .objectName(ZOOKEEPER_MBEAN)
- .attributeName("PacketsReceived")
- .onFailureOrException(Functions.constant(-1l)))
- .pollAttribute(new JmxAttributePollConfig<Long>(PACKETS_SENT)
- .objectName(ZOOKEEPER_MBEAN)
- .attributeName("PacketsSent")
- .onFailureOrException(Functions.constant(-1l)))
- .build();
- }
- }
-
- @Override
- public void disconnectSensors() { // stops the is-running poll and, if one was built, the JMX feed
- super.disconnectSensors();
- disconnectServiceUpIsRunning();
- if (jmxFeed != null) jmxFeed.stop();
- }
-
- @Override
- protected ToStringHelper toStringHelper() { // appends the ZooKeeper port to the superclass description
- return super.toStringHelper()
- .add("zookeeperPort", getZookeeperPort());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperDriver.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperDriver.java b/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperDriver.java
deleted file mode 100644
index b92e9da..0000000
--- a/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperDriver.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.zookeeper;
-
-import org.apache.brooklyn.entity.java.JavaSoftwareProcessDriver;
-
-public interface ZooKeeperDriver extends JavaSoftwareProcessDriver { // driver contract for a ZooKeeper node; adds one accessor to the Java process driver
-
- Integer getZooKeeperPort(); // note the capital 'K': ZooKeeperNode spells its accessor getZookeeperPort()
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java b/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java
deleted file mode 100644
index a5ba570..0000000
--- a/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsemble.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.zookeeper;
-
-import java.util.List;
-
-import org.apache.brooklyn.api.catalog.Catalog;
-import org.apache.brooklyn.api.entity.ImplementedBy;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey;
-import org.apache.brooklyn.core.sensor.Sensors;
-import org.apache.brooklyn.entity.group.DynamicCluster;
-import org.apache.brooklyn.util.core.flags.SetFromFlag;
-
-import com.google.common.reflect.TypeToken;
-
-@Catalog(name="ZooKeeper ensemble", description="A cluster of ZooKeeper servers. "
- + "Apache ZooKeeper enables highly reliable distributed coordination.")
-@ImplementedBy(ZooKeeperEnsembleImpl.class)
-public interface ZooKeeperEnsemble extends DynamicCluster { // a DynamicCluster specialised with ZooKeeper-specific config keys and sensors
-
- @SetFromFlag("clusterName")
- BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(String
- .class, "zookeeper.cluster.name", "Name of the Zookeeper cluster", "BrooklynZookeeperCluster"); // last argument is the default cluster name
-
- @SetFromFlag("initialSize")
- public static final ConfigKey<Integer> INITIAL_SIZE = ConfigKeys.newConfigKeyWithDefault(DynamicCluster.INITIAL_SIZE, 3); // re-declares the cluster's INITIAL_SIZE key with a default of 3
-
- @SuppressWarnings("serial")
- AttributeSensor<List<String>> ZOOKEEPER_SERVERS = Sensors.newSensor(new TypeToken<List<String>>() { }, // anonymous TypeToken subclass carries the List<String> element type
- "zookeeper.servers", "Hostnames to connect to cluster with");
-
- String getClusterName(); // cluster name accessor
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java b/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java
deleted file mode 100644
index c2c3e3f..0000000
--- a/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperEnsembleImpl.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.zookeeper;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.api.policy.PolicySpec;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.EntityInternal;
-import org.apache.brooklyn.entity.group.AbstractMembershipTrackingPolicy;
-import org.apache.brooklyn.entity.group.DynamicClusterImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-public class ZooKeeperEnsembleImpl extends DynamicClusterImpl implements ZooKeeperEnsemble { // cluster implementation that numbers its members and publishes their hostnames
-
- private static final Logger log = LoggerFactory.getLogger(ZooKeeperEnsembleImpl.class);
- private static final AtomicInteger myId = new AtomicInteger(); // static, so the counter is shared by every ensemble in this JVM -- NOTE(review): confirm that is intended
-
- private MemberTrackingPolicy policy; // assigned in init(); not read anywhere else in this class
-
- public ZooKeeperEnsembleImpl() {}
-
- /**
- * Returns the configured {@link #MEMBER_SPEC}, defaulting to a plain {@link ZooKeeperNode} spec.
- */
- @Override
- protected EntitySpec<?> getMemberSpec() {
- return getConfig(MEMBER_SPEC, EntitySpec.create(ZooKeeperNode.class));
- }
-
- @Override
- public String getClusterName() { // value of the CLUSTER_NAME sensor
- return getAttribute(CLUSTER_NAME);
- }
-
- @Override
- public void init() { // after the superclass init, attaches a MemberTrackingPolicy configured with this cluster as its group
- log.info("Initializing the ZooKeeper Ensemble");
- super.init();
-
- policy = policies().add(PolicySpec.create(MemberTrackingPolicy.class)
- .displayName("Members tracker")
- .configure("group", this));
- }
-
- public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { // only additions are acted on; change and removal callbacks are empty
- @Override
- protected void onEntityChange(Entity member) {
- }
-
- @Override
- protected void onEntityAdded(Entity member) { // gives the member the next counter value as MY_ID unless it already has one
- if (member.getAttribute(ZooKeeperNode.MY_ID) == null) {
- ((EntityInternal) member).sensors().set(ZooKeeperNode.MY_ID, myId.incrementAndGet());
- }
- }
-
- @Override
- protected void onEntityRemoved(Entity member) {
- }
- }; // the ';' after the class body is an empty declaration
-
- @Override
- protected void initEnrichers() { // adds nothing beyond the superclass enrichers
- super.initEnrichers();
-
- }
-
- @Override
- public void start(Collection<? extends Location> locations) { // starts the cluster, then publishes the members' hostnames as ZOOKEEPER_SERVERS
- super.start(locations);
-
- List<String> zookeeperServers = Lists.newArrayList();
- for (Entity zookeeper : getMembers()) {
- zookeeperServers.add(zookeeper.getAttribute(Attributes.HOSTNAME)); // added as returned, so a member without a HOSTNAME value contributes null
- }
- sensors().set(ZOOKEEPER_SERVERS, zookeeperServers);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java b/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java
deleted file mode 100644
index 18cb6c6..0000000
--- a/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNode.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.zookeeper;
-
-import org.apache.brooklyn.api.catalog.Catalog;
-import org.apache.brooklyn.api.entity.ImplementedBy;
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.sensor.BasicAttributeSensor;
-import org.apache.brooklyn.core.sensor.BasicAttributeSensorAndConfigKey;
-import org.apache.brooklyn.core.sensor.PortAttributeSensorAndConfigKey;
-import org.apache.brooklyn.entity.software.base.SoftwareProcess;
-import org.apache.brooklyn.util.core.flags.SetFromFlag;
-
-/**
- * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single Apache ZooKeeper instance; declares its config keys, ports and sensors.
- */
-@Catalog(name="ZooKeeper Node", description="Apache ZooKeeper is a server which enables "
- + "highly reliable distributed coordination.")
-@ImplementedBy(ZooKeeperNodeImpl.class)
-public interface ZooKeeperNode extends SoftwareProcess {
-
- @SetFromFlag("version")
- ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "3.4.5"); // default version 3.4.5
- @SetFromFlag("zookeeperPort")
- PortAttributeSensorAndConfigKey ZOOKEEPER_PORT = new PortAttributeSensorAndConfigKey("zookeeper.port", "Zookeeper port", "2181+"); // presumably "2181+" means 2181 or the next free port -- TODO confirm
- @SetFromFlag("zookeeperLeaderPort")
- PortAttributeSensorAndConfigKey ZOOKEEPER_LEADER_PORT = new PortAttributeSensorAndConfigKey("zookeeper.leader.port", "Zookeeper leader ports", "2888+");
- @SetFromFlag("zookeeperElectionPort")
- PortAttributeSensorAndConfigKey ZOOKEEPER_ELECTION_PORT = new PortAttributeSensorAndConfigKey("zookeeper.election.port", "Zookeeper election ports", "3888+");
- @SetFromFlag("downloadUrl")
- BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>(
- SoftwareProcess.DOWNLOAD_URL, "http://apache.fastbull.org/zookeeper/zookeeper-${version}/zookeeper-${version}.tar.gz"); // default points at one hard-coded mirror host
- /**
- * Location of the ZK configuration file template to be copied to the server.
- */
- @SetFromFlag("zookeeperConfig")
- ConfigKey<String> ZOOKEEPER_CONFIG_TEMPLATE = ConfigKeys.newStringConfigKey(
- "zookeeper.configTemplate", "Zookeeper configuration template (in freemarker format)",
- "classpath://org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg"); // NOTE(review): path is under .../entity/messaging/zookeeper while this interface is in org.apache.brooklyn.entity.zookeeper -- confirm the resource location
- AttributeSensor<Long> OUTSTANDING_REQUESTS = new BasicAttributeSensor<Long>(Long.class, "zookeeper.outstandingRequests", "Outstanding request count");
- AttributeSensor<Long> PACKETS_RECEIVED = new BasicAttributeSensor<Long>(Long.class, "zookeeper.packets.received", "Total packets received");
- AttributeSensor<Long> PACKETS_SENT = new BasicAttributeSensor<Long>(Long.class, "zookeeper.packets.sent", "Total packets sent");
- AttributeSensor<Integer> MY_ID = new BasicAttributeSensor<Integer>(Integer.class, "zookeeper.myid", "ZooKeeper node's myId");
-
- Integer getZookeeperPort(); // ZooKeeper port accessor
-
- String getHostname(); // hostname accessor
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java b/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java
deleted file mode 100644
index 275e101..0000000
--- a/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperNodeImpl.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.zookeeper;
-
-/**
- * An {@link org.apache.brooklyn.api.entity.Entity} that represents a single standalone zookeeper instance.
- */
-public class ZooKeeperNodeImpl extends AbstractZooKeeperImpl implements ZooKeeperNode {
-
- public ZooKeeperNodeImpl() {}
-
- @Override
- public Class<?> getDriverInterface() {
- return ZooKeeperDriver.class;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java b/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java
deleted file mode 100644
index fd1ba0b..0000000
--- a/brooklyn-library/software/messaging/src/main/java/org/apache/brooklyn/entity/zookeeper/ZooKeeperSshDriver.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.zookeeper;
-
-import static java.lang.String.format;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.entity.java.JavaSoftwareProcessSshDriver;
-import org.apache.brooklyn.location.ssh.SshMachineLocation;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.net.Networking;
-import org.apache.brooklyn.util.os.Os;
-import org.apache.brooklyn.util.ssh.BashCommands;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-public class ZooKeeperSshDriver extends JavaSoftwareProcessSshDriver implements ZooKeeperDriver {
-
- public ZooKeeperSshDriver(ZooKeeperNodeImpl entity, SshMachineLocation machine) {
- super(entity, machine);
- }
-
- @Override
- protected String getLogFileLocation() { return Os.mergePathsUnix(getRunDir(), "console.out"); }
-
- protected Map<String, Integer> getPortMap() {
- return MutableMap.of("zookeeperPort", getZooKeeperPort());
- }
-
- protected String getConfigFileName() {
- return entity.getConfig(ZooKeeperNode.ZOOKEEPER_CONFIG_TEMPLATE);
- }
-
- protected int getMyId() {
- return entity.getAttribute(ZooKeeperNode.MY_ID);
- }
-
- // FIXME All for one, and one for all! If any node fails then we're stuck waiting for its hostname/port forever.
- // Need a way to terminate the wait based on the entity going on-fire etc.
- // FIXME Race in getMembers. Should we change DynamicCluster.grow to create the members and only then call start on them all?
- public List<ZooKeeperServerConfig> getZookeeperServers() throws ExecutionException, InterruptedException {
- ZooKeeperEnsemble ensemble = (ZooKeeperEnsemble) entity.getParent();
- List<ZooKeeperServerConfig> result = Lists.newArrayList();
-
- for (Entity member : ensemble.getMembers()) {
- Integer myid = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.MY_ID).get();
- String hostname = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.HOSTNAME).get();
- Integer port = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_PORT).get();
- Integer leaderPort = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_LEADER_PORT).get();
- Integer electionPort = Entities.attributeSupplierWhenReady(member, ZooKeeperNode.ZOOKEEPER_ELECTION_PORT).get();
- result.add(new ZooKeeperServerConfig(myid, hostname, port, leaderPort, electionPort));
- }
- return result;
- }
-
- @Override
- public Integer getZooKeeperPort() {
- return getEntity().getAttribute(ZooKeeperNode.ZOOKEEPER_PORT);
- }
-
- @Override
- public boolean isRunning() {
- return newScript(MutableMap.of(USE_PID_FILE, getPidFile()), CHECK_RUNNING).execute() == 0;
- }
-
- @Override
- public void stop() {
- newScript(ImmutableMap.of(USE_PID_FILE, getPidFile()), STOPPING).execute();
- }
-
- @Override
- public void preInstall() {
- resolver = Entities.newDownloader(this);
- setExpandedInstallDir(Os.mergePaths(getInstallDir(), resolver.getUnpackedDirectoryName(format("zookeeper-%s", getVersion()))));
- }
-
- @Override
- public void install() {
- List<String> urls = resolver.getTargets();
- String saveAs = resolver.getFilename();
-
- List<String> commands = ImmutableList.<String> builder()
- .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs))
- .add(BashCommands.INSTALL_TAR)
- .add("tar xzfv " + saveAs)
- .build();
-
- newScript(INSTALLING)
- .body.append(commands)
- .execute();
- }
-
- @Override
- public void customize() {
- log.debug("Customizing {}", entity);
- Networking.checkPortsValid(getPortMap());
- newScript(CUSTOMIZING)
- .body.append(
- format("cp -R %s/* .", getExpandedInstallDir()),
- format("mkdir %s/zookeeper", getRunDir()),
- format("echo %d > %s/zookeeper/myid", getMyId(), getRunDir())
- )
- .execute();
-
- String destinationConfigFile = Os.mergePathsUnix(getRunDir(), "conf/zoo.cfg");
- copyTemplate(getConfigFileName(), destinationConfigFile);
- }
-
- public String getPidFile() { return Os.mergePathsUnix(getRunDir(), "zookeeper.pid"); }
-
- @Override
- public void launch() {
- newScript(MutableMap.of(USE_PID_FILE, getPidFile()), LAUNCHING)
- .body.append(format("nohup java $JAVA_OPTS -cp zookeeper-%s.jar:lib/*:conf org.apache.zookeeper.server.quorum.QuorumPeerMain conf/zoo.cfg > %s 2>&1 &", getVersion(), getLogFileLocation()))
- .execute();
- }
-
- public static class ZooKeeperServerConfig {
- private final Integer myid;
- private final String hostname;
- private final Integer port;
- private final Integer leaderPort;
- private final Integer electionPort;
-
- public ZooKeeperServerConfig(Integer myid, String hostname, Integer port, Integer leaderPort, Integer electionPort) {
- this.myid = myid;
- this.hostname = hostname;
- this.port = port;
- this.leaderPort = leaderPort;
- this.electionPort = electionPort;
- }
-
- public Integer getMyid() { return myid; }
- public String getHostname() { return hostname; }
- public Integer getPort() { return port; }
- public Integer getLeaderPort() { return leaderPort; }
- public Integer getElectionPort() { return electionPort; }
- }
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/main/resources/RabbitMQLogo.png
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/main/resources/RabbitMQLogo.png b/brooklyn-library/software/messaging/src/main/resources/RabbitMQLogo.png
deleted file mode 100644
index c5690ec..0000000
Binary files a/brooklyn-library/software/messaging/src/main/resources/RabbitMQLogo.png and /dev/null differ
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/main/resources/activemq-logo.png
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/main/resources/activemq-logo.png b/brooklyn-library/software/messaging/src/main/resources/activemq-logo.png
deleted file mode 100644
index d514448..0000000
Binary files a/brooklyn-library/software/messaging/src/main/resources/activemq-logo.png and /dev/null differ
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/activemq/activemq.xml
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/activemq/activemq.xml b/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/activemq/activemq.xml
deleted file mode 100644
index 52114d1..0000000
--- a/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/activemq/activemq.xml
+++ /dev/null
@@ -1,154 +0,0 @@
-[#ftl]
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- Based on standard file from ActiveMQ Version 5.7.0 -->
-<!-- START SNIPPET: example -->
-<beans
- xmlns="http://www.springframework.org/schema/beans"
- xmlns:amq="http://activemq.apache.org/schema/core"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
- http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
-
- <!-- Allows us to use system properties as variables in this configuration file -->
- <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
- <property name="locations">
- <value>file:[#noparse]${activemq.conf}[/#noparse]/credentials.properties</value>
- </property>
- </bean>
-
- <!--
- The <broker> element is used to configure the ActiveMQ broker.
- -->
- <broker useJmx="true" xmlns="http://activemq.apache.org/schema/core" brokerName="${entity.brokerName}" dataDirectory="[#noparse]${activemq.data}[/#noparse]">
-
- <!--
- For better performances use VM cursor and small memory limit.
- For more information, see:
-
- http://activemq.apache.org/message-cursors.html
-
- Also, if your producer is "hanging", it's probably due to producer flow control.
- For more information, see:
- http://activemq.apache.org/producer-flow-control.html
- -->
-
- <destinationPolicy>
- <policyMap>
- <policyEntries>
- <policyEntry topic=">" producerFlowControl="true">
- <!-- The constantPendingMessageLimitStrategy is used to prevent
- slow topic consumers from blocking producers and affecting other consumers
- by limiting the number of messages that are retained.
- For more information, see:
-
- http://activemq.apache.org/slow-consumer-handling.html
-
- -->
- <pendingMessageLimitStrategy>
- <constantPendingMessageLimitStrategy limit="1000"/>
- </pendingMessageLimitStrategy>
- </policyEntry>
- <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
- <!-- Use VM cursor for better latency
- For more information, see:
-
- http://activemq.apache.org/message-cursors.html
-
- <pendingQueuePolicy>
- <vmQueueCursor/>
- </pendingQueuePolicy>
- -->
- </policyEntry>
- </policyEntries>
- </policyMap>
- </destinationPolicy>
-
-
- <!--
- The managementContext is used to configure how ActiveMQ is exposed in
- JMX. By default, ActiveMQ uses the MBean server that is started by
- the JVM. For more information, see:
-
- http://activemq.apache.org/jmx.html
- -->
- <managementContext>
- [#if entity.jmxPort > 0]
- <managementContext connectorPort="${entity.jmxPort?c}"/>
- [#else]
- <managementContext createConnector="false"/>
- [/#if]
- </managementContext>
-
- <!--
- Configure message persistence for the broker. The default persistence
- mechanism is the KahaDB store (identified by the kahaDB tag).
- For more information, see:
-
- http://activemq.apache.org/persistence.html
- -->
- <persistenceAdapter>
- <kahaDB directory="[#noparse]${activemq.data}[/#noparse]/kahadb"/>
- </persistenceAdapter>
-
-
- <!--
- The systemUsage controls the maximum amount of space the broker will
- use before slowing down producers. For more information, see:
- http://activemq.apache.org/producer-flow-control.html
- If using ActiveMQ embedded - the following limits could safely be used:
-
- <systemUsage>
- <systemUsage>
- <memoryUsage>
- <memoryUsage limit="20 mb"/>
- </memoryUsage>
- <storeUsage>
- <storeUsage limit="1 gb"/>
- </storeUsage>
- <tempUsage>
- <tempUsage limit="100 mb"/>
- </tempUsage>
- </systemUsage>
- </systemUsage>
- -->
- <systemUsage>
- <systemUsage>
- <memoryUsage>
- <memoryUsage limit="64 mb"/>
- </memoryUsage>
- <storeUsage>
- <storeUsage limit="100 gb"/>
- </storeUsage>
- <tempUsage>
- <tempUsage limit="50 gb"/>
- </tempUsage>
- </systemUsage>
- </systemUsage>
-
- <!--
- The transport connectors expose ActiveMQ over a given protocol to
- clients and other brokers. For more information, see:
-
- http://activemq.apache.org/configuring-transports.html
- -->
- <transportConnectors>
- <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
- <transportConnector name="openwire" uri="tcp://0.0.0.0:${entity.openWirePort?c}?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
- </transportConnectors>
-
- <!-- destroy the spring context on shutdown to stop jetty -->
- <shutdownHooks>
- <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
- </shutdownHooks>
-
- </broker>
-
- <!--
- Enable web consoles, REST and Ajax APIs and demos
-
- Take a look at [#noparse]${ACTIVEMQ_HOME}[/#noparse]/conf/jetty.xml for more details
- -->
- <import resource="jetty.xml"/>
-
-</beans>
-<!-- END SNIPPET: example -->
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg b/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg
deleted file mode 100644
index d600ef5..0000000
Binary files a/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/kafka-google-doorway.jpg and /dev/null differ
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/server.properties
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/server.properties b/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/server.properties
deleted file mode 100644
index feb871f..0000000
--- a/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/server.properties
+++ /dev/null
@@ -1,112 +0,0 @@
-[#ftl]
-#
-##
-# KafkaBroker configuration template for Brooklyn
-#
-# see kafka.server.KafkaConfig for additional details and defaults
-##
-
-############################# Server Basics #############################
-# The id of the broker. This must be set to a unique integer for each broker.
-broker.id=${entity.brokerId?c}
-
-############################# Socket Server Settings #############################
-
-# The port the socket server listens on
-port=${entity.kafkaPort?c}
-
-# Hostname the broker will bind to. If not set, the server will bind to all interfaces
-host.name=${driver.hostname}
-
-# Hostname the broker will advertise to producers and consumers. If not set, it uses the
-# value for "host.name" if configured. Otherwise, it will use the value returned from
-# java.net.InetAddress.getCanonicalHostName().
-#advertised.host.name=<hostname routable by clients>
-
-# The port to publish to ZooKeeper for clients to use. If this is not set,
-# it will publish the same port that the broker binds to.
-#advertised.port=<port accessible by clients>
-
-# The number of threads handling network requests
-num.network.threads=3
-
-# The number of threads doing disk I/O
-num.io.threads=8
-
-# The send buffer (SO_SNDBUF) used by the socket server
-socket.send.buffer.bytes=102400
-
-# The receive buffer (SO_RCVBUF) used by the socket server
-socket.receive.buffer.bytes=102400
-
-# The maximum size of a request that the socket server will accept (protection against OOM)
-max.socket.request.bytes=104857600
-
-
-############################# Log Basics #############################
-
-# The directory under which to store log files
-log.dir=${driver.runDir}/kafka-logs
-
-# The default number of log partitions per topic. More partitions allow greater
-# parallelism for consumption, but this will also result in more files across
-# the brokers.
-num.partitions=1
-
-# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
-# This value is recommended to be increased for installations with data dirs located in RAID array.
-num.recovery.threads.per.data.dir=1
-
-############################# Log Flush Policy #############################
-
-# Messages are immediately written to the filesystem but by default we only fsync() to sync
-# the OS cache lazily. The following configurations control the flush of data to disk.
-# There are a few important trade-offs here:
-# 1. Durability: Unflushed data may be lost if you are not using replication.
-# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
- # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
-# The settings below allow one to configure the flush policy to flush data after a period of time or
-# every N messages (or both). This can be done globally and overridden on a per-topic basis.
-
-# The number of messages to accept before forcing a flush of data to disk
-log.flush.interval.messages=10000
-
-# The maximum amount of time a message can sit in a log before we force a flush
-log.flush.interval.ms=1000
-
-############################# Log Retention Policy #############################
-
-# The following configurations control the disposal of log segments. The policy can
-# be set to delete segments after a period of time, or after a given size has accumulated.
-# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
-# from the end of the log.
-
-# The minimum age of a log file to be eligible for deletion
-log.retention.hours=168
-
-# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
-# segments don't drop below log.retention.bytes.
-#log.retention.bytes=1073741824
-
-# The maximum size of a log segment file. When this size is reached a new log segment will be created.
-log.segment.bytes=1073741824
-
-# The interval at which log segments are checked to see if they can be deleted according
-# to the retention policies
-log.retention.check.interval.ms=300000
-
-# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
-# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
-log.cleaner.enable=false
-
-############################# Zookeeper #############################
-
-# Zookeeper connection string (see zookeeper docs for details).
- # This is a comma-separated list of host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
-# You can also append an optional chroot string to the urls to specify the
-# root directory for all kafka znodes.
-zookeeper.connect=${entity.zookeeper.hostname}:${entity.zookeeper.zookeeperPort?c}
-
-# Timeout in ms for connecting to zookeeper
-zookeeper.connection.timeout.ms=1000000
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/zookeeper.properties
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/zookeeper.properties b/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/zookeeper.properties
deleted file mode 100644
index 646d2f1..0000000
--- a/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/kafka/zookeeper.properties
+++ /dev/null
@@ -1,13 +0,0 @@
-[#ftl]
-#
-
-##
-# KafkaZookeeper configuration template for Brooklyn
-##
-
-# the directory where the snapshot is stored.
-dataDir=${driver.runDir}/zookeeper
-# the port at which the clients will connect
-clientPort=${entity.zookeeperPort?c}
-# disable the per-ip limit on the number of connections since this is a non-production config
-maxClientCnxns=0
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/rabbit/rabbitmq.config
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/rabbit/rabbitmq.config b/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/rabbit/rabbitmq.config
deleted file mode 100644
index c350b65..0000000
--- a/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/rabbit/rabbitmq.config
+++ /dev/null
@@ -1,6 +0,0 @@
-[
-<#if entity.enableManagementPlugin>
- {rabbit, [{loopback_users, []}]},
- {rabbitmq_mochiweb, [{listeners, [{mgmt, [{port, ${entity.managementPort?c}}]}]}]}
-</#if>
-].
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/storm/storm.yaml
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/storm/storm.yaml b/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/storm/storm.yaml
deleted file mode 100644
index 99f0c71..0000000
--- a/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/storm/storm.yaml
+++ /dev/null
@@ -1,39 +0,0 @@
-[#ftl]
-#
-# Storm Configuration
-[#if driver.zookeeperServers?has_content]
- storm.zookeeper.servers:
-[#list driver.zookeeperServers as zkServer]
- - "${zkServer}"
-[/#list]
-[/#if]
-
- storm.local.dir: "${driver.localDir}"
-
-### ui.* configs are for the master
- ui.port: ${driver.uiPort?c}
- ui.childopts: "-Xmx768m"
-
-[#if driver.roleName == "ui"]
- nimbus.host: "${driver.nimbusHostname}"
-[/#if]
-
- nimbus.childopts: " ${driver.jvmOptsLine}"
- worker.childopts: " ${driver.jvmOptsLine}"
- supervisor.childopts: " ${driver.jvmOptsLine}"
-
-# ##### These may optionally be filled in:
-#
-## List of custom serializations
-# topology.kryo.register:
-# - org.mycompany.MyType
-# - org.mycompany.MyType2: org.mycompany.MyType2Serializer
-#
-## List of custom kryo decorators
-# topology.kryo.decorators:
-# - org.mycompany.MyDecorator
-#
-## Locations of the drpc servers
-# drpc.servers:
-# - "server1"
-# - "server2"
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg b/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg
deleted file mode 100644
index 79721a6..0000000
--- a/brooklyn-library/software/messaging/src/main/resources/org/apache/brooklyn/entity/messaging/zookeeper/zoo.cfg
+++ /dev/null
@@ -1,42 +0,0 @@
-[#ftl]
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-##
-# ZooKeeper configuration template for Brooklyn
-##
-
-# The number of milliseconds of each tick
-tickTime=2000
-# The number of ticks that the initial
-# synchronization phase can take
-initLimit=10
-# The number of ticks that can pass between
-# sending a request and getting an acknowledgement
-syncLimit=5
-# the directory where the snapshot is stored.
-dataDir=${driver.runDir}/zookeeper
-# the port at which the clients will connect
-clientPort=${entity.zookeeperPort?c}
-# disable the per-ip limit on the number of connections since this is a non-production config
-maxClientCnxns=0
-
-[#list driver.zookeeperServers as zkServer]
-server.${zkServer.myid?c}=${zkServer.hostname}:${zkServer.leaderPort?c}:${zkServer.electionPort?c}
-[/#list]
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/main/resources/qpid-logo.jpeg
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/main/resources/qpid-logo.jpeg b/brooklyn-library/software/messaging/src/main/resources/qpid-logo.jpeg
deleted file mode 100644
index 91e7004..0000000
Binary files a/brooklyn-library/software/messaging/src/main/resources/qpid-logo.jpeg and /dev/null differ
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/main/resources/redis-logo.jpeg
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/main/resources/redis-logo.jpeg b/brooklyn-library/software/messaging/src/main/resources/redis-logo.jpeg
deleted file mode 100644
index a14894b..0000000
Binary files a/brooklyn-library/software/messaging/src/main/resources/redis-logo.jpeg and /dev/null differ
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQEc2LiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQEc2LiveTest.java
deleted file mode 100644
index 1210ef5..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQEc2LiveTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.activemq;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.entity.AbstractEc2LiveTest;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.ImmutableList;
-
-public class ActiveMQEc2LiveTest extends AbstractEc2LiveTest {
-
- /**
- * Test that can install+start, and use, ActiveMQ.
- */
- @Override
- protected void doTest(Location loc) throws Exception {
- String queueName = "testQueue";
- int number = 10;
- String content = "01234567890123456789012345678901";
-
- // Start broker with a configured queue
- ActiveMQBroker activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class).configure("queue", queueName));
-
- app.start(ImmutableList.of(loc));
-
- EntityTestUtils.assertAttributeEqualsEventually(activeMQ, Startable.SERVICE_UP, true);
-
- // Check queue created
- assertEquals(ImmutableList.copyOf(activeMQ.getQueueNames()), ImmutableList.of(queueName));
- assertEquals(activeMQ.getChildren().size(), 1);
- assertEquals(activeMQ.getQueues().size(), 1);
-
- // Get the named queue entity
- ActiveMQQueue queue = activeMQ.getQueues().get(queueName);
- assertNotNull(queue);
-
- // Connect to broker using JMS and send messages
- Connection connection = getActiveMQConnection(activeMQ);
- clearQueue(connection, queueName);
- EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0);
- sendMessages(connection, number, queueName, content);
-
- // Check messages arrived
- EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, number);
-
- connection.close();
- }
-
- private Connection getActiveMQConnection(ActiveMQBroker activeMQ) throws Exception {
- int port = activeMQ.getAttribute(ActiveMQBroker.OPEN_WIRE_PORT);
- String address = activeMQ.getAttribute(ActiveMQBroker.ADDRESS);
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(String.format("tcp://%s:%s", address, port));
- Connection connection = factory.createConnection("admin", "activemq");
- connection.start();
- return connection;
- }
-
- private void sendMessages(Connection connection, int count, String queueName, String content) throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- org.apache.activemq.command.ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue(queueName);
- MessageProducer messageProducer = session.createProducer(destination);
-
- for (int i = 0; i < count; i++) {
- TextMessage message = session.createTextMessage(content);
- messageProducer.send(message);
- }
-
- session.close();
- }
-
- private int clearQueue(Connection connection, String queueName) throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- org.apache.activemq.command.ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue(queueName);
- MessageConsumer messageConsumer = session.createConsumer(destination);
-
- int received = 0;
- while (messageConsumer.receive(500) != null) received++;
-
- session.close();
-
- return received;
- }
-
- @Test(enabled=false)
- public void testDummy() {} // Convince testng IDE integration that this really does have test methods
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQGoogleComputeLiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQGoogleComputeLiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQGoogleComputeLiveTest.java
deleted file mode 100644
index f865cb1..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQGoogleComputeLiveTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.activemq;
-
-import com.google.common.collect.ImmutableList;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.entity.AbstractGoogleComputeLiveTest;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.testng.annotations.Test;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-public class ActiveMQGoogleComputeLiveTest extends AbstractGoogleComputeLiveTest {
-
- /**
- * Test that can install+start, and use, ActiveMQ.
- */
- @Override
- protected void doTest(Location loc) throws Exception {
- String queueName = "testQueue";
- int number = 10;
- String content = "01234567890123456789012345678901";
-
- // Start broker with a configured queue
- ActiveMQBroker activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class).configure("queue", queueName));
-
- app.start(ImmutableList.of(loc));
-
- EntityTestUtils.assertAttributeEqualsEventually(activeMQ, Startable.SERVICE_UP, true);
-
- // Check queue created
- assertEquals(ImmutableList.copyOf(activeMQ.getQueueNames()), ImmutableList.of(queueName));
- assertEquals(activeMQ.getChildren().size(), 1);
- assertEquals(activeMQ.getQueues().size(), 1);
-
- // Get the named queue entity
- ActiveMQQueue queue = activeMQ.getQueues().get(queueName);
- assertNotNull(queue);
-
- // Connect to broker using JMS and send messages
- Connection connection = getActiveMQConnection(activeMQ);
- clearQueue(connection, queueName);
- EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0);
- sendMessages(connection, number, queueName, content);
-
- // Check messages arrived
- EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, number);
-
- connection.close();
- }
-
- private Connection getActiveMQConnection(ActiveMQBroker activeMQ) throws Exception {
- int port = activeMQ.getAttribute(ActiveMQBroker.OPEN_WIRE_PORT);
- String address = activeMQ.getAttribute(ActiveMQBroker.ADDRESS);
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(String.format("tcp://%s:%s", address, port));
- Connection connection = factory.createConnection("admin", "activemq");
- connection.start();
- return connection;
- }
-
- private void sendMessages(Connection connection, int count, String queueName, String content) throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- org.apache.activemq.command.ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue(queueName);
- MessageProducer messageProducer = session.createProducer(destination);
-
- for (int i = 0; i < count; i++) {
- TextMessage message = session.createTextMessage(content);
- messageProducer.send(message);
- }
-
- session.close();
- }
-
- private int clearQueue(Connection connection, String queueName) throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- org.apache.activemq.command.ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue(queueName);
- MessageConsumer messageConsumer = session.createConsumer(destination);
-
- int received = 0;
- while (messageConsumer.receive(500) != null) received++;
-
- session.close();
-
- return received;
- }
-
- @Test(enabled=false)
- public void testDummy() {} // Convince testng IDE integration that this really does have test methods
-}