You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/15 07:52:18 UTC

[11/12] flink git commit: [FLINK-9358] Avoid NPE when closing an unestablished ResourceManager connection

[FLINK-9358] Avoid NPE when closing an unestablished ResourceManager connection

An NPE occurred when trying to disconnect an unestablished ResourceManager connection.
To fix this problem, we now check whether the connection has been established
before disconnecting it.

This closes #6011.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a95ec5ac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a95ec5ac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a95ec5ac

Branch: refs/heads/master
Commit: a95ec5acf259884347ae539913bcffcad5bfc340
Parents: f4e0368
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon May 14 14:14:45 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 00:23:10 2018 +0200

----------------------------------------------------------------------
 .../EstablishedResourceManagerConnection.java   | 59 ++++++++++++++
 .../flink/runtime/jobmaster/JobMaster.java      | 83 +++++++++++---------
 .../flink/runtime/jobmaster/JobMasterTest.java  | 48 +++++++++++
 3 files changed, 155 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a95ec5ac/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
new file mode 100644
index 0000000..46c1b4b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Holder for the details of an established connection with the ResourceManager:
+ * its gateway, its {@link ResourceManagerId} and its {@link ResourceID}.
+ */
+class EstablishedResourceManagerConnection {
+
+	private final ResourceManagerGateway resourceManagerGateway; // gateway of the connected ResourceManager
+
+	private final ResourceManagerId resourceManagerId; // id of the connected ResourceManager
+
+	private final ResourceID resourceManagerResourceID; // ResourceID of the connected ResourceManager
+
+	EstablishedResourceManagerConnection(
+			@Nonnull ResourceManagerGateway resourceManagerGateway,
+			@Nonnull ResourceManagerId resourceManagerId,
+			@Nonnull ResourceID resourceManagerResourceID) {
+		this.resourceManagerGateway = resourceManagerGateway; // stored as given; the @Nonnull arguments are not null-checked here
+		this.resourceManagerId = resourceManagerId;
+		this.resourceManagerResourceID = resourceManagerResourceID;
+	}
+
+	public ResourceManagerGateway getResourceManagerGateway() {
+		return resourceManagerGateway;
+	}
+
+	public ResourceManagerId getResourceManagerId() {
+		return resourceManagerId;
+	}
+
+	public ResourceID getResourceManagerResourceID() {
+		return resourceManagerResourceID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a95ec5ac/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index f30c119..aff3280 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -191,9 +191,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	private LeaderRetrievalService resourceManagerLeaderRetriever;
 
-	@Nullable
-	private ResourceManagerConnection resourceManagerConnection;
-
 	// --------- TaskManagers --------
 
 	private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;
@@ -211,6 +208,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	@Nullable
 	private String lastInternalSavepoint;
 
+	@Nullable
+	private ResourceManagerConnection resourceManagerConnection;
+
+	@Nullable
+	private EstablishedResourceManagerConnection establishedResourceManagerConnection;
+
 	// ------------------------------------------------------------------------
 
 	public JobMaster(
@@ -290,6 +293,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
 		this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
 		this.jobStatusListener = null;
+
+		this.resourceManagerConnection = null;
+		this.establishedResourceManagerConnection = null;
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -881,12 +887,16 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			final ResourceManagerId resourceManagerId,
 			final Exception cause) {
 
-		if (resourceManagerConnection != null
-				&& resourceManagerConnection.getTargetLeaderId().equals(resourceManagerId)) {
+		if (isConnectingToResourceManager(resourceManagerId)) {
 			closeResourceManagerConnection(cause);
 		}
 	}
 
+	private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerId) { // true if the current connection targets the given leader id
+		return resourceManagerConnection != null // field is null initially and after closeResourceManagerConnection
+				&& resourceManagerConnection.getTargetLeaderId().equals(resourceManagerId);
+	}
+
 	@Override
 	public void heartbeatFromTaskManager(final ResourceID resourceID, AccumulatorReport accumulatorReport) {
 		taskManagerHeartbeatManager.receiveHeartbeat(resourceID, accumulatorReport);
@@ -1238,11 +1248,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 					return;
 				}
 
-				closeResourceManagerConnection(new Exception(
-					"ResourceManager leader changed to new address " + resourceManagerAddress));
-
 				log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
 					resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+
+				closeResourceManagerConnection(new Exception(
+					"ResourceManager leader changed to new address " + resourceManagerAddress));
 			} else {
 				log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
 					resourceManagerConnection.getTargetAddress());
@@ -1277,9 +1287,16 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 			final ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
 
+			final ResourceID resourceManagerResourceId = success.getResourceManagerResourceId();
+
+			establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
+				resourceManagerGateway,
+				success.getResourceManagerId(),
+				resourceManagerResourceId);
+
 			slotPoolGateway.connectToResourceManager(resourceManagerGateway);
 
