You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/01 15:21:26 UTC

[GitHub] [flink] XComp commented on a diff in pull request #21368: [FLINK-30165][runtime][JUnit5 Migration] Migrate unaligned checkpoint related tests under flink-runtime module to junit5

XComp commented on code in PR #21368:
URL: https://github.com/apache/flink/pull/21368#discussion_r1037196683


##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -36,52 +35,49 @@
 
 import static org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamFactoryFactory;
 import static org.apache.flink.util.CloseableIterator.ofElements;
-import static org.apache.flink.util.ExceptionUtils.findThrowable;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** {@link ChannelStateWriterImpl} lifecycle tests. */
-public class ChannelStateWriterImplTest {
+class ChannelStateWriterImplTest {
     private static final long CHECKPOINT_ID = 42L;
     private static final String TASK_NAME = "test";
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testAddEventBuffer() throws Exception {
+    @Test
+    void testAddEventBuffer() throws Exception {
 
         NetworkBuffer dataBuf = getBuffer();
         NetworkBuffer eventBuf = getBuffer();
         eventBuf.setDataType(Buffer.DataType.EVENT_BUFFER);
-        try {
-            runWithSyncWorker(
-                    writer -> {
-                        callStart(writer);
-                        writer.addInputData(
-                                CHECKPOINT_ID,
-                                new InputChannelInfo(1, 1),
-                                1,
-                                ofElements(Buffer::recycleBuffer, eventBuf, dataBuf));
-                    });
-        } finally {
-            assertTrue(dataBuf.isRecycled());
-        }
+
+        runWithSyncWorker(
+                (writer, worker) -> {
+                    callStart(writer);
+                    callAddInputData(writer, dataBuf);

Review Comment:
   ```suggestion
                       callAddInputData(writer, eventBuf, dataBuf);
   ```
   Are you sure that we're still testing the same thing? The original code didn't call `worker.processAllRequests()` twice, as far as I can see. :thinking: 



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -36,52 +35,49 @@
 
 import static org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamFactoryFactory;
 import static org.apache.flink.util.CloseableIterator.ofElements;
-import static org.apache.flink.util.ExceptionUtils.findThrowable;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** {@link ChannelStateWriterImpl} lifecycle tests. */
-public class ChannelStateWriterImplTest {
+class ChannelStateWriterImplTest {
     private static final long CHECKPOINT_ID = 42L;
     private static final String TASK_NAME = "test";
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testAddEventBuffer() throws Exception {
+    @Test
+    void testAddEventBuffer() throws Exception {
 
         NetworkBuffer dataBuf = getBuffer();
         NetworkBuffer eventBuf = getBuffer();
         eventBuf.setDataType(Buffer.DataType.EVENT_BUFFER);
-        try {
-            runWithSyncWorker(
-                    writer -> {
-                        callStart(writer);
-                        writer.addInputData(
-                                CHECKPOINT_ID,
-                                new InputChannelInfo(1, 1),
-                                1,
-                                ofElements(Buffer::recycleBuffer, eventBuf, dataBuf));
-                    });
-        } finally {
-            assertTrue(dataBuf.isRecycled());
-        }
+
+        runWithSyncWorker(

Review Comment:
   I also noticed the following: Refactoring this code in a way that we're using `assertThatThrownBy` is used changes the behavior of `runWithSyncWorker` slightly because of `worker.processAllRequests()` in [ChannelStateWriterImplTest:313](https://github.com/apache/flink/blob/772de6fc8698eea8b7b42c3a1de4282a61ba7e8f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java#L313). The old implementation didn't call `processAllRequests()` if an exception was thrown. The refactored version would actually cause this method to be called which might change the behavior. You might want to double-check whether that's something we want.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -135,100 +134,98 @@ public void testAbortOldAndStartNewCheckpoint() throws Exception {
                     worker.processAllRequests();
 
                     ChannelStateWriteResult result42 = writer.getAndRemoveWriteResult(checkpoint42);
-                    assertTrue(result42.isDone());
-                    try {
-                        result42.getInputChannelStateHandles().get();
-                        fail("The result should have failed.");
-                    } catch (Throwable throwable) {
-                        assertTrue(findThrowable(throwable, TestException.class).isPresent());
-                    }
+                    assertThat(result42.isDone()).isTrue();
+                    assertThatThrownBy(() -> result42.getInputChannelStateHandles().get())
+                            .as("The result should have failed.")
+                            .hasCauseInstanceOf(TestException.class);
 
                     ChannelStateWriteResult result43 = writer.getAndRemoveWriteResult(checkpoint43);
-                    assertFalse(result43.isDone());
+                    assertThat(result43.isDone()).isFalse();
                 });
     }
 
-    @Test(expected = TestException.class)
-    public void testBuffersRecycledOnError() throws Exception {
-        unwrappingError(
-                TestException.class,
-                () -> {
-                    NetworkBuffer buffer = getBuffer();
-                    try (ChannelStateWriterImpl writer =
-                            new ChannelStateWriterImpl(
-                                    TASK_NAME, new ConcurrentHashMap<>(), failingWorker(), 5)) {
-                        writer.open();
-                        callAddInputData(writer, buffer);
-                    } finally {
-                        assertTrue(buffer.isRecycled());
-                    }
-                });
+    @Test
+    void testBuffersRecycledOnError() throws IOException {
+        NetworkBuffer buffer = getBuffer();
+        try (ChannelStateWriterImpl writer =
+                new ChannelStateWriterImpl(
+                        TASK_NAME, new ConcurrentHashMap<>(), failingWorker(), 5)) {
+            writer.open();
+            assertThatThrownBy(() -> callAddInputData(writer, buffer))
+                    .isInstanceOf(RuntimeException.class)
+                    .hasCauseInstanceOf(TestException.class);
+            assertThat(buffer.isRecycled()).isTrue();
+        }
     }
 
     @Test
-    public void testBuffersRecycledOnClose() throws Exception {
+    void testBuffersRecycledOnClose() throws Exception {
         NetworkBuffer buffer = getBuffer();
         runWithSyncWorker(
                 writer -> {
                     callStart(writer);
                     callAddInputData(writer, buffer);
-                    assertFalse(buffer.isRecycled());
+                    assertThat(buffer.isRecycled()).isFalse();
                 });
-        assertTrue(buffer.isRecycled());
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testNoAddDataAfterFinished() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () ->
-                        runWithSyncWorker(
-                                writer -> {
-                                    callStart(writer);
-                                    callFinish(writer);
-                                    callAddInputData(writer);
-                                }));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testAddDataNotStarted() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () -> runWithSyncWorker((Consumer<ChannelStateWriter>) this::callAddInputData));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testFinishNotStarted() throws Exception {
-        unwrappingError(IllegalArgumentException.class, () -> runWithSyncWorker(this::callFinish));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testRethrowOnClose() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () ->
-                        runWithSyncWorker(
-                                writer -> {
-                                    try {
-                                        callFinish(writer);
-                                    } catch (IllegalArgumentException e) {
-                                        // ignore here - should rethrow in close
-                                    }
-                                }));
-    }
-
-    @Test(expected = TestException.class)
-    public void testRethrowOnNextCall() throws Exception {
+        assertThat(buffer.isRecycled()).isTrue();
+    }
+
+    @Test
+    void testNoAddDataAfterFinished() {

Review Comment:
   This entire change block has examples where we are too broad in asserting the exception.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -36,52 +35,49 @@
 
 import static org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamFactoryFactory;
 import static org.apache.flink.util.CloseableIterator.ofElements;
-import static org.apache.flink.util.ExceptionUtils.findThrowable;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** {@link ChannelStateWriterImpl} lifecycle tests. */
-public class ChannelStateWriterImplTest {
+class ChannelStateWriterImplTest {

Review Comment:
   nit: I cannot comment on that line because it wasn't touched in this PR but we could also change the signature of `runWithSyncWorker(BiConsumerWithException<ChannelStateWriter, SyncChannelStateWriteRequestExecutor, Exception> testFn)` to `runWithSyncWorker(BiConsumerWithException<ChannelStateWriter, SyncChannelStateWriteRequestExecutor> testFn)` since the exception throwing is not utilized anymore when relying on a more specific `assertThatThrownBy` in each test.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -237,52 +234,41 @@ public void testLimit() throws IOException {
             for (int i = 0; i < maxCheckpoints; i++) {
                 writer.start(i, CheckpointOptions.forCheckpointWithDefaultLocation());
             }
-            writer.start(maxCheckpoints, CheckpointOptions.forCheckpointWithDefaultLocation());
+            assertThatThrownBy(
+                            () ->
+                                    writer.start(
+                                            maxCheckpoints,
+                                            CheckpointOptions.forCheckpointWithDefaultLocation()))
+                    .isInstanceOf(IllegalStateException.class);
         }
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testStartNotOpened() throws Exception {
-        unwrappingError(
-                IllegalStateException.class,
-                () -> {
-                    try (ChannelStateWriterImpl writer =
-                            new ChannelStateWriterImpl(TASK_NAME, 0, getStreamFactoryFactory())) {
-                        callStart(writer);
-                    }
-                });
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testNoStartAfterClose() throws Exception {
-        unwrappingError(
-                IllegalStateException.class,
-                () -> {
-                    ChannelStateWriterImpl writer = openWriter();
-                    writer.close();
-                    writer.start(42, CheckpointOptions.forCheckpointWithDefaultLocation());
-                });
+    @Test
+    void testStartNotOpened() throws IOException {
+        try (ChannelStateWriterImpl writer =
+                new ChannelStateWriterImpl(TASK_NAME, 0, getStreamFactoryFactory())) {
+            assertThatThrownBy(() -> callStart(writer))
+                    .hasCauseInstanceOf(IllegalStateException.class);
+        }
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testNoAddDataAfterClose() throws Exception {
-        unwrappingError(
-                IllegalStateException.class,
-                () -> {
-                    ChannelStateWriterImpl writer = openWriter();
-                    callStart(writer);
-                    writer.close();
-                    callAddInputData(writer);
-                });
+    @Test
+    void testNoStartAfterClose() throws IOException {
+        ChannelStateWriterImpl writer = openWriter();
+        writer.close();
+        assertThatThrownBy(
+                        () ->
+                                writer.start(
+                                        42, CheckpointOptions.forCheckpointWithDefaultLocation()))
+                .hasCauseInstanceOf(IllegalStateException.class);
     }
 
-    private static <T extends Throwable> void unwrappingError(
-            Class<T> clazz, RunnableWithException r) throws Exception {
-        try {
-            r.run();
-        } catch (Exception e) {
-            throw findThrowable(e, clazz).map(te -> (Exception) te).orElse(e);
-        }
+    @Test
+    void testNoAddDataAfterClose() throws IOException {
+        ChannelStateWriterImpl writer = openWriter();
+        callStart(writer);
+        writer.close();
+        assertThatThrownBy(() -> callAddInputData(writer));

Review Comment:
   We're missing the expected exception here



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -90,23 +86,26 @@ public void testAbort() throws Exception {
                     callAddInputData(writer, buffer);
                     callAbort(writer);
                     worker.processAllRequests();
-                    assertTrue(result.isDone());
-                    assertTrue(buffer.isRecycled());
+                    assertThat(result.isDone()).isTrue();
+                    assertThat(buffer.isRecycled()).isTrue();
                 });
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testAbortClearsResults() throws Exception {
-        runWithSyncWorker(
-                (writer, worker) -> {
-                    callStart(writer);
-                    writer.abort(CHECKPOINT_ID, new TestException(), true);
-                    writer.getAndRemoveWriteResult(CHECKPOINT_ID);
-                });
+    @Test
+    void testAbortClearsResults() {
+        assertThatThrownBy(

Review Comment:
   the assert on the exception can be made more specific here as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org