You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/10/23 09:10:00 UTC

flink git commit: [FLINK-4882] [flip-6] Remove exceptions from HighAvailabilityServices where not necessary

Repository: flink
Updated Branches:
  refs/heads/flip-6 930334ef7 -> 0de568963


[FLINK-4882] [flip-6] Remove exceptions from HighAvailabilityServices where not necessary

Clean up the HighAvailabilityServices interface so that only the methods which can actually
throw an exception declare a throws clause.

This closes #2679.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0de56896
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0de56896
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0de56896

Branch: refs/heads/flip-6
Commit: 0de5689632bd1f8eac6e436959d80d31df9e5ef9
Parents: 930334e
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Oct 19 14:09:31 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Oct 23 11:07:56 2016 +0200

----------------------------------------------------------------------
 .../highavailability/EmbeddedNonHaServices.java |  4 +--
 .../HighAvailabilityServices.java               | 33 +++++++++++++++-----
 .../runtime/highavailability/NonHaServices.java |  4 +--
 .../highavailability/ZookeeperHaServices.java   | 12 +++----
 .../nonha/AbstractNonHaServices.java            | 10 +++---
 .../flink/runtime/jobmaster/JobMaster.java      | 17 ++--------
 .../TestingHighAvailabilityServices.java        | 14 ++++-----
 7 files changed, 49 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
index 58da287..523218e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -43,12 +43,12 @@ public class EmbeddedNonHaServices extends AbstractNonHaServices implements High
 	// ------------------------------------------------------------------------
 
 	@Override
-	public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
 		return resourceManagerLeaderService.createLeaderRetrievalService();
 	}
 
 	@Override
-	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+	public LeaderElectionService getResourceManagerLeaderElectionService() {
 		return resourceManagerLeaderService.createLeaderElectionService();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index f6db682..360de7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -43,50 +43,67 @@ public interface HighAvailabilityServices {
 	/**
 	 * Gets the leader retriever for the cluster's resource manager.
 	 */
-	LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception;
+	LeaderRetrievalService getResourceManagerLeaderRetriever();
 
 	/**
 	 * Gets the leader retriever for the job JobMaster which is responsible for the given job
 	 *
 	 * @param jobID The identifier of the job.
-	 * @return
-	 * @throws Exception
+	 * @return Leader retrieval service to retrieve the job manager for the given job
 	 */
-	LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception;
+	LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);
 
 	/**
 	 * Gets the leader election service for the cluster's resource manager.
+	 *
+	 * @return Leader election service for the resource manager leader election
 	 */
-	LeaderElectionService getResourceManagerLeaderElectionService() throws Exception;
+	LeaderElectionService getResourceManagerLeaderElectionService();
 
 	/**
 	 * Gets the leader election service for the given job.
 	 *
 	 * @param jobID The identifier of the job running the election.
+	 * @return Leader election service for the job manager leader election
 	 */
-	LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception;
+	LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
 
 	/**
 	 * Gets the checkpoint recovery factory for the job manager
+	 *
+	 * @return Checkpoint recovery factory
 	 */
-	CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception;
+	CheckpointRecoveryFactory getCheckpointRecoveryFactory();
 
 	/**
 	 * Gets the submitted job graph store for the job manager
+	 *
+	 * @return Submitted job graph store
+	 * @throws Exception if the submitted job graph store could not be created
 	 */
 	SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;
 
 	/**
 	 * Gets the registry that holds information about whether jobs are currently running.
+	 *
+	 * @return Running job registry to retrieve running jobs
 	 */
-	RunningJobsRegistry getRunningJobsRegistry() throws Exception;
+	RunningJobsRegistry getRunningJobsRegistry();
 
 	/**
 	 * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
+	 *
+	 * @return Blob store
+	 * @throws IOException if the blob store could not be created
 	 */
 	BlobStore createBlobStore() throws IOException;
 
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Shut the high availability service down.
+	 *
+	 * @throws Exception if the shut down fails
+	 */
 	void shutdown() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 107cbd0..75f44ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -57,12 +57,12 @@ public class NonHaServices extends AbstractNonHaServices implements HighAvailabi
 	// ------------------------------------------------------------------------
 
 	@Override
-	public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
 		return new StandaloneLeaderRetrievalService(resourceManagerAddress, new UUID(0, 0));
 	}
 
 	@Override