-			resourceManagerHeartbeatManager.monitorTarget(success.getResourceManagerResourceId(), new HeartbeatTarget<Void>() {
+			resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<Void>() {
 				@Override
 				public void receiveHeartbeat(ResourceID resourceID, Void payload) {
 					resourceManagerGateway.heartbeatFromJobManager(resourceID);
@@ -1297,22 +1314,31 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	}
 
 	private void closeResourceManagerConnection(Exception cause) {
-		if (resourceManagerConnection != null) {
-			if (log.isDebugEnabled()) {
-				log.debug("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerResourceID(), cause);
-			} else {
-				log.info("Close ResourceManager connection {}: {}.", resourceManagerConnection.getResourceManagerResourceID(), cause.getMessage());
-			}
-
-			resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerResourceID());
-
-			ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
-			resourceManagerGateway.disconnectJobManager(resourceManagerConnection.getJobID(), cause);
+		if (establishedResourceManagerConnection != null) { // non-null only after a registration success has been processed
+			dissolveResourceManagerConnection(establishedResourceManagerConnection, cause);
+			establishedResourceManagerConnection = null;
+		}
 
+		if (resourceManagerConnection != null) { // handled separately: may be non-null while nothing is established yet
+			// stop a potentially ongoing registration process
 			resourceManagerConnection.close();
 			resourceManagerConnection = null;
 		}
+	}
+
+	private void dissolveResourceManagerConnection(EstablishedResourceManagerConnection establishedResourceManagerConnection, Exception cause) { // logs the close, then unmonitors the heartbeat target and disconnects at the ResourceManager gateway and the slot pool
+		final ResourceID resourceManagerResourceID = establishedResourceManagerConnection.getResourceManagerResourceID();
 
+		if (log.isDebugEnabled()) { // debug: pass the cause itself as last argument; otherwise log only its message
+			log.debug("Close ResourceManager connection {}.", resourceManagerResourceID, cause);
+		} else {
+			log.info("Close ResourceManager connection {}: {}.", resourceManagerResourceID, cause.getMessage());
+		}
+
+		resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerResourceID); // counterpart of the monitorTarget(...) call made for this ResourceID on registration success
+
+		ResourceManagerGateway resourceManagerGateway = establishedResourceManagerConnection.getResourceManagerGateway();
+		resourceManagerGateway.disconnectJobManager(jobGraph.getJobID(), cause); // passes this job's id and the cause to the ResourceManager gateway
 		slotPoolGateway.disconnectResourceManager();
 	}
 
@@ -1473,8 +1499,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 		private final JobMasterId jobMasterId;
 
-		private ResourceID resourceManagerResourceID;
-
 		ResourceManagerConnection(
 				final Logger log,
 				final JobID jobID,
@@ -1498,7 +1522,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 					getTargetAddress(), getTargetLeaderId()) {
 				@Override
 				protected CompletableFuture<RegistrationResponse> invokeRegistration(
-						ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) throws Exception {
+						ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) {
 					Time timeout = Time.milliseconds(timeoutMillis);
 
 					return gateway.registerJobManager(
@@ -1513,24 +1537,13 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 		@Override
 		protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
-			runAsync(() -> {
-				resourceManagerResourceID = success.getResourceManagerResourceId();
-				establishResourceManagerConnection(success);
-			});
+			runAsync(() -> establishResourceManagerConnection(success));
 		}
 
 		@Override
 		protected void onRegistrationFailure(final Throwable failure) {
 			handleJobMasterError(failure);
 		}
-
-		public ResourceID getResourceManagerResourceID() {
-			return resourceManagerResourceID;
-		}
-
-		public JobID getJobID() {
-			return jobID;
-		}
 	}
 
 	//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a95ec5ac/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 2f61681..c0c9162 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
@@ -361,6 +362,53 @@ public class JobMasterTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that an unestablished ResourceManager connection can be closed and that a registration at the newly announced ResourceManager is attempted.
+	 */
+	@Test
+	public void testCloseUnestablishedResourceManagerConnection() throws Exception {
+		final JobMaster jobMaster = createJobMaster(
+			JobMasterConfiguration.fromConfiguration(configuration),
+			jobGraph,
+			haServices,
+			new TestingJobManagerSharedServicesBuilder().build());
+
+		try {
+			jobMaster.start(JobMasterId.generate(), testingTimeout).get();
+			final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); // the same leader id is announced for both addresses below
+			final String firstResourceManagerAddress = "address1";
+			final String secondResourceManagerAddress = "address2";
+
+			final TestingResourceManagerGateway firstResourceManagerGateway = new TestingResourceManagerGateway();
+			final TestingResourceManagerGateway secondResourceManagerGateway = new TestingResourceManagerGateway();
+
+			rpcService.registerGateway(firstResourceManagerAddress, firstResourceManagerGateway);
+			rpcService.registerGateway(secondResourceManagerAddress, secondResourceManagerGateway);
+
+			final OneShotLatch firstJobManagerRegistration = new OneShotLatch();
+			final OneShotLatch secondJobManagerRegistration = new OneShotLatch();
+
+			firstResourceManagerGateway.setRegisterJobManagerConsumer(
+				jobMasterIdResourceIDStringJobIDTuple4 -> firstJobManagerRegistration.trigger());
+
+			secondResourceManagerGateway.setRegisterJobManagerConsumer(
+				jobMasterIdResourceIDStringJobIDTuple4 -> secondJobManagerRegistration.trigger());
+
+			rmLeaderRetrievalService.notifyListener(firstResourceManagerAddress, resourceManagerId.toUUID());
+
+			// wait until we have seen the first registration attempt
+			firstJobManagerRegistration.await();
+
+			// this should stop the connection attempts towards the first RM
+			rmLeaderRetrievalService.notifyListener(secondResourceManagerAddress, resourceManagerId.toUUID());
+
+			// check that we start registering at the second RM
+			secondJobManagerRegistration.await();
+		} finally {
+			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+		}
+	}
+
 	private File createSavepoint(long savepointId) throws IOException {
 		final File savepointFile = temporaryFolder.newFile();
 		final SavepointV2 savepoint = new SavepointV2(savepointId, Collections.emptyList(), Collections.emptyList());