You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/10 16:27:26 UTC

[2/5] flink git commit: [FLINK-8393] [flip6] Reconnect to last known JobMaster when connection is lost

[FLINK-8393] [flip6] Reconnect to last known JobMaster when connection is lost

In case of a heartbeat timeout or a disconnect call, the TaskExecutor tries to
reconnect to the last known JobMaster location.

This closes #5267.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63d4819e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63d4819e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63d4819e

Branch: refs/heads/master
Commit: 63d4819e197b1df1651157fd8f86c8ca0540d0b1
Parents: 438e4e3
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jan 9 20:37:08 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jan 10 17:14:14 2018 +0100

----------------------------------------------------------------------
 .../registration/RegisteredRpcConnection.java   | 101 ++++++++++++++-----
 .../runtime/taskexecutor/JobLeaderService.java  |  45 ++++++++-
 .../runtime/taskexecutor/TaskExecutor.java      |  27 ++++-
 .../taskexecutor/slot/TaskSlotTable.java        |  13 ++-
 .../RegisteredRpcConnectionTest.java            |  79 ++++++++++++---
 5 files changed, 219 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/63d4819e/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
index c76bcf8..7d2c35a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
@@ -27,6 +27,7 @@ import java.io.Serializable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -46,6 +47,11 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 public abstract class RegisteredRpcConnection<F extends Serializable, G extends RpcGateway, S extends RegistrationResponse.Success> {
 
+	private static final AtomicReferenceFieldUpdater<RegisteredRpcConnection, RetryingRegistration> REGISTRATION_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
+		RegisteredRpcConnection.class,
+		RetryingRegistration.class,
+		"pendingRegistration");
+
 	/** The logger for all log messages of this class. */
 	protected final Logger log;
 
@@ -59,7 +65,7 @@ public abstract class RegisteredRpcConnection<F extends Serializable, G extends
 	private final Executor executor;
 
 	/** The Registration of this RPC connection. */
-	private RetryingRegistration<F, G, S> pendingRegistration;
+	private volatile RetryingRegistration<F, G, S> pendingRegistration;
 
 	/** The gateway to register, it's null until the registration is completed. */
 	private volatile G targetGateway;
@@ -85,27 +91,47 @@ public abstract class RegisteredRpcConnection<F extends Serializable, G extends
 		checkState(!closed, "The RPC connection is already closed");
 		checkState(!isConnected() && pendingRegistration == null, "The RPC connection is already started");
 
-		pendingRegistration = checkNotNull(generateRegistration());
-		pendingRegistration.startRegistration();
+		final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
 
-		CompletableFuture<Tuple2<G, S>> future = pendingRegistration.getFuture();
+		if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
+			newRegistration.startRegistration();
+		} else {
+			// another thread installed a pending registration first; discard the one created here
+			newRegistration.cancel();
+		}
+	}
 
