You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:47:56 UTC
[20/51] [abbrv] [partial] brooklyn-library git commit: move subdir
from incubator up a level as it is promoted to its own repo (first
non-incubator commit!)
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.java
deleted file mode 100644
index 7183c15..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/activemq/ActiveMQIntegrationTest.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.activemq;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.factory.ApplicationBuilder;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.core.test.entity.TestApplication;
-import org.apache.brooklyn.entity.java.UsesJmx;
-import org.apache.brooklyn.entity.java.UsesJmx.JmxAgentModes;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-/**
- * Test the operation of the {@link ActiveMQBroker} class.
- */
-public class ActiveMQIntegrationTest {
- private static final Logger log = LoggerFactory.getLogger(ActiveMQIntegrationTest.class);
-
- private TestApplication app;
- private Location testLocation;
- private ActiveMQBroker activeMQ;
-
-    /**
-     * Creates a new managed {@link TestApplication} and a localhost provisioning
-     * location for it before each test method.
-     */
-    @BeforeMethod(alwaysRun = true)
-    public void setup() throws Exception {
-        final TestApplication managedApp = ApplicationBuilder.newManagedApp(TestApplication.class);
-        app = managedApp;
-        testLocation = managedApp.newLocalhostProvisioningLocation();
-    }
-
-    /**
-     * Destroys everything in the application's management context after each test
-     * method; does nothing when no application has been set.
-     */
-    @AfterMethod(alwaysRun = true)
-    public void shutdown() throws Exception {
-        if (app == null) {
-            return;
-        }
-        Entities.destroyAll(app.getManagementContext());
-    }
-
-    /**
-     * Starts a default broker, waits for SERVICE_UP to become true, then stops the
-     * broker and checks that SERVICE_UP is false.
-     */
-    @Test(groups = "Integration")
-    public void canStartupAndShutdown() throws Exception {
-        final EntitySpec<ActiveMQBroker> brokerSpec = EntitySpec.create(ActiveMQBroker.class);
-        activeMQ = app.createAndManageChild(brokerSpec);
-        activeMQ.start(ImmutableList.of(testLocation));
-
-        // 600000 -- presumably milliseconds, i.e. ten minutes (unit not visible here)
-        final int startupTimeout = 10*60*1000;
-        EntityTestUtils.assertAttributeEqualsEventually(
-                ImmutableMap.of("timeout", startupTimeout), activeMQ, Startable.SERVICE_UP, true);
-        log.info("JMX URL is "+activeMQ.getAttribute(UsesJmx.JMX_URL));
-
-        activeMQ.stop();
-        assertFalse(activeMQ.getAttribute(Startable.SERVICE_UP));
-    }
-
-    /**
-     * Same start/stop check as {@link #canStartupAndShutdown()}, but with the
-     * "jmxPort" configuration set to "11099+".
-     */
-    @Test(groups = "Integration")
-    public void canStartupAndShutdownWithCustomJmx() throws Exception {
-        final EntitySpec<ActiveMQBroker> brokerSpec = EntitySpec.create(ActiveMQBroker.class)
-                .configure("jmxPort", "11099+");
-        activeMQ = app.createAndManageChild(brokerSpec);
-        activeMQ.start(ImmutableList.of(testLocation));
-
-        final int startupTimeout = 10*60*1000;
-        EntityTestUtils.assertAttributeEqualsEventually(
-                ImmutableMap.of("timeout", startupTimeout), activeMQ, Startable.SERVICE_UP, true);
-        log.info("JMX URL is "+activeMQ.getAttribute(UsesJmx.JMX_URL));
-
-        activeMQ.stop();
-        assertFalse(activeMQ.getAttribute(Startable.SERVICE_UP));
-    }
-
-    /**
-     * Start/stop check for a broker configured with "jmxPort" set to "11099+" and
-     * "brokerName" set to "bridge".
-     */
-    @Test(groups = "Integration")
-    public void canStartupAndShutdownWithCustomBrokerName() throws Exception {
-        final EntitySpec<ActiveMQBroker> brokerSpec = EntitySpec.create(ActiveMQBroker.class)
-                .configure("jmxPort", "11099+")
-                .configure("brokerName", "bridge");
-        activeMQ = app.createAndManageChild(brokerSpec);
-        activeMQ.start(ImmutableList.of(testLocation));
-
-        final int startupTimeout = 10*60*1000;
-        EntityTestUtils.assertAttributeEqualsEventually(
-                ImmutableMap.of("timeout", startupTimeout), activeMQ, Startable.SERVICE_UP, true);
-        log.info("JMX URL is "+activeMQ.getAttribute(UsesJmx.JMX_URL));
-
-        activeMQ.stop();
-        assertFalse(activeMQ.getAttribute(Startable.SERVICE_UP));
-    }
-
-
-    /**
-     * Creates two default brokers, then starts them one after the other on the same
-     * location, waiting for each to report SERVICE_UP before moving on.
-     */
-    @Test(groups = "Integration")
-    public void canStartTwo() throws Exception {
-        final ActiveMQBroker first = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class));
-        final ActiveMQBroker second = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class));
-
-        // Both brokers are created up front; each is then started and checked in turn.
-        for (ActiveMQBroker broker : ImmutableList.of(first, second)) {
-            broker.start(ImmutableList.of(testLocation));
-            EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), broker, Startable.SERVICE_UP, true);
-            log.info("JMX URL is "+broker.getAttribute(UsesJmx.JMX_URL));
-        }
-    }
-
-    /**
-     * Runs the queue scenario without an explicit JMX agent mode ({@code null}) and
-     * checks that the resulting JMX URL mentions jmxmp.
-     */
-    @Test(groups = "Integration")
-    public void testCreatingQueuesDefault() throws Exception {
-        final String jmxUrl = testCreatingQueuesInternal(null);
-        // localhost default is jmxmp
-        final boolean mentionsJmxmp = jmxUrl.contains("jmxmp");
-        Assert.assertTrue(mentionsJmxmp, "url="+jmxUrl);
-    }
-
-    /**
-     * Runs the queue scenario with {@link JmxAgentModes#JMX_RMI_CUSTOM_AGENT} and checks
-     * that the JMX URL contains "rmi://" but neither "rmi:///jndi" nor "jmxmp".
-     */
-    @Test(groups = "Integration")
-    public void testCreatingQueuesRmi() throws Exception {
-        final String jmxUrl = testCreatingQueuesInternal(JmxAgentModes.JMX_RMI_CUSTOM_AGENT);
-        final String failureMessage = "url="+jmxUrl;
-        Assert.assertTrue(jmxUrl.contains("rmi://"), failureMessage);
-        Assert.assertFalse(jmxUrl.contains("rmi:///jndi"), failureMessage);
-        Assert.assertFalse(jmxUrl.contains("jmxmp"), failureMessage);
-    }
-
-    /**
-     * Runs the queue scenario with {@link JmxAgentModes#JMXMP} and checks that the
-     * JMX URL contains "jmxmp" and does not contain "rmi".
-     */
-    @Test(groups = "Integration")
-    public void testCreatingQueuesJmxmp() throws Exception {
-        final String jmxUrl = testCreatingQueuesInternal(JmxAgentModes.JMXMP);
-        final String failureMessage = "url="+jmxUrl;
-        // JMXMP was requested explicitly, so the URL must mention jmxmp and not rmi
-        Assert.assertTrue(jmxUrl.contains("jmxmp"), failureMessage);
-        Assert.assertFalse(jmxUrl.contains("rmi"), failureMessage);
-    }
-
-    /**
-     * Runs the queue scenario with {@link JmxAgentModes#NONE} and checks that the
-     * JMX URL contains "service:jmx:rmi" and does not contain "jmxmp".
-     */
-    @Test(groups = "Integration")
-    public void testCreatingQueuesNoAgent() throws Exception {
-        final String jmxUrl = testCreatingQueuesInternal(JmxAgentModes.NONE);
-        final String failureMessage = "url="+jmxUrl;
-        // with no agent configured the URL is expected to be a service:jmx:rmi one
-        Assert.assertTrue(jmxUrl.contains("service:jmx:rmi"), failureMessage);
-        Assert.assertFalse(jmxUrl.contains("jmxmp"), failureMessage);
-    }
-
-    /**
-     * Starts a broker configured with one queue, checks that the queue entity exists,
-     * sends messages to it over JMS while checking the QUEUE_DEPTH_MESSAGES sensor,
-     * and finally stops the broker.
-     *
-     * @param mode value for {@link UsesJmx#JMX_AGENT_MODE}; the default-mode test passes {@code null}
-     * @return the broker's JMX_URL attribute, read once the broker reports SERVICE_UP
-     * @throws Exception if the broker, the JMS client or an assertion fails
-     */
-    public String testCreatingQueuesInternal(JmxAgentModes mode) throws Exception {
-        String queueName = "testQueue";
-        int number = 20;
-        String content = "01234567890123456789012345678901";
-
-        // Start broker with a configured queue
-        activeMQ = app.createAndManageChild(EntitySpec.create(ActiveMQBroker.class)
-                .configure("queue", queueName)
-                .configure(UsesJmx.JMX_AGENT_MODE, mode));
-
-        activeMQ.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(ImmutableMap.of("timeout", 10*60*1000), activeMQ, Startable.SERVICE_UP, true);
-
-        String jmxUrl = activeMQ.getAttribute(UsesJmx.JMX_URL);
-        log.info("JMX URL ("+mode+") is "+jmxUrl);
-
-        try {
-            // Check queue created
-            assertFalse(activeMQ.getQueueNames().isEmpty());
-            assertEquals(activeMQ.getQueueNames().size(), 1);
-            assertTrue(activeMQ.getQueueNames().contains(queueName));
-            assertEquals(activeMQ.getChildren().size(), 1);
-            assertFalse(activeMQ.getQueues().isEmpty());
-            assertEquals(activeMQ.getQueues().size(), 1);
-
-            // Get the named queue entity
-            ActiveMQQueue queue = activeMQ.getQueues().get(queueName);
-            assertNotNull(queue);
-            assertEquals(queue.getName(), queueName);
-
-            // Connect to broker using JMS and send messages
-            Connection connection = getActiveMQConnection(activeMQ);
-            try {
-                clearQueue(connection, queueName);
-                EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0);
-                sendMessages(connection, number, queueName, content);
-
-                // Check messages arrived
-                EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, number);
-
-                // Clear the messages
-                assertEquals(clearQueue(connection, queueName), number);
-
-                // Check messages cleared
-                EntityTestUtils.assertAttributeEqualsEventually(queue, ActiveMQQueue.QUEUE_DEPTH_MESSAGES, 0);
-            } finally {
-                // Close the JMS connection even when one of the checks above fails
-                connection.close();
-            }
-        } finally {
-            // Stop broker
-            activeMQ.stop();
-        }
-
-        return jmxUrl;
-    }
-
-    /**
-     * Opens and starts a JMS connection to the given broker, using its OPEN_WIRE_PORT
-     * and ADDRESS attributes and the "admin"/"activemq" credentials.
-     *
-     * @param activeMQ the broker to connect to
-     * @return a started connection; the caller is responsible for closing it
-     */
-    private Connection getActiveMQConnection(ActiveMQBroker activeMQ) throws Exception {
-        final int openWirePort = activeMQ.getAttribute(ActiveMQBroker.OPEN_WIRE_PORT);
-        final String host = activeMQ.getAttribute(ActiveMQBroker.ADDRESS);
-        final String brokerUrl = "tcp://"+host+":"+openWirePort;
-
-        final Connection jmsConnection = new ActiveMQConnectionFactory(brokerUrl).createConnection("admin", "activemq");
-        jmsConnection.start();
-        return jmsConnection;
-    }
-
-    /**
-     * Sends {@code count} text messages with the given content to the named queue.
-     *
-     * @param connection an open JMS connection; it is left open
-     * @param count number of messages to send
-     * @param queueName name of the destination queue
-     * @param content text body used for every message
-     */
-    private void sendMessages(Connection connection, int count, String queueName, String content) throws Exception {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        try {
-            Queue destination = session.createQueue(queueName);
-            MessageProducer messageProducer = session.createProducer(destination);
-
-            for (int i = 0; i < count; i++) {
-                TextMessage message = session.createTextMessage(content);
-                messageProducer.send(message);
-            }
-        } finally {
-            // Release the session even if creating the producer or sending fails
-            session.close();
-        }
-    }
-
-    /**
-     * Drains the named queue by receiving messages until a receive call with a
-     * timeout argument of 500 returns {@code null}.
-     *
-     * @param connection an open JMS connection; it is left open
-     * @param queueName name of the queue to drain
-     * @return the number of messages received
-     */
-    private int clearQueue(Connection connection, String queueName) throws Exception {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        try {
-            Queue destination = session.createQueue(queueName);
-            MessageConsumer messageConsumer = session.createConsumer(destination);
-
-            int received = 0;
-            while (messageConsumer.receive(500) != null) {
-                received++;
-            }
-            return received;
-        } finally {
-            // Release the session even if creating the consumer or receiving fails
-            session.close();
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
deleted file mode 100644
index 744b2d5..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.kafka;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-
-import java.util.concurrent.Callable;
-
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.api.location.LocationSpec;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.factory.ApplicationBuilder;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.core.test.entity.TestApplication;
-import org.apache.brooklyn.test.Asserts;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.time.Duration;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-import org.apache.brooklyn.entity.messaging.activemq.ActiveMQBroker;
-import org.apache.brooklyn.location.localhost.LocalhostMachineProvisioningLocation;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-/**
- * Test the operation of the Kafka entities: {@link KafkaZooKeeper}, {@link KafkaBroker} and {@link KafkaCluster}.
- *
- * TODO test that sensors update.
- */
-public class KafkaIntegrationTest {
-
- private TestApplication app;
- private Location testLocation;
-
-    /**
-     * Creates a new managed {@link TestApplication} and a
-     * {@link LocalhostMachineProvisioningLocation} before each test method.
-     */
-    @BeforeMethod(alwaysRun = true)
-    public void setup() {
-        app = ApplicationBuilder.newManagedApp(TestApplication.class);
-        testLocation = app.getManagementContext().getLocationManager()
-                .createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class));
-    }
-
-    /**
-     * Destroys everything in the application's management context after each test
-     * method; does nothing when no application has been set.
-     */
-    @AfterMethod(alwaysRun = true)
-    public void shutdown() {
-        if (app == null) {
-            return;
-        }
-        Entities.destroyAll(app.getManagementContext());
-    }
-
-    /**
-     * Starts a {@link KafkaZooKeeper}, waits for SERVICE_UP to become true, then stops
-     * it and checks that SERVICE_UP is false.
-     */
-    @Test(groups = "Integration")
-    public void testZookeeper() {
-        final EntitySpec<KafkaZooKeeper> zkSpec = EntitySpec.create(KafkaZooKeeper.class);
-        final KafkaZooKeeper zk = app.createAndManageChild(zkSpec);
-        zk.start(ImmutableList.of(testLocation));
-
-        final int startupTimeout = 60*1000;
-        EntityTestUtils.assertAttributeEqualsEventually(
-                ImmutableMap.of("timeout", startupTimeout), zk, Startable.SERVICE_UP, true);
-
-        zk.stop();
-        assertFalse(zk.getAttribute(Startable.SERVICE_UP));
-    }
-
-    /**
-     * Starts a zookeeper and then a broker configured to use it, waiting for each to
-     * report SERVICE_UP; afterwards stops the zookeeper first and then the broker,
-     * checking that each reports SERVICE_UP as false.
-     */
-    @Test(groups = "Integration")
-    public void testBrokerPlusZookeeper() {
-        final KafkaZooKeeper zk = app.createAndManageChild(EntitySpec.create(KafkaZooKeeper.class));
-        final EntitySpec<KafkaBroker> brokerSpec = EntitySpec.create(KafkaBroker.class)
-                .configure(KafkaBroker.ZOOKEEPER, zk);
-        final KafkaBroker kafkaBroker = app.createAndManageChild(brokerSpec);
-
-        final int startupTimeout = 60*1000;
-
-        zk.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(
-                ImmutableMap.of("timeout", startupTimeout), zk, Startable.SERVICE_UP, true);
-
-        kafkaBroker.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(
-                ImmutableMap.of("timeout", startupTimeout), kafkaBroker, Startable.SERVICE_UP, true);
-
-        zk.stop();
-        assertFalse(zk.getAttribute(Startable.SERVICE_UP));
-
-        kafkaBroker.stop();
-        assertFalse(kafkaBroker.getAttribute(Startable.SERVICE_UP));
-    }
-
-    /**
-     * Test that we can start a cluster with zookeeper and two brokers.
-     *
-     * Once the cluster and its zookeeper report SERVICE_UP and the cluster size is 2,
-     * sends a message on a topic through {@link KafkaSupport} and checks that the
-     * same message is read back.
-     */
-    @Test(groups = "Integration")
-    public void testTwoBrokerCluster() throws InterruptedException {
-        final KafkaCluster kafka = app.createAndManageChild(EntitySpec.create(KafkaCluster.class)
-                .configure(KafkaCluster.INITIAL_SIZE, 2));
-        kafka.start(ImmutableList.of(testLocation));
-
-        final Callable<Void> clusterIsUp = new Callable<Void>() {
-            @Override
-            public Void call() {
-                assertTrue(kafka.getAttribute(Startable.SERVICE_UP));
-                assertTrue(kafka.getZooKeeper().getAttribute(Startable.SERVICE_UP));
-                assertEquals(kafka.getCurrentSize().intValue(), 2);
-                return null;
-            }
-        };
-        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.TWO_MINUTES), clusterIsUp);
-
-        Entities.dumpInfo(kafka);
-
-        final KafkaSupport support = new KafkaSupport(kafka);
-        support.sendMessage("brooklyn", "TEST_MESSAGE");
-
-        final Runnable messageReadBack = new Runnable() {
-            @Override
-            public void run() {
-                String message = support.getMessage("brooklyn");
-                assertEquals(message, "TEST_MESSAGE");
-            }
-        };
-        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.FIVE_SECONDS), messageReadBack);
-    }
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaLiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaLiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaLiveTest.java
deleted file mode 100644
index 096bdeb..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaLiveTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.kafka;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-import java.util.concurrent.Callable;
-
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.entity.AbstractEc2LiveTest;
-import org.apache.brooklyn.test.Asserts;
-import org.apache.brooklyn.util.collections.MutableMap;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * Kafka cluster test that runs through {@link AbstractEc2LiveTest}.
- */
-public class KafkaLiveTest extends AbstractEc2LiveTest {
-
-    /**
-     * Test that can install, start and use a Kafka cluster with two brokers.
-     *
-     * @param loc the location the application is started in
-     */
-    @Override
-    protected void doTest(Location loc) throws Exception {
-        final EntitySpec<KafkaCluster> clusterSpec = EntitySpec.create(KafkaCluster.class)
-                .configure("startTimeout", 300) // 5 minutes
-                .configure("initialSize", 2);
-        final KafkaCluster kafka = app.createAndManageChild(clusterSpec);
-        app.start(ImmutableList.of(loc));
-
-        final Callable<Void> clusterIsUp = new Callable<Void>() {
-            @Override
-            public Void call() {
-                assertTrue(kafka.getAttribute(Startable.SERVICE_UP));
-                assertTrue(kafka.getZooKeeper().getAttribute(Startable.SERVICE_UP));
-                assertEquals(kafka.getCurrentSize().intValue(), 2);
-                return null;
-            }
-        };
-        Asserts.succeedsEventually(MutableMap.of("timeout", 300000L), clusterIsUp);
-
-        Entities.dumpInfo(kafka);
-
-        // Send one message through the cluster and check the same text comes back.
-        final KafkaSupport support = new KafkaSupport(kafka);
-        support.sendMessage("brooklyn", "TEST_MESSAGE");
-        final String received = support.getMessage("brooklyn");
-        assertEquals(received, "TEST_MESSAGE");
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaSupport.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaSupport.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaSupport.java
deleted file mode 100644
index f385e10..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/kafka/KafkaSupport.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.kafka;
-
-import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.core.entity.EntityPredicates;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-import java.security.InvalidParameterException;
-import java.util.Properties;
-
-import static java.lang.String.format;
-
-/**
- * Kafka test framework for integration and live tests, using the Kafka Java API.
- */
-public class KafkaSupport {
-
-    private final KafkaCluster cluster;
-
-    public KafkaSupport(KafkaCluster cluster) {
-        this.cluster = cluster;
-    }
-
-    /**
-     * Send a message to the {@link KafkaCluster} on the given topic.
-     *
-     * @param topic the topic to create (via the cluster's zookeeper) and publish to
-     * @param message the message value to send
-     * @throws InvalidParameterException if the cluster has no broker with SERVICE_UP true
-     */
-    public void sendMessage(String topic, String message) {
-        KafkaBroker broker = findRunningBroker();
-        String brokerAddress = format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort());
-
-        Properties props = new Properties();
-        props.put("metadata.broker.list", brokerAddress);
-        props.put("bootstrap.servers", brokerAddress);
-        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
-        Producer<String, String> producer = new KafkaProducer<>(props);
-        try {
-            ((KafkaZooKeeper)cluster.getZooKeeper()).createTopic(topic);
-
-            ProducerRecord<String, String> data = new ProducerRecord<>(topic, message);
-            producer.send(data);
-        } finally {
-            // Close the producer even if topic creation or the send fails
-            producer.close();
-        }
-    }
-
-    /**
-     * Retrieve the next message on the given topic from the {@link KafkaCluster}.
-     * <p>
-     * NOTE: polling is not implemented yet (see the FIXME below); after subscribing,
-     * this method currently always returns the constant {@code "TEST_MESSAGE"}.
-     *
-     * @param topic the topic to subscribe to
-     * @return currently always {@code "TEST_MESSAGE"}
-     * @throws InvalidParameterException if the cluster has no broker with SERVICE_UP true
-     */
-    public String getMessage(String topic) {
-        ZooKeeperNode zookeeper = cluster.getZooKeeper();
-        KafkaBroker broker = findRunningBroker();
-
-        Properties props = new Properties();
-        props.put("bootstrap.servers", format("%s:%d", broker.getAttribute(KafkaBroker.HOSTNAME), broker.getKafkaPort()));
-        // Build host:port explicitly; the hostname used to be passed as the format string,
-        // which silently dropped the port.
-        props.put("zookeeper.connect", format("%s:%s", zookeeper.getHostname(), zookeeper.getZookeeperPort()));
-        props.put("group.id", "brooklyn");
-        props.put("partition.assignment.strategy", "RoundRobin");
-        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
-        KafkaConsumer consumer = new KafkaConsumer(props);
-        try {
-            consumer.subscribe(topic);
-            // FIXME unimplemented KafkaConsumer.poll
-//            Object consumerRecords = consumer.poll(Duration.seconds(3).toMilliseconds()).get(topic);
-            return "TEST_MESSAGE";
-        } finally {
-            // Close the consumer so repeated calls do not accumulate open clients
-            consumer.close();
-        }
-    }
-
-    /**
-     * Finds any {@link KafkaBroker} child of the cluster whose SERVICE_UP attribute is true.
-     *
-     * @throws InvalidParameterException if no such broker exists
-     */
-    private KafkaBroker findRunningBroker() {
-        Optional<Entity> anyBrokerNodeInCluster = Iterables.tryFind(cluster.getCluster().getChildren(), Predicates.and(
-                Predicates.instanceOf(KafkaBroker.class),
-                EntityPredicates.attributeEqualTo(KafkaBroker.SERVICE_UP, true)));
-        if (!anyBrokerNodeInCluster.isPresent()) {
-            throw new InvalidParameterException("No kafka broker node found");
-        }
-        return (KafkaBroker)anyBrokerNodeInCluster.get();
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidEc2LiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidEc2LiveTest.java
deleted file mode 100644
index 6d793c8..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidEc2LiveTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.qpid;
-
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.entity.AbstractEc2LiveTest;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * Qpid broker test that runs through {@link AbstractEc2LiveTest}.
- */
-public class QpidEc2LiveTest extends AbstractEc2LiveTest {
-
-    // TODO Also check can connect (e.g. to send/receive messages)
-
-    /**
-     * Starts a {@link QpidBroker} with explicit "jmxPort" and "rmiRegistryPort" settings
-     * in the given location and waits for SERVICE_UP to become true.
-     */
-    @Override
-    protected void doTest(Location loc) throws Exception {
-        EntitySpec<QpidBroker> brokerSpec = EntitySpec.create(QpidBroker.class)
-                .configure("jmxPort", "9909+")
-                .configure("rmiRegistryPort", "9910+");
-        QpidBroker broker = app.createAndManageChild(brokerSpec);
-
-        broker.start(ImmutableList.of(loc));
-        EntityTestUtils.assertAttributeEqualsEventually(broker, QpidBroker.SERVICE_UP, true);
-    }
-
-    @Test(enabled=false)
-    public void testDummy() {} // Convince testng IDE integration that this really does have test methods
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidIntegrationTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidIntegrationTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidIntegrationTest.java
deleted file mode 100644
index 977b934..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/qpid/QpidIntegrationTest.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.qpid;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.factory.ApplicationBuilder;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.core.test.entity.TestApplication;
-import org.apache.brooklyn.entity.software.base.SoftwareProcess;
-import org.apache.brooklyn.test.Asserts;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.apache.brooklyn.test.HttpTestUtils;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.exceptions.Exceptions;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.configuration.ClientProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * Test the operation of the {@link QpidBroker} class.
- */
-public class QpidIntegrationTest {
-    private static final Logger log = LoggerFactory.getLogger(QpidIntegrationTest.class);
-
-    private TestApplication app;
-    private Location testLocation;
-    private QpidBroker qpid;
-
-    /** Creates a fresh managed application and a localhost provisioning location before each test. */
-    @BeforeMethod(groups = "Integration")
-    public void setup() {
-        log.info("Qpid working dir: {}", System.getProperty("user.dir"));
-        app = ApplicationBuilder.newManagedApp(TestApplication.class);
-        testLocation = app.newLocalhostProvisioningLocation();
-    }
-
-    @AfterMethod(alwaysRun=true)
-    public void shutdown() {
-        if (app != null) Entities.destroyAll(app.getManagementContext());
-    }
-
-    /**
-     * Test that the broker starts up with JMX and RMI ports configured, and sets SERVICE_UP correctly.
-     */
-    @Test(groups = "Integration")
-    public void canStartupAndShutdown() {
-        qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class)
-                .configure("jmxPort", "9909+")
-                .configure("rmiRegistryPort", "9910+"));
-        qpid.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(qpid, Startable.SERVICE_UP, true);
-        qpid.stop();
-        assertFalse(qpid.getAttribute(Startable.SERVICE_UP));
-    }
-
-    /**
-     * Test that the broker starts up with HTTP management enabled, and we can connect to the URL.
-     */
-    @Test(groups = "Integration")
-    public void canStartupAndShutdownWithHttpManagement() {
-        qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class)
-                .configure("httpManagementPort", "8888+"));
-        qpid.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(qpid, Startable.SERVICE_UP, true);
-        String httpUrl = "http://"+qpid.getAttribute(QpidBroker.HOSTNAME)+":"+qpid.getAttribute(QpidBroker.HTTP_MANAGEMENT_PORT)+"/management";
-        HttpTestUtils.assertHttpStatusCodeEventuallyEquals(httpUrl, 200);
-        // TODO check actual REST output
-        qpid.stop();
-        assertFalse(qpid.getAttribute(Startable.SERVICE_UP));
-    }
-
-    /**
-     * Test that the broker starts up and sets SERVICE_UP correctly when plugins are configured.
-     *
-     * FIXME the custom plugin was written against qpid 0.14, so that's the version we need to run
-     * this test against. However, v0.14 is no longer available from the download site.
-     * We should update this plugin so it works with the latest qpid.
-     */
-    @Test(enabled = false, groups = "Integration")
-    public void canStartupAndShutdownWithPlugin() {
-        Map<String,String> qpidRuntimeFiles = MutableMap.<String,String>builder()
-                .put("classpath://qpid-test-config.xml", "etc/config.xml")
-                .put("http://developers.cloudsoftcorp.com/brooklyn/repository-test/0.7.0/QpidBroker/qpid-test-plugin.jar", "lib/plugins/sample-plugin.jar")
-                .build();
-        qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class)
-                .configure(SoftwareProcess.RUNTIME_FILES, qpidRuntimeFiles)
-                .configure(QpidBroker.SUGGESTED_VERSION, "0.14"));
-        qpid.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(qpid, Startable.SERVICE_UP, true);
-        qpid.stop();
-        assertFalse(qpid.getAttribute(Startable.SERVICE_UP));
-    }
-
-    /**
-     * Test that setting the 'queue' property causes a named queue to be created.
-     *
-     * This test is disabled, pending further investigation. Issue with AMQP 0-10 queue names.
-     *
-     * FIXME disabled because it was failing in jenkins CI (in QpidIntegrationTest.getQpidConnection()).
-     * url=amqp://admin:********@brooklyn/localhost?brokerlist='tcp://localhost:5672'
-     * Was previously enabled, despite the comment above about "test is disabled".
-     */
-    @Test(enabled = false, groups = { "Integration", "WIP" })
-    public void testCreatingQueues() {
-        final String queueName = "testQueue";
-        final int number = 20;
-        final String content = "01234567890123456789012345678901";
-
-        // Start broker with a configured queue
-        // FIXME Can't use app.createAndManageChild, because of QpidDestination reffing impl directly
-        qpid = app.createAndManageChild(EntitySpec.create(QpidBroker.class)
-                .configure("queue", queueName));
-        qpid.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(qpid, Startable.SERVICE_UP, true);
-
-        // Declared outside the try block so that the finally block can close it after a failure
-        Connection connection = null;
-        try {
-            // Check queue created
-            assertFalse(qpid.getQueueNames().isEmpty());
-            assertEquals(qpid.getQueueNames().size(), 1);
-            assertTrue(qpid.getQueueNames().contains(queueName));
-            assertEquals(qpid.getChildren().size(), 1);
-            assertFalse(qpid.getQueues().isEmpty());
-            assertEquals(qpid.getQueues().size(), 1);
-
-            // Get the named queue entity
-            final QpidQueue queue = qpid.getQueues().get(queueName);
-            assertNotNull(queue);
-
-            // Connect to broker using JMS and send messages
-            connection = getQpidConnection(qpid);
-            clearQueue(connection, queue.getQueueName());
-            Asserts.succeedsEventually(new Runnable() {
-                @Override
-                public void run() {
-                    assertEquals(queue.getAttribute(QpidQueue.QUEUE_DEPTH_MESSAGES), Integer.valueOf(0));
-                }
-            });
-            sendMessages(connection, number, queue.getQueueName(), content);
-
-            // Check messages arrived
-            Asserts.succeedsEventually(new Runnable() {
-                @Override
-                public void run() {
-                    assertEquals(queue.getAttribute(QpidQueue.QUEUE_DEPTH_MESSAGES), Integer.valueOf(number));
-                    assertEquals(queue.getAttribute(QpidQueue.QUEUE_DEPTH_BYTES), Integer.valueOf(number * content.length()));
-                }
-            });
-
-            // TODO clearing the queue currently returns 0. Once that is fixed, also assert here that
-            // clearQueue(connection, queue.getQueueName()) returns 20, and that QUEUE_DEPTH_MESSAGES
-            // and QUEUE_DEPTH_BYTES both eventually drop back to 0.
-
-            // Close the JMS connection
-            connection.close();
-        } catch (JMSException jmse) {
-            log.warn("JMS exception caught", jmse);
-            throw Exceptions.propagate(jmse);
-        } finally {
-            // Best-effort close for the failure path; closing an already-closed JMS connection is a no-op
-            try {
-                if (connection != null) connection.close();
-            } catch (JMSException e) {
-                log.warn("Error closing JMS connection; continuing", e);
-            }
-            // Stop broker; 'app' must stay set, otherwise shutdown() skips Entities.destroyAll
-            qpid.stop();
-        }
-    }
-
-    /** Opens and starts a JMS connection to the broker's AMQP port on localhost; sets JVM-wide Qpid client properties. */
-    private Connection getQpidConnection(QpidBroker qpid) {
-        int port = qpid.getAttribute(Attributes.AMQP_PORT);
-        System.setProperty(ClientProperties.AMQP_VERSION, "0-10");
-        System.setProperty(ClientProperties.DEST_SYNTAX, "ADDR");
-        String connectionUrl = String.format("amqp://admin:admin@brooklyn/localhost?brokerlist='tcp://localhost:%d'", port);
-        try {
-            Connection connection = new AMQConnectionFactory(connectionUrl).createConnection();
-            connection.start();
-            return connection;
-        } catch (Exception e) {
-            log.error(String.format("Error connecting to qpid: %s", connectionUrl), e);
-            throw Exceptions.propagate(e);
-        }
-    }
-
-    /** Sends {@code count} text messages with body {@code content} to the named queue; the session is always closed. */
-    private void sendMessages(Connection connection, int count, String queueName, String content) throws JMSException {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        try {
-            MessageProducer messageProducer = session.createProducer(session.createQueue(queueName));
-            for (int i = 0; i < count; i++) {
-                messageProducer.send(session.createTextMessage(content));
-            }
-        } finally {
-            session.close();
-        }
-    }
-
-    /** Drains the named queue until a 500ms receive returns nothing; returns the number of messages consumed. */
-    private int clearQueue(Connection connection, String queueName) throws JMSException {
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        try {
-            MessageConsumer messageConsumer = session.createConsumer(session.createQueue(queueName));
-            int received = 0;
-            while (messageConsumer.receive(500) != null) received++;
-            return received;
-        } finally {
-            session.close();
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitEc2LiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitEc2LiveTest.java
deleted file mode 100644
index d6959b8..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitEc2LiveTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.rabbit;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.EntityAsserts;
-import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
-import org.apache.brooklyn.core.location.cloud.CloudLocationConfig;
-import org.apache.brooklyn.entity.AbstractEc2LiveTest;
-import org.apache.brooklyn.entity.messaging.MessageBroker;
-import org.apache.brooklyn.entity.messaging.amqp.AmqpExchange;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.SkipException;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.QueueingConsumer;
-
-public class RabbitEc2LiveTest extends AbstractEc2LiveTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(RabbitEc2LiveTest.class);
-
- @Override
- protected void doTest(Location loc) throws Exception {
- RabbitBroker rabbit = app.createAndManageChild(EntitySpec.create(RabbitBroker.class));
- rabbit.start(ImmutableList.of(loc));
- EntityTestUtils.assertAttributeEqualsEventually(rabbit, RabbitBroker.SERVICE_UP, true);
-
- byte[] content = "MessageBody".getBytes(Charsets.UTF_8);
- String queue = "queueName";
- Channel producer = null;
- Channel consumer = null;
- try {
- producer = getAmqpChannel(rabbit);
- consumer = getAmqpChannel(rabbit);
-
- producer.queueDeclare(queue, true, false, false, Maps.<String,Object>newHashMap());
- producer.queueBind(queue, AmqpExchange.DIRECT, queue);
- producer.basicPublish(AmqpExchange.DIRECT, queue, null, content);
-
- QueueingConsumer queueConsumer = new QueueingConsumer(consumer);
- consumer.basicConsume(queue, true, queueConsumer);
-
- QueueingConsumer.Delivery delivery = queueConsumer.nextDelivery();
- assertEquals(delivery.getBody(), content);
- } finally {
- if (producer != null) producer.close();
- if (consumer != null) consumer.close();
- }
- }
-
- private Channel getAmqpChannel(RabbitBroker rabbit) throws Exception {
- String uri = rabbit.getAttribute(MessageBroker.BROKER_URL);
- LOG.warn("connecting to rabbit {}", uri);
- ConnectionFactory factory = new ConnectionFactory();
- factory.setUri(uri);
- Connection conn = factory.newConnection();
- Channel channel = conn.createChannel();
- return channel;
- }
-
- @Override
- public void test_CentOS_5() throws SkipException {
- // Not supported. The EPEL repository described here at [1] does not contain erlang, and the
- // Erlang repository at [1] requires old versions of rpmlib. Additionally, [2] suggests that
- // Centos 5 is not supported
- // [1]:http://www.rabbitmq.com/install-rpm.html
- // [2]: https://www.erlang-solutions.com/downloads/download-erlang-otp
- throw new SkipException("Centos 5 is not supported");
- }
-
- @Test(groups = {"Live"})
- public void testWithOnlyPort22() throws Exception {
- // CentOS-6.3-x86_64-GA-EBS-02-85586466-5b6c-4495-b580-14f72b4bcf51-ami-bb9af1d2.1
- jcloudsLocation = mgmt.getLocationRegistry().resolve(LOCATION_SPEC, ImmutableMap.of(
- "tags", ImmutableList.of(getClass().getName()),
- "imageId", "us-east-1/ami-a96b01c0",
- "hardwareId", SMALL_HARDWARE_ID));
-
- final RabbitBroker server = app.createAndManageChild(EntitySpec.create(RabbitBroker.class)
- .configure(RabbitBroker.PROVISIONING_PROPERTIES.subKey(CloudLocationConfig.INBOUND_PORTS.getName()), ImmutableList.of(22)));
-
- app.start(ImmutableList.of(jcloudsLocation));
-
- EntityAsserts.assertAttributeEqualsEventually(server, Attributes.SERVICE_UP, true);
- EntityAsserts.assertAttributeEqualsEventually(server, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
-
- Integer port = server.getAttribute(RabbitBroker.AMQP_PORT);
- assertNotNull(port);
-
- assertViaSshLocalPortListeningEventually(server, port);
- }
-
- @Test(enabled=false)
- public void testDummy() {} // Convince testng IDE integration that this really does have test methods
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitIntegrationTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitIntegrationTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitIntegrationTest.java
deleted file mode 100644
index fc59dec..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/rabbit/RabbitIntegrationTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.rabbit;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-
-import java.io.IOException;
-
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.factory.ApplicationBuilder;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.core.test.entity.TestApplication;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-import org.apache.brooklyn.entity.messaging.MessageBroker;
-import org.apache.brooklyn.entity.messaging.amqp.AmqpExchange;
-import org.apache.brooklyn.location.localhost.LocalhostMachineProvisioningLocation;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.QueueingConsumer;
-
-/**
- * Test the operation of the {@link RabbitBroker} class.
- *
- * TODO If you're having problems running this test successfully, here are a few tips:
- *
- * - Is `erl` on your path for a non-interactive ssh session?
- * Look in rabbit's $RUN_DIR/console-err.log (e.g. /tmp/brooklyn-aled/apps/someappid/entities/RabbitBroker_2.8.7_JROYTcSL/console-err.log)
- * I worked around that by adding to my ~/.brooklyn/brooklyn.properties:
- * brooklyn.ssh.config.scriptHeader=#!/bin/bash -e\nif [ -f ~/.bashrc ] ; then . ~/.bashrc ; fi\nif [ -f ~/.profile ] ; then . ~/.profile ; fi\necho $PATH > /tmp/mypath.txt
- *
- * - Is the hostname resolving properly?
- * Look in $RUN_DIR/console-out.log; is there a message like:
- * ERROR: epmd error for host "Aleds-MacBook-Pro": timeout (timed out establishing tcp connection)
- * I got around that with disabling my wifi and running when not connected to the internet.
- */
-public class RabbitIntegrationTest {
-    private static final Logger log = LoggerFactory.getLogger(RabbitIntegrationTest.class);
-
-    private TestApplication app;
-    private Location testLocation;
-    private RabbitBroker rabbit;
-
-    @BeforeMethod(groups = "Integration")
-    public void setup() {
-        app = ApplicationBuilder.newManagedApp(TestApplication.class);
-        testLocation = app.newLocalhostProvisioningLocation(); // created via the app, as QpidIntegrationTest does, not constructed directly
-    }
-
-    @AfterMethod(alwaysRun = true)
-    public void shutdown() {
-        if (app != null) Entities.destroyAll(app.getManagementContext());
-    }
-
-    /**
-     * Test that the broker starts up and sets SERVICE_UP correctly.
-     */
-    @Test(groups = {"Integration", "WIP"})
-    public void canStartupAndShutdown() throws Exception {
-        rabbit = app.createAndManageChild(EntitySpec.create(RabbitBroker.class));
-        rabbit.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(rabbit, Startable.SERVICE_UP, true);
-        rabbit.stop();
-        assertFalse(rabbit.getAttribute(Startable.SERVICE_UP));
-    }
-
-    /**
-     * Test that an AMQP client can connect to and use the broker.
-     */
-    @Test(groups = {"Integration", "WIP"})
-    public void testClientConnection() throws Exception {
-        rabbit = app.createAndManageChild(EntitySpec.create(RabbitBroker.class));
-        rabbit.start(ImmutableList.of(testLocation));
-        EntityTestUtils.assertAttributeEqualsEventually(rabbit, Startable.SERVICE_UP, true);
-
-        byte[] content = "MessageBody".getBytes(Charsets.UTF_8);
-        String queue = "queueName";
-        Channel producer = null;
-        Channel consumer = null;
-        try {
-            producer = getAmqpChannel(rabbit);
-            consumer = getAmqpChannel(rabbit);
-
-            producer.queueDeclare(queue, true, false, false, ImmutableMap.<String,Object>of());
-            producer.queueBind(queue, AmqpExchange.DIRECT, queue);
-            producer.basicPublish(AmqpExchange.DIRECT, queue, null, content);
-
-            QueueingConsumer queueConsumer = new QueueingConsumer(consumer);
-            consumer.basicConsume(queue, true, queueConsumer);
-            QueueingConsumer.Delivery delivery = queueConsumer.nextDelivery(60 * 1000L); // one minute timeout
-            org.testng.Assert.assertNotNull(delivery, "No message delivered within one minute"); // null means the wait timed out
-            assertEquals(delivery.getBody(), content);
-        } finally {
-            closeSafely(producer, 10*1000);
-            closeSafely(consumer, 10*1000);
-        }
-    }
-
-    /**
-     * Closes the channel's connection (and with it the channel), guaranteeing the call won't hang this thread forever!
-     *
-     * Saw this during jenkins testing:
-     * "main" prio=10 tid=0x00007f69c8008000 nid=0x5d70 in Object.wait() [0x00007f69d1318000]
-     * java.lang.Thread.State: WAITING (on object monitor)
-     * at java.lang.Object.wait(Native Method)
-     * - waiting on <0x00000000e0947cf8> (a com.rabbitmq.utility.BlockingValueOrException)
-     * at java.lang.Object.wait(Object.java:502)
-     * at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:50)
-     * - locked <0x00000000e0947cf8> (a com.rabbitmq.utility.BlockingValueOrException)
-     * at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:65)
-     * - locked <0x00000000e0947cf8> (a com.rabbitmq.utility.BlockingValueOrException)
-     * at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:111)
-     * - locked <0x00000000e0947cf8> (a com.rabbitmq.utility.BlockingValueOrException)
-     * at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:37)
-     * at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:349)
-     * at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:543)
-     * at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:480)
-     * at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:473)
-     * at com.rabbitmq.client.Channel$close.call(Unknown Source)
-     * at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:42)
-     * at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:108)
-     * at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:112)
-     * at org.codehaus.groovy.runtime.callsite.AbstractCallSite.callSafe(AbstractCallSite.java:75)
-     * at org.apache.brooklyn.entity.messaging.rabbit.RabbitIntegrationTest.testClientConnection(RabbitIntegrationTest.groovy:107)
-     */
-    private void closeSafely(final Channel channel, int timeoutMs) throws InterruptedException {
-        if (channel == null) return;
-        Thread t = new Thread(new Runnable() {
-            @Override public void run() {
-                try {
-                    // Closing the connection also closes the channel; closing only the channel leaked the connection
-                    channel.getConnection().close();
-                } catch (Exception e) {
-                    log.error("Error closing RabbitMQ Channel; continuing", e);
-                }
-            }});
-        t.setDaemon(true); // a close that never returns must not keep the JVM alive (replaces the deprecated Thread.stop())
-        try {
-            t.start();
-            t.join(timeoutMs);
-            if (t.isAlive()) {
-                log.error("Timeout when closing RabbitMQ Channel "+channel+"; aborting close and continuing");
-            }
-        } finally {
-            t.interrupt();
-            t.join(1*1000);
-        }
-    }
-
-    /** Opens a new connection to the broker's BROKER_URL and returns a channel on it; closeSafely closes that connection. */
-    private Channel getAmqpChannel(RabbitBroker rabbit) throws Exception {
-        String uri = rabbit.getAttribute(MessageBroker.BROKER_URL);
-        log.warn("connecting to rabbit {}", uri);
-        ConnectionFactory factory = new ConnectionFactory();
-        factory.setUri(uri);
-        Connection conn = factory.newConnection();
-        return conn.createChannel();
-    }
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/LocalhostLiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/LocalhostLiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/LocalhostLiveTest.java
deleted file mode 100644
index 89afe00..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/LocalhostLiveTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.storm;
-
-import org.testng.annotations.Test;
-
-@Test(groups="Live")
-public class LocalhostLiveTest extends StormAbstractCloudLiveTest {
-    private static final String NAMED_LOCATION = "localhost";
-
-    /** Returns {@code "localhost"}, the named location for this variant of the Storm live test. */
-    @Override
-    public String getLocation() {
-        return NAMED_LOCATION;
-    }
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/SoftLayerLiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/SoftLayerLiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/SoftLayerLiveTest.java
deleted file mode 100644
index 17cb7d2..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/SoftLayerLiveTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.storm;
-
-import org.testng.annotations.Test;
-
-@Test(groups="Live")
-public class SoftLayerLiveTest extends StormAbstractCloudLiveTest {
-    /** Named location handed to the superclass through {@link #getLocation()}. */
-    private static final String NAMED_LOCATION = "softlayer";
-
-    /** Returns {@code "softlayer"}, the named location for this variant of the Storm live test. */
-    @Override
-    public String getLocation() {
-        return NAMED_LOCATION;
-    }
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormAbstractCloudLiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormAbstractCloudLiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormAbstractCloudLiveTest.java
deleted file mode 100644
index 5633f74..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormAbstractCloudLiveTest.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.storm;
-
-import static org.apache.brooklyn.core.sensor.DependentConfiguration.attributeWhenReady;
-import static org.apache.brooklyn.entity.messaging.storm.Storm.NIMBUS_HOSTNAME;
-import static org.apache.brooklyn.entity.messaging.storm.Storm.ZOOKEEPER_ENSEMBLE;
-import static org.apache.brooklyn.entity.messaging.storm.Storm.Role.NIMBUS;
-import static org.apache.brooklyn.entity.messaging.storm.Storm.Role.SUPERVISOR;
-import static org.apache.brooklyn.entity.messaging.storm.Storm.Role.UI;
-
-import java.io.File;
-import java.util.Map;
-
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
-import org.apache.brooklyn.core.test.BrooklynAppLiveTestSupport;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.core.ResourceUtils;
-import org.apache.brooklyn.util.core.file.ArchiveBuilder;
-import org.apache.brooklyn.util.os.Os;
-import org.apache.brooklyn.util.time.Duration;
-import org.apache.brooklyn.util.time.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.testing.TestWordSpout;
-import backtype.storm.topology.TopologyBuilder;
-
-import org.apache.brooklyn.entity.messaging.storm.topologies.ExclamationBolt;
-import org.apache.brooklyn.entity.zookeeper.ZooKeeperEnsemble;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-
-/**
- * Base class for live tests that deploy a Storm cluster (a ZooKeeper ensemble plus nimbus,
- * supervisor and UI nodes) to the location named by {@link #getLocation()} and then submit
- * a small sample topology to it.
- */
-public abstract class StormAbstractCloudLiveTest extends BrooklynAppLiveTestSupport {
-
-    protected static final Logger log = LoggerFactory
-            .getLogger(StormAbstractCloudLiveTest.class);
-
-    /** The only failure message for which topology submission is retried. */
-    private static final String CONNECTION_REFUSED_MESSAGE =
-            "org.apache.thrift7.transport.TTransportException: java.net.ConnectException: Connection refused";
-
-    private Location location;
-    private ZooKeeperEnsemble zooKeeperEnsemble;
-    private Storm nimbus;
-    private Storm supervisor;
-    private Storm ui;
-
-    /**
-     * Creates a fresh {@link LocalManagementContext}, resolves the target location from
-     * {@link #getLocation()} and {@link #getFlags()}, then runs the superclass set-up.
-     */
-    @BeforeClass(alwaysRun = true)
-    public void beforeClass() throws Exception {
-        mgmt = new LocalManagementContext();
-        location = mgmt.getLocationRegistry()
-                .resolve(getLocation(), getFlags());
-        super.setUp();
-    }
-
-    /** Currently a no-op: the tear-down call below is disabled. */
-    @AfterClass(alwaysRun = true)
-    public void afterClass() throws Exception {
-        // Entities.destroyAll(mgmt);
-    }
-
-    /**
-     * @return the location spec that is resolved through the location registry
-     */
-    public abstract String getLocation();
-
-    /**
-     * @return extra flags passed to the location registry when resolving
-     *         {@link #getLocation()}; empty unless overridden
-     */
-    public Map<String, ?> getFlags() {
-        return MutableMap.of();
-    }
-
-    /**
-     * Deploys a 3-node ZooKeeper ensemble and one Storm node per role, waits for all of them
-     * to report service-up, then submits the sample topology.
-     *
-     * @throws Exception the original failure, after it has been logged
-     */
-    @Test(groups = {"Live","WIP"}) // needs repair to avoid hard dependency on Andrea's environment
-    public void deployStorm() throws Exception {
-        try {
-            zooKeeperEnsemble = app.createAndManageChild(EntitySpec.create(
-                    ZooKeeperEnsemble.class).configure(
-                    ZooKeeperEnsemble.INITIAL_SIZE, 3));
-            nimbus = app.createAndManageChild(EntitySpec
-                    .create(Storm.class)
-                    .configure(Storm.ROLE, NIMBUS)
-                    .configure(NIMBUS_HOSTNAME, "localhost")
-                    .configure(ZOOKEEPER_ENSEMBLE, zooKeeperEnsemble)
-                    );
-            supervisor = app.createAndManageChild(EntitySpec
-                    .create(Storm.class)
-                    .configure(Storm.ROLE, SUPERVISOR)
-                    .configure(ZOOKEEPER_ENSEMBLE, zooKeeperEnsemble)
-                    .configure(NIMBUS_HOSTNAME,
-                            attributeWhenReady(nimbus, Attributes.HOSTNAME)));
-            ui = app.createAndManageChild(EntitySpec
-                    .create(Storm.class)
-                    .configure(Storm.ROLE, UI)
-                    .configure(ZOOKEEPER_ENSEMBLE, zooKeeperEnsemble)
-                    .configure(NIMBUS_HOSTNAME,
-                            attributeWhenReady(nimbus, Attributes.HOSTNAME)));
-            log.info("Started Storm deployment on '" + getLocation() + "'");
-            app.start(ImmutableList.of(location));
-            Entities.dumpInfo(app);
-            EntityTestUtils.assertAttributeEqualsEventually(app, Startable.SERVICE_UP, true);
-            EntityTestUtils.assertAttributeEqualsEventually(zooKeeperEnsemble, Startable.SERVICE_UP, true);
-            EntityTestUtils.assertAttributeEqualsEventually(nimbus, Startable.SERVICE_UP, true);
-            EntityTestUtils.assertAttributeEqualsEventually(supervisor, Startable.SERVICE_UP, true);
-            EntityTestUtils.assertAttributeEqualsEventually(ui, Startable.SERVICE_UP, true);
-
-            StormTopology stormTopology = createTopology();
-            submitTopology(stormTopology, "myExclamation", 3, true, 60000);
-        } catch (Exception e) {
-            // Log for context and rethrow the original exception, so the test report keeps
-            // its type and stack trace instead of a message-less AssertionError.
-            log.error("Failed to deploy Storm", e);
-            throw e;
-        }
-    }
-
-    /**
-     * Builds the sample topology: a {@code "word"} spout feeding bolt {@code "exclaim1"},
-     * which feeds bolt {@code "exclaim2"}; each bolt is shuffle-grouped on its predecessor.
-     */
-    private StormTopology createTopology()
-            throws AlreadyAliveException, InvalidTopologyException {
-        TopologyBuilder builder = new TopologyBuilder();
-
-        builder.setSpout("word", new TestWordSpout(), 10);
-        builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
-        builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
-
-        return builder.createTopology();
-    }
-
-    /**
-     * Packages the test topologies as a jar, points the {@code storm.jar} system property at
-     * it and submits the topology, retrying once per second while the connection is refused.
-     *
-     * @param stormTopology the topology to submit
-     * @param topologyName  the name to submit it under
-     * @param numOfWorkers  worker count set on the Storm {@link Config}
-     * @param debug         debug flag set on the Storm {@link Config}
-     * @param timeoutMs     how long to keep retrying, in milliseconds; {@code -1} means no limit
-     * @return {@code true} once the topology has been submitted; on timeout the test is failed
-     *         via {@link Assert#fail(String, Throwable)}, so {@code false} is never returned
-     */
-    public boolean submitTopology(StormTopology stormTopology, String topologyName, int numOfWorkers, boolean debug, long timeoutMs) {
-        if (log.isDebugEnabled()) log.debug("Connecting to NimbusClient: {}", nimbus.getConfig(Storm.NIMBUS_HOSTNAME));
-        Config conf = new Config();
-        conf.setDebug(debug);
-        conf.setNumWorkers(numOfWorkers);
-
-        // TODO - confirm this creates the JAR correctly
-        String jar = createJar(
-                new File(Os.mergePaths(ResourceUtils.create(this).getClassLoaderDir(), "org/apache/brooklyn/entity/messaging/storm/topologies")),
-                "org/apache/brooklyn/entity/messaging/storm/");
-        System.setProperty("storm.jar", jar);
-        long startMs = System.currentTimeMillis();
-        long endMs = (timeoutMs == -1) ? Long.MAX_VALUE : (startMs + timeoutMs);
-        long currentTime = startMs;
-        Throwable lastError = null;
-        int attempt = 0;
-        while (currentTime <= endMs) {
-            if (attempt != 0) Time.sleep(Duration.ONE_SECOND);
-            if (log.isTraceEnabled()) log.trace("trying connection to {} at time {}", nimbus.getConfig(Storm.NIMBUS_HOSTNAME), System.currentTimeMillis());
-
-            try {
-                StormSubmitter.submitTopology(topologyName, conf, stormTopology);
-                return true;
-            } catch (Exception e) {
-                if (shouldRetryOn(e)) {
-                    if (log.isDebugEnabled()) log.debug("Attempt {} failed connecting to {} ({})", new Object[] {attempt + 1, nimbus.getConfig(Storm.NIMBUS_HOSTNAME), e.getMessage()});
-                    lastError = e;
-                } else {
-                    throw Throwables.propagate(e);
-                }
-            }
-            attempt++;
-            // Refresh after the attempt so the deadline check never uses a stale timestamp.
-            currentTime = System.currentTimeMillis();
-        }
-        log.warn("unable to connect to Nimbus client: ", lastError);
-        // Carry the last connection error as the cause so the failure report explains itself.
-        Assert.fail("Unable to submit topology '" + topologyName + "' within " + timeoutMs + "ms", lastError);
-        return false; // not reached: Assert.fail always throws
-    }
-
-    /**
-     * @return {@code true} only when the exception message is exactly the Thrift
-     *         "Connection refused" message; a {@code null} message is not retried
-     */
-    private boolean shouldRetryOn(Exception e) {
-        // Constant-first equals: an exception may carry a null message.
-        return CONNECTION_REFUSED_MESSAGE.equals(e.getMessage());
-    }
-
-    /**
-     * Returns the path of a jar holding the contents of {@code dir}.
-     *
-     * @param dir            a directory to package, or an existing file to use as-is
-     * @param parentDirInJar path inside the jar under which {@code dir} is added
-     * @return absolute path of the new {@code topologies.jar} in a temp directory when
-     *         {@code dir} is a directory, otherwise the absolute path of {@code dir} itself
-     */
-    private String createJar(File dir, String parentDirInJar) {
-        if (dir.isDirectory()) {
-            File jarFile = ArchiveBuilder.jar().addAt(dir, parentDirInJar).create(Os.newTempDir(getClass())+"/topologies.jar");
-            return jarFile.getAbsolutePath();
-        } else {
-            return dir.getAbsolutePath(); // An existing Jar archive?
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormEc2LiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormEc2LiveTest.java
deleted file mode 100644
index bf54003..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormEc2LiveTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.storm;
-
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.testng.annotations.Test;
-import org.apache.brooklyn.entity.AbstractEc2LiveTest;
-import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * Live test built on {@link AbstractEc2LiveTest} that brings up one ZooKeeper node plus one
- * Storm node for each of the nimbus, supervisor and UI roles.
- */
-public class StormEc2LiveTest extends AbstractEc2LiveTest {
-
-    /**
-     * Creates the ZooKeeper and Storm entities, starts the application in {@code loc} and
-     * asserts that every one of them eventually reports service-up.
-     */
-    @Override
-    protected void doTest(Location loc) throws Exception {
-        ZooKeeperNode zookeeper = app.createAndManageChild(EntitySpec.create(ZooKeeperNode.class));
-        Storm nimbus = addStormNode(Storm.Role.NIMBUS);
-        Storm supervisor = addStormNode(Storm.Role.SUPERVISOR);
-        Storm ui = addStormNode(Storm.Role.UI);
-
-        app.start(ImmutableList.of(loc));
-        Entities.dumpInfo(app);
-
-        EntityTestUtils.assertAttributeEqualsEventually(zookeeper, Startable.SERVICE_UP, true);
-        EntityTestUtils.assertAttributeEqualsEventually(nimbus, Startable.SERVICE_UP, true);
-        EntityTestUtils.assertAttributeEqualsEventually(supervisor, Startable.SERVICE_UP, true);
-        EntityTestUtils.assertAttributeEqualsEventually(ui, Startable.SERVICE_UP, true);
-    }
-
-    /** Adds a managed {@link Storm} child of the app with its {@code "storm.role"} config set to {@code role}. */
-    private Storm addStormNode(Storm.Role role) {
-        EntitySpec<Storm> spec = EntitySpec.create(Storm.class).configure("storm.role", role);
-        return app.createAndManageChild(spec);
-    }
-
-    @Test(enabled=false)
-    public void testDummy() {} // Convince testng IDE integration that this really does have test methods
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormGceLiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormGceLiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormGceLiveTest.java
deleted file mode 100644
index e1a9ad4..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/StormGceLiveTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.storm;
-
-import java.util.Map;
-
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.testng.annotations.Test;
-
-/**
- * Runs the live Storm deployment test inherited from {@link StormAbstractCloudLiveTest}
- * using the {@code "gce-europe-west1"} location spec and a fixed location id and image.
- */
-@Test(groups="Live")
-public class StormGceLiveTest extends StormAbstractCloudLiveTest {
-
-    private static final String NAMED_LOCATION = "gce-europe-west1";
-    private static final String LOCATION_ID = "gce-europe-west1-a";
-    private static final String IMAGE_ID = "centos-6-v20130325";
-    // Base URL only: the image id is appended in getFlags() so that it appears exactly once.
-    private static final String IMAGE_URI_PREFIX = "https://www.googleapis.com/compute/v1beta15/projects/google/global/images/";
-
-    /**
-     * @return the location spec string {@code "gce-europe-west1"}
-     */
-    @Override
-    public String getLocation() {
-        return NAMED_LOCATION;
-    }
-
-    /**
-     * @return the flags used when resolving the location: location id, image id, full image
-     *         URI, group id and the {@code stopIptables} switch
-     */
-    @Override
-    public Map<String, ?> getFlags() {
-        return MutableMap.of(
-                "locationId", LOCATION_ID,
-                "imageId", IMAGE_ID,
-                "uri", IMAGE_URI_PREFIX + IMAGE_ID,
-                "groupId", "storm-test",
-                "stopIptables", "true"
-                );
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/topologies/ExclamationBolt.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/topologies/ExclamationBolt.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/topologies/ExclamationBolt.java
deleted file mode 100644
index a10a30e..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/storm/topologies/ExclamationBolt.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.storm.topologies;
-
-import java.util.Map;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-/**
- * Bolt that takes the first field of each incoming tuple as a string, appends {@code "!!!"}
- * and emits the result under the single output field {@code "word"}.
- */
-public class ExclamationBolt extends BaseRichBolt {
-    OutputCollector _collector;
-
-    /** Remembers {@code collector} for later use; {@code conf} and {@code context} are not used. */
-    @Override
-    public void prepare(Map conf, TopologyContext context,
-            OutputCollector collector) {
-        this._collector = collector;
-    }
-
-    /** Emits the exclaimed first field for {@code tuple}, then acks {@code tuple}. */
-    @Override
-    public void execute(Tuple tuple) {
-        final String exclaimed = tuple.getString(0) + "!!!";
-        final Values output = new Values(exclaimed);
-        _collector.emit(tuple, output);
-        _collector.ack(tuple);
-    }
-
-    /** Declares the single output field {@code "word"}. */
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        final Fields outputFields = new Fields("word");
-        declarer.declare(outputFields);
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEc2LiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEc2LiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEc2LiveTest.java
deleted file mode 100644
index e8d49ee..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEc2LiveTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.zookeeper;
-
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.testng.annotations.Test;
-import org.apache.brooklyn.entity.AbstractEc2LiveTest;
-import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode;
-
-import com.google.common.collect.ImmutableList;
-
-/**
- * Live test built on {@link AbstractEc2LiveTest} that starts a single {@link ZooKeeperNode}
- * and waits for it to report service-up.
- */
-public class ZooKeeperEc2LiveTest extends AbstractEc2LiveTest {
-
-    /**
-     * Creates a ZooKeeper node with {@code jmxPort} configured as {@code "31001+"}, starts the
-     * application in {@code loc} and asserts that the node eventually reports service-up.
-     */
-    @Override
-    protected void doTest(Location loc) throws Exception {
-        EntitySpec<ZooKeeperNode> nodeSpec = EntitySpec.create(ZooKeeperNode.class)
-                .configure("jmxPort", "31001+");
-        ZooKeeperNode zookeeper = app.createAndManageChild(nodeSpec);
-
-        app.start(ImmutableList.of(loc));
-        Entities.dumpInfo(zookeeper);
-
-        EntityTestUtils.assertAttributeEqualsEventually(zookeeper, Startable.SERVICE_UP, true);
-    }
-
-    @Test(enabled=false)
-    public void testDummy() {} // Convince testng IDE integration that this really does have test methods
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/02abbab0/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java
----------------------------------------------------------------------
diff --git a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java b/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java
deleted file mode 100644
index f143df7..0000000
--- a/brooklyn-library/software/messaging/src/test/java/org/apache/brooklyn/entity/messaging/zookeeper/ZooKeeperEnsembleLiveTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.entity.messaging.zookeeper;
-
-import org.apache.brooklyn.core.entity.Attributes;
-import org.apache.brooklyn.core.entity.Entities;
-import org.apache.brooklyn.core.entity.factory.ApplicationBuilder;
-import org.apache.brooklyn.core.entity.trait.Startable;
-import org.apache.brooklyn.core.test.entity.TestApplication;
-import org.apache.brooklyn.entity.zookeeper.ZooKeeperEnsemble;
-import org.apache.brooklyn.entity.zookeeper.ZooKeeperNode;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Uninterruptibles;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.EntitySpec;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.test.EntityTestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.net.Socket;
-import java.util.concurrent.TimeUnit;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-/**
- * A live test of the {@link org.apache.brooklyn.entity.zookeeper.ZooKeeperEnsemble} entity.
- *
- * Starts a 3 node ensemble in the location named by {@code provider}, checks that a socket
- * can be opened to every member, then resizes the ensemble to one node and checks again.
- */
-public class ZooKeeperEnsembleLiveTest {
-
-    private static final Logger log = LoggerFactory.getLogger(ZooKeeperEnsembleLiveTest.class);
-
-    // Location spec to run against; the commented-out lines are alternatives.
-    private String provider =
-            "gce-europe-west1";
-//            "aws-ec2:eu-west-1";
-//            "named:hpcloud-compute-at";
-//            "localhost";
-
-    protected TestApplication app;
-    protected Location testLocation;
-    protected ZooKeeperEnsemble cluster;
-
-    /** Creates a managed test application and resolves {@code provider} into the test location. */
-    @BeforeMethod(alwaysRun = true)
-    public void setup() {
-        app = ApplicationBuilder.newManagedApp(TestApplication.class);
-        testLocation = app.getManagementContext().getLocationRegistry().resolve(provider);
-    }
-
-    /** Destroys everything in the application's management context, if an application was created. */
-    @AfterMethod(alwaysRun = true)
-    public void shutdown() {
-        // setup() may have failed before app was assigned; do not mask that failure with an NPE.
-        if (app != null) Entities.destroyAll(app.getManagementContext());
-    }
-
-    /**
-     * Test that a three node ensemble starts up and accepts connections on every node,
-     * and still does so after being resized down to a single node.
-     */
-    @Test(groups = "Live")
-    public void testStartUpConnectAndResize() throws Exception {
-        cluster = app.createAndManageChild(EntitySpec.create(ZooKeeperEnsemble.class)
-                .configure("initialSize", 3)
-                .configure("clusterName", "ZooKeeperEnsembleLiveTest"));
-        assertEquals(cluster.getCurrentSize().intValue(), 0);
-
-        app.start(ImmutableList.of(testLocation));
-        assertEnsembleUpWithSize(3);
-
-        cluster.resize(1);
-        assertEnsembleUpWithSize(1);
-    }
-
-    /**
-     * Asserts that the ensemble reaches {@code expectedSize}, reports service-up and that a
-     * socket can be opened to each of its members.
-     */
-    private void assertEnsembleUpWithSize(int expectedSize) throws Exception {
-        EntityTestUtils.assertAttributeEqualsEventually(cluster, ZooKeeperEnsemble.GROUP_SIZE, expectedSize);
-        Entities.dumpInfo(app);
-
-        EntityTestUtils.assertAttributeEqualsEventually(cluster, Startable.SERVICE_UP, true);
-        for (Entity zkNode : cluster.getMembers()) {
-            assertTrue(isSocketOpen((ZooKeeperNode) zkNode), "Could not open a socket to " + zkNode);
-        }
-    }
-
-    /**
-     * Tries up to 20 times, one second apart, to open a socket to the node's hostname and
-     * ZooKeeper port.
-     *
-     * @param node the node to probe
-     * @return {@code true} as soon as one connection succeeds, {@code false} if all attempts fail
-     */
-    protected static boolean isSocketOpen(ZooKeeperNode node) {
-        final int maxAttempts = 20;
-        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
-            try {
-                Socket s = new Socket(node.getAttribute(Attributes.HOSTNAME), node.getZookeeperPort());
-                s.close();
-                return true;
-            } catch (Exception e) {
-                // Not fatal, another attempt follows; keep the reason visible for debugging.
-                log.debug("Attempt {} of {} to connect to {} failed: {}", new Object[] {attempt, maxAttempts, node, e.toString()});
-            }
-            // Pause between attempts only; there is nothing to wait for after the last one.
-            if (attempt < maxAttempts) Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-        }
-        return false;
-    }
-
-}