You are viewing a plain-text version of this content; the link to the canonical version was a hyperlink that is not preserved in plain text.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/22 23:51:05 UTC

[2/4] flink git commit: [FLINK-9408] Let JM try to reconnect to RM

[FLINK-9408] Let JM try to reconnect to RM

This commit changes the behaviour of the JobMaster (JM) so that it always tries to reconnect to the latest known ResourceManager (RM) address.

This closes #6056.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37b630e5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37b630e5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37b630e5

Branch: refs/heads/release-1.5
Commit: 37b630e53f28c99e452faf7216d4be3577b956ce
Parents: e0b1b46
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue May 22 15:53:38 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed May 23 00:59:50 2018 +0200

----------------------------------------------------------------------
 .../EstablishedResourceManagerConnection.java   | 13 +--
 .../flink/runtime/jobmaster/JobMaster.java      | 91 ++++++++++++--------
 .../jobmaster/ResourceManagerAddress.java       | 77 +++++++++++++++++
 .../flink/runtime/jobmaster/JobMasterTest.java  | 90 +++++++++++++++++++
 4 files changed, 224 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/37b630e5/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
index 46c1b4b..e64754d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 
 import javax.annotation.Nonnull;
 
@@ -30,29 +29,25 @@ import javax.annotation.Nonnull;
  */
 class EstablishedResourceManagerConnection {
 
+	@Nonnull
 	private final ResourceManagerGateway resourceManagerGateway;
 
-	private final ResourceManagerId resourceManagerId;
-
+	@Nonnull
 	private final ResourceID resourceManagerResourceID;
 
 	EstablishedResourceManagerConnection(
 			@Nonnull ResourceManagerGateway resourceManagerGateway,
-			@Nonnull ResourceManagerId resourceManagerId,
 			@Nonnull ResourceID resourceManagerResourceID) {
 		this.resourceManagerGateway = resourceManagerGateway;
-		this.resourceManagerId = resourceManagerId;
 		this.resourceManagerResourceID = resourceManagerResourceID;
 	}
 
+	@Nonnull
 	public ResourceManagerGateway getResourceManagerGateway() {
 		return resourceManagerGateway;
 	}
 
-	public ResourceManagerId getResourceManagerId() {
-		return resourceManagerId;
-	}
-
+	@Nonnull
 	public ResourceID getResourceManagerResourceID() {
 		return resourceManagerResourceID;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/37b630e5/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 5f5a9a9..1df9d89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -131,6 +131,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * JobMaster implementation. The job master is responsible for the execution of a single
@@ -209,6 +210,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	private String lastInternalSavepoint;
 
 	@Nullable
+	private ResourceManagerAddress resourceManagerAddress;
+
+	@Nullable
 	private ResourceManagerConnection resourceManagerConnection;
 
 	@Nullable
@@ -888,13 +892,13 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			final Exception cause) {
 
 		if (isConnectingToResourceManager(resourceManagerId)) {
-			closeResourceManagerConnection(cause);
+			reconnectToResourceManager(cause);
 		}
 	}
 
 	private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerId) {
-		return resourceManagerConnection != null
-				&& resourceManagerConnection.getTargetLeaderId().equals(resourceManagerId);
+		return resourceManagerAddress != null
+				&& resourceManagerAddress.getResourceManagerId().equals(resourceManagerId);
 	}
 
 	@Override
@@ -999,9 +1003,10 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 		setNewFencingToken(newJobMasterId);
 
+		startJobMasterServices();
+
 		log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID());
 
-		startJobMasterServices();
 		resetAndScheduleExecutionGraph();
 
 		return Acknowledge.get();
@@ -1011,6 +1016,10 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		// start the slot pool make sure the slot pool now accepts messages for this leader
 		slotPool.start(getFencingToken(), getAddress());
 
+		//TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
+		// try to reconnect to previously known leader
+		reconnectToResourceManager(new FlinkException("Starting JobMaster component."));
+
 		// job is ready to go, try to establish connection with resource manager
 		//   - activate leader retrieval for the resource manager
 		//   - on notification of the leader, the connection will be established and
@@ -1072,8 +1081,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			ExecutionGraph newExecutionGraph,
 			JobManagerJobMetricGroup newJobManagerJobMetricGroup) {
 		validateRunsInMainThread();
-		Preconditions.checkState(executionGraph.getState().isTerminalState());
-		Preconditions.checkState(jobManagerJobMetricGroup == null);
+		checkState(executionGraph.getState().isTerminalState());
+		checkState(jobManagerJobMetricGroup == null);
 
 		executionGraph = newExecutionGraph;
 		jobManagerJobMetricGroup = newJobManagerJobMetricGroup;
@@ -1103,7 +1112,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	}
 
 	private void scheduleExecutionGraph() {
-		Preconditions.checkState(jobStatusListener == null);
+		checkState(jobStatusListener == null);
 		// register self as job status change listener
 		jobStatusListener = new JobManagerJobStatusListener();
 		executionGraph.registerJobStatusListener(jobStatusListener);
@@ -1239,33 +1248,40 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		}
 	}
 
