You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2020/06/05 06:10:56 UTC

[flink] 01/02: [FLINK-15687][runtime][test] Fix accessing TaskSlotTable via TaskSubmissionTestEnvironment not in RPC main thread.

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 84c290581694f17e08f9ba91be6eac70264b9d70
Author: Xintong Song <to...@gmail.com>
AuthorDate: Thu May 28 16:11:31 2020 +0800

    [FLINK-15687][runtime][test] Fix accessing TaskSlotTable via TaskSubmissionTestEnvironment not in RPC main thread.
---
 .../TaskSubmissionTestEnvironment.java             |   9 +-
 .../runtime/taskexecutor/TestingTaskExecutor.java  |   5 +
 .../taskexecutor/slot/ThreadSafeTaskSlotTable.java | 206 +++++++++++++++++++++
 3 files changed, 217 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 824e0f0..14adabf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNot
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
 import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable;
+import org.apache.flink.runtime.taskexecutor.slot.ThreadSafeTaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -91,7 +92,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
 
 	private final TestingHighAvailabilityServices haServices;
 	private final TemporaryFolder temporaryFolder;
-	private final TaskSlotTable<Task> taskSlotTable;
+	private final ThreadSafeTaskSlotTable<Task> threadSafeTaskSlotTable;
 	private final JobMasterId jobMasterId;
 
 	private TestingTaskExecutor taskExecutor;
@@ -116,7 +117,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
 
 		this.jobMasterId = jobMasterId;
 
-		this.taskSlotTable = slotSize > 0 ?
+		final TaskSlotTable<Task> taskSlotTable = slotSize > 0 ?
 			TaskSlotUtils.createTaskSlotTable(slotSize) :
 			TestingTaskSlotTable
 				.<Task>newBuilder()
@@ -167,6 +168,8 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
 
 		taskExecutor.start();
 		taskExecutor.waitUntilStarted();
+
+		this.threadSafeTaskSlotTable = new ThreadSafeTaskSlotTable<>(taskSlotTable, taskExecutor.getMainThreadExecutableForTesting());
 	}
 
 	static void registerJobMasterConnection(
@@ -198,7 +201,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
 	}
 
 	public TaskSlotTable<Task> getTaskSlotTable() {
-		return taskSlotTable;
+		return threadSafeTaskSlotTable;
 	}
 
 	public JobMasterId getJobMasterId() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
