You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/18 21:09:56 UTC

flink git commit: [FLINK-7533] Let LeaderGatewayRetriever retry failed gateway retrievals

Repository: flink
Updated Branches:
  refs/heads/master 41dba8bb7 -> 51b48f33b


[FLINK-7533] Let LeaderGatewayRetriever retry failed gateway retrievals

Add test case

Only include the LeaderGatewayRetriever exception in the log output when debug logging is enabled

Properly fail outdated gateway retrieval operations

This closes #4602.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51b48f33
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51b48f33
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51b48f33

Branch: refs/heads/master
Commit: 51b48f33bb68014663d74548467ed88afd3bb3a3
Parents: 41dba8b
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Aug 18 14:29:29 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Sep 18 23:08:35 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/util/OptionalConsumer.java |   4 +-
 .../flink/runtime/webmonitor/WebHandler.java    |   4 +-
 .../handlers/TaskManagerLogHandler.java         |   1 +
 .../webmonitor/retriever/GatewayRetriever.java  |  19 ++--
 .../retriever/LeaderGatewayRetriever.java       |  80 ++++++++++++--
 .../webmonitor/retriever/LeaderRetriever.java   |  37 +++----
 .../retriever/LeaderGatewayRetrieverTest.java   | 104 +++++++++++++++++++
 7 files changed, 208 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/51b48f33/flink-core/src/main/java/org/apache/flink/util/OptionalConsumer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/OptionalConsumer.java b/flink-core/src/main/java/org/apache/flink/util/OptionalConsumer.java