-	private void notifyOfNewResourceManagerLeader(final String resourceManagerAddress, final ResourceManagerId resourceManagerId) {
-		if (resourceManagerConnection != null) {
-			if (resourceManagerAddress != null) {
-				if (Objects.equals(resourceManagerAddress, resourceManagerConnection.getTargetAddress())
-					&& Objects.equals(resourceManagerId, resourceManagerConnection.getTargetLeaderId())) {
-					// both address and leader id are not changed, we can keep the old connection
-					return;
-				}
+	private void notifyOfNewResourceManagerLeader(final String newResourceManagerAddress, final ResourceManagerId resourceManagerId) {
+		resourceManagerAddress = createResourceManagerAddress(newResourceManagerAddress, resourceManagerId);
 
-				log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
-					resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+		reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
+	}
 
-				closeResourceManagerConnection(new Exception(
-					"ResourceManager leader changed to new address " + resourceManagerAddress));
-			} else {
-				log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
-					resourceManagerConnection.getTargetAddress());
-			}
+	@Nullable
+	private ResourceManagerAddress createResourceManagerAddress(@Nullable String newResourceManagerAddress, @Nullable ResourceManagerId resourceManagerId) {
+		if (newResourceManagerAddress != null) {
+			// the contract is: address == null <=> id == null
+			checkNotNull(resourceManagerId);
+			return new ResourceManagerAddress(newResourceManagerAddress, resourceManagerId);
+		} else {
+			return null;
 		}
+	}
+
+	private void reconnectToResourceManager(Exception cause) {
+		closeResourceManagerConnection(cause);
+		tryConnectToResourceManager();
+	}
 
+	private void tryConnectToResourceManager() {
 		if (resourceManagerAddress != null) {
-			createResourceManagerConnection(resourceManagerAddress, resourceManagerId);
+			connectToResourceManager();
 		}
 	}
 
-	private void createResourceManagerConnection(String resourceManagerAddress, ResourceManagerId resourceManagerId) {
-		log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
+	private void connectToResourceManager() {
+		assert(resourceManagerAddress != null);
+		assert(resourceManagerConnection == null);
+		assert(establishedResourceManagerConnection == null);
+
+		log.info("Connecting to ResourceManager {}", resourceManagerAddress);
 
 		resourceManagerConnection = new ResourceManagerConnection(
 			log,
@@ -1273,8 +1289,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			resourceId,
 			getAddress(),
 			getFencingToken(),
-			resourceManagerAddress,
-			resourceManagerId,
+			resourceManagerAddress.getAddress(),
+			resourceManagerAddress.getResourceManagerId(),
 			scheduledExecutorService);
 
 		resourceManagerConnection.start();
@@ -1295,7 +1311,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 			establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
 				resourceManagerGateway,
-				success.getResourceManagerId(),
 				resourceManagerResourceId);
 
 			slotPoolGateway.connectToResourceManager(resourceManagerGateway);
@@ -1541,7 +1556,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 		@Override
 		protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
-			runAsync(() -> establishResourceManagerConnection(success));
+			runAsync(() -> {
+				// filter out replaced connections
+				if (this == resourceManagerConnection) {
+					establishResourceManagerConnection(success);
+				}
+			});
 		}
 
 		@Override
@@ -1610,14 +1630,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 				log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
 
 				if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId)) {
-					final String resourceManagerAddress = establishedResourceManagerConnection.getResourceManagerGateway().getAddress();
-					final ResourceManagerId resourceManagerId = establishedResourceManagerConnection.getResourceManagerId();
-
-					closeResourceManagerConnection(
-						new TimeoutException(
-							"The heartbeat of ResourceManager with id " + resourceId + " timed out."));
-
-					createResourceManagerConnection(resourceManagerAddress, resourceManagerId);
+					reconnectToResourceManager(
+						new JobMasterException(
+							String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId)));
 				}
 			});
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/37b630e5/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ResourceManagerAddress.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ResourceManagerAddress.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ResourceManagerAddress.java
new file mode 100644
index 0000000..d549f7b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ResourceManagerAddress.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+
+import javax.annotation.Nonnull;
+
+import java.util.Objects;
+
+/**
+ * Current address and fencing token of the leading ResourceManager.
+ */
+public class ResourceManagerAddress {
+
+	@Nonnull
+	private final String address;
+
+	@Nonnull
+	private final ResourceManagerId resourceManagerId;
+
+	public ResourceManagerAddress(@Nonnull String address, @Nonnull ResourceManagerId resourceManagerId) {
+		this.address = address;
+		this.resourceManagerId = resourceManagerId;
+	}
+
+	@Nonnull
+	public String getAddress() {
+		return address;
+	}
+
+	@Nonnull
+	public ResourceManagerId getResourceManagerId() {
+		return resourceManagerId;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (this == obj) {
+			return true;
+		}
+
+		if (obj == null || getClass() != obj.getClass()) {
+			return false;
+		}
+
+		ResourceManagerAddress that = (ResourceManagerAddress) obj;
+		return Objects.equals(address, that.address) &&
+			Objects.equals(resourceManagerId, that.resourceManagerId);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(address, resourceManagerId);
+	}
+
+	@Override
+	public String toString() {
+		return address + '(' + resourceManagerId + ')';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/37b630e5/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 09640d5..99cdc16 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -70,6 +70,7 @@ import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.Matchers;
@@ -89,6 +90,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -525,6 +527,94 @@ public class JobMasterTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that we continue reconnecting to the latest known RM after a disconnection
+	 * message.
+	 */
+	@Test
+	public void testReconnectionAfterDisconnect() throws Exception {
+		final JobMaster jobMaster = createJobMaster(
+			JobMasterConfiguration.fromConfiguration(configuration),
+			jobGraph,
+			haServices,
+			new TestingJobManagerSharedServicesBuilder().build());
+
+		final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
+		jobMaster.start(jobMasterId, testingTimeout);
+
+		try {
+			final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
+			final BlockingQueue<JobMasterId> registrationsQueue = new ArrayBlockingQueue<>(1);
+
+			testingResourceManagerGateway.setRegisterJobManagerConsumer(
+				jobMasterIdResourceIDStringJobIDTuple4 -> registrationsQueue.offer(jobMasterIdResourceIDStringJobIDTuple4.f0));
+
+			rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
+
+			final ResourceManagerId resourceManagerId = testingResourceManagerGateway.getFencingToken();
+			rmLeaderRetrievalService.notifyListener(
+				testingResourceManagerGateway.getAddress(),
+				resourceManagerId.toUUID());
+
+			// wait for first registration attempt
+			final JobMasterId firstRegistrationAttempt = registrationsQueue.take();
+
+			assertThat(firstRegistrationAttempt, equalTo(jobMasterId));
+
+			assertThat(registrationsQueue.isEmpty(), is(true));
+			jobMasterGateway.disconnectResourceManager(resourceManagerId, new FlinkException("Test exception"));
+
+			// wait for the second registration attempt after the disconnect call
+			assertThat(registrationsQueue.take(), equalTo(jobMasterId));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+		}
+	}
+
+	/**
+	 * Tests that a JM connects to the leading RM after regaining leadership.
+	 */
+	@Test
+	public void testResourceManagerConnectionAfterRegainingLeadership() throws Exception {
+		final JobMaster jobMaster = createJobMaster(
+			JobMasterConfiguration.fromConfiguration(configuration),
+			jobGraph,
+			haServices,
+			new TestingJobManagerSharedServicesBuilder().build());
+
+		jobMaster.start(jobMasterId, testingTimeout);
+
+		try {
+			final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
+
+			final BlockingQueue<JobMasterId> registrationQueue = new ArrayBlockingQueue<>(1);
+			testingResourceManagerGateway.setRegisterJobManagerConsumer(
+				jobMasterIdResourceIDStringJobIDTuple4 -> registrationQueue.offer(jobMasterIdResourceIDStringJobIDTuple4.f0));
+
+			final String resourceManagerAddress = testingResourceManagerGateway.getAddress();
+			rpcService.registerGateway(resourceManagerAddress, testingResourceManagerGateway);
+
+			rmLeaderRetrievalService.notifyListener(resourceManagerAddress, testingResourceManagerGateway.getFencingToken().toUUID());
+
+			final JobMasterId firstRegistrationAttempt = registrationQueue.take();
+
+			assertThat(firstRegistrationAttempt, equalTo(jobMasterId));
+
+			jobMaster.suspend(new FlinkException("Test exception."), testingTimeout).get();
+
+			final JobMasterId jobMasterId2 = JobMasterId.generate();
+
+			jobMaster.start(jobMasterId2, testingTimeout).get();
+
+			final JobMasterId secondRegistrationAttempt = registrationQueue.take();
+
+			assertThat(secondRegistrationAttempt, equalTo(jobMasterId2));
+		} finally {
+			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+		}
+	}
+
 	private File createSavepoint(long savepointId) throws IOException {
 		final File savepointFile = temporaryFolder.newFile();
 		final SavepointV2 savepoint = new SavepointV2(savepointId, Collections.emptyList(), Collections.emptyList());