You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/14 09:15:29 UTC

[flink] branch master updated (2826ff8 -> 2284f77)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 2826ff8  [FLINK-12374][table-planner-blink] Support translation from StreamExecTableSourceScan/BatchExecTableSourceScan to StreamTransformation. (#8407)
     new 6231b18  [FLINK-12472][yarn] Support setting attemptFailuresValidityInterval of jobs on Yarn
     new b3f102d  [FLINK-12468][yarn] Unregister application from the YARN Resource Manager with a valid appTrackingUrl
     new 792f2a2  [FLINK-12468][yarn] Derive HistoryServer's URL from HistoryServerOptions
     new 07773d0  [FLINK-12260] Slot allocation failure by taskmanager registration timeout and race
     new 2284f77  [FLINK-12260][tests] Speed up ResourceManagerTaskExecutorTest#testDelayedRegisterTaskExecutor

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../generated/yarn_config_configuration.html       |   5 +
 .../runtime/webmonitor/history/HistoryServer.java  |   2 +-
 .../runtime/resourcemanager/ResourceManager.java   |  27 +++--
 .../webmonitor/history/HistoryServerUtils.java     |  82 +++++++++++++++
 .../ResourceManagerTaskExecutorTest.java           |  68 +++++++++++-
 .../flink/runtime/rpc/TestingRpcService.java       |  22 +++-
 .../webmonitor/history/HistoryServerUtilsTest.java | 114 +++++++++++++++++++++
 .../flink/yarn/AbstractYarnClusterDescriptor.java  |   6 +-
 .../org/apache/flink/yarn/YarnResourceManager.java |   9 +-
 .../yarn/configuration/YarnConfigOptions.java      |  13 +++
 10 files changed, 333 insertions(+), 15 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtils.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtilsTest.java


[flink] 04/05: [FLINK-12260] Slot allocation failure by taskmanager registration timeout and race

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 07773d0d9251d6ad8c1770de985d33be8e72b032
Author: Hwanju Kim <hw...@amazon.com>
AuthorDate: Thu May 9 17:03:37 2019 -0700

    [FLINK-12260] Slot allocation failure by taskmanager registration timeout and race
    
    TaskExecutor registration is an asynchronous process, which allows a
    retry issued after a timeout to be processed ahead of the earlier
    request. Such a delayed, timed-out request can accidentally unregister a
    valid task manager, whose slots are then never reported to the job
    manager. This patch tracks the ongoing task executor connection future
    per resource ID, so that only the most recent registration attempt is
    processed, preventing this race.
---
 .../runtime/resourcemanager/ResourceManager.java   | 27 +++++++----
 .../ResourceManagerTaskExecutorTest.java           | 52 +++++++++++++++++++++-
 .../flink/runtime/rpc/TestingRpcService.java       | 27 ++++++++++-
 3 files changed, 95 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 12860a8..03e1d87 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -114,6 +114,9 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	/** All currently registered TaskExecutors with there framework specific worker information. */
 	private final Map<ResourceID, WorkerRegistration<WorkerType>> taskExecutors;
 
+	/** Ongoing registration of TaskExecutors per resource ID. */
+	private final Map<ResourceID, CompletableFuture<TaskExecutorGateway>> taskExecutorGatewayFutures;
+
 	/** High availability services for leader retrieval and election. */
 	private final HighAvailabilityServices highAvailabilityServices;
 
@@ -186,6 +189,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		this.jobManagerRegistrations = new HashMap<>(4);
 		this.jmResourceIdRegistrations = new HashMap<>(4);
 		this.taskExecutors = new HashMap<>(8);
+		this.taskExecutorGatewayFutures = new HashMap<>(8);
 	}
 
 
