You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2020/06/05 06:11:52 UTC

[flink] branch release-1.11 updated (37f436e -> 10b2842)

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 37f436e  [FLINK-18143][python] Fix Python meter metric incorrect value problem (#12498)
     new 359ab03  [FLINK-15687][runtime][test] Fix accessing TaskSlotTable via TaskSubmissionTestEnvironment not in RPC main thread.
     new 10b2842  [FLINK-15687][runtime][test] Make TaskManagerActions access task slot table on rpc main thread in TaskSubmissionTestEnvironment.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../TaskSubmissionTestEnvironment.java             |  34 ++--
 .../runtime/taskexecutor/TestingTaskExecutor.java  |   5 +
 .../taskexecutor/slot/ThreadSafeTaskSlotTable.java | 206 +++++++++++++++++++++
 3 files changed, 230 insertions(+), 15 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java


[flink] 02/02: [FLINK-15687][runtime][test] Make TaskManagerActions access task slot table on rpc main thread in TaskSubmissionTestEnvironment.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 10b284201337750422b663e8b69b7d867825da40
Author: Xintong Song <to...@gmail.com>
AuthorDate: Thu May 28 16:23:31 2020 +0800

    [FLINK-15687][runtime][test] Make TaskManagerActions access task slot table on rpc main thread in TaskSubmissionTestEnvironment.
    
    This closes #12399.
---
 .../TaskSubmissionTestEnvironment.java             | 25 +++++++++++-----------
 1 file changed, 13 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 14adabf..e79b720 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -137,20 +137,8 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
 			jobMasterGateway = testingJobMasterGateway;
 		}
 
-		TaskManagerActions taskManagerActions;
-		if (taskManagerActionListeners.size() == 0) {
-			taskManagerActions = new NoOpTaskManagerActions();
-		} else {
-			TestTaskManagerActions testTaskManagerActions = new TestTaskManagerActions(taskSlotTable, jobMasterGateway);
-			for (Tuple3<ExecutionAttemptID, ExecutionState, CompletableFuture<Void>> listenerTuple : taskManagerActionListeners) {
-				testTaskManagerActions.addListener(listenerTuple.f0, listenerTuple.f1, listenerTuple.f2);
-			}
-			taskManagerActions = testTaskManagerActions;
-		}
-
 		this.testingRpcService = testingRpcService;
 		final DefaultJobTable jobTable = DefaultJobTable.create();
