You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/14 14:37:42 UTC

[GitHub] [flink] rmetzger commented on a change in pull request #13639: [FLINK-19237] Fix rejected slot offer bug in JobMaster

rmetzger commented on a change in pull request #13639:
URL: https://github.com/apache/flink/pull/13639#discussion_r504731335



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
##########
@@ -280,6 +286,65 @@ public static void teardownClass() {
 		}
 	}
 
+	/**
+	 * This test ensures that the bookkeeping of TaskExecutors in the JobMaster handles cases where TaskExecutors with the same
+	 * ID re-register properly. FLINK-19237 was a bug where the TaskExecutors and the SlotPool got out of sync, and
+	 * slot offers were rejected.
+	 */
+	@Test
+	public void testAcceptSlotOfferAfterLeaderchange() throws Exception {
+
+		final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
+		final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);
+
+		final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
+
+		final TestingJobMaster jobMaster = new TestingJobMaster(
+			rpcService,
+			jobMasterConfiguration,
+			jmResourceId,
+			jobGraph,
+			haServices,
+			SlotPoolFactory.fromConfiguration(configuration),
+			jobManagerSharedServices,
+			heartbeatServices,
+			UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
+			new JobMasterBuilder.TestingOnCompletionActions(),
+			testingFatalErrorHandler,
+			JobMasterTest.class.getClassLoader(),
+			schedulerNGFactory,
+			NettyShuffleMaster.INSTANCE,
+			NoOpJobMasterPartitionTracker.FACTORY,
+			new DefaultExecutionDeploymentTracker(),
+			DefaultExecutionDeploymentReconciler::new,
+			System.currentTimeMillis());
+
+		jobMaster.start(jobMasterId).get();
+
+		log.info("Register TaskManager");
+
+		String testingTaskManagerAddress = "fake";
+		UnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new LocalUnresolvedTaskManagerLocation();
+		TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+		rpcService.registerGateway(testingTaskManagerAddress, testingTaskExecutorGateway);
+		Assert.assertThat(jobMaster.registerTaskManager(testingTaskManagerAddress, unresolvedTaskManagerLocation, testingTimeout).get(), instanceOf(RegistrationResponse.Success.class));
+
+		log.info("Revoke leadership & re-grant leadership");
+		jobMaster.suspend(new FlinkException("Lost leadership")).get();
+
+		jobMaster.start(JobMasterId.generate()).get();
+
+		log.info("re-register same TaskManager");
+		Assert.assertThat(jobMaster.registerTaskManager(testingTaskManagerAddress, unresolvedTaskManagerLocation, testingTimeout).get(), instanceOf(RegistrationResponse.Success.class));
+
+		log.info("Ensure JobMaster accepts slot offer");
+		final SlotOffer slotOffer = new SlotOffer(new AllocationID(), 0, ResourceProfile.ANY);
+
+		Collection<SlotOffer> acceptedSlots = jobMaster.executeInMainThreadExecutor(() -> jobMaster.offerSlots(unresolvedTaskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout).get()).get();
+		Assert.assertThat(acceptedSlots.size(), is(1));
+

Review comment:
       I will remove this empty line before merging or when addressing comments.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org