You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:39 UTC

[20/52] [abbrv] flink git commit: [FLINK-4882] [flip-6] Remove exceptions from HighAvailabilityServices where not necessary

[FLINK-4882] [flip-6] Remove exceptions from HighAvailabilityServices where not necessary

Clean up the HighAvailabilityServices interface so that only methods which can actually throw
an exception declare a throws clause.

This closes #2679.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3aafa16e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3aafa16e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3aafa16e

Branch: refs/heads/master
Commit: 3aafa16eae04d8a5a41b84c9a82480b9742c3fb1
Parents: 6f691ad
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Oct 19 14:09:31 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:24 2016 +0100

----------------------------------------------------------------------
 .../highavailability/EmbeddedNonHaServices.java |  4 +--
 .../HighAvailabilityServices.java               | 33 +++++++++++++++-----
 .../runtime/highavailability/NonHaServices.java |  4 +--
 .../highavailability/ZookeeperHaServices.java   | 12 +++----
 .../nonha/AbstractNonHaServices.java            | 10 +++---
 .../flink/runtime/jobmaster/JobMaster.java      | 17 ++--------
 .../flink/runtime/util/ZooKeeperUtils.java      |  5 ++-
 .../TestingHighAvailabilityServices.java        | 14 ++++-----
 8 files changed, 51 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
index 58da287..523218e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -43,12 +43,12 @@ public class EmbeddedNonHaServices extends AbstractNonHaServices implements High
 	// ------------------------------------------------------------------------
 
 	@Override
-	public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
 		return resourceManagerLeaderService.createLeaderRetrievalService();
 	}
 
 	@Override
-	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+	public LeaderElectionService getResourceManagerLeaderElectionService() {
 		return resourceManagerLeaderService.createLeaderElectionService();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index f6db682..360de7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -43,50 +43,67 @@ public interface HighAvailabilityServices {
 	/**
 	 * Gets the leader retriever for the cluster's resource manager.
 	 */
-	LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception;
+	LeaderRetrievalService getResourceManagerLeaderRetriever();
 
 	/**
 	 * Gets the leader retriever for the job JobMaster which is responsible for the given job
 	 *
 	 * @param jobID The identifier of the job.
-	 * @return
-	 * @throws Exception
+	 * @return Leader retrieval service to retrieve the job manager for the given job
 	 */
-	LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception;
+	LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);
 
 	/**
 	 * Gets the leader election service for the cluster's resource manager.
+	 *
+	 * @return Leader election service for the resource manager leader election
 	 */
-	LeaderElectionService getResourceManagerLeaderElectionService() throws Exception;
+	LeaderElectionService getResourceManagerLeaderElectionService();
 
 	/**
 	 * Gets the leader election service for the given job.
 	 *
 	 * @param jobID The identifier of the job running the election.
+	 * @return Leader election service for the job manager leader election
 	 */
-	LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception;
+	LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
 
 	/**
 	 * Gets the checkpoint recovery factory for the job manager
+	 *
+	 * @return Checkpoint recovery factory
 	 */
-	CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception;
+	CheckpointRecoveryFactory getCheckpointRecoveryFactory();
 
 	/**
 	 * Gets the submitted job graph store for the job manager
+	 *
+	 * @return Submitted job graph store
+	 * @throws Exception if the submitted job graph store could not be created
 	 */
 	SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;
 
 	/**
 	 * Gets the registry that holds information about whether jobs are currently running.
+	 *
+	 * @return Running job registry to retrieve running jobs
 	 */
-	RunningJobsRegistry getRunningJobsRegistry() throws Exception;
+	RunningJobsRegistry getRunningJobsRegistry();
 
 	/**
 	 * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
+	 *
+	 * @return Blob store
+	 * @throws IOException if the blob store could not be created
 	 */
 	BlobStore createBlobStore() throws IOException;
 
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Shut the high availability service down.
+	 *
+	 * @throws Exception if the shut down fails
+	 */
 	void shutdown() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 107cbd0..75f44ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -57,12 +57,12 @@ public class NonHaServices extends AbstractNonHaServices implements HighAvailabi
 	// ------------------------------------------------------------------------
 
 	@Override
-	public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
 		return new StandaloneLeaderRetrievalService(resourceManagerAddress, new UUID(0, 0));
 	}
 
 	@Override
