You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:38 UTC
[39/50] [abbrv] git commit: removed duplicated code
removed duplicated code
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/c8c04a6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/c8c04a6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/c8c04a6a
Branch: refs/heads/master
Commit: c8c04a6a6d9575f700d4c3db35927ddb5347a265
Parents: 9370c5c
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Sat Apr 5 14:03:49 2014 +0100
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Sat Apr 5 14:03:49 2014 +0100
----------------------------------------------------------------------
src/jvm/storm/kafka/PartitionManager.java | 17 ++++++-----------
1 file changed, 6 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c8c04a6a/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/PartitionManager.java b/src/jvm/storm/kafka/PartitionManager.java
index 915f0f9..03075bb 100644
--- a/src/jvm/storm/kafka/PartitionManager.java
+++ b/src/jvm/storm/kafka/PartitionManager.java
@@ -174,25 +174,20 @@ public class PartitionManager {
}
public void commit() {
- long committedTo;
- if (_pending.isEmpty()) {
- committedTo = _emittedToOffset;
- } else {
- committedTo = _pending.first();
- }
- if (committedTo != _committedTo) {
- LOG.info("Writing committed offset (" + committedTo + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
+ long lastCompletedOffset = lastCompletedOffset();
+ if (lastCompletedOffset != lastCommittedOffset()) {
+ LOG.info("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
Map<Object, Object> data = ImmutableMap.builder()
.put("topology", ImmutableMap.of("id", _topologyInstanceId,
"name", _stormConf.get(Config.TOPOLOGY_NAME)))
- .put("offset", committedTo)
+ .put("offset", lastCompletedOffset)
.put("partition", _partition.partition)
.put("broker", ImmutableMap.of("host", _partition.host.host,
"port", _partition.host.port))
.put("topic", _spoutConfig.topic).build();
_state.writeJSON(committedPath(), data);
- _committedTo = committedTo;
- LOG.info("Wrote committed offset (" + committedTo + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
+ _committedTo = lastCompletedOffset;
+ LOG.info("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
} else {
LOG.info("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
}