You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:46:45 UTC

[27/50] brooklyn-library git commit: Update test using Kafka API

Update test using Kafka API


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/ea32c5ad
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/ea32c5ad
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/ea32c5ad

Branch: refs/heads/0.5.0
Commit: ea32c5ad63b80c2e3b0b4c60f7f35c28042d5b38
Parents: dee689a
Author: Andrew Kennedy <an...@cloudsoftcorp.com>
Authored: Wed Mar 20 23:46:05 2013 +0000
Committer: Andrew Kennedy <an...@cloudsoftcorp.com>
Committed: Fri Apr 19 10:36:07 2013 +0100

----------------------------------------------------------------------
 .../entity/messaging/kafka/KafkaBrokerImpl.java |  2 +-
 .../messaging/kafka/KafkaIntegrationTest.groovy |  5 ++---
 .../entity/messaging/kafka/KafkaSupport.java    | 23 +++++++++++---------
 3 files changed, 16 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ea32c5ad/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
index 625fe3f..ed9ae0c 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
@@ -64,7 +64,7 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke
 
     @Override
     public void postConstruct() {
-        setAttribute(BROKER_ID, hashCode());
+        setAttribute(BROKER_ID, Math.abs(hashCode())); // Must be positive for partitioning to work
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ea32c5ad/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
index 89f5773..2ef95c5 100644
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
@@ -119,9 +119,8 @@ public class KafkaIntegrationTest {
 
         KafkaSupport support = new KafkaSupport(cluster.getZookeeper());
         support.sendMessage("brooklyn", "TEST_MESSAGE")
-        List<String> messages = support.getMessage("brooklyn");
-        assertEquals(messages.size(), 1);
-        assertEquals(messages.get(0), "TEST_MESSAGE");
+        String message = support.getMessage("brooklyn");
+        assertEquals(message, "TEST_MESSAGE");
     }
 
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ea32c5ad/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
index d026f06..d9372a9 100644
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
@@ -15,13 +15,15 @@
  */
 package brooklyn.entity.messaging.kafka;
 
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertTrue;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Properties;
 
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaMessageStream;
 import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.javaapi.producer.Producer;
@@ -53,20 +55,21 @@ public class KafkaSupport {
         producer.close();
     }
 
-    public List<String> getMessage(String topic) {
+    public String getMessage(String topic) {
         Properties props = new Properties();
         props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort()));
         props.put("zk.connectiontimeout.ms", "1000000");
-        props.put("groupid", "test_group");
+        props.put("groupid", "brooklyn");
         ConsumerConfig consumerConfig = new ConsumerConfig(props);
         ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
         List<KafkaMessageStream<Message>> streams = consumer.createMessageStreams(ImmutableMap.of(topic, 1)).get(topic);
-        List<String> messages = Lists.newArrayList();
-        for (Message msg : Iterables.getOnlyElement(streams)) {
-            assertTrue(msg.isValid());
-            String payload = new String(msg.payload().array());
-            messages.add(payload);
-          }
-        return messages;
+        ConsumerIterator<Message> iterator = Iterables.getOnlyElement(streams).iterator();
+        Message msg = iterator.next();
+        assertTrue(msg.isValid());
+        ByteBuffer buf = msg.payload();
+        byte[] data = new byte[buf.remaining()];
+        buf.get(data);
+        String payload = new String(data);
+        return payload;
     }
 }