You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2022/03/21 13:36:26 UTC

[flink] branch release-1.15 updated (543e885 -> 71ee033)

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a change to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 543e885  [FLINK-26592][state/changelog] Use mailbox in FsStateChangelogWriter instead of a lock
     new c336b84  [FLINK-26420][File] use numRecordsSendCounter from SinkWriterMetricGroup directly.
     new 4aec117  [FLINK-26420][Connector-base] use numRecordsSendCounter from SinkWriterMetricGroup directly.
     new 06b2b1d  [FLINK-26420][test] migrate FileWriterTest to AssertJ.
     new 71ee033  [FLINK-26420][test] migrate AsyncSinkWriterTest to AssertJ.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../base/sink/writer/AsyncSinkWriter.java          |  12 +-
 .../base/sink/writer/AsyncSinkWriterTest.java      | 224 ++++++++++++---------
 .../base/sink/writer/TestSinkInitContext.java      |   4 +-
 .../connector/file/sink/writer/FileWriter.java     |   7 +-
 .../connector/file/sink/writer/FileWriterTest.java | 176 ++++++++--------
 5 files changed, 237 insertions(+), 186 deletions(-)

[flink] 01/04: [FLINK-26420][File] use numRecordsSendCounter from SinkWriterMetricGroup directly.

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c336b848f98bd92df4960126c4335d45289cff21
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Tue Mar 15 16:00:02 2022 +0100

    [FLINK-26420][File] use numRecordsSendCounter from SinkWriterMetricGroup directly.
    
    (cherry picked from commit 3ca38240496d1d0f1289f5aecf1226f537e30233)
---
 .../flink/connector/file/sink/writer/FileWriter.java |  7 +++----
 .../connector/file/sink/writer/FileWriterTest.java   | 20 +++++++++++---------
 2 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
index fad3b50..51cc6d8 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
@@ -92,7 +92,7 @@ public class FileWriter<IN>
 
     private final OutputFileConfig outputFileConfig;
 
-    private final Counter recordsOutCounter;
+    private final Counter numRecordsSendCounter;
 
     private boolean endOfInput;
 
@@ -128,8 +128,7 @@ public class FileWriter<IN>
         this.activeBuckets = new HashMap<>();
         this.bucketerContext = new BucketerContext();
 
