You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/05/21 10:41:52 UTC

[3/5] storm git commit: STORM-2515: Other minor changes

STORM-2515: Other minor changes


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9d5a5700
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9d5a5700
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9d5a5700

Branch: refs/heads/master
Commit: 9d5a57004518b0f0aae3809b2fac13d51c75f928
Parents: 4d8f5d6
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Mon May 15 22:03:24 2017 +0200
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun May 21 19:30:50 2017 +0900

----------------------------------------------------------------------
 .../java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java  | 4 ++--
 .../org/apache/storm/kafka/spout/internal/OffsetManager.java    | 5 +----
 2 files changed, 3 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9d5a5700/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index e2ab5b7..dd6411a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -96,9 +96,9 @@ public class KafkaSpoutMessageId {
         return "{"
                 + "topic-partition=" + topicPart
                 + ", offset=" + offset
-                + "topic-partition=" + topicPart + ", numFails=" + numFails
+                + ", numFails=" + numFails
                 + ", thread='" + currThread.getName() + "'"
-                + "topic-partition=" + topicPart + '}';
+                + '}';
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/9d5a5700/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index a1a8d9d..be0f551 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -74,7 +74,6 @@ public class OffsetManager {
      *     ready to commit.
      */
     public OffsetAndMetadata findNextCommitOffset() {
-        boolean found = false;
         long currOffset;
         long nextCommitOffset = committedOffset;
         KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience variable to make it faster to create OffsetAndMetadata
@@ -82,7 +81,6 @@ public class OffsetManager {
         for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
             currOffset = currAckedMsg.offset();
             if (currOffset == nextCommitOffset + 1) {            // found the next offset to commit
-                found = true;
                 nextCommitMsg = currAckedMsg;
                 nextCommitOffset = currOffset;
             } else if (currOffset > nextCommitOffset + 1) {
@@ -103,7 +101,6 @@ public class OffsetManager {
                         + " Committed: [{}], Processed: [{}]", committedOffset, currOffset);
                     final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset);
                     if (nextEmittedOffset != null && currOffset == nextEmittedOffset) {
-                        found = true;
                         nextCommitMsg = currAckedMsg;
                         nextCommitOffset = currOffset;
                     } else {
@@ -120,7 +117,7 @@ public class OffsetManager {
         }
 
         OffsetAndMetadata nextCommitOffsetAndMetadata = null;
-        if (found) {
+        if (nextCommitMsg != null) {
             nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread()));
             LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed",
                 tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset());