You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/01 08:41:10 UTC

[43/50] [abbrv] flink git commit: [FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions

[FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions

Allowing RpcEndpoint.start/shutDown to throw exceptions lets RPC endpoints fail quickly
without having to use a callback like the FatalErrorHandler.

This closes #2651.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0262e27e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0262e27e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0262e27e

Branch: refs/heads/flip-6
Commit: 0262e27e6ebd551c20b93b3bc55d7a8f4f72f70a
Parents: d2d0b12
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Oct 17 16:22:16 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 1 09:39:34 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobManagerRunner.java     |  6 +++++-
 .../org/apache/flink/runtime/jobmaster/JobMaster.java | 14 ++++++++++----
 .../org/apache/flink/runtime/rpc/RpcEndpoint.java     |  8 ++++++--
 .../flink/runtime/taskexecutor/TaskExecutor.java      |  2 +-
 .../flink/runtime/taskexecutor/TaskManagerRunner.java |  2 +-
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java      |  7 +------
 .../runtime/rpc/akka/MainThreadValidationTest.java    |  2 +-
 .../runtime/rpc/akka/MessageSerializationTest.java    |  2 +-
 .../flink/runtime/taskexecutor/TaskExecutorTest.java  |  4 ++--
 9 files changed, 28 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0262e27e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 3313d8a..9d8e004 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -374,7 +374,11 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			// This will eventually be noticed, but can not be ruled out from the beginning.
 			if (leaderElectionService.hasLeadership()) {
 				if (jobRunning) {
-					jobManager.start(leaderSessionID);
+					try {
+						jobManager.start(leaderSessionID);
+					} catch (Exception e) {
+						onFatalError(new Exception("Could not start the job manager.", e));
+					}
 				} else {
 					log.info("Job {} ({}) already finished by others.", jobGraph.getName(), jobGraph.getJobID());
 					jobFinishedByOther();

http://git-wip-us.apache.org/repos/asf/flink/blob/0262e27e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index a9ac1fe..49c4df1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -266,7 +266,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	 *
 	 * @param leaderSessionID The necessary leader id for running the job.
 	 */
-	public void start(final UUID leaderSessionID) {
+	public void start(final UUID leaderSessionID) throws Exception {
 		if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) {
 			super.start();
 
@@ -282,7 +282,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	 * Suspend the job and shutdown all other services including rpc.
 	 */
 	@Override
-	public void shutDown() {
+	public void shutDown() throws Exception {
 		// make sure there is a graceful exit
 		getSelf().suspendExecution(new Exception("JobManager is shutting down."));
 		super.shutDown();
@@ -381,7 +381,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		try {
 			resourceManagerLeaderRetriever.stop();
 		} catch (Exception e) {
-			log.warn("Failed to stop resource manager leader retriever when suspending.");
+			log.warn("Failed to stop resource manager leader retriever when suspending.", e);
 		}
 		closeResourceManagerConnection();
 
@@ -767,7 +767,13 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			@Override
 			public void run() {
 				log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
-				shutDown();
+
+				try {
+					shutDown();
+				} catch (Exception e) {
+					cause.addSuppressed(e);
+				}
+
 				errorHandler.onFatalError(cause);
 			}
 		});

http://git-wip-us.apache.org/repos/asf/flink/blob/0262e27e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index f93a2e2..b971b96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -110,8 +110,10 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	 *
 	 * IMPORTANT: Whenever you override this method, call the parent implementation to enable
 	 * rpc processing. It is advised to make the parent call last.
+	 *
+	 * @throws Exception indicating that something went wrong while starting the RPC endpoint
 	 */
-	public void start() {
+	public void start() throws Exception {
 		((StartStoppable) self).start();
 	}
 
@@ -123,8 +125,10 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	 * 
 	 * <p>This method can be overridden to add RPC endpoint specific shut down code.
 	 * The overridden method should always call the parent shut down method.
+	 *
+	 * @throws Exception indicating that something went wrong while shutting the RPC endpoint down
 	 */
-	public void shutDown() {
+	public void shutDown() throws Exception {
 		rpcService.stopServer(self);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0262e27e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 601d804..aea926c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -194,7 +194,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void start() {
+	public void start() throws Exception {
 		super.start();
 
 		// start by connecting to the ResourceManager

http://git-wip-us.apache.org/repos/asf/flink/blob/0262e27e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index f56d17c..7d9ee55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -107,7 +107,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 	//  Lifecycle management
 	// --------------------------------------------------------------------------------------------
 
-	public void start() {
+	public void start() throws Exception {
 		taskManager.start();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0262e27e/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index ba8eb11..d2dbab7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -22,7 +22,6 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -36,10 +35,6 @@ import org.junit.Test;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -163,7 +158,7 @@ public class AkkaRpcActorTest extends TestLogger {
 	 * @throws InterruptedException
 	 */
 	@Test(timeout=1000)
-	public void testRpcEndpointTerminationFuture() throws ExecutionException, InterruptedException {
+	public void testRpcEndpointTerminationFuture() throws Exception {
 		final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
 		rpcEndpoint.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0262e27e/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
index 9ec1f7e..9f134d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
@@ -34,7 +34,7 @@ import static org.junit.Assert.assertTrue;
 public class MainThreadValidationTest extends TestLogger {
 
 	@Test
-	public void failIfNotInMainThread() {
+	public void failIfNotInMainThread() throws Exception {
 		// test if assertions are activated. The test only works if assertions are loaded.
 		try {
 			assert false;

http://git-wip-us.apache.org/repos/asf/flink/blob/0262e27e/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index 0d5dc28..d640a97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -80,7 +80,7 @@ public class MessageSerializationTest extends TestLogger {
 	 * Tests that a local rpc call with a non serializable argument can be executed.
 	 */
 	@Test
-	public void testNonSerializableLocalMessageTransfer() throws InterruptedException, IOException {
+	public void testNonSerializableLocalMessageTransfer() throws Exception {
 		LinkedBlockingQueue<Object> linkedBlockingQueue = new LinkedBlockingQueue<>();
 		TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, linkedBlockingQueue);
 		testEndpoint.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/0262e27e/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 87bde35..caae54e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -339,7 +339,7 @@ public class TaskExecutorTest extends TestLogger {
 	 * the job leader, it will offer all reserved slots to the JobManager.
 	 */
 	@Test
-	public void testJobLeaderDetection() throws TestingFatalErrorHandler.TestingException, SlotAllocationException {
+	public void testJobLeaderDetection() throws Exception {
 		final JobID jobId = new JobID();
 
 		final TestingSerialRpcService rpc = new TestingSerialRpcService();
@@ -606,7 +606,7 @@ public class TaskExecutorTest extends TestLogger {
 	 */
 	@Ignore
 	@Test
-	public void testRejectAllocationRequestsForOutOfSyncSlots() throws SlotAllocationException {
+	public void testRejectAllocationRequestsForOutOfSyncSlots() throws Exception {
 		final ResourceID resourceID = ResourceID.generate();
 
 		final String address1 = "/resource/manager/address/one";