You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:12 UTC
[13/50] [abbrv] git commit: calculate start offset for new topology consistently
calculate start offset for new topology consistently
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/f7890915
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/f7890915
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/f7890915
Branch: refs/heads/master
Commit: f789091534be95c103890be6539bbfc5faf69b37
Parents: 5b764cd
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Sat Jan 18 15:51:44 2014 +0000
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Sat Jan 18 15:56:19 2014 +0000
----------------------------------------------------------------------
src/test/storm/kafka/KafkaUtilsTest.java | 20 ++++++++++++++++++++
1 file changed, 20 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f7890915/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/KafkaUtilsTest.java b/src/test/storm/kafka/KafkaUtilsTest.java
index 506789c..20a4221 100644
--- a/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/src/test/storm/kafka/KafkaUtilsTest.java
@@ -70,6 +70,26 @@ public class KafkaUtilsTest {
sendMessageAndAssertValueForOffset(-99);
}
+ @Test
+ public void getOffsetFromConfigAndDontForceFromStart() { // With forceFromStart disabled, the config-based lookup is expected to match the latest offset.
+ config.forceFromStart = false;
+ config.startOffsetTime = OffsetRequest.EarliestTime(); // Set to earliest on purpose: the assertion below still expects the latest offset.
+ createTopicAndSendMessage();
+ long latestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()); // Reference value from an explicit LatestTime lookup (third argument 0 is presumably the partition).
+ long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config); // Value under test: the overload that takes the config object.
+ assertThat(latestOffset, is(equalTo(offsetFromConfig))); // NOTE(review): reference value is passed first; assertThat conventionally takes the actual value first - confirm intended order.
+ }
+
+ @Test
+ public void getOffsetFromConfigAndFroceFromStart() { // With forceFromStart enabled, the config-based lookup is expected to match the earliest offset. NOTE(review): "Froce" looks like a typo for "Force".
+ config.forceFromStart = true;
+ config.startOffsetTime = OffsetRequest.EarliestTime(); // Same start time as the explicit reference lookup below.
+ createTopicAndSendMessage();
+ long earliestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime()); // Reference value from an explicit EarliestTime lookup (third argument 0 is presumably the partition).
+ long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config); // Value under test: the overload that takes the config object.
+ assertThat(earliestOffset, is(equalTo(offsetFromConfig))); // NOTE(review): reference value is passed first; assertThat conventionally takes the actual value first - confirm intended order.
+ }
+
private String createTopicAndSendMessage() {
Properties p = new Properties();
p.setProperty("metadata.broker.list", "localhost:49123");