-		registerJobMasterConnection(jobTable, jobId, testingRpcService, jobMasterGateway, taskManagerActions, timeout);
 
 		TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager(
 			false,
@@ -170,6 +158,19 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
 		taskExecutor.waitUntilStarted();
 
 		this.threadSafeTaskSlotTable = new ThreadSafeTaskSlotTable<>(taskSlotTable, taskExecutor.getMainThreadExecutableForTesting());
+
+		TaskManagerActions taskManagerActions;
+		if (taskManagerActionListeners.size() == 0) {
+			taskManagerActions = new NoOpTaskManagerActions();
+		} else {
+			TestTaskManagerActions testTaskManagerActions = new TestTaskManagerActions(threadSafeTaskSlotTable, jobMasterGateway);
+			for (Tuple3<ExecutionAttemptID, ExecutionState, CompletableFuture<Void>> listenerTuple : taskManagerActionListeners) {
+				testTaskManagerActions.addListener(listenerTuple.f0, listenerTuple.f1, listenerTuple.f2);
+			}
+			taskManagerActions = testTaskManagerActions;
+		}
+
+		registerJobMasterConnection(jobTable, jobId, testingRpcService, jobMasterGateway, taskManagerActions, timeout);
 	}
 
 	static void registerJobMasterConnection(


[flink] 01/02: [FLINK-15687][runtime][test] Fix accessing TaskSlotTable via TaskSubmissionTestEnvironment not in RPC main thread.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 359ab037349145b8eaadbd476d2f1a98bdab45ed
Author: Xintong Song <to...@gmail.com>
AuthorDate: Thu May 28 16:11:31 2020 +0800

    [FLINK-15687][runtime][test] Fix accessing TaskSlotTable via TaskSubmissionTestEnvironment not in RPC main thread.
---
 .../TaskSubmissionTestEnvironment.java             |   9 +-
 .../runtime/taskexecutor/TestingTaskExecutor.java  |   5 +
 .../taskexecutor/slot/ThreadSafeTaskSlotTable.java | 206 +++++++++++++++++++++
 3 files changed, 217 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 824e0f0..14adabf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNot
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
 import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable;
+import org.apache.flink.runtime.taskexecutor.slot.ThreadSafeTaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -91,7 +92,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
 
 	private final TestingHighAvailabilityServices haServices;
 	private final TemporaryFolder temporaryFolder;
-	private final TaskSlotTable<Task> taskSlotTable;
+	private final ThreadSafeTaskSlotTable<Task> threadSafeTaskSlotTable;
 	private final JobMasterId jobMasterId;
 
 	private TestingTaskExecutor taskExecutor;
@@ -116,7 +117,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
 
 		this.jobMasterId = jobMasterId;
 
-		this.taskSlotTable = slotSize > 0 ?
+		final TaskSlotTable<Task> taskSlotTable = slotSize > 0 ?
 			TaskSlotUtils.createTaskSlotTable(slotSize) :
 			TestingTaskSlotTable
 				.<Task>newBuilder()
@@ -167,6 +168,8 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
 
 		taskExecutor.start();
 		taskExecutor.waitUntilStarted();
+
+		this.threadSafeTaskSlotTable = new ThreadSafeTaskSlotTable<>(taskSlotTable, taskExecutor.getMainThreadExecutableForTesting());
 	}
 
 	static void registerJobMasterConnection(
@@ -198,7 +201,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
 	}
 
 	public TaskSlotTable<Task> getTaskSlotTable() {
-		return taskSlotTable;
+		return threadSafeTaskSlotTable;
 	}
 
 	public JobMasterId getJobMasterId() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
