You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:39 UTC
[20/52] [abbrv] flink git commit: [FLINK-4882] [flip-6] Remove
exceptions from HighAvailabilityServices where not necessary
[FLINK-4882] [flip-6] Remove exceptions from HighAvailabilityServices where not necessary
Cleanup of the interface HighAvailabilityServices so that only methods which really throw an
exception have an exception clause defined.
This closes #2679.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3aafa16e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3aafa16e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3aafa16e
Branch: refs/heads/master
Commit: 3aafa16eae04d8a5a41b84c9a82480b9742c3fb1
Parents: 6f691ad
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Oct 19 14:09:31 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:24 2016 +0100
----------------------------------------------------------------------
.../highavailability/EmbeddedNonHaServices.java | 4 +--
.../HighAvailabilityServices.java | 33 +++++++++++++++-----
.../runtime/highavailability/NonHaServices.java | 4 +--
.../highavailability/ZookeeperHaServices.java | 12 +++----
.../nonha/AbstractNonHaServices.java | 10 +++---
.../flink/runtime/jobmaster/JobMaster.java | 17 ++--------
.../flink/runtime/util/ZooKeeperUtils.java | 5 ++-
.../TestingHighAvailabilityServices.java | 14 ++++-----
8 files changed, 51 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
index 58da287..523218e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java
@@ -43,12 +43,12 @@ public class EmbeddedNonHaServices extends AbstractNonHaServices implements High
// ------------------------------------------------------------------------
@Override
- public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+ public LeaderRetrievalService getResourceManagerLeaderRetriever() {
return resourceManagerLeaderService.createLeaderRetrievalService();
}
@Override
- public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+ public LeaderElectionService getResourceManagerLeaderElectionService() {
return resourceManagerLeaderService.createLeaderElectionService();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index f6db682..360de7b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -43,50 +43,67 @@ public interface HighAvailabilityServices {
/**
* Gets the leader retriever for the cluster's resource manager.
*/
- LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception;
+ LeaderRetrievalService getResourceManagerLeaderRetriever();
/**
* Gets the leader retriever for the job JobMaster which is responsible for the given job
*
* @param jobID The identifier of the job.
- * @return
- * @throws Exception
+ * @return Leader retrieval service to retrieve the job manager for the given job
*/
- LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception;
+ LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);
/**
* Gets the leader election service for the cluster's resource manager.
+ *
+ * @return Leader election service for the resource manager leader election
*/
- LeaderElectionService getResourceManagerLeaderElectionService() throws Exception;
+ LeaderElectionService getResourceManagerLeaderElectionService();
/**
* Gets the leader election service for the given job.
*
* @param jobID The identifier of the job running the election.
+ * @return Leader election service for the job manager leader election
*/
- LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception;
+ LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
/**
* Gets the checkpoint recovery factory for the job manager
+ *
+ * @return Checkpoint recovery factory
*/
- CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception;
+ CheckpointRecoveryFactory getCheckpointRecoveryFactory();
/**
* Gets the submitted job graph store for the job manager
+ *
+ * @return Submitted job graph store
+ * @throws Exception if the submitted job graph store could not be created
*/
SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;
/**
* Gets the registry that holds information about whether jobs are currently running.
+ *
+ * @return Running job registry to retrieve running jobs
*/
- RunningJobsRegistry getRunningJobsRegistry() throws Exception;
+ RunningJobsRegistry getRunningJobsRegistry();
/**
* Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
+ *
+ * @return Blob store
+ * @throws IOException if the blob store could not be created
*/
BlobStore createBlobStore() throws IOException;
// ------------------------------------------------------------------------
+ /**
+ * Shut the high availability service down.
+ *
+ * @throws Exception if the shut down fails
+ */
void shutdown() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 107cbd0..75f44ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -57,12 +57,12 @@ public class NonHaServices extends AbstractNonHaServices implements HighAvailabi
// ------------------------------------------------------------------------
@Override
- public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+ public LeaderRetrievalService getResourceManagerLeaderRetriever() {
return new StandaloneLeaderRetrievalService(resourceManagerAddress, new UUID(0, 0));
}
@Override
- public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+ public LeaderElectionService getResourceManagerLeaderElectionService() {
return new StandaloneLeaderElectionService();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index e38840b..3e909e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -108,27 +108,27 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
// ------------------------------------------------------------------------
@Override
- public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+ public LeaderRetrievalService getResourceManagerLeaderRetriever() {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}
@Override
- public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
+ public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
}
@Override
- public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+ public LeaderElectionService getResourceManagerLeaderElectionService() {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}
@Override
- public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
+ public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
}
@Override
- public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
return new ZooKeeperCheckpointRecoveryFactory(client, configuration, executor);
}
@@ -138,7 +138,7 @@ public class ZookeeperHaServices implements HighAvailabilityServices {
}
@Override
- public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+ public RunningJobsRegistry getRunningJobsRegistry() {
throw new UnsupportedOperationException("not yet implemented");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index 8c15a52..237727f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -72,7 +72,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
// ------------------------------------------------------------------------
@Override
- public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
+ public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
checkNotNull(jobID);
synchronized (lock) {
@@ -83,7 +83,7 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
}
@Override
- public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
+ public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
checkNotNull(jobID);
synchronized (lock) {
@@ -104,19 +104,19 @@ public abstract class AbstractNonHaServices implements HighAvailabilityServices
}
@Override
- public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
checkNotShutdown();
return new StandaloneCheckpointRecoveryFactory();
}
@Override
- public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+ public SubmittedJobGraphStore getSubmittedJobGraphStore() {
checkNotShutdown();
return new StandaloneSubmittedJobGraphStore();
}
@Override
- public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+ public RunningJobsRegistry getRunningJobsRegistry() {
checkNotShutdown();
return runningJobsRegistry;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 3c6bbd3..204cd80 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -229,21 +228,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
log.info("Using restart strategy {} for {} ({}).", restartStrategy, jobName, jid);
- CheckpointRecoveryFactory checkpointRecoveryFactory;
- try {
- checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory();
- } catch (Exception e) {
- log.error("Could not create the access to highly-available checkpoint storage.", e);
- throw new Exception("Could not create the access to highly-available checkpoint storage.", e);
- }
+ CheckpointRecoveryFactory checkpointRecoveryFactory = highAvailabilityServices.getCheckpointRecoveryFactory();
- try {
- resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
- } catch (Exception e) {
- log.error("Could not get the resource manager leader retriever.", e);
- throw new JobSubmissionException(jobGraph.getJobID(),
- "Could not get the resource manager leader retriever.", e);
- }
+ resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever();
this.executionGraph = ExecutionGraphBuilder.buildGraph(
null,
http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 81609c2..621edcb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -193,7 +193,7 @@ public class ZooKeeperUtils {
public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
final CuratorFramework client,
final Configuration configuration,
- final String pathSuffix) throws Exception
+ final String pathSuffix)
{
String leaderPath = ConfigurationUtil.getStringWithDeprecatedKeys(
configuration,
@@ -240,12 +240,11 @@ public class ZooKeeperUtils {
* @param configuration {@link Configuration} object containing the configuration values
* @param pathSuffix The path suffix which we want to append
* @return {@link ZooKeeperLeaderElectionService} instance.
- * @throws Exception
*/
public static ZooKeeperLeaderElectionService createLeaderElectionService(
final CuratorFramework client,
final Configuration configuration,
- final String pathSuffix) throws Exception
+ final String pathSuffix)
{
final String latchPath = ConfigurationUtil.getStringWithDeprecatedKeys(
configuration,
http://git-wip-us.apache.org/repos/asf/flink/blob/3aafa16e/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 877812b..e0f71ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -81,7 +81,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
// ------------------------------------------------------------------------
@Override
- public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception {
+ public LeaderRetrievalService getResourceManagerLeaderRetriever() {
LeaderRetrievalService service = this.resourceManagerLeaderRetriever;
if (service != null) {
return service;
@@ -91,7 +91,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
}
@Override
- public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws Exception {
+ public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID);
if (service != null) {
return service;
@@ -101,7 +101,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
}
@Override
- public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception {
+ public LeaderElectionService getResourceManagerLeaderElectionService() {
LeaderElectionService service = resourceManagerLeaderElectionService;
if (service != null) {
@@ -112,7 +112,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
}
@Override
- public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) throws Exception {
+ public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
LeaderElectionService service = this.jobManagerLeaderElectionServices.get(jobID);
if (service != null) {
@@ -123,7 +123,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
}
@Override
- public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception {
+ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
CheckpointRecoveryFactory factory = checkpointRecoveryFactory;
if (factory != null) {
@@ -134,7 +134,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
}
@Override
- public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
+ public SubmittedJobGraphStore getSubmittedJobGraphStore() {
SubmittedJobGraphStore store = submittedJobGraphStore;
if (store != null) {
@@ -146,7 +146,7 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
}
@Override
- public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+ public RunningJobsRegistry getRunningJobsRegistry() {
return new NonHaRegistry();
}