Posted to commits@storm.apache.org by pt...@apache.org on 2016/06/10 15:04:30 UTC
[02/10] storm git commit: STORM-1705: Update storm-kafka readme
STORM-1705: Update storm-kafka readme
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5c706afc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5c706afc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5c706afc
Branch: refs/heads/1.x-branch
Commit: 5c706afcb0922b891f1eda0f5483c89272491d12
Parents: 0f1aec5
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Thu Jun 9 18:36:49 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Thu Jun 9 18:36:49 2016 +0530
----------------------------------------------------------------------
external/storm-kafka/README.md | 55 ++++++++++++++++++--
.../ExponentialBackoffMsgRetryManager.java | 3 +-
.../storm/kafka/FailedMsgRetryManager.java | 3 +-
.../apache/storm/kafka/PartitionManager.java | 3 +-
.../jvm/org/apache/storm/kafka/SpoutConfig.java | 10 ++--
.../ExponentialBackoffMsgRetryManagerTest.java | 2 +-
6 files changed, 64 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/5c706afc/external/storm-kafka/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 5a34b55..91a6553 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -63,13 +63,19 @@ In addition to these parameters, SpoutConfig contains the following fields that
// setting for how often to save the current Kafka offset to ZooKeeper
public long stateUpdateIntervalMs = 2000;
- // Exponential back-off retry settings. These are used when retrying messages after a bolt
- // calls OutputCollector.fail().
- // Note: be sure to set org.apache.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent
- // resubmitting the message while still retrying.
+ // Retry strategy for failed messages
+ public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
+
+ // Exponential back-off retry settings. These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
+ // calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
+ // Initial delay between successive retries
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
+
+ // Maximum delay between successive retries
public long retryDelayMaxMs = 60 * 1000;
+ // Failed message will be retried infinitely if retryLimit is less than zero.
+ public int retryLimit = -1;
```
Core KafkaSpout only accepts an instance of SpoutConfig.
@@ -113,6 +119,47 @@ public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message,
This is useful for auditing/replaying messages from arbitrary points on a Kafka topic, saving the partition and offset of each message of a discrete stream instead of persisting the entire message.
+### Failed message retry
+FailedMsgRetryManager is an interface that defines the retry strategy for a failed message. The default implementation is ExponentialBackoffMsgRetryManager, which retries with exponentially increasing delays
+between consecutive retries. To use a custom implementation, set SpoutConfig.failedMsgRetryManagerClass to the fully qualified class name
+of the implementation. The interface is:
+
+```java
+ // Spout initialization can go here. This can be called multiple times during the lifecycle of a worker.
+ void prepare(SpoutConfig spoutConfig, Map stormConf);
+
+ // Message corresponding to offset has failed. This method is called only if retryFurther returns true for offset.
+ void failed(Long offset);
+
+ // Message corresponding to offset has been acked.
+ void acked(Long offset);
+
+ // Message corresponding to the offset has been re-emitted and is in transit.
+ void retryStarted(Long offset);
+
+ /**
+ * The offset of message, which is to be re-emitted. Spout will fetch messages starting from this offset
+ * and resend them, except completed messages.
+ */
+ Long nextFailedMessageToRetry();
+
+ /**
+ * @return True if the message corresponding to the offset should be emitted NOW. False otherwise.
+ */
+ boolean shouldReEmitMsg(Long offset);
+
+ /**
+ * Spout will clean up the state for this offset if false is returned. If retryFurther returns true,
+ * the spout will call failed(offset) next; otherwise it will call acked(offset).
+ */
+ boolean retryFurther(Long offset);
+
+ /**
+ * Clear any offsets before kafkaOffset. These offsets are no longer available in kafka.
+ */
+ Set<Long> clearOffsetsBefore(Long kafkaOffset);
+```
+
#### Version incompatibility
In Storm versions prior to 1.0, the MultiScheme methods accepted a `byte[]` instead of `ByteBuffer`. The `MultiScheme` and the related
Scheme apis were changed in version 1.0 to accept a ByteBuffer instead of a byte[].
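Reviewer note: the README hunk above documents retryInitialDelayMs, retryDelayMultiplier and retryDelayMaxMs but does not show how they combine. The sketch below is one plausible reading, not the actual ExponentialBackoffMsgRetryManager code: it assumes the delay before the n-th retry is the initial delay multiplied by the multiplier once per earlier retry, capped at the maximum. The class name `BackoffDelaySketch` and the method `delayMs` are hypothetical.

```java
public class BackoffDelaySketch {

    // Assumed formula: initial * multiplier^(retryNum - 1), capped at maxMs.
    // retryNum is 1 for the first retry. This is an illustration only.
    public static long delayMs(long initialMs, double multiplier, long maxMs, int retryNum) {
        double uncapped = initialMs * Math.pow(multiplier, retryNum - 1);
        return (long) Math.min(uncapped, (double) maxMs);
    }

    public static void main(String[] args) {
        // Non-default settings: 1 s initial delay, doubling, capped at 60 s.
        System.out.println(delayMs(1000, 2.0, 60 * 1000, 1));  // 1000
        System.out.println(delayMs(1000, 2.0, 60 * 1000, 3));  // 4000
        System.out.println(delayMs(1000, 2.0, 60 * 1000, 10)); // 60000 (capped)
        // Defaults from SpoutConfig: initial delay 0 and multiplier 1.0.
        System.out.println(delayMs(0, 1.0, 60 * 1000, 5));     // 0
    }
}
```

Under this reading, the defaults (initial delay 0, multiplier 1.0) yield no delay at all between retries, so a non-zero retryInitialDelayMs would be needed before the multiplier has any effect.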
http://git-wip-us.apache.org/repos/asf/storm/blob/5c706afc/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
index 7b5f5dd..b2cfaf0 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -39,7 +39,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
}
- public void prepare(SpoutConfig spoutConfig) {
+ public void prepare(SpoutConfig spoutConfig, Map stormConf) {
this.retryInitialDelayMs = spoutConfig.retryInitialDelayMs;
this.retryDelayMultiplier = spoutConfig.retryDelayMultiplier;
this.retryDelayMaxMs = spoutConfig.retryDelayMaxMs;
@@ -105,6 +105,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager
public boolean retryFurther(Long offset) {
MessageRetryRecord record = this.records.get(offset);
return ! (record != null &&
+ this.retryLimit > 0 &&
this.waiting.contains(record) &&
this.retryLimit <= record.retryNum);
}
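Reviewer note: the effect of the added `this.retryLimit > 0` guard is easier to see in isolation. The sketch below copies the boolean expression from retryFurther into a standalone method; the class name `RetryLimitSketch` and the `tracked` parameter (standing in for `record != null && this.waiting.contains(record)`) are hypothetical simplifications.

```java
public class RetryLimitSketch {

    // Same shape as the expression in retryFurther after this commit.
    // tracked stands in for: record != null && waiting.contains(record).
    public static boolean retryFurther(int retryLimit, boolean tracked, int retryNum) {
        return !(tracked && retryLimit > 0 && retryLimit <= retryNum);
    }

    public static void main(String[] args) {
        System.out.println(retryFurther(-1, true, 100)); // true: negative limit never stops retries
        System.out.println(retryFurther(0, true, 100));  // true: zero also passes the > 0 guard
        System.out.println(retryFurther(3, true, 2));    // true: limit not yet reached
        System.out.println(retryFurther(3, true, 3));    // false: limit reached, stop retrying
        System.out.println(retryFurther(3, false, 5));   // true: offset is not tracked as waiting
    }
}
```

As written, a retryLimit of 0 behaves like a negative value and retries indefinitely, which is slightly broader than the "less than zero" wording in the README comment.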
http://git-wip-us.apache.org/repos/asf/storm/blob/5c706afc/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
index 9a3b19f..5e2cc5f 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
@@ -18,6 +18,7 @@
package org.apache.storm.kafka;
import java.io.Serializable;
+import java.util.Map;
import java.util.Set;
public interface FailedMsgRetryManager extends Serializable {
@@ -25,7 +26,7 @@ public interface FailedMsgRetryManager extends Serializable {
/**
* Initialization
*/
- void prepare(SpoutConfig spoutConfig);
+ void prepare(SpoutConfig spoutConfig, Map stormConf);
/**
* Message corresponding to the offset failed in kafka spout.
http://git-wip-us.apache.org/repos/asf/storm/blob/5c706afc/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index d6e51b7..b65f053 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -77,7 +77,7 @@ public class PartitionManager {
try {
_failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
- _failedMsgRetryManager.prepare(spoutConfig);
+ _failedMsgRetryManager.prepare(spoutConfig, _stormConf);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>",
FailedMsgRetryManager.class,
@@ -284,6 +284,7 @@ public class PartitionManager {
this._failedMsgRetryManager.failed(offset);
} else {
// state for the offset should be cleaned up
+ LOG.warn("Will not retry failed kafka offset {} further", offset);
_messageIneligibleForRetryCount.incr();
_pending.remove(offset);
this._failedMsgRetryManager.acked(offset);
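Reviewer note: because PartitionManager creates the retry manager reflectively from a class name and then calls prepare, a custom implementation needs an accessible no-argument constructor and should do its setup in prepare. The sketch below shows that pattern without any Storm dependency. The `RetryManager` interface, the `FixedDelayRetryManager` class and the `sketch.retry.delay.ms` key are hypothetical stand-ins, and the sketch uses `getDeclaredConstructor().newInstance()` where the diff uses `Class.newInstance()`.

```java
import java.util.HashMap;
import java.util.Map;

public class RetryManagerLoadingSketch {

    // Hypothetical stand-in for FailedMsgRetryManager, reduced to prepare().
    public interface RetryManager {
        void prepare(Map<String, Object> stormConf);
    }

    // Hypothetical custom strategy. The implicit public no-argument
    // constructor is what makes it loadable by class name.
    public static class FixedDelayRetryManager implements RetryManager {
        public long delayMs;

        @Override
        public void prepare(Map<String, Object> stormConf) {
            Object value = stormConf.get("sketch.retry.delay.ms"); // hypothetical key
            delayMs = (value == null) ? 1000L : ((Number) value).longValue();
        }
    }

    // Instantiate by name, then hand over the configuration map.
    public static RetryManager load(String className, Map<String, Object> stormConf) {
        try {
            RetryManager manager =
                (RetryManager) Class.forName(className).getDeclaredConstructor().newInstance();
            manager.prepare(stormConf);
            return manager;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                String.format("Failed to create an instance of <%s> from: <%s>",
                    RetryManager.class, className), e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("sketch.retry.delay.ms", 250);
        RetryManager manager = load(FixedDelayRetryManager.class.getName(), conf);
        System.out.println(((FixedDelayRetryManager) manager).delayMs); // 250
    }
}
```

Passing the topology configuration into prepare, as this commit does with `_stormConf`, lets a custom strategy read its own settings without adding fields to SpoutConfig.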
http://git-wip-us.apache.org/repos/asf/storm/blob/5c706afc/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
index 415ce0a..aa93c24 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
@@ -32,13 +32,15 @@ public class SpoutConfig extends KafkaConfig implements Serializable {
// setting for how often to save the current kafka offset to ZooKeeper
public long stateUpdateIntervalMs = 2000;
- // Exponential back-off retry settings. These are used when retrying messages after a bolt
- // calls OutputCollector.fail().
+ // Retry strategy for failed messages
+ public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
+
+ // Exponential back-off retry settings. These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
+ // calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
public long retryDelayMaxMs = 60 * 1000;
- public int retryLimit = Integer.MAX_VALUE;
- public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
+ public int retryLimit = -1;
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
super(hosts, topic);
http://git-wip-us.apache.org/repos/asf/storm/blob/5c706afc/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
index f2815e2..2f53612 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
@@ -260,7 +260,7 @@ public class ExponentialBackoffMsgRetryManagerTest {
spoutConfig.retryDelayMaxMs = retryDelayMaxMs;
spoutConfig.retryLimit = retryLimit;
ExponentialBackoffMsgRetryManager exponentialBackoffMsgRetryManager = new ExponentialBackoffMsgRetryManager();
- exponentialBackoffMsgRetryManager.prepare(spoutConfig);
+ exponentialBackoffMsgRetryManager.prepare(spoutConfig, null);
return exponentialBackoffMsgRetryManager;
}
}