You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2020/06/05 06:10:56 UTC
[flink] 01/02: [FLINK-15687][runtime][test] Fix accessing
TaskSlotTable via TaskSubmissionTestEnvironment not in RPC main thread.
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 84c290581694f17e08f9ba91be6eac70264b9d70
Author: Xintong Song <to...@gmail.com>
AuthorDate: Thu May 28 16:11:31 2020 +0800
[FLINK-15687][runtime][test] Fix accessing TaskSlotTable via TaskSubmissionTestEnvironment not in RPC main thread.
---
.../TaskSubmissionTestEnvironment.java | 9 +-
.../runtime/taskexecutor/TestingTaskExecutor.java | 5 +
.../taskexecutor/slot/ThreadSafeTaskSlotTable.java | 206 +++++++++++++++++++++
3 files changed, 217 insertions(+), 3 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 824e0f0..14adabf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNot
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable;
+import org.apache.flink.runtime.taskexecutor.slot.ThreadSafeTaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
@@ -91,7 +92,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
private final TestingHighAvailabilityServices haServices;
private final TemporaryFolder temporaryFolder;
- private final TaskSlotTable<Task> taskSlotTable;
+ private final ThreadSafeTaskSlotTable<Task> threadSafeTaskSlotTable;
private final JobMasterId jobMasterId;
private TestingTaskExecutor taskExecutor;
@@ -116,7 +117,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
this.jobMasterId = jobMasterId;
- this.taskSlotTable = slotSize > 0 ?
+ final TaskSlotTable<Task> taskSlotTable = slotSize > 0 ?
TaskSlotUtils.createTaskSlotTable(slotSize) :
TestingTaskSlotTable
.<Task>newBuilder()
@@ -167,6 +168,8 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
taskExecutor.start();
taskExecutor.waitUntilStarted();
+
+ this.threadSafeTaskSlotTable = new ThreadSafeTaskSlotTable<>(taskSlotTable, taskExecutor.getMainThreadExecutableForTesting());
}
static void registerJobMasterConnection(
@@ -198,7 +201,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
}
public TaskSlotTable<Task> getTaskSlotTable() {
- return taskSlotTable;
+ return threadSafeTaskSlotTable;
}
public JobMasterId getJobMasterId() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
index 7a4a01f..95b2f2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcService;
import javax.annotation.Nullable;
@@ -80,4 +81,8 @@ class TestingTaskExecutor extends TaskExecutor {
void waitUntilStarted() {
startFuture.join();
}
+
+	/**
+	 * Exposes this endpoint's {@code rpcServer} typed as a {@link MainThreadExecutable}.
+	 *
+	 * <p>Intended for tests only, as the method name indicates.
+	 *
+	 * @return the {@code rpcServer} field of this task executor
+	 */
+	MainThreadExecutable getMainThreadExecutableForTesting() {
+		return rpcServer;
+	}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java
new file mode 100644
index 0000000..0b9b668
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Testing implementation of {@link TaskSlotTable}.
+ *
+ * <p>This class wraps a given {@link TaskSlotTable} and forwards every access to it through the given
+ * {@link MainThreadExecutable}.
+ *
+ * <p>{@code start} and {@code notifyTimeout} are handed over without waiting for them to finish. Every
+ * other method blocks the calling thread until the forwarded call has produced its result.
+ *
+ * <p>Failures of a forwarded call are rethrown as {@link RuntimeException} wrapping the
+ * {@link ExecutionException}. Methods that declare {@link SlotNotFoundException} or
+ * {@link SlotNotActiveException} therefore do not throw these types directly from this wrapper;
+ * presumably the original exception is the cause of the wrapped {@link ExecutionException}.
+ *
+ * <p>NOTE(review): the blocking methods presumably must not be called from the main thread itself,
+ * because they wait for work that is queued behind the caller — confirm against the
+ * {@link MainThreadExecutable} implementation in use.
+ *
+ * <p>NOTE(review): iterators returned by {@code getAllocatedSlots}, {@code getActiveSlots} and
+ * {@code getTasks} are obtained via the executable but are consumed by the caller's thread; whether
+ * iterating them outside the main thread is safe depends on the wrapped table — confirm.
+ *
+ * @param <T> type of the payload held by the task slots
+ */
+public class ThreadSafeTaskSlotTable<T extends TaskSlotPayload> implements TaskSlotTable<T> {
+
+	/** The table all calls are forwarded to. */
+	private final TaskSlotTable<T> taskSlotTable;
+
+	/** Executable through which every forwarded call is submitted. */
+	private final MainThreadExecutable mainThreadExecutable;
+
+	/**
+	 * Creates a wrapper around the given table.
+	 *
+	 * @param taskSlotTable the table to forward to, must not be null
+	 * @param mainThreadExecutable the executable used to submit all forwarded calls, must not be null
+	 */
+	public ThreadSafeTaskSlotTable(
+			final TaskSlotTable<T> taskSlotTable,
+			final MainThreadExecutable mainThreadExecutable) {
+		this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable);
+		this.mainThreadExecutable = Preconditions.checkNotNull(mainThreadExecutable);
+	}
+
+	/**
+	 * Submits the runnable to the {@link MainThreadExecutable}; this wrapper does not wait for it to finish.
+	 *
+	 * @param runnable the action to submit
+	 */
+	private void runAsync(Runnable runnable) {
+		mainThreadExecutable.runAsync(runnable);
+	}
+
+	/**
+	 * Submits the callable to the {@link MainThreadExecutable} and blocks until its result is available.
+	 *
+	 * @param callable the computation to submit
+	 * @param <V> result type of the computation
+	 * @return the value produced by the callable
+	 * @throws RuntimeException wrapping the {@link InterruptedException} if the waiting thread is
+	 *     interrupted (the interrupt status is restored first), or wrapping the
+	 *     {@link ExecutionException} if waiting for the result fails
+	 */
+	private <V> V callAsync(Callable<V> callable) {
+		final CompletableFuture<V> resultFuture = mainThreadExecutable.callAsync(
+			callable,
+			Time.days(1)); // practically infinite timeout
+		try {
+			return resultFuture.get();
+		} catch (InterruptedException e) {
+			// Restore the interrupt status so that callers further up the stack can still observe it.
+			Thread.currentThread().interrupt();
+			throw new RuntimeException(e);
+		} catch (ExecutionException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public void start(SlotActions initialSlotActions, ComponentMainThreadExecutor mainThreadExecutor) {
+		runAsync(() -> taskSlotTable.start(initialSlotActions, mainThreadExecutor));
+	}
+
+	@Override
+	public Set<AllocationID> getAllocationIdsPerJob(JobID jobId) {
+		return callAsync(() -> taskSlotTable.getAllocationIdsPerJob(jobId));
+	}
+
+	@Override
+	public SlotReport createSlotReport(ResourceID resourceId) {
+		return callAsync(() -> taskSlotTable.createSlotReport(resourceId));
+	}
+
+	@Override
+	public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
+		return callAsync(() -> taskSlotTable.allocateSlot(index, jobId, allocationId, slotTimeout));
+	}
+
+	@Override
+	public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile, Time slotTimeout) {
+		return callAsync(() -> taskSlotTable.allocateSlot(index, jobId, allocationId, resourceProfile, slotTimeout));
+	}
+
+	@Override
+	public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException {
+		return callAsync(() -> taskSlotTable.markSlotActive(allocationId));
+	}
+
+	@Override
+	public boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) throws SlotNotFoundException {
+		return callAsync(() -> taskSlotTable.markSlotInactive(allocationId, slotTimeout));
+	}
+
+	@Override
+	public int freeSlot(AllocationID allocationId) throws SlotNotFoundException {
+		return callAsync(() -> taskSlotTable.freeSlot(allocationId));
+	}
+
+	@Override
+	public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
+		return callAsync(() -> taskSlotTable.freeSlot(allocationId, cause));
+	}
+
+	@Override
+	public boolean isValidTimeout(AllocationID allocationId, UUID ticket) {
+		return callAsync(() -> taskSlotTable.isValidTimeout(allocationId, ticket));
+	}
+
+	@Override
+	public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
+		return callAsync(() -> taskSlotTable.isAllocated(index, jobId, allocationId));
+	}
+
+	@Override
+	public boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId) {
+		return callAsync(() -> taskSlotTable.tryMarkSlotActive(jobId, allocationId));
+	}
+
+	@Override
+	public boolean isSlotFree(int index) {
+		return callAsync(() -> taskSlotTable.isSlotFree(index));
+	}
+
+	@Override
+	public boolean hasAllocatedSlots(JobID jobId) {
+		return callAsync(() -> taskSlotTable.hasAllocatedSlots(jobId));
+	}
+
+	@Override
+	public Iterator<TaskSlot<T>> getAllocatedSlots(JobID jobId) {
+		return callAsync(() -> taskSlotTable.getAllocatedSlots(jobId));
+	}
+
+	@Override
+	public Iterator<AllocationID> getActiveSlots(JobID jobId) {
+		return callAsync(() -> taskSlotTable.getActiveSlots(jobId));
+	}
+
+	@Nullable
+	@Override
+	public JobID getOwningJob(AllocationID allocationId) {
+		return callAsync(() -> taskSlotTable.getOwningJob(allocationId));
+	}
+
+	@Override
+	public boolean addTask(T task) throws SlotNotFoundException, SlotNotActiveException {
+		return callAsync(() -> taskSlotTable.addTask(task));
+	}
+
+	@Override
+	public T removeTask(ExecutionAttemptID executionAttemptID) {
+		return callAsync(() -> taskSlotTable.removeTask(executionAttemptID));
+	}
+
+	@Override
+	public T getTask(ExecutionAttemptID executionAttemptID) {
+		return callAsync(() -> taskSlotTable.getTask(executionAttemptID));
+	}
+
+	@Override
+	public Iterator<T> getTasks(JobID jobId) {
+		return callAsync(() -> taskSlotTable.getTasks(jobId));
+	}
+
+	@Override
+	public AllocationID getCurrentAllocation(int index) {
+		return callAsync(() -> taskSlotTable.getCurrentAllocation(index));
+	}
+
+	@Override
+	public MemoryManager getTaskMemoryManager(AllocationID allocationID) throws SlotNotFoundException {
+		return callAsync(() -> taskSlotTable.getTaskMemoryManager(allocationID));
+	}
+
+	@Override
+	public void notifyTimeout(AllocationID key, UUID ticket) {
+		runAsync(() -> taskSlotTable.notifyTimeout(key, ticket));
+	}
+
+	/**
+	 * Blocks until {@code closeAsync} has been invoked on the wrapped table, then hands back the
+	 * future that call returned; it does not wait for that future to complete.
+	 */
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		return callAsync(taskSlotTable::closeAsync);
+	}
+
+	@Override
+	public void close() throws Exception {
+		callAsync(() -> {
+			taskSlotTable.close();
+			return null; // Callable requires a result; there is nothing to return
+		});
+	}
+}