-        this.recordsOutCounter =
-                checkNotNull(metricGroup).getIOMetricGroup().getNumRecordsOutCounter();
+        this.numRecordsSendCounter = checkNotNull(metricGroup).getNumRecordsSendCounter();
         this.processingTimeService = checkNotNull(processingTimeService);
         checkArgument(
                 bucketCheckInterval > 0,
@@ -196,7 +195,7 @@ public class FileWriter<IN>
         final String bucketId = bucketAssigner.getBucketId(element, bucketerContext);
         final FileWriterBucket<IN> bucket = getOrCreateBucketForBucketId(bucketId);
         bucket.write(element, processingTimeService.getCurrentProcessingTime());
-        recordsOutCounter.inc();
+        numRecordsSendCounter.inc();
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
index e521f61..8966ca7 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
@@ -283,16 +283,19 @@ public class FileWriterTest {
     public void testNumberRecordsOutCounter() throws IOException, InterruptedException {
         final OperatorIOMetricGroup operatorIOMetricGroup =
                 UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
+        final SinkWriterMetricGroup sinkWriterMetricGroup =
+                InternalSinkWriterMetricGroup.mock(
+                        metricListener.getMetricGroup(), operatorIOMetricGroup);
         File outDir = TEMP_FOLDER.newFolder();
         Path path = new Path(outDir.toURI());
-        Counter recordsCounter = operatorIOMetricGroup.getNumRecordsOutCounter();
+        Counter recordsCounter = sinkWriterMetricGroup.getNumRecordsSendCounter();
         SinkWriter.Context context = new ContextImpl();
         FileWriter<String> fileWriter =
                 createWriter(
                         path,
                         DefaultRollingPolicy.builder().build(),
                         new OutputFileConfig("part-", ""),
-                        operatorIOMetricGroup);
+                        sinkWriterMetricGroup);
 
         assertEquals(0, recordsCounter.getCount());
         fileWriter.write("1", context);
@@ -432,13 +435,8 @@ public class FileWriterTest {
             Path basePath,
             RollingPolicy<String, String> rollingPolicy,
             OutputFileConfig outputFileConfig,
-            OperatorIOMetricGroup operatorIOMetricGroup)
+            SinkWriterMetricGroup sinkWriterMetricGroup)
             throws IOException {
-        final SinkWriterMetricGroup sinkWriterMetricGroup =
-                operatorIOMetricGroup == null
-                        ? InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup())
-                        : InternalSinkWriterMetricGroup.mock(
-                                metricListener.getMetricGroup(), operatorIOMetricGroup);
         return new FileWriter<>(
                 basePath,
                 sinkWriterMetricGroup,
@@ -458,7 +456,11 @@ public class FileWriterTest {
             RollingPolicy<String, String> rollingPolicy,
             OutputFileConfig outputFileConfig)
             throws IOException {
-        return createWriter(basePath, rollingPolicy, outputFileConfig, null);
+        return createWriter(
+                basePath,
+                rollingPolicy,
+                outputFileConfig,
+                InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()));
     }
 
     private FileWriter<String> createWriter(

[flink] 04/04: [FLINK-26420][test] migrate AsyncSinkWriterTest to AssertJ.

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 71ee033865a3adc3247c47d8636b16e99b924470
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Tue Mar 15 18:38:56 2022 +0100

    [FLINK-26420][test] migrate AsyncSinkWriterTest to AssertJ.
    
    (cherry picked from commit a2df2665b6ff411a2aeb9b204fd9d46a2af0ecfa)
---
 .../base/sink/writer/AsyncSinkWriterTest.java      | 224 ++++++++++++---------
 1 file changed, 130 insertions(+), 94 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index c6e95c7..47c437e 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -21,8 +21,8 @@ import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -41,11 +41,8 @@ import java.util.stream.Collectors;
 import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
 /**
  * Unit Tests the functionality of AsyncSinkWriter without any assumptions of what a concrete
@@ -57,7 +54,7 @@ public class AsyncSinkWriterTest {
     private TestSinkInitContext sinkInitContext;
     private TestSinkInitContextAnyThreadMailbox sinkInitContextAnyThreadMailbox;
 
-    @Before
+    @BeforeEach
     public void before() {
         res.clear();
         sinkInitContext = new TestSinkInitContext();
@@ -77,17 +74,20 @@ public class AsyncSinkWriterTest {
     public void testNumberOfRecordsIsAMultipleOfBatchSizeResultsInThatNumberOfRecordsBeingWritten()
             throws IOException, InterruptedException {
         performNormalWriteOfEightyRecordsToMock();
-        assertEquals(80, res.size());
+
+        assertThat(res.size()).isEqualTo(80);
     }
 
     @Test
     public void testMetricsGroupHasLoggedNumberOfRecordsAndNumberOfBytesCorrectly()
             throws IOException, InterruptedException {
         performNormalWriteOfEightyRecordsToMock();
-        assertEquals(80, sinkInitContext.getNumRecordsOutCounter().getCount());
-        assertEquals(320, sinkInitContext.getNumBytesOutCounter().getCount());
-        assertTrue(sinkInitContext.getCurrentSendTimeGauge().get().getValue() >= 0);
-        assertTrue(sinkInitContext.getCurrentSendTimeGauge().get().getValue() < 1000);
+
+        assertThat(sinkInitContext.getNumRecordsOutCounter().getCount()).isEqualTo(80);
+        assertThat(sinkInitContext.getNumBytesOutCounter().getCount()).isEqualTo(320);
+        assertThat(sinkInitContext.getCurrentSendTimeGauge().get().getValue())
+                .isGreaterThanOrEqualTo(0);
+        assertThat(sinkInitContext.getCurrentSendTimeGauge().get().getValue()).isLessThan(1000);
     }
 
     @Test
@@ -101,8 +101,10 @@ public class AsyncSinkWriterTest {
         for (int i = 0; i < 4; i++) {
             sink.write(String.valueOf(i));
         }
-        assertTrue(sinkInitContext.getCurrentSendTimeGauge().get().getValue() >= 99);
-        assertTrue(sinkInitContext.getCurrentSendTimeGauge().get().getValue() < 110);
+
+        assertThat(sinkInitContext.getCurrentSendTimeGauge().get().getValue())
+                .isGreaterThanOrEqualTo(99);
+        assertThat(sinkInitContext.getCurrentSendTimeGauge().get().getValue()).isLessThan(110);
     }
 
     @Test
@@ -113,7 +115,8 @@ public class AsyncSinkWriterTest {
         for (int i = 0; i < 23; i++) {
             sink.write(String.valueOf(i));
         }
-        assertEquals(20, res.size());
+
+        assertThat(res.size()).isEqualTo(20);
         assertThatBufferStatesAreEqual(sink.wrapRequests(20, 21, 22), getWriterState(sink));
     }
 
@@ -129,7 +132,8 @@ public class AsyncSinkWriterTest {
         sink.write("1"); // 4 bytes per record
         sink.write("2"); // to give 12 bytes in final flush
         sink.write("3");
-        assertEquals(3, res.size());
+
+        assertThat(res.size()).isEqualTo(3);
     }
 
     @Test
@@ -141,7 +145,8 @@ public class AsyncSinkWriterTest {
             sink.write(String.valueOf(i));
         }
         sink.flush(true);
-        assertEquals(23, res.size());
+
+        assertThat(res.size()).isEqualTo(23);
     }
 
     @Test
@@ -151,7 +156,8 @@ public class AsyncSinkWriterTest {
                 new AsyncSinkWriterImplBuilder().context(sinkInitContext).build();
         sink.write(String.valueOf(0));
         sink.flush(true);
-        assertEquals(1, res.size());
+
+        assertThat(res.size()).isEqualTo(1);
     }
 
     @Test
@@ -162,12 +168,14 @@ public class AsyncSinkWriterTest {
 
         sink.write("25");
         sink.write("55");
+
         assertThatBufferStatesAreEqual(sink.wrapRequests(25, 55), getWriterState(sink));
-        assertEquals(0, res.size());
+        assertThat(res.size()).isEqualTo(0);
 
         sink.write("75");
+
         assertThatBufferStatesAreEqual(BufferedRequestState.emptyState(), getWriterState(sink));
-        assertEquals(3, res.size());
+        assertThat(res.size()).isEqualTo(3);
     }
 
     public void writeFiveRecordsWithOneFailingThenCallPrepareCommitWithFlushing()
@@ -180,6 +188,7 @@ public class AsyncSinkWriterTest {
         sink.write("75");
         sink.write("95");
         sink.write("955");
+
         assertThatBufferStatesAreEqual(sink.wrapRequests(95, 955), getWriterState(sink));
         sink.flush(true);
         assertThatBufferStatesAreEqual(BufferedRequestState.emptyState(), getWriterState(sink));
@@ -189,15 +198,17 @@ public class AsyncSinkWriterTest {
     public void testThatSnapshotsAreTakenOfBufferCorrectlyBeforeAndAfterManualFlush()
             throws IOException, InterruptedException {
         writeFiveRecordsWithOneFailingThenCallPrepareCommitWithFlushing();
-        assertEquals(5, res.size());
+
+        assertThat(res.size()).isEqualTo(5);
     }
 
     @Test
     public void metricsAreLoggedEachTimeSubmitRequestEntriesIsCalled()
             throws IOException, InterruptedException {
         writeFiveRecordsWithOneFailingThenCallPrepareCommitWithFlushing();
-        assertEquals(5, sinkInitContext.getNumRecordsOutCounter().getCount());
-        assertEquals(20, sinkInitContext.getNumBytesOutCounter().getCount());
+
+        assertThat(sinkInitContext.getNumRecordsOutCounter().getCount()).isEqualTo(5);
+        assertThat(sinkInitContext.getNumBytesOutCounter().getCount()).isEqualTo(20);
     }
 
     @Test
@@ -215,11 +226,11 @@ public class AsyncSinkWriterTest {
         sink.write("75");
         sink.write("95");
         sink.write("35");
-        Exception e = assertThrows(RuntimeException.class, () -> sink.write("135"));
-        assertEquals(
-                "Deliberate runtime exception occurred in SinkWriterImplementation.",
-                e.getMessage());
-        assertEquals(3, res.size());
+
+        assertThatThrownBy(() -> sink.write("135"))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessage("Deliberate runtime exception occurred in SinkWriterImplementation.");
+        assertThat(res.size()).isEqualTo(3);
     }
 
     @Test
@@ -292,8 +303,8 @@ public class AsyncSinkWriterTest {
         sink.flush(true);
 
         // Everything is saved
-        assertEquals(Arrays.asList(25, 55, 965, 75, 95, 955, 550, 45, 35, 535), res);
-        assertEquals(0, getWriterState(sink).getStateSize());
+        assertThat(res).isEqualTo(Arrays.asList(25, 55, 965, 75, 95, 955, 550, 45, 35, 535));
+        assertThat(getWriterState(sink).getStateSize()).isEqualTo(0);
     }
 
     @Test
@@ -342,40 +353,39 @@ public class AsyncSinkWriterTest {
         // Buffer continues to fill up without blocking on write, until eventually yield is called
         // on the mailbox thread during the prepare commit
         sink.flush(true);
-        assertEquals(Arrays.asList(25, 55, 965, 75, 95, 955, 550, 645, 545, 535, 515, 505), res);
+
+        assertThat(res)
+                .isEqualTo(Arrays.asList(25, 55, 965, 75, 95, 955, 550, 645, 545, 535, 515, 505));
     }
 
     @Test
     public void testThatMaxBufferSizeOfSinkShouldBeStrictlyGreaterThanMaxSizeOfEachBatch() {
-        Exception e =
-                assertThrows(
-                        IllegalArgumentException.class,
+        assertThatThrownBy(
                         () ->
                                 new AsyncSinkWriterImplBuilder()
                                         .context(sinkInitContext)
                                         .maxBufferedRequests(10)
-                                        .build());
-        assertEquals(
-                "The maximum number of requests that may be buffered should be "
-                        + "strictly greater than the maximum number of requests per batch.",
-                e.getMessage());
+                                        .build())
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "The maximum number of requests that may be buffered should be "
+                                + "strictly greater than the maximum number of requests per batch.");
     }
 
     @Test
     public void maxRecordSizeSetMustBeSmallerThanOrEqualToMaxBatchSize() {
-        Exception e =
-                assertThrows(
-                        IllegalArgumentException.class,
+        assertThatThrownBy(
                         () ->
                                 new AsyncSinkWriterImplBuilder()
                                         .context(sinkInitContext)
                                         .maxBufferedRequests(11)
                                         .maxBatchSizeInBytes(10_000)
                                         .maxRecordSizeInBytes(10_001)
-                                        .build());
-        assertEquals(
-                "The maximum allowed size in bytes per flush must be greater than or equal to the maximum allowed size in bytes of a single record.",
-                e.getMessage());
+                                        .build())
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(
+                        "The maximum allowed size in bytes per flush must be greater than or equal to"
+                                + " the maximum allowed size in bytes of a single record.");
     }
 
     @Test
@@ -388,17 +398,20 @@ public class AsyncSinkWriterTest {
                         .maxBatchSizeInBytes(10_000)
                         .maxRecordSizeInBytes(3)
                         .build();
-        Exception e = assertThrows(IllegalArgumentException.class, () -> sink.write("3"));
-        assertEquals(
-                "The request entry sent to the buffer was of size [4], when the maxRecordSizeInBytes was set to [3].",
-                e.getMessage());
+
+        assertThatThrownBy(() -> sink.write("3"))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "The request entry sent to the buffer was of size [4], when "
+                                + "the maxRecordSizeInBytes was set to [3].");
     }
 
     private void writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
             AsyncSinkWriterImpl sink, String x, List<Integer> y, List<Integer> z)
             throws IOException, InterruptedException {
         sink.write(x);
-        assertEquals(y, res);
+
+        assertThat(res).isEqualTo(y);
         assertThatBufferStatesAreEqual(sink.wrapRequests(z), getWriterState(sink));
     }
 
@@ -418,7 +431,8 @@ public class AsyncSinkWriterTest {
          */
         for (int i = 0; i < 100; i++) {
             sink.write(String.valueOf(i));
-            assertEquals((i / 7) * 7, res.size());
+
+            assertThat(res.size()).isEqualTo((i / 7) * 7);
         }
     }
 
@@ -430,7 +444,8 @@ public class AsyncSinkWriterTest {
         sink.write(String.valueOf(1));
         sink.write(String.valueOf(2));
         sink.flush(false);
-        assertEquals(0, res.size());
+
+        assertThat(res.size()).isEqualTo(0);
     }
 
     @Test
@@ -447,11 +462,14 @@ public class AsyncSinkWriterTest {
         for (int i = 0; i < 7; i++) {
             sink.write(String.valueOf(i));
         }
-        assertEquals(7, res.size());
+
+        assertThat(res.size()).isEqualTo(7);
+
         for (int i = 7; i < 14; i++) {
             sink.write(String.valueOf(i));
         }
-        assertEquals(14, res.size());
+
+        assertThat(res.size()).isEqualTo(14);
     }
 
     @Test
@@ -468,13 +486,17 @@ public class AsyncSinkWriterTest {
         sink.write(String.valueOf(1)); //   Buffer: 104/110B; 2/10 elements; 0 inflight
         sink.write(String.valueOf(2)); //   Buffer: 108/110B; 3/10 elements; 0 inflight
         sink.write(String.valueOf(3)); //   Buffer: 112/110B; 4/10 elements; 0 inflight -- flushing
-        assertEquals(2, res.size()); // Request was [225, 1, 2], element 225 failed on first attempt
+
+        assertThat(res.size())
+                .isEqualTo(2); // Request was [225, 1, 2], element 225 failed on first attempt
+
         sink.write(String.valueOf(4)); //   Buffer:   8/110B; 2/10 elements; 1 inflight
         sink.write(String.valueOf(5)); //   Buffer:  12/110B; 3/10 elements; 1 inflight
         sink.write(String.valueOf(6)); //   Buffer:  16/110B; 4/10 elements; 1 inflight
         sink.write(String.valueOf(325)); // Buffer: 116/110B; 5/10 elements; 1 inflight -- flushing
+
         // inflight request is processed, buffer: [225, 3, 4, 5, 6, 325]
-        assertEquals(Arrays.asList(1, 2, 225, 3, 4), res);
+        assertThat(res).isEqualTo(Arrays.asList(1, 2, 225, 3, 4));
         // Buffer: [5, 6, 325]; 0 inflight
     }
 
@@ -494,14 +516,18 @@ public class AsyncSinkWriterTest {
         sink.write(String.valueOf(1)); //   Buffer: 204/210B; 3/10 elements; 0 inflight
         sink.write(String.valueOf(2)); //   Buffer: 208/210B; 4/10 elements; 0 inflight
         sink.write(String.valueOf(3)); //   Buffer: 212/210B; 5/10 elements; 0 inflight -- flushing
-        assertEquals(2, res.size()); // Request was [228, 225, 1, 2], element 228, 225 failed
+
+        assertThat(res.size())
+                .isEqualTo(2); // Request was [228, 225, 1, 2], element 228, 225 failed
+
         sink.write(String.valueOf(4)); //   Buffer:   8/210B; 2/10 elements; 2 inflight
         sink.write(String.valueOf(5)); //   Buffer:  12/210B; 3/10 elements; 2 inflight
         sink.write(String.valueOf(6)); //   Buffer:  16/210B; 4/10 elements; 2 inflight
         sink.write(String.valueOf(328)); // Buffer: 116/210B; 5/10 elements; 2 inflight
         sink.write(String.valueOf(325)); // Buffer: 216/210B; 6/10 elements; 2 inflight -- flushing
+
         // inflight request is processed, buffer: [228, 225, 3, 4, 5, 6, 328, 325]
-        assertEquals(Arrays.asList(1, 2, 228, 225, 3, 4), res);
+        assertThat(res).isEqualTo(Arrays.asList(1, 2, 228, 225, 3, 4));
         // Buffer: [5, 6, 328, 325]; 0 inflight
     }
 
@@ -526,9 +552,12 @@ public class AsyncSinkWriterTest {
         }
 
         tpts.setCurrentTime(99L);
-        assertEquals(0, res.size());
+
+        assertThat(res.size()).isEqualTo(0);
+
         tpts.setCurrentTime(100L);
-        assertEquals(8, res.size());
+
+        assertThat(res.size()).isEqualTo(8);
     }
 
     @Test