@@ -371,18 +375,25 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 			final Time timeout) {
 
 		CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
+		taskExecutorGatewayFutures.put(taskExecutorResourceId, taskExecutorGatewayFuture);
 
 		return taskExecutorGatewayFuture.handleAsync(
 			(TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
-				if (throwable != null) {
-					return new RegistrationResponse.Decline(throwable.getMessage());
+				if (taskExecutorGatewayFuture == taskExecutorGatewayFutures.get(taskExecutorResourceId)) {
+					taskExecutorGatewayFutures.remove(taskExecutorResourceId);
+					if (throwable != null) {
+						return new RegistrationResponse.Decline(throwable.getMessage());
+					} else {
+						return registerTaskExecutorInternal(
+							taskExecutorGateway,
+							taskExecutorAddress,
+							taskExecutorResourceId,
+							dataPort,
+							hardwareDescription);
+					}
 				} else {
-					return registerTaskExecutorInternal(
-						taskExecutorGateway,
-						taskExecutorAddress,
-						taskExecutorResourceId,
-						dataPort,
-						hardwareDescription);
+					log.info("Ignoring outdated TaskExecutorGateway connection.");
+					return new RegistrationResponse.Decline("Decline outdated task executor registration.");
 				}
 			},
 			getMainThreadExecutor());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 95b3d08..fcb92af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -50,6 +50,7 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
+import akka.pattern.AskTimeoutException;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -79,6 +80,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 
 	private static final Time TIMEOUT = Time.seconds(10L);
 
+	private static final long HEARTBEAT_TIMEOUT = 5000;
+
 	private static TestingRpcService rpcService;
 
 	private TestingTaskExecutorGateway taskExecutorGateway;
@@ -133,7 +136,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 
 	private StandaloneResourceManager createAndStartResourceManager(LeaderElectionService rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
-		HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
+		HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, HEARTBEAT_TIMEOUT);
 		highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
 
 		SlotManager slotManager = SlotManagerBuilder.newBuilder()
@@ -209,6 +212,53 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 	}
 
 	/**
+	 * Test delayed registration of task executor where the delay is introduced during connection from resource manager
+	 * to the registering task executor.
+	 */
+	@Test
+	public void testDelayedRegisterTaskExecutor() throws Exception {
+		// additional delay over RPC timeout
+		// use a value much smaller (< 1/2) than heartbeat timeout not to hit the timeout on delay for race test below
+		final long additionalDelayMillis = HEARTBEAT_TIMEOUT / 5;
+		try {
+			// first registration is with connection delay longer than timeout expecting timeout and then retry
+			rpcService.setConnectionDelayMillis(TIMEOUT.toMilliseconds() + additionalDelayMillis);
+			CompletableFuture<RegistrationResponse> firstFuture =
+				rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+			try {
+				firstFuture.get();
+				fail("Should have failed because connection to taskmanager is delayed beyond timeout");
+			} catch (Exception e) {
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+			}
+
+			// second registration after timeout is with no delay, expecting it to be succeeded
+			rpcService.setConnectionDelayMillis(0);
+			CompletableFuture<RegistrationResponse> secondFuture =
+				rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+			RegistrationResponse response = secondFuture.get();
+			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+
+			// on success, send slot report for taskmanager registration
+			final SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(taskExecutorResourceID, 0), ResourceProfile.UNKNOWN));
+			rmGateway.sendSlotReport(taskExecutorResourceID,
+				((TaskExecutorRegistrationSuccess) response).getRegistrationId(), slotReport, TIMEOUT).get();
+
+			// wait enough for the first registration's connection delay to be over letting its remaining part go through
+			Thread.sleep(additionalDelayMillis * 2);
+
+			// verify that the latest registration is valid not being unregistered by the delayed one
+			final TaskManagerInfo taskManagerInfo = rmGateway.requestTaskManagerInfo(
+				taskExecutorResourceID,
+				TIMEOUT).get();
+			assertThat(taskManagerInfo.getResourceId(), equalTo(taskExecutorResourceID));
+			assertThat(taskManagerInfo.getNumberSlots(), equalTo(1));
+		} finally {
+			rpcService.setConnectionDelayMillis(0L);
+		}
+	}
+
+	/**
 	 * Tests that a TaskExecutor can disconnect from the {@link ResourceManager}.
 	 */
 	@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index f42f09c..6d12266 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import java.io.Serializable;
 import java.util.concurrent.CompletableFuture;
