You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/15 07:52:18 UTC
[11/12] flink git commit: [FLINK-9358] Avoid NPE when closing an
unestablished ResourceManager connection
[FLINK-9358] Avoid NPE when closing an unestablished ResourceManager connection
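An NPE occurred when trying to disconnect a ResourceManager connection that had not been established yet.
To fix this problem, the JobMaster now tracks the established connection separately and only dissolves it
(unmonitoring heartbeats and disconnecting from the ResourceManager) if it has actually been established.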
This closes #6011.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a95ec5ac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a95ec5ac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a95ec5ac
Branch: refs/heads/master
Commit: a95ec5acf259884347ae539913bcffcad5bfc340
Parents: f4e0368
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon May 14 14:14:45 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 00:23:10 2018 +0200
----------------------------------------------------------------------
.../EstablishedResourceManagerConnection.java | 59 ++++++++++++++
.../flink/runtime/jobmaster/JobMaster.java | 83 +++++++++++---------
.../flink/runtime/jobmaster/JobMasterTest.java | 48 +++++++++++
3 files changed, 155 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a95ec5ac/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
new file mode 100644
index 0000000..46c1b4b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Connection details (gateway, ResourceManager id and ResourceID) of a ResourceManager
+ * connection whose registration has succeeded. All fields are final.
+ */
+class EstablishedResourceManagerConnection {
+ // Gateway used to communicate with the ResourceManager.
+ private final ResourceManagerGateway resourceManagerGateway;
+ // Id of the ResourceManager, as reported by the successful registration.
+ private final ResourceManagerId resourceManagerId;
+ // ResourceID of the ResourceManager, e.g. the target of the JobMaster's heartbeat monitoring.
+ private final ResourceID resourceManagerResourceID;
+ // Arguments are annotated @Nonnull, but they are not null-checked at runtime.
+ EstablishedResourceManagerConnection(
+ @Nonnull ResourceManagerGateway resourceManagerGateway,
+ @Nonnull ResourceManagerId resourceManagerId,
+ @Nonnull ResourceID resourceManagerResourceID) {
+ this.resourceManagerGateway = resourceManagerGateway;
+ this.resourceManagerId = resourceManagerId;
+ this.resourceManagerResourceID = resourceManagerResourceID;
+ }
+ // Returns the gateway used to talk to the ResourceManager.
+ public ResourceManagerGateway getResourceManagerGateway() {
+ return resourceManagerGateway;
+ }
+ // Returns the id of the ResourceManager this connection was established with.
+ public ResourceManagerId getResourceManagerId() {
+ return resourceManagerId;
+ }
+ // Returns the ResourceID of the ResourceManager.
+ public ResourceID getResourceManagerResourceID() {
+ return resourceManagerResourceID;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a95ec5ac/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index f30c119..aff3280 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -191,9 +191,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
private LeaderRetrievalService resourceManagerLeaderRetriever;
- @Nullable
- private ResourceManagerConnection resourceManagerConnection;
-
// --------- TaskManagers --------
private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;
@@ -211,6 +208,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
@Nullable
private String lastInternalSavepoint;
+ @Nullable
+ private ResourceManagerConnection resourceManagerConnection;
+
+ @Nullable
+ private EstablishedResourceManagerConnection establishedResourceManagerConnection;
+
// ------------------------------------------------------------------------
public JobMaster(
@@ -290,6 +293,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
this.jobStatusListener = null;
+
+ this.resourceManagerConnection = null;
+ this.establishedResourceManagerConnection = null;
}
//----------------------------------------------------------------------------------------------
@@ -881,12 +887,16 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
final ResourceManagerId resourceManagerId,
final Exception cause) {
- if (resourceManagerConnection != null
- && resourceManagerConnection.getTargetLeaderId().equals(resourceManagerId)) {
+ if (isConnectingToResourceManager(resourceManagerId)) {
closeResourceManagerConnection(cause);
}
}
+ private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerId) { // true iff a (pending or established) connection to this leader id exists
+ return resourceManagerConnection != null
+ && resourceManagerConnection.getTargetLeaderId().equals(resourceManagerId);
+ }
+
@Override
public void heartbeatFromTaskManager(final ResourceID resourceID, AccumulatorReport accumulatorReport) {
taskManagerHeartbeatManager.receiveHeartbeat(resourceID, accumulatorReport);
@@ -1238,11 +1248,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
return;
}
- closeResourceManagerConnection(new Exception(
- "ResourceManager leader changed to new address " + resourceManagerAddress));
-
log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+
+ closeResourceManagerConnection(new Exception(
+ "ResourceManager leader changed to new address " + resourceManagerAddress));
} else {
log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
resourceManagerConnection.getTargetAddress());
@@ -1277,9 +1287,16 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
final ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+ final ResourceID resourceManagerResourceId = success.getResourceManagerResourceId();
+
+ establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
+ resourceManagerGateway,
+ success.getResourceManagerId(),
+ resourceManagerResourceId);
+
slotPoolGateway.connectToResourceManager(resourceManagerGateway);
- resourceManagerHeartbeatManager.monitorTarget(success.getResourceManagerResourceId(), new HeartbeatTarget<Void>() {
+ resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<Void>() {
@Override
public void receiveHeartbeat(ResourceID resourceID, Void payload) {
resourceManagerGateway.heartbeatFromJobManager(resourceID);
@@ -1297,22 +1314,31 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
}
private void closeResourceManagerConnection(Exception cause) {
- if (resourceManagerConnection != null) {
- if (log.isDebugEnabled()) {
- log.debug("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerResourceID(), cause);
- } else {
- log.info("Close ResourceManager connection {}: {}.", resourceManagerConnection.getResourceManagerResourceID(), cause.getMessage());
- }
-
- resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerResourceID());
-
- ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
- resourceManagerGateway.disconnectJobManager(resourceManagerConnection.getJobID(), cause);
+ if (establishedResourceManagerConnection != null) { // only set once a registration has succeeded
+ dissolveResourceManagerConnection(establishedResourceManagerConnection, cause);
+ establishedResourceManagerConnection = null;
+ }
+ if (resourceManagerConnection != null) { // may exist while the registration is still pending
+ // closing the connection also stops a potentially ongoing registration process
resourceManagerConnection.close();
resourceManagerConnection = null;
}
+ }
+
+ private void dissolveResourceManagerConnection(EstablishedResourceManagerConnection establishedResourceManagerConnection, Exception cause) { // tears down an established connection; NOTE(review): parameter shadows the field of the same name
+ final ResourceID resourceManagerResourceID = establishedResourceManagerConnection.getResourceManagerResourceID();
+ if (log.isDebugEnabled()) { // debug level logs the cause object itself, info level only its message
+ log.debug("Close ResourceManager connection {}.", resourceManagerResourceID, cause);
+ } else {
+ log.info("Close ResourceManager connection {}: {}.", resourceManagerResourceID, cause.getMessage());
+ }
+ // stop heartbeat monitoring of the ResourceManager we disconnect from
+ resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerResourceID);
+ // notify the ResourceManager that this job's JobMaster disconnects, then detach the SlotPool from it
+ ResourceManagerGateway resourceManagerGateway = establishedResourceManagerConnection.getResourceManagerGateway();
+ resourceManagerGateway.disconnectJobManager(jobGraph.getJobID(), cause);
slotPoolGateway.disconnectResourceManager();
}
@@ -1473,8 +1499,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
private final JobMasterId jobMasterId;
- private ResourceID resourceManagerResourceID;
-
ResourceManagerConnection(
final Logger log,
final JobID jobID,
@@ -1498,7 +1522,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
getTargetAddress(), getTargetLeaderId()) {
@Override
protected CompletableFuture<RegistrationResponse> invokeRegistration(
- ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) throws Exception {
+ ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) {
Time timeout = Time.milliseconds(timeoutMillis);
return gateway.registerJobManager(
@@ -1513,24 +1537,13 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
@Override
protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
- runAsync(() -> {
- resourceManagerResourceID = success.getResourceManagerResourceId();
- establishResourceManagerConnection(success);
- });
+ runAsync(() -> establishResourceManagerConnection(success)); // the RM's ResourceID is now kept in the JobMaster's EstablishedResourceManagerConnection, not in this class
}
@Override
protected void onRegistrationFailure(final Throwable failure) {
handleJobMasterError(failure);
}
-
- public ResourceID getResourceManagerResourceID() {
- return resourceManagerResourceID;
- }
-
- public JobID getJobID() {
- return jobID;
- }
}
//----------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a95ec5ac/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 2f61681..c0c9162 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
@@ -361,6 +362,53 @@ public class JobMasterTest extends TestLogger {
}
}
+ /**
+ * Tests that a ResourceManager connection which may not be established yet can be closed on a leader change (FLINK-9358).
+ */
+ @Test
+ public void testCloseUnestablishedResourceManagerConnection() throws Exception {
+ final JobMaster jobMaster = createJobMaster(
+ JobMasterConfiguration.fromConfiguration(configuration),
+ jobGraph,
+ haServices,
+ new TestingJobManagerSharedServicesBuilder().build());
+ // NOTE(review): the latch awaits below take no timeout; a regression would presumably hang rather than fail
+ try {
+ jobMaster.start(JobMasterId.generate(), testingTimeout).get();
+ final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+ final String firstResourceManagerAddress = "address1";
+ final String secondResourceManagerAddress = "address2";
+ // two RM gateways at different addresses; both leaders are announced with the same ResourceManagerId
+ final TestingResourceManagerGateway firstResourceManagerGateway = new TestingResourceManagerGateway();
+ final TestingResourceManagerGateway secondResourceManagerGateway = new TestingResourceManagerGateway();
+
+ rpcService.registerGateway(firstResourceManagerAddress, firstResourceManagerGateway);
+ rpcService.registerGateway(secondResourceManagerAddress, secondResourceManagerGateway);
+ // each latch is triggered by the register-JobManager consumer of the respective gateway
+ final OneShotLatch firstJobManagerRegistration = new OneShotLatch();
+ final OneShotLatch secondJobManagerRegistration = new OneShotLatch();
+
+ firstResourceManagerGateway.setRegisterJobManagerConsumer(
+ jobMasterIdResourceIDStringJobIDTuple4 -> firstJobManagerRegistration.trigger());
+
+ secondResourceManagerGateway.setRegisterJobManagerConsumer(
+ jobMasterIdResourceIDStringJobIDTuple4 -> secondJobManagerRegistration.trigger());
+ // announce the first RM as the current leader
+ rmLeaderRetrievalService.notifyListener(firstResourceManagerAddress, resourceManagerId.toUUID());
+
+ // wait until the first RM has seen a registration attempt; the connection may not be established yet
+ firstJobManagerRegistration.await();
+
+ // leader change: must close the (possibly unestablished) first connection and stop its registration attempts
+ rmLeaderRetrievalService.notifyListener(secondResourceManagerAddress, resourceManagerId.toUUID());
+
+ // check that the JobMaster now registers at the second RM
+ secondJobManagerRegistration.await();
+ } finally {
+ RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+ }
+ }
+
private File createSavepoint(long savepointId) throws IOException {
final File savepointFile = temporaryFolder.newFile();
final SavepointV2 savepoint = new SavepointV2(savepointId, Collections.emptyList(), Collections.emptyList());