You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/02/22 14:45:49 UTC
flink git commit: [FLINK-5836] [flip6] Fix race condition between offer slot and submit task
Repository: flink
Updated Branches:
refs/heads/master 8e1775afc -> d6aed38b3
[FLINK-5836] [flip6] Fix race condition between offer slot and submit task
Streamline test case
This closes #3371.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d6aed38b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d6aed38b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d6aed38b
Branch: refs/heads/master
Commit: d6aed38b3a15946d383d762030b5f5c1418388de
Parents: 8e1775a
Author: wenlong.lwl <we...@alibaba-inc.com>
Authored: Fri Jan 6 16:32:08 2017 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Feb 22 14:25:38 2017 +0100
----------------------------------------------------------------------
.../runtime/taskexecutor/TaskExecutor.java | 35 ++--
.../runtime/taskexecutor/TaskExecutorTest.java | 168 +++++++++++++++++++
2 files changed, 186 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d6aed38b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 58bbfac..2980376 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -659,7 +659,22 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
final Collection<SlotOffer> reservedSlots = new HashSet<>(2);
while (reservedSlotsIterator.hasNext()) {
- reservedSlots.add(reservedSlotsIterator.next().generateSlotOffer());
+ SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
+ try {
+ if (!taskSlotTable.markSlotActive(offer.getAllocationId())) {
+ // the slot is either free or releasing at the moment
+ final String message = "Could not mark slot " + jobId + " active.";
+ log.debug(message);
+ jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(),
+ leaderId, new Exception(message));
+ }
+ } catch (SlotNotFoundException e) {
+ final String message = "Could not mark slot " + jobId + " active.";
+ jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(),
+ leaderId, new Exception(message));
+ continue;
+ }
+ reservedSlots.add(offer);
}
Future<Iterable<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
@@ -674,22 +689,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
// check if the response is still valid
if (isJobManagerConnectionValid(jobId, leaderId)) {
// mark accepted slots active
- for (SlotOffer acceptedSlot: acceptedSlots) {
- try {
- if (!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) {
- // the slot is either free or releasing at the moment
- final String message = "Could not mark slot " + jobId + " active.";
- log.debug(message);
- jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(),
- leaderId, new Exception(message));
- }
-
- // remove the assigned slots so that we can free the left overs
- reservedSlots.remove(acceptedSlot);
- } catch (SlotNotFoundException e) {
- log.debug("Could not mark slot {} active.", acceptedSlot, e);
- jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(), leaderId, e);
- }
+ for (SlotOffer acceptedSlot : acceptedSlots) {
+ reservedSlots.remove(acceptedSlot);
}
final Exception e = new Exception("The slot was rejected by the JobManager.");
http://git-wip-us.apache.org/repos/asf/flink/blob/d6aed38b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index aacd329..31bf9b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -696,4 +696,172 @@ public class TaskExecutorTest extends TestLogger {
}
}
+
+ /**
+ * Tests that the task executor handles a SubmitTask call arriving before the JobMaster's response to its slot offer.
+ */
+ @Test
+ public void testSubmitTaskBeforeAcceptSlot() throws Exception {
+ final JobID jobId = new JobID();
+
+ final TestingSerialRpcService rpc = new TestingSerialRpcService();
+ final Configuration configuration = new Configuration();
+ final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
+ final ResourceID resourceId = new ResourceID("foobar");
+ final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234);
+ final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+ final TimerService<AllocationID> timerService = mock(TimerService.class);
+ final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class), mock(ResourceProfile.class)), timerService);
+ final JobManagerTable jobManagerTable = new JobManagerTable();
+ final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
+ final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+ final String resourceManagerAddress = "rm";
+ final UUID resourceManagerLeaderId = UUID.randomUUID();
+
+ final String jobManagerAddress = "jm";
+ final UUID jobManagerLeaderId = UUID.randomUUID();
+
+ final LeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService(resourceManagerAddress, resourceManagerLeaderId);
+ final LeaderRetrievalService jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService(jobManagerAddress, jobManagerLeaderId);
+ haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService);
+ haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetrievalService);
+
+ final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
+ final InstanceID registrationId = new InstanceID();
+ // the mocked ResourceManager accepts the registration of the TaskExecutor
+ when(resourceManagerGateway.registerTaskExecutor(
+ eq(resourceManagerLeaderId),
+ any(String.class),
+ eq(resourceId),
+ any(SlotReport.class),
+ any(Time.class))).thenReturn(
+ FlinkCompletableFuture.<RegistrationResponse>completed(new TaskExecutorRegistrationSuccess(registrationId, 1000L)));
+
+ final ResourceID jmResourceId = new ResourceID(jobManagerAddress);
+ final int blobPort = 42;
+
+ final AllocationID allocationId1 = new AllocationID();
+ final AllocationID allocationId2 = new AllocationID();
+ // offer for the first slot; later only this offer is accepted by the JobMaster
+ final SlotOffer offer1 = new SlotOffer(allocationId1, 0, ResourceProfile.UNKNOWN);
+
+ final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
+ // the mocked JobMaster accepts the registration of the TaskManager
+ when(jobMasterGateway.registerTaskManager(
+ any(String.class),
+ eq(taskManagerLocation),
+ eq(jobManagerLeaderId),
+ any(Time.class)
+ )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
+ when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
+
+ // register the mocked gateways with the RPC service under their addresses
+ rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
+ rpc.registerGateway(jobManagerAddress, jobMasterGateway);
+
+ final LibraryCacheManager libraryCacheManager = mock(LibraryCacheManager.class);
+ when(libraryCacheManager.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader());
+
+ final JobManagerConnection jobManagerConnection = new JobManagerConnection(
+ jobMasterGateway,
+ jobManagerLeaderId,
+ mock(TaskManagerActions.class),
+ mock(CheckpointResponder.class),
+ libraryCacheManager,
+ mock(ResultPartitionConsumableNotifier.class),
+ mock(PartitionProducerStateChecker.class));
+ // pre-register the connection to the JobManager for this job
+ jobManagerTable.put(jobId, jobManagerConnection);
+
+ try {
+ final TaskExecutor taskManager = new TaskExecutor(
+ taskManagerConfiguration,
+ taskManagerLocation,
+ rpc,
+ mock(MemoryManager.class),
+ mock(IOManager.class),
+ mock(NetworkEnvironment.class),
+ haServices,
+ mock(MetricRegistry.class),
+ mock(TaskManagerMetricGroup.class),
+ mock(BroadcastVariableManager.class),
+ mock(FileCache.class),
+ taskSlotTable,
+ jobManagerTable,
+ jobLeaderService,
+ testingFatalErrorHandler);
+ taskManager.start();
+ taskSlotTable.allocateSlot(0, jobId, allocationId1, Time.milliseconds(10000L));
+ taskSlotTable.allocateSlot(1, jobId, allocationId2, Time.milliseconds(10000L));
+ // build a deployment descriptor for a task that runs in the first slot (allocationId1)
+ final JobVertexID jobVertexId = new JobVertexID();
+
+ JobInformation jobInformation = new JobInformation(
+ jobId,
+ name.getMethodName(),
+ new SerializedValue<>(new ExecutionConfig()),
+ new Configuration(),
+ Collections.<BlobKey>emptyList(),
+ Collections.<URL>emptyList());
+
+ TaskInformation taskInformation = new TaskInformation(
+ jobVertexId,
+ "test task",
+ 1,
+ 1,
+ TestInvokable.class.getName(),
+ new Configuration());
+
+ SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(jobInformation);
+ SerializedValue<TaskInformation> serializedJobVertexInformation = new SerializedValue<>(taskInformation);
+
+ final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
+ serializedJobInformation,
+ serializedJobVertexInformation,
+ new ExecutionAttemptID(),
+ allocationId1,
+ 0,
+ 0,
+ 0,
+ null,
+ Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+ Collections.<InputGateDeploymentDescriptor>emptyList());
+
+ CompletableFuture<Iterable<SlotOffer>> offerResultFuture = new FlinkCompletableFuture<>();
+
+ // hold back the JobMaster's response to the slot offer until the task has been submitted
+ when(
+ jobMasterGateway.offerSlots(
+ any(ResourceID.class),
+ any(Iterable.class),
+ eq(jobManagerLeaderId),
+ any(Time.class)))
+ .thenReturn(offerResultFuture);
+
+ // we have to add the job after starting the TaskExecutor, because otherwise the service has not
+ // been properly started. This will also offer the slots to the job master
+ jobLeaderService.addJob(jobId, jobManagerAddress);
+
+ verify(jobMasterGateway).offerSlots(any(ResourceID.class), any(Iterable.class), eq(jobManagerLeaderId), any(Time.class));
+
+ // submit the task before the offered slots have been acknowledged
+ taskManager.submitTask(tdd, jobManagerLeaderId);
+
+ // acknowledge only the first slot; the second offered slot is thereby rejected
+ offerResultFuture.complete(Collections.singleton(offer1));
+ // the rejected slot 1 must be reported as available to the ResourceManager again
+ verify(resourceManagerGateway).notifySlotAvailable(eq(resourceManagerLeaderId), eq(registrationId), eq(new SlotID(resourceId, 1)));
+ // the accepted slot stays active; the rejected slot is no longer active and is free
+ assertTrue(taskSlotTable.existsActiveSlot(jobId, allocationId1));
+ assertFalse(taskSlotTable.existsActiveSlot(jobId, allocationId2));
+ assertTrue(taskSlotTable.isSlotFree(1));
+
+ // check if a concurrent error occurred
+ testingFatalErrorHandler.rethrowError();
+ } finally {
+ rpc.stopService();
+ }
+
+ }
}