You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by xi...@apache.org on 2017/04/11 10:19:41 UTC

[1/2] storm git commit: STORM-2458: Kafka Spout should allow hooking up custom logic when retries have been exhausted

Repository: storm
Updated Branches:
  refs/heads/master f49bbf9bf -> de753cba7


STORM-2458: Kafka Spout should allow hooking up custom logic when retries have been exhausted


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8c5dba20
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8c5dba20
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8c5dba20

Branch: refs/heads/master
Commit: 8c5dba2065cc0fbefbbf022ecf178339c4021ce9
Parents: c652d3f
Author: Sourav Mitra <so...@gmail.com>
Authored: Thu Apr 6 17:23:35 2017 +0530
Committer: Sourav Mitra <so...@gmail.com>
Committed: Thu Apr 6 17:23:35 2017 +0530

----------------------------------------------------------------------
 docs/storm-kafka.md                                           | 7 +++++++
 external/storm-kafka/README.md                                | 7 +++++++
 .../apache/storm/kafka/ExponentialBackoffMsgRetryManager.java | 5 +++++
 .../src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java | 7 +++++++
 .../src/jvm/org/apache/storm/kafka/PartitionManager.java      | 1 +
 5 files changed, 27 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8c5dba20/docs/storm-kafka.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka.md b/docs/storm-kafka.md
index ea6056c..28f0e3c 100644
--- a/docs/storm-kafka.md
+++ b/docs/storm-kafka.md
@@ -155,6 +155,13 @@ of implementation. Here is the interface
      * spout will called failed(offset) in next call and acked(offset) otherwise 
      */
     boolean retryFurther(Long offset);
+    
+    /**
+     * Spout will call this method after retryFurther returns false.
+     * This gives a chance for hooking up custom logic before all clean up.
+     * @param partition,offset
+     */
+    void cleanOffsetAfterRetries(Partition partition, Long offset);
 
     /**
      * Clear any offsets before kafkaOffset. These offsets are no longer available in kafka.

http://git-wip-us.apache.org/repos/asf/storm/blob/8c5dba20/external/storm-kafka/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 80f31c3..acc738a 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -152,6 +152,13 @@ of implementation. Here is the interface
      * spout will called failed(offset) in next call and acked(offset) otherwise 
      */
     boolean retryFurther(Long offset);
+    
+    /**
+     * Spout will call this method after retryFurther returns false.
+     * This gives a chance for hooking up custom logic before all clean up.
+     * @param partition,offset
+     */
+    void cleanOffsetAfterRetries(Partition partition, Long offset);
 
     /**
      * Clear any offsets before kafkaOffset. These offsets are no longer available in kafka.

http://git-wip-us.apache.org/repos/asf/storm/blob/8c5dba20/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
index a37dbc9..90cf440 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -111,6 +111,11 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
     }
 
     @Override
+    public void cleanOffsetAfterRetries(Partition partition, Long offset) {
+        //Do nothing..
+    }
+
+    @Override
     public Set<Long> clearOffsetsBefore(Long kafkaOffset) {
         Set<Long> invalidOffsets = new HashSet<Long>(); 
         for(Long offset : records.keySet()){

http://git-wip-us.apache.org/repos/asf/storm/blob/8c5dba20/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
index 5e2cc5f..c1fb96e 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
@@ -67,6 +67,13 @@ public interface FailedMsgRetryManager extends Serializable {
     boolean retryFurther(Long offset);
 
     /**
+     * Spout will call this method after retryFurther returns false.
+     * This gives a chance for hooking up custom logic before all clean up.
+     * @param partition,offset
+     */
+    void cleanOffsetAfterRetries(Partition partition, Long offset);
+
+    /**
      * Clear any offsets before kafkaOffset. These offsets are no longer available in kafka.
      * @param kafkaOffset
      * @return Set of offsets removed.

http://git-wip-us.apache.org/repos/asf/storm/blob/8c5dba20/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index e4ce657..f761d21 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -323,6 +323,7 @@ public class PartitionManager {
                 // state for the offset should be cleaned up
                 LOG.warn("Will not retry failed kafka offset {} further", offset);
                 _messageIneligibleForRetryCount.incr();
+                this._failedMsgRetryManager.cleanOffsetAfterRetries(_partition, offset);
                 _pending.remove(offset);
                 this._failedMsgRetryManager.acked(offset);
             }


[2/2] storm git commit: Merge branch 'AddedCustomLogicAfterRetriesFinish' of https://github.com/souravmitra/storm

Posted by xi...@apache.org.
Merge branch 'AddedCustomLogicAfterRetriesFinish' of https://github.com/souravmitra/storm


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/de753cba
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/de753cba
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/de753cba

Branch: refs/heads/master
Commit: de753cba7d64aa89e2906cf1825e8ce40266b7c1
Parents: f49bbf9 8c5dba2
Author: vesense <be...@163.com>
Authored: Tue Apr 11 18:19:05 2017 +0800
Committer: vesense <be...@163.com>
Committed: Tue Apr 11 18:19:05 2017 +0800

----------------------------------------------------------------------
 docs/storm-kafka.md                                           | 7 +++++++
 external/storm-kafka/README.md                                | 7 +++++++
 .../apache/storm/kafka/ExponentialBackoffMsgRetryManager.java | 5 +++++
 .../src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java | 7 +++++++
 .../src/jvm/org/apache/storm/kafka/PartitionManager.java      | 1 +
 5 files changed, 27 insertions(+)
----------------------------------------------------------------------