You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/06/06 08:30:06 UTC
[rocketmq] branch 5.0.0-beta updated: [ISSUE #4384] Add RetryPolicy interface
This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta by this push:
new 42e5d734e [ISSUE #4384] Add RetryPolicy interface
42e5d734e is described below
commit 42e5d734eec8ed662fb6c9b06ff1faf615559653
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Tue May 31 19:27:39 2022 +0800
[ISSUE #4384] Add RetryPolicy interface
---
.../common/subscription/CustomizedRetryPolicy.java | 39 ++++++++++++++---
.../subscription/ExponentialRetryPolicy.java | 25 ++++++++---
.../common/subscription/GroupRetryPolicy.java | 33 ++++++++++++---
.../rocketmq/common/subscription/RetryPolicy.java | 31 ++++++++++++++
.../subscription/CustomizedRetryPolicyTest.java | 44 +++++++++++++++++++
.../subscription/ExponentialRetryPolicyTest.java | 44 +++++++++++++++++++
.../common/subscription/GroupRetryPolicyTest.java | 49 ++++++++++++++++++++++
7 files changed, 247 insertions(+), 18 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/subscription/CustomizedRetryPolicy.java b/common/src/main/java/org/apache/rocketmq/common/subscription/CustomizedRetryPolicy.java
index 029d08279..7ea663525 100644
--- a/common/src/main/java/org/apache/rocketmq/common/subscription/CustomizedRetryPolicy.java
+++ b/common/src/main/java/org/apache/rocketmq/common/subscription/CustomizedRetryPolicy.java
@@ -17,12 +17,17 @@
package org.apache.rocketmq.common.subscription;
-import java.util.Arrays;
+import com.google.common.base.MoreObjects;
import java.util.concurrent.TimeUnit;
-public class CustomizedRetryPolicy {
+/**
+ * CustomizedRetryPolicy aims to make a group's retry behavior compatible with messageDelayLevel.
+ *
+ * @see <a href="https://github.com/apache/rocketmq/blob/3bd4b2b2f61a824196f19b03146e2c929c62777b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java#L137">org.apache.rocketmq.store.config.MessageStoreConfig</a>
+ */
+public class CustomizedRetryPolicy implements RetryPolicy {
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
- private long[] next = new long[]{
+ private long[] next = new long[] {
TimeUnit.SECONDS.toMillis(1),
TimeUnit.SECONDS.toMillis(5),
TimeUnit.SECONDS.toMillis(10),
@@ -53,8 +58,30 @@ public class CustomizedRetryPolicy {
@Override
public String toString() {
- return "CustomizedRetryPolicy{" +
- "next=" + Arrays.toString(next) +
- '}';
+ return MoreObjects.toStringHelper(this)
+ .add("next", next)
+ .toString();
+ }
+
+ /**
+ * Index = reconsumeTimes + 2 keeps compatibility with the old logic, because the old delayLevelTable
+ * starts from index 1 and the old index is reconsumeTimes + 3.
+ *
+ * @param reconsumeTimes Message reconsumeTimes {@link org.apache.rocketmq.common.message.MessageExt#getReconsumeTimes}
+ * @param timeUnit {@link TimeUnit}
+ * @see <a href="https://github.com/apache/rocketmq/blob/3bddd514646826253a239f95959c14840a87034a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java#L210">org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor</a>
+ * @see <a href="https://github.com/apache/rocketmq/blob/3bddd514646826253a239f95959c14840a87034a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java#L242">org.apache.rocketmq.store.DefaultMessageStore</a>
+ */
+ @Override
+ public long nextDelayDuration(int reconsumeTimes, TimeUnit timeUnit) {
+ if (reconsumeTimes < 0) {
+ reconsumeTimes = 0;
+ }
+ int index = reconsumeTimes + 2;
+ if (index >= next.length) {
+ index = next.length - 1;
+ }
+ long nextDelayDurationInMillis = next[index];
+ return timeUnit.convert(nextDelayDurationInMillis, TimeUnit.MILLISECONDS);
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/subscription/ExponentialRetryPolicy.java b/common/src/main/java/org/apache/rocketmq/common/subscription/ExponentialRetryPolicy.java
index 2dd2765fb..ff16b027a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/subscription/ExponentialRetryPolicy.java
+++ b/common/src/main/java/org/apache/rocketmq/common/subscription/ExponentialRetryPolicy.java
@@ -17,12 +17,13 @@
package org.apache.rocketmq.common.subscription;
+import com.google.common.base.MoreObjects;
import java.util.concurrent.TimeUnit;
/**
* next delay time = min(max, initial * multiplier^reconsumeTimes)
*/
-public class ExponentialRetryPolicy {
+public class ExponentialRetryPolicy implements RetryPolicy {
private long initial = TimeUnit.SECONDS.toMillis(5);
private long max = TimeUnit.HOURS.toMillis(2);
private long multiplier = 2;
@@ -53,10 +54,22 @@ public class ExponentialRetryPolicy {
@Override
public String toString() {
- return "ExponentialRetryPolicy{" +
- "initial=" + initial +
- ", max=" + max +
- ", multiplier=" + multiplier +
- '}';
+ return MoreObjects.toStringHelper(this)
+ .add("initial", initial)
+ .add("max", max)
+ .add("multiplier", multiplier)
+ .toString();
+ }
+
+ @Override
+ public long nextDelayDuration(int reconsumeTimes, TimeUnit timeUnit) {
+ if (reconsumeTimes < 0) {
+ reconsumeTimes = 0;
+ }
+ if (reconsumeTimes > 32) {
+ reconsumeTimes = 32;
+ }
+ long nextDelayDurationInMillis = Math.min(max, initial * (long) Math.pow(multiplier, reconsumeTimes));
+ return timeUnit.convert(nextDelayDurationInMillis, TimeUnit.MILLISECONDS);
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/subscription/GroupRetryPolicy.java b/common/src/main/java/org/apache/rocketmq/common/subscription/GroupRetryPolicy.java
index 6cb7ca7dd..d3025235b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/subscription/GroupRetryPolicy.java
+++ b/common/src/main/java/org/apache/rocketmq/common/subscription/GroupRetryPolicy.java
@@ -17,8 +17,12 @@
package org.apache.rocketmq.common.subscription;
+import com.alibaba.fastjson.annotation.JSONField;
+import com.google.common.base.MoreObjects;
+
public class GroupRetryPolicy {
- private GroupRetryPolicyType type = GroupRetryPolicyType.EXPONENTIAL;
+ private final static RetryPolicy DEFAULT_RETRY_POLICY = new CustomizedRetryPolicy();
+ private GroupRetryPolicyType type = GroupRetryPolicyType.CUSTOMIZED;
private ExponentialRetryPolicy exponentialRetryPolicy;
private CustomizedRetryPolicy customizedRetryPolicy;
@@ -46,12 +50,29 @@ public class GroupRetryPolicy {
this.customizedRetryPolicy = customizedRetryPolicy;
}
+ @JSONField(serialize = false, deserialize = false)
+ public RetryPolicy getRetryPolicy() {
+ if (GroupRetryPolicyType.EXPONENTIAL.equals(type)) {
+ if (exponentialRetryPolicy == null) {
+ return DEFAULT_RETRY_POLICY;
+ }
+ return exponentialRetryPolicy;
+ } else if (GroupRetryPolicyType.CUSTOMIZED.equals(type)) {
+ if (customizedRetryPolicy == null) {
+ return DEFAULT_RETRY_POLICY;
+ }
+ return customizedRetryPolicy;
+ } else {
+ return DEFAULT_RETRY_POLICY;
+ }
+ }
+
@Override
public String toString() {
- return "GroupRetryPolicy{" +
- "type=" + type +
- ", exponentialRetryPolicy=" + exponentialRetryPolicy +
- ", customizedRetryPolicy=" + customizedRetryPolicy +
- '}';
+ return MoreObjects.toStringHelper(this)
+ .add("type", type)
+ .add("exponentialRetryPolicy", exponentialRetryPolicy)
+ .add("customizedRetryPolicy", customizedRetryPolicy)
+ .toString();
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/subscription/RetryPolicy.java b/common/src/main/java/org/apache/rocketmq/common/subscription/RetryPolicy.java
new file mode 100644
index 000000000..2af570da2
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/subscription/RetryPolicy.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.subscription;
+
+import java.util.concurrent.TimeUnit;
+
+public interface RetryPolicy {
+ /**
+ * Computes the message's next delay duration for the specified reconsumeTimes.
+ *
+ * @param reconsumeTimes Message reconsumeTimes
+ * @param timeUnit Given timeUnit
+ * @return Message's nextDelayDuration in given timeUnit
+ */
+ long nextDelayDuration(int reconsumeTimes, TimeUnit timeUnit);
+}
diff --git a/common/src/test/java/org/apache/rocketmq/common/subscription/CustomizedRetryPolicyTest.java b/common/src/test/java/org/apache/rocketmq/common/subscription/CustomizedRetryPolicyTest.java
new file mode 100644
index 000000000..67f2c6251
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/subscription/CustomizedRetryPolicyTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.subscription;
+
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CustomizedRetryPolicyTest {
+
+ @Test
+ public void testNextDelayDuration() {
+ CustomizedRetryPolicy customizedRetryPolicy = new CustomizedRetryPolicy();
+ long actual = customizedRetryPolicy.nextDelayDuration(0, TimeUnit.MILLISECONDS);
+ assertThat(actual).isEqualTo(TimeUnit.SECONDS.toMillis(10));
+ actual = customizedRetryPolicy.nextDelayDuration(10, TimeUnit.MILLISECONDS);
+ assertThat(actual).isEqualTo(TimeUnit.MINUTES.toMillis(9));
+ }
+
+ @Test
+ public void testNextDelayDurationOutOfRange() {
+ CustomizedRetryPolicy customizedRetryPolicy = new CustomizedRetryPolicy();
+ long actual = customizedRetryPolicy.nextDelayDuration(-1, TimeUnit.MILLISECONDS);
+ assertThat(actual).isEqualTo(TimeUnit.SECONDS.toMillis(10));
+ actual = customizedRetryPolicy.nextDelayDuration(100, TimeUnit.MILLISECONDS);
+ assertThat(actual).isEqualTo(TimeUnit.HOURS.toMillis(2));
+ }
+}
\ No newline at end of file
diff --git a/common/src/test/java/org/apache/rocketmq/common/subscription/ExponentialRetryPolicyTest.java b/common/src/test/java/org/apache/rocketmq/common/subscription/ExponentialRetryPolicyTest.java
new file mode 100644
index 000000000..abce11204
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/subscription/ExponentialRetryPolicyTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.subscription;
+
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ExponentialRetryPolicyTest {
+
+ @Test
+ public void testNextDelayDuration() {
+ ExponentialRetryPolicy exponentialRetryPolicy = new ExponentialRetryPolicy();
+ long actual = exponentialRetryPolicy.nextDelayDuration(0, TimeUnit.MILLISECONDS);
+ assertThat(actual).isEqualTo(TimeUnit.SECONDS.toMillis(5));
+ actual = exponentialRetryPolicy.nextDelayDuration(10, TimeUnit.MILLISECONDS);
+ assertThat(actual).isEqualTo(TimeUnit.SECONDS.toMillis(1024 * 5));
+ }
+
+ @Test
+ public void testNextDelayDurationOutOfRange() {
+ ExponentialRetryPolicy exponentialRetryPolicy = new ExponentialRetryPolicy();
+ long actual = exponentialRetryPolicy.nextDelayDuration(-1, TimeUnit.MILLISECONDS);
+ assertThat(actual).isEqualTo(TimeUnit.SECONDS.toMillis(5));
+ actual = exponentialRetryPolicy.nextDelayDuration(100, TimeUnit.MILLISECONDS);
+ assertThat(actual).isEqualTo(TimeUnit.HOURS.toMillis(2));
+ }
+}
\ No newline at end of file
diff --git a/common/src/test/java/org/apache/rocketmq/common/subscription/GroupRetryPolicyTest.java b/common/src/test/java/org/apache/rocketmq/common/subscription/GroupRetryPolicyTest.java
new file mode 100644
index 000000000..0a81c5ca8
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/subscription/GroupRetryPolicyTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.subscription;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class GroupRetryPolicyTest {
+
+ @Test
+ public void testGetRetryPolicy() {
+ GroupRetryPolicy groupRetryPolicy = new GroupRetryPolicy();
+ RetryPolicy retryPolicy = groupRetryPolicy.getRetryPolicy();
+ assertThat(retryPolicy).isInstanceOf(CustomizedRetryPolicy.class);
+ groupRetryPolicy.setType(GroupRetryPolicyType.EXPONENTIAL);
+ retryPolicy = groupRetryPolicy.getRetryPolicy();
+ assertThat(retryPolicy).isInstanceOf(CustomizedRetryPolicy.class);
+
+ groupRetryPolicy.setType(GroupRetryPolicyType.CUSTOMIZED);
+ groupRetryPolicy.setCustomizedRetryPolicy(new CustomizedRetryPolicy());
+ retryPolicy = groupRetryPolicy.getRetryPolicy();
+ assertThat(retryPolicy).isInstanceOf(CustomizedRetryPolicy.class);
+
+ groupRetryPolicy.setType(GroupRetryPolicyType.EXPONENTIAL);
+ groupRetryPolicy.setExponentialRetryPolicy(new ExponentialRetryPolicy());
+ retryPolicy = groupRetryPolicy.getRetryPolicy();
+ assertThat(retryPolicy).isInstanceOf(ExponentialRetryPolicy.class);
+
+ groupRetryPolicy.setType(null);
+ retryPolicy = groupRetryPolicy.getRetryPolicy();
+ assertThat(retryPolicy).isInstanceOf(CustomizedRetryPolicy.class);
+ }
+}
\ No newline at end of file