You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/02/14 08:12:55 UTC

[flink] 01/02: [hotfix][tests] Add a TestingSchedulerFactory to help instantiate the scheduler in tests

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a44cf4d912cf85c03077c0b766814e877b41e7b1
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sat Dec 7 14:47:43 2019 +0100

    [hotfix][tests] Add a TestingSchedulerFactory to help instantiate the scheduler in tests
---
 .../ManuallyTriggeredScheduledExecutorService.java |  98 ++++++++++++
 .../runtime/scheduler/DefaultSchedulerTest.java    |  56 +------
 .../runtime/scheduler/SchedulerTestingUtils.java   | 170 +++++++++++++++++++++
 3 files changed, 271 insertions(+), 53 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutorService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutorService.java
new file mode 100644
index 0000000..7524f76
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutorService.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Simple {@link ScheduledExecutorService} implementation for testing purposes.
+ *
+ * <p>Scheduling is inherited from {@link ManuallyTriggeredScheduledExecutor}. This class only
+ * adds the {@link java.util.concurrent.ExecutorService} lifecycle methods, which track a simple
+ * shutdown flag, and rejects the {@code submit}, {@code invokeAll} and {@code invokeAny} methods
+ * with an {@link UnsupportedOperationException}.
+ */
+public class ManuallyTriggeredScheduledExecutorService extends ManuallyTriggeredScheduledExecutor implements ScheduledExecutorService {
+
+	/** Set by {@link #shutdown()} and {@link #shutdownNow()}; never reset. */
+	private boolean shutdown;
+
+	/**
+	 * Marks this executor as shut down. Only the flag is set; no queued task is touched.
+	 */
+	@Override
+	public void shutdown() {
+		shutdown = true;
+	}
+
+	/**
+	 * Marks this executor as shut down. No queued tasks are drained or cancelled, so the
+	 * returned list is always empty.
+	 */
+	@Override
+	public List<Runnable> shutdownNow() {
+		shutdown();
+		return Collections.emptyList();
+	}
+
+	/**
+	 * Returns whether {@link #shutdown()} or {@link #shutdownNow()} has been called.
+	 */
+	@Override
+	public boolean isShutdown() {
+		// Must agree with isTerminated(): per the ExecutorService contract, a terminated
+		// executor is always also shut down.
+		return shutdown;
+	}
+
+	/**
+	 * Termination is not tracked separately: the executor counts as terminated as soon as it
+	 * has been shut down.
+	 */
+	@Override
+	public boolean isTerminated() {
+		return shutdown;
+	}
+
+	/**
+	 * Returns {@code true} immediately, regardless of the shutdown state and without waiting.
+	 */
+	@Override
+	public boolean awaitTermination(long timeout, TimeUnit unit) {
+		return true;
+	}
+
+	// The task-submission methods of ExecutorService are not supported by this test executor.
+
+	@Override
+	public <T> Future<T> submit(Callable<T> task) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <T> Future<T> submit(Runnable task, T result) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Future<?> submit(Runnable task) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+		throw new UnsupportedOperationException();
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index cce795d..2e9a2fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.hooks.TestMasterHook;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
@@ -48,10 +47,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
 import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
@@ -77,7 +73,6 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -88,6 +83,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint;
+import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.enableCheckpointing;
+import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.getCheckpointCoordinator;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
 import static org.hamcrest.Matchers.contains;
@@ -688,54 +686,6 @@ public class DefaultSchedulerTest extends TestLogger {
 			new TaskExecutionState(scheduler.getJobGraph().getJobID(), attemptId, ExecutionState.FINISHED));
 	}
 
