You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:46:38 UTC
[20/50] brooklyn-library git commit: Added test using Kafka API and
example application
Added test using Kafka API and example application
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/c1437063
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/c1437063
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/c1437063
Branch: refs/heads/0.5.0
Commit: c14370630975d0b83741704f0df266cf7033a6a8
Parents: 37e890c
Author: Andrew Kennedy <an...@cloudsoftcorp.com>
Authored: Wed Mar 20 19:49:50 2013 +0000
Committer: Andrew Kennedy <an...@cloudsoftcorp.com>
Committed: Fri Apr 19 10:36:06 2013 +0100
----------------------------------------------------------------------
examples/simple-messaging-pubsub/README.txt | 7 +-
.../java/brooklyn/demo/KafkaClusterExample.java | 40 +++++++++++
.../brooklyn/demo/StandaloneBrokerExample.java | 54 ---------------
.../demo/StandaloneQpidBrokerExample.java | 54 +++++++++++++++
software/messaging/pom.xml | 73 +++++++++++++-------
.../entity/messaging/kafka/KafkaBroker.java | 4 +-
.../entity/messaging/kafka/KafkaBrokerImpl.java | 7 +-
.../entity/messaging/kafka/KafkaZookeeper.java | 2 +-
.../messaging/kafka/KafkaIntegrationTest.groovy | 21 ++++--
.../entity/messaging/kafka/KafkaSupport.java | 72 +++++++++++++++++++
10 files changed, 240 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/examples/simple-messaging-pubsub/README.txt
----------------------------------------------------------------------
diff --git a/examples/simple-messaging-pubsub/README.txt b/examples/simple-messaging-pubsub/README.txt
index 6048867..e27992e 100644
--- a/examples/simple-messaging-pubsub/README.txt
+++ b/examples/simple-messaging-pubsub/README.txt
@@ -7,7 +7,7 @@ The commands below assume that the `brooklyn` script is already on your $PATH, a
export BROOKLYN_CLASSPATH=$(pwd)/target/classes
# Launches a qpid broker on localhost
- brooklyn -v launch --app brooklyn.demo.StandaloneBrokerExample --location localhost
+ brooklyn -v launch --app brooklyn.demo.StandaloneQpidBrokerExample --location localhost
# You can get the broker's URL from the brooklyn web-console at http://localhost:8081
# by looking at the broker entity's sensors or from the verbose output from the application startup
@@ -19,6 +19,11 @@ The commands below assume that the `brooklyn` script is already on your $PATH, a
# Test publishing a message to the broker
java -cp "./resources/lib/*:./target/classes" brooklyn.demo.Publish ${URL}
+To test a Kafka distributed messaging cluster example, use the following command:
+
+ # Launches a Kafka cluster on AWS EC2 with two brokers
+ brooklyn -v launch --app brooklyn.demo.KafkaClusterExample --location aws-ec2:eu-west-1
+
---
For more information, please visit: http://brooklyncentral.github.com/use/examples/messaging/
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java
----------------------------------------------------------------------
diff --git a/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java
new file mode 100644
index 0000000..fae6bb6
--- /dev/null
+++ b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/KafkaClusterExample.java
@@ -0,0 +1,40 @@
+package brooklyn.demo;
+
+import java.util.List;
+
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.messaging.kafka.KafkaCluster;
+import brooklyn.entity.proxying.BasicEntitySpec;
+import brooklyn.launcher.BrooklynLauncher;
+import brooklyn.util.CommandLineUtil;
+
+import com.google.common.collect.Lists;
+
+/** Kafka Cluster Application */
+public class KafkaClusterExample extends ApplicationBuilder {
+
+ public static final String DEFAULT_LOCATION = "localhost";
+
+ /** Configure the application. */
+ protected void doBuild() {
+ createChild(BasicEntitySpec.newInstance(KafkaCluster.class)
+ .configure("initialSize", 2));
+
+ appDisplayName("Kafka cluster application");
+ }
+
+ public static void main(String[] argv) {
+ List<String> args = Lists.newArrayList(argv);
+ String port = CommandLineUtil.getCommandLineOption(args, "--port", "8081+");
+ String location = CommandLineUtil.getCommandLineOption(args, "--location", DEFAULT_LOCATION);
+
+ BrooklynLauncher launcher = BrooklynLauncher.newInstance()
+ .application(new KafkaClusterExample())
+ .webconsolePort(port)
+ .location(location)
+ .start();
+
+ Entities.dumpInfo(launcher.getApplications());
+ }
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneBrokerExample.java
----------------------------------------------------------------------
diff --git a/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneBrokerExample.java b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneBrokerExample.java
deleted file mode 100644
index 83cdc31..0000000
--- a/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneBrokerExample.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package brooklyn.demo;
-
-import java.util.List;
-
-import brooklyn.entity.basic.AbstractApplication;
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.messaging.amqp.AmqpServer;
-import brooklyn.entity.messaging.qpid.QpidBroker;
-import brooklyn.entity.proxying.EntitySpecs;
-import brooklyn.launcher.BrooklynLauncher;
-import brooklyn.util.CommandLineUtil;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-/** Qpid Broker Application */
-public class StandaloneBrokerExample extends AbstractApplication {
-
- public static final String CUSTOM_CONFIG_PATH = "classpath://custom-config.xml";
- public static final String PASSWD_PATH = "classpath://passwd";
- public static final String QPID_BDBSTORE_JAR_PATH = "classpath://qpid-bdbstore-0.14.jar";
- public static final String BDBSTORE_JAR_PATH = "classpath://je-5.0.34.jar";
-
- public static final String DEFAULT_LOCATION = "localhost";
-
- @Override
- public void init() {
- // Configure the Qpid broker entity
- QpidBroker broker = addChild(EntitySpecs.spec(QpidBroker.class)
- .configure("amqpPort", 5672)
- .configure("amqpVersion", AmqpServer.AMQP_0_10)
- .configure("runtimeFiles", ImmutableMap.builder()
- .put(QpidBroker.CONFIG_XML, CUSTOM_CONFIG_PATH)
- .put(QpidBroker.PASSWD, PASSWD_PATH)
- .put("lib/opt/qpid-bdbstore-0.14.jar", QPID_BDBSTORE_JAR_PATH)
- .put("lib/opt/je-5.0.34.jar", BDBSTORE_JAR_PATH)
- .build())
- .configure("queue", "testQueue"));
- }
-
- public static void main(String[] argv) {
- List<String> args = Lists.newArrayList(argv);
- String port = CommandLineUtil.getCommandLineOption(args, "--port", "8081+");
- String location = CommandLineUtil.getCommandLineOption(args, "--location", DEFAULT_LOCATION);
-
- BrooklynLauncher launcher = BrooklynLauncher.newInstance()
- .application(EntitySpecs.appSpec(StandaloneBrokerExample.class).displayName("Qpid app"))
- .webconsolePort(port)
- .location(location)
- .start();
-
- Entities.dumpInfo(launcher.getApplications());
- }
-}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneQpidBrokerExample.java
----------------------------------------------------------------------
diff --git a/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneQpidBrokerExample.java b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneQpidBrokerExample.java
new file mode 100644
index 0000000..19b6c2e
--- /dev/null
+++ b/examples/simple-messaging-pubsub/src/main/java/brooklyn/demo/StandaloneQpidBrokerExample.java
@@ -0,0 +1,54 @@
+package brooklyn.demo;
+
+import java.util.List;
+
+import brooklyn.entity.basic.AbstractApplication;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.messaging.amqp.AmqpServer;
+import brooklyn.entity.messaging.qpid.QpidBroker;
+import brooklyn.entity.proxying.EntitySpecs;
+import brooklyn.launcher.BrooklynLauncher;
+import brooklyn.util.CommandLineUtil;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+/** Qpid Broker Application */
+public class StandaloneQpidBrokerExample extends AbstractApplication {
+
+ public static final String CUSTOM_CONFIG_PATH = "classpath://custom-config.xml";
+ public static final String PASSWD_PATH = "classpath://passwd";
+ public static final String QPID_BDBSTORE_JAR_PATH = "classpath://qpid-bdbstore-0.14.jar";
+ public static final String BDBSTORE_JAR_PATH = "classpath://je-5.0.34.jar";
+
+ public static final String DEFAULT_LOCATION = "localhost";
+
+ @Override
+ public void init() {
+ // Configure the Qpid broker entity
+ QpidBroker broker = addChild(EntitySpecs.spec(QpidBroker.class)
+ .configure("amqpPort", 5672)
+ .configure("amqpVersion", AmqpServer.AMQP_0_10)
+ .configure("runtimeFiles", ImmutableMap.builder()
+ .put(QpidBroker.CONFIG_XML, CUSTOM_CONFIG_PATH)
+ .put(QpidBroker.PASSWD, PASSWD_PATH)
+ .put("lib/opt/qpid-bdbstore-0.14.jar", QPID_BDBSTORE_JAR_PATH)
+ .put("lib/opt/je-5.0.34.jar", BDBSTORE_JAR_PATH)
+ .build())
+ .configure("queue", "testQueue"));
+ }
+
+ public static void main(String[] argv) {
+ List<String> args = Lists.newArrayList(argv);
+ String port = CommandLineUtil.getCommandLineOption(args, "--port", "8081+");
+ String location = CommandLineUtil.getCommandLineOption(args, "--location", DEFAULT_LOCATION);
+
+ BrooklynLauncher launcher = BrooklynLauncher.newInstance()
+ .application(EntitySpecs.appSpec(StandaloneQpidBrokerExample.class).displayName("Qpid app"))
+ .webconsolePort(port)
+ .location(location)
+ .start();
+
+ Entities.dumpInfo(launcher.getApplications());
+ }
+}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/software/messaging/pom.xml
----------------------------------------------------------------------
diff --git a/software/messaging/pom.xml b/software/messaging/pom.xml
index 4b0f372..7a81336 100644
--- a/software/messaging/pom.xml
+++ b/software/messaging/pom.xml
@@ -14,37 +14,21 @@
<relativePath>../../pom.xml</relativePath>
</parent>
- <dependencies>
- <dependency>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-jms_1.1_spec</artifactId>
- <version>1.1.1</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-client</artifactId>
- <version>0.20</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-core</artifactId>
- <version>5.7.0</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>2.8.7</version>
- </dependency>
+ <repositories>
+ <repository>
+ <id>clojars.org</id>
+ <url>http://clojars.org/repo</url>
+ </repository>
+ </repositories>
+ <dependencies>
<dependency>
<groupId>io.brooklyn</groupId>
<artifactId>brooklyn-software-base</artifactId>
<version>${project.version}</version>
</dependency>
-
+
+ <!-- test dependencies -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brooklyn-test-support</artifactId>
@@ -64,13 +48,50 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
- <!-- bring in jclouds for testing -->
<dependency>
<groupId>io.brooklyn</groupId>
<artifactId>brooklyn-locations-jclouds</artifactId>
<version>${brooklyn.version}</version>
<scope>test</scope>
</dependency>
+
+ <!-- for qpid -->
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ <version>1.1.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-client</artifactId>
+ <version>0.20</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- for activemq -->
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-core</artifactId>
+ <version>5.7.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- for rabbit -->
+ <dependency>
+ <groupId>com.rabbitmq</groupId>
+ <artifactId>amqp-client</artifactId>
+ <version>2.8.7</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- for kafka -->
+ <dependency>
+ <groupId>storm</groupId>
+ <artifactId>kafka</artifactId>
+ <version>0.7.0-incubating</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
index 13b8d0d..01fa424 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBroker.java
@@ -45,7 +45,7 @@ public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, Ka
@SetFromFlag("zookeeper")
BasicConfigKey<KafkaZookeeper> ZOOKEEPER = new BasicConfigKey<KafkaZookeeper>(KafkaZookeeper.class, "Kafka zookeeper entity");
- AttributeSensor<Long> BROKER_ID = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.id", "Kafka unique broker ID");
+ AttributeSensor<Integer> BROKER_ID = new BasicAttributeSensor<Integer>(Integer.class, "kafka.broker.id", "Kafka unique broker ID");
BasicAttributeSensor<Long> FETCH_REQUEST_COUNT = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.fetch.total", "Fetch request count");
BasicAttributeSensor<Long> TOTAL_FETCH_TIME = new BasicAttributeSensor<Long>(Long.class, "kafka.broker.fetch.time.total", "Total fetch request processing time (millis)");
@@ -60,7 +60,7 @@ public interface KafkaBroker extends SoftwareProcess, MessageBroker, UsesJmx, Ka
Integer getKafkaPort();
- Long getBrokerId();
+ Integer getBrokerId();
KafkaZookeeper getZookeeper();
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
index d76072e..ae21118 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
@@ -29,6 +29,7 @@ import brooklyn.entity.Entity;
import brooklyn.entity.basic.Entities;
import brooklyn.entity.basic.SoftwareProcessImpl;
import brooklyn.entity.messaging.MessageBroker;
+import brooklyn.event.basic.BasicAttributeSensor;
import brooklyn.event.feed.function.FunctionFeed;
import brooklyn.event.feed.function.FunctionPollConfig;
import brooklyn.event.feed.jmx.JmxAttributePollConfig;
@@ -45,8 +46,6 @@ import com.google.common.collect.Sets;
public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroker, KafkaBroker {
private static final Logger log = LoggerFactory.getLogger(KafkaBrokerImpl.class);
- private static final AtomicLong brokers = new AtomicLong(0l);
-
public KafkaBrokerImpl() {
super();
}
@@ -62,14 +61,14 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke
@Override
public void postConstruct() {
- setAttribute(BROKER_ID, brokers.incrementAndGet());
+ setAttribute(BROKER_ID, hashCode());
}
@Override
public Integer getKafkaPort() { return getAttribute(KAFKA_PORT); }
@Override
- public Long getBrokerId() { return getAttribute(BROKER_ID); }
+ public Integer getBrokerId() { return getAttribute(BROKER_ID); }
@Override
public KafkaZookeeper getZookeeper() { return getConfig(ZOOKEEPER); }
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java
index 8e1b5da..a1001f3 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaZookeeper.java
@@ -35,7 +35,7 @@ public interface KafkaZookeeper extends SoftwareProcess, UsesJmx, Kafka {
@SetFromFlag("zookeeperPort")
PortAttributeSensorAndConfigKey ZOOKEEPER_PORT = new PortAttributeSensorAndConfigKey("zookeeper.port", "Zookeeper port", "2181+");
- /** Location of the configuration file template to be copied to the server.*/
+ /** Location of the configuration file template to be copied to the server. */
@SetFromFlag("zookeeperConfig")
BasicConfigKey<String> ZOOKEEPER_CONFIG_TEMPLATE = new BasicConfigKey<String>(
String.class, "kafka.config.zookeeper", "Zookeeper configuration template (in freemarker format)", "classpath://brooklyn/entity/messaging/kafka/zookeeper.properties");
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
index 0943dcd..cbfb410 100644
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
@@ -46,6 +46,8 @@ import brooklyn.util.internal.TimeExtras
/**
* Test the operation of the {@link ActiveMQBroker} class.
+ *
+ * TODO test that sensors update.
*/
public class KafkaIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(KafkaIntegrationTest.class)
@@ -101,20 +103,27 @@ public class KafkaIntegrationTest {
/**
* Test that we can start a cluster with zookeeper and one broker.
+ *
+ * Connects to the zookeeper controller and tests sending and receiving messages on a topic.
*/
@Test(groups = "Integration")
public void testSingleBrokerCluster() {
KafkaCluster cluster = app.createAndManageChild(BasicEntitySpec.newInstance(KafkaCluster.class).configure(KafkaCluster.INITIAL_SIZE, 1));
cluster.start([ testLocation ])
- executeUntilSucceedsWithShutdown(cluster, timeout:600*TimeUnit.SECONDS) {
+ executeUntilSucceeds(timeout:600*TimeUnit.SECONDS) {
assertTrue cluster.getAttribute(Startable.SERVICE_UP)
- Entities.dumpInfo(cluster)
}
- assertFalse cluster.getAttribute(Startable.SERVICE_UP)
+
+ Entities.dumpInfo(cluster);
+
+ Thread.sleep(5000l);
+
+ KafkaSupport support = new KafkaSupport(cluster.getZookeeper());
+ support.sendMessage("brooklyn", "TEST_MESSAGE")
+ List<String> messages = support.getMessage("brooklyn");
+ assertEquals(messages.size(), 1);
+ assertEquals(messages.get(0), "TEST_MESSAGE");
}
- // TODO test with API sending messages
- // TODO test that sensors update
- // TODO add demo application
}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/c1437063/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
new file mode 100644
index 0000000..d026f06
--- /dev/null
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2013 by Cloudsoft Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package brooklyn.entity.messaging.kafka;
+
+import static org.testng.Assert.*;
+
+import java.util.List;
+import java.util.Properties;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.KafkaMessageStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.javaapi.producer.Producer;
+import kafka.javaapi.producer.ProducerData;
+import kafka.message.Message;
+import kafka.producer.ProducerConfig;
+import brooklyn.entity.basic.Attributes;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+public class KafkaSupport {
+
+ private final KafkaZookeeper zookeeper;
+
+ public KafkaSupport(KafkaZookeeper zookeeper) {
+ this.zookeeper = zookeeper;
+ }
+
+ public void sendMessage(String topic, String message) {
+ Properties props = new Properties();
+ props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort()));
+ props.put("serializer.class", "kafka.serializer.StringEncoder");
+ ProducerConfig config = new ProducerConfig(props);
+ Producer<String, String> producer = new Producer<String, String>(config);
+ ProducerData<String, String> data = new ProducerData<String, String>(topic, message);
+ producer.send(data);
+ producer.close();
+ }
+
+ public List<String> getMessage(String topic) {
+ Properties props = new Properties();
+ props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort()));
+ props.put("zk.connectiontimeout.ms", "1000000");
+ props.put("groupid", "test_group");
+ ConsumerConfig consumerConfig = new ConsumerConfig(props);
+ ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
+ List<KafkaMessageStream<Message>> streams = consumer.createMessageStreams(ImmutableMap.of(topic, 1)).get(topic);
+ List<String> messages = Lists.newArrayList();
+ for (Message msg : Iterables.getOnlyElement(streams)) {
+ assertTrue(msg.isValid());
+ String payload = new String(msg.payload().array());
+ messages.add(payload);
+ }
+ return messages;
+ }
+}