You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by hu...@apache.org on 2023/12/05 05:35:36 UTC

(flink) branch master updated (acd3e7ab66c -> 0acfc1a51e8)

This is an automated email from the ASF dual-hosted git repository.

huweihua pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from acd3e7ab66c [FLINK-27529] [connector/common] Fix Integer Comparison For HybridSource sourceIndex (#23703)
     new 2c5bc580e6c [hotfix][tests][JUnit5 migration] Migrate FixedRetryStrategyTest/ExponentialBackoffRetryStrategyTest to Junit5 and Assertj
     new 0acfc1a51e8 [FLINK-33702][core] Add the IncrementalDelayRetryStrategy implementation of RetryStrategy

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 ...egy.java => IncrementalDelayRetryStrategy.java} | 28 ++++---
 .../ExponentialBackoffRetryStrategyTest.java       | 47 ++++++------
 .../util/concurrent/FixedRetryStrategyTest.java    | 25 ++++---
 .../IncrementalDelayRetryStrategyTest.java         | 85 ++++++++++++++++++++++
 4 files changed, 144 insertions(+), 41 deletions(-)
 copy flink-core/src/main/java/org/apache/flink/util/concurrent/{ExponentialBackoffRetryStrategy.java => IncrementalDelayRetryStrategy.java} (71%)
 create mode 100644 flink-core/src/test/java/org/apache/flink/util/concurrent/IncrementalDelayRetryStrategyTest.java


(flink) 02/02: [FLINK-33702][core] Add the IncrementalDelayRetryStrategy implementation of RetryStrategy

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

huweihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0acfc1a51e8802ebd0eb1ab5c00bbfab5032ebdf
Author: Xiangyu Feng <xi...@gmail.com>
AuthorDate: Fri Dec 1 23:59:36 2023 +0800

    [FLINK-33702][core] Add the IncrementalDelayRetryStrategy implementation of RetryStrategy
---
 .../concurrent/IncrementalDelayRetryStrategy.java  | 80 ++++++++++++++++++++
 .../IncrementalDelayRetryStrategyTest.java         | 85 ++++++++++++++++++++++
 2 files changed, 165 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/util/concurrent/IncrementalDelayRetryStrategy.java b/flink-core/src/main/java/org/apache/flink/util/concurrent/IncrementalDelayRetryStrategy.java
