You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by xi...@apache.org on 2017/04/11 10:19:41 UTC
[1/2] storm git commit: STORM-2458: Kafka Spout should allow hooking
up custom logic when retries have exhausted
Repository: storm
Updated Branches:
refs/heads/master f49bbf9bf -> de753cba7
STORM-2458: Kafka Spout should allow hooking up custom logic when retries have exhausted
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8c5dba20
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8c5dba20
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8c5dba20
Branch: refs/heads/master
Commit: 8c5dba2065cc0fbefbbf022ecf178339c4021ce9
Parents: c652d3f
Author: Sourav Mitra <so...@gmail.com>
Authored: Thu Apr 6 17:23:35 2017 +0530
Committer: Sourav Mitra <so...@gmail.com>
Committed: Thu Apr 6 17:23:35 2017 +0530
----------------------------------------------------------------------
docs/storm-kafka.md | 7 +++++++
external/storm-kafka/README.md | 7 +++++++
.../apache/storm/kafka/ExponentialBackoffMsgRetryManager.java | 5 +++++
.../src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java | 7 +++++++
.../src/jvm/org/apache/storm/kafka/PartitionManager.java | 1 +
5 files changed, 27 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8c5dba20/docs/storm-kafka.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka.md b/docs/storm-kafka.md
index ea6056c..28f0e3c 100644
--- a/docs/storm-kafka.md
+++ b/docs/storm-kafka.md
@@ -155,6 +155,13 @@ of implementation. Here is the interface
* spout will called failed(offset) in next call and acked(offset) otherwise
*/
boolean retryFurther(Long offset);
+
+ /**
+ * Spout will call this method after retryFurther returns false.
+ * This gives a chance for hooking up custom logic before all clean up.
+ * @param partition,offset
+ */
+ void cleanOffsetAfterRetries(Partition partition, Long offset);
/**
* Clear any offsets before kafkaOffset. These offsets are no longer available in kafka.
http://git-wip-us.apache.org/repos/asf/storm/blob/8c5dba20/external/storm-kafka/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 80f31c3..acc738a 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -152,6 +152,13 @@ of implementation. Here is the interface
* spout will called failed(offset) in next call and acked(offset) otherwise
*/
boolean retryFurther(Long offset);
+
+ /**
+ * Spout will call this method after retryFurther returns false.
+ * This gives a chance for hooking up custom logic before all clean up.
+ * @param partition,offset
+ */
+ void cleanOffsetAfterRetries(Partition partition, Long offset);
/**
* Clear any offsets before kafkaOffset. These offsets are no longer available in kafka.
http://git-wip-us.apache.org/repos/asf/storm/blob/8c5dba20/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
index a37dbc9..90cf440 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -111,6 +111,11 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
}
@Override
+ public void cleanOffsetAfterRetries(Partition partition, Long offset) {
+ //Do nothing..
+ }
+
+ @Override
public Set<Long> clearOffsetsBefore(Long kafkaOffset) {
Set<Long> invalidOffsets = new HashSet<Long>();
for(Long offset : records.keySet()){
http://git-wip-us.apache.org/repos/asf/storm/blob/8c5dba20/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
index 5e2cc5f..c1fb96e 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
@@ -67,6 +67,13 @@ public interface FailedMsgRetryManager extends Serializable {
boolean retryFurther(Long offset);
/**
+ * Spout will call this method after retryFurther returns false.
+ * This gives a chance for hooking up custom logic before all clean up.
+ * @param partition,offset
+ */
+ void cleanOffsetAfterRetries(Partition partition, Long offset);
+
+ /**
* Clear any offsets before kafkaOffset. These offsets are no longer available in kafka.
* @param kafkaOffset
* @return Set of offsets removed.
http://git-wip-us.apache.org/repos/asf/storm/blob/8c5dba20/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index e4ce657..f761d21 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -323,6 +323,7 @@ public class PartitionManager {
// state for the offset should be cleaned up
LOG.warn("Will not retry failed kafka offset {} further", offset);
_messageIneligibleForRetryCount.incr();
+ this._failedMsgRetryManager.cleanOffsetAfterRetries(_partition, offset);
_pending.remove(offset);
this._failedMsgRetryManager.acked(offset);
}
[2/2] storm git commit: Merge branch
'AddedCustomLogicAfterRetriesFinish' of https://github.com/souravmitra/storm
Posted by xi...@apache.org.
Merge branch 'AddedCustomLogicAfterRetriesFinish' of https://github.com/souravmitra/storm
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/de753cba
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/de753cba
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/de753cba
Branch: refs/heads/master
Commit: de753cba7d64aa89e2906cf1825e8ce40266b7c1
Parents: f49bbf9 8c5dba2
Author: vesense <be...@163.com>
Authored: Tue Apr 11 18:19:05 2017 +0800
Committer: vesense <be...@163.com>
Committed: Tue Apr 11 18:19:05 2017 +0800
----------------------------------------------------------------------
docs/storm-kafka.md | 7 +++++++
external/storm-kafka/README.md | 7 +++++++
.../apache/storm/kafka/ExponentialBackoffMsgRetryManager.java | 5 +++++
.../src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java | 7 +++++++
.../src/jvm/org/apache/storm/kafka/PartitionManager.java | 1 +
5 files changed, 27 insertions(+)
----------------------------------------------------------------------