You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:47:56 UTC

[20/51] [abbrv] [partial] brooklyn-library git commit: move subdir from incubator up a level as it is promoted to its own repo (first non-incubator commit!)

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.java
deleted file mode 100644
index 7183c15..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.activemq;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.factory.ApplicationBuilder;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.core.test.entity.TestApplication;
-import org.apache.brooklyn.entity.java.UsesJmx;
-import org.apache.brooklyn.entity.java.UsesJmx.JmxAgentModes;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-/**
- * Test the operation of the {@link ActiveMQBroker} class.
- */
-public class ActiveMQIntegrationTest {
-    private static final Logger log = LoggerFactory.getLogger(ActiveMQIntegrationTest.class);
-
-    private TestApplication app;
-    private Location testLocation;
-    private ActiveMQBroker activeMQ;
-
-    @BeforeMethod(alwaysRun = true)
-    public void setup() throws Exception {
-        app = ApplicationBuilder.newManagedApp(TestApplication.class);
-        testLocation = app.newLocalhostProvisioningLocation();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void shutdown() throws Exception {
-        if (app != null) Entities.destroyAll(app.getManagementContext());
-    }
-
-    /**
-     * Test that the broker starts up and sets SERVICE_UP correctly.
-     */
-    @Test(groups = "Integration")
-    public void canStartupAndShutdown() throws Exception {
-        activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class));
-
-        activeMQ.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ, Startable.SERVICE_UP, true);
-        log.info("JMX URL is "+activeMQ.getAttribute(UsesJmx.JMX_URL));
-        activeMQ.stop();
-        assertFalse(activeMQ.getAttribute(Startable.SERVICE_UP));
-    }
-
-    /**
-     * Test that the broker starts up and sets SERVICE_UP correctly,
-     * when a jmx port is supplied
-     */
-    @Test(groups = "Integration")
-    public void canStartupAndShutdownWithCustomJmx() throws Exception {
-        activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class)
-            .configure("jmxPort", "11099+"));
-       
-        activeMQ.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ, Startable.SERVICE_UP, true);
-        log.info("JMX URL is "+activeMQ.getAttribute(UsesJmx.JMX_URL));
-        activeMQ.stop();
-        assertFalse(activeMQ.getAttribute(Startable.SERVICE_UP));
-    }
-
-    @Test(groups = "Integration")
-    public void canStartupAndShutdownWithCustomBrokerName() throws Exception {
-        activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class)
-            .configure("jmxPort", "11099+")
-            .configure("brokerName", "bridge"));
-
-        activeMQ.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ, Startable.SERVICE_UP, true);
-        log.info("JMX URL is "+activeMQ.getAttribute(UsesJmx.JMX_URL));
-        activeMQ.stop();
-        assertFalse(activeMQ.getAttribute(Startable.SERVICE_UP));
-    }
-
-    
-    @Test(groups = "Integration")
-    public void canStartTwo() throws Exception {
-        ActiveMQBroker activeMQ1 = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class));
-        ActiveMQBroker activeMQ2 = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class));
-
-        activeMQ1.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ1, Startable.SERVICE_UP, true);
-        log.info("JMX URL is "+activeMQ1.getAttribute(UsesJmx.JMX_URL));
-
-        activeMQ2.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ2, Startable.SERVICE_UP, true);
-        log.info("JMX URL is "+activeMQ2.getAttribute(UsesJmx.JMX_URL));
-    }
-
-    /**
-     * Test that setting the 'queue' property causes a named queue to be created.
-     */
-    @Test(groups = "Integration")
-    public void testCreatingQueuesDefault() throws Exception {
-        String url = testCreatingQueuesInternal(null);
-        // localhost default is jmxmp
-        Assert.assertTrue(url.contains("jmxmp"), "url="+url);
-    }
-
-    @Test(groups = "Integration")
-    public void testCreatingQueuesRmi() throws Exception {
-        String url = testCreatingQueuesInternal(JmxAgentModes.JMX_RMI_CUSTOM_AGENT);
-        Assert.assertTrue(url.contains("rmi://"), "url="+url);
-        Assert.assertFalse(url.contains("rmi:///jndi"), "url="+url);
-        Assert.assertFalse(url.contains("jmxmp"), "url="+url);
-    }
-
-    @Test(groups = "Integration")
-    public void testCreatingQueuesJmxmp() throws Exception {
-        String url = testCreatingQueuesInternal(JmxAgentModes.JMXMP);
-        // JMXMP was requested explicitly, so the URL must mention jmxmp and must not mention rmi
-        Assert.assertTrue(url.contains("jmxmp"), "url="+url);
-        Assert.assertFalse(url.contains("rmi"), "url="+url);
-    }
-
-    @Test(groups = "Integration")
-    public void testCreatingQueuesNoAgent() throws Exception {
-        String url = testCreatingQueuesInternal(JmxAgentModes.NONE);
-        // with agent mode NONE the URL is expected to be a service:jmx:rmi one and never jmxmp
-        Assert.assertTrue(url.contains("service:jmx:rmi"), "url="+url);
-        Assert.assertFalse(url.contains("jmxmp"), "url="+url);
-    }
-
-    /**
-     * Starts a broker with one configured queue in the given JMX agent mode, then checks the queue
-     * entity and its depth sensor while sending and draining messages over JMS.
-     *
-     * @param mode JMX agent mode to configure on the broker (the default-mode test passes {@code null})
-     * @return the broker's JMX URL, read once the service is up
-     */
-    public String testCreatingQueuesInternal(JmxAgentModes mode) throws Exception {
-        String queueName = "testQueue";
-        int number = 20;
-        String content = "01234567890123456789012345678901";
-
-        // Start broker with a configured queue
-        activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class)
-            .configure("queue", queueName)
-            .configure(UsesJmx.JMX_AGENT_MODE, mode));
-        activeMQ.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ, Startable.SERVICE_UP, true);
-
-        String jmxUrl = activeMQ.getAttribute(UsesJmx.JMX_URL);
-        log.info("JMX URL ("+mode+") is "+jmxUrl);
-
-        try {
-            // Check queue created
-            assertFalse(activeMQ.getQueueNames().isEmpty());
-            assertEquals(activeMQ.getQueueNames().size(), 1);
-            assertTrue(activeMQ.getQueueNames().contains(queueName));
-            assertEquals(activeMQ.getChildren().size(), 1);
-            assertFalse(activeMQ.getQueues().isEmpty());
-            assertEquals(activeMQ.getQueues().size(), 1);
-
-            // Get the named queue entity
-            ActiveMQQueue queue = activeMQ.getQueues().get(queueName);
-            assertNotNull(queue);
-            assertEquals(queue.getName(), queueName);
-
-            // Connect to broker using JMS; the inner finally closes the connection even if an assertion fails
-            Connection connection = getActiveMQConnection(activeMQ);
-            try {
-                clearQueue(connection, queueName);
-                EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0);
-                sendMessages(connection, number, queueName, content);
-                // Check messages arrived
-                EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, number);
-
-                // Clear the messages and check the depth sensor returns to zero
-                assertEquals(clearQueue(connection, queueName), number);
-                EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0);
-            } finally {
-                connection.close();
-            }
-        } finally {
-            activeMQ.stop(); // always stop the broker, whether or not the checks passed
-        }
-        return jmxUrl;
-    }
-
-    // Opens and starts a JMS connection to the broker's OPEN_WIRE_PORT at its ADDRESS, as admin/activemq.
-    private Connection getActiveMQConnection(ActiveMQBroker activeMQ) throws Exception {
-        int port = activeMQ.getAttribute(ActiveMQBroker.OPEN_WIRE_PORT);
-        String brokerUrl = "tcp://" + activeMQ.getAttribute(ActiveMQBroker.ADDRESS) + ":" + port;
-        Connection connection = new ActiveMQConnectionFactory(brokerUrl).createConnection("admin", "activemq");
-        connection.start();
-        return connection;
-    }
-
-    /** Sends {@code count} text messages carrying {@code content} to the named queue on a fresh session. */
-    private void sendMessages(Connection connection, int count, String queueName, String content) throws Exception {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        try {
-            MessageProducer messageProducer = session.createProducer(session.createQueue(queueName));
-            for (int i = 0; i < count; i++) {
-                messageProducer.send(session.createTextMessage(content));
-            }
-        } finally {
-            session.close(); // release the session even if a send fails part-way through
-        }
-    }
-
-    /** Drains the named queue and returns how many messages were consumed before a 500ms receive came back empty. */
-    private int clearQueue(Connection connection, String queueName) throws Exception {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        try {
-            MessageConsumer messageConsumer = session.createConsumer(session.createQueue(queueName));
-            int received = 0;
-            while (messageConsumer.receive(500) != null) received++;
-            return received;
-        } finally {
-            session.close(); // release the session even if receive() throws
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
deleted file mode 100644
index 744b2d5..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.kafka;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-
-import java.util.concurrent.Callable;
-
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.api.location.LocationSpec;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.factory.ApplicationBuilder;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.core.test.entity.TestApplication;
-import org.apache.brooklyn.test.Asserts;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.time.Duration;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-import org.apache.brooklyn.entity.messaging.activemq.ActiveMQBroker;
-import org.apache.brooklyn.location.localhost.LocalhostMachineProvisioningLocation;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-/**
- * Test the operation of the Kafka entities ({@link KafkaZooKeeper}, {@link KafkaBroker} and {@link KafkaCluster}).
- *
- * TODO test that sensors update.
- */
-public class KafkaIntegrationTest {
-
-    private TestApplication app;
-    private Location testLocation;
-
-    @BeforeMethod(alwaysRun = true)
-    public void setup() {
-        app = ApplicationBuilder.newManagedApp(TestApplication.class);
-        LocationSpec<LocalhostMachineProvisioningLocation> locationSpec = LocationSpec.create(LocalhostMachineProvisioningLocation.class);
-        testLocation = app.getManagementContext().getLocationManager().createLocation(locationSpec);
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void shutdown() {
-        if (app != null) Entities.destroyAll(app.getManagementContext());
-    }
-
-    /**
-     * Test that we can start a zookeeper.
-     */
-    @Test(groups = "Integration")
-    public void testZookeeper() {
-        final KafkaZooKeeper zookeeper = app.createAndManageChild(EntitySpec.create(KafkaZooKeeper.class));
-
-        zookeeper.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 60*1000), zookeeper, Startable.SERVICE_UP, true);
-
-        zookeeper.stop();
-        assertFalse(zookeeper.getAttribute(Startable.SERVICE_UP));
-    }
-
-    /**
-     * Test that we can start a  broker and zookeeper together.
-     */
-    @Test(groups = "Integration")
-    public void testBrokerPlusZookeeper() {
-        final KafkaZooKeeper zookeeper = app.createAndManageChild(EntitySpec.create(KafkaZooKeeper.class));
-        final KafkaBroker broker = app.createAndManageChild(EntitySpec.create(KafkaBroker.class).configure(KafkaBroker.ZOOKEEPER, zookeeper));
-
-        zookeeper.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 60*1000), zookeeper, Startable.SERVICE_UP, true);
-
-        broker.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 60*1000), broker, Startable.SERVICE_UP, true);
-
-        zookeeper.stop();
-        assertFalse(zookeeper.getAttribute(Startable.SERVICE_UP));
-
-        broker.stop();
-        assertFalse(broker.getAttribute(Startable.SERVICE_UP));
-    }
-
-    /**
-     * Test that we can start a cluster with zookeeper and two brokers.
-     *
-     * Connects to the zookeeper controller and tests sending and receiving messages on a topic.
-     */
-    @Test(groups = "Integration")
-    public void testTwoBrokerCluster() throws InterruptedException {
-        final KafkaCluster cluster = app.createAndManageChild(EntitySpec.create(KafkaCluster.class)
-                .configure(KafkaCluster.INITIAL_SIZE, 2));
-
-        cluster.start(ImmutableList.of(testLocation));
-        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), new Callable<Void>() {
-            @Override
-            public Void call() {
-                assertTrue(cluster.getAttribute(Startable.SERVICE_UP));
-                assertTrue(cluster.getZooKeeper().getAttribute(Startable.SERVICE_UP));
-                assertEquals(cluster.getCurrentSize().intValue(), 2);
-                return null;
-            }
-        });
-
-        Entities.dumpInfo(cluster);
-
-        final KafkaSupport support = new KafkaSupport(cluster);
-
-        support.sendMessage("brooklyn", "TEST_MESSAGE");
-
-        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.FIVE_SECONDS), new Runnable() {
-            @Override
-            public void run() {
-                String message = support.getMessage("brooklyn");
-                assertEquals(message, "TEST_MESSAGE");
-            }
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaLiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaLiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaLiveTest.java
deleted file mode 100644
index 096bdeb..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaLiveTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.kafka;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-import java.util.concurrent.Callable;
-
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.entity.AbstractEc2LiveTest;
-import org.apache.brooklyn.test.Asserts;
-import org.apache.brooklyn.util.collections.MutableMap;
-
-import com.google.common.collect.ImmutableList;
-
-public class KafkaLiveTest extends AbstractEc2LiveTest {
-
-    /**
-     * Starts a two-broker Kafka cluster at {@code loc}, waits for it to report up, then sends and fetches a message.
-     */
-    @Override
-    protected void doTest(Location loc) throws Exception {
-        final KafkaCluster cluster = app.createAndManageChild(EntitySpec.create(KafkaCluster.class)
-                .configure("startTimeout", 300) // 5 minutes
-                .configure("initialSize", 2));
-        app.start(ImmutableList.of(loc));
-
-        // upper-case L suffix: a lower-case 'l' is easily misread as the digit 1
-        Asserts.succeedsEventually(MutableMap.of("timeout", 300000L), new Callable<Void>() {
-            @Override
-            public Void call() {
-                assertTrue(cluster.getAttribute(Startable.SERVICE_UP));
-                assertTrue(cluster.getZooKeeper().getAttribute(Startable.SERVICE_UP));
-                assertEquals(cluster.getCurrentSize().intValue(), 2);
-                return null;
-            }
-        });
-
-        Entities.dumpInfo(cluster);
-
-        KafkaSupport support = new KafkaSupport(cluster);
-        support.sendMessage("brooklyn", "TEST_MESSAGE");
-        String message = support.getMessage("brooklyn");
-        assertEquals(message, "TEST_MESSAGE");
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaSupport.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaSupport.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaSupport.java
deleted file mode 100644
index f385e10..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaSupport.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.kafka;
-
-import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.core.entity.EntityPredicates;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-import java.security.InvalidParameterException;
-import java.util.Properties;
-
-import static java.lang.String.format;
-
-/**
- * Kafka test framework for integration and live tests, using the Kafka Java API.
- */
-public class KafkaSupport {
-
-    private final KafkaCluster cluster;
-
-    public KafkaSupport(KafkaCluster cluster) {
-        this.cluster = cluster;
-    }
-
-    /**
-     * Send a message to the {@link KafkaCluster} on the given topic, creating the topic first.
-     * @throws InvalidParameterException if no child {@link KafkaBroker} has SERVICE_UP set to true
-     */
-    public void sendMessage(String topic, String message) {
-        Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and(
-                Predicates.instanceOf(KafkaBroker.class),
-                EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true)));
-        if (!anyBrokerNodeInCluster.isPresent()) {
-            throw new InvalidParameterException("No kafka broker node found");
-        }
-        KafkaBroker broker = (KafkaBroker)anyBrokerNodeInCluster.get();
-        Properties props = new Properties();
-        props.put("metadata.broker.list", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
-        props.put("bootstrap.servers", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
-        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
-        Producer<String, String> producer = new KafkaProducer<>(props);
-        try {
-            ((KafkaZooKeeper)cluster.getZooKeeper()).createTopic(topic);
-            ProducerRecord<String, String> data = new ProducerRecord<>(topic, message);
-            producer.send(data);
-        } finally {
-            producer.close(); // release the producer even if topic creation or the send fails
-        }
-    }
-
-    /**
-     * Subscribes a consumer to the topic via any running {@link KafkaBroker} in the {@link KafkaCluster}.
-     * NOTE: polling is not implemented (see FIXME below), so the literal {@code "TEST_MESSAGE"} is
-     * always returned rather than a message actually read from the topic.
-     *
-     * @throws InvalidParameterException if no child broker has SERVICE_UP set to true
-     */
-    public String getMessage(String topic) {
-        ZooKeeperNode zookeeper = cluster.getZooKeeper();
-        Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and(
-                Predicates.instanceOf(KafkaBroker.class),
-                EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true)));
-        if (!anyBrokerNodeInCluster.isPresent()) {
-            throw new InvalidParameterException("No kafka broker node found");
-        }
-        KafkaBroker broker = (KafkaBroker)anyBrokerNodeInCluster.get();
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
-        props.put("zookeeper.connect", format("%s:%d", zookeeper.getHostname(), zookeeper.getZookeeperPort())); // host:port
-        props.put("group.id", "brooklyn");
-        props.put("partition.assignment.strategy", "RoundRobin");
-        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        KafkaConsumer consumer = new KafkaConsumer(props);
-        consumer.subscribe(topic);
-        // FIXME unimplemented KafkaConsumer.poll
-//        Object consumerRecords = consumer.poll(Duration.seconds(3).toMilliseconds()).get(topic);
-        return "TEST_MESSAGE";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidEc2LiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidEc2LiveTest.java
deleted file mode 100644
index 6d793c8..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidEc2LiveTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.qpid;
-
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.entity.AbstractEc2LiveTest;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.ImmutableList;
-
-public class QpidEc2LiveTest extends AbstractEc2LiveTest {
-
-    // TODO Also check can connect (e.g. to send/receive messages)
-    
-    @Override
-    protected void doTest(Location loc) throws Exception {
-        QpidBroker qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class)
-                .configure("jmxPort", "9909+")
-                .configure("rmiRegistryPort", "9910+"));
-        
-        qpid.start(ImmutableList.of(loc));
-        EntityTestUtils.assertAttributeEqualsEventually(qpid, QpidBroker.SERVICE_UP, true);
-    }
-    
-    @Test(enabled=false)
-    public void testDummy() {} // Convince testng IDE integration that this really does have test methods  
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidIntegrationTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidIntegrationTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidIntegrationTest.java
deleted file mode 100644
index 977b934..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidIntegrationTest.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.qpid;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.factory.ApplicationBuilder;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.core.test.entity.TestApplication;
-import org.apache.brooklyn.entity.software.base.SoftwareProcess;
-import org.apache.brooklyn.test.Asserts;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.apache.brooklyn.test.HttpTestUtils;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.exceptions.Exceptions;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.configuration.ClientProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * Test the operation of the {@link QpidBroker} class.
- */
-public class QpidIntegrationTest {
-    private static final Logger log = LoggerFactory.getLogger(QpidIntegrationTest.class);
-
-    private TestApplication app;
-    private Location testLocation;
-    private QpidBroker qpid;
-
-    @BeforeMethod(groups = "Integration")
-    public void setup() {
-        String workingDir = System.getProperty("user.dir");
-        log.info("Qpid working dir: {}", workingDir);
-        app = ApplicationBuilder.newManagedApp(TestApplication.class);
-        testLocation = app.newLocalhostProvisioningLocation();
-    }
-
-    @AfterMethod(alwaysRun=true)
-    public void shutdown() {
-        if (app != null) Entities.destroyAll(app.getManagementContext());
-    }
-
-    /**
-     * Test that the broker starts up with JMX and RMI ports configured, and sets SERVICE_UP correctly.
-     */
-    @Test(groups = "Integration")
-    public void canStartupAndShutdown() {
-        qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class)
-                .configure("jmxPort", "9909+")
-                .configure("rmiRegistryPort", "9910+"));
-        qpid.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(qpid, Startable.SERVICE_UP, true);
-        qpid.stop();
-        assertFalse(qpid.getAttribute(Startable.SERVICE_UP));
-    }
-
-    /**
-     * Test that the broker starts up with HTTP management enabled, and we can connect to the URL.
-     */
-    @Test(groups = "Integration")
-    public void canStartupAndShutdownWithHttpManagement() {
-        qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class)
-                .configure("httpManagementPort", "8888+"));
-        qpid.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(qpid, Startable.SERVICE_UP, true);
-        String httpUrl = "http://"+qpid.getAttribute(QpidBroker.HOSTNAME)+":"+qpid.getAttribute(QpidBroker.HTTP_MANAGEMENT_PORT)+"/management";
-        HttpTestUtils.assertHttpStatusCodeEventuallyEquals(httpUrl, 200);
-        // TODO check actual REST output
-        qpid.stop();
-        assertFalse(qpid.getAttribute(Startable.SERVICE_UP));
-    }
-
-    /**
-     * Test that the broker starts up and sets SERVICE_UP correctly when plugins are configured.
-     * 
-     * FIXME the custom plugin was written against qpid 0.14, so that's the version we need to run
-     * this test against. However, v0.14 is no longer available from the download site.
-     * We should update this plugin so it works with the latest qpid.
-     */
-    @Test(enabled = false, groups = "Integration")
-    public void canStartupAndShutdownWithPlugin() {
-        Map<String,String> qpidRuntimeFiles = MutableMap.<String,String>builder()
-                .put("classpath://qpid-test-config.xml", "etc/config.xml")
-                .put("http://developers.cloudsoftcorp.com/brooklyn/repository-test/0.7.0/QpidBroker/qpid-test-plugin.jar", "lib/plugins/sample-plugin.jar")
-                .build();
-        qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class)
-                .configure(SoftwareProcess.RUNTIME_FILES, qpidRuntimeFiles)
-                .configure(QpidBroker.SUGGESTED_VERSION, "0.14"));
-        qpid.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(qpid, Startable.SERVICE_UP, true);
-        qpid.stop();
-        assertFalse(qpid.getAttribute(Startable.SERVICE_UP));
-    }
-
-    /**
-     * Test that setting the 'queue' property causes a named queue to be created.
-     *
-     * This test is disabled, pending further investigation. Issue with AMQP 0-10 queue names.
-     * 
-     * FIXME disabled becausing failing in jenkins CI (in QpidIntegrationTest.getQpidConnection()).
-     *     url=amqp://admin:********@brooklyn/localhost?brokerlist='tcp://localhost:5672'
-     * Was previously enabled, dispite comment above about "test is disabled".	
-     */
-    @Test(enabled = false, groups = { "Integration", "WIP" })
-    public void testCreatingQueues() {
-        final String queueName = "testQueue";
-        final int number = 20;
-        final String content = "01234567890123456789012345678901";
-
-        // Start broker with a configured queue
-        // FIXME Can't use app.createAndManageChild, because of QpidDestination reffing impl directly
-        qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class)
-                .configure("queue", queueName));
-        qpid.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(qpid, Startable.SERVICE_UP, true);
-
-        try {
-            // Check queue created
-            assertFalse(qpid.getQueueNames().isEmpty());
-            assertEquals(qpid.getQueueNames().size(), 1);
-            assertTrue(qpid.getQueueNames().contains(queueName));
-            assertEquals(qpid.getChildren().size(), 1);
-            assertFalse(qpid.getQueues().isEmpty());
-            assertEquals(qpid.getQueues().size(), 1);
-
-            // Get the named queue entity
-            final QpidQueue queue = qpid.getQueues().get(queueName);
-            assertNotNull(queue);
-
-            // Connect to broker using JMS and send messages
-            Connection connection = getQpidConnection(qpid);
-            clearQueue(connection, queue.getQueueName());
-            Asserts.succeedsEventually(new Runnable() {
-                @Override
-                public void run() {
-                    assertEquals(queue.getAttribute(QpidQueue.QUEUE_DEPTH_MESSAGES), Integer.valueOf(0));
-                }
-            });
-            sendMessages(connection, number, queue.getQueueName(), content);
-
-            // Check messages arrived
-            Asserts.succeedsEventually(new Runnable() {
-                @Override
-                public void run() {
-                    assertEquals(queue.getAttribute(QpidQueue.QUEUE_DEPTH_MESSAGES), Integer.valueOf(number));
-                    assertEquals(queue.getAttribute(QpidQueue.QUEUE_DEPTH_BYTES), Integer.valueOf(number * content.length()));
-                }
-            });
-
-            //TODO clearing the queue currently returns 0
-//            // Clear the messages -- should get 20
-//            assertEquals clearQueue(connection, queue.queueName), 20
-//
-//            // Check messages cleared
-//            executeUntilSucceeds {
-//                assertEquals queue.getAttribute(QpidQueue.QUEUE_DEPTH_MESSAGES), 0
-//                assertEquals queue.getAttribute(QpidQueue.QUEUE_DEPTH_BYTES), 0
-//            }
-
-            // Close the JMS connection
-            connection.close();
-        } catch (JMSException jmse) {
-            log.warn("JMS exception caught", jmse);
-            throw Exceptions.propagate(jmse);
-        } finally {
-            // Stop broker
-            qpid.stop();
-            qpid = null;
-            app = null;
-        }
-    }
-
-    private Connection getQpidConnection(QpidBroker qpid) {
-        int port = qpid.getAttribute(Attributes.AMQP_PORT);
-        System.setProperty(ClientProperties.AMQP_VERSION, "0-10");
-        System.setProperty(ClientProperties.DEST_SYNTAX, "ADDR");
-        String connectionUrl = String.format("amqp://admin:admin@brooklyn/localhost?brokerlist='tcp://localhost:%d'", port);
-        try {
-            AMQConnectionFactory factory = new AMQConnectionFactory(connectionUrl);
-            Connection connection = factory.createConnection();
-            connection.start();
-            return connection;
-        } catch (Exception e) {
-            log.error(String.format("Error connecting to qpid: %s", connectionUrl), e);
-            throw Exceptions.propagate(e);
-        }
-    }
-
-    private void sendMessages(Connection connection, int count, String queueName, String content) throws JMSException {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue destination = session.createQueue(queueName);
-        MessageProducer messageProducer = session.createProducer(destination);
-
-        for (int i = 0; i < count; i++) {
-            TextMessage message = session.createTextMessage(content);
-            messageProducer.send(message);
-        }
-
-        session.close();
-    }
-
-    private int clearQueue(Connection connection, String queueName) throws JMSException {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue destination = session.createQueue(queueName);
-        MessageConsumer messageConsumer = session.createConsumer(destination);
-
-        int received = 0;
-        while (messageConsumer.receive(500) != null) received++;
-
-        session.close();
-
-        return received;
-    }
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitEc2LiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitEc2LiveTest.java
deleted file mode 100644
index d6959b8..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitEc2LiveTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.rabbit;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.EntityAsserts;
-import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
-import org.apache.brooklyn.core.location.cloud.CloudLocationConfig;
-import org.apache.brooklyn.entity.AbstractEc2LiveTest;
-import org.apache.brooklyn.entity.messaging.MessageBroker;
-import org.apache.brooklyn.entity.messaging.amqp.AmqpExchange;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.SkipException;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.QueueingConsumer;
-
-public class RabbitEc2LiveTest extends AbstractEc2LiveTest {
-
-    private static final Logger LOG = LoggerFactory.getLogger(RabbitEc2LiveTest.class);
-
-    @Override
-    protected void doTest(Location loc) throws Exception {
-        RabbitBroker rabbit = app.createAndManageChild(EntitySpec.create(RabbitBroker.class));
-        rabbit.start(ImmutableList.of(loc));
-        EntityTestUtils.assertAttributeEqualsEventually(rabbit, RabbitBroker.SERVICE_UP, true);
-
-        byte[] content = "MessageBody".getBytes(Charsets.UTF_8);
-        String queue = "queueName";
-        Channel producer = null;
-        Channel consumer = null;
-        try {
-            producer = getAmqpChannel(rabbit);
-            consumer = getAmqpChannel(rabbit);
-
-            producer.queueDeclare(queue, true, false, false, Maps.<String,Object>newHashMap());
-            producer.queueBind(queue, AmqpExchange.DIRECT, queue);
-            producer.basicPublish(AmqpExchange.DIRECT, queue, null, content);
-            
-            QueueingConsumer queueConsumer = new QueueingConsumer(consumer);
-            consumer.basicConsume(queue, true, queueConsumer);
-        
-            QueueingConsumer.Delivery delivery = queueConsumer.nextDelivery();
-            assertEquals(delivery.getBody(), content);
-        } finally {
-            if (producer != null) producer.close();
-            if (consumer != null) consumer.close();
-        }
-    }
-
-    private Channel getAmqpChannel(RabbitBroker rabbit) throws Exception {
-        String uri = rabbit.getAttribute(MessageBroker.BROKER_URL);
-        LOG.warn("connecting to rabbit {}", uri);
-        ConnectionFactory factory = new ConnectionFactory();
-        factory.setUri(uri);
-        Connection conn = factory.newConnection();
-        Channel channel = conn.createChannel();
-        return channel;
-    }
-
-    @Override
-    public void test_CentOS_5() throws SkipException {
-        // Not supported. The EPEL repository described here at [1] does not contain erlang, and the
-        // Erlang repository at [1] requires old versions of rpmlib. Additionally, [2] suggests that
-        // Centos 5 is not supported
-        // [1]:http://www.rabbitmq.com/install-rpm.html
-        // [2]: https://www.erlang-solutions.com/downloads/download-erlang-otp
-        throw new SkipException("Centos 5 is not supported");
-    }
-
-    @Test(groups = {"Live"})
-    public void testWithOnlyPort22() throws Exception {
-        // CentOS-6.3-x86_64-GA-EBS-02-85586466-5b6c-4495-b580-14f72b4bcf51-ami-bb9af1d2.1
-        jcloudsLocation = mgmt.getLocationRegistry().resolve(LOCATION_SPEC, ImmutableMap.of(
-                "tags", ImmutableList.of(getClass().getName()),
-                "imageId", "us-east-1/ami-a96b01c0", 
-                "hardwareId", SMALL_HARDWARE_ID));
-
-        final RabbitBroker server = app.createAndManageChild(EntitySpec.create(RabbitBroker.class)
-                .configure(RabbitBroker.PROVISIONING_PROPERTIES.subKey(CloudLocationConfig.INBOUND_PORTS.getName()), ImmutableList.of(22)));
-        
-        app.start(ImmutableList.of(jcloudsLocation));
-        
-        EntityAsserts.assertAttributeEqualsEventually(server, Attributes.SERVICE_UP, true);
-        EntityAsserts.assertAttributeEqualsEventually(server, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
-        
-        Integer port = server.getAttribute(RabbitBroker.AMQP_PORT);
-        assertNotNull(port);
-        
-        assertViaSshLocalPortListeningEventually(server, port);
-    }
-    
-    @Test(enabled=false)
-    public void testDummy() {} // Convince testng IDE integration that this really does have test methods  
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitIntegrationTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitIntegrationTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitIntegrationTest.java
deleted file mode 100644
index fc59dec..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitIntegrationTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.rabbit;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-
-import java.io.IOException;
-
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.factory.ApplicationBuilder;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.core.test.entity.TestApplication;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-import org.apache.brooklyn.entity.messaging.MessageBroker;
-import org.apache.brooklyn.entity.messaging.amqp.AmqpExchange;
-import org.apache.brooklyn.location.localhost.LocalhostMachineProvisioningLocation;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.QueueingConsumer;
-
-/**
- * Test the operation of the {@link RabbitBroker} class.
- * 
- * TODO If you're having problems running this test successfully, here are a few tips:
- * 
- *  - Is `erl` on your path for a non-interactive ssh session?
- *    Look in rabbit's $RUN_DIR/console-err.log (e.g. /tmp/brooklyn-aled/apps/someappid/entities/RabbitBroker_2.8.7_JROYTcSL/console-err.log)
- *    I worked around that by adding to my ~/.brooklyn/brooklyn.properties:
- *      brooklyn.ssh.config.scriptHeader=#!/bin/bash -e\nif [ -f ~/.bashrc ] ; then . ~/.bashrc ; fi\nif [ -f ~/.profile ] ; then . ~/.profile ; fi\necho $PATH > /tmp/mypath.txt
- *    
- *  - Is the hostname resolving properly?
- *    Look in $RUN_DIR/console-out.log; is there a message like:
- *      ERROR: epmd error for host "Aleds-MacBook-Pro": timeout (timed out establishing tcp connection)
- *    I got around that with disabling my wifi and running when not connected to the internet.
- */
-public class RabbitIntegrationTest {
-    private static final Logger log = LoggerFactory.getLogger(RabbitIntegrationTest.class);
-
-    private TestApplication app;
-    private Location testLocation;
-    private RabbitBroker rabbit;
-
-    @BeforeMethod(groups = "Integration")
-    public void setup() {
-        app = ApplicationBuilder.newManagedApp(TestApplication.class);
-        testLocation = new LocalhostMachineProvisioningLocation();
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void shutdown() {
-        if (app != null) Entities.destroyAll(app.getManagementContext());
-    }
-
-    /**
-     * Test that the broker starts up and sets SERVICE_UP correctly.
-     */
-    @Test(groups = {"Integration", "WIP"})
-    public void canStartupAndShutdown() throws Exception {
-        rabbit = app.createAndManageChild(EntitySpec.create(RabbitBroker.class));
-        rabbit.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(rabbit, Startable.SERVICE_UP, true);
-        rabbit.stop();
-        assertFalse(rabbit.getAttribute(Startable.SERVICE_UP));
-    }
-
-    /**
-     * Test that an AMQP client can connect to and use the broker.
-     */
-    @Test(groups = {"Integration", "WIP"})
-    public void testClientConnection() throws Exception {
-        rabbit = app.createAndManageChild(EntitySpec.create(RabbitBroker.class));
-        rabbit.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(rabbit, Startable.SERVICE_UP, true);
-
-        byte[] content = "MessageBody".getBytes(Charsets.UTF_8);
-        String queue = "queueName";
-        Channel producer = null;
-        Channel consumer = null;
-        try {
-            producer = getAmqpChannel(rabbit);
-            consumer = getAmqpChannel(rabbit);
-
-            producer.queueDeclare(queue, true, false, false, ImmutableMap.<String,Object>of());
-            producer.queueBind(queue, AmqpExchange.DIRECT, queue);
-            producer.basicPublish(AmqpExchange.DIRECT, queue, null, content);
-            
-            QueueingConsumer queueConsumer = new QueueingConsumer(consumer);
-            consumer.basicConsume(queue, true, queueConsumer);
-        
-            QueueingConsumer.Delivery delivery = queueConsumer.nextDelivery(60 * 1000l); // one minute timeout
-            assertEquals(delivery.getBody(), content);
-        } finally {
-            closeSafely(producer, 10*1000);
-            closeSafely(consumer, 10*1000);
-        }
-    }
-
-    /**
-     * Closes the channel, guaranteeing the call won't hang this thread forever!
-     * 
-     * Saw this during jenkins testing:
-     * "main" prio=10 tid=0x00007f69c8008000 nid=0x5d70 in Object.wait() [0x00007f69d1318000]
-     *         java.lang.Thread.State: WAITING (on object monitor)
-     *         at java.lang.Object.wait(Native Method)
-     *         - waiting on <0x00000000e0947cf8> (a com.rabbitmq.utility.BlockingValueOrException)
-     *         at java.lang.Object.wait(Object.java:502)
-     *         at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:50)
-     *         - locked <0x00000000e0947cf8> (a com.rabbitmq.utility.BlockingValueOrException)
-     *         at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:65)
-     *         - locked <0x00000000e0947cf8> (a com.rabbitmq.utility.BlockingValueOrException)
-     *         at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:111)
-     *         - locked <0x00000000e0947cf8> (a com.rabbitmq.utility.BlockingValueOrException)
-     *         at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:37)
-     *         at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:349)
-     *         at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:543)
-     *         at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:480)
-     *         at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:473)
-     *         at com.rabbitmq.client.Channel$close.call(Unknown Source)
-     *         at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:42)
-     *         at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:108)
-     *         at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:112)
-     *         at org.codehaus.groovy.runtime.callsite.AbstractCallSite.callSafe(AbstractCallSite.java:75)
-     *         at org.apache.brooklyn.entity.messaging.rabbit.RabbitIntegrationTest.testClientConnection(RabbitIntegrationTest.groovy:107)
-     */
-    private void closeSafely(final Channel channel, int timeoutMs) throws InterruptedException {
-        if (channel == null) return;
-        Thread t = new Thread(new Runnable() {
-                @Override public void run() {
-                    try {
-                        channel.close();
-                    } catch (IOException e) {
-                        log.error("Error closing RabbitMQ Channel; continuing", e);
-                    }
-                }});
-        try {
-            t.start();
-            t.join(timeoutMs);
-            
-            if (t.isAlive()) {
-                log.error("Timeout when closing RabbitMQ Channel "+channel+"; aborting close and continuing");
-            }
-        } finally {
-            t.interrupt();
-            t.join(1*1000);
-            if (t.isAlive()) t.stop();
-        }
-    }
-    
-    private Channel getAmqpChannel(RabbitBroker rabbit) throws Exception {
-        String uri = rabbit.getAttribute(MessageBroker.BROKER_URL);
-        log.warn("connecting to rabbit {}", uri);
-        ConnectionFactory factory = new ConnectionFactory();
-        factory.setUri(uri);
-        Connection conn = factory.newConnection();
-        Channel channel = conn.createChannel();
-        return channel;
-    }
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/LocalhostLiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/LocalhostLiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/LocalhostLiveTest.java
deleted file mode 100644
index 89afe00..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/LocalhostLiveTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.storm;
-
-import org.testng.annotations.Test;
-
-@Test(groups="Live")
-public class LocalhostLiveTest extends StormAbstractCloudLiveTest {
-
-    private static final String NAMED_LOCATION = "localhost";
-
-    public String getLocation() {
-        return NAMED_LOCATION;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/SoftLayerLiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/SoftLayerLiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/SoftLayerLiveTest.java
deleted file mode 100644
index 17cb7d2..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/SoftLayerLiveTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.storm;
-
-import org.testng.annotations.Test;
-
-@Test(groups="Live")
-public class SoftLayerLiveTest extends StormAbstractCloudLiveTest {
-
-    private static final String NAMED_LOCATION = "softlayer";
-
-    @Override
-    public String getLocation() {
-        return NAMED_LOCATION;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormAbstractCloudLiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormAbstractCloudLiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormAbstractCloudLiveTest.java
deleted file mode 100644
index 5633f74..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormAbstractCloudLiveTest.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.storm;
-
-import static org.apache.brooklyn.core.sensor.DependentConfiguration.attributeWhenReady;
-import static org.apache.brooklyn.entity.messaging.storm.Storm.NIMBUS_HOSTNAME;
-import static org.apache.brooklyn.entity.messaging.storm.Storm.ZOOKEEPER_ENSEMBLE;
-import static org.apache.brooklyn.entity.messaging.storm.Storm.Role.NIMBUS;
-import static org.apache.brooklyn.entity.messaging.storm.Storm.Role.SUPERVISOR;
-import static org.apache.brooklyn.entity.messaging.storm.Storm.Role.UI;
-
-import java.io.File;
-import java.util.Map;
-
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
-import org.apache.brooklyn.core.test.BrooklynAppLiveTestSupport;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.core.ResourceUtils;
-import org.apache.brooklyn.util.core.file.ArchiveBuilder;
-import org.apache.brooklyn.util.os.Os;
-import org.apache.brooklyn.util.time.Duration;
-import org.apache.brooklyn.util.time.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.testing.TestWordSpout;
-import backtype.storm.topology.TopologyBuilder;
-
-import org.apache.brooklyn.entity.messaging.storm.topologies.ExclamationBolt;
-import org.apache.brooklyn.entity.zookeeper.ZooKeeperEnsemble;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-
-/**
- * Base class for live tests that deploy a Storm cluster (a ZooKeeper ensemble plus
- * nimbus, supervisor and UI nodes) to the location named by {@link #getLocation()}
- * and then submit a small test topology to it.
- */
-public abstract class StormAbstractCloudLiveTest extends BrooklynAppLiveTestSupport {
-
-    protected static final Logger log = LoggerFactory
-            .getLogger(StormAbstractCloudLiveTest.class);
-
-    /** Exact exception message that {@link #shouldRetryOn(Exception)} treats as retryable. */
-    private static final String CONNECTION_REFUSED_MESSAGE =
-            "org.apache.thrift7.transport.TTransportException: java.net.ConnectException: Connection refused";
-
-    private Location location;
-    private ZooKeeperEnsemble zooKeeperEnsemble;
-    private Storm nimbus;
-    private Storm supervisor;
-    private Storm ui;
-
-    /**
-     * Creates the management context, resolves the target location from
-     * {@link #getLocation()} and {@link #getFlags()}, then runs the parent set-up.
-     */
-    @BeforeClass(alwaysRun = true)
-    public void beforeClass() throws Exception {
-        mgmt = new LocalManagementContext();
-        location = mgmt.getLocationRegistry()
-                .resolve(getLocation(), getFlags());
-        super.setUp();
-    }
-
-    /**
-     * Currently a no-op: the tear-down call in the body is disabled.
-     * NOTE(review): confirm whether deployed entities are released elsewhere before re-enabling it.
-     */
-    @AfterClass(alwaysRun = true)
-    public void afterClass() throws Exception {
-        // Entities.destroyAll(mgmt);
-    }
-
-    /** @return the location spec to resolve and deploy to */
-    public abstract String getLocation();
-
-    /**
-     * @return flags passed alongside {@link #getLocation()} when resolving the location;
-     *         empty unless a subclass overrides this method
-     */
-    public Map<String, ?> getFlags() {
-        return MutableMap.of();
-    }
-
-    /**
-     * Deploys a three-node ZooKeeper ensemble and one Storm node per role (nimbus, supervisor, UI),
-     * waits for each to report {@code SERVICE_UP}, then submits the test topology.
-     *
-     * @throws Exception the original failure, after it has been logged
-     */
-    @Test(groups = {"Live","WIP"})  // needs repair to avoid hard dependency on Andrea's environment
-    public void deployStorm() throws Exception {
-        try {
-            zooKeeperEnsemble = app.createAndManageChild(EntitySpec.create(
-                    ZooKeeperEnsemble.class).configure(
-                    ZooKeeperEnsemble.INITIAL_SIZE, 3));
-            nimbus = app.createAndManageChild(EntitySpec
-                    .create(Storm.class)
-                    .configure(Storm.ROLE, NIMBUS)
-                    .configure(NIMBUS_HOSTNAME, "localhost")
-                    .configure(ZOOKEEPER_ENSEMBLE, zooKeeperEnsemble)
-                    );
-            // Supervisor and UI only learn the nimbus hostname once nimbus has published it.
-            supervisor = app.createAndManageChild(EntitySpec
-                    .create(Storm.class)
-                    .configure(Storm.ROLE, SUPERVISOR)
-                    .configure(ZOOKEEPER_ENSEMBLE, zooKeeperEnsemble)
-                    .configure(NIMBUS_HOSTNAME,
-                            attributeWhenReady(nimbus, Attributes.HOSTNAME)));
-            ui = app.createAndManageChild(EntitySpec
-                    .create(Storm.class)
-                    .configure(Storm.ROLE, UI)
-                    .configure(ZOOKEEPER_ENSEMBLE, zooKeeperEnsemble)
-                    .configure(NIMBUS_HOSTNAME,
-                            attributeWhenReady(nimbus, Attributes.HOSTNAME)));
-            log.info("Started Storm deployment on '" + getLocation() + "'");
-            app.start(ImmutableList.of(location));
-            Entities.dumpInfo(app);
-            EntityTestUtils.assertAttributeEqualsEventually(app, Startable.SERVICE_UP, true);
-            EntityTestUtils.assertAttributeEqualsEventually(zooKeeperEnsemble, Startable.SERVICE_UP, true);
-            EntityTestUtils.assertAttributeEqualsEventually(nimbus, Startable.SERVICE_UP, true);
-            EntityTestUtils.assertAttributeEqualsEventually(supervisor, Startable.SERVICE_UP, true);
-            EntityTestUtils.assertAttributeEqualsEventually(ui, Startable.SERVICE_UP, true);
-
-            StormTopology stormTopology = createTopology();
-            submitTopology(stormTopology, "myExclamation", 3, true, 60000);
-        } catch (Exception e) {
-            // Rethrow the original failure so the test report carries its type and stack trace;
-            // a bare Assert.fail() here would replace it with a message-less AssertionError.
-            log.error("Failed to deploy Storm", e);
-            throw e;
-        }
-    }
-
-    /** Builds the test topology: a word spout feeding two chained {@link ExclamationBolt}s. */
-    private StormTopology createTopology()
-            throws AlreadyAliveException, InvalidTopologyException {
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.setSpout("word", new TestWordSpout(), 10);
-        builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
-        builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
-
-        return builder.createTopology();
-    }
-
-    /**
-     * Submits {@code stormTopology} under {@code topologyName}, retrying once a second for as
-     * long as the failure is a refused connection and {@code timeoutMs} has not elapsed.
-     *
-     * @param stormTopology the topology to submit
-     * @param topologyName name to submit the topology under
-     * @param numOfWorkers number of workers set on the submission config
-     * @param debug debug flag set on the submission config
-     * @param timeoutMs how long to keep retrying, in milliseconds; {@code -1} means no limit
-     * @return {@code true} once the topology has been submitted
-     * @throws AssertionError if the timeout elapses without a successful submission
-     */
-    public boolean submitTopology(StormTopology stormTopology, String topologyName, int numOfWorkers, boolean debug, long timeoutMs) {
-        if (log.isDebugEnabled()) log.debug("Connecting to NimbusClient: {}", nimbus.getConfig(Storm.NIMBUS_HOSTNAME));
-        Config conf = new Config();
-        conf.setDebug(debug);
-        conf.setNumWorkers(numOfWorkers);
-
-        // TODO - confirm this creates the JAR correctly
-        String jar = createJar(
-            new File(Os.mergePaths(ResourceUtils.create(this).getClassLoaderDir(), "org/apache/brooklyn/entity/messaging/storm/topologies")),
-            "org/apache/brooklyn/entity/messaging/storm/");
-        System.setProperty("storm.jar", jar);
-        long startMs = System.currentTimeMillis();
-        long endMs = (timeoutMs == -1) ? Long.MAX_VALUE : (startMs + timeoutMs);
-        long currentTime = startMs;
-        Throwable lastError = null;
-        int attempt = 0;
-        while (currentTime <= endMs) {
-            currentTime = System.currentTimeMillis();
-            // Pause between attempts, but not before the first one.
-            if (attempt != 0) Time.sleep(Duration.ONE_SECOND);
-            if (log.isTraceEnabled()) log.trace("trying connection to {} at time {}", nimbus.getConfig(Storm.NIMBUS_HOSTNAME), currentTime);
-
-            try {
-                StormSubmitter.submitTopology(topologyName, conf, stormTopology);
-                return true;
-            } catch (Exception e) {
-                if (shouldRetryOn(e)) {
-                    if (log.isDebugEnabled()) log.debug("Attempt {} failed connecting to {} ({})", new Object[] {attempt + 1, nimbus.getConfig(Storm.NIMBUS_HOSTNAME), e.getMessage()});
-                    lastError = e;
-                } else {
-                    throw Throwables.propagate(e);
-                }
-            }
-            attempt++;
-        }
-        log.warn("unable to connect to Nimbus client: ", lastError);
-        // Fail with a message and the last retryable error as cause, so the report says why.
-        Assert.fail("unable to connect to Nimbus client after " + attempt + " attempt(s)", lastError);
-        return false; // not reached: Assert.fail always throws; present only to satisfy the compiler
-    }
-
-    /**
-     * Tells whether a failed submission should be retried.
-     *
-     * @return {@code true} only when the exception message is exactly the refused-connection message
-     */
-    private boolean shouldRetryOn(Exception e) {
-        // Constant-first equals: getMessage() may be null, which must mean "do not retry"
-        // rather than a NullPointerException that hides the original failure.
-        return CONNECTION_REFUSED_MESSAGE.equals(e.getMessage());
-    }
-
-    /**
-     * Packs {@code dir} into a temporary {@code topologies.jar} under {@code parentDirInJar}.
-     * If {@code dir} is not a directory its own path is returned unchanged.
-     *
-     * @return absolute path of the jar to submit
-     */
-    private String createJar(File dir, String parentDirInJar) {
-        if (dir.isDirectory()) {
-            File jarFile = ArchiveBuilder.jar().addAt(dir, parentDirInJar).create(Os.newTempDir(getClass())+"/topologies.jar");
-            return jarFile.getAbsolutePath();
-        } else {
-            return dir.getAbsolutePath(); // An existing Jar archive?
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormEc2LiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormEc2LiveTest.java
deleted file mode 100644
index bf54003..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormEc2LiveTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.storm;
-
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.testng.annotations.Test;
-import org.apache.brooklyn.entity.AbstractEc2LiveTest;
-import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * EC2 live test that starts one ZooKeeper node and one Storm node per role
- * and waits for each of them to report {@code SERVICE_UP}.
- */
-public class StormEc2LiveTest extends AbstractEc2LiveTest {
-
-    /**
-     * Test that can install, start and use a Storm cluster: 1 nimbus, 1 zookeeper, 1 supervisor (worker node).
-     * A UI node is started as well.
-     */
-    @Override
-    protected void doTest(Location loc) throws Exception {
-        // Children are created in the same order as before: zookeeper, nimbus, supervisor, ui.
-        ZooKeeperNode zookeeper = app.createAndManageChild(EntitySpec.create(ZooKeeperNode.class));
-        Storm nimbus = addStormNode(Storm.Role.NIMBUS);
-        Storm supervisor = addStormNode(Storm.Role.SUPERVISOR);
-        Storm ui = addStormNode(Storm.Role.UI);
-
-        app.start(ImmutableList.of(loc));
-        Entities.dumpInfo(app);
-
-        EntityTestUtils.assertAttributeEqualsEventually(zookeeper, Startable.SERVICE_UP, true);
-        EntityTestUtils.assertAttributeEqualsEventually(nimbus, Startable.SERVICE_UP, true);
-        EntityTestUtils.assertAttributeEqualsEventually(supervisor, Startable.SERVICE_UP, true);
-        EntityTestUtils.assertAttributeEqualsEventually(ui, Startable.SERVICE_UP, true);
-    }
-
-    /** Creates a managed Storm child of the app configured with the given {@code storm.role}. */
-    private Storm addStormNode(Storm.Role role) {
-        return app.createAndManageChild(
-                EntitySpec.create(Storm.class).configure("storm.role", role));
-    }
-
-    @Test(enabled=false)
-    public void testDummy() {} // Convince testng IDE integration that this really does have test methods
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormGceLiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormGceLiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormGceLiveTest.java
deleted file mode 100644
index e1a9ad4..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormGceLiveTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.storm;
-
-import java.util.Map;
-
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.testng.annotations.Test;
-
-/**
- * Runs the Storm cloud live test defined in {@link StormAbstractCloudLiveTest}
- * against the {@code gce-europe-west1} location with a fixed CentOS image.
- */
-@Test(groups="Live")
-public class StormGceLiveTest extends StormAbstractCloudLiveTest {
-
-    private static final String NAMED_LOCATION = "gce-europe-west1";
-    private static final String LOCATION_ID = "gce-europe-west1-a";
-    /** Base URI of the image collection; the image id is appended to form the image URI. */
-    private static final String IMAGES_BASE_URI = "https://www.googleapis.com/compute/v1beta15/projects/google/global/images/";
-    private static final String IMAGE_ID = "centos-6-v20130325";
-
-    /** @return the location spec this test deploys to */
-    @Override
-    public String getLocation() {
-        return NAMED_LOCATION;
-    }
-
-    /**
-     * @return the location flags for this test: zone, image id, image URI, group id
-     *         and the {@code stopIptables} switch
-     */
-    @Override
-    public Map<String, ?> getFlags() {
-        return MutableMap.of(
-                "locationId", LOCATION_ID,
-                "imageId", IMAGE_ID,
-                // Base + id; the previous constant already ended with the image id,
-                // so concatenating it again duplicated the id at the end of the URI.
-                "uri", IMAGES_BASE_URI + IMAGE_ID,
-                "groupId", "storm-test",
-                "stopIptables", "true"
-        );
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/topologies/ExclamationBolt.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/topologies/ExclamationBolt.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/topologies/ExclamationBolt.java
deleted file mode 100644
index a10a30e..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/topologies/ExclamationBolt.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.storm.topologies;
-
-import java.util.Map;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-public class ExclamationBolt extends BaseRichBolt {
-    OutputCollector _collector;
-
-    @Override
-    public void prepare(Map conf, TopologyContext context,
-            OutputCollector collector) {
-        _collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
-        _collector.ack(tuple);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("word"));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEc2LiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEc2LiveTest.java
deleted file mode 100644
index e8d49ee..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEc2LiveTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.zookeeper;
-
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.testng.annotations.Test;
-import org.apache.brooklyn.entity.AbstractEc2LiveTest;
-import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * EC2 live test that starts a single ZooKeeper node and waits for it to report {@code SERVICE_UP}.
- */
-public class ZooKeeperEc2LiveTest extends AbstractEc2LiveTest {
-
-    /**
-     * Test that can install, start and use a Zookeeper instance.
-     */
-    @Override
-    protected void doTest(Location loc) throws Exception {
-        ZooKeeperNode zookeeper = app.createAndManageChild(
-                EntitySpec.create(ZooKeeperNode.class)
-                        .configure("jmxPort", "31001+"));
-
-        ImmutableList<Location> targets = ImmutableList.of(loc);
-        app.start(targets);
-
-        Entities.dumpInfo(zookeeper);
-        EntityTestUtils.assertAttributeEqualsEventually(zookeeper, Startable.SERVICE_UP, true);
-    }
-
-    @Test(enabled=false)
-    public void testDummy() {} // Convince testng IDE integration that this really does have test methods
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java
deleted file mode 100644
index f143df7..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.zookeeper;
-
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.factory.ApplicationBuilder;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.core.test.entity.TestApplication;
-import org.apache.brooklyn.entity.zookeeper.ZooKeeperEnsemble;
-import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Uninterruptibles;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.net.Socket;
-import java.util.concurrent.TimeUnit;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-/**
- * A live test of the {@link org.apache.brooklyn.entity.zookeeper.ZooKeeperEnsemble} entity.
- *
- * Starts a three-node ensemble at the configured {@code provider}, checks that every member
- * accepts connections on its ZooKeeper port, then resizes the ensemble to one node and
- * checks again.
- */
-public class ZooKeeperEnsembleLiveTest {
-
-    private static final Logger log = LoggerFactory.getLogger(ZooKeeperEnsembleLiveTest.class);
-
-    /** Location spec to deploy to; alternatives are kept below for manual switching. */
-    private String provider =
-            "gce-europe-west1";
-//            "aws-ec2:eu-west-1";
-//            "named:hpcloud-compute-at";
-//            "localhost";
-
-    protected TestApplication app;
-    protected Location testLocation;
-    protected ZooKeeperEnsemble cluster;
-
-    /** Creates a managed test application and resolves {@code provider} to a location. */
-    @BeforeMethod(alwaysRun = true)
-    public void setup() {
-        app = ApplicationBuilder.newManagedApp(TestApplication.class);
-        testLocation = app.getManagementContext().getLocationRegistry().resolve(provider);
-    }
-
-    /** Destroys everything in the application's management context. */
-    @AfterMethod(alwaysRun = true)
-    public void shutdown() {
-        // alwaysRun means this is invoked even when setup() failed before assigning app.
-        if (app != null) {
-            Entities.destroyAll(app.getManagementContext());
-        }
-    }
-
-    /**
-     * Test that a three node cluster starts up, accepts connections on every node,
-     * and still does so after being resized down to a single node.
-     */
-    @Test(groups = "Live")
-    public void testStartUpConnectAndResize() throws Exception {
-        try {
-            cluster = app.createAndManageChild(EntitySpec.create(ZooKeeperEnsemble.class)
-                    .configure("initialSize", 3)
-                    .configure("clusterName", "ZooKeeperEnsembleLiveTest"));
-            assertEquals(cluster.getCurrentSize().intValue(), 0);
-
-            app.start(ImmutableList.of(testLocation));
-
-            EntityTestUtils.assertAttributeEqualsEventually(cluster, ZooKeeperEnsemble.GROUP_SIZE, 3);
-            Entities.dumpInfo(app);
-
-            EntityTestUtils.assertAttributeEqualsEventually(cluster, Startable.SERVICE_UP, true);
-            assertAllMemberSocketsOpen();
-
-            cluster.resize(1);
-            EntityTestUtils.assertAttributeEqualsEventually(cluster, ZooKeeperEnsemble.GROUP_SIZE, 1);
-            Entities.dumpInfo(app);
-            EntityTestUtils.assertAttributeEqualsEventually(cluster, Startable.SERVICE_UP, true);
-            assertAllMemberSocketsOpen();
-        } catch (Throwable e) {
-            throw Throwables.propagate(e);
-        }
-    }
-
-    /** Asserts that every current member of the cluster accepts a connection on its ZooKeeper port. */
-    private void assertAllMemberSocketsOpen() {
-        for (Entity zkNode : cluster.getMembers()) {
-            assertTrue(isSocketOpen((ZooKeeperNode) zkNode));
-        }
-    }
-
-    /**
-     * Tries to open (and immediately close) a socket to the node's hostname and ZooKeeper port,
-     * making up to 20 attempts one second apart.
-     *
-     * @param node the ZooKeeper node to probe
-     * @return {@code true} as soon as one attempt succeeds, {@code false} if all attempts fail
-     */
-    protected static boolean isSocketOpen(ZooKeeperNode node) {
-        int maxAttempts = 20;
-        for (int attempt = 0; attempt < maxAttempts; attempt++) {
-            // Wait between attempts only; there is no point sleeping after the final failure.
-            if (attempt > 0) {
-                Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-            }
-            try {
-                Socket s = new Socket(node.getAttribute(Attributes.HOSTNAME), node.getZookeeperPort());
-                s.close();
-                return true;
-            } catch (Exception e) {
-                // Expected while the node is still coming up; record it and retry.
-                log.debug("ZooKeeper socket check failed for {}: {}", node, e.toString());
-            }
-        }
-        return false;
-    }
-
-}