You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/18 21:09:56 UTC
flink git commit: [FLINK-7533] Let LeaderGatewayRetriever retry failed gateway retrievals
Repository: flink
Updated Branches:
refs/heads/master 41dba8bb7 -> 51b48f33b
[FLINK-7533] Let LeaderGatewayRetriever retry failed gateway retrievals
Add test case
Only log LeaderGatewayRetriever exception on Debug log level
Properly fail outdated gateway retrieval operations
This closes #4602.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51b48f33
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51b48f33
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51b48f33
Branch: refs/heads/master
Commit: 51b48f33bb68014663d74548467ed88afd3bb3a3
Parents: 41dba8b
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Aug 18 14:29:29 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Sep 18 23:08:35 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/util/OptionalConsumer.java | 4 +-
.../flink/runtime/webmonitor/WebHandler.java | 4 +-
.../handlers/TaskManagerLogHandler.java | 1 +
.../webmonitor/retriever/GatewayRetriever.java | 19 ++--
.../retriever/LeaderGatewayRetriever.java | 80 ++++++++++++--
.../webmonitor/retriever/LeaderRetriever.java | 37 +++----
.../retriever/LeaderGatewayRetrieverTest.java | 104 +++++++++++++++++++
7 files changed, 208 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/51b48f33/flink-core/src/main/java/org/apache/flink/util/OptionalConsumer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/OptionalConsumer.java b/flink-core/src/main/java/org/apache/flink/util/OptionalConsumer.java
index 94eac2f..6773cdf 100644
--- a/flink-core/src/main/java/org/apache/flink/util/OptionalConsumer.java
+++ b/flink-core/src/main/java/org/apache/flink/util/OptionalConsumer.java
@@ -30,10 +30,10 @@ import java.util.function.Consumer;
* @param <T> type of the optional
*/
public class OptionalConsumer<T> {
- private Optional<T> optional;
+ private final Optional<T> optional;
private OptionalConsumer(Optional<T> optional) {
- this.optional = optional;
+ this.optional = Preconditions.checkNotNull(optional);
}
public static <T> OptionalConsumer<T> of(Optional<T> optional) {
http://git-wip-us.apache.org/repos/asf/flink/blob/51b48f33/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java
index 5fbf2a3..9839abd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java
@@ -24,9 +24,9 @@ package org.apache.flink.runtime.webmonitor;
public interface WebHandler {
/**
- * Paths to register the handler under.
+ * Returns an array of REST URLs under which this handler can be registered.
*
- * @return Array of paths under which the handler wants to be registered
+ * @return array containing REST URLs under which this handler can be registered.
*/
String[] getPaths();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/51b48f33/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index dc9b49f..b3238af 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -138,6 +138,7 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
this.blobView = Preconditions.checkNotNull(blobView, "blobView");
}
+ @Override
public String[] getPaths() {
switch (fileMode) {
case LOG:
http://git-wip-us.apache.org/repos/asf/flink/blob/51b48f33/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/GatewayRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/GatewayRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/GatewayRetriever.java
index 1771b72..913af68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/GatewayRetriever.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/GatewayRetriever.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.retriever;
import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.util.FlinkRuntimeException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -38,19 +39,23 @@ public interface GatewayRetriever<T extends RpcGateway> {
CompletableFuture<T> getFuture();
/**
- * Returns the currently retrieved object if there is such an object. Otherwise
+ * Returns the currently retrieved gateway if there is such an object. Otherwise
* it returns an empty optional.
*
* @return Optional object to retrieve
- * @throws Exception if the future has been completed with an exception
*/
- default Optional<T> getNow() throws Exception {
+ default Optional<T> getNow() {
CompletableFuture<T> leaderFuture = getFuture();
if (leaderFuture != null) {
- CompletableFuture<T> currentLeaderFuture = leaderFuture;
-
- if (currentLeaderFuture.isDone()) {
- return Optional.of(currentLeaderFuture.get());
+ if (leaderFuture.isCompletedExceptionally() || leaderFuture.isCancelled()) {
+ return Optional.empty();
+ } else if (leaderFuture.isDone()) {
+ try {
+ return Optional.of(leaderFuture.get());
+ } catch (Exception e) {
+ // this should never happen
+ throw new FlinkRuntimeException("Unexpected error while accessing the retrieved gateway.", e);
+ }
} else {
return Optional.empty();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/51b48f33/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetriever.java
index 4e59859..4d300ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetriever.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetriever.java
@@ -19,10 +19,14 @@
package org.apache.flink.runtime.webmonitor.retriever;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.util.ExceptionUtils;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Retrieves and stores the leading {@link RpcGateway}.
@@ -31,24 +35,86 @@ import java.util.concurrent.CompletableFuture;
*/
public abstract class LeaderGatewayRetriever<T extends RpcGateway> extends LeaderRetriever implements GatewayRetriever<T> {
- private volatile CompletableFuture<T> gatewayFuture;
+ private final AtomicReference<CompletableFuture<T>> atomicGatewayFuture;
+
+ private volatile CompletableFuture<T> initialGatewayFuture;
public LeaderGatewayRetriever() {
- gatewayFuture = createGateway(getLeaderFuture());
+ initialGatewayFuture = new CompletableFuture<>();
+ atomicGatewayFuture = new AtomicReference<>(initialGatewayFuture);
}
@Override
public CompletableFuture<T> getFuture() {
- return gatewayFuture;
+ final CompletableFuture<T> currentGatewayFuture = atomicGatewayFuture.get();
+
+ if (currentGatewayFuture.isCompletedExceptionally()) {
+ try {
+ currentGatewayFuture.get();
+ } catch (ExecutionException | InterruptedException executionException) {
+ String leaderAddress;
+
+ try {
+ Tuple2<String, UUID> leaderAddressSessionId = getLeaderNow()
+ .orElse(Tuple2.of("unknown address", HighAvailabilityServices.DEFAULT_LEADER_ID));
+
+ leaderAddress = leaderAddressSessionId.f0;
+ } catch (Exception e) {
+ log.warn("Could not obtain the current leader.", e);
+ leaderAddress = "unknown leader address";
+ }
+
+ if (log.isDebugEnabled() || log.isTraceEnabled()) {
+ // only log exceptions on debug or trace level
+ log.warn(
+ "Error while retrieving the leader gateway. Retrying to connect to {}.",
+ leaderAddress,
+ ExceptionUtils.stripExecutionException(executionException));
+ } else {
+ log.warn(
+ "Error while retrieving the leader gateway. Retrying to connect to {}.",
+ leaderAddress);
+ }
+ }
+
+ // we couldn't resolve the gateway --> let's try again
+ final CompletableFuture<T> newGatewayFuture = createGateway(getLeaderFuture());
+
+ // let's check if the gateway future was replaced concurrently (by notifyNewLeaderAddress or another getFuture call)
+ if (atomicGatewayFuture.compareAndSet(currentGatewayFuture, newGatewayFuture)) {
+ return newGatewayFuture;
+ } else {
+ return atomicGatewayFuture.get();
+ }
+ } else {
+ return atomicGatewayFuture.get();
+ }
}
@Override
- public CompletableFuture<Tuple2<String, UUID>> createNewFuture() {
- CompletableFuture<Tuple2<String, UUID>> newFuture = super.createNewFuture();
+ public void notifyNewLeaderAddress(CompletableFuture<Tuple2<String, UUID>> newLeaderAddressFuture) {
+ final CompletableFuture<T> newGatewayFuture = createGateway(newLeaderAddressFuture);
+
+ final CompletableFuture<T> oldGatewayFuture = atomicGatewayFuture.getAndSet(newGatewayFuture);
- gatewayFuture = createGateway(newFuture);
+ // check if the old gateway future was the initial future
+ if (oldGatewayFuture == initialGatewayFuture) {
+ // we have to complete it because a caller might wait on the initial future
+ newGatewayFuture.whenComplete(
+ (t, throwable) -> {
+ if (throwable != null) {
+ oldGatewayFuture.completeExceptionally(throwable);
+ } else {
+ oldGatewayFuture.complete(t);
+ }
+ });
- return newFuture;
+ // free the initial gateway future
+ initialGatewayFuture = null;
+ } else {
+ // try to cancel old gateway retrieval operation
+ oldGatewayFuture.cancel(false);
+ }
}
protected abstract CompletableFuture<T> createGateway(CompletableFuture<Tuple2<String, UUID>> leaderFuture);
http://git-wip-us.apache.org/repos/asf/flink/blob/51b48f33/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderRetriever.java
index fbfb507..e9ea90e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderRetriever.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderRetriever.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Retrieves and stores the current leader address.
@@ -35,15 +35,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class LeaderRetriever implements LeaderRetrievalListener {
protected final Logger log = LoggerFactory.getLogger(getClass());
- // False if we have to create a new JobManagerGateway future when being notified
- // about a new leader address
- private final AtomicBoolean firstTimeUsage;
-
- protected volatile CompletableFuture<Tuple2<String, UUID>> leaderFuture;
+ private AtomicReference<CompletableFuture<Tuple2<String, UUID>>> atomicLeaderFuture;
public LeaderRetriever() {
- firstTimeUsage = new AtomicBoolean(true);
- leaderFuture = new CompletableFuture<>();
+ atomicLeaderFuture = new AtomicReference<>(new CompletableFuture<>());
}
/**
@@ -55,7 +50,7 @@ public class LeaderRetriever implements LeaderRetrievalListener {
* @throws Exception if the leader future has been completed with an exception
*/
public Optional<Tuple2<String, UUID>> getLeaderNow() throws Exception {
- CompletableFuture<Tuple2<String, UUID>> leaderFuture = this.leaderFuture;
+ CompletableFuture<Tuple2<String, UUID>> leaderFuture = this.atomicLeaderFuture.get();
if (leaderFuture != null) {
CompletableFuture<Tuple2<String, UUID>> currentLeaderFuture = leaderFuture;
@@ -73,25 +68,23 @@ public class LeaderRetriever implements LeaderRetrievalListener {
* Returns the current JobManagerGateway future.
*/
public CompletableFuture<Tuple2<String, UUID>> getLeaderFuture() {
- return leaderFuture;
+ return atomicLeaderFuture.get();
}
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
if (leaderAddress != null && !leaderAddress.equals("")) {
try {
- final CompletableFuture<Tuple2<String, UUID>> newLeaderFuture;
+ final CompletableFuture<Tuple2<String, UUID>> newLeaderFuture = CompletableFuture.completedFuture(Tuple2.of(leaderAddress, leaderSessionID));
- if (firstTimeUsage.compareAndSet(true, false)) {
- newLeaderFuture = leaderFuture;
- } else {
- newLeaderFuture = createNewFuture();
- leaderFuture = newLeaderFuture;
- }
+ final CompletableFuture<Tuple2<String, UUID>> oldLeaderFuture = atomicLeaderFuture.getAndSet(newLeaderFuture);
- log.info("New leader reachable under {}:{}.", leaderAddress, leaderSessionID);
+ if (!oldLeaderFuture.isDone()) {
+ // initial leader future
+ oldLeaderFuture.complete(Tuple2.of(leaderAddress, leaderSessionID));
+ }
- newLeaderFuture.complete(Tuple2.of(leaderAddress, leaderSessionID));
+ notifyNewLeaderAddress(newLeaderFuture);
}
catch (Exception e) {
handleError(e);
@@ -103,10 +96,8 @@ public class LeaderRetriever implements LeaderRetrievalListener {
public void handleError(Exception exception) {
log.error("Received error from LeaderRetrievalService.", exception);
- leaderFuture.completeExceptionally(exception);
+ atomicLeaderFuture.get().completeExceptionally(exception);
}
- protected CompletableFuture<Tuple2<String, UUID>> createNewFuture() {
- return new CompletableFuture<>();
- }
+ protected void notifyNewLeaderAddress(CompletableFuture<Tuple2<String, UUID>> newLeaderAddressFuture) {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/51b48f33/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetrieverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetrieverTest.java
new file mode 100644
index 0000000..7be06f3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetrieverTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test cases for the {@link LeaderGatewayRetriever}.
+ */
+public class LeaderGatewayRetrieverTest extends TestLogger {
+
+ /**
+ * Tests that the gateway retrieval is retried in case of a failure.
+ */
+ @Test
+ public void testGatewayRetrievalFailures() throws Exception {
+ final String address = "localhost";
+ final UUID leaderId = UUID.randomUUID();
+
+ RpcGateway rpcGateway = mock(RpcGateway.class);
+
+ TestingLeaderGatewayRetriever leaderGatewayRetriever = new TestingLeaderGatewayRetriever(rpcGateway);
+ TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService();
+
+ testingLeaderRetrievalService.start(leaderGatewayRetriever);
+
+ CompletableFuture<RpcGateway> gatewayFuture = leaderGatewayRetriever.getFuture();
+
+ // this triggers the first gateway retrieval attempt
+ testingLeaderRetrievalService.notifyListener(address, leaderId);
+
+ // check that the first future has been failed
+ try {
+ gatewayFuture.get();
+
+ fail("The first future should have been failed.");
+ } catch (ExecutionException ignored) {
+ // that's what we expect
+ }
+
+ // the second attempt should fail as well
+ assertFalse((leaderGatewayRetriever.getNow().isPresent()));
+
+ // the third attempt should succeed
+ assertEquals(rpcGateway, leaderGatewayRetriever.getNow().get());
+ }
+
+ private static class TestingLeaderGatewayRetriever extends LeaderGatewayRetriever<RpcGateway> {
+
+ private final RpcGateway rpcGateway;
+ private int retrievalAttempt = 0;
+
+ private TestingLeaderGatewayRetriever(RpcGateway rpcGateway) {
+ this.rpcGateway = rpcGateway;
+ }
+
+ @Override
+ protected CompletableFuture<RpcGateway> createGateway(CompletableFuture<Tuple2<String, UUID>> leaderFuture) {
+ CompletableFuture<RpcGateway> result;
+
+ if (retrievalAttempt < 2) {
+ result = FutureUtils.completedExceptionally(new FlinkException("Could not resolve the leader gateway."));
+ } else {
+ result = CompletableFuture.completedFuture(rpcGateway);
+ }
+
+ retrievalAttempt++;
+
+ return result;
+ }
+ }
+}