new file mode 100644
index 00000000000..a412a307e3d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/concurrent/IncrementalDelayRetryStrategy.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.concurrent;
+
+import org.apache.flink.util.Preconditions;
+
+import java.time.Duration;
+
+/** An implementation of {@link RetryStrategy} that retries at an incremental delay with a cap. */
+public class IncrementalDelayRetryStrategy implements RetryStrategy {
+    private final int remainingRetries;
+    private final Duration currentRetryDelay;
+    private final Duration increment;
+    private final Duration maxRetryDelay;
+
+    /**
+     * @param remainingRetries number of times to retry
+     * @param currentRetryDelay the current delay between retries
+     * @param increment the delay increment between retries
+     * @param maxRetryDelay the max delay between retries
+     */
+    public IncrementalDelayRetryStrategy(
+            int remainingRetries,
+            Duration currentRetryDelay,
+            Duration increment,
+            Duration maxRetryDelay) {
+        Preconditions.checkArgument(
+                remainingRetries >= 0, "The number of retries must be greater or equal to 0.");
+        this.remainingRetries = remainingRetries;
+        Preconditions.checkArgument(
+                currentRetryDelay.toMillis() >= 0, "The currentRetryDelay must be positive");
+        this.currentRetryDelay = currentRetryDelay;
+        Preconditions.checkArgument(
+                increment.toMillis() >= 0, "The delay increment must be greater or equal to 0.");
+        this.increment = increment;
+        Preconditions.checkArgument(
+                maxRetryDelay.toMillis() >= 0, "The maxRetryDelay must be positive");
+        this.maxRetryDelay = maxRetryDelay;
+    }
+
+    @Override
+    public int getNumRemainingRetries() {
+        return remainingRetries;
+    }
+
+    @Override
+    public Duration getRetryDelay() {
+        return currentRetryDelay;
+    }
+
+    @Override
+    public RetryStrategy getNextRetryStrategy() {
+        int nextRemainingRetries = remainingRetries - 1;
+        Preconditions.checkState(
+                nextRemainingRetries >= 0, "The number of remaining retries must not be negative");
+        long nextRetryDelayMillis =
+                Math.min(currentRetryDelay.plus(increment).toMillis(), maxRetryDelay.toMillis());
+        return new IncrementalDelayRetryStrategy(
+                nextRemainingRetries,
+                Duration.ofMillis(nextRetryDelayMillis),
+                increment,
+                maxRetryDelay);
+    }
+}
diff --git a/flink-core/src/test/java/org/apache/flink/util/concurrent/IncrementalDelayRetryStrategyTest.java b/flink-core/src/test/java/org/apache/flink/util/concurrent/IncrementalDelayRetryStrategyTest.java
new file mode 100644
index 00000000000..7920d40a29d
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/concurrent/IncrementalDelayRetryStrategyTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.concurrent;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link IncrementalDelayRetryStrategy}. */
+public class IncrementalDelayRetryStrategyTest extends TestLogger {
+
+    @Test
+    public void testGettersNotCapped() throws Exception {
+        RetryStrategy retryStrategy =
+                new IncrementalDelayRetryStrategy(
+                        10, Duration.ofMillis(5L), Duration.ofMillis(4L), Duration.ofMillis(20L));
+        assertThat(retryStrategy.getNumRemainingRetries()).isEqualTo(10);
+        assertThat(retryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(5L));
+
+        RetryStrategy nextRetryStrategy = retryStrategy.getNextRetryStrategy();
+        assertThat(nextRetryStrategy.getNumRemainingRetries()).isEqualTo(9);
+        assertThat(nextRetryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(9L));
+    }
+
+    @Test
+    public void testGettersHitCapped() throws Exception {
+        RetryStrategy retryStrategy =
+                new IncrementalDelayRetryStrategy(
+                        5, Duration.ofMillis(15L), Duration.ofMillis(10L), Duration.ofMillis(20L));
+        assertThat(retryStrategy.getNumRemainingRetries()).isEqualTo(5);
+        assertThat(retryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(15L));
+
+        RetryStrategy nextRetryStrategy = retryStrategy.getNextRetryStrategy();
+        assertThat(nextRetryStrategy.getNumRemainingRetries()).isEqualTo(4);
+        assertThat(nextRetryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(20L));
+    }
+
+    @Test
+    public void testGettersAtCap() throws Exception {
+        RetryStrategy retryStrategy =
+                new IncrementalDelayRetryStrategy(
+                        5, Duration.ofMillis(20L), Duration.ofMillis(5L), Duration.ofMillis(20L));
+        assertThat(retryStrategy.getNumRemainingRetries()).isEqualTo(5);
+        assertThat(retryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(20L));
+
+        RetryStrategy nextRetryStrategy = retryStrategy.getNextRetryStrategy();
+        assertThat(nextRetryStrategy.getNumRemainingRetries()).isEqualTo(4);
+        assertThat(nextRetryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(20L));
+    }
+
+    /** Tests that getting a next RetryStrategy below zero remaining retries fails. */
+    @Test
+    public void testRetryFailure() {
+        assertThatThrownBy(
+                        () ->
+                                new IncrementalDelayRetryStrategy(
+                                                0,
+                                                Duration.ofMillis(20L),
+                                                Duration.ofMillis(5L),
+                                                Duration.ofMillis(20L))
+                                        .getNextRetryStrategy())
+                .isInstanceOf(IllegalStateException.class);
+    }
+}


(flink) 01/02: [hotfix][tests][JUnit5 migration] Migrate FixedRetryStrategyTest/ExponentialBackoffRetryStrategyTest to Junit5 and Assertj

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

huweihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2c5bc580e6c10fb3a2724a945847b5cc6b28df27
Author: Xiangyu Feng <xi...@gmail.com>
AuthorDate: Mon Dec 4 20:29:08 2023 +0800

    [hotfix][tests][JUnit5 migration] Migrate FixedRetryStrategyTest/ExponentialBackoffRetryStrategyTest to Junit5 and Assertj
---
 .../ExponentialBackoffRetryStrategyTest.java       | 47 ++++++++++++----------
 .../util/concurrent/FixedRetryStrategyTest.java    | 25 +++++++-----
 2 files changed, 41 insertions(+), 31 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/util/concurrent/ExponentialBackoffRetryStrategyTest.java b/flink-core/src/test/java/org/apache/flink/util/concurrent/ExponentialBackoffRetryStrategyTest.java
