You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 16:27:55 UTC

[flink] 06/13: [FLINK-16177][refactor] Make test setup logic for OperatorCoordinatorSchedulerTest more flexible.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9f30c7fc0b8dbbcbb9c9c8910fd8b48ad84bb9cd
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon May 11 22:15:50 2020 +0200

    [FLINK-16177][refactor] Make test setup logic for OperatorCoordinatorSchedulerTest more flexible.
---
 .../OperatorCoordinatorSchedulerTest.java          |  92 +++++++++--
 .../runtime/scheduler/SchedulerTestingUtils.java   |  49 +++++-
 .../TestingCheckpointStorageCoordinatorView.java   | 181 +++++++++++++++++++++
 3 files changed, 310 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 8ecf172..9ea633f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -19,6 +19,9 @@
 package org.apache.flink.runtime.operators.coordination;
 
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.checkpoint.Checkpoints;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
@@ -29,10 +32,13 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.scheduler.DefaultScheduler;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.TestingCheckpointStorageCoordinatorView;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -44,7 +50,11 @@ import org.junit.Test;
 
 import javax.annotation.Nullable;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 
 import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
 import static org.hamcrest.Matchers.contains;
@@ -69,7 +79,7 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 	private final ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
 
 	// ------------------------------------------------------------------------
-	//  tests
+	//  tests for scheduling
 	// ------------------------------------------------------------------------
 
 	@Test
@@ -165,6 +175,10 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		assertThat(result, futureFailedWith(TestException.class));
 	}
 
+	// ------------------------------------------------------------------------
+	//  tests for REST request delivery
+	// ------------------------------------------------------------------------
+
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testDeliveringClientRequestToRequestHandler() throws Exception {
@@ -216,39 +230,66 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	private DefaultScheduler createScheduler(OperatorCoordinator.Provider provider) throws Exception {
-		return setupTestJobAndScheduler(provider, null, false);
+		return setupTestJobAndScheduler(provider);
 	}
 
 	private DefaultScheduler createAndStartScheduler() throws Exception {
-		final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), null, false);
+		final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId));
 		scheduler.startScheduling();
 		return scheduler;
 	}
 
 	private DefaultScheduler createSchedulerAndDeployTasks() throws Exception {
-		final DefaultScheduler scheduler = createAndStartScheduler();
+		return createSchedulerAndDeployTasks(new TestingOperatorCoordinator.Provider(testOperatorId));
+	}
+
+	private DefaultScheduler createSchedulerAndDeployTasks(OperatorCoordinator.Provider provider) throws Exception {
+		final DefaultScheduler scheduler = setupTestJobAndScheduler(provider);
+		scheduler.startScheduling();
+		executor.triggerAll();
+		executor.triggerScheduledTasks();
 		SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
 		return scheduler;
 	}
 
 	private DefaultScheduler createSchedulerAndDeployTasks(TaskExecutorOperatorEventGateway gateway) throws Exception {
-		final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), gateway, false);
+		final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), gateway, null);
 		scheduler.startScheduling();
+		executor.triggerAll();
+		executor.triggerScheduledTasks();
 		SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
 		return scheduler;
 	}
 
-	private DefaultScheduler createSchedulerWithCheckpointing() throws Exception {
-		final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), null, true);
+	private DefaultScheduler createSchedulerWithRestoredSavepoint(byte[] coordinatorState) throws Exception {
+		final byte[] savepointMetadata = serializeAsCheckpointMetadata(testOperatorId, coordinatorState);
+		final String savepointPointer = "testingSavepointPointer";
+
+		final TestingCheckpointStorageCoordinatorView storage = new TestingCheckpointStorageCoordinatorView();
+		storage.registerSavepoint(savepointPointer, savepointMetadata);
+
+		final Consumer<JobGraph> savepointConfigurer = (jobGraph) -> {
+			SchedulerTestingUtils.enableCheckpointing(jobGraph, storage.asStateBackend());
+			jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPointer));
+		};
+
+		final DefaultScheduler scheduler = setupTestJobAndScheduler(
+				new TestingOperatorCoordinator.Provider(testOperatorId),
+				null,
+				savepointConfigurer);
+
 		scheduler.startScheduling();
-		SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
 		return scheduler;
 	}
 