@@ -551,9 +580,9 @@ public class AsyncSinkWriterTest {
             sink.write(String.valueOf(i));
         }
         tpts.setCurrentTime(99L);
-        assertEquals(90, res.size());
+        assertThat(res.size()).isEqualTo(90);
         tpts.setCurrentTime(100L);
-        assertEquals(98, res.size());
+        assertThat(res.size()).isEqualTo(98);
     }
 
     @Test
@@ -571,15 +600,17 @@ public class AsyncSinkWriterTest {
         sink.write(String.valueOf(1)); // buffer: [225, 0, 1]
         sink.write(String.valueOf(2)); // buffer: [2], inflight: [225], destination: [0, 1]
 
-        assertEquals(Arrays.asList(0, 1), res);
+        assertThat(res).isEqualTo(Arrays.asList(0, 1));
         assertThatBufferStatesAreEqual(sink.wrapRequests(2), getWriterState(sink));
 
         sink.flush(false); // buffer: [225, 2], inflight: [], destination: [0, 1]
-        assertEquals(Arrays.asList(0, 1), res);
+
+        assertThat(res).isEqualTo(Arrays.asList(0, 1));
         assertThatBufferStatesAreEqual(sink.wrapRequests(225, 2), getWriterState(sink));
 
         sink.flush(true); // buffer: [], inflight: [], destination: [0, 1, 225, 2]
-        assertEquals(Arrays.asList(0, 1, 225, 2), res);
+
+        assertThat(res).isEqualTo(Arrays.asList(0, 1, 225, 2));
     }
 
     @Test
