Posted to commits@storm.apache.org by pt...@apache.org on 2016/06/10 15:04:35 UTC

[07/10] storm git commit: STORM-1705: Update storm-kafka readme

STORM-1705: Update storm-kafka readme

Signed-off-by: P. Taylor Goetz <pt...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b23febc3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b23febc3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b23febc3

Branch: refs/heads/master
Commit: b23febc39c4ba3f6f0e7eecfa0969cd958dc969a
Parents: fd51eac
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Thu Jun 9 18:36:49 2016 +0530
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Jun 10 10:36:15 2016 -0400

----------------------------------------------------------------------
 external/storm-kafka/README.md                  | 55 ++++++++++++++++++--
 .../ExponentialBackoffMsgRetryManager.java      |  3 +-
 .../storm/kafka/FailedMsgRetryManager.java      |  3 +-
 .../apache/storm/kafka/PartitionManager.java    |  3 +-
 .../jvm/org/apache/storm/kafka/SpoutConfig.java | 10 ++--
 .../ExponentialBackoffMsgRetryManagerTest.java  |  2 +-
 6 files changed, 64 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b23febc3/external/storm-kafka/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 7fb2757..7db8675 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -62,13 +62,19 @@ In addition to these parameters, SpoutConfig contains the following fields that
     // setting for how often to save the current Kafka offset to ZooKeeper
     public long stateUpdateIntervalMs = 2000;
 
-    // Exponential back-off retry settings.  These are used when retrying messages after a bolt
-    // calls OutputCollector.fail().
-    // Note: be sure to set org.apache.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent
-    // resubmitting the message while still retrying.
+    // Retry strategy for failed messages
+    public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
+
+    // Exponential back-off retry settings.  These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
+    // calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
+    // Initial delay between successive retries
     public long retryInitialDelayMs = 0;
     public double retryDelayMultiplier = 1.0;
+
+    // Maximum delay between successive retries
     public long retryDelayMaxMs = 60 * 1000;
+    // A failed message is retried indefinitely if retryLimit is zero or negative.
+    public int retryLimit = -1;
 
 ```
 Core KafkaSpout only accepts an instance of SpoutConfig.
@@ -112,6 +118,47 @@ public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message,
 
 This is useful for auditing/replaying messages from arbitrary points on a Kafka topic, saving the partition and offset of each message of a discrete stream instead of persisting the entire message.
 
+### Failed message retry
+FailedMsgRetryManager is an interface that defines the retry strategy for a failed message. The default implementation is ExponentialBackoffMsgRetryManager, which applies exponential back-off
+between consecutive retries. To use a custom implementation, set SpoutConfig.failedMsgRetryManagerClass to the fully qualified class name
+of the implementation. Here is the interface:
+
+```java
+    // Retry manager initialization goes here. This may be called multiple times during the lifecycle of a worker.
+    void prepare(SpoutConfig spoutConfig, Map stormConf);
+
+    // The message corresponding to the offset has failed. This method is called only if retryFurther returns true for the offset.
+    void failed(Long offset);
+
+    // The message corresponding to the offset has been acked.
+    void acked(Long offset);
+
+    // The message corresponding to the offset has been re-emitted and is in transit.
+    void retryStarted(Long offset);
+
+    /**
+     * The offset of the message to be re-emitted. The spout will fetch messages starting from this offset
+     * and resend them, skipping messages that have already completed.
+     */
+    Long nextFailedMessageToRetry();
+
+    /**
+     * @return True if the message corresponding to the offset should be emitted NOW. False otherwise.
+     */
+    boolean shouldReEmitMsg(Long offset);
+
+    /**
+     * The spout will clean up the state for this offset if false is returned. If retryFurther returns true,
+     * the spout will call failed(offset) next; otherwise it will call acked(offset).
+     */
+    boolean retryFurther(Long offset);
+
+    /**
+     * Clear any offsets before kafkaOffset. These offsets are no longer available in Kafka.
+     */
+    Set<Long> clearOffsetsBefore(Long kafkaOffset);
+``` 
+
 #### Version incompatibility
 In Storm versions prior to 1.0, the MultiScheme methods accepted a `byte[]` instead of `ByteBuffer`. The `MultiScheme` and the related
 Scheme apis were changed in version 1.0 to accept a ByteBuffer instead of a byte[].
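The three back-off fields in SpoutConfig (retryInitialDelayMs, retryDelayMultiplier, retryDelayMaxMs) combine into a per-retry delay. The sketch below assumes the delay before retry n (counting from 1) is min(retryInitialDelayMs * retryDelayMultiplier^(n-1), retryDelayMaxMs); the class and method names are hypothetical, and the real ExponentialBackoffMsgRetryManager may differ in details such as overflow handling.

```java
// Hypothetical helper modelling exponential back-off; not part of storm-kafka.
final class BackoffDelay {
    private BackoffDelay() {}

    /**
     * Delay in ms before retry number retryNum (1-based), assuming
     * delay = min(initialDelayMs * multiplier^(retryNum - 1), maxDelayMs).
     */
    static long delayMs(long initialDelayMs, double multiplier, long maxDelayMs, int retryNum) {
        if (retryNum < 1) {
            throw new IllegalArgumentException("retryNum must be >= 1");
        }
        double delay = initialDelayMs * Math.pow(multiplier, retryNum - 1);
        // Cap while still a double so a huge product cannot overflow the long cast.
        if (delay >= maxDelayMs) {
            return maxDelayMs;
        }
        return (long) delay;
    }
}
```

Under this assumed formula, the README defaults (retryInitialDelayMs = 0, retryDelayMultiplier = 1.0) give a delay of 0 for every retry, so back-off only appears once both values are raised. With 1000 ms and a multiplier of 2.0, the delays run 1000, 2000, 4000, ... ms and hit the 60000 ms cap on the seventh retry.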

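PartitionManager loads the class named in failedMsgRetryManagerClass reflectively via `Class.forName(...).newInstance()`, so a real strategy must be a public class with a public no-argument constructor. The sketch below is a hypothetical fixed-limit, no-delay strategy. To compile without storm-kafka, it declares a trimmed stand-in interface, `RetryManagerSketch`, that mirrors the README's methods but omits `prepare(SpoutConfig, Map)`; a real implementation would implement `org.apache.storm.kafka.FailedMsgRetryManager` and read its settings in `prepare`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for org.apache.storm.kafka.FailedMsgRetryManager,
// trimmed so the sketch compiles without storm-kafka (prepare(...) is omitted).
interface RetryManagerSketch {
    void failed(Long offset);
    void acked(Long offset);
    void retryStarted(Long offset);
    Long nextFailedMessageToRetry();
    boolean shouldReEmitMsg(Long offset);
    boolean retryFurther(Long offset);
    Set<Long> clearOffsetsBefore(Long kafkaOffset);
}

// Hypothetical strategy: re-emit failed offsets immediately, giving up after maxRetries retries.
class FixedLimitRetryManager implements RetryManagerSketch {
    private final int maxRetries;
    private final TreeSet<Long> waiting = new TreeSet<>();        // failed, not yet re-emitted
    private final Map<Long, Integer> failures = new HashMap<>();  // failure count per tracked offset

    // No-arg constructor for reflective instantiation; a real implementation
    // would read its limit in prepare(SpoutConfig, Map) instead.
    public FixedLimitRetryManager() {
        this(3);
    }

    public FixedLimitRetryManager(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    @Override
    public void failed(Long offset) {
        failures.merge(offset, 1, Integer::sum);
        waiting.add(offset);
    }

    @Override
    public void acked(Long offset) {
        failures.remove(offset);
        waiting.remove(offset);
    }

    @Override
    public void retryStarted(Long offset) {
        waiting.remove(offset); // now in transit; keep its failure count
    }

    @Override
    public Long nextFailedMessageToRetry() {
        return waiting.isEmpty() ? null : waiting.first(); // lowest waiting offset first
    }

    @Override
    public boolean shouldReEmitMsg(Long offset) {
        return waiting.contains(offset); // no delay: eligible as soon as it is waiting
    }

    @Override
    public boolean retryFurther(Long offset) {
        // Consulted before failed(offset): allow another retry while under the limit.
        return failures.getOrDefault(offset, 0) < maxRetries;
    }

    @Override
    public Set<Long> clearOffsetsBefore(Long kafkaOffset) {
        Set<Long> cleared = new TreeSet<>();
        for (Long offset : failures.keySet()) {
            if (offset < kafkaOffset) {
                cleared.add(offset);
            }
        }
        for (Long offset : cleared) {
            acked(offset); // forget offsets that Kafka no longer holds
        }
        return cleared;
    }
}
```

To plug in a real implementation, set `spoutConfig.failedMsgRetryManagerClass` to its fully qualified class name, for instance via `YourRetryManager.class.getName()` (a placeholder name).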
http://git-wip-us.apache.org/repos/asf/storm/blob/b23febc3/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
index 7b5f5dd..b2cfaf0 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -39,7 +39,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
 
     }
 
-    public void prepare(SpoutConfig spoutConfig) {
+    public void prepare(SpoutConfig spoutConfig, Map stormConf) {
         this.retryInitialDelayMs = spoutConfig.retryInitialDelayMs;
         this.retryDelayMultiplier = spoutConfig.retryDelayMultiplier;
         this.retryDelayMaxMs = spoutConfig.retryDelayMaxMs;
@@ -105,6 +105,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
     public boolean retryFurther(Long offset) {
         MessageRetryRecord record = this.records.get(offset);
         return ! (record != null &&
+               this.retryLimit > 0 &&
                this.waiting.contains(record) &&
                this.retryLimit <= record.retryNum);
     }
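The added `this.retryLimit > 0` guard, together with the new default of -1, is what turns a non-positive retryLimit into "retry without limit". By this predicate a limit of 0 also means unlimited, not only negative values. The sketch below is a hypothetical, standalone copy of the predicate that takes the record lookup, the waiting-set membership and the record's retry count as plain arguments instead of reading the manager's internal state.

```java
// Hypothetical, standalone copy of the retryFurther() predicate from the hunk above.
final class RetryFurtherModel {
    private RetryFurtherModel() {}

    static boolean retryFurther(int retryLimit, boolean hasRecord, boolean isWaiting, int retryNum) {
        // Stop retrying only when a positive limit exists and this record has reached it.
        return !(hasRecord
                && retryLimit > 0
                && isWaiting
                && retryLimit <= retryNum);
    }
}
```

Under the previous default, Integer.MAX_VALUE, the cap was practically never reached; -1 states the no-limit intent explicitly.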

http://git-wip-us.apache.org/repos/asf/storm/blob/b23febc3/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
index 9a3b19f..5e2cc5f 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
@@ -18,6 +18,7 @@
 package org.apache.storm.kafka;
 
 import java.io.Serializable;
+import java.util.Map;
 import java.util.Set;
 
 public interface FailedMsgRetryManager extends Serializable {
@@ -25,7 +26,7 @@ public interface FailedMsgRetryManager extends Serializable {
     /**
      * Initialization
      */
-    void prepare(SpoutConfig spoutConfig);
+    void prepare(SpoutConfig spoutConfig, Map stormConf);
 
     /**
      * Message corresponding to the offset failed in kafka spout.

http://git-wip-us.apache.org/repos/asf/storm/blob/b23febc3/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index f42b8f2..c51f36e 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -78,7 +78,7 @@ public class PartitionManager {
 
         try {
             _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
-            _failedMsgRetryManager.prepare(spoutConfig);
+            _failedMsgRetryManager.prepare(spoutConfig, _stormConf);
         } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
             throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>",
                                                              FailedMsgRetryManager.class,
@@ -288,6 +288,7 @@ public class PartitionManager {
                 this._failedMsgRetryManager.failed(offset);
             } else {
                 // state for the offset should be cleaned up
+                LOG.warn("Will not retry failed kafka offset {} further", offset);
                 _messageIneligibleForRetryCount.incr();
                 _pending.remove(offset);
                 this._failedMsgRetryManager.acked(offset);
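The PartitionManager hunk shows the give-up branch: when retryFurther returns false, the offset is logged at WARN, counted in `_messageIneligibleForRetryCount`, removed from `_pending` and passed to the retry manager's `acked()`, so the spout stops tracking it. The following is a minimal model of that branch only; the class and field names are hypothetical, and a `LongPredicate` stands in for the retry manager's `retryFurther()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.function.LongPredicate;

// Hypothetical model of the fail() branch in PartitionManager; not the real class.
class FailPathModel {
    final TreeSet<Long> pending = new TreeSet<>();   // stands in for _pending
    final List<Long> retried = new ArrayList<>();    // offsets handed to failed()
    final List<Long> abandoned = new ArrayList<>();  // offsets handed to acked()
    long ineligibleForRetryCount = 0;                // stands in for _messageIneligibleForRetryCount
    private final LongPredicate retryFurther;

    FailPathModel(LongPredicate retryFurther) {
        this.retryFurther = retryFurther;
    }

    void fail(long offset) {
        if (retryFurther.test(offset)) {
            retried.add(offset);        // real code: _failedMsgRetryManager.failed(offset)
        } else {
            // real code also logs "Will not retry failed kafka offset {} further"
            ineligibleForRetryCount++;
            pending.remove(offset);
            abandoned.add(offset);      // real code: _failedMsgRetryManager.acked(offset)
        }
    }
}
```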

http://git-wip-us.apache.org/repos/asf/storm/blob/b23febc3/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
index 415ce0a..aa93c24 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
@@ -32,13 +32,15 @@ public class SpoutConfig extends KafkaConfig implements Serializable {
     // setting for how often to save the current kafka offset to ZooKeeper
     public long stateUpdateIntervalMs = 2000;
 
-    // Exponential back-off retry settings.  These are used when retrying messages after a bolt
-    // calls OutputCollector.fail().
+    // Retry strategy for failed messages
+    public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
+
+    // Exponential back-off retry settings.  These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
+    // calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
     public long retryInitialDelayMs = 0;
     public double retryDelayMultiplier = 1.0;
     public long retryDelayMaxMs = 60 * 1000;
-    public int retryLimit = Integer.MAX_VALUE;
-    public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
+    public int retryLimit = -1;
 
     public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
         super(hosts, topic);

http://git-wip-us.apache.org/repos/asf/storm/blob/b23febc3/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
index f2815e2..2f53612 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
@@ -260,7 +260,7 @@ public class ExponentialBackoffMsgRetryManagerTest {
         spoutConfig.retryDelayMaxMs = retryDelayMaxMs;
         spoutConfig.retryLimit = retryLimit; 
         ExponentialBackoffMsgRetryManager exponentialBackoffMsgRetryManager = new ExponentialBackoffMsgRetryManager();
-        exponentialBackoffMsgRetryManager.prepare(spoutConfig);
+        exponentialBackoffMsgRetryManager.prepare(spoutConfig, null);
         return exponentialBackoffMsgRetryManager;
     }
 }
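Since prepare now also receives the topology configuration, and the updated test calls `prepare(spoutConfig, null)`, an implementation that reads settings from stormConf should tolerate a null map. A minimal sketch of such a lookup follows; the helper name and the key used with it are hypothetical, not storm-kafka settings.

```java
import java.util.Map;

// Hypothetical helper for reading an optional numeric setting from stormConf.
final class StormConfLookup {
    private StormConfLookup() {}

    static long getLong(Map<?, ?> stormConf, String key, long defaultValue) {
        if (stormConf == null) {
            return defaultValue; // e.g. unit tests that pass null
        }
        Object value = stormConf.get(key);
        // Numeric config values may arrive as Integer or Long, so accept any Number.
        return value instanceof Number ? ((Number) value).longValue() : defaultValue;
    }
}
```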