You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/02/22 14:45:49 UTC

flink git commit: [FLINK-5836] [flip6] Fix race condition between offer slot and submit task

Repository: flink
Updated Branches:
  refs/heads/master 8e1775afc -> d6aed38b3


[FLINK-5836] [flip6] Fix race condition between offer slot and submit task

Streamline test case

This closes #3371.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d6aed38b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d6aed38b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d6aed38b

Branch: refs/heads/master
Commit: d6aed38b3a15946d383d762030b5f5c1418388de
Parents: 8e1775a
Author: wenlong.lwl <we...@alibaba-inc.com>
Authored: Fri Jan 6 16:32:08 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Feb 22 14:25:38 2017 +0100

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutor.java      |  35 ++--
 .../runtime/taskexecutor/TaskExecutorTest.java  | 168 +++++++++++++++++++
 2 files changed, 186 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d6aed38b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 58bbfac..2980376 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -659,7 +659,22 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 				final Collection<SlotOffer> reservedSlots = new HashSet<>(2);
 
 				while (reservedSlotsIterator.hasNext()) {
-					reservedSlots.add(reservedSlotsIterator.next().generateSlotOffer());
+					SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
+					try {
+						if (!taskSlotTable.markSlotActive(offer.getAllocationId())) {
+							// the slot is either free or releasing at the moment
+							final String message = "Could not mark slot " + jobId + " active.";
+							log.debug(message);
+							jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(),
+								leaderId, new Exception(message));
+						}
+					} catch (SlotNotFoundException e) {
+						final String message = "Could not mark slot " + jobId + " active.";
+						jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(),
+							leaderId, new Exception(message));
+						continue;
+					}
+					reservedSlots.add(offer);
 				}
 
 				Future<Iterable<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
@@ -674,22 +689,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 						// check if the response is still valid
 						if (isJobManagerConnectionValid(jobId, leaderId)) {
 							// mark accepted slots active
-							for (SlotOffer acceptedSlot: acceptedSlots) {
-								try {
-									if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) {
-										// the slot is either free or releasing at the moment
-										final String message = "Could not mark slot " + jobId + " active.";
-										log.debug(message);
-										jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(),
-												leaderId, new Exception(message));
-									}
-
-									// remove the assigned slots so that we can free the left overs
-									reservedSlots.remove(acceptedSlot);
-								} catch (SlotNotFoundException e) {
-									log.debug("Could not mark slot {} active.", acceptedSlot,  e);
-									jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(), leaderId, e);
-								}
+							for (SlotOffer acceptedSlot : acceptedSlots) {
+								reservedSlots.remove(acceptedSlot);
 							}
 
 							final Exception e = new Exception("The slot was rejected by the JobManager.");

http://git-wip-us.apache.org/repos/asf/flink/blob/d6aed38b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index aacd329..31bf9b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -696,4 +696,172 @@ public class TaskExecutorTest extends TestLogger {
 		}
 
 	}
