You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/05/18 20:17:35 UTC

[flink] 01/02: [hotfix][tests] Move test checkpoint component classes to upper level

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ee349451fc592991cccf3a4a9391fc38cc433739
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Feb 17 10:46:35 2021 +0100

    [hotfix][tests] Move test checkpoint component classes to upper level
---
 .../ExecutionGraphCheckpointCoordinatorTest.java   | 86 ----------------------
 .../checkpoint/TestingCheckpointIDCounter.java     | 55 ++++++++++++++
 .../TestingCheckpointRecoveryFactory.java          | 44 +++++++++++
 .../TestingCompletedCheckpointStore.java           | 77 +++++++++++++++++++
 4 files changed, 176 insertions(+), 86 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 6f0ec4b..5e08c4f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -38,7 +38,6 @@ import org.hamcrest.Matchers;
 import org.junit.Test;
 
 import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.Matchers.is;
@@ -168,89 +167,4 @@ public class ExecutionGraphCheckpointCoordinatorTest extends TestLogger {
 
         return executionGraph;
     }
-
-    private static final class TestingCheckpointIDCounter implements CheckpointIDCounter {
-
-        private final CompletableFuture<JobStatus> shutdownStatus;
-
-        private TestingCheckpointIDCounter(CompletableFuture<JobStatus> shutdownStatus) {
-            this.shutdownStatus = shutdownStatus;
-        }
-
-        @Override
-        public void start() {}
-
-        @Override
-        public void shutdown(JobStatus jobStatus) {
-            shutdownStatus.complete(jobStatus);
-        }
-
-        @Override
-        public long getAndIncrement() {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public long get() {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public void setCount(long newId) {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-    }
-
-    private static final class TestingCompletedCheckpointStore implements CompletedCheckpointStore {
-
-        private final CompletableFuture<JobStatus> shutdownStatus;
-
-        private TestingCompletedCheckpointStore(CompletableFuture<JobStatus> shutdownStatus) {
-            this.shutdownStatus = shutdownStatus;
-        }
-
-        @Override
-        public void recover() {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public void addCheckpoint(
-                CompletedCheckpoint checkpoint,
-                CheckpointsCleaner checkpointsCleaner,
-                Runnable postCleanup) {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public void shutdown(
-                JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner, Runnable postCleanup) {
-            shutdownStatus.complete(jobStatus);
-        }
-
-        @Override
-        public List<CompletedCheckpoint> getAllCheckpoints() {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public int getNumberOfRetainedCheckpoints() {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public int getMaxNumberOfRetainedCheckpoints() {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
-        public boolean requiresExternalizedCheckpoints() {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java
new file mode 100644
index 0000000..79df955
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobStatus;
+
+import java.util.concurrent.CompletableFuture;
+
+/** Test {@link CheckpointIDCounter} implementation for testing the shutdown behavior. */
+public final class TestingCheckpointIDCounter implements CheckpointIDCounter {
+
+    private final CompletableFuture<JobStatus> shutdownStatus;
+
+    public TestingCheckpointIDCounter(CompletableFuture<JobStatus> shutdownStatus) {
+        this.shutdownStatus = shutdownStatus;
+    }
+
+    @Override
+    public void start() {}
+
+    @Override
+    public void shutdown(JobStatus jobStatus) {
+        shutdownStatus.complete(jobStatus);
+    }
+
+    @Override
+    public long getAndIncrement() {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+
+    @Override
+    public long get() {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+
+    @Override
+    public void setCount(long newId) {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
new file mode 100644
index 0000000..cda32d6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+
+/** A {@link CheckpointRecoveryFactory} that pre-defined checkpointing components. */
+public class TestingCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
+
+    private final CompletedCheckpointStore store;
+    private final CheckpointIDCounter counter;
+
+    public TestingCheckpointRecoveryFactory(
+            CompletedCheckpointStore store, CheckpointIDCounter counter) {
+        this.store = store;
+        this.counter = counter;
+    }
+
+    @Override
+    public CompletedCheckpointStore createCheckpointStore(
+            JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) {
+        return store;
+    }
+
+    @Override
+    public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) {
+        return counter;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java
new file mode 100644
index 0000000..4182330
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobStatus;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/** Test {@link CompletedCheckpointStore} implementation for testing the shutdown behavior. */
+public final class TestingCompletedCheckpointStore implements CompletedCheckpointStore {
+
+    private final CompletableFuture<JobStatus> shutdownStatus;
+
+    public TestingCompletedCheckpointStore(CompletableFuture<JobStatus> shutdownStatus) {
+        this.shutdownStatus = shutdownStatus;
+    }
+
+    @Override
+    public void recover() {}
+
+    @Override
+    public void addCheckpoint(
+            CompletedCheckpoint checkpoint,
+            CheckpointsCleaner checkpointsCleaner,
+            Runnable postCleanup) {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+
+    @Override
+    public CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) {
+        return null;
+    }
+
+    @Override
+    public void shutdown(
+            JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner, Runnable postCleanup)
+            throws Exception {
+        shutdownStatus.complete(jobStatus);
+    }
+
+    @Override
+    public List<CompletedCheckpoint> getAllCheckpoints() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public int getNumberOfRetainedCheckpoints() {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+
+    @Override
+    public int getMaxNumberOfRetainedCheckpoints() {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+
+    @Override
+    public boolean requiresExternalizedCheckpoints() {
+        throw new UnsupportedOperationException("Not implemented.");
+    }
+}