+	private DefaultScheduler setupTestJobAndScheduler(OperatorCoordinator.Provider provider) throws Exception {
+		return setupTestJobAndScheduler(provider, null, null);
+	}
+
 	private DefaultScheduler setupTestJobAndScheduler(
 			OperatorCoordinator.Provider provider,
 			@Nullable TaskExecutorOperatorEventGateway taskExecutorOperatorEventGateway,
-			boolean enableCheckpoints) throws Exception {
+			@Nullable Consumer<JobGraph> jobGraphPreProcessing) throws Exception {
 
 		final JobVertex vertex = new JobVertex("Vertex with OperatorCoordinator", testVertexId);
 		vertex.setInvokableClass(NoOpInvokable.class);
@@ -256,8 +297,9 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		vertex.setParallelism(2);
 
 		final JobGraph jobGraph = new JobGraph("test job with OperatorCoordinator", vertex);
-		if (enableCheckpoints) {
-			SchedulerTestingUtils.enableCheckpointing(jobGraph);
+		SchedulerTestingUtils.enableCheckpointing(jobGraph);
+		if (jobGraphPreProcessing != null) {
+			jobGraphPreProcessing.accept(jobGraph);
 		}
 
 		final DefaultScheduler scheduler = taskExecutorOperatorEventGateway == null
@@ -301,6 +343,22 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		return scheduler.getExecutionVertex(id).getJobVertex();
 	}
 
+	private static OperatorState createOperatorState(OperatorID id, byte[] coordinatorState) {
+		final OperatorState state = new OperatorState(id, 10, 16384);
+		state.setCoordinatorState(new ByteStreamStateHandle("name", coordinatorState));
+		return state;
+	}
+
+	private static byte[] serializeAsCheckpointMetadata(OperatorID id, byte[] coordinatorState) throws IOException {
+		final OperatorState state = createOperatorState(id, coordinatorState);
+		final CheckpointMetadata metadata = new CheckpointMetadata(
+			1337L, Collections.singletonList(state), Collections.emptyList());
+
+		final ByteArrayOutputStream out = new ByteArrayOutputStream();
+		Checkpoints.storeCheckpointMetadata(metadata, out);
+		return out.toByteArray();
+	}
+
 	// ------------------------------------------------------------------------
 	//  test mocks
 	// ------------------------------------------------------------------------
@@ -321,6 +379,18 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		}
 	}
 