+
+	/**
+	 * Tests that submitTask may be called before the OfferSlot response arrives: afterwards the accepted slot must be active, while the slot missing from the response is freed and reported to the ResourceManager.
+	 */
+	@Test
+	public void testSubmitTaskBeforeAcceptSlot() throws Exception {
+		final JobID jobId = new JobID();
+
+		final TestingSerialRpcService rpc = new TestingSerialRpcService();
+		final Configuration configuration = new Configuration();
+		final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
+		final ResourceID resourceId = new ResourceID("foobar");
+		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234);
+		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		final TimerService<AllocationID> timerService = mock(TimerService.class);
+		final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class), mock(ResourceProfile.class)), timerService);
+		final JobManagerTable jobManagerTable = new JobManagerTable();
+		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+		final String resourceManagerAddress = "rm";
+		final UUID resourceManagerLeaderId = UUID.randomUUID();
+
+		final String jobManagerAddress = "jm";
+		final UUID jobManagerLeaderId = UUID.randomUUID();
+
+		final LeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService(resourceManagerAddress, resourceManagerLeaderId);
+		final LeaderRetrievalService jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService(jobManagerAddress, jobManagerLeaderId);
+		haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService);
+		haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetrievalService);
+
+		final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
+		final InstanceID registrationId = new InstanceID();
+
+		when(resourceManagerGateway.registerTaskExecutor(
+			eq(resourceManagerLeaderId),
+			any(String.class),
+			eq(resourceId),
+			any(SlotReport.class),
+			any(Time.class))).thenReturn(
+				FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, 1000L)));
+
+		final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
+		final int blobPort = 42;
+
+		final AllocationID allocationId1 = new AllocationID();
+		final AllocationID allocationId2 = new AllocationID();
+
+		final SlotOffer offer1 = new SlotOffer(allocationId1, 0, ResourceProfile.UNKNOWN);
+
+		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
+
+		when(jobMasterGateway.registerTaskManager(
+			any(String.class),
+			eq(taskManagerLocation),
+			eq(jobManagerLeaderId),
+			any(Time.class)
+		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
+		when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
+
+
+		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
+		rpc.registerGateway(jobManagerAddress, jobMasterGateway);
+
+		final LibraryCacheManager libraryCacheManager = mock(LibraryCacheManager.class);
+		when(libraryCacheManager.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader());
+
+		final JobManagerConnection jobManagerConnection = new JobManagerConnection(
+			jobMasterGateway,
+			jobManagerLeaderId,
+			mock(TaskManagerActions.class),
+			mock(CheckpointResponder.class),
+			libraryCacheManager,
+			mock(ResultPartitionConsumableNotifier.class),
+			mock(PartitionProducerStateChecker.class));
+
+		jobManagerTable.put(jobId, jobManagerConnection);
+
+		try {
+			final TaskExecutor taskManager = new TaskExecutor(
+				taskManagerConfiguration,
+				taskManagerLocation,
+				rpc,
+				mock(MemoryManager.class),
+				mock(IOManager.class),
+				mock(NetworkEnvironment.class),
+				haServices,
+				mock(MetricRegistry.class),
+				mock(TaskManagerMetricGroup.class),
+				mock(BroadcastVariableManager.class),
+				mock(FileCache.class),
+				taskSlotTable,
+				jobManagerTable,
+				jobLeaderService,
+				testingFatalErrorHandler);
+			taskManager.start();
+			taskSlotTable.allocateSlot(0, jobId, allocationId1, Time.milliseconds(10000L));
+			taskSlotTable.allocateSlot(1, jobId, allocationId2, Time.milliseconds(10000L));
+
+			final JobVertexID jobVertexId = new JobVertexID();
+
+			JobInformation jobInformation = new JobInformation(
+				jobId,
+				name.getMethodName(),
+				new SerializedValue<>(new ExecutionConfig()),
+				new Configuration(),
+				Collections.<BlobKey>emptyList(),
+				Collections.<URL>emptyList());
+
+			TaskInformation taskInformation = new TaskInformation(
+				jobVertexId,
+				"test task",
+				1,
+				1,
+				TestInvokable.class.getName(),
+				new Configuration());
+
+			SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(jobInformation);
+			SerializedValue<TaskInformation> serializedJobVertexInformation = new SerializedValue<>(taskInformation);
+
+			final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
+				serializedJobInformation,
+				serializedJobVertexInformation,
+				new ExecutionAttemptID(),
+				allocationId1,
+				0,
+				0,
+				0,
+				null,
+				Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+				Collections.<InputGateDeploymentDescriptor>emptyList());
+
+			CompletableFuture<Iterable<SlotOffer>> offerResultFuture = new FlinkCompletableFuture<>();
+
+			// keep the offerSlots response pending, so that the task can be submitted before the acceptance arrives
+			when(
+				jobMasterGateway.offerSlots(
+					any(ResourceID.class),
+					any(Iterable.class),
+					eq(jobManagerLeaderId),
+					any(Time.class)))
+				.thenReturn(offerResultFuture);
+
+			// we have to add the job after starting the TaskExecutor, because otherwise the service has not
+			// been properly started. This will also offer the allocated slots to the job master
+			jobLeaderService.addJob(jobId, jobManagerAddress);
+
+			verify(jobMasterGateway).offerSlots(any(ResourceID.class), any(Iterable.class), eq(jobManagerLeaderId), any(Time.class));
+
+			// submit the task into slot 0 (allocationId1) without having acknowledged the offered slots
+			taskManager.submitTask(tdd, jobManagerLeaderId);
+
+			// accept only the first offered slot; the slot of allocationId2 is missing from the response
+			offerResultFuture.complete(Collections.singleton(offer1));
+
+			verify(resourceManagerGateway).notifySlotAvailable(eq(resourceManagerLeaderId), eq(registrationId), eq(new SlotID(resourceId, 1))); // the slot not accepted by the JobMaster is reported back as available
+
+			assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId1));
+			assertFalse(taskSlotTable.existsActiveSlot(jobId, allocationId2));
+			assertTrue(taskSlotTable.isSlotFree(1));
+
+			// check if a concurrent error occurred
+			testingFatalErrorHandler.rethrowError();
+		} finally {
+			rpc.stopService();
+		}
+
+	}
 }