-		future.whenCompleteAsync(
-			(Tuple2<G, S> result, Throwable failure) -> {
-				if (failure != null) {
-					if (failure instanceof CancellationException) {
-						// we ignore cancellation exceptions because they originate from cancelling
-						// the RetryingRegistration
-						log.debug("Retrying registration towards {} was cancelled.", targetAddress);
-					} else {
-						// this future should only ever fail if there is a bug, not if the registration is declined
-						onRegistrationFailure(failure);
-					}
-				} else {
-					targetGateway = result.f0;
-					onRegistrationSuccess(result.f1);
-				}
-			}, executor);
+	public boolean tryReconnect() {
+		checkState(isConnected(), "Cannot reconnect to an unknown destination.");
+
+		if (closed) {
+			return false;
+		} else {
+			final RetryingRegistration<F, G, S> currentPendingRegistration = pendingRegistration;
+
+			if (currentPendingRegistration != null) {
+				currentPendingRegistration.cancel();
+			}
+
+			final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
+
+			if (REGISTRATION_UPDATER.compareAndSet(this, currentPendingRegistration, newRegistration)) {
+				newRegistration.startRegistration();
+			} else {
+				// pendingRegistration was replaced concurrently; discard the registration created here
+				newRegistration.cancel();
+				return false;
+			}
+
+			// double check for concurrent close operations
+			if (closed) {
+				newRegistration.cancel();
+
+				return false;
+			} else {
+				return true;
+			}
+		}
 	}
 
 	/**
@@ -175,13 +201,42 @@ public abstract class RegisteredRpcConnection<F extends Serializable, G extends
 		}
 
 		if (isClosed()) {
-			connectionInfo = connectionInfo + " is closed";
+			connectionInfo += " is closed";
 		} else if (isConnected()){
-			connectionInfo = connectionInfo + " is established";
+			connectionInfo += " is established";
 		} else {
-			connectionInfo = connectionInfo + " is connecting";
+			connectionInfo += " is connecting";
 		}
 
 		return connectionInfo;
 	}
+
+	// ------------------------------------------------------------------------
+	//  Internal methods
+	// ------------------------------------------------------------------------
+
+	private RetryingRegistration<F, G, S> createNewRegistration() {
+		RetryingRegistration<F, G, S> newRegistration = checkNotNull(generateRegistration());
+
+		CompletableFuture<Tuple2<G, S>> future = newRegistration.getFuture();
+
+		future.whenCompleteAsync(
+			(Tuple2<G, S> result, Throwable failure) -> {
+				if (failure != null) {
+					if (failure instanceof CancellationException) {
+						// we ignore cancellation exceptions because they originate from cancelling
+						// the RetryingRegistration
+						log.debug("Retrying registration towards {} was cancelled.", targetAddress);
+					} else {
+						// this future should only ever fail if there is a bug, not if the registration is declined
+						onRegistrationFailure(failure);
+					}
+				} else {
+					targetGateway = result.f0;
+					onRegistrationSuccess(result.f1);
+				}
+			}, executor);
+
+		return newRegistration;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/63d4819e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 77737e1..3b4da4e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -205,6 +205,23 @@ public class JobLeaderService {
 	}
 
 	/**
+	 * Triggers reconnection to the last known leader of the given job.
+	 *
+	 * @param jobId specifying the job for which to trigger reconnection
+	 */
+	public void reconnect(final JobID jobId) {
+		Preconditions.checkNotNull(jobId, "JobID must not be null.");
+
+		final Tuple2<LeaderRetrievalService, JobManagerLeaderListener> jobLeaderService = jobLeaderServices.get(jobId);
+
+		if (jobLeaderService != null) {
+			jobLeaderService.f1.reconnect();
+		} else {
+			LOG.info("Cannot reconnect to job {} because it is not registered.", jobId);
+		}
+	}
+
+	/**
 	 * Leader listener which tries to establish a connection to a newly detected job leader.
 	 */
 	private final class JobManagerLeaderListener implements LeaderRetrievalListener {
@@ -213,7 +230,7 @@ public class JobLeaderService {
 		private final JobID jobId;
 
 		/** Rpc connection to the job leader. */
-		private RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> rpcConnection;
+		private volatile RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> rpcConnection;
 
 		/** State of the listener. */
 		private volatile boolean stopped;
@@ -237,6 +254,32 @@ public class JobLeaderService {
 			}
 		}
 
+		public void reconnect() {
+			if (stopped) {
+				LOG.debug("Cannot reconnect because the JobManagerLeaderListener has already been stopped.");
+			} else {
+				final RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> currentRpcConnection = rpcConnection;
+
+				if (currentRpcConnection != null) {
+					if (currentRpcConnection.isConnected()) {
+
+						if (currentRpcConnection.tryReconnect()) {
+							// double check for concurrent stop operation
+							if (stopped) {
+								currentRpcConnection.close();
+							}
+						} else {
+							LOG.debug("Could not reconnect to the JobMaster {}.", currentRpcConnection.getTargetAddress());
+						}
+					} else {
+						LOG.debug("Ongoing registration to JobMaster {}.", currentRpcConnection.getTargetAddress());
+					}
+				} else {
+					LOG.debug("Cannot reconnect to an unknown JobMaster.");
+				}
+			}
+		}
+
 		@Override
 		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderId) {
 			if (stopped) {

http://git-wip-us.apache.org/repos/asf/flink/blob/63d4819e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 5577472..3c7d1cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -687,6 +687,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 	@Override
 	public void disconnectJobManager(JobID jobId, Exception cause) {
 		closeJobManagerConnection(jobId, cause);
+		jobLeaderService.reconnect(jobId);
 	}
 
 	@Override
@@ -1079,16 +1080,34 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 		Preconditions.checkNotNull(allocationId);
 
 		try {
-			int freedSlotIndex = taskSlotTable.freeSlot(allocationId, cause);
+			TaskSlot taskSlot = taskSlotTable.freeSlot(allocationId, cause);
 
-			if (freedSlotIndex != -1 && isConnectedToResourceManager()) {
+			if (taskSlot != null && isConnectedToResourceManager()) {
 				// the slot was freed. Tell the RM about it
 				ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
 
 				resourceManagerGateway.notifySlotAvailable(
 					resourceManagerConnection.getRegistrationId(),
-					new SlotID(getResourceID(), freedSlotIndex),
+					new SlotID(getResourceID(), taskSlot.getIndex()),
 					allocationId);
+
+				// check whether any tasks remain for the same job -- NOTE(review): inspects tasks, not allocated slots; confirm
+				final JobID jobId = taskSlot.getJobId();
+				final Iterator<Task> tasks = taskSlotTable.getTasks(jobId);
+
+				if (!tasks.hasNext()) {
+					// we can remove the job from the job leader service
+					try {
+						jobLeaderService.removeJob(jobId);
+					} catch (Exception e) {
+						log.info("Could not remove job {} from JobLeaderService.", jobId, e);
+					}
+
+					closeJobManagerConnection(
+						jobId,
+						new FlinkException("TaskExecutor " + getAddress() +
+							" has no more allocated slots for job " + jobId + '.'));
+				}
 			}
 		} catch (SlotNotFoundException e) {
 			log.debug("Could not free slot for allocation id {}.", allocationId, e);
@@ -1295,6 +1314,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 						closeJobManagerConnection(
 							jobManagerConnection.getJobID(),
 							new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
+
+						jobLeaderService.reconnect(jobManagerConnection.getJobID());
 					}
 				}
 			});

http://git-wip-us.apache.org/repos/asf/flink/blob/63d4819e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 62101e7..ab62a86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -33,6 +33,8 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -266,7 +268,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 	 * @throws SlotNotFoundException if there is not task slot for the given allocation id
 	 * @return Index of the freed slot if the slot could be freed; otherwise -1
 	 */
-	public int freeSlot(AllocationID allocationId) throws SlotNotFoundException {
+	public TaskSlot freeSlot(AllocationID allocationId) throws SlotNotFoundException {
 		return freeSlot(allocationId, new Exception("The task slot of this task is being freed."));
 	}
 
@@ -278,9 +280,10 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 	 * @param allocationId identifying the task slot to be freed
 	 * @param cause to fail the tasks with if slot is not empty
 	 * @throws SlotNotFoundException if there is not task slot for the given allocation id
-	 * @return Index of the freed slot if the slot could be freed; otherwise -1
+	 * @return The freed TaskSlot, or null if the slot still contains tasks and could not be freed yet.
 	 */
-	public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
+	@Nullable
+	public TaskSlot freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException {
 		checkInit();
 
 		TaskSlot taskSlot = getTaskSlot(allocationId);
@@ -314,7 +317,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 					slotsPerJob.remove(jobId);
 				}
 
-				return taskSlot.getIndex();
+				return taskSlot;
 			} else {
 				// we couldn't free the task slot because it still contains task, fail the tasks
 				// and set the slot state to releasing so that it gets eventually freed
@@ -326,7 +329,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 					taskIterator.next().failExternally(cause);
 				}
 
-				return -1;
+				return null;
 			}
 		} else {
 			throw new SlotNotFoundException(allocationId);

http://git-wip-us.apache.org/repos/asf/flink/blob/63d4819e/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
index 19a5756..650a0f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
@@ -27,12 +27,15 @@ import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.mock;
@@ -60,14 +63,14 @@ public class RegisteredRpcConnectionTest extends TestLogger {
 			connection.start();
 
 			//wait for connection established
-			Thread.sleep(RetryingRegistrationTest.TestRetryingRegistration.MAX_TIMEOUT);
+			final String actualConnectionId = connection.getConnectionFuture().get();
 
 			// validate correct invocation and result
 			assertTrue(connection.isConnected());
 			assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress());
 			assertEquals(leaderId, connection.getTargetLeaderId());
 			assertEquals(testGateway, connection.getTargetGateway());
-			assertEquals(connectionID, connection.getConnectionId());
+			assertEquals(connectionID, actualConnectionId);
 		}
 		finally {
 			testGateway.stop();
@@ -86,8 +89,9 @@ public class RegisteredRpcConnectionTest extends TestLogger {
 		try {
 			// gateway that upon calls Throw an exception
 			TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class);
+			final RuntimeException registrationException = new RuntimeException(connectionFailureMessage);
 			when(testGateway.registrationCall(any(UUID.class), anyLong())).thenThrow(
-				new RuntimeException(connectionFailureMessage));
+				registrationException);
 
 			rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
 
@@ -95,14 +99,18 @@ public class RegisteredRpcConnectionTest extends TestLogger {
 			connection.start();
 
 			//wait for connection failure
-			Thread.sleep(RetryingRegistrationTest.TestRetryingRegistration.MAX_TIMEOUT);
+			try {
+				connection.getConnectionFuture().get();
+				fail("expected failure.");
+			} catch (ExecutionException ee) {
+				assertEquals(registrationException, ee.getCause());
+			}
 
 			// validate correct invocation and result
 			assertFalse(connection.isConnected());
 			assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress());
 			assertEquals(leaderId, connection.getTargetLeaderId());
 			assertNull(connection.getTargetGateway());
-			assertEquals(connectionFailureMessage, connection.getFailareMessage());
 		}
 		finally {
 			rpcService.stopService();
@@ -137,21 +145,53 @@ public class RegisteredRpcConnectionTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testReconnect() throws Exception {
+		final String connectionId1 = "Test RPC Connection ID 1";
+		final String connectionId2 = "Test RPC Connection ID 2";
+		final TestingRpcService rpcService = new TestingRpcService();
+		final String testRpcConnectionEndpointAddress = "<TestRpcConnectionEndpointAddress>";
+		final UUID leaderId = UUID.randomUUID();
+		final TestRegistrationGateway testGateway = new TestRegistrationGateway(
+			new RetryingRegistrationTest.TestRegistrationSuccess(connectionId1),
+			new RetryingRegistrationTest.TestRegistrationSuccess(connectionId2));
+
+		try {
+			rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
+
+			TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService);
+			connection.start();
+
+			final String actualConnectionId1 = connection.getConnectionFuture().get();
+
+			assertEquals(actualConnectionId1, connectionId1);
+
+			assertTrue(connection.tryReconnect());
+
+			final String actualConnectionId2 = connection.getConnectionFuture().get();
+
+			assertEquals(actualConnectionId2, connectionId2);
+		} finally {
+			rpcService.stopService();
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  test RegisteredRpcConnection
 	// ------------------------------------------------------------------------
 
 	private static class TestRpcConnection extends RegisteredRpcConnection<UUID, TestRegistrationGateway, TestRegistrationSuccess> {
 
-		private final RpcService rpcService;
+		private final Object lock = new Object();
 
-		private String connectionId;
+		private final RpcService rpcService;
 
-		private String failureMessage;
+		private CompletableFuture<String> connectionFuture;
 
 		public TestRpcConnection(String targetAddress, UUID targetLeaderId, Executor executor,  RpcService rpcService) {
 			super(LoggerFactory.getLogger(RegisteredRpcConnectionTest.class), targetAddress, targetLeaderId, executor);
 			this.rpcService = rpcService;
+			this.connectionFuture = new CompletableFuture<>();
 		}
 
 		@Override
@@ -161,20 +201,31 @@ public class RegisteredRpcConnectionTest extends TestLogger {
 
 		@Override
 		protected void onRegistrationSuccess(RetryingRegistrationTest.TestRegistrationSuccess success) {
-			connectionId = success.getCorrelationId();
+			synchronized (lock) {
+				connectionFuture.complete(success.getCorrelationId());
+			}
 		}
 
 		@Override
 		protected void onRegistrationFailure(Throwable failure) {
-			failureMessage = failure.getMessage();
+			synchronized (lock) {
+				connectionFuture.completeExceptionally(failure);
+			}
 		}
 
-		public String getConnectionId() {
-			return connectionId;
+		@Override
+		public boolean tryReconnect() {
+			synchronized (lock) {
+				connectionFuture.cancel(false);
+				connectionFuture = new CompletableFuture<>();
+			}
+			return super.tryReconnect();
 		}
 
-		public String getFailareMessage() {
-			return failureMessage;
+		public CompletableFuture<String> getConnectionFuture() {
+			synchronized (lock) {
+				return connectionFuture;
+			}
 		}
 	}
 }