+	private static final class CoordinatorThatFailsCheckpointing extends TestingOperatorCoordinator {
+
+		public CoordinatorThatFailsCheckpointing(Context context) {
+			super(context);
+		}
+
+		@Override
+		public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
+			throw new Error(new TestException());
+		}
+	}
+
 	private static final class FailingTaskExecutorOperatorEventGateway implements TaskExecutorOperatorEventGateway {
 
 		@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 99458e4..40fe221 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
@@ -61,9 +62,11 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
 import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -72,6 +75,9 @@ import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -181,6 +187,10 @@ public class SchedulerTestingUtils {
 	}
 
 	public static void enableCheckpointing(final JobGraph jobGraph) {
+		enableCheckpointing(jobGraph, null);
+	}
+
+	public static void enableCheckpointing(final JobGraph jobGraph, @Nullable StateBackend stateBackend) {
 		final List<JobVertexID> triggerVertices = new ArrayList<>();
 		final List<JobVertexID> allVertices = new ArrayList<>();
 
@@ -202,9 +212,18 @@ public class SchedulerTestingUtils {
 			false,
 			0);
 
+		SerializedValue<StateBackend> serializedStateBackend = null;
+		if (stateBackend != null) {
+			try {
+				serializedStateBackend = new SerializedValue<>(stateBackend);
+			} catch (IOException e) {
+				throw new RuntimeException("could not serialize state backend", e);
+			}
+		}
+
 		jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
 				triggerVertices, allVertices, allVertices,
-				config, null));
+				config, serializedStateBackend));
 	}
 
 	public static Collection<ExecutionAttemptID> getAllCurrentExecutionAttempts(DefaultScheduler scheduler) {
@@ -213,6 +232,29 @@ public class SchedulerTestingUtils {
 			.collect(Collectors.toList());
 	}
 
+	public static ExecutionState getExecutionState(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
+		final ExecutionJobVertex ejv = getJobVertex(scheduler, jvid);
+		return ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getState();
+	}
+
+	public static void failExecution(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
+		final ExecutionJobVertex ejv = getJobVertex(scheduler, jvid);
+		assert ejv != null;
+		final ExecutionAttemptID attemptID = ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
+
+		scheduler.updateTaskExecutionState(new TaskExecutionState(
+			ejv.getJobId(), attemptID, ExecutionState.FAILED, new Exception("test task failure")));
+	}
+
+	public static void setExecutionToRunning(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
+		final ExecutionJobVertex ejv = getJobVertex(scheduler, jvid);
+		assert ejv != null;
+		final ExecutionAttemptID attemptID = ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
+
+		scheduler.updateTaskExecutionState(new TaskExecutionState(
+			ejv.getJobId(), attemptID, ExecutionState.RUNNING));
+	}
+
 	public static void setAllExecutionsToRunning(final DefaultScheduler scheduler) {
 		final JobID jid = scheduler.requestJob().getJobID();
 		getAllCurrentExecutionAttempts(scheduler).forEach(
@@ -250,6 +292,11 @@ public class SchedulerTestingUtils {
 		return scheduler.getCheckpointCoordinator();
 	}
 
+	private static ExecutionJobVertex getJobVertex(DefaultScheduler scheduler, JobVertexID jobVertexId) {
+		final ExecutionVertexID id = new ExecutionVertexID(jobVertexId, 0);
+		return scheduler.getExecutionVertex(id).getJobVertex();
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static final class TaskExecutorOperatorEventGatewayAdapter extends SimpleAckingTaskManagerGateway {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageCoordinatorView.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageCoordinatorView.java
new file mode 100644
index 0000000..e06d594
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageCoordinatorView.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+
+/**
+ * A testing implementation of the {@link CheckpointStorageCoordinatorView}.
+ */
+@SuppressWarnings("serial")
+public class TestingCheckpointStorageCoordinatorView implements CheckpointStorage, java.io.Serializable {
+
+	private final HashMap<String, TestingCompletedCheckpointStorageLocation> registeredSavepoints = new HashMap<>();
+
+	// ------------------------------------------------------------------------
+	//  test setup methods
+	// ------------------------------------------------------------------------
+
+	public void registerSavepoint(String pointer, byte[] metadata) {
+		registeredSavepoints.put(pointer, new TestingCompletedCheckpointStorageLocation(pointer, metadata));
+	}
+
+	// ------------------------------------------------------------------------
+	//  CheckpointStorage methods
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsHighlyAvailableStorage() {
+		return false;
+	}
+
+	@Override
+	public boolean hasDefaultSavepointLocation() {
+		return false;
+	}
+
+	@Override
+	public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException {
+		final CompletedCheckpointStorageLocation location = registeredSavepoints.get(externalPointer);
+		if (location != null) {
+			return location;
+		} else {
+			throw new IOException("Could not find savepoint for pointer: " + externalPointer);
+		}
+	}
+
+	@Override
+	public void initializeBaseLocations() throws IOException {}
+
+	@Override
+	public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
+		return new NonPersistentMetadataCheckpointStorageLocation(Integer.MAX_VALUE);
+	}
+
+	@Override
+	public CheckpointStorageLocation initializeLocationForSavepoint(long checkpointId, @Nullable String externalLocationPointer) throws IOException {
+		return new NonPersistentMetadataCheckpointStorageLocation(Integer.MAX_VALUE);
+	}
+
+	@Override
+	public CheckpointStreamFactory resolveCheckpointStorageLocation(
+			long checkpointId,
+			CheckpointStorageLocationReference reference) {
+		return new MemCheckpointStreamFactory(Integer.MAX_VALUE);
+	}
+
+	@Override
+	public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() {
+		return new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(Integer.MAX_VALUE);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utils
+	// ------------------------------------------------------------------------
+
+	public StateBackend asStateBackend() {
+		return new FactoringStateBackend(this);
+	}
+
+	// ------------------------------------------------------------------------
+	//  internal support classes
+	// ------------------------------------------------------------------------
+
+	private static final class TestingCompletedCheckpointStorageLocation
+			implements CompletedCheckpointStorageLocation, java.io.Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		private final String pointer;
+		private final byte[] metadata;
+
+		TestingCompletedCheckpointStorageLocation(String pointer, byte[] metadata) {
+			this.pointer = pointer;
+			this.metadata = metadata;
+		}
+
+		@Override
+		public String getExternalPointer() {
+			return pointer;
+		}
+
+		@Override
+		public StreamStateHandle getMetadataHandle() {
+			return new ByteStreamStateHandle(pointer, metadata);
+		}
+
+		@Override
+		public void disposeStorageLocation() throws IOException {}
+	}
+
+	// ------------------------------------------------------------------------
+	//   Everything below here is necessary only to make it possible to
+	//   pass the CheckpointStorageCoordinatorView to the CheckpointCoordinator
+	//   via the JobGraph, because that part expects a StateBackend
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A StateBackend whose only purpose is to create a given CheckpointStorage.
+	 */
+	private static final class FactoringStateBackend implements StateBackend {
+
+		private final TestingCheckpointStorageCoordinatorView testingCoordinatorView;
+
+		private FactoringStateBackend(TestingCheckpointStorageCoordinatorView testingCoordinatorView) {
+			this.testingCoordinatorView = testingCoordinatorView;
+		}
+
+
+		@Override
+		public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException {
+			return testingCoordinatorView.resolveCheckpoint(externalPointer);
+		}
+
+		@Override
+		public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
+			return testingCoordinatorView;
+		}
+
+		@Override
+		public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier, @Nonnull Collection<OperatorStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+	}
+}