You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:46:38 UTC

[20/50] brooklyn-library git commit: Added test using Kafka API and example application

Added test using Kafka API and example application


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/c1437063
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/c1437063
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/c1437063

Branch: refs/heads/0.5.0
Commit: c14370630975d0b83741704f0df266cf7033a6a8
Parents: 37e890c
Author: Andrew Kennedy <an...@cloudsoftcorp.com>
Authored: Wed Mar 20 19:49:50 2013 +0000
Committer: Andrew Kennedy <an...@cloudsoftcorp.com>
Committed: Fri Apr 19 10:36:06 2013 +0100

----------------------------------------------------------------------
 examples/simple-messaging-pubsub/README.txt     |  7 +-
 .../java/brooklyn/demo/KafkaClusterExample.java | 40 +++++++++++
 .../brooklyn/demo/StandaloneBrokerExample.java  | 54 ---------------
 .../demo/StandaloneQpidBrokerExample.java       | 54 +++++++++++++++
 software/messaging/pom.xml                      | 73 +++++++++++++-------
 .../entity/messaging/kafka/KafkaBroker.java     |  4 +-
 .../entity/messaging/kafka/KafkaBrokerImpl.java |  7 +-
 .../entity/messaging/kafka/KafkaZookeeper.java  |  2 +-
 .../messaging/kafka/KafkaIntegrationTest.groovy | 21 ++++--
 .../entity/messaging/kafka/KafkaSupport.java    | 72 +++++++++++++++++++
 10 files changed, 240 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/examples/simple-messaging-pubsub/README.txt
----------------------------------------------------------------------
diff --git a/examples/simple-messaging-pubsub/README.txt b/examples/simple-messaging-pubsub/README.txt
index 6048867..e27992e 100644
--- a/examples/simple-messaging-pubsub/README.txt
+++ b/examples/simple-messaging-pubsub/README.txt
@@ -7,7 +7,7 @@ The commands below assume that the `brooklyn` script is already on your $PATH, a
   export BROOKLYN_CLASSPATH=$(pwd)/target/classes
   
   # Launches a qpid broker on localhost
-  brooklyn -v launch --app brooklyn.demo.StandaloneBrokerExample --location localhost
+  brooklyn -v launch --app brooklyn.demo.StandaloneQpidBrokerExample --location localhost
 
   # You can get the broker's URL from the brooklyn web-console at http://localhost:8081
   # by looking at the broker entity's sensors or from the verbose output from the application startup
@@ -19,6 +19,11 @@ The commands below assume that the `brooklyn` script is already on your $PATH, a
   # Test publishing a message to the broker
   java -cp "./resources/lib/*:./target/classes" brooklyn.demo.Publish ${URL}
 
