You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this plain-text version.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/10 16:27:26 UTC
[2/5] flink git commit: [FLINK-8393] [flip6] Reconnect to last known JobMaster when connection is lost
[FLINK-8393] [flip6] Reconnect to last known JobMaster when connection is lost
In case of a heartbeat timeout or a disconnect call, the TaskExecutor tries to
reconnect to the last known JobMaster location.
This closes #5267.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63d4819e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63d4819e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63d4819e
Branch: refs/heads/master
Commit: 63d4819e197b1df1651157fd8f86c8ca0540d0b1
Parents: 438e4e3
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jan 9 20:37:08 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 17:14:14 2018 +0100
----------------------------------------------------------------------
.../registration/RegisteredRpcConnection.java | 101 ++++++++++++++-----
.../runtime/taskexecutor/JobLeaderService.java | 45 ++++++++-
.../runtime/taskexecutor/TaskExecutor.java | 27 ++++-
.../taskexecutor/slot/TaskSlotTable.java | 13 ++-
.../RegisteredRpcConnectionTest.java | 79 ++++++++++++---
5 files changed, 219 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/63d4819e/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
index c76bcf8..7d2c35a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
@@ -27,6 +27,7 @@ import java.io.Serializable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -46,6 +47,11 @@ import static org.apache.flink.util.Preconditions.checkState;
*/
public abstract class RegisteredRpcConnection<F extends Serializable, G extends RpcGateway, S extends RegistrationResponse.Success> {
+ private static final AtomicReferenceFieldUpdater<RegisteredRpcConnection, RetryingRegistration> REGISTRATION_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
+ RegisteredRpcConnection.class,
+ RetryingRegistration.class,
+ "pendingRegistration");
+
/** The logger for all log messages of this class. */
protected final Logger log;
@@ -59,7 +65,7 @@ public abstract class RegisteredRpcConnection<F extends Serializable, G extends
private final Executor executor;
/** The Registration of this RPC connection. */
- private RetryingRegistration<F, G, S> pendingRegistration;
+ private volatile RetryingRegistration<F, G, S> pendingRegistration;
/** The gateway to register, it's null until the registration is completed. */
private volatile G targetGateway;
@@ -85,27 +91,47 @@ public abstract class RegisteredRpcConnection<F extends Serializable, G extends
checkState(!closed, "The RPC connection is already closed");
checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started");
- pendingRegistration = checkNotNull(generateRegistration());
- pendingRegistration.startRegistration();
+ final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
- CompletableFuture<Tuple2<G, S>> future = pendingRegistration.getFuture();
+ if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
+ newRegistration.startRegistration();
+ } else {
+ // concurrent start operation
+ newRegistration.cancel();
+ }
+ }
- future.whenCompleteAsync(
- (Tuple2<G, S> result, Throwable failure) -> {
- if (failure != null) {
- if (failure instanceof CancellationException) {
- // we ignore cancellation exceptions because they originate from cancelling
- // the RetryingRegistration
- log.debug("Retrying registration towards {} was cancelled.", targetAddress);
- } else {
- // this future should only ever fail if there is a bug, not if the registration is declined
- onRegistrationFailure(failure);
- }
- } else {
- targetGateway = result.f0;
- onRegistrationSuccess(result.f1);
- }
- }, executor);
+ public boolean tryReconnect() {
+ checkState(isConnected(), "Cannot reconnect to an unknown destination.");
+
+ if (closed) {
+ return false;
+ } else {
+ final RetryingRegistration<F, G, S> currentPendingRegistration = pendingRegistration;
+
+ if (currentPendingRegistration != null) {
+ currentPendingRegistration.cancel();
+ }
+
+ final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
+
+ if (REGISTRATION_UPDATER.compareAndSet(this, currentPendingRegistration, newRegistration)) {
+ newRegistration.startRegistration();
+ } else {
+ // concurrent modification
+ newRegistration.cancel();
+ return false;
+ }
+
+ // double check for concurrent close operations
+ if (closed) {
+ newRegistration.cancel();
+
+ return false;
+ } else {
+ return true;
+ }
+ }
}
/**
@@ -175,13 +201,42 @@ public abstract class RegisteredRpcConnection<F extends Serializable, G extends
}
if (isClosed()) {
- connectionInfo = connectionInfo + " is closed";
+ connectionInfo += " is closed";
} else if (isConnected()){
- connectionInfo = connectionInfo + " is established";
+ connectionInfo += " is established";
} else {
- connectionInfo = connectionInfo + " is connecting";
+ connectionInfo += " is connecting";
}
return connectionInfo;
}
+
+ // ------------------------------------------------------------------------
+ // Internal methods
+ // ------------------------------------------------------------------------
+
+ private RetryingRegistration<F, G, S> createNewRegistration() {
+ RetryingRegistration<F, G, S> newRegistration = checkNotNull(generateRegistration());
+
+ CompletableFuture<Tuple2<G, S>> future = newRegistration.getFuture();
+
+ future.whenCompleteAsync(
+ (Tuple2<G, S> result, Throwable failure) -> {
+ if (failure != null) {
+ if (failure instanceof CancellationException) {
+ // we ignore cancellation exceptions because they originate from cancelling
+ // the RetryingRegistration
+ log.debug("Retrying registration towards {} was cancelled.", targetAddress);
+ } else {
+ // this future should only ever fail if there is a bug, not if the registration is declined
+ onRegistrationFailure(failure);
+ }
+ } else {
+ targetGateway = result.f0;
+ onRegistrationSuccess(result.f1);
+ }
+ }, executor);
+
+ return newRegistration;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/63d4819e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 77737e1..3b4da4e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -205,6 +205,23 @@ public class JobLeaderService {
}
/**
+ * Triggers reconnection to the last known leader of the given job.
+ *
+ * @param jobId specifying the job for which to trigger reconnection
+ */
+ public void reconnect(final JobID jobId) {
+ Preconditions.checkNotNull(jobId, "JobID must not be null.");
+
+ final Tuple2<LeaderRetrievalService, JobManagerLeaderListener> jobLeaderService = jobLeaderServices.get(jobId);
+
+ if (jobLeaderService != null) {
+ jobLeaderService.f1.reconnect();
+ } else {
+ LOG.info("Cannot reconnect to job {} because it is not registered.", jobId);
+ }
+ }
+
+ /**
* Leader listener which tries to establish a connection to a newly detected job leader.
*/
private final class JobManagerLeaderListener implements LeaderRetrievalListener {
@@ -213,7 +230,7 @@ public class JobLeaderService {
private final JobID jobId;
/** Rpc connection to the job leader. */
- private RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> rpcConnection;
+ private volatile RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> rpcConnection;
/** State of the listener. */
private volatile boolean stopped;
@@ -237,6 +254,32 @@ public class JobLeaderService {
}
}
+ public void reconnect() {
+ if (stopped) {
+ LOG.debug("Cannot reconnect because the JobManagerLeaderListener has already been stopped.");
+ } else {
+ final RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> currentRpcConnection = rpcConnection;
+
+ if (currentRpcConnection != null) {
+ if (currentRpcConnection.isConnected()) {
+
+ if (currentRpcConnection.tryReconnect()) {
+ // double check for concurrent stop operation
+ if (stopped) {
+ currentRpcConnection.close();
+ }
+ } else {
+ LOG.debug("Could not reconnect to the JobMaster {}.", currentRpcConnection.getTargetAddress());
+ }
+ } else {
+ LOG.debug("Ongoing registration to JobMaster {}.", currentRpcConnection.getTargetAddress());
+ }
+ } else {
+ LOG.debug("Cannot reconnect to an unknown JobMaster.");
+ }
+ }
+ }
+
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderId) {
if (stopped) {
http://git-wip-us.apache.org/repos/asf/flink/blob/63d4819e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 5577472..3c7d1cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -687,6 +687,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
@Override
public void disconnectJobManager(JobID jobId, Exception cause) {
closeJobManagerConnection(jobId, cause);
+ jobLeaderService.reconnect(jobId);
}
@Override
@@ -1079,16 +1080,34 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
Preconditions.checkNotNull(allocationId);
try {
- int freedSlotIndex = taskSlotTable.freeSlot(allocationId, cause);
+ TaskSlot taskSlot = taskSlotTable.freeSlot(allocationId, cause);
- if (freedSlotIndex != -1 && isConnectedToResourceManager()) {
+ if (taskSlot != null && isConnectedToResourceManager()) {
// the slot was freed. Tell the RM about it
ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
resourceManagerGateway.notifySlotAvailable(
resourceManagerConnection.getRegistrationId(),
- new SlotID(getResourceID(), freedSlotIndex),
+ new SlotID(getResourceID(), taskSlot.getIndex()),
allocationId);
+
+ // check whether we still have allocated slots for the same job
+ final JobID jobId = taskSlot.getJobId();
+ final Iterator<Task> tasks = taskSlotTable.getTasks(jobId);
+
+ if (!tasks.hasNext()) {
+ // we can remove the job from the job leader service
+ try {
+ jobLeaderService.removeJob(jobId);
+ } catch (Exception e) {
+ log.info("Could not remove job {} from JobLeaderService.", jobId, e);
+ }
+
+ closeJobManagerConnection(
+ jobId,
+ new FlinkException("TaskExecutor " + getAddress() +
+ " has no more allocated slots for job " + jobId + '.'));
+ }
}
} catch (SlotNotFoundException e) {
log.debug("Could not free slot for allocation id {}.", allocationId, e);
@@ -1295,6 +1314,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
closeJobManagerConnection(
jobManagerConnection.getJobID(),
new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
+
+ jobLeaderService.reconnect(jobManagerConnection.getJobID());
}
}
});
http://git-wip-us.apache.org/repos/asf/flink/blob/63d4819e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 62101e7..ab62a86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -33,6 +33,8 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -266,7 +268,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
* @throws SlotNotFoundException if there is not task slot for the given allocation id
* @return Index of the freed slot if the slot could be freed; otherwise -1
*/
- public int freeSlot(AllocationID allocationId) throws SlotNotFoundException {
+ public TaskSlot freeSlot(AllocationID allocationId) throws SlotNotFoundException {
return freeSlot(allocationId, new Exception("The task slot of this task is being freed."));
}
@@ -278,9 +280,10 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
* @param allocationId identifying the task slot to be freed
* @param cause to fail the tasks with if slot is not empty
* @throws SlotNotFoundException if there is not task slot for the given allocation id
- * @return Index of the freed slot if the slot could be freed; otherwise -1
+ * @return The freed TaskSlot. If the TaskSlot cannot be freed then null.
*/
- public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
+ @Nullable
+ public TaskSlot freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
checkInit();
TaskSlot taskSlot = getTaskSlot(allocationId);
@@ -314,7 +317,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
slotsPerJob.remove(jobId);
}
- return taskSlot.getIndex();
+ return taskSlot;
} else {
// we couldn't free the task slot because it still contains task, fail the tasks
// and set the slot state to releasing so that it gets eventually freed
@@ -326,7 +329,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
taskIterator.next().failExternally(cause);
}
- return -1;
+ return null;
}
} else {
throw new SlotNotFoundException(allocationId);
http://git-wip-us.apache.org/repos/asf/flink/blob/63d4819e/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
index 19a5756..650a0f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
@@ -27,12 +27,15 @@ import org.junit.Test;
import org.slf4j.LoggerFactory;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
@@ -60,14 +63,14 @@ public class RegisteredRpcConnectionTest extends TestLogger {
connection.start();
//wait for connection established
- Thread.sleep(RetryingRegistrationTest.TestRetryingRegistration.MAX_TIMEOUT);
+ final String actualConnectionId = connection.getConnectionFuture().get();
// validate correct invocation and result
assertTrue(connection.isConnected());
assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress());
assertEquals(leaderId, connection.getTargetLeaderId());
assertEquals(testGateway, connection.getTargetGateway());
- assertEquals(connectionID, connection.getConnectionId());
+ assertEquals(connectionID, actualConnectionId);
}
finally {
testGateway.stop();
@@ -86,8 +89,9 @@ public class RegisteredRpcConnectionTest extends TestLogger {
try {
// gateway that upon calls Throw an exception
TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
+ final RuntimeException registrationException = new RuntimeException(connectionFailureMessage);
when(testGateway.registrationCall(any(UUID.class), anyLong())).thenThrow(
- new RuntimeException(connectionFailureMessage));
+ registrationException);
rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
@@ -95,14 +99,18 @@ public class RegisteredRpcConnectionTest extends TestLogger {
connection.start();
//wait for connection failure
- Thread.sleep(RetryingRegistrationTest.TestRetryingRegistration.MAX_TIMEOUT);
+ try {
+ connection.getConnectionFuture().get();
+ fail("expected failure.");
+ } catch (ExecutionException ee) {
+ assertEquals(registrationException, ee.getCause());
+ }
// validate correct invocation and result
assertFalse(connection.isConnected());
assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress());
assertEquals(leaderId, connection.getTargetLeaderId());
assertNull(connection.getTargetGateway());
- assertEquals(connectionFailureMessage, connection.getFailareMessage());
}
finally {
rpcService.stopService();
@@ -137,21 +145,53 @@ public class RegisteredRpcConnectionTest extends TestLogger {
}
}
+ @Test
+ public void testReconnect() throws Exception {
+ final String connectionId1 = "Test RPC Connection ID 1";
+ final String connectionId2 = "Test RPC Connection ID 2";
+ final TestingRpcService rpcService = new TestingRpcService();
+ final String testRpcConnectionEndpointAddress = "<TestRpcConnectionEndpointAddress>";
+ final UUID leaderId = UUID.randomUUID();
+ final TestRegistrationGateway testGateway = new TestRegistrationGateway(
+ new RetryingRegistrationTest.TestRegistrationSuccess(connectionId1),
+ new RetryingRegistrationTest.TestRegistrationSuccess(connectionId2));
+
+ try {
+ rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
+
+ TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService);
+ connection.start();
+
+ final String actualConnectionId1 = connection.getConnectionFuture().get();
+
+ assertEquals(actualConnectionId1, connectionId1);
+
+ assertTrue(connection.tryReconnect());
+
+ final String actualConnectionId2 = connection.getConnectionFuture().get();
+
+ assertEquals(actualConnectionId2, connectionId2);
+ } finally {
+ rpcService.stopService();
+ }
+ }
+
// ------------------------------------------------------------------------
// test RegisteredRpcConnection
// ------------------------------------------------------------------------
private static class TestRpcConnection extends RegisteredRpcConnection<UUID, TestRegistrationGateway, TestRegistrationSuccess> {
- private final RpcService rpcService;
+ private final Object lock = new Object();
- private String connectionId;
+ private final RpcService rpcService;
- private String failureMessage;
+ private CompletableFuture<String> connectionFuture;
public TestRpcConnection(String targetAddress, UUID targetLeaderId, Executor executor, RpcService rpcService) {
super(LoggerFactory.getLogger(RegisteredRpcConnectionTest.class), targetAddress, targetLeaderId, executor);
this.rpcService = rpcService;
+ this.connectionFuture = new CompletableFuture<>();
}
@Override
@@ -161,20 +201,31 @@ public class RegisteredRpcConnectionTest extends TestLogger {
@Override
protected void onRegistrationSuccess(RetryingRegistrationTest.TestRegistrationSuccess success) {
- connectionId = success.getCorrelationId();
+ synchronized (lock) {
+ connectionFuture.complete(success.getCorrelationId());
+ }
}
@Override
protected void onRegistrationFailure(Throwable failure) {
- failureMessage = failure.getMessage();
+ synchronized (lock) {
+ connectionFuture.completeExceptionally(failure);
+ }
}
- public String getConnectionId() {
- return connectionId;
+ @Override
+ public boolean tryReconnect() {
+ synchronized (lock) {
+ connectionFuture.cancel(false);
+ connectionFuture = new CompletableFuture<>();
+ }
+ return super.tryReconnect();
}
- public String getFailareMessage() {
- return failureMessage;
+ public CompletableFuture<String> getConnectionFuture() {
+ synchronized (lock) {
+ return connectionFuture;
+ }
}
}
}