@@ -596,11 +627,14 @@ public class AsyncSinkWriterTest {
         sink.write(String.valueOf(1)); //   Buffer: 104/110B; 3/10 elements; 0 inflight
         sink.write(String.valueOf(2)); //   Buffer: 108/110B; 4/10 elements; 0 inflight
         sink.write(String.valueOf(3)); //   Buffer: 112/110B; 5/10 elements; 0 inflight -- flushing
-        assertEquals(2, res.size()); // Request was [225, 1, 2], element 225 failed
+
+        assertThat(res.size()).isEqualTo(2); // Request was [225, 1, 2], element 225 failed
 
         // buffer should be [3] with [225] inflight
         sink.flush(false); // Buffer: [225,3] - > 8/110; 2/10 elements; 0 inflight
-        assertEquals(2, res.size()); //
+
+        assertThat(res.size()).isEqualTo(2); //
+
         List<BufferedRequestState<Integer>> states = sink.snapshotState(1);
         AsyncSinkWriterImpl newSink =
                 new AsyncSinkWriterImplBuilder()
@@ -610,7 +644,8 @@ public class AsyncSinkWriterTest {
 
         newSink.write(String.valueOf(4)); //   Buffer:   12/15B; 3/10 elements; 0 inflight
         newSink.write(String.valueOf(5)); //   Buffer:  16/15B; 4/10 elements; 0 inflight --flushing
-        assertEquals(Arrays.asList(1, 2, 225, 3, 4), res);
+
+        assertThat(res).isEqualTo(Arrays.asList(1, 2, 225, 3, 4));
         // Buffer: [5]; 0 inflight
     }
 
@@ -687,20 +722,20 @@ public class AsyncSinkWriterTest {
         TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
         tpts.setCurrentTime(0L);
         sink.write("1"); // A timer is registered here to elapse at t=100
-        assertEquals(0, res.size());
+        assertThat(res.size()).isEqualTo(0);
         tpts.setCurrentTime(10L);
         sink.flush(true);
-        assertEquals(1, res.size());
+        assertThat(res.size()).isEqualTo(1);
         tpts.setCurrentTime(20L); // At t=20, we write a new element that should not trigger another
         sink.write("2"); // timer to be registered. If it is, it should elapse at t=120s.
-        assertEquals(1, res.size());
+        assertThat(res.size()).isEqualTo(1);
         tpts.setCurrentTime(100L);
-        assertEquals(2, res.size());
+        assertThat(res.size()).isEqualTo(2);
         sink.write("3");
         tpts.setCurrentTime(199L); // At t=199s, our third element has not been written
-        assertEquals(2, res.size()); // therefore, no timer fired at 120s.
+        assertThat(res.size()).isEqualTo(2); // therefore, no timer fired at 120s.
         tpts.setCurrentTime(200L);
-        assertEquals(3, res.size());
+        assertThat(res.size()).isEqualTo(3);
     }
 
     @Test
@@ -721,13 +756,13 @@ public class AsyncSinkWriterTest {
         sink.write("2");
         sink.write("225");
         tpts.setCurrentTime(100L);
-        assertEquals(2, res.size());
+        assertThat(res.size()).isEqualTo(2);
         sink.write("3");
         sink.write("4");
         tpts.setCurrentTime(199L);
-        assertEquals(2, res.size());
+        assertThat(res.size()).isEqualTo(2);
         tpts.setCurrentTime(200L);
-        assertEquals(5, res.size());
+        assertThat(res.size()).isEqualTo(5);
     }
 
     @Test
@@ -748,7 +783,7 @@ public class AsyncSinkWriterTest {
         sink.write("1");
         tpts.setCurrentTime(50L);
         sink.flush(true);
-        assertEquals(1, res.size());
+        assertThat(res.size()).isEqualTo(1);
         tpts.setCurrentTime(200L);
     }
 
@@ -769,10 +804,10 @@ public class AsyncSinkWriterTest {
         tpts.setCurrentTime(0L);
         sink.write("1");
         tpts.setCurrentTime(100L);
-        assertEquals(1, res.size());
+        assertThat(res.size()).isEqualTo(1);
         sink.write("2");
         tpts.setCurrentTime(200L);
-        assertEquals(2, res.size());
+        assertThat(res.size()).isEqualTo(2);
     }
 
     /**
@@ -806,7 +841,7 @@ public class AsyncSinkWriterTest {
                         true);
 
         writeTwoElementsAndInterleaveTheNextTwoElements(sink, blockedWriteLatch, delayedStartLatch);
-        assertEquals(Arrays.asList(1, 2, 3, 4), res);
+        assertThat(res).isEqualTo(Arrays.asList(1, 2, 3, 4));
     }
 
     /**
@@ -837,7 +872,7 @@ public class AsyncSinkWriterTest {
                         false);
 
         writeTwoElementsAndInterleaveTheNextTwoElements(sink, blockedWriteLatch, delayedStartLatch);
-        assertEquals(Arrays.asList(4, 1, 2, 3), res);
+        assertThat(res).isEqualTo(Arrays.asList(4, 1, 2, 3));
     }
 
     private void writeTwoElementsAndInterleaveTheNextTwoElements(
@@ -866,9 +901,9 @@ public class AsyncSinkWriterTest {
         tpts.setCurrentTime(100L);
         blockedWriteLatch.countDown();
         es.shutdown();
-        assertTrue(
-                es.awaitTermination(500, TimeUnit.MILLISECONDS),
-                "Executor Service stuck at termination, not terminated after 500ms!");
+        assertThat(es.awaitTermination(500, TimeUnit.MILLISECONDS))
+                .as("Executor Service stuck at termination, not terminated after 500ms!")
+                .isTrue();
     }
 
     /**
@@ -924,19 +959,19 @@ public class AsyncSinkWriterTest {
                             }
                         });
         Thread.sleep(300);
-        assertFalse(s.isInterrupted());
+        assertThat(s.isInterrupted()).isFalse();
         s.interrupt();
         blockedWriteLatch.countDown();
 
         t.join();
 
-        assertEquals(Arrays.asList(1, 2, 3), res);
+        assertThat(res).isEqualTo(Arrays.asList(1, 2, 3));
     }
 
     private BufferedRequestState<Integer> getWriterState(
             AsyncSinkWriter<String, Integer> sinkWriter) {
         List<BufferedRequestState<Integer>> states = sinkWriter.snapshotState(1);
-        assertEquals(states.size(), 1);
+        assertThat(states.size()).isEqualTo(1);
         return states.get(0);
     }
 
@@ -1205,10 +1240,11 @@ public class AsyncSinkWriterTest {
                 try {
                     delayedStartLatch.countDown();
                     if (blockForLimitedTime) {
-                        assertFalse(
-                                blockedThreadLatch.await(500, TimeUnit.MILLISECONDS),
-                                "The countdown latch was released before the full amount"
-                                        + "of time was reached.");
+                        assertThat(blockedThreadLatch.await(500, TimeUnit.MILLISECONDS))
+                                .as(
+                                        "The countdown latch was released before the full amount"
+                                                + "of time was reached.")
+                                .isFalse();
                     } else {
                         blockedThreadLatch.await();
                     }

[flink] 02/04: [FLINK-26420][Connector-base] use numRecordsSendCounter from SinkWriterMetricGroup directly.

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4aec117eb4693f978f2d95580d72a12d14638e85
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Tue Mar 15 16:16:30 2022 +0100

    [FLINK-26420][Connector-base] use numRecordsSendCounter from SinkWriterMetricGroup directly.
    
    (cherry picked from commit 4e0c24a82e9b0fd35ca23610ba396a932b0f41b8)
---
 .../flink/connector/base/sink/writer/AsyncSinkWriter.java    | 12 ++++++------
 .../connector/base/sink/writer/TestSinkInitContext.java      |  4 ++--
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index f45a3a5..090504a 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -69,10 +69,10 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
     private final SinkWriterMetricGroup metrics;
 
     /* Counter for number of bytes this sink has attempted to send to the destination. */
-    private final Counter numBytesOutCounter;
+    private final Counter numBytesSendCounter;
 
     /* Counter for number of records this sink has attempted to send to the destination. */
-    private final Counter numRecordsOutCounter;
+    private final Counter numRecordsSendCounter;
 
     /**
      * Rate limiting strategy {@code inflightMessages} at any given time, {@code
@@ -295,8 +295,8 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
 
         this.metrics = context.metricGroup();
         this.metrics.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTimestamp);
-        this.numBytesOutCounter = this.metrics.getIOMetricGroup().getNumBytesOutCounter();
-        this.numRecordsOutCounter = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
+        this.numBytesSendCounter = this.metrics.getNumBytesSendCounter();
+        this.numRecordsSendCounter = this.metrics.getNumRecordsSendCounter();
 
         this.fatalExceptionCons =
                 exception ->
@@ -417,8 +417,8 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
             batchSizeBytes += requestEntrySize;
         }
 
-        numRecordsOutCounter.inc(batch.size());
-        numBytesOutCounter.inc(batchSizeBytes);
+        numRecordsSendCounter.inc(batch.size());
+        numBytesSendCounter.inc(batchSizeBytes);
 
         return batch;
     }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
index b146190..a7e4979 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
@@ -146,10 +146,10 @@ public class TestSinkInitContext implements Sink.InitContext {
     }
 
     public Counter getNumRecordsOutCounter() {
-        return metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
+        return metricGroup.getNumRecordsSendCounter();
     }
 
     public Counter getNumBytesOutCounter() {
-        return metricGroup.getIOMetricGroup().getNumBytesOutCounter();
+        return metricGroup.getNumBytesSendCounter();
     }
 }

[flink] 03/04: [FLINK-26420][test] migrate FileWriterTest to AssertJ.

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 06b2b1d81f851a77f7b39af5bbceaed3248126f0
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Tue Mar 15 17:35:00 2022 +0100

    [FLINK-26420][test] migrate FileWriterTest to AssertJ.
    
    (cherry picked from commit c34160ba643cfe562ff37241da007b1d28570c70)
---
 .../connector/file/sink/writer/FileWriterTest.java | 156 +++++++++++----------
 1 file changed, 85 insertions(+), 71 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
index 8966ca7..c0a5ac6 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
@@ -42,13 +42,10 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
 import org.apache.flink.util.ExceptionUtils;
 
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
-import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -62,27 +59,21 @@ import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.concurrent.ScheduledFuture;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link FileWriter}. */
 public class FileWriterTest {
 
-    @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
-
     private MetricListener metricListener;
 
-    @Before
+    @BeforeEach
     public void setUp() {
         metricListener = new MetricListener();
     }
 
     @Test
-    public void testPreCommit() throws Exception {
-        File outDir = TEMP_FOLDER.newFolder();
-        Path path = new Path(outDir.toURI());
+    public void testPreCommit(@TempDir java.nio.file.Path tempDir) throws Exception {
+        Path path = new Path(tempDir.toUri());
 
         FileWriter<String> fileWriter =
                 createWriter(
@@ -95,13 +86,13 @@ public class FileWriterTest {
         fileWriter.write("test3", new ContextImpl());
 
         Collection<FileSinkCommittable> committables = fileWriter.prepareCommit();
-        assertEquals(3, committables.size());
+
+        assertThat(committables.size()).isEqualTo(3);
     }
 
     @Test
-    public void testSnapshotAndRestore() throws Exception {
-        File outDir = TEMP_FOLDER.newFolder();
-        Path path = new Path(outDir.toURI());
+    public void testSnapshotAndRestore(@TempDir java.nio.file.Path tempDir) throws Exception {
+        Path path = new Path(tempDir.toUri());
 
         FileWriter<String> fileWriter =
                 createWriter(
@@ -112,11 +103,12 @@ public class FileWriterTest {
         fileWriter.write("test1", new ContextImpl());
         fileWriter.write("test2", new ContextImpl());
         fileWriter.write("test3", new ContextImpl());
-        assertEquals(3, fileWriter.getActiveBuckets().size());
+        assertThat(fileWriter.getActiveBuckets().size()).isEqualTo(3);
 
         fileWriter.prepareCommit();
         List<FileWriterBucketState> states = fileWriter.snapshotState(1L);
-        assertEquals(3, states.size());
+
+        assertThat(states.size()).isEqualTo(3);
 
         fileWriter =
                 restoreWriter(
@@ -124,18 +116,20 @@ public class FileWriterTest {
                         path,
                         OnCheckpointRollingPolicy.build(),
                         new OutputFileConfig("part-", ""));
-        assertEquals(
-                fileWriter.getActiveBuckets().keySet(),
-                new HashSet<>(Arrays.asList("test1", "test2", "test3")));
+
+        assertThat(fileWriter.getActiveBuckets().keySet())
+                .isEqualTo(new HashSet<>(Arrays.asList("test1", "test2", "test3")));
+
         for (FileWriterBucket<String> bucket : fileWriter.getActiveBuckets().values()) {
-            assertNotNull("The in-progress file should be recovered", bucket.getInProgressPart());
+            assertThat(bucket.getInProgressPart())
+                    .as("The in-progress file should be recovered")
+                    .isNotNull();
         }
     }
 
     @Test
-    public void testMergingForRescaling() throws Exception {
-        File outDir = TEMP_FOLDER.newFolder();
-        Path path = new Path(outDir.toURI());
+    public void testMergingForRescaling(@TempDir java.nio.file.Path tempDir) throws Exception {
+        Path path = new Path(tempDir.toUri());
 
         FileWriter<String> firstFileWriter =
                 createWriter(
@@ -172,27 +166,34 @@ public class FileWriterTest {
                         path,
                         DefaultRollingPolicy.builder().build(),
                         new OutputFileConfig("part-", ""));
-        assertEquals(3, restoredWriter.getActiveBuckets().size());
+
+        assertThat(restoredWriter.getActiveBuckets().size()).isEqualTo(3);
 
         // Merged buckets
         for (String bucketId : Arrays.asList("test1", "test2")) {
             FileWriterBucket<String> bucket = restoredWriter.getActiveBuckets().get(bucketId);
-            assertNotNull("The in-progress file should be recovered", bucket.getInProgressPart());
-            assertEquals(1, bucket.getPendingFiles().size());
+
+            assertThat(bucket.getInProgressPart())
+                    .as("The in-progress file should be recovered")
+                    .isNotNull();
+            assertThat(bucket.getPendingFiles().size()).isEqualTo(1);
         }
 
         // Not merged buckets
         for (String bucketId : Collections.singletonList("test3")) {
             FileWriterBucket<String> bucket = restoredWriter.getActiveBuckets().get(bucketId);
-            assertNotNull("The in-progress file should be recovered", bucket.getInProgressPart());
-            assertEquals(0, bucket.getPendingFiles().size());
+
+            assertThat(bucket.getInProgressPart())
+                    .as("The in-progress file should be recovered")
+                    .isNotNull();
+            assertThat(bucket.getPendingFiles().size()).isEqualTo(0);
         }
     }
 
     @Test
-    public void testBucketIsRemovedWhenNotActive() throws Exception {
-        File outDir = TEMP_FOLDER.newFolder();
-        Path path = new Path(outDir.toURI());
+    public void testBucketIsRemovedWhenNotActive(@TempDir java.nio.file.Path tempDir)
+            throws Exception {
+        Path path = new Path(tempDir.toUri());
 
         FileWriter<String> fileWriter =
                 createWriter(
@@ -205,13 +206,12 @@ public class FileWriterTest {
         // No more records and another call to prepareCommit will makes it inactive
         fileWriter.prepareCommit();
 
-        assertTrue(fileWriter.getActiveBuckets().isEmpty());
+        assertThat(fileWriter.getActiveBuckets().isEmpty()).isTrue();
     }
 
     @Test
-    public void testOnProcessingTime() throws Exception {
-        File outDir = TEMP_FOLDER.newFolder();
-        Path path = new Path(outDir.toURI());
+    public void testOnProcessingTime(@TempDir java.nio.file.Path tempDir) throws Exception {
+        Path path = new Path(tempDir.toUri());
 
         // Create the processing timer service starts from 10.
         ManuallyTriggeredProcessingTimeService processingTimeService =
@@ -237,15 +237,18 @@ public class FileWriterTest {
         processingTimeService.advanceTo(20);
 
         FileWriterBucket<String> test1Bucket = fileWriter.getActiveBuckets().get("test1");
-        assertNull(
-                "The in-progress part of test1 should be rolled", test1Bucket.getInProgressPart());
-        assertEquals(1, test1Bucket.getPendingFiles().size());
+
+        assertThat(test1Bucket.getInProgressPart())
+                .as("The in-progress part of test1 should be rolled")
+                .isNull();
+        assertThat(test1Bucket.getPendingFiles().size()).isEqualTo(1);
 
         FileWriterBucket<String> test2Bucket = fileWriter.getActiveBuckets().get("test2");
-        assertNotNull(
-                "The in-progress part of test2 should not be rolled",
-                test2Bucket.getInProgressPart());
-        assertEquals(0, test2Bucket.getPendingFiles().size());
+
+        assertThat(test2Bucket.getInProgressPart())
+                .as("The in-progress part of test2 should not be rolled")
+                .isNotNull();
+        assertThat(test2Bucket.getPendingFiles().size()).isEqualTo(0);
 
         // Close, pre-commit & clear all the pending records.
         processingTimeService.advanceTo(30);
@@ -258,36 +261,43 @@ public class FileWriterTest {
         processingTimeService.advanceTo(40);
 
         test1Bucket = fileWriter.getActiveBuckets().get("test1");
-        assertNull(
-                "The in-progress part of test1 should be rolled", test1Bucket.getInProgressPart());
-        assertEquals(1, test1Bucket.getPendingFiles().size());
+
+        assertThat(test1Bucket.getInProgressPart())
+                .as("The in-progress part of test1 should be rolled")
+                .isNull();
+        assertThat(test1Bucket.getPendingFiles().size()).isEqualTo(1);
 
         test2Bucket = fileWriter.getActiveBuckets().get("test2");
-        assertNotNull(
-                "The in-progress part of test2 should not be rolled",
-                test2Bucket.getInProgressPart());
-        assertEquals(0, test2Bucket.getPendingFiles().size());
+
+        assertThat(test2Bucket.getInProgressPart())
+                .as("The in-progress part of test2 should not be rolled")
+                .isNotNull();
+        assertThat(test2Bucket.getPendingFiles().size()).isEqualTo(0);
     }
 
     @Test
-    public void testContextPassingNormalExecution() throws Exception {
-        testCorrectTimestampPassingInContext(1L, 2L, 3L);
+    public void testContextPassingNormalExecution(@TempDir java.nio.file.Path tempDir)
+            throws Exception {
+        testCorrectTimestampPassingInContext(1L, 2L, 3L, tempDir);
     }
 
     @Test
-    public void testContextPassingNullTimestamp() throws Exception {
-        testCorrectTimestampPassingInContext(null, 4L, 5L);
+    public void testContextPassingNullTimestamp(@TempDir java.nio.file.Path tempDir)
+            throws Exception {
+        testCorrectTimestampPassingInContext(null, 4L, 5L, tempDir);
     }
 
     @Test
-    public void testNumberRecordsOutCounter() throws IOException, InterruptedException {
+    public void testNumberRecordsOutCounter(@TempDir java.nio.file.Path tempDir)
+            throws IOException, InterruptedException {
+        Path path = new Path(tempDir.toUri());
+
         final OperatorIOMetricGroup operatorIOMetricGroup =
                 UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
         final SinkWriterMetricGroup sinkWriterMetricGroup =
                 InternalSinkWriterMetricGroup.mock(
                         metricListener.getMetricGroup(), operatorIOMetricGroup);
-        File outDir = TEMP_FOLDER.newFolder();
-        Path path = new Path(outDir.toURI());
+
         Counter recordsCounter = sinkWriterMetricGroup.getNumRecordsSendCounter();
         SinkWriter.Context context = new ContextImpl();
         FileWriter<String> fileWriter =
@@ -297,18 +307,22 @@ public class FileWriterTest {
                         new OutputFileConfig("part-", ""),
                         sinkWriterMetricGroup);
 
-        assertEquals(0, recordsCounter.getCount());
+        assertThat(recordsCounter.getCount()).isEqualTo(0);
+
         fileWriter.write("1", context);
-        assertEquals(1, recordsCounter.getCount());
+
+        assertThat(recordsCounter.getCount()).isEqualTo(1);
+
         fileWriter.write("2", context);
         fileWriter.write("3", context);
-        assertEquals(3, recordsCounter.getCount());
+
+        assertThat(recordsCounter.getCount()).isEqualTo(3);
     }
 
     private void testCorrectTimestampPassingInContext(
-            Long timestamp, long watermark, long processingTime) throws Exception {
-        final File outDir = TEMP_FOLDER.newFolder();
-        final Path path = new Path(outDir.toURI());
+            Long timestamp, long watermark, long processingTime, java.nio.file.Path tempDir)
+            throws Exception {
+        Path path = new Path(tempDir.toUri());
 
         // Create the processing timer service starts from 10.
         ManuallyTriggeredProcessingTimeService processingTimeService =
@@ -416,9 +430,9 @@ public class FileWriterTest {
             long watermark = context.currentWatermark();
             long processingTime = context.currentProcessingTime();
 
-            Assert.assertEquals(expectedTimestamp, elementTimestamp);
-            Assert.assertEquals(expectedProcessingTime, processingTime);
-            Assert.assertEquals(expectedWatermark, watermark);
+            assertThat(elementTimestamp).isEqualTo(expectedTimestamp);
+            assertThat(processingTime).isEqualTo(expectedProcessingTime);
+            assertThat(watermark).isEqualTo(expectedWatermark);
 
             return element;
         }