You are viewing a plain-text version of this content. The link to the canonical version ("here") was lost in the plain-text conversion.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/22 23:51:05 UTC
[2/4] flink git commit: [FLINK-9408] Let JM try to reconnect to RM
[FLINK-9408] Let JM try to reconnect to RM
This commit changes the behaviour of the JobMaster (JM) so that it always tries to reconnect to the latest known ResourceManager (RM) address.
This closes #6056.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37b630e5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37b630e5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37b630e5
Branch: refs/heads/release-1.5
Commit: 37b630e53f28c99e452faf7216d4be3577b956ce
Parents: e0b1b46
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue May 22 15:53:38 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed May 23 00:59:50 2018 +0200
----------------------------------------------------------------------
.../EstablishedResourceManagerConnection.java | 13 +--
.../flink/runtime/jobmaster/JobMaster.java | 91 ++++++++++++--------
.../jobmaster/ResourceManagerAddress.java | 77 +++++++++++++++++
.../flink/runtime/jobmaster/JobMasterTest.java | 90 +++++++++++++++++++
4 files changed, 224 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/37b630e5/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
index 46c1b4b..e64754d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import javax.annotation.Nonnull;
@@ -30,29 +29,25 @@ import javax.annotation.Nonnull;
*/
class EstablishedResourceManagerConnection {
+ @Nonnull
private final ResourceManagerGateway resourceManagerGateway;
- private final ResourceManagerId resourceManagerId;
-
+ @Nonnull
private final ResourceID resourceManagerResourceID;
EstablishedResourceManagerConnection(
@Nonnull ResourceManagerGateway resourceManagerGateway,
- @Nonnull ResourceManagerId resourceManagerId,
@Nonnull ResourceID resourceManagerResourceID) {
this.resourceManagerGateway = resourceManagerGateway;
- this.resourceManagerId = resourceManagerId;
this.resourceManagerResourceID = resourceManagerResourceID;
}
+ @Nonnull
public ResourceManagerGateway getResourceManagerGateway() {
return resourceManagerGateway;
}
- public ResourceManagerId getResourceManagerId() {
- return resourceManagerId;
- }
-
+ @Nonnull
public ResourceID getResourceManagerResourceID() {
return resourceManagerResourceID;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/37b630e5/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 5f5a9a9..1df9d89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -131,6 +131,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
/**
* JobMaster implementation. The job master is responsible for the execution of a single
@@ -209,6 +210,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
private String lastInternalSavepoint;
@Nullable
+ private ResourceManagerAddress resourceManagerAddress;
+
+ @Nullable
private ResourceManagerConnection resourceManagerConnection;
@Nullable
@@ -888,13 +892,13 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
final Exception cause) {
if (isConnectingToResourceManager(resourceManagerId)) {
- closeResourceManagerConnection(cause);
+ reconnectToResourceManager(cause);
}
}
private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerId) {
- return resourceManagerConnection != null
- && resourceManagerConnection.getTargetLeaderId().equals(resourceManagerId);
+ return resourceManagerAddress != null
+ && resourceManagerAddress.getResourceManagerId().equals(resourceManagerId);
}
@Override
@@ -999,9 +1003,10 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
setNewFencingToken(newJobMasterId);
+ startJobMasterServices();
+
log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID());
- startJobMasterServices();
resetAndScheduleExecutionGraph();
return Acknowledge.get();
@@ -1011,6 +1016,10 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
// start the slot pool make sure the slot pool now accepts messages for this leader
slotPool.start(getFencingToken(), getAddress());
+ //TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
+ // try to reconnect to previously known leader
+ reconnectToResourceManager(new FlinkException("Starting JobMaster component."));
+
// job is ready to go, try to establish connection with resource manager
// - activate leader retrieval for the resource manager
// - on notification of the leader, the connection will be established and
@@ -1072,8 +1081,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
ExecutionGraph newExecutionGraph,
JobManagerJobMetricGroup newJobManagerJobMetricGroup) {
validateRunsInMainThread();
- Preconditions.checkState(executionGraph.getState().isTerminalState());
- Preconditions.checkState(jobManagerJobMetricGroup == null);
+ checkState(executionGraph.getState().isTerminalState());
+ checkState(jobManagerJobMetricGroup == null);
executionGraph = newExecutionGraph;
jobManagerJobMetricGroup = newJobManagerJobMetricGroup;
@@ -1103,7 +1112,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
}
private void scheduleExecutionGraph() {
- Preconditions.checkState(jobStatusListener == null);
+ checkState(jobStatusListener == null);
// register self as job status change listener
jobStatusListener = new JobManagerJobStatusListener();
executionGraph.registerJobStatusListener(jobStatusListener);
@@ -1239,33 +1248,40 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
}
}
- private void notifyOfNewResourceManagerLeader(final String resourceManagerAddress, final ResourceManagerId resourceManagerId) {
- if (resourceManagerConnection != null) {
- if (resourceManagerAddress != null) {
- if (Objects.equals(resourceManagerAddress, resourceManagerConnection.getTargetAddress())
- && Objects.equals(resourceManagerId, resourceManagerConnection.getTargetLeaderId())) {
- // both address and leader id are not changed, we can keep the old connection
- return;
- }
+ private void notifyOfNewResourceManagerLeader(final String newResourceManagerAddress, final ResourceManagerId resourceManagerId) {
+ resourceManagerAddress = createResourceManagerAddress(newResourceManagerAddress, resourceManagerId);
- log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
- resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+ reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
+ }
- closeResourceManagerConnection(new Exception(
- "ResourceManager leader changed to new address " + resourceManagerAddress));
- } else {
- log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
- resourceManagerConnection.getTargetAddress());
- }
+ @Nullable
+ private ResourceManagerAddress createResourceManagerAddress(@Nullable String newResourceManagerAddress, @Nullable ResourceManagerId resourceManagerId) {
+ if (newResourceManagerAddress != null) {
+ // the contract is: address == null <=> id == null
+ checkNotNull(resourceManagerId);
+ return new ResourceManagerAddress(newResourceManagerAddress, resourceManagerId);
+ } else {
+ return null;
}
+ }
+ /** Closes the current RM connection with the given cause, then reconnects if an RM address is known. */
+ private void reconnectToResourceManager(Exception cause) {
+ closeResourceManagerConnection(cause);
+ tryConnectToResourceManager();
+ }
+ private void tryConnectToResourceManager() {
if (resourceManagerAddress != null) {
- createResourceManagerConnection(resourceManagerAddress, resourceManagerId);
+ connectToResourceManager();
}
}
- private void createResourceManagerConnection(String resourceManagerAddress, ResourceManagerId resourceManagerId) {
- log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
+ private void connectToResourceManager() {
+ assert(resourceManagerAddress != null);
+ assert(resourceManagerConnection == null);
+ assert(establishedResourceManagerConnection == null);
+
+ log.info("Connecting to ResourceManager {}", resourceManagerAddress);
resourceManagerConnection = new ResourceManagerConnection(
log,
@@ -1273,8 +1289,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
resourceId,
getAddress(),
getFencingToken(),
- resourceManagerAddress,
- resourceManagerId,
+ resourceManagerAddress.getAddress(),
+ resourceManagerAddress.getResourceManagerId(),
scheduledExecutorService);
resourceManagerConnection.start();
@@ -1295,7 +1311,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
resourceManagerGateway,
- success.getResourceManagerId(),
resourceManagerResourceId);
slotPoolGateway.connectToResourceManager(resourceManagerGateway);
@@ -1541,7 +1556,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
@Override
protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
- runAsync(() -> establishResourceManagerConnection(success));
+ runAsync(() -> {
+ // filter out replace connections
+ if (this == resourceManagerConnection) {
+ establishResourceManagerConnection(success);
+ }
+ });
}
@Override
@@ -1610,14 +1630,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId);
if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId)) {
- final String resourceManagerAddress = establishedResourceManagerConnection.getResourceManagerGateway().getAddress();
- final ResourceManagerId resourceManagerId = establishedResourceManagerConnection.getResourceManagerId();
-
- closeResourceManagerConnection(
- new TimeoutException(
- "The heartbeat of ResourceManager with id " + resourceId + " timed out."));
-
- createResourceManagerConnection(resourceManagerAddress, resourceManagerId);
+ reconnectToResourceManager(
+ new JobMasterException(
+ String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId)));
}
});
}
http://git-wip-us.apache.org/repos/asf/flink/blob/37b630e5/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ResourceManagerAddress.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ResourceManagerAddress.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ResourceManagerAddress.java
new file mode 100644
index 0000000..d549f7b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ResourceManagerAddress.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+
+import javax.annotation.Nonnull;
+
+import java.util.Objects;
+
+/**
+ * Current address and fencing token of the leading ResourceManager.
+ *
+ * <p>Both fields are final and non-null. Two instances are equal if and only if their
+ * addresses and their {@link ResourceManagerId}s are equal.
+ */
+public class ResourceManagerAddress {
+
+ @Nonnull
+ private final String address;
+
+ @Nonnull
+ private final ResourceManagerId resourceManagerId;
+
+ /**
+ * Creates a new {@code ResourceManagerAddress}.
+ *
+ * @param address address used to connect to the ResourceManager
+ * @param resourceManagerId fencing token of the ResourceManager
+ * @throws NullPointerException if {@code address} or {@code resourceManagerId} is {@code null}
+ */
+ public ResourceManagerAddress(@Nonnull String address, @Nonnull ResourceManagerId resourceManagerId) {
+ // @Nonnull is not enforced at runtime, so reject null arguments eagerly
+ this.address = Objects.requireNonNull(address, "address");
+ this.resourceManagerId = Objects.requireNonNull(resourceManagerId, "resourceManagerId");
+ }
+
+ /**
+ * Returns the address used to connect to the ResourceManager.
+ *
+ * @return address of the ResourceManager, never {@code null}
+ */
+ @Nonnull
+ public String getAddress() {
+ return address;
+ }
+
+ /**
+ * Returns the fencing token of the ResourceManager.
+ *
+ * @return fencing token of the ResourceManager, never {@code null}
+ */
+ @Nonnull
+ public ResourceManagerId getResourceManagerId() {
+ return resourceManagerId;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+
+ ResourceManagerAddress that = (ResourceManagerAddress) obj;
+ return Objects.equals(address, that.address) &&
+ Objects.equals(resourceManagerId, that.resourceManagerId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(address, resourceManagerId);
+ }
+
+ /**
+ * Returns a human-readable representation of the form {@code address(resourceManagerId)}.
+ */
+ @Override
+ public String toString() {
+ return address + '(' + resourceManagerId + ')';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/37b630e5/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 09640d5..99cdc16 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -70,6 +70,7 @@ import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
@@ -89,6 +90,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -525,6 +527,94 @@ public class JobMasterTest extends TestLogger {
}
}
+ /**
+ * Tests that the JM keeps reconnecting to the latest known RM after it receives a
+ * {@code disconnectResourceManager} call for that RM.
+ */
+ @Test
+ public void testReconnectionAfterDisconnect() throws Exception {
+ final JobMaster jobMaster = createJobMaster(
+ JobMasterConfiguration.fromConfiguration(configuration),
+ jobGraph,
+ haServices,
+ new TestingJobManagerSharedServicesBuilder().build());
+
+ final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
+ jobMaster.start(jobMasterId, testingTimeout);
+
+ try {
+ final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
+ final BlockingQueue<JobMasterId> registrationsQueue = new ArrayBlockingQueue<>(1);
+ // record the JobMasterId of every registration attempt at the testing RM
+ testingResourceManagerGateway.setRegisterJobManagerConsumer(
+ jobMasterIdResourceIDStringJobIDTuple4 -> registrationsQueue.offer(jobMasterIdResourceIDStringJobIDTuple4.f0));
+
+ rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
+ // announce the testing RM as the leader, which triggers the first registration attempt
+ final ResourceManagerId resourceManagerId = testingResourceManagerGateway.getFencingToken();
+ rmLeaderRetrievalService.notifyListener(
+ testingResourceManagerGateway.getAddress(),
+ resourceManagerId.toUUID());
+
+ // wait for first registration attempt
+ final JobMasterId firstRegistrationAttempt = registrationsQueue.take();
+
+ assertThat(firstRegistrationAttempt, equalTo(jobMasterId));
+ // no further registration attempt should have been recorded before the disconnect
+ assertThat(registrationsQueue.isEmpty(), is(true));
+ jobMasterGateway.disconnectResourceManager(resourceManagerId, new FlinkException("Test exception"));
+
+ // wait for the second registration attempt after the disconnect call
+ assertThat(registrationsQueue.take(), equalTo(jobMasterId));
+ } finally {
+ RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+ }
+ }
+
+ /**
+ * Tests that a JM registers again at the leading RM after regaining leadership.
+ */
+ @Test
+ public void testResourceManagerConnectionAfterRegainingLeadership() throws Exception {
+ final JobMaster jobMaster = createJobMaster(
+ JobMasterConfiguration.fromConfiguration(configuration),
+ jobGraph,
+ haServices,
+ new TestingJobManagerSharedServicesBuilder().build());
+
+ jobMaster.start(jobMasterId, testingTimeout);
+
+ try {
+ final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
+ // record the JobMasterId of every registration attempt at the testing RM
+ final BlockingQueue<JobMasterId> registrationQueue = new ArrayBlockingQueue<>(1);
+ testingResourceManagerGateway.setRegisterJobManagerConsumer(
+ jobMasterIdResourceIDStringJobIDTuple4 -> registrationQueue.offer(jobMasterIdResourceIDStringJobIDTuple4.f0));
+
+ final String resourceManagerAddress = testingResourceManagerGateway.getAddress();
+ rpcService.registerGateway(resourceManagerAddress, testingResourceManagerGateway);
+ // announce the testing RM as the leader, which triggers the first registration attempt
+ rmLeaderRetrievalService.notifyListener(resourceManagerAddress, testingResourceManagerGateway.getFencingToken().toUUID());
+
+ final JobMasterId firstRegistrationAttempt = registrationQueue.take();
+
+ assertThat(firstRegistrationAttempt, equalTo(jobMasterId));
+ // suspend the JM (losing leadership) and restart it with a new JobMasterId
+ jobMaster.suspend(new FlinkException("Test exception."), testingTimeout).get();
+
+ final JobMasterId jobMasterId2 = JobMasterId.generate();
+
+ jobMaster.start(jobMasterId2, testingTimeout).get();
+ // the JM must register again at the known RM, now with the new JobMasterId
+ final JobMasterId secondRegistrationAttempt = registrationQueue.take();
+
+ assertThat(secondRegistrationAttempt, equalTo(jobMasterId2));
+ } finally {
+ RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+ }
+ }
+
private File createSavepoint(long savepointId) throws IOException {
final File savepointFile = temporaryFolder.newFile();
final SavepointV2 savepoint = new SavepointV2(savepointId, Collections.emptyList(), Collections.emptyList());