You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:24 UTC

[25/50] [abbrv] git commit: fixed tests

fixed tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/bd0cc453
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/bd0cc453
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/bd0cc453

Branch: refs/heads/master
Commit: bd0cc453a5d7389eb2a92bacef46c13b436a0316
Parents: 4de85c8
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Sun Feb 23 16:10:38 2014 +0000
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Sun Feb 23 16:10:38 2014 +0000

----------------------------------------------------------------------
 src/test/storm/kafka/KafkaUtilsTest.java | 31 ++++++++++++++++-----------
 1 file changed, 18 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/bd0cc453/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/KafkaUtilsTest.java b/src/test/storm/kafka/KafkaUtilsTest.java
index db270c2..0763042 100644
--- a/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/src/test/storm/kafka/KafkaUtilsTest.java
@@ -28,13 +28,14 @@ public class KafkaUtilsTest {
     private KafkaTestBroker broker;
     private SimpleConsumer simpleConsumer;
     private KafkaConfig config;
+    private BrokerHosts brokerHosts;
 
     @Before
     public void setup() {
         broker = new KafkaTestBroker();
         GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
         globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString()));
-        BrokerHosts brokerHosts = new StaticHosts(globalPartitionInformation);
+        brokerHosts = new StaticHosts(globalPartitionInformation);
         config = new KafkaConfig(brokerHosts, "testTopic");
         simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
     }
@@ -60,19 +61,31 @@ public class KafkaUtilsTest {
 
     @Test
     public void fetchMessage() throws Exception {
-        long lastOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());
-        sendMessageAndAssertValueForOffset(lastOffset);
+        String value = "test";
+        createTopicAndSendMessage(value);
+        long offset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1;
+        ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(config, simpleConsumer,
+                new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), offset);
+        String message = new String(Utils.toByteArray(messageAndOffsets.iterator().next().message().payload()));
+        assertThat(message, is(equalTo(value)));
     }
 
     @Test(expected = FailedFetchException.class)
     public void fetchMessagesWithInvalidOffsetAndDefaultHandlingDisabled() throws Exception {
         config.useStartOffsetTimeIfOffsetOutOfRange = false;
-        sendMessageAndAssertValueForOffset(-99);
+        KafkaUtils.fetchMessages(config, simpleConsumer,
+                new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), -99);
     }
 
     @Test
     public void fetchMessagesWithInvalidOffsetAndDefaultHandlingEnabled() throws Exception {
-        sendMessageAndAssertValueForOffset(-99);
+        config = new KafkaConfig(brokerHosts, "newTopic");
+        String value = "test";
+        createTopicAndSendMessage(value);
+        ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(config, simpleConsumer,
+                new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), -99);
+        String message = new String(Utils.toByteArray(messageAndOffsets.iterator().next().message().payload()));
+        assertThat(message, is(equalTo(value)));
     }
 
     @Test
@@ -165,12 +178,4 @@ public class KafkaUtilsTest {
         Producer<String, String> producer = new Producer<String, String>(producerConfig);
         producer.send(new KeyedMessage<String, String>(config.topic, key, value));
     }
-
-    private void sendMessageAndAssertValueForOffset(long offset) {
-        String value = "test";
-        createTopicAndSendMessage(value);
-        ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), offset);
-        String message = new String(Utils.toByteArray(messageAndOffsets.iterator().next().message().payload()));
-        assertThat(message, is(equalTo(value)));
-    }
 }