index 94eac2f..6773cdf 100644
--- a/flink-core/src/main/java/org/apache/flink/util/OptionalConsumer.java
+++ b/flink-core/src/main/java/org/apache/flink/util/OptionalConsumer.java
@@ -30,10 +30,10 @@ import java.util.function.Consumer;
  * @param <T> type of the optional
  */
 public class OptionalConsumer<T> {
-	private Optional<T> optional;
+	private final Optional<T> optional;
 
 	private OptionalConsumer(Optional<T> optional) {
-		this.optional = optional;
+		this.optional = Preconditions.checkNotNull(optional);
 	}
 
 	public static <T> OptionalConsumer<T> of(Optional<T> optional) {

http://git-wip-us.apache.org/repos/asf/flink/blob/51b48f33/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java
index 5fbf2a3..9839abd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java
@@ -24,9 +24,9 @@ package org.apache.flink.runtime.webmonitor;
 public interface WebHandler {
 
 	/**
-	 * Paths to register the handler under.
+	 * Returns an array of REST URL's under which this handler can be registered.
 	 *
-	 * @return Array of paths under which the handler wants to be registered
+	 * @return array containing REST URL's under which this handler can be registered.
 	 */
 	String[] getPaths();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/51b48f33/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index dc9b49f..b3238af 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -138,6 +138,7 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
 		this.blobView = Preconditions.checkNotNull(blobView, "blobView");
 	}
 
+	@Override
 	public String[] getPaths() {
 		switch (fileMode) {
 			case LOG:

http://git-wip-us.apache.org/repos/asf/flink/blob/51b48f33/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/GatewayRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/GatewayRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/GatewayRetriever.java
index 1771b72..913af68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/GatewayRetriever.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/GatewayRetriever.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.retriever;
 
 import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -38,19 +39,23 @@ public interface GatewayRetriever<T extends RpcGateway> {
 	CompletableFuture<T> getFuture();
 
 	/**
-	 * Returns the currently retrieved object if there is such an object. Otherwise
+	 * Returns the currently retrieved gateway if there is such an object. Otherwise
 	 * it returns an empty optional.
 	 *
 	 * @return Optional object to retrieve
-	 * @throws Exception if the future has been completed with an exception
 	 */
-	default Optional<T> getNow() throws Exception {
+	default Optional<T> getNow() {
 		CompletableFuture<T> leaderFuture = getFuture();
 		if (leaderFuture != null) {
-			CompletableFuture<T> currentLeaderFuture = leaderFuture;
-
-			if (currentLeaderFuture.isDone()) {
-				return Optional.of(currentLeaderFuture.get());
+			if (leaderFuture.isCompletedExceptionally() || leaderFuture.isCancelled()) {
+				return Optional.empty();
+			} else if (leaderFuture.isDone()) {
+				try {
+					return Optional.of(leaderFuture.get());
+				} catch (Exception e) {
+					// this should never happen
+					throw new FlinkRuntimeException("Unexpected error while accessing the retrieved gateway.", e);
+				}
 			} else {
 				return Optional.empty();
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/51b48f33/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetriever.java
index 4e59859..4d300ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetriever.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetriever.java
@@ -19,10 +19,14 @@
 package org.apache.flink.runtime.webmonitor.retriever;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.util.ExceptionUtils;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Retrieves and stores the leading {@link RpcGateway}.
@@ -31,24 +35,86 @@ import java.util.concurrent.CompletableFuture;
  */
 public abstract class LeaderGatewayRetriever<T extends RpcGateway> extends LeaderRetriever implements GatewayRetriever<T> {
 
-	private volatile CompletableFuture<T> gatewayFuture;
+	private final AtomicReference<CompletableFuture<T>> atomicGatewayFuture;
+
+	private volatile CompletableFuture<T> initialGatewayFuture;
 
 	public LeaderGatewayRetriever() {
-		gatewayFuture = createGateway(getLeaderFuture());
+		initialGatewayFuture = new CompletableFuture<>();
+		atomicGatewayFuture = new AtomicReference<>(initialGatewayFuture);
 	}
 
 	@Override
 	public CompletableFuture<T> getFuture() {
-		return gatewayFuture;
+		final CompletableFuture<T> currentGatewayFuture = atomicGatewayFuture.get();
+
+		if (currentGatewayFuture.isCompletedExceptionally()) {
+			try {
+				currentGatewayFuture.get();
+			} catch (ExecutionException | InterruptedException executionException) {
+				String leaderAddress;
+
+				try {
+					Tuple2<String, UUID> leaderAddressSessionId = getLeaderNow()
+						.orElse(Tuple2.of("unknown address", HighAvailabilityServices.DEFAULT_LEADER_ID));
+
+					leaderAddress = leaderAddressSessionId.f0;
+				} catch (Exception e) {
+					log.warn("Could not obtain the current leader.", e);
+					leaderAddress = "unknown leader address";
+				}
+
+				if (log.isDebugEnabled() || log.isTraceEnabled()) {
+					// only log exceptions on debug or trace level
+					log.warn(
+						"Error while retrieving the leader gateway. Retrying to connect to {}.",
+						leaderAddress,
+						ExceptionUtils.stripExecutionException(executionException));
+				} else {
+					log.warn(
+						"Error while retrieving the leader gateway. Retrying to connect to {}.",
+						leaderAddress);
+				}
+			}
+
+			// we couldn't resolve the gateway --> let's try again
+			final CompletableFuture<T> newGatewayFuture = createGateway(getLeaderFuture());
+
+			// let's check if there was a concurrent createNewFuture call
+			if (atomicGatewayFuture.compareAndSet(currentGatewayFuture, newGatewayFuture)) {
+				return newGatewayFuture;
+			} else {
+				return atomicGatewayFuture.get();
+			}
+		} else {
+			return atomicGatewayFuture.get();
+		}
 	}
 
 	@Override
-	public CompletableFuture<Tuple2<String, UUID>> createNewFuture() {
-		CompletableFuture<Tuple2<String, UUID>> newFuture = super.createNewFuture();
+	public void notifyNewLeaderAddress(CompletableFuture<Tuple2<String, UUID>> newLeaderAddressFuture) {
+		final CompletableFuture<T> newGatewayFuture = createGateway(newLeaderAddressFuture);
+
+		final CompletableFuture<T> oldGatewayFuture = atomicGatewayFuture.getAndSet(newGatewayFuture);
 
-		gatewayFuture = createGateway(newFuture);
+		// check if the old gateway future was the initial future
+		if (oldGatewayFuture == initialGatewayFuture) {
+			// we have to complete it because a caller might wait on the initial future
+			newGatewayFuture.whenComplete(
+				(t, throwable) -> {
+					if (throwable != null) {
+						oldGatewayFuture.completeExceptionally(throwable);
+					} else {
+						oldGatewayFuture.complete(t);
+					}
+				});
 
-		return newFuture;
+			// free the initial gateway future
+			initialGatewayFuture = null;
+		} else {
+			// try to cancel old gateway retrieval operation
+			oldGatewayFuture.cancel(false);
+		}
 	}
 
 	protected abstract CompletableFuture<T> createGateway(CompletableFuture<Tuple2<String, UUID>> leaderFuture);

http://git-wip-us.apache.org/repos/asf/flink/blob/51b48f33/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderRetriever.java
index fbfb507..e9ea90e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderRetriever.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/LeaderRetriever.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Retrieves and stores the current leader address.
@@ -35,15 +35,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class LeaderRetriever implements LeaderRetrievalListener {
 	protected final Logger log = LoggerFactory.getLogger(getClass());
 
-	// False if we have to create a new JobManagerGateway future when being notified
-	// about a new leader address
-	private final AtomicBoolean firstTimeUsage;
-
-	protected volatile CompletableFuture<Tuple2<String, UUID>> leaderFuture;
+	private AtomicReference<CompletableFuture<Tuple2<String, UUID>>> atomicLeaderFuture;
 
 	public LeaderRetriever() {
-		firstTimeUsage = new AtomicBoolean(true);
-		leaderFuture = new CompletableFuture<>();
+		atomicLeaderFuture = new AtomicReference<>(new CompletableFuture<>());
 	}
 
 	/**
@@ -55,7 +50,7 @@ public class LeaderRetriever implements LeaderRetrievalListener {
 	 * @throws Exception if the leader future has been completed with an exception
 	 */
 	public Optional<Tuple2<String, UUID>> getLeaderNow() throws Exception {
-		CompletableFuture<Tuple2<String, UUID>> leaderFuture = this.leaderFuture;
+		CompletableFuture<Tuple2<String, UUID>> leaderFuture = this.atomicLeaderFuture.get();
 		if (leaderFuture != null) {
 			CompletableFuture<Tuple2<String, UUID>> currentLeaderFuture = leaderFuture;
 
@@ -73,25 +68,23 @@ public class LeaderRetriever implements LeaderRetrievalListener {
 	 * Returns the current JobManagerGateway future.
 	 */
 	public CompletableFuture<Tuple2<String, UUID>> getLeaderFuture() {
-		return leaderFuture;
+		return atomicLeaderFuture.get();
 	}
 
 	@Override
 	public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
 		if (leaderAddress != null && !leaderAddress.equals("")) {
 			try {
-				final CompletableFuture<Tuple2<String, UUID>> newLeaderFuture;
+				final CompletableFuture<Tuple2<String, UUID>> newLeaderFuture = CompletableFuture.completedFuture(Tuple2.of(leaderAddress, leaderSessionID));
 
-				if (firstTimeUsage.compareAndSet(true, false)) {
-					newLeaderFuture = leaderFuture;
-				} else {
-					newLeaderFuture = createNewFuture();
-					leaderFuture = newLeaderFuture;
-				}
+				final CompletableFuture<Tuple2<String, UUID>> oldLeaderFuture = atomicLeaderFuture.getAndSet(newLeaderFuture);
 
-				log.info("New leader reachable under {}:{}.", leaderAddress, leaderSessionID);
+				if (!oldLeaderFuture.isDone()) {
+					// initial leader future
+					oldLeaderFuture.complete(Tuple2.of(leaderAddress, leaderSessionID));
+				}
 
-				newLeaderFuture.complete(Tuple2.of(leaderAddress, leaderSessionID));
+				notifyNewLeaderAddress(newLeaderFuture);
 			}
 			catch (Exception e) {
 				handleError(e);
@@ -103,10 +96,8 @@ public class LeaderRetriever implements LeaderRetrievalListener {
 	public void handleError(Exception exception) {
 		log.error("Received error from LeaderRetrievalService.", exception);
 
-		leaderFuture.completeExceptionally(exception);
+		atomicLeaderFuture.get().completeExceptionally(exception);
 	}
 
-	protected CompletableFuture<Tuple2<String, UUID>> createNewFuture() {
-		return new CompletableFuture<>();
-	}
+	protected void notifyNewLeaderAddress(CompletableFuture<Tuple2<String, UUID>> newLeaderAddressFuture) {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/51b48f33/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetrieverTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetrieverTest.java
new file mode 100644
index 0000000..7be06f3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/LeaderGatewayRetrieverTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test cases for the {@link LeaderGatewayRetriever}.
+ */
+public class LeaderGatewayRetrieverTest extends TestLogger {
+
+	/**
+	 * Tests that the gateway retrieval is retried in case of a failure.
+	 */
+	@Test
+	public void testGatewayRetrievalFailures() throws Exception {
+		final String address = "localhost";
+		final UUID leaderId = UUID.randomUUID();
+
+		RpcGateway rpcGateway = mock(RpcGateway.class);
+
+		TestingLeaderGatewayRetriever leaderGatewayRetriever = new TestingLeaderGatewayRetriever(rpcGateway);
+		TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService();
+
+		testingLeaderRetrievalService.start(leaderGatewayRetriever);
+
+		CompletableFuture<RpcGateway> gatewayFuture = leaderGatewayRetriever.getFuture();
+
+		// this triggers the first gateway retrieval attempt
+		testingLeaderRetrievalService.notifyListener(address, leaderId);
+
+		// check that the first future has been failed
+		try {
+			gatewayFuture.get();
+
+			fail("The first future should have been failed.");
+		} catch (ExecutionException ignored) {
+			// that's what we expect
+		}
+
+		// the second attempt should fail as well
+		assertFalse((leaderGatewayRetriever.getNow().isPresent()));
+
+		// the third attempt should succeed
+		assertEquals(rpcGateway, leaderGatewayRetriever.getNow().get());
+	}
+
+	private static class TestingLeaderGatewayRetriever extends LeaderGatewayRetriever<RpcGateway> {
+
+		private final RpcGateway rpcGateway;
+		private int retrievalAttempt = 0;
+
+		private TestingLeaderGatewayRetriever(RpcGateway rpcGateway) {
+			this.rpcGateway = rpcGateway;
+		}
+
+		@Override
+		protected CompletableFuture<RpcGateway> createGateway(CompletableFuture<Tuple2<String, UUID>> leaderFuture) {
+			CompletableFuture<RpcGateway> result;
+
+			if (retrievalAttempt < 2) {
+				result = FutureUtils.completedExceptionally(new FlinkException("Could not resolve the leader gateway."));
+			} else {
+				result = CompletableFuture.completedFuture(rpcGateway);
+			}
+
+			retrievalAttempt++;
+
+			return result;
+		}
+	}
+}