-	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+	public LeaderElectionService getResourceManagerLeaderElectionService() {
 		return new StandaloneLeaderElectionService();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index a9d2610..07c5011 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -107,27 +107,27 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
 		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
 	}
 
 	@Override
-	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
 		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
 	}
 
 	@Override
-	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+	public LeaderElectionService getResourceManagerLeaderElectionService() {
 		return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
 	}
 
 	@Override
-	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
 		return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
 	}
 
 	@Override
-	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
 		return new ZooKeeperCheckpointRecoveryFactory(client, configuration);
 	}
 
@@ -137,7 +137,7 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 	}
 
 	@Override
-	public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+	public RunningJobsRegistry getRunningJobsRegistry() {
 		throw new UnsupportedOperationException("not yet implemented");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index 8c15a52..237727f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -72,7 +72,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 	// ------------------------------------------------------------------------
 
 	@Override
-	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
 		checkNotNull(jobID);
 
 		synchronized (lock) {
@@ -83,7 +83,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
 		checkNotNull(jobID);
 
 		synchronized (lock) {
@@ -104,19 +104,19 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
 		checkNotShutdown();
 		return new StandaloneCheckpointRecoveryFactory();
 	}
 
 	@Override
-	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+	public SubmittedJobGraphStore getSubmittedJobGraphStore() {
 		checkNotShutdown();
 		return new StandaloneSubmittedJobGraphStore();
 	}
 
 	@Override
-	public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+	public RunningJobsRegistry getRunningJobsRegistry() {
 		checkNotShutdown();
 		return runningJobsRegistry;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 5a7c9a1..a9ac1fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -229,21 +228,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 		log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobName, jid);
 
-		CheckpointRecoveryFactory checkpointRecoveryFactory;
-		try {
-			checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory();
-		} catch (Exception e) {
-			log.error("Could not create the access to highly-available checkpoint storage.", e);
-			throw new Exception("Could not create the access to highly-available checkpoint storage.", e);
-		}
+		CheckpointRecoveryFactory checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory();
 
-		try {
-			resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
-		} catch (Exception e) {
-			log.error("Could not get the resource manager leader retriever.", e);
-			throw new JobSubmissionException(jobGraph.getJobID(),
-					"Could not get the resource manager leader retriever.", e);
-		}
+		resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
 
 		this.executionGraph = ExecutionGraphBuilder.buildGraph(
 				null,

http://git-wip-us.apache.org/repos/asf/flink/blob/0de56896/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 877812b..e0f71ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -81,7 +81,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	// ------------------------------------------------------------------------
 
 	@Override
-	public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
 		LeaderRetrievalService service = this.resourceManagerLeaderRetriever;
 		if (service != null) {
 			return service;
@@ -91,7 +91,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
 		LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID);
 		if (service != null) {
 			return service;
@@ -101,7 +101,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+	public LeaderElectionService getResourceManagerLeaderElectionService() {
 		LeaderElectionService service = resourceManagerLeaderElectionService;
 
 		if (service != null) {
@@ -112,7 +112,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
 		LeaderElectionService service = this.jobManagerLeaderElectionServices.get(jobID);
 
 		if (service != null) {
@@ -123,7 +123,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
 		CheckpointRecoveryFactory factory = checkpointRecoveryFactory;
 
 		if (factory != null) {
@@ -134,7 +134,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+	public SubmittedJobGraphStore getSubmittedJobGraphStore() {
 		SubmittedJobGraphStore store = submittedJobGraphStore;
 
 		if (store != null) {
@@ -146,7 +146,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+	public RunningJobsRegistry getRunningJobsRegistry() {
 		return new NonHaRegistry();
 	}