You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:12 UTC

[13/50] [abbrv] git commit: calculate start offset for new topology consistently

calculate start offset for new topology consistently


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/f7890915
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/f7890915
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/f7890915

Branch: refs/heads/master
Commit: f789091534be95c103890be6539bbfc5faf69b37
Parents: 5b764cd
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Sat Jan 18 15:51:44 2014 +0000
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Sat Jan 18 15:56:19 2014 +0000

----------------------------------------------------------------------
 src/test/storm/kafka/KafkaUtilsTest.java | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f7890915/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/KafkaUtilsTest.java b/src/test/storm/kafka/KafkaUtilsTest.java
index 506789c..20a4221 100644
--- a/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/src/test/storm/kafka/KafkaUtilsTest.java
@@ -70,6 +70,26 @@ public class KafkaUtilsTest {
         sendMessageAndAssertValueForOffset(-99);
     }
 
+    @Test
+    public void getOffsetFromConfigAndDontForceFromStart() {
+        config.forceFromStart = false;
+        config.startOffsetTime = OffsetRequest.EarliestTime();
+        createTopicAndSendMessage();
+        long latestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime());
+        long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config);
+        assertThat(latestOffset, is(equalTo(offsetFromConfig)));
+    }
+
+    @Test
+    public void getOffsetFromConfigAndFroceFromStart() {
+        config.forceFromStart = true;
+        config.startOffsetTime = OffsetRequest.EarliestTime();
+        createTopicAndSendMessage();
+        long earliestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());
+        long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config);
+        assertThat(earliestOffset, is(equalTo(offsetFromConfig)));
+    }
+
     private String createTopicAndSendMessage() {
         Properties p = new Properties();
         p.setProperty("metadata.broker.list", "localhost:49123");