You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/05/18 20:17:35 UTC
[flink] 01/02: [hotfix][tests] Move test checkpoint component
classes to upper level
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ee349451fc592991cccf3a4a9391fc38cc433739
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Feb 17 10:46:35 2021 +0100
[hotfix][tests] Move test checkpoint component classes to upper level
---
.../ExecutionGraphCheckpointCoordinatorTest.java | 86 ----------------------
.../checkpoint/TestingCheckpointIDCounter.java | 55 ++++++++++++++
.../TestingCheckpointRecoveryFactory.java | 44 +++++++++++
.../TestingCompletedCheckpointStore.java | 77 +++++++++++++++++++
4 files changed, 176 insertions(+), 86 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 6f0ec4b..5e08c4f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -38,7 +38,6 @@ import org.hamcrest.Matchers;
import org.junit.Test;
import java.util.Collections;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
import static org.hamcrest.Matchers.is;
@@ -168,89 +167,4 @@ public class ExecutionGraphCheckpointCoordinatorTest extends TestLogger {
return executionGraph;
}
-
- private static final class TestingCheckpointIDCounter implements CheckpointIDCounter {
-
- private final CompletableFuture<JobStatus> shutdownStatus;
-
- private TestingCheckpointIDCounter(CompletableFuture<JobStatus> shutdownStatus) {
- this.shutdownStatus = shutdownStatus;
- }
-
- @Override
- public void start() {}
-
- @Override
- public void shutdown(JobStatus jobStatus) {
- shutdownStatus.complete(jobStatus);
- }
-
- @Override
- public long getAndIncrement() {
- throw new UnsupportedOperationException("Not implemented.");
- }
-
- @Override
- public long get() {
- throw new UnsupportedOperationException("Not implemented.");
- }
-
- @Override
- public void setCount(long newId) {
- throw new UnsupportedOperationException("Not implemented.");
- }
- }
-
- private static final class TestingCompletedCheckpointStore implements CompletedCheckpointStore {
-
- private final CompletableFuture<JobStatus> shutdownStatus;
-
- private TestingCompletedCheckpointStore(CompletableFuture<JobStatus> shutdownStatus) {
- this.shutdownStatus = shutdownStatus;
- }
-
- @Override
- public void recover() {
- throw new UnsupportedOperationException("Not implemented.");
- }
-
- @Override
- public void addCheckpoint(
- CompletedCheckpoint checkpoint,
- CheckpointsCleaner checkpointsCleaner,
- Runnable postCleanup) {
- throw new UnsupportedOperationException("Not implemented.");
- }
-
- @Override
- public CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) {
- throw new UnsupportedOperationException("Not implemented.");
- }
-
- @Override
- public void shutdown(
- JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner, Runnable postCleanup) {
- shutdownStatus.complete(jobStatus);
- }
-
- @Override
- public List<CompletedCheckpoint> getAllCheckpoints() {
- throw new UnsupportedOperationException("Not implemented.");
- }
-
- @Override
- public int getNumberOfRetainedCheckpoints() {
- throw new UnsupportedOperationException("Not implemented.");
- }
-
- @Override
- public int getMaxNumberOfRetainedCheckpoints() {
- throw new UnsupportedOperationException("Not implemented.");
- }
-
- @Override
- public boolean requiresExternalizedCheckpoints() {
- throw new UnsupportedOperationException("Not implemented.");
- }
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java
new file mode 100644
index 0000000..79df955
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointIDCounter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobStatus;
+
+import java.util.concurrent.CompletableFuture;
+
+/** Test {@link CheckpointIDCounter} implementation for testing the shutdown behavior. */
+public final class TestingCheckpointIDCounter implements CheckpointIDCounter {
+
+ private final CompletableFuture<JobStatus> shutdownStatus;
+
+ public TestingCheckpointIDCounter(CompletableFuture<JobStatus> shutdownStatus) {
+ this.shutdownStatus = shutdownStatus;
+ }
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void shutdown(JobStatus jobStatus) {
+ shutdownStatus.complete(jobStatus);
+ }
+
+ @Override
+ public long getAndIncrement() {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public long get() {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public void setCount(long newId) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
new file mode 100644
index 0000000..cda32d6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+
+/** A {@link CheckpointRecoveryFactory} that pre-defined checkpointing components. */
+public class TestingCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
+
+ private final CompletedCheckpointStore store;
+ private final CheckpointIDCounter counter;
+
+ public TestingCheckpointRecoveryFactory(
+ CompletedCheckpointStore store, CheckpointIDCounter counter) {
+ this.store = store;
+ this.counter = counter;
+ }
+
+ @Override
+ public CompletedCheckpointStore createCheckpointStore(
+ JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) {
+ return store;
+ }
+
+ @Override
+ public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) {
+ return counter;
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java
new file mode 100644
index 0000000..4182330
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobStatus;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/** Test {@link CompletedCheckpointStore} implementation for testing the shutdown behavior. */
+public final class TestingCompletedCheckpointStore implements CompletedCheckpointStore {
+
+ private final CompletableFuture<JobStatus> shutdownStatus;
+
+ public TestingCompletedCheckpointStore(CompletableFuture<JobStatus> shutdownStatus) {
+ this.shutdownStatus = shutdownStatus;
+ }
+
+ @Override
+ public void recover() {}
+
+ @Override
+ public void addCheckpoint(
+ CompletedCheckpoint checkpoint,
+ CheckpointsCleaner checkpointsCleaner,
+ Runnable postCleanup) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) {
+ return null;
+ }
+
+ @Override
+ public void shutdown(
+ JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner, Runnable postCleanup)
+ throws Exception {
+ shutdownStatus.complete(jobStatus);
+ }
+
+ @Override
+ public List<CompletedCheckpoint> getAllCheckpoints() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public int getNumberOfRetainedCheckpoints() {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public int getMaxNumberOfRetainedCheckpoints() {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public boolean requiresExternalizedCheckpoints() {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+}