+To test a Kafka distributed messaging cluster example, use the following command:
+
+  # Launches a Kafka cluster on AWS EC2 with two brokers
+  brooklyn -v launch --app brooklyn.demo.KafkaClusterExample --location aws-ec2:eu-west-1
+
 ---
 
 For more information, please visit: http://brooklyncentral.github.com/use/examples/messaging/

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java
----------------------------------------------------------------------
diff --git a/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java
new file mode 100644
index 0000000..fae6bb6
--- /dev/null
+++ b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java
@@ -0,0 +1,40 @@
+package brooklyn.demo;
+
+import java.util.List;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.messaging.kafka.KafkaCluster;
+import brooklyn.entity.proxying.BasicEntitySpec;
+import brooklyn.launcher.BrooklynLauncher;
+import brooklyn.util.CommandLineUtil;
+
+import com.google.common.collect.Lists;
+
+/** Kafka Cluster Application */
+public class KafkaClusterExample extends ApplicationBuilder {
+
+    public static final String DEFAULT_LOCATION = "localhost";
+
+    /** Configure the application. */
+    protected void doBuild() {
+        createChild(BasicEntitySpec.newInstance(KafkaCluster.class)
+                .configure("initialSize", 2));
+
+        appDisplayName("Kafka cluster application");
+    }
+
+    public static void main(String[] argv) {
+        List<String> args = Lists.newArrayList(argv);
+        String port =  CommandLineUtil.getCommandLineOption(args, "--port", "8081+");
+        String location = CommandLineUtil.getCommandLineOption(args, "--location", DEFAULT_LOCATION);
+
+        BrooklynLauncher launcher = BrooklynLauncher.newInstance()
+                .application(new KafkaClusterExample())
+                .webconsolePort(port)
+                .location(location)
+                .start();
+
+        Entities.dumpInfo(launcher.getApplications());
+    }
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneBrokerExample.java
----------------------------------------------------------------------
diff --git a/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneBrokerExample.java b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneBrokerExample.java
deleted file mode 100644
index 83cdc31..0000000
--- a/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneBrokerExample.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package brooklyn.demo;
-
-import java.util.List;
-
-import brooklyn.entity.basic.AbstractApplication;
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.messaging.amqp.AmqpServer;
-import brooklyn.entity.messaging.qpid.QpidBroker;
-import brooklyn.entity.proxying.EntitySpecs;
-import brooklyn.launcher.BrooklynLauncher;
-import brooklyn.util.CommandLineUtil;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-/** Qpid Broker Application */
-public class StandaloneBrokerExample extends AbstractApplication {
-
-    public static final String CUSTOM_CONFIG_PATH = "classpath://custom-config.xml";
-    public static final String PASSWD_PATH = "classpath://passwd";
-    public static final String QPID_BDBSTORE_JAR_PATH = "classpath://qpid-bdbstore-0.14.jar";
-    public static final String BDBSTORE_JAR_PATH = "classpath://je-5.0.34.jar";
-
-    public static final String DEFAULT_LOCATION = "localhost";
-    
-    @Override
-    public void init() {
-        // Configure the Qpid broker entity
-    	QpidBroker broker = addChild(EntitySpecs.spec(QpidBroker.class)
-    	        .configure("amqpPort", 5672)
-    	        .configure("amqpVersion", AmqpServer.AMQP_0_10)
-    	        .configure("runtimeFiles", ImmutableMap.builder()
-    	                .put(QpidBroker.CONFIG_XML, CUSTOM_CONFIG_PATH)
-    	                .put(QpidBroker.PASSWD, PASSWD_PATH)
-    	                .put("lib/opt/qpid-bdbstore-0.14.jar", QPID_BDBSTORE_JAR_PATH)
-    	                .put("lib/opt/je-5.0.34.jar", BDBSTORE_JAR_PATH)
-    	                .build())
-    	        .configure("queue", "testQueue"));
-    }
-
-    public static void main(String[] argv) {
-        List<String> args = Lists.newArrayList(argv);
-        String port =  CommandLineUtil.getCommandLineOption(args, "--port", "8081+");
-        String location = CommandLineUtil.getCommandLineOption(args, "--location", DEFAULT_LOCATION);
-
-        BrooklynLauncher launcher = BrooklynLauncher.newInstance()
-                .application(EntitySpecs.appSpec(StandaloneBrokerExample.class).displayName("Qpid app"))
-                .webconsolePort(port)
-                .location(location)
-                .start();
-         
-        Entities.dumpInfo(launcher.getApplications());
-    }
-}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneQpidBrokerExample.java
----------------------------------------------------------------------
diff --git a/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneQpidBrokerExample.java b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneQpidBrokerExample.java
new file mode 100644
index 0000000..19b6c2e
--- /dev/null
+++ b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneQpidBrokerExample.java
@@ -0,0 +1,54 @@
+package brooklyn.demo;
+
+import java.util.List;
+
+import brooklyn.entity.basic.AbstractApplication;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.messaging.amqp.AmqpServer;
+import brooklyn.entity.messaging.qpid.QpidBroker;
+import brooklyn.entity.proxying.EntitySpecs;
+import brooklyn.launcher.BrooklynLauncher;
+import brooklyn.util.CommandLineUtil;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+/** Qpid Broker Application */
+public class StandaloneQpidBrokerExample extends AbstractApplication {
+
+    public static final String CUSTOM_CONFIG_PATH = "classpath://custom-config.xml";
+    public static final String PASSWD_PATH = "classpath://passwd";
+    public static final String QPID_BDBSTORE_JAR_PATH = "classpath://qpid-bdbstore-0.14.jar";
+    public static final String BDBSTORE_JAR_PATH = "classpath://je-5.0.34.jar";
+
+    public static final String DEFAULT_LOCATION = "localhost";
+    
+    @Override
+    public void init() {
+        // Configure the Qpid broker entity
+    	QpidBroker broker = addChild(EntitySpecs.spec(QpidBroker.class)
+    	        .configure("amqpPort", 5672)
+    	        .configure("amqpVersion", AmqpServer.AMQP_0_10)
+    	        .configure("runtimeFiles", ImmutableMap.builder()
+    	                .put(QpidBroker.CONFIG_XML, CUSTOM_CONFIG_PATH)
+    	                .put(QpidBroker.PASSWD, PASSWD_PATH)
+    	                .put("lib/opt/qpid-bdbstore-0.14.jar", QPID_BDBSTORE_JAR_PATH)
+    	                .put("lib/opt/je-5.0.34.jar", BDBSTORE_JAR_PATH)
+    	                .build())
+    	        .configure("queue", "testQueue"));
+    }
+
+    public static void main(String[] argv) {
+        List<String> args = Lists.newArrayList(argv);
+        String port =  CommandLineUtil.getCommandLineOption(args, "--port", "8081+");
+        String location = CommandLineUtil.getCommandLineOption(args, "--location", DEFAULT_LOCATION);
+
+        BrooklynLauncher launcher = BrooklynLauncher.newInstance()
+                .application(EntitySpecs.appSpec(StandaloneQpidBrokerExample.class).displayName("Qpid app"))
+                .webconsolePort(port)
+                .location(location)
+                .start();
+         
+        Entities.dumpInfo(launcher.getApplications());
+    }
+}

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/software/messaging/pom.xml
----------------------------------------------------------------------
diff --git a/software/messaging/pom.xml b/software/messaging/pom.xml
index 4b0f372..7a81336 100644
--- a/software/messaging/pom.xml
+++ b/software/messaging/pom.xml
@@ -14,37 +14,21 @@
 		<relativePath>../../pom.xml</relativePath>
 	</parent>
 
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.geronimo.specs</groupId>
-            <artifactId>geronimo-jms_1.1_spec</artifactId>
-            <version>1.1.1</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.qpid</groupId>
-            <artifactId>qpid-client</artifactId>
-            <version>0.20</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-core</artifactId>
-            <version>5.7.0</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>com.rabbitmq</groupId>
-            <artifactId>amqp-client</artifactId>
-            <version>2.8.7</version>
-        </dependency>
+    <repositories>
+        <repository>
+            <id>clojars.org</id>
+            <url>http://clojars.org/repo</url>
+        </repository>
+    </repositories>
 
+    <dependencies>
         <dependency>
             <groupId>io.brooklyn</groupId>
             <artifactId>brooklyn-software-base</artifactId>
             <version>${project.version}</version>
         </dependency>
-        
+
+        <!-- test dependencies -->
         <dependency>
             <groupId>${project.groupId}</groupId>
             <artifactId>brooklyn-test-support</artifactId>
@@ -64,13 +48,50 @@
             <classifier>tests</classifier>
             <scope>test</scope>
         </dependency>
-        <!-- bring in jclouds for testing -->
         <dependency>
             <groupId>io.brooklyn</groupId>
             <artifactId>brooklyn-locations-jclouds</artifactId>
             <version>${brooklyn.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <!-- for qpid -->
+        <dependency>
+            <groupId>org.apache.geronimo.specs</groupId>
+            <artifactId>geronimo-jms_1.1_spec</artifactId>
+            <version>1.1.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-client</artifactId>
+            <version>0.20</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- for activemq -->
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-core</artifactId>
+            <version>5.7.0</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- for rabbit -->
+        <dependency>
+            <groupId>com.rabbitmq</groupId>
+            <artifactId>amqp-client</artifactId>
+            <version>2.8.7</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- for kafka -->
+        <dependency>
+            <groupId>storm</groupId>
+            <artifactId>kafka</artifactId>
+            <version>0.7.0-incubating</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
index 13b8d0d..01fa424 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
@@ -45,7 +45,7 @@ public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, Ka
     @SetFromFlag("zookeeper")
     BasicConfigKey<KafkaZookeeper> ZOOKEEPER = new BasicConfigKey<KafkaZookeeper>(KafkaZookeeper.class, "Kafka zookeeper entity");
 
-    AttributeSensor<Long> BROKER_ID = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.id", "Kafka unique broker ID");
+    AttributeSensor<Integer> BROKER_ID = new BasicAttributeSensor<Integer>(Integer.class, "kafka.broker.id", "Kafka unique broker ID");
 
     BasicAttributeSensor<Long> FETCH_REQUEST_COUNT = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.fetch.total", "Fetch request count");
     BasicAttributeSensor<Long> TOTAL_FETCH_TIME = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.fetch.time.total", "Total fetch request processing time (millis)");
@@ -60,7 +60,7 @@ public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, Ka
 
     Integer getKafkaPort();
 
-    Long getBrokerId();
+    Integer getBrokerId();
 
     KafkaZookeeper getZookeeper();
 

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
index d76072e..ae21118 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
@@ -29,6 +29,7 @@ import brooklyn.entity.Entity;
 import brooklyn.entity.basic.Entities;
 import brooklyn.entity.basic.SoftwareProcessImpl;
 import brooklyn.entity.messaging.MessageBroker;
+import brooklyn.event.basic.BasicAttributeSensor;
 import brooklyn.event.feed.function.FunctionFeed;
 import brooklyn.event.feed.function.FunctionPollConfig;
 import brooklyn.event.feed.jmx.JmxAttributePollConfig;
@@ -45,8 +46,6 @@ import com.google.common.collect.Sets;
 public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroker, KafkaBroker {
     private static final Logger log = LoggerFactory.getLogger(KafkaBrokerImpl.class);
 
-    private static final AtomicLong brokers = new AtomicLong(0l);
-
     public KafkaBrokerImpl() {
         super();
     }
@@ -62,14 +61,14 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke
 
     @Override
     public void postConstruct() {
-        setAttribute(BROKER_ID, brokers.incrementAndGet());
+        setAttribute(BROKER_ID, hashCode());
     }
 
     @Override
     public Integer getKafkaPort() { return getAttribute(KAFKA_PORT); }
 
     @Override
-    public Long getBrokerId() { return getAttribute(BROKER_ID); }
+    public Integer getBrokerId() { return getAttribute(BROKER_ID); }
 
     @Override
     public KafkaZookeeper getZookeeper() { return getConfig(ZOOKEEPER); }

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java
index 8e1b5da..a1001f3 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java
@@ -35,7 +35,7 @@ public interface KafkaZookeeper extends SoftwareProcess, UsesJmx, Kafka {
     @SetFromFlag("zookeeperPort")
     PortAttributeSensorAndConfigKey ZOOKEEPER_PORT = new PortAttributeSensorAndConfigKey("zookeeper.port", "Zookeeper port", "2181+");
 
-    /** Location of the configuration file template to be copied to the server.*/
+    /** Location of the configuration file template to be copied to the server. */
     @SetFromFlag("zookeeperConfig")
     BasicConfigKey<String> ZOOKEEPER_CONFIG_TEMPLATE = new BasicConfigKey<String>(
             String.class, "kafka.config.zookeeper", "Zookeeper configuration template (in freemarker format)", "classpath://brooklyn/entity/messaging/kafka/zookeeper.properties");

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
index 0943dcd..cbfb410 100644
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
@@ -46,6 +46,8 @@ import brooklyn.util.internal.TimeExtras
 
 /**
  * Test the operation of the {@link ActiveMQBroker} class.
+ *
+ * TODO test that sensors update.
  */
 public class KafkaIntegrationTest {
     private static final Logger log = LoggerFactory.getLogger(KafkaIntegrationTest.class)
@@ -101,20 +103,27 @@ public class KafkaIntegrationTest {
 
     /**
      * Test that we can start a cluster with zookeeper and one broker.
+     *
+     * Connects to the zookeeper controller and tests sending and receiving messages on a topic.
      */
     @Test(groups = "Integration")
     public void testSingleBrokerCluster() {
         KafkaCluster cluster = app.createAndManageChild(BasicEntitySpec.newInstance(KafkaCluster.class).configure(KafkaCluster.INITIAL_SIZE, 1));
 
         cluster.start([ testLocation ])
-        executeUntilSucceedsWithShutdown(cluster, timeout:600*TimeUnit.SECONDS) {
+        executeUntilSucceeds(timeout:600*TimeUnit.SECONDS) {
             assertTrue cluster.getAttribute(Startable.SERVICE_UP)
-            Entities.dumpInfo(cluster)
         }
-        assertFalse cluster.getAttribute(Startable.SERVICE_UP)
+
+        Entities.dumpInfo(cluster);
+
+        Thread.sleep(5000l);
+
+        KafkaSupport support = new KafkaSupport(cluster.getZookeeper());
+        support.sendMessage("brooklyn", "TEST_MESSAGE")
+        List<String> messages = support.getMessage("brooklyn");
+        assertEquals(messages.size(), 1);
+        assertEquals(messages.get(0), "TEST_MESSAGE");
     }
 
-    // TODO test with API sending messages
-    // TODO test that sensors update
-    // TODO add demo application
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
new file mode 100644
index 0000000..d026f06
--- /dev/null
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import static org.testng.Assert.*;
+
+import java.util.List;
+import java.util.Properties;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.KafkaMessageStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.javaapi.producer.Producer;
+import kafka.javaapi.producer.ProducerData;
+import kafka.message.Message;
+import kafka.producer.ProducerConfig;
+import brooklyn.entity.basic.Attributes;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+public class KafkaSupport {
+
+    private final KafkaZookeeper zookeeper;
+
+    public KafkaSupport(KafkaZookeeper zookeeper) {
+        this.zookeeper = zookeeper;
+    }
+
+    public void sendMessage(String topic, String message) {
+        Properties props = new Properties();
+        props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort()));
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        ProducerConfig config = new ProducerConfig(props);
+        Producer<String, String> producer = new Producer<String, String>(config);
+        ProducerData<String, String> data = new ProducerData<String, String>(topic, message);
+        producer.send(data);
+        producer.close();
+    }
+
+    public List<String> getMessage(String topic) {
+        Properties props = new Properties();
+        props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort()));
+        props.put("zk.connectiontimeout.ms", "1000000");
+        props.put("groupid", "test_group");
+        ConsumerConfig consumerConfig = new ConsumerConfig(props);
+        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
+        List<KafkaMessageStream<Message>> streams = consumer.createMessageStreams(ImmutableMap.of(topic, 1)).get(topic);
+        List<String> messages = Lists.newArrayList();
+        for (Message msg : Iterables.getOnlyElement(streams)) {
+            assertTrue(msg.isValid());
+            String payload = new String(msg.payload().array());
+            messages.add(payload);
+          }
+        return messages;
+    }
+}