index 7a4a01f..95b2f2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import javax.annotation.Nullable;
@@ -80,4 +81,8 @@ class TestingTaskExecutor extends TaskExecutor {
 	void waitUntilStarted() {
 		startFuture.join();
 	}
+
+	MainThreadExecutable getMainThreadExecutableForTesting() {
+		return this.rpcServer;
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java
new file mode 100644
index 0000000..0b9b668
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Testing implementation of {@link TaskSlotTable}.
+ * This class wraps a given {@link TaskSlotTable} and guarantees that all accesses to it are invoked on the given {@link MainThreadExecutable}.
+ */
+public class ThreadSafeTaskSlotTable<T extends TaskSlotPayload> implements TaskSlotTable<T> {
+
+	private final TaskSlotTable<T> taskSlotTable;
+	private final MainThreadExecutable mainThreadExecutable;
+
+	public ThreadSafeTaskSlotTable(
+			final TaskSlotTable<T> taskSlotTable,
+			final MainThreadExecutable mainThreadExecutable) {
+		this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable);
+		this.mainThreadExecutable = Preconditions.checkNotNull(mainThreadExecutable);
+	}
+
+	private void runAsync(Runnable runnable) {
+		mainThreadExecutable.runAsync(runnable);
+	}
+
+	private <V> V callAsync(Callable<V> callable) {
+		try {
+			return mainThreadExecutable.callAsync(
+				callable,
+				Time.days(1) // practically infinite timeout
+			).get();
+		} catch (InterruptedException | ExecutionException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public void start(SlotActions initialSlotActions, ComponentMainThreadExecutor mainThreadExecutor) {
+		runAsync(() -> taskSlotTable.start(initialSlotActions, mainThreadExecutor));
+	}
+
+	@Override
+	public Set<AllocationID> getAllocationIdsPerJob(JobID jobId) {
+		return callAsync(() -> taskSlotTable.getAllocationIdsPerJob(jobId));
+	}
+
+	@Override
+	public SlotReport createSlotReport(ResourceID resourceId) {
+		return callAsync(() -> taskSlotTable.createSlotReport(resourceId));
+	}
+
+	@Override
+	public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
+		return callAsync(() -> taskSlotTable.allocateSlot(index, jobId, allocationId, slotTimeout));
+	}
+
+	@Override
+	public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile, Time slotTimeout) {
+		return callAsync(() -> taskSlotTable.allocateSlot(index, jobId, allocationId, resourceProfile, slotTimeout));
+	}
+
+	@Override
+	public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException {
+		return callAsync(() -> taskSlotTable.markSlotActive(allocationId));
+	}
+
+	@Override
+	public boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) throws SlotNotFoundException {
+		return callAsync(() -> taskSlotTable.markSlotInactive(allocationId, slotTimeout));
+	}
+
+	@Override
+	public int freeSlot(AllocationID allocationId) throws SlotNotFoundException {
+		return callAsync(() -> taskSlotTable.freeSlot(allocationId));
+	}
+
+	@Override
+	public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
+		return callAsync(() -> taskSlotTable.freeSlot(allocationId, cause));
+	}
+
+	@Override
+	public boolean isValidTimeout(AllocationID allocationId, UUID ticket) {
+		return callAsync(() -> taskSlotTable.isValidTimeout(allocationId, ticket));
+	}
+
+	@Override
+	public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
+		return callAsync(() -> taskSlotTable.isAllocated(index, jobId, allocationId));
+	}
+
+	@Override
+	public boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId) {
+		return callAsync(() -> taskSlotTable.tryMarkSlotActive(jobId, allocationId));
+	}
+
+	@Override
+	public boolean isSlotFree(int index) {
+		return callAsync(() -> taskSlotTable.isSlotFree(index));
+	}
+
+	@Override
+	public boolean hasAllocatedSlots(JobID jobId) {
+		return callAsync(() -> taskSlotTable.hasAllocatedSlots(jobId));
+	}
+
+	@Override
+	public Iterator<TaskSlot<T>> getAllocatedSlots(JobID jobId) {
+		return callAsync(() -> taskSlotTable.getAllocatedSlots(jobId));
+	}
+
+	@Override
+	public Iterator<AllocationID> getActiveSlots(JobID jobId) {
+		return callAsync(() -> taskSlotTable.getActiveSlots(jobId));
+	}
+
+	@Nullable
+	@Override
+	public JobID getOwningJob(AllocationID allocationId) {
+		return callAsync(() -> taskSlotTable.getOwningJob(allocationId));
+	}
+
+	@Override
+	public boolean addTask(T task) throws SlotNotFoundException, SlotNotActiveException {
+		return callAsync(() -> taskSlotTable.addTask(task));
+	}
+
+	@Override
+	public T removeTask(ExecutionAttemptID executionAttemptID) {
+		return callAsync(() -> taskSlotTable.removeTask(executionAttemptID));
+	}
+
+	@Override
+	public T getTask(ExecutionAttemptID executionAttemptID) {
+		return callAsync(() -> taskSlotTable.getTask(executionAttemptID));
+	}
+
+	@Override
+	public Iterator<T> getTasks(JobID jobId) {
+		return callAsync(() -> taskSlotTable.getTasks(jobId));
+	}
+
+	@Override
+	public AllocationID getCurrentAllocation(int index) {
+		return callAsync(() -> taskSlotTable.getCurrentAllocation(index));
+	}
+
+	@Override
+	public MemoryManager getTaskMemoryManager(AllocationID allocationID) throws SlotNotFoundException {
+		return callAsync(() -> taskSlotTable.getTaskMemoryManager(allocationID));
+	}
+
+	@Override
+	public void notifyTimeout(AllocationID key, UUID ticket) {
+		runAsync(() -> taskSlotTable.notifyTimeout(key, ticket));
+	}
+
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		return callAsync(taskSlotTable::closeAsync);
+	}
+
+	@Override
+	public void close() throws Exception {
+		callAsync(() -> {
+			taskSlotTable.close();
+			return null;
+		});
+	}
+}