@@ -55,6 +56,9 @@ public class TestingRpcService extends AkkaRpcService {
 	/** Map of pre-registered connections. */
 	private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
 
+	/** Artificial delay on connection */
+	private long connectionDelayMillis;
+
 	/**
 	 * Creates a new {@code TestingRpcService}.
 	 */
@@ -99,6 +103,21 @@ public class TestingRpcService extends AkkaRpcService {
 		}
 	}
 
+	private <C extends RpcGateway> CompletableFuture<C> getRpcGatewayFuture(C gateway) {
+		if (connectionDelayMillis <= 0) {
+			return CompletableFuture.completedFuture(gateway);
+		} else {
+			return CompletableFuture.supplyAsync(
+				() -> {
+					try {
+						Thread.sleep(connectionDelayMillis);
+					} catch (InterruptedException ignored) {}
+					return gateway;
+				},
+				TestingUtils.defaultExecutor());
+		}
+	}
+
 	@Override
 	public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
 		RpcGateway gateway = registeredConnections.get(address);
@@ -107,7 +126,7 @@ public class TestingRpcService extends AkkaRpcService {
 			if (clazz.isAssignableFrom(gateway.getClass())) {
 				@SuppressWarnings("unchecked")
 				C typedGateway = (C) gateway;
-				return CompletableFuture.completedFuture(typedGateway);
+				return getRpcGatewayFuture(typedGateway);
 			} else {
 				return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
 			}
@@ -127,7 +146,7 @@ public class TestingRpcService extends AkkaRpcService {
 			if (clazz.isAssignableFrom(gateway.getClass())) {
 				@SuppressWarnings("unchecked")
 				C typedGateway = (C) gateway;
-				return CompletableFuture.completedFuture(typedGateway);
+				return getRpcGatewayFuture(typedGateway);
 			} else {
 				return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
 			}
@@ -139,4 +158,8 @@ public class TestingRpcService extends AkkaRpcService {
 	public void clearGateways() {
 		registeredConnections.clear();
 	}
+
+	public void setConnectionDelayMillis(long connectionDelayMillis) {
+		this.connectionDelayMillis = connectionDelayMillis;
+	}
 }


[flink] 05/05: [FLINK-12260][tests] Speed up ResourceManagerTaskExecutorTest#testDelayedRegisterTaskExecutor

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2284f777ecd3b62b412bd0fdb9dbcf492314c589
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon May 13 15:45:03 2019 +0200

    [FLINK-12260][tests] Speed up ResourceManagerTaskExecutorTest#testDelayedRegisterTaskExecutor
    
    Use latches instead of timeouts/sleeps to test problematic thread interleaving.
    
    This closes #8415.
---
 .../ResourceManagerTaskExecutorTest.java           | 38 +++++++++++++++-------
 .../flink/runtime/rpc/TestingRpcService.java       | 29 +++++++----------
 2 files changed, 39 insertions(+), 28 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index fcb92af..63d8245 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
@@ -45,6 +46,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -217,23 +219,36 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 	 */
 	@Test
 	public void testDelayedRegisterTaskExecutor() throws Exception {
-		// additional delay over RPC timeout
-		// use a value much smaller (< 1/2) than heartbeat timeout not to hit the timeout on delay for race test below
-		final long additionalDelayMillis = HEARTBEAT_TIMEOUT / 5;
+		final Time fastTimeout = Time.milliseconds(1L);
 		try {
-			// first registration is with connection delay longer than timeout expecting timeout and then retry
-			rpcService.setConnectionDelayMillis(TIMEOUT.toMilliseconds() + additionalDelayMillis);
+			final OneShotLatch startConnection = new OneShotLatch();
+			final OneShotLatch finishConnection = new OneShotLatch();
+
+			// first registration is with blocking connection
+			rpcService.setRpcGatewayFutureFunction(rpcGateway ->
+				CompletableFuture.supplyAsync(
+					() -> {
+						startConnection.trigger();
+						try {
+							finishConnection.await();
+						} catch (InterruptedException ignored) {}
+						return rpcGateway;
+					},
+					TestingUtils.defaultExecutor()));
+
 			CompletableFuture<RegistrationResponse> firstFuture =
-				rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+				rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, fastTimeout);
 			try {
 				firstFuture.get();
 				fail("Should have failed because connection to taskmanager is delayed beyond timeout");
 			} catch (Exception e) {
-				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException);
+				assertThat(ExceptionUtils.stripExecutionException(e), instanceOf(AskTimeoutException.class));
 			}
 
+			startConnection.await();
+
 			// second registration after timeout is with no delay, expecting it to be succeeded
-			rpcService.setConnectionDelayMillis(0);
+			rpcService.resetRpcGatewayFutureFunction();
 			CompletableFuture<RegistrationResponse> secondFuture =
 				rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
 			RegistrationResponse response = secondFuture.get();
@@ -244,8 +259,9 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 			rmGateway.sendSlotReport(taskExecutorResourceID,
 				((TaskExecutorRegistrationSuccess) response).getRegistrationId(), slotReport, TIMEOUT).get();
 
-			// wait enough for the first registration's connection delay to be over letting its remaining part go through
-			Thread.sleep(additionalDelayMillis * 2);
+			// let the remaining part of the first registration proceed
+			finishConnection.trigger();
+			Thread.sleep(1L);
 
 			// verify that the latest registration is valid not being unregistered by the delayed one
 			final TaskManagerInfo taskManagerInfo = rmGateway.requestTaskManagerInfo(
@@ -254,7 +270,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 			assertThat(taskManagerInfo.getResourceId(), equalTo(taskExecutorResourceID));
 			assertThat(taskManagerInfo.getNumberSlots(), equalTo(1));
 		} finally {
-			rpcService.setConnectionDelayMillis(0L);
+			rpcService.resetRpcGatewayFutureFunction();
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index 6d12266..f11269d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -23,11 +23,11 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import java.io.Serializable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -53,11 +53,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class TestingRpcService extends AkkaRpcService {
 
+	private static final Function<RpcGateway, CompletableFuture<RpcGateway>> DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION = CompletableFuture::completedFuture;
+
 	/** Map of pre-registered connections. */
 	private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
 
-	/** Artificial delay on connection */
-	private long connectionDelayMillis;
+	private volatile Function<RpcGateway, CompletableFuture<RpcGateway>> rpcGatewayFutureFunction = DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION;
 
 	/**
 	 * Creates a new {@code TestingRpcService}.
@@ -103,19 +104,9 @@ public class TestingRpcService extends AkkaRpcService {
 		}
 	}
 
+	@SuppressWarnings("unchecked")
 	private <C extends RpcGateway> CompletableFuture<C> getRpcGatewayFuture(C gateway) {
-		if (connectionDelayMillis <= 0) {
-			return CompletableFuture.completedFuture(gateway);
-		} else {
-			return CompletableFuture.supplyAsync(
-				() -> {
-					try {
-						Thread.sleep(connectionDelayMillis);
-					} catch (InterruptedException ignored) {}
-					return gateway;
-				},
-				TestingUtils.defaultExecutor());
-		}
+		return (CompletableFuture<C>) rpcGatewayFutureFunction.apply(gateway);
 	}
 
 	@Override
@@ -159,7 +150,11 @@ public class TestingRpcService extends AkkaRpcService {
 		registeredConnections.clear();
 	}
 
-	public void setConnectionDelayMillis(long connectionDelayMillis) {
-		this.connectionDelayMillis = connectionDelayMillis;
+	public void resetRpcGatewayFutureFunction() {
+		rpcGatewayFutureFunction = DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION;
+	}
+
+	public void setRpcGatewayFutureFunction(Function<RpcGateway, CompletableFuture<RpcGateway>> rpcGatewayFutureFunction) {
+		this.rpcGatewayFutureFunction = rpcGatewayFutureFunction;
 	}
 }


[flink] 02/05: [FLINK-12468][yarn] Unregister application from the YARN Resource Manager with a valid appTrackingUrl

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b3f102d62214b303a4cc71b9e410e4af2e18c812
Author: Victor Wong <ji...@gmail.com>
AuthorDate: Fri May 10 12:20:40 2019 +0800

    [FLINK-12468][yarn] Unregister application from the YARN Resource Manager with a valid appTrackingUrl
---
 docs/_includes/generated/yarn_config_configuration.html          | 5 +++++
 .../src/main/java/org/apache/flink/yarn/YarnResourceManager.java | 3 ++-
 .../org/apache/flink/yarn/configuration/YarnConfigOptions.java   | 9 +++++++++
 3 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/docs/_includes/generated/yarn_config_configuration.html b/docs/_includes/generated/yarn_config_configuration.html
index 40dfc09..4943ef8 100644
--- a/docs/_includes/generated/yarn_config_configuration.html
+++ b/docs/_includes/generated/yarn_config_configuration.html
@@ -48,6 +48,11 @@
             <td>Time between heartbeats with the ResourceManager in seconds.</td>
         </tr>
         <tr>
+            <td><h5>yarn.history.server.address</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>The address of Flink HistoryServer.</td>
+        </tr>
+        <tr>
             <td><h5>yarn.maximum-failed-containers</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Maximum number of containers the system is going to reallocate in case of a failure.</td>
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index d054afe..b9ea02b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -292,8 +292,9 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
 		log.info("Unregister application from the YARN Resource Manager with final status {}.", yarnStatus);
 
+		String historyServerAddress = flinkConfig.getString(YarnConfigOptions.APPLICATION_HISTORY_SERVER_ADDRESS);
 		try {
-			resourceManagerClient.unregisterApplicationMaster(yarnStatus, diagnostics, "");
+			resourceManagerClient.unregisterApplicationMaster(yarnStatus, diagnostics, historyServerAddress);
 		} catch (Throwable t) {
 			log.error("Could not unregister the application master.", t);
 		}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 0f46a57..ff6697fd 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -180,6 +180,15 @@ public class YarnConfigOptions {
 		.defaultValue("")
 		.withDescription("A comma-separated list of tags to apply to the Flink YARN application.");
 
+	/**
+	 * The address of Flink HistoryServer.
+	 * This configuration parameter allows setting the appTrackingUrl for finished YARN applications.
+	 */
+	public static final ConfigOption<String> APPLICATION_HISTORY_SERVER_ADDRESS =
+		key("yarn.history.server.address")
+		.defaultValue("")
+		.withDescription("The address of Flink HistoryServer.");
+
 	// ------------------------------------------------------------------------
 
 	/** This class is not meant to be instantiated. */


[flink] 01/05: [FLINK-12472][yarn] Support setting attemptFailuresValidityInterval of jobs on Yarn

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6231b18645dd41031299b7071d20ddea32c60457
Author: Victor Wong <ji...@gmail.com>
AuthorDate: Fri May 10 16:01:50 2019 +0800

    [FLINK-12472][yarn] Support setting attemptFailuresValidityInterval of jobs on Yarn
    
    This closes #8400.
---
 docs/_includes/generated/yarn_config_configuration.html     |  5 +++++
 .../apache/flink/yarn/AbstractYarnClusterDescriptor.java    |  6 ++++--
 .../apache/flink/yarn/configuration/YarnConfigOptions.java  | 13 +++++++++++++
 3 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/docs/_includes/generated/yarn_config_configuration.html b/docs/_includes/generated/yarn_config_configuration.html
index a6b04b6..40dfc09 100644
--- a/docs/_includes/generated/yarn_config_configuration.html
+++ b/docs/_includes/generated/yarn_config_configuration.html
@@ -8,6 +8,11 @@
     </thead>
     <tbody>
         <tr>
+            <td><h5>yarn.application-attempt-failures-validity-interval</h5></td>
+            <td style="word-wrap: break-word;">10000</td>
+            <td>Time window in milliseconds which defines the number of application attempt failures when restarting the AM. Failures which fall outside of this window are not being considered. Set this value to -1 in order to count globally. See <a href="https://hortonworks.com/blog/apache-hadoop-yarn-hdp-2-2-fault-tolerance-features-long-running-services/">here</a> for more information.</td>
+        </tr>
+        <tr>
             <td><h5>yarn.application-attempts</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Number of ApplicationMaster restarts. Note that that the entire Flink cluster will restart and the YARN Client will loose the connection. Also, the JobManager address will change and you’ll need to set the JM host:port manually. It is recommended to leave this option at 1.</td>
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 20b5417..0f24496 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -35,7 +35,6 @@ import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.plugin.PluginUtils;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
@@ -1283,7 +1282,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance();
 
 		reflector.setKeepContainersAcrossApplicationAttempts(appContext, true);
-		reflector.setAttemptFailuresValidityInterval(appContext, AkkaUtils.getTimeout(flinkConfiguration).toMillis());
+
+		reflector.setAttemptFailuresValidityInterval(
+			appContext,
+			flinkConfiguration.getLong(YarnConfigOptions.APPLICATION_ATTEMPT_FAILURE_VALIDITY_INTERVAL));
 	}
 
 	private void setApplicationTags(final ApplicationSubmissionContext appContext) throws InvocationTargetException,
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 40a5929..0f46a57 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -102,6 +102,19 @@ public class YarnConfigOptions {
 			" to set the JM host:port manually. It is recommended to leave this option at 1.");
 
 	/**
+	 * The config parameter defining the attemptFailuresValidityInterval of Yarn application.
+	 */
+	public static final ConfigOption<Long> APPLICATION_ATTEMPT_FAILURE_VALIDITY_INTERVAL =
+		key("yarn.application-attempt-failures-validity-interval")
+		.defaultValue(10000L)
+		.withDescription(Description.builder()
+			.text("Time window in milliseconds which defines the number of application attempt failures when restarting the AM. " +
+				"Failures which fall outside of this window are not being considered. " +
+				"Set this value to -1 in order to count globally. " +
+				"See %s for more information.", link("https://hortonworks.com/blog/apache-hadoop-yarn-hdp-2-2-fault-tolerance-features-long-running-services/", "here"))
+			.build());
+
+	/**
 	 * The heartbeat interval between the Application Master and the YARN Resource Manager.
 	 */
 	public static final ConfigOption<Integer> HEARTBEAT_DELAY_SECONDS =


[flink] 03/05: [FLINK-12468][yarn] Derive HistoryServer's URL from HistoryServerOptions

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 792f2a2d9c0b5c8bba881455dedbed09b02d363c
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon May 13 14:52:19 2019 +0200

    [FLINK-12468][yarn] Derive HistoryServer's URL from HistoryServerOptions
    
    This commit derives the HistoryServer's URL from the available HistoryServerOptions.
    
    This closes #8396.
---
 .../generated/yarn_config_configuration.html       |   5 -
 .../runtime/webmonitor/history/HistoryServer.java  |   2 +-
 .../webmonitor/history/HistoryServerUtils.java     |  82 +++++++++++++++
 .../webmonitor/history/HistoryServerUtilsTest.java | 114 +++++++++++++++++++++
 .../org/apache/flink/yarn/YarnResourceManager.java |  10 +-
 .../yarn/configuration/YarnConfigOptions.java      |   9 --
 6 files changed, 205 insertions(+), 17 deletions(-)

diff --git a/docs/_includes/generated/yarn_config_configuration.html b/docs/_includes/generated/yarn_config_configuration.html
index 4943ef8..40dfc09 100644
--- a/docs/_includes/generated/yarn_config_configuration.html
+++ b/docs/_includes/generated/yarn_config_configuration.html
@@ -48,11 +48,6 @@
             <td>Time between heartbeats with the ResourceManager in seconds.</td>
         </tr>
         <tr>
-            <td><h5>yarn.history.server.address</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>The address of Flink HistoryServer.</td>
-        </tr>
-        <tr>
             <td><h5>yarn.maximum-failed-containers</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Maximum number of containers the system is going to reallocate in case of a failure.</td>
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 0bb98e8..12a5b56 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -144,7 +144,7 @@ public class HistoryServer {
 		Preconditions.checkNotNull(numFinishedPolls);
 
 		this.config = config;
-		if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.isRestSSLEnabled(config)) {
+		if (HistoryServerUtils.isSSLEnabled(config)) {
 			LOG.info("Enabling SSL for the history server.");
 			try {
 				this.serverSSLFactory = SSLUtils.createRestServerSSLEngineFactory(config);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtils.java
new file mode 100644
index 0000000..e790ba6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtils.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.runtime.net.SSLUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Optional;
+
+/**
+ * Utility class for the HistoryServer.
+ */
+public enum HistoryServerUtils {
+	;
+
+	private static final Logger LOG = LoggerFactory.getLogger(HistoryServerUtils.class);
+
+	/** Returns true only if both the HistoryServer web SSL flag and REST SSL are enabled. */
+	public static boolean isSSLEnabled(Configuration config) {
+		return config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.isRestSSLEnabled(config);
+	}
+
+	/** Builds the HistoryServer's URL from its configured web address, port and SSL setting; empty if no address is set. */
+	public static Optional<URL> getHistoryServerURL(Configuration configuration) {
+		final String hostname = getHostname(configuration);
+
+		if (hostname == null) {
+			LOG.debug("No hostname has been specified for the HistoryServer. This indicates that it has not been started.");
+			return Optional.empty();
+		}
+
+		final String protocol = getProtocol(configuration);
+		final int port = getPort(configuration);
+
+		try {
+			return Optional.of(new URL(protocol, hostname, port, ""));
+		} catch (MalformedURLException e) {
+			// java.net.URL rejects e.g. a negative port other than -1; treat as "no URL" rather than failing
+			LOG.debug("Could not create the HistoryServer's URL from protocol: {}, hostname: {} and port: {}.", protocol, hostname, port, e);
+			return Optional.empty();
+		}
+	}
+
+	private static int getPort(Configuration configuration) {
+		return configuration.getInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT);
+	}
+
+	@Nullable
+	private static String getHostname(Configuration configuration) {
+		return configuration.getString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS);
+	}
+
+	/** Returns "https" when {@link #isSSLEnabled(Configuration)} holds, otherwise "http". */
+	private static String getProtocol(Configuration configuration) {
+		return isSSLEnabled(configuration) ? "https" : "http";
+	}
+
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtilsTest.java
new file mode 100644
index 0000000..dbee4e5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerUtilsTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Optional;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link HistoryServerUtils}.
+ */
+public class HistoryServerUtilsTest extends TestLogger {
+
+	/** Arbitrary address values; these tests only derive a URL and never start a HistoryServer. */
+	private static final String HOSTNAME = "foobar";
+	private static final int PORT = 1234;
+
+	@Test
+	public void testIsSSLEnabledDefault() {
+		final Configuration configuration = new Configuration();
+
+		assertThat(HistoryServerUtils.isSSLEnabled(configuration), is(false));
+	}
+
+	@Test
+	public void testIsSSLEnabledWithoutRestSSL() {
+		final Configuration configuration = new Configuration();
+		configuration.setBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED, true);
+
+		assertThat(HistoryServerUtils.isSSLEnabled(configuration), is(false));
+	}
+
+	@Test
+	public void testIsSSLEnabledOnlyRestSSL() {
+		final Configuration configuration = new Configuration();
+		configuration.setBoolean(SecurityOptions.SSL_REST_ENABLED, true);
+
+		assertThat(HistoryServerUtils.isSSLEnabled(configuration), is(false));
+	}
+
+	@Test
+	public void testIsSSLEnabled() {
+		final Configuration configuration = new Configuration();
+		enableSSL(configuration);
+
+		assertThat(HistoryServerUtils.isSSLEnabled(configuration), is(true));
+	}
+
+	/** Sets both the HistoryServer web SSL flag and the REST SSL flag, which together enable HTTPS. */
+	private void enableSSL(Configuration configuration) {
+		configuration.setBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED, true);
+		configuration.setBoolean(SecurityOptions.SSL_REST_ENABLED, true);
+	}
+
+	@Test
+	public void testGetHistoryServerURL() throws MalformedURLException {
+		final Configuration configuration = createDefaultConfiguration();
+
+		// compare whole Optionals so an empty result fails the assertion instead of throwing from get()
+		assertThat(HistoryServerUtils.getHistoryServerURL(configuration), is(Optional.of(new URL("http", HOSTNAME, PORT, ""))));
+	}
+
+	@Test
+	public void testGetHistoryServerURLWithSSL() throws MalformedURLException {
+		final Configuration configuration = createDefaultConfiguration();
+		enableSSL(configuration);
+
+		assertThat(HistoryServerUtils.getHistoryServerURL(configuration), is(Optional.of(new URL("https", HOSTNAME, PORT, ""))));
+	}
+
+	@Test
+	public void testGetHistoryServerURLWithoutHS() {
+		final Configuration configuration = new Configuration();
+
+		assertThat(HistoryServerUtils.getHistoryServerURL(configuration).isPresent(), is(false));
+	}
+
+	/** Creates a configuration with only the HistoryServer web address and port set. */
+	@Nonnull
+	private Configuration createDefaultConfiguration() {
+		final Configuration configuration = new Configuration();
+		configuration.setString(HistoryServerOptions.HISTORY_SERVER_WEB_ADDRESS, HOSTNAME);
+		configuration.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, PORT);
+		return configuration;
+	}
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index b9ea02b..65baab5 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerExcept
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.HistoryServerUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -63,6 +64,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -70,6 +72,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -292,9 +295,12 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
 		log.info("Unregister application from the YARN Resource Manager with final status {}.", yarnStatus);
 
-		String historyServerAddress = flinkConfig.getString(YarnConfigOptions.APPLICATION_HISTORY_SERVER_ADDRESS);
+		final Optional<URL> historyServerURL = HistoryServerUtils.getHistoryServerURL(flinkConfig);
+
+		final String appTrackingUrl = historyServerURL.map(URL::toString).orElse("");
+
 		try {
-			resourceManagerClient.unregisterApplicationMaster(yarnStatus, diagnostics, historyServerAddress);
+			resourceManagerClient.unregisterApplicationMaster(yarnStatus, diagnostics, appTrackingUrl);
 		} catch (Throwable t) {
 			log.error("Could not unregister the application master.", t);
 		}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index ff6697fd..0f46a57 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -180,15 +180,6 @@ public class YarnConfigOptions {
 		.defaultValue("")
 		.withDescription("A comma-separated list of tags to apply to the Flink YARN application.");
 
-	/**
-	 * The address of Flink HistoryServer.
-	 * This configuration parameter allows setting the appTrackingUrl for finished YARN applications.
-	 */
-	public static final ConfigOption<String> APPLICATION_HISTORY_SERVER_ADDRESS =
-		key("yarn.history.server.address")
-		.defaultValue("")
-		.withDescription("The address of Flink HistoryServer.");
-
 	// ------------------------------------------------------------------------
 
 	/** This class is not meant to be instantiated. */