You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/24 13:37:12 UTC

[GitHub] [flink] reswqa commented on a diff in pull request #21368: [FLINK-30165][runtime][JUnit5 Migration] Migrate unaligned checkpoint related tests under flink-runtime module to junit5

reswqa commented on code in PR #21368:
URL: https://github.com/apache/flink/pull/21368#discussion_r1031507436


##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/CheckpointInProgressRequestTest.java:
##########
@@ -55,7 +55,7 @@ public void testNoCancelTwice() throws Exception {
             threads[i].join();
         }
 
-        assertEquals(1, counter.get());
+        assertThat(counter.get()).isOne();

Review Comment:
   ```suggestion
           assertThat(counter).hasValue(1);
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -135,145 +144,196 @@ public void testAbortOldAndStartNewCheckpoint() throws Exception {
                     worker.processAllRequests();
 
                     ChannelStateWriteResult result42 = writer.getAndRemoveWriteResult(checkpoint42);
-                    assertTrue(result42.isDone());
-                    try {
-                        result42.getInputChannelStateHandles().get();
-                        fail("The result should have failed.");
-                    } catch (Throwable throwable) {
-                        assertTrue(findThrowable(throwable, TestException.class).isPresent());
-                    }
+                    assertThat(result42.isDone()).isTrue();
+                    assertThatThrownBy(() -> result42.getInputChannelStateHandles().get())
+                            .as("The result should have failed.")
+                            .hasCauseInstanceOf(TestException.class);
 
                     ChannelStateWriteResult result43 = writer.getAndRemoveWriteResult(checkpoint43);
-                    assertFalse(result43.isDone());
+                    assertThat(result43.isDone()).isFalse();
                 });
     }
 
-    @Test(expected = TestException.class)
-    public void testBuffersRecycledOnError() throws Exception {
-        unwrappingError(
-                TestException.class,
-                () -> {
-                    NetworkBuffer buffer = getBuffer();
-                    try (ChannelStateWriterImpl writer =
-                            new ChannelStateWriterImpl(
-                                    TASK_NAME, new ConcurrentHashMap<>(), failingWorker(), 5)) {
-                        writer.open();
-                        callAddInputData(writer, buffer);
-                    } finally {
-                        assertTrue(buffer.isRecycled());
-                    }
-                });
+    @Test
+    void testBuffersRecycledOnError() {
+        assertThatThrownBy(
+                        () ->
+                                unwrappingError(
+                                        TestException.class,
+                                        () -> {
+                                            NetworkBuffer buffer = getBuffer();
+                                            try (ChannelStateWriterImpl writer =
+                                                    new ChannelStateWriterImpl(
+                                                            TASK_NAME,
+                                                            new ConcurrentHashMap<>(),
+                                                            failingWorker(),
+                                                            5)) {
+                                                writer.open();
+                                                callAddInputData(writer, buffer);
+                                            } finally {
+                                                assertThat(buffer.isRecycled()).isTrue();
+                                            }
+                                        }))
+                .isInstanceOf(TestException.class);
     }
 
     @Test
-    public void testBuffersRecycledOnClose() throws Exception {
+    void testBuffersRecycledOnClose() throws Exception {
         NetworkBuffer buffer = getBuffer();
         runWithSyncWorker(
                 writer -> {
                     callStart(writer);
                     callAddInputData(writer, buffer);
-                    assertFalse(buffer.isRecycled());
+                    assertThat(buffer.isRecycled()).isFalse();
                 });
-        assertTrue(buffer.isRecycled());
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testNoAddDataAfterFinished() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () ->
-                        runWithSyncWorker(
-                                writer -> {
-                                    callStart(writer);
-                                    callFinish(writer);
-                                    callAddInputData(writer);
-                                }));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testAddDataNotStarted() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () -> runWithSyncWorker((Consumer<ChannelStateWriter>) this::callAddInputData));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testFinishNotStarted() throws Exception {
-        unwrappingError(IllegalArgumentException.class, () -> runWithSyncWorker(this::callFinish));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testRethrowOnClose() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () ->
-                        runWithSyncWorker(
-                                writer -> {
-                                    try {
-                                        callFinish(writer);
-                                    } catch (IllegalArgumentException e) {
-                                        // ignore here - should rethrow in close
-                                    }
-                                }));
-    }
-
-    @Test(expected = TestException.class)
-    public void testRethrowOnNextCall() throws Exception {
+        assertThat(buffer.isRecycled()).isTrue();
+    }
+
+    @Test
+    void testNoAddDataAfterFinished() {
+        assertThatThrownBy(
+                        () ->
+                                unwrappingError(
+                                        IllegalArgumentException.class,
+                                        () ->
+                                                runWithSyncWorker(
+                                                        writer -> {
+                                                            callStart(writer);
+                                                            callFinish(writer);
+                                                            callAddInputData(writer);
+                                                        })))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testAddDataNotStarted() {
+        assertThatThrownBy(
+                        () ->
+                                unwrappingError(
+                                        IllegalArgumentException.class,
+                                        () ->
+                                                runWithSyncWorker(
+                                                        (Consumer<ChannelStateWriter>)
+                                                                this::callAddInputData)))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testFinishNotStarted() {
+        assertThatThrownBy(
+                        () ->
+                                unwrappingError(
+                                        IllegalArgumentException.class,
+                                        () -> runWithSyncWorker(this::callFinish)))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testRethrowOnClose() {
+        assertThatThrownBy(
+                        () ->
+                                unwrappingError(
+                                        IllegalArgumentException.class,
+                                        () ->
+                                                runWithSyncWorker(
+                                                        writer -> {
+                                                            try {
+                                                                callFinish(writer);
+                                                            } catch (IllegalArgumentException e) {
+                                                                // ignore here - should rethrow in
+                                                                // close
+                                                            }
+                                                        })))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testRethrowOnNextCall() {
         SyncChannelStateWriteRequestExecutor worker = new SyncChannelStateWriteRequestExecutor();
         ChannelStateWriterImpl writer =
                 new ChannelStateWriterImpl(TASK_NAME, new ConcurrentHashMap<>(), worker, 5);
-        writer.open();
-        worker.setThrown(new TestException());
-        unwrappingError(TestException.class, () -> callStart(writer));
+        assertThatThrownBy(
+                        () -> {
+                            writer.open();
+                            worker.setThrown(new TestException());
+                            unwrappingError(TestException.class, () -> callStart(writer));
+                        })
+                .isInstanceOf(TestException.class);
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testLimit() throws IOException {
+    @Test
+    void testLimit() {
         int maxCheckpoints = 3;
-        try (ChannelStateWriterImpl writer =
-                new ChannelStateWriterImpl(
-                        TASK_NAME, 0, getStreamFactoryFactory(), maxCheckpoints)) {
-            writer.open();
-            for (int i = 0; i < maxCheckpoints; i++) {
-                writer.start(i, CheckpointOptions.forCheckpointWithDefaultLocation());
-            }
-            writer.start(maxCheckpoints, CheckpointOptions.forCheckpointWithDefaultLocation());
-        }
+        assertThatThrownBy(
+                        () -> {
+                            try (ChannelStateWriterImpl writer =
+                                    new ChannelStateWriterImpl(
+                                            TASK_NAME,
+                                            0,
+                                            getStreamFactoryFactory(),
+                                            maxCheckpoints)) {
+                                writer.open();
+                                for (int i = 0; i < maxCheckpoints; i++) {
+                                    writer.start(
+                                            i,
+                                            CheckpointOptions.forCheckpointWithDefaultLocation());
+                                }
+                                writer.start(
+                                        maxCheckpoints,
+                                        CheckpointOptions.forCheckpointWithDefaultLocation());
+                            }
+                        })
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testStartNotOpened() throws Exception {
-        unwrappingError(
-                IllegalStateException.class,
-                () -> {
-                    try (ChannelStateWriterImpl writer =
-                            new ChannelStateWriterImpl(TASK_NAME, 0, getStreamFactoryFactory())) {
-                        callStart(writer);
-                    }
-                });
+    @Test
+    void testStartNotOpened() {
+        assertThatThrownBy(
+                        () ->
+                                unwrappingError(
+                                        IllegalStateException.class,
+                                        () -> {
+                                            try (ChannelStateWriterImpl writer =
+                                                    new ChannelStateWriterImpl(
+                                                            TASK_NAME,
+                                                            0,
+                                                            getStreamFactoryFactory())) {
+                                                callStart(writer);
+                                            }
+                                        }))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testNoStartAfterClose() throws Exception {
-        unwrappingError(
-                IllegalStateException.class,
-                () -> {
-                    ChannelStateWriterImpl writer = openWriter();
-                    writer.close();
-                    writer.start(42, CheckpointOptions.forCheckpointWithDefaultLocation());
-                });
+    @Test
+    void testNoStartAfterClose() {
+        assertThatThrownBy(
+                        () ->
+                                unwrappingError(
+                                        IllegalStateException.class,
+                                        () -> {
+                                            ChannelStateWriterImpl writer = openWriter();
+                                            writer.close();
+                                            writer.start(
+                                                    42,
+                                                    CheckpointOptions
+                                                            .forCheckpointWithDefaultLocation());
+                                        }))
+                .isInstanceOf(IllegalStateException.class);

Review Comment:
   ```suggestion
           Assertions.assertThatThrownBy(
                           () -> {
                               ChannelStateWriterImpl writer = openWriter();
                               writer.close();
                               writer.start(42, CheckpointOptions.forCheckpointWithDefaultLocation());
                           })
                   .hasCauseInstanceOf(IllegalStateException.class);
   ```
   Maybe we can use `hasCauseInstanceOf` directly instead of `unwrappingError`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org