Posted to dev@storm.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2015/01/08 21:14:36 UTC

[jira] [Commented] (STORM-495) Add delayed retries to KafkaSpout

    [ https://issues.apache.org/jira/browse/STORM-495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14269980#comment-14269980 ] 

ASF GitHub Bot commented on STORM-495:
--------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/254#discussion_r22678017
  
    --- Diff: external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java ---
    @@ -0,0 +1,154 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package storm.kafka;
    +
    +import java.util.PriorityQueue;
    +import java.util.Queue;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +
    +public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager {
    +
    +    private final long retryInitialDelayMs;
    +    private final double retryDelayMultiplier;
    +    private final long retryDelayMaxMs;
    +
    +    private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>();
    +    private SortedMap<Long,MessageRetryRecord> records = new TreeMap<Long,MessageRetryRecord>();
    +
    +    public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double retryDelayMultiplier, long retryDelayMaxMs) {
    +        this.retryInitialDelayMs = retryInitialDelayMs;
    +        this.retryDelayMultiplier = retryDelayMultiplier;
    +        this.retryDelayMaxMs = retryDelayMaxMs;
    +    }
    +
    +    @Override
    +    public void failed(Long offset) {
    +        MessageRetryRecord oldRecord = this.records.get(offset);
    +        MessageRetryRecord newRecord = oldRecord == null ?
    +                                       new MessageRetryRecord(offset) :
    +                                       oldRecord.createNextRetryRecord();
    +        this.records.put(offset, newRecord);
    +        this.waiting.add(newRecord);
    +    }
    +
    +    @Override
    +    public void acked(Long offset) {
    +        MessageRetryRecord record = this.records.remove(offset);
    +        if (record != null) {
    +            this.waiting.remove(record);
    +        }
    +    }
    +
    +    @Override
    +    public void retryStarted(Long offset) {
    +        MessageRetryRecord record = this.records.get(offset);
    +        if (record == null || !this.waiting.contains(record)) {
    +            throw new IllegalStateException("cannot retry a message that has not failed");
    +        } else {
    +            this.waiting.remove(record);
    +        }
    +    }
    +
    +    @Override
    +    public Long nextFailedMessageToRetry() {
    +        if (this.waiting.size() > 0) {
    +            MessageRetryRecord first = this.waiting.peek();
    +            if (System.currentTimeMillis() >= first.retryTimeUTC) {
    +                if (this.records.containsKey(first.offset)) {
    +                    return first.offset;
    +                } else {
    +                    // defensive programming - should be impossible
    +                    this.waiting.remove(first);
    +                    return nextFailedMessageToRetry();
    +                }
    +            }
    +        }
    +        return null;
    +    }
    +
    +    @Override
    +    public boolean shouldRetryMsg(Long offset) {
    +        MessageRetryRecord record = this.records.get(offset);
    +        return record != null &&
    +                this.waiting.contains(record) &&
    +                System.currentTimeMillis() >= record.retryTimeUTC;
    +    }
    +
    +    /**
    +     * A MessageRetryRecord holds the data of how many times a message has
    +     * failed and been retried, and when the last failure occurred.  It can
    +     * determine whether it is ready to be retried by employing an exponential
    +     * back-off calculation using config values stored in SpoutConfig:
    +     * <ul>
    +     *  <li>retryInitialDelayMs - time to delay before the first retry</li>
    +     *  <li>retryDelayMultiplier - multiplier by which to increase the delay for each subsequent retry</li>
    +     *  <li>retryDelayMaxMs - maximum retry delay (once this delay time is reached, subsequent retries will
    +     *                        delay for this amount of time every time)
    +     *  </li>
    +     * </ul>
    +     */
    +    class MessageRetryRecord implements Comparable<MessageRetryRecord> {
    +        private final long offset;
    +        private final int retryNum;
    +        private final long retryTimeUTC;
    +
    +        public MessageRetryRecord(long offset) {
    +            this(offset, 1);
    +        }
    +
    +        private MessageRetryRecord(long offset, int retryNum) {
    +            this.offset = offset;
    +            this.retryNum = retryNum;
    +            this.retryTimeUTC = System.currentTimeMillis() + calculateRetryDelay();
    +        }
    +
    +        /**
    +         * Create a MessageRetryRecord for the next retry that should occur after this one.
    +         * @return MessageRetryRecord with the next retry time, or null to indicate that another
    +         *         retry should not be performed.  The latter case can happen if we are about to
    +         *         run into the backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS in the Storm
    +         *         configuration.
    +         */
    +        public MessageRetryRecord createNextRetryRecord() {
    +            return new MessageRetryRecord(this.offset, this.retryNum + 1);
    +        }
    +
    +        private long calculateRetryDelay() {
    +            double delayMultiplier = Math.pow(retryDelayMultiplier, this.retryNum - 1);
    --- End diff ---
    
    We have had issues with other exponential backoff code in the past where the retry count would overflow the available bits, which would result in a negative delay.  This code looks like it is also susceptible to that.  Because the math is done in longs and doubles, I think it is OK for just about all use cases, but it would be nice to either fix it or file a JIRA to follow up on it in the future.
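
    As an illustration of the concern, here is a minimal sketch (not the code under review; the actual method body continues past the end of this diff hunk) of how the calculation could be clamped so that neither the multiplication nor the later addition to System.currentTimeMillis() in the MessageRetryRecord constructor can go negative:

        // Sketch only: the names mirror the constructor parameters above, and the
        // clamping is one illustrative way to address the overflow concern.
        public final class OverflowSafeBackoffSketch {

            /**
             * Computes initialDelayMs * multiplier^(retryNum - 1), clamped to maxDelayMs.
             * Doing the arithmetic in double space means Math.pow can reach
             * Double.POSITIVE_INFINITY for a very large retryNum, but it never wraps
             * negative, and the comparison below also holds for infinity.  Clamping to
             * a sane maxDelayMs likewise keeps System.currentTimeMillis() + delay from
             * overflowing a long.
             */
            public static long calculateRetryDelay(int retryNum,
                                                   long initialDelayMs,
                                                   double multiplier,
                                                   long maxDelayMs) {
                double delay = initialDelayMs * Math.pow(multiplier, retryNum - 1);
                if (delay > maxDelayMs) {
                    return maxDelayMs;
                }
                return (long) delay;
            }

            public static void main(String[] args) {
                // With a 1000 ms initial delay, a 2x multiplier and a 60 s cap:
                System.out.println(calculateRetryDelay(10, 1000L, 2.0, 60_000L));   // prints 60000 (512000 ms clamped to the cap)
                System.out.println(calculateRetryDelay(5000, 1000L, 2.0, 60_000L)); // prints 60000 (Math.pow returned infinity, still clamped)
            }
        }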


> Add delayed retries to KafkaSpout
> ---------------------------------
>
>                 Key: STORM-495
>                 URL: https://issues.apache.org/jira/browse/STORM-495
>             Project: Apache Storm
>          Issue Type: Improvement
>    Affects Versions: 0.9.3
>         Environment: all environments
>            Reporter: Rick Kilgore
>            Priority: Minor
>              Labels: kafka, retry
>
> If a tuple in the topology originates from the KafkaSpout in the external/storm-kafka module, and a bolt in the topology indicates a failure by calling fail() on its OutputCollector, the KafkaSpout will immediately retry the message.
> We wish to use this failure-and-retry behavior in our ingestion system whenever we experience a recoverable error from a downstream system, such as a 500 or 503 error from a service we depend on.  But with the current KafkaSpout behavior, doing so results in a tight loop where we retry several times over a few seconds and then give up.  I want to be able to delay the retry to give the downstream service some time to recover.  Ideally, I would like configurable, exponential backoff retry.
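
For a concrete sense of what configurable, exponential backoff retry means here, a purely illustrative sketch (the configuration values are made up; the formula simply mirrors the ExponentialBackoffMsgRetryManager constructor parameters from the diff above):

    public final class BackoffDelayExample {
        public static void main(String[] args) {
            // Made-up configuration: 100 ms initial delay, 2x multiplier, 60 s cap.
            long initialDelayMs = 100L;
            double multiplier = 2.0;
            long maxDelayMs = 60_000L;

            for (int retryNum = 1; retryNum <= 12; retryNum++) {
                double raw = initialDelayMs * Math.pow(multiplier, retryNum - 1);
                long delayMs = raw > maxDelayMs ? maxDelayMs : (long) raw;
                // Prints 100, 200, 400, ... and stays at 60000 once the cap is reached.
                System.out.println("retry " + retryNum + " -> wait " + delayMs + " ms");
            }
        }
    }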



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)