You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 16:27:55 UTC
[flink] 06/13: [FLINK-16177][refactor] Make test setup logic for
OperatorCoordinatorSchedulerTest more flexible.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9f30c7fc0b8dbbcbb9c9c8910fd8b48ad84bb9cd
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon May 11 22:15:50 2020 +0200
[FLINK-16177][refactor] Make test setup logic for OperatorCoordinatorSchedulerTest more flexible.
---
.../OperatorCoordinatorSchedulerTest.java | 92 +++++++++--
.../runtime/scheduler/SchedulerTestingUtils.java | 49 +++++-
.../TestingCheckpointStorageCoordinatorView.java | 181 +++++++++++++++++++++
3 files changed, 310 insertions(+), 12 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 8ecf172..9ea633f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -19,6 +19,9 @@
package org.apache.flink.runtime.operators.coordination;
import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.checkpoint.Checkpoints;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
@@ -29,10 +32,13 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.TestingCheckpointStorageCoordinatorView;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -44,7 +50,11 @@ import org.junit.Test;
import javax.annotation.Nullable;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collections;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
import static org.hamcrest.Matchers.contains;
@@ -69,7 +79,7 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
private final ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
// ------------------------------------------------------------------------
- // tests
+ // tests for scheduling
// ------------------------------------------------------------------------
@Test
@@ -165,6 +175,10 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
assertThat(result, futureFailedWith(TestException.class));
}
+ // ------------------------------------------------------------------------
+ // tests for REST request delivery
+ // ------------------------------------------------------------------------
+
@Test
@SuppressWarnings("unchecked")
public void testDeliveringClientRequestToRequestHandler() throws Exception {
@@ -216,39 +230,66 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
// ------------------------------------------------------------------------
private DefaultScheduler createScheduler(OperatorCoordinator.Provider provider) throws Exception {
- return setupTestJobAndScheduler(provider, null, false);
+ return setupTestJobAndScheduler(provider);
}
private DefaultScheduler createAndStartScheduler() throws Exception {
- final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), null, false);
+ final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId));
scheduler.startScheduling();
return scheduler;
}
private DefaultScheduler createSchedulerAndDeployTasks() throws Exception {
- final DefaultScheduler scheduler = createAndStartScheduler();
+ return createSchedulerAndDeployTasks(new TestingOperatorCoordinator.Provider(testOperatorId));
+ }
+
+ private DefaultScheduler createSchedulerAndDeployTasks(OperatorCoordinator.Provider provider) throws Exception {
+ final DefaultScheduler scheduler = setupTestJobAndScheduler(provider);
+ scheduler.startScheduling();
+ executor.triggerAll();
+ executor.triggerScheduledTasks();
SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
return scheduler;
}
private DefaultScheduler createSchedulerAndDeployTasks(TaskExecutorOperatorEventGateway gateway) throws Exception {
- final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), gateway, false);
+ final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), gateway, null);
scheduler.startScheduling();
+ executor.triggerAll();
+ executor.triggerScheduledTasks();
SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
return scheduler;
}
- private DefaultScheduler createSchedulerWithCheckpointing() throws Exception {
- final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), null, true);
+ private DefaultScheduler createSchedulerWithRestoredSavepoint(byte[] coordinatorState) throws Exception {
+ final byte[] savepointMetadata = serializeAsCheckpointMetadata(testOperatorId, coordinatorState);
+ final String savepointPointer = "testingSavepointPointer";
+
+ final TestingCheckpointStorageCoordinatorView storage = new TestingCheckpointStorageCoordinatorView();
+ storage.registerSavepoint(savepointPointer, savepointMetadata);
+
+ final Consumer<JobGraph> savepointConfigurer = (jobGraph) -> {
+ SchedulerTestingUtils.enableCheckpointing(jobGraph, storage.asStateBackend());
+ jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPointer));
+ };
+
+ final DefaultScheduler scheduler = setupTestJobAndScheduler(
+ new TestingOperatorCoordinator.Provider(testOperatorId),
+ null,
+ savepointConfigurer);
+
scheduler.startScheduling();
- SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
return scheduler;
}
+ private DefaultScheduler setupTestJobAndScheduler(OperatorCoordinator.Provider provider) throws Exception {
+ return setupTestJobAndScheduler(provider, null, null);
+ }
+
private DefaultScheduler setupTestJobAndScheduler(
OperatorCoordinator.Provider provider,
@Nullable TaskExecutorOperatorEventGateway taskExecutorOperatorEventGateway,
- boolean enableCheckpoints) throws Exception {
+ @Nullable Consumer<JobGraph> jobGraphPreProcessing) throws Exception {
final JobVertex vertex = new JobVertex("Vertex with OperatorCoordinator", testVertexId);
vertex.setInvokableClass(NoOpInvokable.class);
@@ -256,8 +297,9 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
vertex.setParallelism(2);
final JobGraph jobGraph = new JobGraph("test job with OperatorCoordinator", vertex);
- if (enableCheckpoints) {
- SchedulerTestingUtils.enableCheckpointing(jobGraph);
+ SchedulerTestingUtils.enableCheckpointing(jobGraph);
+ if (jobGraphPreProcessing != null) {
+ jobGraphPreProcessing.accept(jobGraph);
}
final DefaultScheduler scheduler = taskExecutorOperatorEventGateway == null
@@ -301,6 +343,22 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
return scheduler.getExecutionVertex(id).getJobVertex();
}
+ /**
+ * Creates an {@link OperatorState} for the given operator that carries only the given
+ * coordinator state. Parallelism and max parallelism are fixed test values.
+ */
+ private static OperatorState createOperatorState(OperatorID id, byte[] coordinatorState) {
+ final int parallelism = 10;
+ final int maxParallelism = 16384;
+ final ByteStreamStateHandle coordinatorStateHandle = new ByteStreamStateHandle("name", coordinatorState);
+
+ final OperatorState operatorState = new OperatorState(id, parallelism, maxParallelism);
+ operatorState.setCoordinatorState(coordinatorStateHandle);
+ return operatorState;
+ }
+
+ /**
+ * Serializes checkpoint metadata that contains exactly one operator (with the given coordinator
+ * state) and no master states, via {@link Checkpoints#storeCheckpointMetadata}.
+ * The checkpoint ID is a fixed dummy value.
+ */
+ private static byte[] serializeAsCheckpointMetadata(OperatorID id, byte[] coordinatorState) throws IOException {
+ final long checkpointId = 1337L;
+ final CheckpointMetadata checkpointMetadata = new CheckpointMetadata(
+ checkpointId,
+ Collections.singletonList(createOperatorState(id, coordinatorState)),
+ Collections.emptyList());
+
+ final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+ Checkpoints.storeCheckpointMetadata(checkpointMetadata, bytes);
+ return bytes.toByteArray();
+ }
+
// ------------------------------------------------------------------------
// test mocks
// ------------------------------------------------------------------------
@@ -321,6 +379,18 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
}
}
+ private static final class CoordinatorThatFailsCheckpointing extends TestingOperatorCoordinator {
+
+ public CoordinatorThatFailsCheckpointing(Context context) {
+ super(context);
+ }
+
+ @Override
+ public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
+ throw new Error(new TestException());
+ }
+ }
+
private static final class FailingTaskExecutorOperatorEventGateway implements TaskExecutorOperatorEventGateway {
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 99458e4..40fe221 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
@@ -61,9 +62,11 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -72,6 +75,9 @@ import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -181,6 +187,10 @@ public class SchedulerTestingUtils {
}
public static void enableCheckpointing(final JobGraph jobGraph) {
+ enableCheckpointing(jobGraph, null);
+ }
+
+ public static void enableCheckpointing(final JobGraph jobGraph, @Nullable StateBackend stateBackend) {
final List<JobVertexID> triggerVertices = new ArrayList<>();
final List<JobVertexID> allVertices = new ArrayList<>();
@@ -202,9 +212,18 @@ public class SchedulerTestingUtils {
false,
0);
+ SerializedValue<StateBackend> serializedStateBackend = null;
+ if (stateBackend != null) {
+ try {
+ serializedStateBackend = new SerializedValue<>(stateBackend);
+ } catch (IOException e) {
+ throw new RuntimeException("could not serialize state backend", e);
+ }
+ }
+
jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
triggerVertices, allVertices, allVertices,
- config, null));
+ config, serializedStateBackend));
}
public static Collection<ExecutionAttemptID> getAllCurrentExecutionAttempts(DefaultScheduler scheduler) {
@@ -213,6 +232,29 @@ public class SchedulerTestingUtils {
.collect(Collectors.toList());
}
+ /**
+ * Returns the state of the current execution attempt of the given subtask of the given job vertex.
+ */
+ public static ExecutionState getExecutionState(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
+ final ExecutionJobVertex executionJobVertex = getJobVertex(scheduler, jvid);
+ return executionJobVertex
+ .getTaskVertices()[subtask]
+ .getCurrentExecutionAttempt()
+ .getState();
+ }
+
+ /**
+ * Reports the current execution attempt of the given subtask as {@link ExecutionState#FAILED}
+ * to the scheduler, with a generic test exception as the failure cause.
+ */
+ public static void failExecution(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
+ final ExecutionJobVertex executionJobVertex = getJobVertex(scheduler, jvid);
+ assert executionJobVertex != null;
+
+ final ExecutionAttemptID attempt = executionJobVertex
+ .getTaskVertices()[subtask]
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final TaskExecutionState failedState = new TaskExecutionState(
+ executionJobVertex.getJobId(),
+ attempt,
+ ExecutionState.FAILED,
+ new Exception("test task failure"));
+
+ scheduler.updateTaskExecutionState(failedState);
+ }
+
+ /**
+ * Reports the current execution attempt of the given subtask as {@link ExecutionState#RUNNING}
+ * to the scheduler.
+ */
+ public static void setExecutionToRunning(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
+ final ExecutionJobVertex executionJobVertex = getJobVertex(scheduler, jvid);
+ assert executionJobVertex != null;
+
+ final ExecutionAttemptID attempt = executionJobVertex
+ .getTaskVertices()[subtask]
+ .getCurrentExecutionAttempt()
+ .getAttemptId();
+ final TaskExecutionState runningState = new TaskExecutionState(
+ executionJobVertex.getJobId(),
+ attempt,
+ ExecutionState.RUNNING);
+
+ scheduler.updateTaskExecutionState(runningState);
+ }
+
public static void setAllExecutionsToRunning(final DefaultScheduler scheduler) {
final JobID jid = scheduler.requestJob().getJobID();
getAllCurrentExecutionAttempts(scheduler).forEach(
@@ -250,6 +292,11 @@ public class SchedulerTestingUtils {
return scheduler.getCheckpointCoordinator();
}
+ /**
+ * Looks up the {@link ExecutionJobVertex} for the given job vertex ID via its subtask 0.
+ */
+ private static ExecutionJobVertex getJobVertex(DefaultScheduler scheduler, JobVertexID jobVertexId) {
+ final int firstSubtaskIndex = 0;
+ return scheduler
+ .getExecutionVertex(new ExecutionVertexID(jobVertexId, firstSubtaskIndex))
+ .getJobVertex();
+ }
+
// ------------------------------------------------------------------------
private static final class TaskExecutorOperatorEventGatewayAdapter extends SimpleAckingTaskManagerGateway {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageCoordinatorView.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageCoordinatorView.java
new file mode 100644
index 0000000..e06d594
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageCoordinatorView.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+
+/**
+ * A testing implementation of the {@link CheckpointStorageCoordinatorView}, exposed as a
+ * {@link CheckpointStorage}.
+ *
+ * <p>Savepoints can be pre-registered under a pointer via {@link #registerSavepoint(String, byte[])}
+ * and are then returned by {@link #resolveCheckpoint(String)}. All checkpoint and savepoint
+ * locations handed out are non-persistent.
+ *
+ * <p>Use {@link #asStateBackend()} to pass this storage to code that expects a {@link StateBackend}.
+ */
+public class TestingCheckpointStorageCoordinatorView implements CheckpointStorage, java.io.Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /** Size limit passed to all created locations and streams; effectively unbounded. */
+ private static final int MAX_STATE_SIZE = Integer.MAX_VALUE;
+
+ /** Registered savepoints, keyed by their external pointer. */
+ private final HashMap<String, TestingCompletedCheckpointStorageLocation> registeredSavepoints = new HashMap<>();
+
+ // ------------------------------------------------------------------------
+ // test setup methods
+ // ------------------------------------------------------------------------
+
+ /**
+ * Registers a savepoint so that {@link #resolveCheckpoint(String)} returns it for the given
+ * pointer. A previous registration under the same pointer is replaced.
+ *
+ * @param pointer the external pointer under which the savepoint becomes resolvable
+ * @param metadata the serialized savepoint metadata; the array is copied, so later changes
+ * to it by the caller are not visible to this storage
+ */
+ public void registerSavepoint(String pointer, byte[] metadata) {
+ registeredSavepoints.put(pointer, new TestingCompletedCheckpointStorageLocation(pointer, metadata.clone()));
+ }
+
+ // ------------------------------------------------------------------------
+ // CheckpointStorageCoordinatorView methods
+ // ------------------------------------------------------------------------
+
+ @Override
+ public boolean supportsHighlyAvailableStorage() {
+ return false;
+ }
+
+ @Override
+ public boolean hasDefaultSavepointLocation() {
+ return false;
+ }
+
+ /**
+ * Returns the savepoint registered under the given pointer.
+ *
+ * @throws IOException if no savepoint was registered under the pointer
+ */
+ @Override
+ public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException {
+ final CompletedCheckpointStorageLocation location = registeredSavepoints.get(externalPointer);
+ if (location != null) {
+ return location;
+ } else {
+ throw new IOException("Could not find savepoint for pointer: " + externalPointer);
+ }
+ }
+
+ @Override
+ public void initializeBaseLocations() throws IOException {}
+
+ @Override
+ public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
+ return new NonPersistentMetadataCheckpointStorageLocation(MAX_STATE_SIZE);
+ }
+
+ /** Ignores the requested external location; the returned location is non-persistent. */
+ @Override
+ public CheckpointStorageLocation initializeLocationForSavepoint(long checkpointId, @Nullable String externalLocationPointer) throws IOException {
+ return new NonPersistentMetadataCheckpointStorageLocation(MAX_STATE_SIZE);
+ }
+
+ @Override
+ public CheckpointStreamFactory resolveCheckpointStorageLocation(
+ long checkpointId,
+ CheckpointStorageLocationReference reference) {
+ return new MemCheckpointStreamFactory(MAX_STATE_SIZE);
+ }
+
+ @Override
+ public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() {
+ return new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(MAX_STATE_SIZE);
+ }
+
+ // ------------------------------------------------------------------------
+ // Utils
+ // ------------------------------------------------------------------------
+
+ /**
+ * Wraps this storage into a {@link StateBackend} whose checkpoint storage is this instance.
+ *
+ * <p>NOTE: if the returned backend is serialized (for example into a JobGraph's checkpointing
+ * settings), the deserialized copy only knows the savepoints that were registered before
+ * serialization. Register all savepoints first.
+ */
+ public StateBackend asStateBackend() {
+ return new FactoringStateBackend(this);
+ }
+
+ // ------------------------------------------------------------------------
+ // internal support classes
+ // ------------------------------------------------------------------------
+
+ private static final class TestingCompletedCheckpointStorageLocation
+ implements CompletedCheckpointStorageLocation, java.io.Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String pointer;
+ private final byte[] metadata;
+
+ TestingCompletedCheckpointStorageLocation(String pointer, byte[] metadata) {
+ this.pointer = pointer;
+ this.metadata = metadata;
+ }
+
+ @Override
+ public String getExternalPointer() {
+ return pointer;
+ }
+
+ /** Returns a handle over the registered metadata bytes, named after the pointer. */
+ @Override
+ public StreamStateHandle getMetadataHandle() {
+ return new ByteStreamStateHandle(pointer, metadata);
+ }
+
+ /** Nothing to dispose: the metadata is only held in this object. */
+ @Override
+ public void disposeStorageLocation() throws IOException {}
+ }
+
+ // ------------------------------------------------------------------------
+ // Everything below here is necessary only to make it possible to
+ // pass the CheckpointStorageCoordinatorView to the CheckpointCoordinator
+ // via the JobGraph, because that part expects a StateBackend
+ // ------------------------------------------------------------------------
+
+ /**
+ * A StateBackend whose only purpose is to create a given CheckpointStorage.
+ * Creating keyed or operator state backends is not supported.
+ */
+ private static final class FactoringStateBackend implements StateBackend {
+
+ private static final long serialVersionUID = 1L;
+
+ private final TestingCheckpointStorageCoordinatorView testingCoordinatorView;
+
+ private FactoringStateBackend(TestingCheckpointStorageCoordinatorView testingCoordinatorView) {
+ this.testingCoordinatorView = testingCoordinatorView;
+ }
+
+ @Override
+ public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException {
+ return testingCoordinatorView.resolveCheckpoint(externalPointer);
+ }
+
+ @Override
+ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
+ return testingCoordinatorView;
+ }
+
+ @Override
+ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier, @Nonnull Collection<OperatorStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+ }
+}