-	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+	public LeaderElectionService getResourceManagerLeaderElectionService() {
 		return new StandaloneLeaderElectionService();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index e38840b..3e909e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -108,27 +108,27 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
 		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
 	}
 
 	@Override
-	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
 		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
 	}
 
 	@Override
-	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+	public LeaderElectionService getResourceManagerLeaderElectionService() {
 		return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
 	}
 
 	@Override
-	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
 		return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
 	}
 
 	@Override
-	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
 		return new ZooKeeperCheckpointRecoveryFactory(client, configuration, executor);
 	}
 
@@ -138,7 +138,7 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
 	}
 
 	@Override
-	public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+	public RunningJobsRegistry getRunningJobsRegistry() {
 		throw new UnsupportedOperationException("not yet implemented");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index 8c15a52..237727f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -72,7 +72,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 	// ------------------------------------------------------------------------
 
 	@Override
-	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
 		checkNotNull(jobID);
 
 		synchronized (lock) {
@@ -83,7 +83,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
 		checkNotNull(jobID);
 
 		synchronized (lock) {
@@ -104,19 +104,19 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
 		checkNotShutdown();
 		return new StandaloneCheckpointRecoveryFactory();
 	}
 
 	@Override
-	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+	public SubmittedJobGraphStore getSubmittedJobGraphStore() {
 		checkNotShutdown();
 		return new StandaloneSubmittedJobGraphStore();
 	}
 
 	@Override
-	public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+	public RunningJobsRegistry getRunningJobsRegistry() {
 		checkNotShutdown();
 		return runningJobsRegistry;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 3c6bbd3..204cd80 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -229,21 +228,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 		log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobName, jid);
 
-		CheckpointRecoveryFactory checkpointRecoveryFactory;
-		try {
-			checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory();
-		} catch (Exception e) {
-			log.error("Could not create the access to highly-available checkpoint storage.", e);
-			throw new Exception("Could not create the access to highly-available checkpoint storage.", e);
-		}
+		CheckpointRecoveryFactory checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory();
 
-		try {
-			resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
-		} catch (Exception e) {
-			log.error("Could not get the resource manager leader retriever.", e);
-			throw new JobSubmissionException(jobGraph.getJobID(),
-					"Could not get the resource manager leader retriever.", e);
-		}
+		resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
 
 		this.executionGraph = ExecutionGraphBuilder.buildGraph(
 				null,

http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 81609c2..621edcb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -193,7 +193,7 @@ public class ZooKeeperUtils {
 	public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
 		final CuratorFramework client,
 		final Configuration configuration,
-		final String pathSuffix) throws Exception
+		final String pathSuffix)
 	{
 		String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
 			configuration,
@@ -240,12 +240,11 @@ public class ZooKeeperUtils {
 	 * @param configuration {@link Configuration} object containing the configuration values
 	 * @param pathSuffix    The path suffix which we want to append
 	 * @return {@link ZooKeeperLeaderElectionService} instance.
-	 * @throws Exception
 	 */
 	public static ZooKeeperLeaderElectionService createLeaderElectionService(
 		final CuratorFramework client,
 		final Configuration configuration,
-		final String pathSuffix) throws Exception
+		final String pathSuffix)
 	{
 		final String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys(
 			configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 877812b..e0f71ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -81,7 +81,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	// ------------------------------------------------------------------------
 
 	@Override
-	public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
 		LeaderRetrievalService service = this.resourceManagerLeaderRetriever;
 		if (service != null) {
 			return service;
@@ -91,7 +91,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
+	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
 		LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID);
 		if (service != null) {
 			return service;
@@ -101,7 +101,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+	public LeaderElectionService getResourceManagerLeaderElectionService() {
 		LeaderElectionService service = resourceManagerLeaderElectionService;
 
 		if (service != null) {
@@ -112,7 +112,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
+	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
 		LeaderElectionService service = this.jobManagerLeaderElectionServices.get(jobID);
 
 		if (service != null) {
@@ -123,7 +123,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
 		CheckpointRecoveryFactory factory = checkpointRecoveryFactory;
 
 		if (factory != null) {
@@ -134,7 +134,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+	public SubmittedJobGraphStore getSubmittedJobGraphStore() {
 		SubmittedJobGraphStore store = submittedJobGraphStore;
 
 		if (store != null) {
@@ -146,7 +146,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 	}
 
 	@Override
-	public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+	public RunningJobsRegistry getRunningJobsRegistry() {
 		return new NonHaRegistry();
 	}