You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:36 UTC

[17/52] [abbrv] flink git commit: [FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions

[FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions

Allowing RpcEndpoint.start/shutDown to throw exceptions lets RPC endpoints fail
quickly without having to use a callback like the FatalErrorHandler.

This closes #2651.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1934255
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1934255
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1934255

Branch: refs/heads/master
Commit: a1934255421b97eefd579183e9c7199c43ad1a2c
Parents: 3aafa16
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Oct 17 16:22:16 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:24 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobManagerRunner.java     |  6 +++++-
 .../org/apache/flink/runtime/jobmaster/JobMaster.java | 14 ++++++++++----
 .../org/apache/flink/runtime/rpc/RpcEndpoint.java     |  8 ++++++--
 .../flink/runtime/taskexecutor/TaskExecutor.java      |  2 +-
 .../flink/runtime/taskexecutor/TaskManagerRunner.java |  2 +-
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java      |  7 +------
 .../runtime/rpc/akka/MainThreadValidationTest.java    |  2 +-
 .../runtime/rpc/akka/MessageSerializationTest.java    |  2 +-
 .../flink/runtime/taskexecutor/TaskExecutorTest.java  |  4 ++--
 9 files changed, 28 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 3313d8a..9d8e004 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -374,7 +374,11 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			// This will eventually be noticed, but can not be ruled out from the beginning.
 			if (leaderElectionService.hasLeadership()) {
 				if (jobRunning) {
-					jobManager.start(leaderSessionID);
+					try {
+						jobManager.start(leaderSessionID);
+					} catch (Exception e) {
+						onFatalError(new Exception("Could not start the job manager.", e));
+					}
 				} else {
 					log.info("Job {} ({}) already finished by others.", jobGraph.getName(), jobGraph.getJobID());
 					jobFinishedByOther();

http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 204cd80..c80cc51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -267,7 +267,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	 *
 	 * @param leaderSessionID The necessary leader id for running the job.
 	 */
-	public void start(final UUID leaderSessionID) {
+	public void start(final UUID leaderSessionID) throws Exception {
 		if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) {
 			super.start();
 
@@ -283,7 +283,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	 * Suspend the job and shutdown all other services including rpc.
 	 */
 	@Override
-	public void shutDown() {
+	public void shutDown() throws Exception {
 		// make sure there is a graceful exit
 		getSelf().suspendExecution(new Exception("JobManager is shutting down."));
 		super.shutDown();
@@ -382,7 +382,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		try {
 			resourceManagerLeaderRetriever.stop();
 		} catch (Exception e) {
-			log.warn("Failed to stop resource manager leader retriever when suspending.");
+			log.warn("Failed to stop resource manager leader retriever when suspending.", e);
 		}
 		closeResourceManagerConnection();
 
@@ -761,7 +761,13 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			@Override
 			public void run() {
 				log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
-				shutDown();
+
+				try {
+					shutDown();
+				} catch (Exception e) {
+					cause.addSuppressed(e);
+				}
+
 				errorHandler.onFatalError(cause);
 			}
 		});

http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index f93a2e2..b971b96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -110,8 +110,10 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	 *
 	 * IMPORTANT: Whenever you override this method, call the parent implementation to enable
 	 * rpc processing. It is advised to make the parent call last.
+	 *
+	 * @throws Exception indicating that something went wrong while starting the RPC endpoint
 	 */
-	public void start() {
+	public void start() throws Exception {
 		((StartStoppable) self).start();
 	}
 
@@ -123,8 +125,10 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	 * 
 	 * <p>This method can be overridden to add RPC endpoint specific shut down code.
 	 * The overridden method should always call the parent shut down method.
+	 *
+	 * @throws Exception indicating that something went wrong while shutting the RPC endpoint down
 	 */
-	public void shutDown() {
+	public void shutDown() throws Exception {
 		rpcService.stopServer(self);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 679324b..8187fde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -196,7 +196,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void start() {
+	public void start() throws Exception {
 		super.start();
 
 		// start by connecting to the ResourceManager

http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index f56d17c..7d9ee55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -107,7 +107,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 	//  Lifecycle management
 	// --------------------------------------------------------------------------------------------
 
-	public void start() {
+	public void start() throws Exception {
 		taskManager.start();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index ba8eb11..d2dbab7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -22,7 +22,6 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -36,10 +35,6 @@ import org.junit.Test;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -163,7 +158,7 @@ public class AkkaRpcActorTest extends TestLogger {
 	 * @throws InterruptedException
 	 */
 	@Test(timeout=1000)
-	public void testRpcEndpointTerminationFuture() throws ExecutionException, InterruptedException {
+	public void testRpcEndpointTerminationFuture() throws Exception {
 		final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
 		rpcEndpoint.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
index 9ec1f7e..9f134d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
@@ -34,7 +34,7 @@ import static org.junit.Assert.assertTrue;
 public class MainThreadValidationTest extends TestLogger {
 
 	@Test
-	public void failIfNotInMainThread() {
+	public void failIfNotInMainThread() throws Exception {
 		// test if assertions are activated. The test only works if assertions are loaded.
 		try {
 			assert false;

http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index 0d5dc28..d640a97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -80,7 +80,7 @@ public class MessageSerializationTest extends TestLogger {
 	 * Tests that a local rpc call with a non serializable argument can be executed.
 	 */
 	@Test
-	public void testNonSerializableLocalMessageTransfer() throws InterruptedException, IOException {
+	public void testNonSerializableLocalMessageTransfer() throws Exception {
 		LinkedBlockingQueue<Object> linkedBlockingQueue = new LinkedBlockingQueue<>();
 		TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, linkedBlockingQueue);
 		testEndpoint.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 4d73a4b..638ec56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -354,7 +354,7 @@ public class TaskExecutorTest extends TestLogger {
 	 * the job leader, it will offer all reserved slots to the JobManager.
 	 */
 	@Test
-	public void testJobLeaderDetection() throws TestingFatalErrorHandler.TestingException, SlotAllocationException {
+	public void testJobLeaderDetection() throws Exception {
 		final JobID jobId = new JobID();
 
 		final TestingSerialRpcService rpc = new TestingSerialRpcService();
@@ -621,7 +621,7 @@ public class TaskExecutorTest extends TestLogger {
 	 */
 	@Ignore
 	@Test
-	public void testRejectAllocationRequestsForOutOfSyncSlots() throws SlotAllocationException {
+	public void testRejectAllocationRequestsForOutOfSyncSlots() throws Exception {
 		final ResourceID resourceID = ResourceID.generate();
 
 		final String address1 = "/resource/manager/address/one";