You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:24 UTC
[25/50] [abbrv] git commit: fixed tests
fixed tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/bd0cc453
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/bd0cc453
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/bd0cc453
Branch: refs/heads/master
Commit: bd0cc453a5d7389eb2a92bacef46c13b436a0316
Parents: 4de85c8
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Sun Feb 23 16:10:38 2014 +0000
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Sun Feb 23 16:10:38 2014 +0000
----------------------------------------------------------------------
src/test/storm/kafka/KafkaUtilsTest.java | 31 ++++++++++++++++-----------
1 file changed, 18 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/bd0cc453/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/KafkaUtilsTest.java b/src/test/storm/kafka/KafkaUtilsTest.java
index db270c2..0763042 100644
--- a/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/src/test/storm/kafka/KafkaUtilsTest.java
@@ -28,13 +28,14 @@ public class KafkaUtilsTest {
private KafkaTestBroker broker;
private SimpleConsumer simpleConsumer;
private KafkaConfig config;
+ private BrokerHosts brokerHosts;
@Before
public void setup() {
broker = new KafkaTestBroker();
GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString()));
- BrokerHosts brokerHosts = new StaticHosts(globalPartitionInformation);
+ brokerHosts = new StaticHosts(globalPartitionInformation);
config = new KafkaConfig(brokerHosts, "testTopic");
simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
}
@@ -60,19 +61,31 @@ public class KafkaUtilsTest {
@Test
public void fetchMessage() throws Exception {
- long lastOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());
- sendMessageAndAssertValueForOffset(lastOffset);
+ String value = "test";
+ createTopicAndSendMessage(value);
+ long offset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1;
+ ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(config, simpleConsumer,
+ new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), offset);
+ String message = new String(Utils.toByteArray(messageAndOffsets.iterator().next().message().payload()));
+ assertThat(message, is(equalTo(value)));
}
@Test(expected = FailedFetchException.class)
public void fetchMessagesWithInvalidOffsetAndDefaultHandlingDisabled() throws Exception {
config.useStartOffsetTimeIfOffsetOutOfRange = false;
- sendMessageAndAssertValueForOffset(-99);
+ KafkaUtils.fetchMessages(config, simpleConsumer,
+ new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), -99);
}
@Test
public void fetchMessagesWithInvalidOffsetAndDefaultHandlingEnabled() throws Exception {
- sendMessageAndAssertValueForOffset(-99);
+ config = new KafkaConfig(brokerHosts, "newTopic");
+ String value = "test";
+ createTopicAndSendMessage(value);
+ ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(config, simpleConsumer,
+ new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), -99);
+ String message = new String(Utils.toByteArray(messageAndOffsets.iterator().next().message().payload()));
+ assertThat(message, is(equalTo(value)));
}
@Test
@@ -165,12 +178,4 @@ public class KafkaUtilsTest {
Producer<String, String> producer = new Producer<String, String>(producerConfig);
producer.send(new KeyedMessage<String, String>(config.topic, key, value));
}
-
- private void sendMessageAndAssertValueForOffset(long offset) {
- String value = "test";
- createTopicAndSendMessage(value);
- ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), offset);
- String message = new String(Utils.toByteArray(messageAndOffsets.iterator().next().message().payload()));
- assertThat(message, is(equalTo(value)));
- }
}