You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/22 21:53:22 UTC
[3/3] flink git commit: [FLINK-8462] [flip6] Filter invalid heartbeat
timeouts in TaskExecutor
[FLINK-8462] [flip6] Filter invalid heartbeat timeouts in TaskExecutor
This commit properly stops the heartbeating of disconnected RMs and additionally
ignores outdated heartbeat timeouts for old RM connections.
This closes #5318.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/776af4a8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/776af4a8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/776af4a8
Branch: refs/heads/master
Commit: 776af4a882c85926fc0764b702fec717c675e34c
Parents: 9a0399c
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Jan 19 14:56:42 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Jan 22 18:49:16 2018 +0100
----------------------------------------------------------------------
.../runtime/taskexecutor/TaskExecutor.java | 36 ++--
.../utils/TestingResourceManagerGateway.java | 30 +++-
.../runtime/taskexecutor/TaskExecutorTest.java | 165 +++++++++++++++++++
3 files changed, 209 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/776af4a8/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index e88cf58..0f98c49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -741,15 +741,14 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
}
// drop the current connection or connection attempt
- if (resourceManagerConnection != null) {
- resourceManagerConnection.close();
- resourceManagerConnection = null;
- }
+ closeResourceManagerConnection(
+ new FlinkException("New ResourceManager leader found under: " + newLeaderAddress +
+ '(' + newResourceManagerId + ')'));
}
// establish a connection to the new leader
if (newLeaderAddress != null) {
- log.info("Attempting to register at ResourceManager {}", newLeaderAddress);
+ log.info("Attempting to register at ResourceManager {} ({})", newLeaderAddress, newResourceManagerId);
resourceManagerConnection =
new TaskExecutorToResourceManagerConnection(
log,
@@ -784,13 +783,17 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
}
private void closeResourceManagerConnection(Exception cause) {
- if (isConnectedToResourceManager()) {
- log.info("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerId(), cause);
+ if (resourceManagerConnection != null) {
- resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
+ if (resourceManagerConnection.isConnected()) {
+ log.info("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerId(), cause);
+ resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
- ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
- resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
+ ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+ resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
+ } else {
+ log.info("Terminating registration attempts towards ResourceManager {}.", resourceManagerConnection.getTargetAddress(), cause);
+ }
resourceManagerConnection.close();
resourceManagerConnection = null;
@@ -1361,11 +1364,16 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
@Override
public void notifyHeartbeatTimeout(final ResourceID resourceId) {
runAsync(() -> {
- log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
+ // first check whether the timeout is still valid
+ if (resourceManagerConnection != null && resourceManagerConnection.getResourceManagerId().equals(resourceId)) {
+ log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
- closeResourceManagerConnection(
- new TimeoutException(
- "The heartbeat of ResourceManager with id " + resourceId + " timed out."));
+ closeResourceManagerConnection(
+ new TimeoutException(
+ "The heartbeat of ResourceManager with id " + resourceId + " timed out."));
+ } else {
+ log.debug("Received heartbeat timeout for outdated ResourceManager id {}. Ignoring the timeout.", resourceId);
+ }
});
}
http://git-wip-us.apache.org/repos/asf/flink/blob/776af4a8/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index dc1635a..1428e02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -47,6 +48,7 @@ import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.function.Function;
/**
* Implementation of the {@link ResourceManagerGateway} for testing purposes solely.
@@ -55,7 +57,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
private final ResourceManagerId resourceManagerId;
- private final ResourceID resourceId;
+ private final ResourceID ownResourceId;
private final long heartbeatInterval;
@@ -73,6 +75,8 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
private volatile Consumer<Tuple2<JobID, Throwable>> disconnectJobManagerConsumer;
+ private volatile Function<Tuple5<String, ResourceID, SlotReport, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>> registerTaskExecutorFunction;
+
public TestingResourceManagerGateway() {
this(
ResourceManagerId.generate(),
@@ -89,7 +93,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
String address,
String hostname) {
this.resourceManagerId = Preconditions.checkNotNull(resourceManagerId);
- this.resourceId = Preconditions.checkNotNull(resourceId);
+ this.ownResourceId = Preconditions.checkNotNull(resourceId);
this.heartbeatInterval = heartbeatInterval;
this.address = Preconditions.checkNotNull(address);
this.hostname = Preconditions.checkNotNull(hostname);
@@ -118,6 +122,10 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
this.disconnectJobManagerConsumer = disconnectJobManagerConsumer;
}
+ public void setRegisterTaskExecutorFunction(Function<Tuple5<String, ResourceID, SlotReport, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>> registerTaskExecutorFunction) {
+ this.registerTaskExecutorFunction = registerTaskExecutorFunction;
+ }
+
@Override
public CompletableFuture<RegistrationResponse> registerJobManager(JobMasterId jobMasterId, ResourceID jobMasterResourceId, String jobMasterAddress, JobID jobId, Time timeout) {
final Consumer<Tuple4<JobMasterId, ResourceID, String, JobID>> currentConsumer = registerJobManagerConsumer;
@@ -130,7 +138,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
new JobMasterRegistrationSuccess(
heartbeatInterval,
resourceManagerId,
- resourceId));
+ ownResourceId));
}
@Override
@@ -161,11 +169,17 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
@Override
public CompletableFuture<RegistrationResponse> registerTaskExecutor(String taskExecutorAddress, ResourceID resourceId, SlotReport slotReport, int dataPort, HardwareDescription hardwareDescription, Time timeout) {
- return CompletableFuture.completedFuture(
- new TaskExecutorRegistrationSuccess(
- new InstanceID(),
- resourceId,
- heartbeatInterval));
+ final Function<Tuple5<String, ResourceID, SlotReport, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>> currentFunction = registerTaskExecutorFunction;
+
+ if (currentFunction != null) {
+ return currentFunction.apply(Tuple5.of(taskExecutorAddress, resourceId, slotReport, dataPort, hardwareDescription));
+ } else {
+ return CompletableFuture.completedFuture(
+ new TaskExecutorRegistrationSuccess(
+ new InstanceID(),
+ ownResourceId,
+ heartbeatInterval));
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/776af4a8/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 2f0cbd6..0f08512 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -70,7 +70,9 @@ import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
@@ -78,10 +80,12 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.category.Flip6;
import org.apache.flink.util.ExceptionUtils;
@@ -108,12 +112,16 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -137,11 +145,14 @@ public class TaskExecutorTest extends TestLogger {
private final Time timeout = Time.milliseconds(10000L);
private final File tempDir = new File(System.getProperty("java.io.tmpdir"));
+ private TimerService<AllocationID> timerService;
+
private TestingRpcService rpc;
@Before
public void setup() {
rpc = new TestingRpcService();
+ timerService = new TimerService<>(TestingUtils.defaultExecutor(), timeout.toMilliseconds());
}
@After
@@ -150,6 +161,11 @@ public class TaskExecutorTest extends TestLogger {
rpc.stopService();
rpc = null;
}
+
+ if (timerService != null) {
+ timerService.stop();
+ timerService = null;
+ }
}
@Rule
@@ -1449,4 +1465,153 @@ public class TaskExecutorTest extends TestLogger {
taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
}
}
+
+ /**
+ * Tests that the heartbeat is stopped once the TaskExecutor detects that the RM is no longer leader.
+ *
+ * <p>See FLINK-8462
+ */
+ @Test
+ public void testRMHeartbeatStopWhenLeadershipRevoked() throws Exception {
+ final long heartbeatInterval = 1L;
+ final long heartbeatTimeout = 10000L;
+ final long pollTimeout = 1000L;
+ final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(new Configuration());
+ final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
+ final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+ final RecordingHeartbeatServices heartbeatServices = new RecordingHeartbeatServices(heartbeatInterval, heartbeatTimeout);
+ final ResourceID rmResourceID = ResourceID.generate();
+
+ final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
+
+ final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+ final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
+ haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
+
+ final String rmAddress = "rm";
+ final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway(
+ ResourceManagerId.generate(),
+ rmResourceID,
+ heartbeatInterval,
+ rmAddress,
+ rmAddress);
+
+ rpc.registerGateway(rmAddress, rmGateway);
+
+ final TaskExecutor taskExecutor = new TaskExecutor(
+ rpc,
+ taskManagerConfiguration,
+ taskManagerLocation,
+ mock(MemoryManager.class),
+ mock(IOManager.class),
+ new TaskExecutorLocalStateStoresManager(),
+ mock(NetworkEnvironment.class),
+ haServices,
+ heartbeatServices,
+ mock(TaskManagerMetricGroup.class),
+ mock(BroadcastVariableManager.class),
+ mock(FileCache.class),
+ taskSlotTable,
+ mock(JobManagerTable.class),
+ mock(JobLeaderService.class),
+ testingFatalErrorHandler);
+
+ try {
+ taskExecutor.start();
+
+ final BlockingQueue<ResourceID> unmonitoredTargets = heartbeatServices.getUnmonitoredTargets();
+ final BlockingQueue<ResourceID> monitoredTargets = heartbeatServices.getMonitoredTargets();
+
+ rmLeaderRetrievalService.notifyListener(rmAddress, rmGateway.getFencingToken().toUUID());
+
+ // wait for TM registration by checking the registered heartbeat targets
+ assertThat(
+ monitoredTargets.poll(pollTimeout, TimeUnit.MILLISECONDS),
+ equalTo(rmResourceID));
+
+ // let RM lose leadership
+ rmLeaderRetrievalService.notifyListener(null, null);
+
+ // the RM must be unmonitored because of the leadership loss; the heartbeat timeout cannot have triggered since it is much higher than the poll timeout
+ assertThat(unmonitoredTargets.poll(pollTimeout, TimeUnit.MILLISECONDS), equalTo(rmResourceID));
+
+ testingFatalErrorHandler.rethrowError();
+ } finally {
+ RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+ }
+ }
+
+ /**
+ * Special {@link HeartbeatServices} which creates a {@link RecordingHeartbeatManagerImpl}.
+ */
+ private static final class RecordingHeartbeatServices extends HeartbeatServices {
+
+ private final BlockingQueue<ResourceID> unmonitoredTargets;
+
+ private final BlockingQueue<ResourceID> monitoredTargets;
+
+ public RecordingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
+ super(heartbeatInterval, heartbeatTimeout);
+
+ this.unmonitoredTargets = new ArrayBlockingQueue<>(1);
+ this.monitoredTargets = new ArrayBlockingQueue<>(1);
+ }
+
+ @Override
+ public <I, O> HeartbeatManager<I, O> createHeartbeatManager(ResourceID resourceId, HeartbeatListener<I, O> heartbeatListener, ScheduledExecutor scheduledExecutor, Logger log) {
+ return new RecordingHeartbeatManagerImpl<>(
+ heartbeatTimeout,
+ resourceId,
+ heartbeatListener,
+ scheduledExecutor,
+ scheduledExecutor,
+ log,
+ unmonitoredTargets,
+ monitoredTargets);
+ }
+
+ public BlockingQueue<ResourceID> getUnmonitoredTargets() {
+ return unmonitoredTargets;
+ }
+
+ public BlockingQueue<ResourceID> getMonitoredTargets() {
+ return monitoredTargets;
+ }
+ }
+
+ /**
+ * {@link HeartbeatManagerImpl} which records the monitored and unmonitored targets.
+ */
+ private static final class RecordingHeartbeatManagerImpl<I, O> extends HeartbeatManagerImpl<I, O> {
+
+ private final BlockingQueue<ResourceID> unmonitoredTargets;
+
+ private final BlockingQueue<ResourceID> monitoredTargets;
+
+ public RecordingHeartbeatManagerImpl(
+ long heartbeatTimeoutIntervalMs,
+ ResourceID ownResourceID,
+ HeartbeatListener<I, O> heartbeatListener,
+ Executor executor,
+ ScheduledExecutor scheduledExecutor,
+ Logger log,
+ BlockingQueue<ResourceID> unmonitoredTargets,
+ BlockingQueue<ResourceID> monitoredTargets) {
+ super(heartbeatTimeoutIntervalMs, ownResourceID, heartbeatListener, executor, scheduledExecutor, log);
+ this.unmonitoredTargets = unmonitoredTargets;
+ this.monitoredTargets = monitoredTargets;
+ }
+
+ @Override
+ public void unmonitorTarget(ResourceID resourceID) {
+ super.unmonitorTarget(resourceID);
+ unmonitoredTargets.offer(resourceID);
+ }
+
+ @Override
+ public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
+ super.monitorTarget(resourceID, heartbeatTarget);
+ monitoredTargets.offer(resourceID);
+ }
+ }
}