You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:11 UTC
[12/50] [abbrv] git commit: calculate start offset for new topology
consistently
calculate start offset for new topology consistently
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/5b764cd9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/5b764cd9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/5b764cd9
Branch: refs/heads/master
Commit: 5b764cd9138dd93e7382e3472b9d3d33d4b286a3
Parents: 80005ba
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Sat Jan 18 15:25:36 2014 +0000
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Sat Jan 18 15:25:36 2014 +0000
----------------------------------------------------------------------
src/jvm/storm/kafka/KafkaUtils.java | 9 +++++++++
src/jvm/storm/kafka/PartitionManager.java | 10 +++++-----
src/jvm/storm/kafka/trident/TridentKafkaEmitter.java | 6 +-----
3 files changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b764cd9/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaUtils.java b/src/jvm/storm/kafka/KafkaUtils.java
index 5094f14..d86519c 100644
--- a/src/jvm/storm/kafka/KafkaUtils.java
+++ b/src/jvm/storm/kafka/KafkaUtils.java
@@ -36,6 +36,15 @@ public class KafkaUtils {
}
}
+
+ public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
+ long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
+ if ( config.forceFromStart ) {
+ startOffsetTime = config.startOffsetTime;
+ }
+ return getOffset(consumer, topic, partition, startOffsetTime);
+ }
+
public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b764cd9/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/PartitionManager.java b/src/jvm/storm/kafka/PartitionManager.java
index 0861c25..f12c0d9 100644
--- a/src/jvm/storm/kafka/PartitionManager.java
+++ b/src/jvm/storm/kafka/PartitionManager.java
@@ -73,12 +73,12 @@ public class PartitionManager {
LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
}
- if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
+ if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
+ _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
+ LOG.info("No partition information found, using configuration to determine offset");
+ } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
_committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
- LOG.info("Using startOffsetTime to choose last commit offset.");
- } else if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
- _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, kafka.api.OffsetRequest.LatestTime());
- LOG.info("Setting last commit offset to HEAD.");
+ LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
} else {
_committedTo = jsonOffset;
LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/5b764cd9/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index eceba47..fbbbd4b 100644
--- a/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -96,11 +96,7 @@ public class TridentKafkaEmitter {
offset = (Long) lastMeta.get("nextOffset");
}
} else {
- long startTime = kafka.api.OffsetRequest.LatestTime();
- if (_config.forceFromStart) {
- startTime = _config.startOffsetTime;
- }
- offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, startTime);
+ offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
}
ByteBufferMessageSet msgs = fetchMessages(consumer, partition, offset);
long endoffset = offset;