You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:11 UTC

[12/50] [abbrv] git commit: calculate start offset for new topology consistently

calculate start offset for new topology consistently


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/5b764cd9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/5b764cd9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/5b764cd9

Branch: refs/heads/master
Commit: 5b764cd9138dd93e7382e3472b9d3d33d4b286a3
Parents: 80005ba
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Sat Jan 18 15:25:36 2014 +0000
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Sat Jan 18 15:25:36 2014 +0000

----------------------------------------------------------------------
 src/jvm/storm/kafka/KafkaUtils.java                  |  9 +++++++++
 src/jvm/storm/kafka/PartitionManager.java            | 10 +++++-----
 src/jvm/storm/kafka/trident/TridentKafkaEmitter.java |  6 +-----
 3 files changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b764cd9/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaUtils.java b/src/jvm/storm/kafka/KafkaUtils.java
index 5094f14..d86519c 100644
--- a/src/jvm/storm/kafka/KafkaUtils.java
+++ b/src/jvm/storm/kafka/KafkaUtils.java
@@ -36,6 +36,15 @@ public class KafkaUtils {
         }
     }
 
+
+    public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
+        long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
+        if ( config.forceFromStart ) {
+            startOffsetTime = config.startOffsetTime;
+        }
+        return getOffset(consumer, topic, partition, startOffsetTime);
+    }
+
     public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) {
         TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
         Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b764cd9/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/PartitionManager.java b/src/jvm/storm/kafka/PartitionManager.java
index 0861c25..f12c0d9 100644
--- a/src/jvm/storm/kafka/PartitionManager.java
+++ b/src/jvm/storm/kafka/PartitionManager.java
@@ -73,12 +73,12 @@ public class PartitionManager {
             LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
         }
 
-        if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
+        if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
+            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
+            LOG.info("No partition information found, using configuration to determine offset");
+        } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
             _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
-            LOG.info("Using startOffsetTime to choose last commit offset.");
-        } else if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
-            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, kafka.api.OffsetRequest.LatestTime());
-            LOG.info("Setting last commit offset to HEAD.");
+            LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
         } else {
             _committedTo = jsonOffset;
             LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b764cd9/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index eceba47..fbbbd4b 100644
--- a/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -96,11 +96,7 @@ public class TridentKafkaEmitter {
                 offset = (Long) lastMeta.get("nextOffset");
             }
         } else {
-            long startTime = kafka.api.OffsetRequest.LatestTime();
-            if (_config.forceFromStart) {
-                startTime = _config.startOffsetTime;
-            }
-            offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, startTime);
+            offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
         }
         ByteBufferMessageSet msgs = fetchMessages(consumer, partition, offset);
         long endoffset = offset;