-	private void acknowledgePendingCheckpoint(final SchedulerBase scheduler, final long checkpointId) throws Exception {
-		final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
-
-		for (ArchivedExecutionVertex executionVertex : scheduler.requestJob().getAllExecutionVertices()) {
-			final ExecutionAttemptID attemptId = executionVertex.getCurrentExecutionAttempt().getAttemptId();
-			final AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
-				scheduler.getJobGraph().getJobID(),
-				attemptId,
-				checkpointId);
-			checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "Unknown location");
-		}
-	}
-
-	private void enableCheckpointing(final JobGraph jobGraph) {
-		final List<JobVertexID> triggerVertices = new ArrayList<>();
-		final List<JobVertexID> ackVertices = new ArrayList<>();
-		final List<JobVertexID> commitVertices = new ArrayList<>();
-
-		for (JobVertex vertex : jobGraph.getVertices()) {
-			if (vertex.isInputVertex()) {
-				triggerVertices.add(vertex.getID());
-			}
-			commitVertices.add(vertex.getID());
-			ackVertices.add(vertex.getID());
-		}
-
-		jobGraph.setSnapshotSettings(
-			new JobCheckpointingSettings(
-				triggerVertices,
-				ackVertices,
-				commitVertices,
-				new CheckpointCoordinatorConfiguration(
-					Long.MAX_VALUE, // disable periodical checkpointing
-					10 * 60 * 1000,
-					0,
-					1,
-					CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
-					false,
-					false,
-					0),
-				null));
-	}
-
-	private CheckpointCoordinator getCheckpointCoordinator(final SchedulerBase scheduler) {
-		// TODO: get CheckpointCoordinator from the scheduler directly after it is factored out from ExecutionGraph
-		return scheduler.getExecutionGraph().getCheckpointCoordinator();
-	}
-
 	private void waitForTermination(final DefaultScheduler scheduler) throws Exception {
 		scheduler.getTerminationFuture().get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
new file mode 100644
index 0000000..5297fac
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
+import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Utilities for tests that work with a {@link DefaultScheduler}: creating a scheduler for a
+ * {@link JobGraph}, enabling checkpointing on a job graph, and driving checkpoints through the
+ * scheduler's {@link CheckpointCoordinator}.
+ */
+public class SchedulerTestingUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SchedulerTestingUtils.class);
+
+	private static final long DEFAULT_CHECKPOINT_TIMEOUT_MS = 10 * 60 * 1000;
+
+	/** Passed for both {@link Time} arguments of the {@link DefaultScheduler} constructor below. */
+	private static final Time DEFAULT_TIMEOUT = Time.seconds(300);
+
+	/** Static utility class; not meant to be instantiated. */
+	private SchedulerTestingUtils() {}
+
+	/**
+	 * Creates a {@link DefaultScheduler} for the given job graph, wired with test and no-op
+	 * components (void blob writer and back pressure tracker, unregistered metric group, no-op
+	 * partition tracker, test execution slot allocator), eager scheduling and pipelined-region
+	 * failover. The scheduler is only constructed here, not started.
+	 *
+	 * <p>The given executor is passed to the scheduler twice (presumably as both its future
+	 * executor and its delay executor; verify against the {@link DefaultScheduler} constructor).
+	 *
+	 * @param jobGraph the job to create the scheduler for
+	 * @param asyncExecutor the manually triggered executor handed to the scheduler
+	 * @return the newly created scheduler
+	 * @throws Exception if constructing the scheduler fails
+	 */
+	public static DefaultScheduler createScheduler(
+			JobGraph jobGraph,
+			ManuallyTriggeredScheduledExecutorService asyncExecutor) throws Exception {
+
+		return new DefaultScheduler(
+			LOG,
+			jobGraph,
+			VoidBackPressureStatsTracker.INSTANCE,
+			Executors.directExecutor(),
+			new Configuration(),
+			new SimpleSlotProvider(jobGraph.getJobID(), 0),
+			asyncExecutor,
+			asyncExecutor,
+			ClassLoader.getSystemClassLoader(),
+			new StandaloneCheckpointRecoveryFactory(),
+			DEFAULT_TIMEOUT,
+			VoidBlobWriter.getInstance(),
+			UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
+			DEFAULT_TIMEOUT,
+			NettyShuffleMaster.INSTANCE,
+			NoOpJobMasterPartitionTracker.INSTANCE,
+			new EagerSchedulingStrategy.Factory(),
+			new RestartPipelinedRegionFailoverStrategy.Factory(),
+			new TestRestartBackoffTimeStrategy(true, 0),
+			new DefaultExecutionVertexOperations(),
+			new ExecutionVertexVersioner(),
+			new TestExecutionSlotAllocatorFactory());
+	}
+
+	/**
+	 * Enables checkpointing on the given job graph. Input vertices are registered as the
+	 * vertices that trigger checkpoints; all vertices are registered both as acknowledging and
+	 * as committing vertices.
+	 *
+	 * <p>The checkpoint interval is {@link Long#MAX_VALUE}, so no periodic checkpoints are taken;
+	 * checkpoints have to be triggered explicitly, e.g. via
+	 * {@link #takeCheckpoint(DefaultScheduler)}.
+	 *
+	 * @param jobGraph the job graph whose checkpointing settings are set
+	 */
+	public static void enableCheckpointing(final JobGraph jobGraph) {
+		final List<JobVertexID> triggerVertices = new ArrayList<>();
+		final List<JobVertexID> allVertices = new ArrayList<>();
+
+		for (JobVertex vertex : jobGraph.getVertices()) {
+			if (vertex.isInputVertex()) {
+				triggerVertices.add(vertex.getID());
+			}
+			allVertices.add(vertex.getID());
+		}
+
+		final CheckpointCoordinatorConfiguration config = new CheckpointCoordinatorConfiguration(
+			Long.MAX_VALUE, // disable periodical checkpointing
+			DEFAULT_CHECKPOINT_TIMEOUT_MS,
+			0,
+			1,
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+			false,
+			false,
+			0);
+
+		// the same list serves as both the acknowledging and the committing vertices
+		jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
+				triggerVertices, allVertices, allVertices,
+				config, null));
+	}
+
+	/**
+	 * Returns the attempt IDs of the current execution attempts of all execution vertices, as
+	 * reported by {@link DefaultScheduler#requestJob()}.
+	 */
+	public static Collection<ExecutionAttemptID> getAllCurrentExecutionAttempts(DefaultScheduler scheduler) {
+		return StreamSupport.stream(scheduler.requestJob().getAllExecutionVertices().spliterator(), false)
+			.map((vertex) -> vertex.getCurrentExecutionAttempt().getAttemptId())
+			.collect(Collectors.toList());
+	}
+
+	/**
+	 * Reports every current execution attempt of the job to the scheduler as
+	 * {@link ExecutionState#RUNNING}.
+	 */
+	public static void setAllExecutionsToRunning(final DefaultScheduler scheduler) {
+		final JobID jid = scheduler.requestJob().getJobID();
+		getAllCurrentExecutionAttempts(scheduler).forEach(
+			(attemptId) -> scheduler.updateTaskExecutionState(new TaskExecutionState(jid, attemptId, ExecutionState.RUNNING))
+		);
+	}
+
+	/**
+	 * Acknowledges the given checkpoint on behalf of every current execution attempt of the job
+	 * by sending one {@link AcknowledgeCheckpoint} message per attempt to the scheduler's
+	 * {@link CheckpointCoordinator}.
+	 *
+	 * @param scheduler the scheduler whose checkpoint coordinator receives the acknowledgements
+	 * @param checkpointId the ID of the checkpoint to acknowledge
+	 * @throws CheckpointException propagated from the coordinator when it fails to process an
+	 *     acknowledgement
+	 */
+	public static void acknowledgePendingCheckpoint(final DefaultScheduler scheduler, final long checkpointId) throws CheckpointException {
+		final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
+		final JobID jid = scheduler.requestJob().getJobID();
+
+		for (ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(scheduler)) {
+			final AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(jid, attemptId, checkpointId);
+			checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "Unknown location");
+		}
+	}
+
+	/**
+	 * Triggers a checkpoint, acknowledges it from all current execution attempts and returns the
+	 * resulting completed checkpoint.
+	 *
+	 * <p>Intended for job graphs on which checkpointing was enabled, e.g. via
+	 * {@link #enableCheckpointing(JobGraph)}. No other checkpoint may be pending.
+	 *
+	 * @param scheduler the scheduler whose checkpoint coordinator takes the checkpoint
+	 * @return the completed checkpoint
+	 * @throws AssertionError if there is not exactly one pending checkpoint after triggering, or
+	 *     if the checkpoint is not complete after all attempts acknowledged it
+	 * @throws Exception if triggering or acknowledging the checkpoint fails
+	 */
+	public static CompletedCheckpoint takeCheckpoint(DefaultScheduler scheduler) throws Exception {
+		final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
+		checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(), false);
+
+		assertEquals("test setup inconsistent", 1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+		final PendingCheckpoint checkpoint = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
+		final CompletableFuture<CompletedCheckpoint> future = checkpoint.getCompletionFuture();
+
+		acknowledgePendingCheckpoint(scheduler, checkpoint.getCheckpointId());
+
+		// the checkpoint is expected to be complete once every attempt has acknowledged it
+		final CompletedCheckpoint completed = future.getNow(null);
+		assertNotNull("checkpoint not complete", completed);
+		return completed;
+	}
+
+	/**
+	 * Returns the {@link CheckpointCoordinator} of the scheduler's execution graph.
+	 */
+	@SuppressWarnings("deprecation")
+	public static CheckpointCoordinator getCheckpointCoordinator(SchedulerBase scheduler) {
+		// TODO: get the CheckpointCoordinator from the scheduler directly once it is factored
+		// out of the ExecutionGraph
+		return scheduler.getExecutionGraph().getCheckpointCoordinator();
+	}
+}