You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2016/02/01 18:46:45 UTC
[27/50] brooklyn-library git commit: Update test using Kafka API
Update test using Kafka API
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/ea32c5ad
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/ea32c5ad
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/ea32c5ad
Branch: refs/heads/0.5.0
Commit: ea32c5ad63b80c2e3b0b4c60f7f35c28042d5b38
Parents: dee689a
Author: Andrew Kennedy <an...@cloudsoftcorp.com>
Authored: Wed Mar 20 23:46:05 2013 +0000
Committer: Andrew Kennedy <an...@cloudsoftcorp.com>
Committed: Fri Apr 19 10:36:07 2013 +0100
----------------------------------------------------------------------
.../entity/messaging/kafka/KafkaBrokerImpl.java | 2 +-
.../messaging/kafka/KafkaIntegrationTest.groovy | 5 ++---
.../entity/messaging/kafka/KafkaSupport.java | 23 +++++++++++---------
3 files changed, 16 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ea32c5ad/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
index 625fe3f..ed9ae0c 100644
--- a/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
+++ b/software/messaging/src/main/java/brooklyn/entity/messaging/kafka/KafkaBrokerImpl.java
@@ -64,7 +64,7 @@ public class KafkaBrokerImpl extends SoftwareProcessImpl implements MessageBroke
@Override
public void postConstruct() {
- setAttribute(BROKER_ID, hashCode());
+ setAttribute(BROKER_ID, Math.abs(hashCode())); // Must be positive for partitioning to work
}
@Override
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ea32c5ad/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
index 89f5773..2ef95c5 100644
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaIntegrationTest.groovy
@@ -119,9 +119,8 @@ public class KafkaIntegrationTest {
KafkaSupport support = new KafkaSupport(cluster.getZookeeper());
support.sendMessage("brooklyn", "TEST_MESSAGE")
- List<String> messages = support.getMessage("brooklyn");
- assertEquals(messages.size(), 1);
- assertEquals(messages.get(0), "TEST_MESSAGE");
+ String message = support.getMessage("brooklyn");
+ assertEquals(message, "TEST_MESSAGE");
}
}
http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/ea32c5ad/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
----------------------------------------------------------------------
diff --git a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
index d026f06..d9372a9 100644
--- a/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
+++ b/software/messaging/src/test/java/brooklyn/entity/messaging/kafka/KafkaSupport.java
@@ -15,13 +15,15 @@
*/
package brooklyn.entity.messaging.kafka;
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertTrue;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaMessageStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
@@ -53,20 +55,21 @@ public class KafkaSupport {
producer.close();
}
- public List<String> getMessage(String topic) {
+ public String getMessage(String topic) {
Properties props = new Properties();
props.put("zk.connect", String.format("%s:%d", zookeeper.getAttribute(Attributes.HOSTNAME), zookeeper.getZookeeperPort()));
props.put("zk.connectiontimeout.ms", "1000000");
- props.put("groupid", "test_group");
+ props.put("groupid", "brooklyn");
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
List<KafkaMessageStream<Message>> streams = consumer.createMessageStreams(ImmutableMap.of(topic, 1)).get(topic);
- List<String> messages = Lists.newArrayList();
- for (Message msg : Iterables.getOnlyElement(streams)) {
- assertTrue(msg.isValid());
- String payload = new String(msg.payload().array());
- messages.add(payload);
- }
- return messages;
+ ConsumerIterator<Message> iterator = Iterables.getOnlyElement(streams).iterator();
+ Message msg = iterator.next();
+ assertTrue(msg.isValid());
+ ByteBuffer buf = msg.payload();
+ byte[] data = new byte[buf.remaining()];
+ buf.get(data);
+ String payload = new String(data);
+ return payload;
}
}