You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:36 UTC
[17/52] [abbrv] flink git commit: [FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions
[FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions
Allowing RpcEndpoint.start/shutDown to throw exceptions lets RPC endpoints
fail quickly without having to use a callback like the FatalErrorHandler.
This closes #2651.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1934255
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1934255
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1934255
Branch: refs/heads/master
Commit: a1934255421b97eefd579183e9c7199c43ad1a2c
Parents: 3aafa16
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Oct 17 16:22:16 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:24 2016 +0100
----------------------------------------------------------------------
.../flink/runtime/jobmaster/JobManagerRunner.java | 6 +++++-
.../org/apache/flink/runtime/jobmaster/JobMaster.java | 14 ++++++++++----
.../org/apache/flink/runtime/rpc/RpcEndpoint.java | 8 ++++++--
.../flink/runtime/taskexecutor/TaskExecutor.java | 2 +-
.../flink/runtime/taskexecutor/TaskManagerRunner.java | 2 +-
.../flink/runtime/rpc/akka/AkkaRpcActorTest.java | 7 +------
.../runtime/rpc/akka/MainThreadValidationTest.java | 2 +-
.../runtime/rpc/akka/MessageSerializationTest.java | 2 +-
.../flink/runtime/taskexecutor/TaskExecutorTest.java | 4 ++--
9 files changed, 28 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 3313d8a..9d8e004 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -374,7 +374,11 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
// This will eventually be noticed, but can not be ruled out from the beginning.
if (leaderElectionService.hasLeadership()) {
if (jobRunning) {
- jobManager.start(leaderSessionID);
+ try {
+ jobManager.start(leaderSessionID);
+ } catch (Exception e) {
+ onFatalError(new Exception("Could not start the job manager.", e));
+ }
} else {
log.info("Job {} ({}) already finished by others.", jobGraph.getName(), jobGraph.getJobID());
jobFinishedByOther();
http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 204cd80..c80cc51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -267,7 +267,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
*
* @param leaderSessionID The necessary leader id for running the job.
*/
- public void start(final UUID leaderSessionID) {
+ public void start(final UUID leaderSessionID) throws Exception {
if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) {
super.start();
@@ -283,7 +283,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
* Suspend the job and shutdown all other services including rpc.
*/
@Override
- public void shutDown() {
+ public void shutDown() throws Exception {
// make sure there is a graceful exit
getSelf().suspendExecution(new Exception("JobManager is shutting down."));
super.shutDown();
@@ -382,7 +382,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
try {
resourceManagerLeaderRetriever.stop();
} catch (Exception e) {
- log.warn("Failed to stop resource manager leader retriever when suspending.");
+ log.warn("Failed to stop resource manager leader retriever when suspending.", e);
}
closeResourceManagerConnection();
@@ -761,7 +761,13 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
@Override
public void run() {
log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
- shutDown();
+
+ try {
+ shutDown();
+ } catch (Exception e) {
+ cause.addSuppressed(e);
+ }
+
errorHandler.onFatalError(cause);
}
});
http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index f93a2e2..b971b96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -110,8 +110,10 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
*
* IMPORTANT: Whenever you override this method, call the parent implementation to enable
* rpc processing. It is advised to make the parent call last.
+ *
+ * @throws Exception indicating that something went wrong while starting the RPC endpoint
*/
- public void start() {
+ public void start() throws Exception {
((StartStoppable) self).start();
}
@@ -123,8 +125,10 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
*
* <p>This method can be overridden to add RPC endpoint specific shut down code.
* The overridden method should always call the parent shut down method.
+ *
+ * @throws Exception indicating that something went wrong while shutting the RPC endpoint down
*/
- public void shutDown() {
+ public void shutDown() throws Exception {
rpcService.stopServer(self);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 679324b..8187fde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -196,7 +196,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
// ------------------------------------------------------------------------
@Override
- public void start() {
+ public void start() throws Exception {
super.start();
// start by connecting to the ResourceManager
http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index f56d17c..7d9ee55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -107,7 +107,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
// Lifecycle management
// --------------------------------------------------------------------------------------------
- public void start() {
+ public void start() throws Exception {
taskManager.start();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index ba8eb11..d2dbab7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -22,7 +22,6 @@ import akka.actor.ActorSystem;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
@@ -36,10 +35,6 @@ import org.junit.Test;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -163,7 +158,7 @@ public class AkkaRpcActorTest extends TestLogger {
* @throws InterruptedException
*/
@Test(timeout=1000)
- public void testRpcEndpointTerminationFuture() throws ExecutionException, InterruptedException {
+ public void testRpcEndpointTerminationFuture() throws Exception {
final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
rpcEndpoint.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
index 9ec1f7e..9f134d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
@@ -34,7 +34,7 @@ import static org.junit.Assert.assertTrue;
public class MainThreadValidationTest extends TestLogger {
@Test
- public void failIfNotInMainThread() {
+ public void failIfNotInMainThread() throws Exception {
// test if assertions are activated. The test only works if assertions are loaded.
try {
assert false;
http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index 0d5dc28..d640a97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -80,7 +80,7 @@ public class MessageSerializationTest extends TestLogger {
* Tests that a local rpc call with a non serializable argument can be executed.
*/
@Test
- public void testNonSerializableLocalMessageTransfer() throws InterruptedException, IOException {
+ public void testNonSerializableLocalMessageTransfer() throws Exception {
LinkedBlockingQueue<Object> linkedBlockingQueue = new LinkedBlockingQueue<>();
TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, linkedBlockingQueue);
testEndpoint.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/a1934255/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 4d73a4b..638ec56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -354,7 +354,7 @@ public class TaskExecutorTest extends TestLogger {
* the job leader, it will offer all reserved slots to the JobManager.
*/
@Test
- public void testJobLeaderDetection() throws TestingFatalErrorHandler.TestingException, SlotAllocationException {
+ public void testJobLeaderDetection() throws Exception {
final JobID jobId = new JobID();
final TestingSerialRpcService rpc = new TestingSerialRpcService();
@@ -621,7 +621,7 @@ public class TaskExecutorTest extends TestLogger {
*/
@Ignore
@Test
- public void testRejectAllocationRequestsForOutOfSyncSlots() throws SlotAllocationException {
+ public void testRejectAllocationRequestsForOutOfSyncSlots() throws Exception {
final ResourceID resourceID = ResourceID.generate();
final String address1 = "/resource/manager/address/one";