You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/02/14 08:12:55 UTC
[flink] 01/02: [hotfix][tests] Add a TestingSchedulerFactory to
help instantiating the scheduler in tests
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a44cf4d912cf85c03077c0b766814e877b41e7b1
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sat Dec 7 14:47:43 2019 +0100
[hotfix][tests] Add a TestingSchedulerFactory to help instantiating the scheduler in tests
---
.../ManuallyTriggeredScheduledExecutorService.java | 98 ++++++++++++
.../runtime/scheduler/DefaultSchedulerTest.java | 56 +------
.../runtime/scheduler/SchedulerTestingUtils.java | 170 +++++++++++++++++++++
3 files changed, 271 insertions(+), 53 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutorService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutorService.java
new file mode 100644
index 0000000..7524f76
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutorService.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Simple {@link ScheduledExecutorService} implementation for testing purposes: it only tracks a shutdown flag, and every {@code submit}, {@code invokeAll} and {@code invokeAny} variant throws {@link UnsupportedOperationException}.
+ */
+public class ManuallyTriggeredScheduledExecutorService extends ManuallyTriggeredScheduledExecutor implements ScheduledExecutorService {
+
+	private boolean shutdown; // set by shutdown() / shutdownNow(), never reset
+
+	@Override
+	public void shutdown() {
+		shutdown = true;
+	}
+
+	@Override
+	public List<Runnable> shutdownNow() {
+		shutdown();
+		return Collections.emptyList(); // no tasks are ever reported back as pending
+	}
+
+	@Override
+	public boolean isShutdown() {
+		return shutdown; // report the flag, so isTerminated() is never true while isShutdown() is false
+	}
+
+	@Override
+	public boolean isTerminated() {
+		return shutdown; // termination is reported as soon as shutdown was requested
+	}
+
+	@Override
+	public boolean awaitTermination(long timeout, TimeUnit unit) {
+		return true; // never blocks; the timeout is ignored
+	}
+
+	@Override
+	public <T> Future<T> submit(Callable<T> task) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <T> Future<T> submit(Runnable task, T result) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Future<?> submit(Runnable task) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+		throw new UnsupportedOperationException();
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index cce795d..2e9a2fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.hooks.TestMasterHook;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
@@ -48,10 +47,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
@@ -77,7 +73,6 @@ import org.junit.Before;
import org.junit.Test;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@@ -88,6 +83,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint;
+import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.enableCheckpointing;
+import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
import static org.hamcrest.Matchers.contains;
@@ -688,54 +686,6 @@ public class DefaultSchedulerTest extends TestLogger {
new TaskExecutionState(scheduler.getJobGraph().getJobID(), attemptId, ExecutionState.FINISHED));
}
- private void acknowledgePendingCheckpoint(final SchedulerBase scheduler, final long checkpointId) throws Exception {
- final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
-
- for (ArchivedExecutionVertex executionVertex : scheduler.requestJob().getAllExecutionVertices()) {
- final ExecutionAttemptID attemptId = executionVertex.getCurrentExecutionAttempt().getAttemptId();
- final AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
- scheduler.getJobGraph().getJobID(),
- attemptId,
- checkpointId);
- checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "Unknown location");
- }
- }
-
- private void enableCheckpointing(final JobGraph jobGraph) {
- final List<JobVertexID> triggerVertices = new ArrayList<>();
- final List<JobVertexID> ackVertices = new ArrayList<>();
- final List<JobVertexID> commitVertices = new ArrayList<>();
-
- for (JobVertex vertex : jobGraph.getVertices()) {
- if (vertex.isInputVertex()) {
- triggerVertices.add(vertex.getID());
- }
- commitVertices.add(vertex.getID());
- ackVertices.add(vertex.getID());
- }
-
- jobGraph.setSnapshotSettings(
- new JobCheckpointingSettings(
- triggerVertices,
- ackVertices,
- commitVertices,
- new CheckpointCoordinatorConfiguration(
- Long.MAX_VALUE, // disable periodical checkpointing
- 10 * 60 * 1000,
- 0,
- 1,
- CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
- false,
- false,
- 0),
- null));
- }
-
- private CheckpointCoordinator getCheckpointCoordinator(final SchedulerBase scheduler) {
- // TODO: get CheckpointCoordinator from the scheduler directly after it is factored out from ExecutionGraph
- return scheduler.getExecutionGraph().getCheckpointCoordinator();
- }
-
private void waitForTermination(final DefaultScheduler scheduler) throws Exception {
scheduler.getTerminationFuture().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
new file mode 100644
index 0000000..5297fac
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
+import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * A utility class to create {@link DefaultScheduler} instances for testing.
+ */
+public class SchedulerTestingUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SchedulerTestingUtils.class);
+
+ private static final long DEFAULT_CHECKPOINT_TIMEOUT_MS = 10 * 60 * 1000;
+
+ public static DefaultScheduler createScheduler(
+ JobGraph jobGraph,
+ ManuallyTriggeredScheduledExecutorService asyncExecutor) throws Exception {
+
+ return new DefaultScheduler(
+ LOG,
+ jobGraph,
+ VoidBackPressureStatsTracker.INSTANCE,
+ Executors.directExecutor(),
+ new Configuration(),
+ new SimpleSlotProvider(jobGraph.getJobID(), 0),
+ asyncExecutor,
+ asyncExecutor,
+ ClassLoader.getSystemClassLoader(),
+ new StandaloneCheckpointRecoveryFactory(),
+ Time.seconds(300),
+ VoidBlobWriter.getInstance(),
+ UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
+ Time.seconds(300),
+ NettyShuffleMaster.INSTANCE,
+ NoOpJobMasterPartitionTracker.INSTANCE,
+ new EagerSchedulingStrategy.Factory(),
+ new RestartPipelinedRegionFailoverStrategy.Factory(),
+ new TestRestartBackoffTimeStrategy(true, 0),
+ new DefaultExecutionVertexOperations(),
+ new ExecutionVertexVersioner(),
+ new TestExecutionSlotAllocatorFactory());
+ }
+
+ public static void enableCheckpointing(final JobGraph jobGraph) {
+ final List<JobVertexID> triggerVertices = new ArrayList<>();
+ final List<JobVertexID> allVertices = new ArrayList<>();
+
+ for (JobVertex vertex : jobGraph.getVertices()) {
+ if (vertex.isInputVertex()) {
+ triggerVertices.add(vertex.getID());
+ }
+ allVertices.add(vertex.getID());
+ }
+
+ final CheckpointCoordinatorConfiguration config = new CheckpointCoordinatorConfiguration(
+ Long.MAX_VALUE, // disable periodical checkpointing
+ DEFAULT_CHECKPOINT_TIMEOUT_MS,
+ 0,
+ 1,
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+ false,
+ false,
+ 0);
+
+ jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
+ triggerVertices, allVertices, allVertices,
+ config, null));
+ }
+
+ public static Collection<ExecutionAttemptID> getAllCurrentExecutionAttempts(DefaultScheduler scheduler) {
+ return StreamSupport.stream(scheduler.requestJob().getAllExecutionVertices().spliterator(), false)
+ .map((vertex) -> vertex.getCurrentExecutionAttempt().getAttemptId())
+ .collect(Collectors.toList());
+ }
+
+ public static void setAllExecutionsToRunning(final DefaultScheduler scheduler) {
+ final JobID jid = scheduler.requestJob().getJobID();
+ getAllCurrentExecutionAttempts(scheduler).forEach(
+ (attemptId) -> scheduler.updateTaskExecutionState(new TaskExecutionState(jid, attemptId, ExecutionState.RUNNING))
+ );
+ }
+
+ public static void acknowledgePendingCheckpoint(final DefaultScheduler scheduler, final long checkpointId) throws CheckpointException {
+ final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
+ final JobID jid = scheduler.requestJob().getJobID();
+
+ for (ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(scheduler)) {
+ final AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(jid, attemptId, checkpointId);
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "Unknown location");
+ }
+ }
+
+ public static CompletedCheckpoint takeCheckpoint(DefaultScheduler scheduler) throws Exception {
+ final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
+ checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(), false);
+
+ assertEquals("test setup inconsistent", 1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ final PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
+ final CompletableFuture<CompletedCheckpoint> future = checkpoint.getCompletionFuture();
+
+ acknowledgePendingCheckpoint(scheduler, checkpoint.getCheckpointId());
+
+ CompletedCheckpoint completed = future.getNow(null);
+ assertNotNull("checkpoint not complete", completed);
+ return completed;
+ }
+
+ @SuppressWarnings("deprecation")
+ public static CheckpointCoordinator getCheckpointCoordinator(SchedulerBase scheduler) {
+ return scheduler.getExecutionGraph().getCheckpointCoordinator();
+ }
+}