You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:38 UTC

[39/50] [abbrv] git commit: removed duplicated code

removed duplicated code


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/c8c04a6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/c8c04a6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/c8c04a6a

Branch: refs/heads/master
Commit: c8c04a6a6d9575f700d4c3db35927ddb5347a265
Parents: 9370c5c
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Sat Apr 5 14:03:49 2014 +0100
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Sat Apr 5 14:03:49 2014 +0100

----------------------------------------------------------------------
 src/jvm/storm/kafka/PartitionManager.java | 17 ++++++-----------
 1 file changed, 6 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c8c04a6a/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/PartitionManager.java b/src/jvm/storm/kafka/PartitionManager.java
index 915f0f9..03075bb 100644
--- a/src/jvm/storm/kafka/PartitionManager.java
+++ b/src/jvm/storm/kafka/PartitionManager.java
@@ -174,25 +174,20 @@ public class PartitionManager {
     }
 
     public void commit() {
-        long committedTo;
-        if (_pending.isEmpty()) {
-            committedTo = _emittedToOffset;
-        } else {
-            committedTo = _pending.first();
-        }
-        if (committedTo != _committedTo) {
-            LOG.info("Writing committed offset (" + committedTo + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
+        long lastCompletedOffset = lastCompletedOffset();
+        if (lastCompletedOffset != lastCommittedOffset()) {
+            LOG.info("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
             Map<Object, Object> data = ImmutableMap.builder()
                     .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                             "name", _stormConf.get(Config.TOPOLOGY_NAME)))
-                    .put("offset", committedTo)
+                    .put("offset", lastCompletedOffset)
                     .put("partition", _partition.partition)
                     .put("broker", ImmutableMap.of("host", _partition.host.host,
                             "port", _partition.host.port))
                     .put("topic", _spoutConfig.topic).build();
             _state.writeJSON(committedPath(), data);
-            _committedTo = committedTo;
-            LOG.info("Wrote committed offset (" + committedTo + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
+            _committedTo = lastCompletedOffset;
+            LOG.info("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
         } else {
             LOG.info("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
         }