You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/05/21 10:41:52 UTC
[3/5] storm git commit: STORM-2515: Other minor changes
STORM-2515: Other minor changes
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9d5a5700
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9d5a5700
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9d5a5700
Branch: refs/heads/master
Commit: 9d5a57004518b0f0aae3809b2fac13d51c75f928
Parents: 4d8f5d6
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Mon May 15 22:03:24 2017 +0200
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun May 21 19:30:50 2017 +0900
----------------------------------------------------------------------
.../java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java | 4 ++--
.../org/apache/storm/kafka/spout/internal/OffsetManager.java | 5 +----
2 files changed, 3 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/9d5a5700/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index e2ab5b7..dd6411a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -96,9 +96,9 @@ public class KafkaSpoutMessageId {
return "{"
+ "topic-partition=" + topicPart
+ ", offset=" + offset
- + "topic-partition=" + topicPart + ", numFails=" + numFails
+ + ", numFails=" + numFails
+ ", thread='" + currThread.getName() + "'"
- + "topic-partition=" + topicPart + '}';
+ + '}';
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/9d5a5700/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index a1a8d9d..be0f551 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -74,7 +74,6 @@ public class OffsetManager {
* ready to commit.
*/
public OffsetAndMetadata findNextCommitOffset() {
- boolean found = false;
long currOffset;
long nextCommitOffset = committedOffset;
KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata
@@ -82,7 +81,6 @@ public class OffsetManager {
for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap
currOffset = currAckedMsg.offset();
if (currOffset == nextCommitOffset + 1) { // found the next offset to commit
- found = true;
nextCommitMsg = currAckedMsg;
nextCommitOffset = currOffset;
} else if (currOffset > nextCommitOffset + 1) {
@@ -103,7 +101,6 @@ public class OffsetManager {
+ " Committed: [{}], Processed: [{}]", committedOffset, currOffset);
final Long nextEmittedOffset = emittedOffsets.ceiling(nextCommitOffset);
if (nextEmittedOffset != null && currOffset == nextEmittedOffset) {
- found = true;
nextCommitMsg = currAckedMsg;
nextCommitOffset = currOffset;
} else {
@@ -120,7 +117,7 @@ public class OffsetManager {
}
OffsetAndMetadata nextCommitOffsetAndMetadata = null;
- if (found) {
+ if (nextCommitMsg != null) {
nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread()));
LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed",
tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset());