You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/22 21:53:22 UTC

[3/3] flink git commit: [FLINK-8462] [flip6] Filter invalid heartbeat timeouts in TaskExecutor

[FLINK-8462] [flip6] Filter invalid heartbeat timeouts in TaskExecutor

This commit properly stops the heartbeating of disconnected RMs and additionally
ignores outdated heartbeat timeouts for old RM connections out.

This closes #5318.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/776af4a8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/776af4a8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/776af4a8

Branch: refs/heads/master
Commit: 776af4a882c85926fc0764b702fec717c675e34c
Parents: 9a0399c
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Jan 19 14:56:42 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jan 22 18:49:16 2018 +0100

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutor.java      |  36 ++--
 .../utils/TestingResourceManagerGateway.java    |  30 +++-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 165 +++++++++++++++++++
 3 files changed, 209 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/776af4a8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index e88cf58..0f98c49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -741,15 +741,14 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			}
 
 			// drop the current connection or connection attempt
-			if (resourceManagerConnection != null) {
-				resourceManagerConnection.close();
-				resourceManagerConnection = null;
-			}
+			closeResourceManagerConnection(
+				new FlinkException("New ResourceManager leader found under: " + newLeaderAddress +
+					'(' + newResourceManagerId + ')'));
 		}
 
 		// establish a connection to the new leader
 		if (newLeaderAddress != null) {
-			log.info("Attempting to register at ResourceManager {}", newLeaderAddress);
+			log.info("Attempting to register at ResourceManager {} ({})", newLeaderAddress, newResourceManagerId);
 			resourceManagerConnection =
 				new TaskExecutorToResourceManagerConnection(
 					log,
@@ -784,13 +783,17 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 	}
 
 	private void closeResourceManagerConnection(Exception cause) {
-		if (isConnectedToResourceManager()) {
-			log.info("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerId(), cause);
+		if (resourceManagerConnection != null) {
 
-			resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
+			if (resourceManagerConnection.isConnected()) {
+				log.info("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerId(), cause);
+				resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
 
-			ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
-			resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
+				ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+				resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
+			} else {
+				log.info("Terminating registration attempts towards ResourceManager {}.", resourceManagerConnection.getTargetAddress(), cause);
+			}
 
 			resourceManagerConnection.close();
 			resourceManagerConnection = null;
@@ -1361,11 +1364,16 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		@Override
 		public void notifyHeartbeatTimeout(final ResourceID resourceId) {
 			runAsync(() -> {
-				log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
+				// first check whether the timeout is still valid
+				if (resourceManagerConnection != null && resourceManagerConnection.getResourceManagerId().equals(resourceId)) {
+					log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
 
-				closeResourceManagerConnection(
-					new TimeoutException(
-						"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
+					closeResourceManagerConnection(
+						new TimeoutException(
+							"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
+				} else {
+					log.debug("Received heartbeat timeout for outdated ResourceManager id {}. Ignoring the timeout.", resourceId);
+				}
 			});
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/776af4a8/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index dc1635a..1428e02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -47,6 +48,7 @@ import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
  * Implementation of the {@link ResourceManagerGateway} for testing purposes solely.
@@ -55,7 +57,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 
 	private final ResourceManagerId resourceManagerId;
 
-	private final ResourceID resourceId;
+	private final ResourceID ownResourceId;
 
 	private final long heartbeatInterval;
 
@@ -73,6 +75,8 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 
 	private volatile Consumer<Tuple2<JobID, Throwable>> disconnectJobManagerConsumer;
 
+	private volatile Function<Tuple5<String, ResourceID, SlotReport, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>> registerTaskExecutorFunction;
+
 	public TestingResourceManagerGateway() {
 		this(
 			ResourceManagerId.generate(),
@@ -89,7 +93,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 			String address,
 			String hostname) {
 		this.resourceManagerId = Preconditions.checkNotNull(resourceManagerId);
-		this.resourceId = Preconditions.checkNotNull(resourceId);
+		this.ownResourceId = Preconditions.checkNotNull(resourceId);
 		this.heartbeatInterval = heartbeatInterval;
 		this.address = Preconditions.checkNotNull(address);
 		this.hostname = Preconditions.checkNotNull(hostname);
@@ -118,6 +122,10 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 		this.disconnectJobManagerConsumer = disconnectJobManagerConsumer;
 	}
 
+	public void setRegisterTaskExecutorFunction(Function<Tuple5<String, ResourceID, SlotReport, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>> registerTaskExecutorFunction) {
+		this.registerTaskExecutorFunction = registerTaskExecutorFunction;
+	}
+
 	@Override
 	public CompletableFuture<RegistrationResponse> registerJobManager(JobMasterId jobMasterId, ResourceID jobMasterResourceId, String jobMasterAddress, JobID jobId, Time timeout) {
 		final Consumer<Tuple4<JobMasterId, ResourceID, String, JobID>> currentConsumer = registerJobManagerConsumer;
@@ -130,7 +138,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 			new JobMasterRegistrationSuccess(
 				heartbeatInterval,
 				resourceManagerId,
-				resourceId));
+				ownResourceId));
 	}
 
 	@Override
@@ -161,11 +169,17 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
 
 	@Override
 	public CompletableFuture<RegistrationResponse> registerTaskExecutor(String taskExecutorAddress, ResourceID resourceId, SlotReport slotReport, int dataPort, HardwareDescription hardwareDescription, Time timeout) {
-		return CompletableFuture.completedFuture(
-			new TaskExecutorRegistrationSuccess(
-				new InstanceID(),
-				resourceId,
-				heartbeatInterval));
+		final Function<Tuple5<String, ResourceID, SlotReport, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>> currentFunction = registerTaskExecutorFunction;
+
+		if (currentFunction != null) {
+			return currentFunction.apply(Tuple5.of(taskExecutorAddress, resourceId, slotReport, dataPort, hardwareDescription));
+		} else {
+			return CompletableFuture.completedFuture(
+				new TaskExecutorRegistrationSuccess(
+					new InstanceID(),
+					ownResourceId,
+					heartbeatInterval));
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/776af4a8/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 2f0cbd6..0f08512 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -70,7 +70,9 @@ import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
@@ -78,10 +80,12 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.ExceptionUtils;
@@ -108,12 +112,16 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -137,11 +145,14 @@ public class TaskExecutorTest extends TestLogger {
 	private final Time timeout = Time.milliseconds(10000L);
 	private final File tempDir = new File(System.getProperty("java.io.tmpdir"));
 
+	private TimerService<AllocationID> timerService;
+
 	private TestingRpcService rpc;
 
 	@Before
 	public void setup() {
 		rpc = new TestingRpcService();
+		timerService = new TimerService<>(TestingUtils.defaultExecutor(), timeout.toMilliseconds());
 	}
 
 	@After
@@ -150,6 +161,11 @@ public class TaskExecutorTest extends TestLogger {
 			rpc.stopService();
 			rpc = null;
 		}
+
+		if (timerService != null) {
+			timerService.stop();
+			timerService = null;
+		}
 	}
 
 	@Rule
@@ -1449,4 +1465,153 @@ public class TaskExecutorTest extends TestLogger {
 			taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 		}
 	}
+
+	/**
+	 * Tests that the heartbeat is stopped once the TaskExecutor detects that the RM is no longer leader.
+	 *
+	 * <p>See FLINK-8462
+	 */
+	@Test
+	public void testRMHeartbeatStopWhenLeadershipRevoked() throws Exception {
+		final long heartbeatInterval = 1L;
+		final long heartbeatTimeout = 10000L;
+		final long pollTimeout = 1000L;
+		final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(new Configuration());
+		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+		final RecordingHeartbeatServices heartbeatServices = new RecordingHeartbeatServices(heartbeatInterval, heartbeatTimeout);
+		final ResourceID rmResourceID = ResourceID.generate();
+
+		final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
+
+		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
+		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
+
+		final String rmAddress = "rm";
+		final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway(
+			ResourceManagerId.generate(),
+			rmResourceID,
+			heartbeatInterval,
+			rmAddress,
+			rmAddress);
+
+		rpc.registerGateway(rmAddress, rmGateway);
+
+		final TaskExecutor taskExecutor = new TaskExecutor(
+			rpc,
+			taskManagerConfiguration,
+			taskManagerLocation,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			new TaskExecutorLocalStateStoresManager(),
+			mock(NetworkEnvironment.class),
+			haServices,
+			heartbeatServices,
+			mock(TaskManagerMetricGroup.class),
+			mock(BroadcastVariableManager.class),
+			mock(FileCache.class),
+			taskSlotTable,
+			mock(JobManagerTable.class),
+			mock(JobLeaderService.class),
+			testingFatalErrorHandler);
+
+		try {
+			taskExecutor.start();
+
+			final BlockingQueue<ResourceID> unmonitoredTargets = heartbeatServices.getUnmonitoredTargets();
+			final BlockingQueue<ResourceID> monitoredTargets = heartbeatServices.getMonitoredTargets();
+
+			rmLeaderRetrievalService.notifyListener(rmAddress, rmGateway.getFencingToken().toUUID());
+
+			// wait for TM registration by checking the registered heartbeat targets
+			assertThat(
+				monitoredTargets.poll(pollTimeout, TimeUnit.MILLISECONDS),
+				equalTo(rmResourceID));
+
+			// let RM lose leadership
+			rmLeaderRetrievalService.notifyListener(null, null);
+
+			// the timeout should not have triggered since it is much higher
+			assertThat(unmonitoredTargets.poll(pollTimeout, TimeUnit.MILLISECONDS), equalTo(rmResourceID));
+
+			testingFatalErrorHandler.rethrowError();
+		} finally {
+			RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+		}
+	}
+
+	/**
+	 * Special {@link HeartbeatServices} which creates a {@link RecordingHeartbeatManagerImpl}.
+	 */
+	private static final class RecordingHeartbeatServices extends HeartbeatServices {
+
+		private final BlockingQueue<ResourceID> unmonitoredTargets;
+
+		private final BlockingQueue<ResourceID> monitoredTargets;
+
+		public RecordingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
+			super(heartbeatInterval, heartbeatTimeout);
+
+			this.unmonitoredTargets = new ArrayBlockingQueue<>(1);
+			this.monitoredTargets = new ArrayBlockingQueue<>(1);
+		}
+
+		@Override
+		public <I, O> HeartbeatManager<I, O> createHeartbeatManager(ResourceID resourceId, HeartbeatListener<I, O> heartbeatListener, ScheduledExecutor scheduledExecutor, Logger log) {
+			return new RecordingHeartbeatManagerImpl<>(
+				heartbeatTimeout,
+				resourceId,
+				heartbeatListener,
+				scheduledExecutor,
+				scheduledExecutor,
+				log,
+				unmonitoredTargets,
+				monitoredTargets);
+		}
+
+		public BlockingQueue<ResourceID> getUnmonitoredTargets() {
+			return unmonitoredTargets;
+		}
+
+		public BlockingQueue<ResourceID> getMonitoredTargets() {
+			return monitoredTargets;
+		}
+	}
+
+	/**
+	 * {@link HeartbeatManagerImpl} which records the unmonitored targets.
+	 */
+	private static final class RecordingHeartbeatManagerImpl<I, O> extends HeartbeatManagerImpl<I, O> {
+
+		private final BlockingQueue<ResourceID> unmonitoredTargets;
+
+		private final BlockingQueue<ResourceID> monitoredTargets;
+
+		public RecordingHeartbeatManagerImpl(
+				long heartbeatTimeoutIntervalMs,
+				ResourceID ownResourceID,
+				HeartbeatListener<I, O> heartbeatListener,
+				Executor executor,
+				ScheduledExecutor scheduledExecutor,
+				Logger log,
+				BlockingQueue<ResourceID> unmonitoredTargets,
+				BlockingQueue<ResourceID> monitoredTargets) {
+			super(heartbeatTimeoutIntervalMs, ownResourceID, heartbeatListener, executor, scheduledExecutor, log);
+			this.unmonitoredTargets = unmonitoredTargets;
+			this.monitoredTargets = monitoredTargets;
+		}
+
+		@Override
+		public void unmonitorTarget(ResourceID resourceID) {
+			super.unmonitorTarget(resourceID);
+			unmonitoredTargets.offer(resourceID);
+		}
+
+		@Override
+		public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
+			super.monitorTarget(resourceID, heartbeatTarget);
+			monitoredTargets.offer(resourceID);
+		}
+	}
 }