index 895332229f3..8430b978e78 100644
--- a/flink-core/src/test/java/org/apache/flink/util/concurrent/ExponentialBackoffRetryStrategyTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/concurrent/ExponentialBackoffRetryStrategyTest.java
@@ -20,58 +20,63 @@ package org.apache.flink.util.concurrent;
 
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link ExponentialBackoffRetryStrategy}. */
 public class ExponentialBackoffRetryStrategyTest extends TestLogger {
 
     @Test
-    public void testGettersNotCapped() throws Exception {
+    public void testGettersNotCapped() {
         RetryStrategy retryStrategy =
                 new ExponentialBackoffRetryStrategy(
                         10, Duration.ofMillis(5L), Duration.ofMillis(20L));
-        assertEquals(10, retryStrategy.getNumRemainingRetries());
-        assertEquals(Duration.ofMillis(5L), retryStrategy.getRetryDelay());
+        assertThat(retryStrategy.getNumRemainingRetries()).isEqualTo(10);
+        assertThat(retryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(5L));
 
         RetryStrategy nextRetryStrategy = retryStrategy.getNextRetryStrategy();
-        assertEquals(9, nextRetryStrategy.getNumRemainingRetries());
-        assertEquals(Duration.ofMillis(10L), nextRetryStrategy.getRetryDelay());
+        assertThat(nextRetryStrategy.getNumRemainingRetries()).isEqualTo(9);
+        assertThat(nextRetryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(10L));
     }
 
     @Test
-    public void testGettersHitCapped() throws Exception {
+    public void testGettersHitCapped() {
         RetryStrategy retryStrategy =
                 new ExponentialBackoffRetryStrategy(
                         5, Duration.ofMillis(15L), Duration.ofMillis(20L));
-        assertEquals(5, retryStrategy.getNumRemainingRetries());
-        assertEquals(Duration.ofMillis(15L), retryStrategy.getRetryDelay());
+        assertThat(retryStrategy.getNumRemainingRetries()).isEqualTo(5);
+        assertThat(retryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(15L));
 
         RetryStrategy nextRetryStrategy = retryStrategy.getNextRetryStrategy();
-        assertEquals(4, nextRetryStrategy.getNumRemainingRetries());
-        assertEquals(Duration.ofMillis(20L), nextRetryStrategy.getRetryDelay());
+        assertThat(nextRetryStrategy.getNumRemainingRetries()).isEqualTo(4);
+        assertThat(nextRetryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(20L));
     }
 
     @Test
-    public void testGettersAtCap() throws Exception {
+    public void testGettersAtCap() {
         RetryStrategy retryStrategy =
                 new ExponentialBackoffRetryStrategy(
                         5, Duration.ofMillis(20L), Duration.ofMillis(20L));
-        assertEquals(5, retryStrategy.getNumRemainingRetries());
-        assertEquals(Duration.ofMillis(20L), retryStrategy.getRetryDelay());
+        assertThat(retryStrategy.getNumRemainingRetries()).isEqualTo(5);
+        assertThat(retryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(20L));
 
         RetryStrategy nextRetryStrategy = retryStrategy.getNextRetryStrategy();
-        assertEquals(4, nextRetryStrategy.getNumRemainingRetries());
-        assertEquals(Duration.ofMillis(20L), nextRetryStrategy.getRetryDelay());
+        assertThat(nextRetryStrategy.getNumRemainingRetries()).isEqualTo(4);
+        assertThat(nextRetryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(20L));
     }
 
     /** Tests that getting a next RetryStrategy below zero remaining retries fails. */
-    @Test(expected = IllegalStateException.class)
-    public void testRetryFailure() throws Throwable {
-        new ExponentialBackoffRetryStrategy(0, Duration.ofMillis(20L), Duration.ofMillis(20L))
-                .getNextRetryStrategy();
+    @Test
+    public void testRetryFailure() {
+        assertThatThrownBy(
+                        () ->
+                                new ExponentialBackoffRetryStrategy(
+                                                0, Duration.ofMillis(20L), Duration.ofMillis(20L))
+                                        .getNextRetryStrategy())
+                .isInstanceOf(IllegalStateException.class);
     }
 }
diff --git a/flink-core/src/test/java/org/apache/flink/util/concurrent/FixedRetryStrategyTest.java b/flink-core/src/test/java/org/apache/flink/util/concurrent/FixedRetryStrategyTest.java
index e3b9a754fc1..e7078d77ce9 100644
--- a/flink-core/src/test/java/org/apache/flink/util/concurrent/FixedRetryStrategyTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/concurrent/FixedRetryStrategyTest.java
@@ -20,29 +20,34 @@ package org.apache.flink.util.concurrent;
 
 import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link FixedRetryStrategy}. */
 public class FixedRetryStrategyTest extends TestLogger {
 
     @Test
-    public void testGetters() throws Exception {
+    public void testGetters() {
         RetryStrategy retryStrategy = new FixedRetryStrategy(10, Duration.ofMillis(5L));
-        assertEquals(10, retryStrategy.getNumRemainingRetries());
-        assertEquals(Duration.ofMillis(5L), retryStrategy.getRetryDelay());
+        assertThat(retryStrategy.getNumRemainingRetries()).isEqualTo(10);
+        assertThat(retryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(5L));
 
         RetryStrategy nextRetryStrategy = retryStrategy.getNextRetryStrategy();
-        assertEquals(9, nextRetryStrategy.getNumRemainingRetries());
-        assertEquals(Duration.ofMillis(5L), nextRetryStrategy.getRetryDelay());
+        assertThat(nextRetryStrategy.getNumRemainingRetries()).isEqualTo(9);
+        assertThat(nextRetryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(5L));
     }
 
     /** Tests that getting a next RetryStrategy below zero remaining retries fails. */
-    @Test(expected = IllegalStateException.class)
-    public void testRetryFailure() throws Throwable {
-        new FixedRetryStrategy(0, Duration.ofMillis(5L)).getNextRetryStrategy();
+    @Test
+    public void testRetryFailure() {
+        assertThatThrownBy(
+                        () ->
+                                new FixedRetryStrategy(0, Duration.ofMillis(5L))
+                                        .getNextRetryStrategy())
+                .isInstanceOf(IllegalStateException.class);
     }
 }