You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2022/12/04 13:57:48 UTC

[flink] 02/02: [hotfix][tests] Fixed checkstyle of UnalignedCheckpointStressITCase

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9e1cefe250972b213ac69322531a4de69ca0e1a6
Author: Anton Kalashnikov <ak...@gmail.com>
AuthorDate: Thu Dec 1 15:18:09 2022 +0100

    [hotfix][tests] Fixed checkstyle of UnalignedCheckpointStressITCase
---
 .../UnalignedCheckpointStressITCase.java           | 30 ++++++++++------------
 1 file changed, 14 insertions(+), 16 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java
index 402e8c8bcda..0a11db9d6c3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointStressITCase.java
@@ -82,7 +82,9 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import static java.util.Objects.requireNonNull;
 import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
 import static org.apache.flink.configuration.CheckpointingOptions.MAX_RETAINED_CHECKPOINTS;
 import static org.apache.flink.shaded.curator5.org.apache.curator.shaded.com.google.common.base.Preconditions.checkState;
@@ -154,11 +156,10 @@ public class UnalignedCheckpointStressITCase extends TestLogger {
     @Test
     public void runStressTest() throws Exception {
         Deadline deadline = Deadline.fromNow(Duration.ofMillis(TEST_DURATION));
-        Optional<File> externalizedCheckpoint = Optional.empty();
+        File externalizedCheckpoint = null;
         while (deadline.hasTimeLeft()) {
-            externalizedCheckpoint =
-                    Optional.of(runAndTakeExternalCheckpoint(externalizedCheckpoint));
-            cleanDirectoryExcept(externalizedCheckpoint.get());
+            externalizedCheckpoint = runAndTakeExternalCheckpoint(externalizedCheckpoint);
+            cleanDirectoryExcept(externalizedCheckpoint);
         }
     }
 
@@ -215,7 +216,7 @@ public class UnalignedCheckpointStressITCase extends TestLogger {
 
         DataStream<Record> stream =
                 sources.rebalance()
-                        .map((MapFunction<Record, Record>) value -> value.validate())
+                        .map((MapFunction<Record, Record>) Record::validate)
                         .keyBy(Record::getSourceId)
                         // add small throttling to prevent WindowOperator from blowing up
                         .map(new ThrottlingMap(100));
@@ -233,20 +234,20 @@ public class UnalignedCheckpointStressITCase extends TestLogger {
 
     private void cleanDirectoryExcept(File externalizedCheckpoint) throws IOException {
         File directoryToKeep = externalizedCheckpoint.getParentFile();
-        for (File directory : temporaryFolder.getRoot().listFiles()) {
+        for (File directory : requireNonNull(temporaryFolder.getRoot().listFiles())) {
             if (!directory.equals(directoryToKeep)) {
                 FileUtils.deleteDirectory(directory);
             }
         }
     }
 
-    private File runAndTakeExternalCheckpoint(Optional<File> startingCheckpoint) throws Exception {
+    private File runAndTakeExternalCheckpoint(@Nullable File startingCheckpoint) throws Exception {
 
         StreamExecutionEnvironment env = defineEnvironment();
         testProgram(env);
 
         StreamGraph streamGraph = env.getStreamGraph();
-        startingCheckpoint
+        Optional.ofNullable(startingCheckpoint)
                 .map(File::toString)
                 .map(SavepointRestoreSettings::forPath)
                 .ifPresent(streamGraph::setSavepointRestoreSettings);
@@ -284,9 +285,10 @@ public class UnalignedCheckpointStressITCase extends TestLogger {
             checkpointDir = fileVisitor.getMaxCheckpointDir();
         }
         if (checkpointDir == null) {
-            List<Path> files =
-                    Files.walk(Paths.get(rootDir.getPath())).collect(Collectors.toList());
-            throw new IllegalStateException("Failed to find _metadata file among " + files);
+            try (Stream<Path> savepoint = Files.walk(Paths.get(rootDir.getPath()))) {
+                List<Path> files = savepoint.collect(Collectors.toList());
+                throw new IllegalStateException("Failed to find _metadata file among " + files);
+            }
         }
         return checkpointDir.toFile();
     }
@@ -439,7 +441,7 @@ public class UnalignedCheckpointStressITCase extends TestLogger {
                     context.getOperatorStateStore()
                             .getListState(new ListStateDescriptor<>("state", Long.class));
             // We are not supporting rescaling
-            nextValue = getOnlyElement(nextState.get(), 0L);
+            nextValue = requireNonNull(getOnlyElement(nextState.get(), 0L));
         }
     }
 
@@ -491,10 +493,6 @@ public class UnalignedCheckpointStressITCase extends TestLogger {
         public long value;
         public byte[] payload;
 
-        public Record() {
-            this(0, 0, SMALL_RECORD_SIZE);
-        }
-
         public Record(int sourceId, long value, int payloadSize) {
             this.sourceId = sourceId;
             this.payload = new byte[payloadSize];