You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/12/08 20:24:08 UTC

[GitHub] [flink] rkhachatryan commented on a diff in pull request #21368: [FLINK-30165][runtime][JUnit5 Migration] Migrate unaligned checkpoint related tests under flink-runtime module to junit5

rkhachatryan commented on code in PR #21368:
URL: https://github.com/apache/flink/pull/21368#discussion_r1043759772


##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateChunkReaderTest.java:
##########
@@ -34,61 +34,66 @@
 import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkState;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
 /** {@link ChannelStateChunkReader} test. */
-public class ChannelStateChunkReaderTest {
+class ChannelStateChunkReaderTest {
 
-    @Test(expected = TestException.class)
-    public void testBufferRecycledOnFailure() throws IOException, InterruptedException {
+    @Test
+    void testBufferRecycledOnFailure() throws IOException {
         FailingChannelStateSerializer serializer = new FailingChannelStateSerializer();
         TestRecoveredChannelStateHandler handler = new TestRecoveredChannelStateHandler();
 
         try (FSDataInputStream stream = getStream(serializer, 10)) {
-            new ChannelStateChunkReader(serializer)
-                    .readChunk(stream, serializer.getHeaderLength(), handler, "channelInfo", 0);
-        } finally {
-            checkState(serializer.failed);
-            checkState(!handler.requestedBuffers.isEmpty());
-            assertTrue(
-                    handler.requestedBuffers.stream()
-                            .allMatch(TestChannelStateByteBuffer::isRecycled));
+            assertThatThrownBy(
+                            () ->
+                                    new ChannelStateChunkReader(serializer)
+                                            .readChunk(
+                                                    stream,
+                                                    serializer.getHeaderLength(),
+                                                    handler,
+                                                    "channelInfo",
+                                                    0))
+                    .isInstanceOf(TestException.class);
+            assertThat(serializer.failed).isTrue();
+            assertThat(handler.requestedBuffers)
+                    .isNotEmpty()
+                    .allMatch(TestChannelStateByteBuffer::isRecycled);
         }
     }
 
     @Test
-    public void testBufferRecycledOnSuccess() throws IOException, InterruptedException {
+    void testBufferRecycledOnSuccess() throws IOException, InterruptedException {
         ChannelStateSerializer serializer = new ChannelStateSerializerImpl();
         TestRecoveredChannelStateHandler handler = new TestRecoveredChannelStateHandler();
 
         try (FSDataInputStream stream = getStream(serializer, 10)) {
             new ChannelStateChunkReader(serializer)
                     .readChunk(stream, serializer.getHeaderLength(), handler, "channelInfo", 0);
         } finally {
-            checkState(!handler.requestedBuffers.isEmpty());
-            assertTrue(
-                    handler.requestedBuffers.stream()
-                            .allMatch(TestChannelStateByteBuffer::isRecycled));
+            assertThat(handler.requestedBuffers)
+                    .isNotEmpty()

Review Comment:
   I think it's better to keep `checkState` here for the same reason
   
   > - the intention of the test is more clear (make sure buffers are recycled)
   > - if a precondition fails, the first thing to look at is the test itself, rather than production code



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateChunkReaderTest.java:
##########
@@ -34,61 +34,66 @@
 import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkState;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
 /** {@link ChannelStateChunkReader} test. */
-public class ChannelStateChunkReaderTest {
+class ChannelStateChunkReaderTest {
 
-    @Test(expected = TestException.class)
-    public void testBufferRecycledOnFailure() throws IOException, InterruptedException {
+    @Test
+    void testBufferRecycledOnFailure() throws IOException {
         FailingChannelStateSerializer serializer = new FailingChannelStateSerializer();
         TestRecoveredChannelStateHandler handler = new TestRecoveredChannelStateHandler();
 
         try (FSDataInputStream stream = getStream(serializer, 10)) {
-            new ChannelStateChunkReader(serializer)
-                    .readChunk(stream, serializer.getHeaderLength(), handler, "channelInfo", 0);
-        } finally {
-            checkState(serializer.failed);
-            checkState(!handler.requestedBuffers.isEmpty());
-            assertTrue(
-                    handler.requestedBuffers.stream()
-                            .allMatch(TestChannelStateByteBuffer::isRecycled));
+            assertThatThrownBy(
+                            () ->
+                                    new ChannelStateChunkReader(serializer)
+                                            .readChunk(
+                                                    stream,
+                                                    serializer.getHeaderLength(),
+                                                    handler,
+                                                    "channelInfo",
+                                                    0))
+                    .isInstanceOf(TestException.class);
+            assertThat(serializer.failed).isTrue();

Review Comment:
   1. I'd keep `checkState` here instead of assertion so that:
   - the intention of the test is more clear (make sure buffers are recycled)
   - if a precondition fails, the first thing to look at is the test itself, rather than production code
   
   2. I'd keep the checks in order: `serializer.failed` then ExpectedException; that gives more details in case any of the checks fail



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java:
##########
@@ -135,100 +130,99 @@ public void testAbortOldAndStartNewCheckpoint() throws Exception {
                     worker.processAllRequests();
 
                     ChannelStateWriteResult result42 = writer.getAndRemoveWriteResult(checkpoint42);
-                    assertTrue(result42.isDone());
-                    try {
-                        result42.getInputChannelStateHandles().get();
-                        fail("The result should have failed.");
-                    } catch (Throwable throwable) {
-                        assertTrue(findThrowable(throwable, TestException.class).isPresent());
-                    }
+                    assertThat(result42.isDone()).isTrue();
+                    assertThatThrownBy(() -> result42.getInputChannelStateHandles().get())
+                            .as("The result should have failed.")
+                            .hasCauseInstanceOf(TestException.class);
 
                     ChannelStateWriteResult result43 = writer.getAndRemoveWriteResult(checkpoint43);
-                    assertFalse(result43.isDone());
+                    assertThat(result43.isDone()).isFalse();
                 });
     }
 
-    @Test(expected = TestException.class)
-    public void testBuffersRecycledOnError() throws Exception {
-        unwrappingError(
-                TestException.class,
-                () -> {
-                    NetworkBuffer buffer = getBuffer();
-                    try (ChannelStateWriterImpl writer =
-                            new ChannelStateWriterImpl(
-                                    TASK_NAME, new ConcurrentHashMap<>(), failingWorker(), 5)) {
-                        writer.open();
-                        callAddInputData(writer, buffer);
-                    } finally {
-                        assertTrue(buffer.isRecycled());
-                    }
-                });
+    @Test
+    void testBuffersRecycledOnError() throws IOException {
+        NetworkBuffer buffer = getBuffer();
+        try (ChannelStateWriterImpl writer =
+                new ChannelStateWriterImpl(
+                        TASK_NAME, new ConcurrentHashMap<>(), failingWorker(), 5)) {
+            writer.open();
+            assertThatThrownBy(() -> callAddInputData(writer, buffer))
+                    .isInstanceOf(RuntimeException.class)
+                    .hasCauseInstanceOf(TestException.class);
+            assertThat(buffer.isRecycled()).isTrue();
+        }
     }
 
     @Test
-    public void testBuffersRecycledOnClose() throws Exception {
+    void testBuffersRecycledOnClose() throws Exception {
         NetworkBuffer buffer = getBuffer();
-        runWithSyncWorker(
+        executeCallbackAndProcessWithSyncWorker(
                 writer -> {
                     callStart(writer);
                     callAddInputData(writer, buffer);
-                    assertFalse(buffer.isRecycled());
+                    assertThat(buffer.isRecycled()).isFalse();
                 });
-        assertTrue(buffer.isRecycled());
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testNoAddDataAfterFinished() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () ->
-                        runWithSyncWorker(
-                                writer -> {
-                                    callStart(writer);
-                                    callFinish(writer);
-                                    callAddInputData(writer);
-                                }));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testAddDataNotStarted() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () -> runWithSyncWorker((Consumer<ChannelStateWriter>) this::callAddInputData));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testFinishNotStarted() throws Exception {
-        unwrappingError(IllegalArgumentException.class, () -> runWithSyncWorker(this::callFinish));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testRethrowOnClose() throws Exception {
-        unwrappingError(
-                IllegalArgumentException.class,
-                () ->
-                        runWithSyncWorker(
-                                writer -> {
-                                    try {
-                                        callFinish(writer);
-                                    } catch (IllegalArgumentException e) {
-                                        // ignore here - should rethrow in close
-                                    }
-                                }));
-    }
-
-    @Test(expected = TestException.class)
-    public void testRethrowOnNextCall() throws Exception {
+        assertThat(buffer.isRecycled()).isTrue();
+    }
+
+    @Test
+    void testNoAddDataAfterFinished() throws Exception {
+        executeCallbackWithSyncWorker(
+                (writer, worker) -> {
+                    callStart(writer);
+                    callFinish(writer);
+                    worker.processAllRequests();
+
+                    callAddInputData(writer);
+                    assertThatThrownBy(worker::processAllRequests)
+                            .isInstanceOf(IllegalArgumentException.class);
+                });
+    }
+
+    @Test
+    void testAddDataNotStarted() {
+        assertThatThrownBy(
+                        () ->
+                                executeCallbackAndProcessWithSyncWorker(
+                                        (Consumer<ChannelStateWriter>) this::callAddInputData))

Review Comment:
   nit: the cast here seems unnecessary



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org