index 7a4a01f..95b2f2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import javax.annotation.Nullable;
@@ -80,4 +81,8 @@ class TestingTaskExecutor extends TaskExecutor {
 	void waitUntilStarted() {
 		startFuture.join();
 	}
+
+	/**
+	 * Exposes this task executor's RPC server as a {@link MainThreadExecutable}, so that tests can
+	 * hand work to the task executor's main thread.
+	 */
+	MainThreadExecutable getMainThreadExecutableForTesting() {
+		return rpcServer;
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java
new file mode 100644
index 0000000..0b9b668
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Testing implementation of {@link TaskSlotTable}.
+ * This class wraps a given {@link TaskSlotTable}, guarantees all the accesses are invoked on the given {@link MainThreadExecutable}.
+ */
+public class ThreadSafeTaskSlotTable<T extends TaskSlotPayload> implements TaskSlotTable<T> {
+
+	private final TaskSlotTable<T> taskSlotTable;
+	private final MainThreadExecutable mainThreadExecutable;
+
+	public ThreadSafeTaskSlotTable(
+			final TaskSlotTable<T> taskSlotTable,
+			final MainThreadExecutable mainThreadExecutable) {
+		this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable);
+		this.mainThreadExecutable = Preconditions.checkNotNull(mainThreadExecutable);
+	}
+
+	private void runAsync(Runnable runnable) {
+		mainThreadExecutable.runAsync(runnable);
+	}
+
+	private <V> V callAsync(Callable<V> callable) {
+		try {
+			return mainThreadExecutable.callAsync(
+				callable,
+				Time.days(1) // practically infinite timeout
+			).get();
+		} catch (InterruptedException | ExecutionException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public void start(SlotActions initialSlotActions, ComponentMainThreadExecutor mainThreadExecutor) {
+		runAsync(() -> taskSlotTable.start(initialSlotActions, mainThreadExecutor));
+	}
+
+	@Override
+	public Set<AllocationID> getAllocationIdsPerJob(JobID jobId) {
+		return callAsync(() -> taskSlotTable.getAllocationIdsPerJob(jobId));
+	}
+
+	@Override
+	public SlotReport createSlotReport(ResourceID resourceId) {
+		return callAsync(() -> taskSlotTable.createSlotReport(resourceId));
+	}
+
+	@Override
+	public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
+		return callAsync(() -> taskSlotTable.allocateSlot(index, jobId, allocationId, slotTimeout));
+	}
+
+	@Override
+	public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile, Time slotTimeout) {
+		return callAsync(() -> taskSlotTable.allocateSlot(index, jobId, allocationId, resourceProfile, slotTimeout));
+	}
+
+	@Override
+	public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException {
+		return callAsync(() -> taskSlotTable.markSlotActive(allocationId));
+	}
+
+	@Override
+	public boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) throws SlotNotFoundException {
+		return callAsync(() -> taskSlotTable.markSlotInactive(allocationId, slotTimeout));
+	}
+
+	@Override
+	public int freeSlot(AllocationID allocationId) throws SlotNotFoundException {
+		return callAsync(() -> taskSlotTable.freeSlot(allocationId));
+	}
+
+	@Override
+	public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
+		return callAsync(() -> taskSlotTable.freeSlot(allocationId, cause));
+	}
+
+	@Override
+	public boolean isValidTimeout(AllocationID allocationId, UUID ticket) {
+		return callAsync(() -> taskSlotTable.isValidTimeout(allocationId, ticket));
+	}
+
+	@Override
+	public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
+		return callAsync(() -> taskSlotTable.isAllocated(index, jobId, allocationId));
+	}
+
+	@Override
+	public boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId) {
+		return callAsync(() -> taskSlotTable.tryMarkSlotActive(jobId, allocationId));
+	}
+
+	@Override
+	public boolean isSlotFree(int index) {
+		return callAsync(() -> taskSlotTable.isSlotFree(index));
+	}
+
+	@Override
+	public boolean hasAllocatedSlots(JobID jobId) {
+		return callAsync(() -> taskSlotTable.hasAllocatedSlots(jobId));
+	}
+
+	@Override
+	public Iterator<TaskSlot<T>> getAllocatedSlots(JobID jobId) {
+		return callAsync(() -> taskSlotTable.getAllocatedSlots(jobId));
+	}
+
+	@Override
+	public Iterator<AllocationID> getActiveSlots(JobID jobId) {
+		return callAsync(() -> taskSlotTable.getActiveSlots(jobId));
+	}
+
+	@Nullable
+	@Override
+	public JobID getOwningJob(AllocationID allocationId) {
+		return callAsync(() -> taskSlotTable.getOwningJob(allocationId));
+	}
+
+	@Override
+	public boolean addTask(T task) throws SlotNotFoundException, SlotNotActiveException {
+		return callAsync(() -> taskSlotTable.addTask(task));
+	}
+
+	@Override
+	public T removeTask(ExecutionAttemptID executionAttemptID) {
+		return callAsync(() -> taskSlotTable.removeTask(executionAttemptID));
+	}
+
+	@Override
+	public T getTask(ExecutionAttemptID executionAttemptID) {
+		return callAsync(() -> taskSlotTable.getTask(executionAttemptID));
+	}
+
+	@Override
+	public Iterator<T> getTasks(JobID jobId) {
+		return callAsync(() -> taskSlotTable.getTasks(jobId));
+	}
+
+	@Override
+	public AllocationID getCurrentAllocation(int index) {
+		return callAsync(() -> taskSlotTable.getCurrentAllocation(index));
+	}
+
+	@Override
+	public MemoryManager getTaskMemoryManager(AllocationID allocationID) throws SlotNotFoundException {
+		return callAsync(() -> taskSlotTable.getTaskMemoryManager(allocationID));
+	}
+
+	@Override
+	public void notifyTimeout(AllocationID key, UUID ticket) {
+		runAsync(() -> taskSlotTable.notifyTimeout(key, ticket));
+	}
+
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		return callAsync(taskSlotTable::closeAsync);
+	}
+
+	@Override
+	public void close() throws Exception {
+		callAsync(() -> {
+			taskSlotTable.close();
+			return null;
+		});
+	}
+}