You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/06/06 08:30:06 UTC

[rocketmq] branch 5.0.0-beta updated: [ISSUE #4384] Add RetryPolicy interface

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta by this push:
     new 42e5d734e [ISSUE #4384] Add RetryPolicy interface
42e5d734e is described below

commit 42e5d734eec8ed662fb6c9b06ff1faf615559653
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Tue May 31 19:27:39 2022 +0800

    [ISSUE #4384] Add RetryPolicy interface
---
 .../common/subscription/CustomizedRetryPolicy.java | 39 ++++++++++++++---
 .../subscription/ExponentialRetryPolicy.java       | 25 ++++++++---
 .../common/subscription/GroupRetryPolicy.java      | 33 ++++++++++++---
 .../rocketmq/common/subscription/RetryPolicy.java  | 31 ++++++++++++++
 .../subscription/CustomizedRetryPolicyTest.java    | 44 +++++++++++++++++++
 .../subscription/ExponentialRetryPolicyTest.java   | 44 +++++++++++++++++++
 .../common/subscription/GroupRetryPolicyTest.java  | 49 ++++++++++++++++++++++
 7 files changed, 247 insertions(+), 18 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/subscription/CustomizedRetryPolicy.java b/common/src/main/java/org/apache/rocketmq/common/subscription/CustomizedRetryPolicy.java
index 029d08279..7ea663525 100644
--- a/common/src/main/java/org/apache/rocketmq/common/subscription/CustomizedRetryPolicy.java
+++ b/common/src/main/java/org/apache/rocketmq/common/subscription/CustomizedRetryPolicy.java
@@ -17,12 +17,17 @@
 
 package org.apache.rocketmq.common.subscription;
 
-import java.util.Arrays;
+import com.google.common.base.MoreObjects;
 import java.util.concurrent.TimeUnit;
 
-public class CustomizedRetryPolicy {
+/**
+ * CustomizedRetryPolicy aims to make the group's behavior compatible with messageDelayLevel.
+ *
+ * @see <a href="https://github.com/apache/rocketmq/blob/3bd4b2b2f61a824196f19b03146e2c929c62777b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java#L137">org.apache.rocketmq.store.config.MessageStoreConfig</a>
+ */
+public class CustomizedRetryPolicy implements RetryPolicy {
     // 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
-    private long[] next = new long[]{
+    private long[] next = new long[] {
         TimeUnit.SECONDS.toMillis(1),
         TimeUnit.SECONDS.toMillis(5),
         TimeUnit.SECONDS.toMillis(10),
@@ -53,8 +58,30 @@ public class CustomizedRetryPolicy {
 
     @Override
     public String toString() {
-        return "CustomizedRetryPolicy{" +
-            "next=" + Arrays.toString(next) +
-            '}';
+        return MoreObjects.toStringHelper(this)
+            .add("next", next)
+            .toString();
+    }
+
+    /**
+     * Index = reconsumeTimes + 2 is kept for compatibility, because the old delayLevelTable starts from index 1
+     * and the old index is reconsumeTimes + 3.
+     *
+     * @param reconsumeTimes Message reconsumeTimes {@link org.apache.rocketmq.common.message.MessageExt#getReconsumeTimes}
+     * @param timeUnit       {@link TimeUnit}
+     * @see <a href="https://github.com/apache/rocketmq/blob/3bddd514646826253a239f95959c14840a87034a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java#L210">org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor</a>
+     * @see <a href="https://github.com/apache/rocketmq/blob/3bddd514646826253a239f95959c14840a87034a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java#L242">org.apache.rocketmq.store.DefaultMessageStore</a>
+     */
+    @Override
+    public long nextDelayDuration(int reconsumeTimes, TimeUnit timeUnit) {
+        if (reconsumeTimes < 0) {
+            reconsumeTimes = 0;
+        }
+        int index = reconsumeTimes + 2;
+        if (index >= next.length) {
+            index = next.length - 1;
+        }
+        long nextDelayDurationInMillis = next[index];
+        return timeUnit.convert(nextDelayDurationInMillis, TimeUnit.MILLISECONDS);
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/subscription/ExponentialRetryPolicy.java b/common/src/main/java/org/apache/rocketmq/common/subscription/ExponentialRetryPolicy.java
index 2dd2765fb..ff16b027a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/subscription/ExponentialRetryPolicy.java
+++ b/common/src/main/java/org/apache/rocketmq/common/subscription/ExponentialRetryPolicy.java
@@ -17,12 +17,13 @@
 
 package org.apache.rocketmq.common.subscription;
 
+import com.google.common.base.MoreObjects;
 import java.util.concurrent.TimeUnit;
 
 /**
  * next delay time = min(max, initial * multiplier^reconsumeTimes)
  */
-public class ExponentialRetryPolicy {
+public class ExponentialRetryPolicy implements RetryPolicy {
     private long initial = TimeUnit.SECONDS.toMillis(5);
     private long max = TimeUnit.HOURS.toMillis(2);
     private long multiplier = 2;
@@ -53,10 +54,22 @@ public class ExponentialRetryPolicy {
 
     @Override
     public String toString() {
-        return "ExponentialRetryPolicy{" +
-            "initial=" + initial +
-            ", max=" + max +
-            ", multiplier=" + multiplier +
-            '}';
+        return MoreObjects.toStringHelper(this)
+            .add("initial", initial)
+            .add("max", max)
+            .add("multiplier", multiplier)
+            .toString();
+    }
+
+    @Override
+    public long nextDelayDuration(int reconsumeTimes, TimeUnit timeUnit) {
+        if (reconsumeTimes < 0) {
+            reconsumeTimes = 0;
+        }
+        if (reconsumeTimes > 32) {
+            reconsumeTimes = 32;
+        }
+        long nextDelayDurationInMillis = Math.min(max, initial * (long) Math.pow(multiplier, reconsumeTimes));
+        return timeUnit.convert(nextDelayDurationInMillis, TimeUnit.MILLISECONDS);
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/subscription/GroupRetryPolicy.java b/common/src/main/java/org/apache/rocketmq/common/subscription/GroupRetryPolicy.java
index 6cb7ca7dd..d3025235b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/subscription/GroupRetryPolicy.java
+++ b/common/src/main/java/org/apache/rocketmq/common/subscription/GroupRetryPolicy.java
@@ -17,8 +17,12 @@
 
 package org.apache.rocketmq.common.subscription;
 
+import com.alibaba.fastjson.annotation.JSONField;
+import com.google.common.base.MoreObjects;
+
 public class GroupRetryPolicy {
-    private GroupRetryPolicyType type = GroupRetryPolicyType.EXPONENTIAL;
+    private final static RetryPolicy DEFAULT_RETRY_POLICY = new CustomizedRetryPolicy();
+    private GroupRetryPolicyType type = GroupRetryPolicyType.CUSTOMIZED;
     private ExponentialRetryPolicy exponentialRetryPolicy;
     private CustomizedRetryPolicy customizedRetryPolicy;
 
@@ -46,12 +50,29 @@ public class GroupRetryPolicy {
         this.customizedRetryPolicy = customizedRetryPolicy;
     }
 
+    @JSONField(serialize = false, deserialize = false)
+    public RetryPolicy getRetryPolicy() {
+        if (GroupRetryPolicyType.EXPONENTIAL.equals(type)) {
+            if (exponentialRetryPolicy == null) {
+                return DEFAULT_RETRY_POLICY;
+            }
+            return exponentialRetryPolicy;
+        } else if (GroupRetryPolicyType.CUSTOMIZED.equals(type)) {
+            if (customizedRetryPolicy == null) {
+                return DEFAULT_RETRY_POLICY;
+            }
+            return customizedRetryPolicy;
+        } else {
+            return DEFAULT_RETRY_POLICY;
+        }
+    }
+
     @Override
     public String toString() {
-        return "GroupRetryPolicy{" +
-            "type=" + type +
-            ", exponentialRetryPolicy=" + exponentialRetryPolicy +
-            ", customizedRetryPolicy=" + customizedRetryPolicy +
-            '}';
+        return MoreObjects.toStringHelper(this)
+            .add("type", type)
+            .add("exponentialRetryPolicy", exponentialRetryPolicy)
+            .add("customizedRetryPolicy", customizedRetryPolicy)
+            .toString();
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/subscription/RetryPolicy.java b/common/src/main/java/org/apache/rocketmq/common/subscription/RetryPolicy.java
new file mode 100644
index 000000000..2af570da2
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/subscription/RetryPolicy.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.subscription;
+
+import java.util.concurrent.TimeUnit;
+
+public interface RetryPolicy {
+    /**
+     * Computes the message's next delay duration for the specified reconsumeTimes.
+     *
+     * @param reconsumeTimes Message reconsumeTimes
+     * @param timeUnit       Given timeUnit
+     * @return Message's nextDelayDuration in given timeUnit
+     */
+    long nextDelayDuration(int reconsumeTimes, TimeUnit timeUnit);
+}
diff --git a/common/src/test/java/org/apache/rocketmq/common/subscription/CustomizedRetryPolicyTest.java b/common/src/test/java/org/apache/rocketmq/common/subscription/CustomizedRetryPolicyTest.java
new file mode 100644
index 000000000..67f2c6251
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/subscription/CustomizedRetryPolicyTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.subscription;
+
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CustomizedRetryPolicyTest {
+
+    @Test
+    public void testNextDelayDuration() {
+        CustomizedRetryPolicy customizedRetryPolicy = new CustomizedRetryPolicy();
+        long actual = customizedRetryPolicy.nextDelayDuration(0, TimeUnit.MILLISECONDS);
+        assertThat(actual).isEqualTo(TimeUnit.SECONDS.toMillis(10));
+        actual = customizedRetryPolicy.nextDelayDuration(10, TimeUnit.MILLISECONDS);
+        assertThat(actual).isEqualTo(TimeUnit.MINUTES.toMillis(9));
+    }
+
+    @Test
+    public void testNextDelayDurationOutOfRange() {
+        CustomizedRetryPolicy customizedRetryPolicy = new CustomizedRetryPolicy();
+        long actual = customizedRetryPolicy.nextDelayDuration(-1, TimeUnit.MILLISECONDS);
+        assertThat(actual).isEqualTo(TimeUnit.SECONDS.toMillis(10));
+        actual = customizedRetryPolicy.nextDelayDuration(100, TimeUnit.MILLISECONDS);
+        assertThat(actual).isEqualTo(TimeUnit.HOURS.toMillis(2));
+    }
+}
\ No newline at end of file
diff --git a/common/src/test/java/org/apache/rocketmq/common/subscription/ExponentialRetryPolicyTest.java b/common/src/test/java/org/apache/rocketmq/common/subscription/ExponentialRetryPolicyTest.java
new file mode 100644
index 000000000..abce11204
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/subscription/ExponentialRetryPolicyTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.subscription;
+
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ExponentialRetryPolicyTest {
+
+    @Test
+    public void testNextDelayDuration() {
+        ExponentialRetryPolicy exponentialRetryPolicy = new ExponentialRetryPolicy();
+        long actual = exponentialRetryPolicy.nextDelayDuration(0, TimeUnit.MILLISECONDS);
+        assertThat(actual).isEqualTo(TimeUnit.SECONDS.toMillis(5));
+        actual = exponentialRetryPolicy.nextDelayDuration(10, TimeUnit.MILLISECONDS);
+        assertThat(actual).isEqualTo(TimeUnit.SECONDS.toMillis(1024 * 5));
+    }
+
+    @Test
+    public void testNextDelayDurationOutOfRange() {
+        ExponentialRetryPolicy exponentialRetryPolicy = new ExponentialRetryPolicy();
+        long actual = exponentialRetryPolicy.nextDelayDuration(-1, TimeUnit.MILLISECONDS);
+        assertThat(actual).isEqualTo(TimeUnit.SECONDS.toMillis(5));
+        actual = exponentialRetryPolicy.nextDelayDuration(100, TimeUnit.MILLISECONDS);
+        assertThat(actual).isEqualTo(TimeUnit.HOURS.toMillis(2));
+    }
+}
\ No newline at end of file
diff --git a/common/src/test/java/org/apache/rocketmq/common/subscription/GroupRetryPolicyTest.java b/common/src/test/java/org/apache/rocketmq/common/subscription/GroupRetryPolicyTest.java
new file mode 100644
index 000000000..0a81c5ca8
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/subscription/GroupRetryPolicyTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.subscription;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class GroupRetryPolicyTest {
+
+    @Test
+    public void testGetRetryPolicy() {
+        GroupRetryPolicy groupRetryPolicy = new GroupRetryPolicy();
+        RetryPolicy retryPolicy = groupRetryPolicy.getRetryPolicy();
+        assertThat(retryPolicy).isInstanceOf(CustomizedRetryPolicy.class);
+        groupRetryPolicy.setType(GroupRetryPolicyType.EXPONENTIAL);
+        retryPolicy = groupRetryPolicy.getRetryPolicy();
+        assertThat(retryPolicy).isInstanceOf(CustomizedRetryPolicy.class);
+
+        groupRetryPolicy.setType(GroupRetryPolicyType.CUSTOMIZED);
+        groupRetryPolicy.setCustomizedRetryPolicy(new CustomizedRetryPolicy());
+        retryPolicy = groupRetryPolicy.getRetryPolicy();
+        assertThat(retryPolicy).isInstanceOf(CustomizedRetryPolicy.class);
+
+        groupRetryPolicy.setType(GroupRetryPolicyType.EXPONENTIAL);
+        groupRetryPolicy.setExponentialRetryPolicy(new ExponentialRetryPolicy());
+        retryPolicy = groupRetryPolicy.getRetryPolicy();
+        assertThat(retryPolicy).isInstanceOf(ExponentialRetryPolicy.class);
+
+        groupRetryPolicy.setType(null);
+        retryPolicy = groupRetryPolicy.getRetryPolicy();
+        assertThat(retryPolicy).isInstanceOf(CustomizedRetryPolicy.class);
+    }
+}
\ No newline at end of file