Posted to issues@flink.apache.org by shixiaogang <gi...@git.apache.org> on 2016/08/17 07:43:11 UTC

[GitHub] flink pull request #2377: [Flink-4400][cluster management]Leadership Electio...

GitHub user shixiaogang opened a pull request:

    https://github.com/apache/flink/pull/2377

    [Flink-4400][cluster management]Leadership Election among JobManagers

    - Implement LeaderContender interface in JobMaster.
    - Create and start the leader election service during JobMaster's bootstrapping.
    - Reformat JobMaster's code with the style provided by Kete.Young.
    - Add a Flink `Configuration` argument to JobMaster's constructor.
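The bootstrapping step in the list above can be sketched roughly as follows. This is a minimal, self-contained illustration under stated assumptions, not the PR's code: `LeaderContender`, `LeaderElectionService` and `HighAvailabilityServices` are simplified stand-ins for the Flink interfaces of the same names, the job ID is a plain `String`, and `InMemoryLeaderElectionService` and `JobMasterBootstrapSketch` are hypothetical helpers. Only `getJobMasterLeaderElectionService` and the inner class name `JobMasterLeaderContender` are taken from the PR.

```java
import java.util.UUID;

// Simplified stand-ins for the Flink interfaces of the same names
// (assumption: only the methods needed for this sketch are declared).
interface LeaderContender {
    void grantLeadership(UUID leaderSessionID);
    void revokeLeadership();
    String getAddress();
    void handleError(Exception exception);
}

interface LeaderElectionService {
    void start(LeaderContender contender) throws Exception;
    void stop() throws Exception;
}

interface HighAvailabilityServices {
    // Method name taken from this PR; the String job ID is a simplification.
    LeaderElectionService getJobMasterLeaderElectionService(String jobID) throws Exception;
}

// Hypothetical single-process election service: the contender wins immediately.
class InMemoryLeaderElectionService implements LeaderElectionService {
    private LeaderContender contender;

    @Override
    public void start(LeaderContender contender) {
        this.contender = contender;
        contender.grantLeadership(UUID.randomUUID());
    }

    @Override
    public void stop() {
        if (contender != null) {
            contender.revokeLeadership();
            contender = null;
        }
    }
}

class JobMasterBootstrapSketch {
    private final String jobID;
    private final HighAvailabilityServices highAvailabilityServices;
    private LeaderElectionService leaderElectionService;
    private volatile UUID leaderSessionID;

    JobMasterBootstrapSketch(String jobID, HighAvailabilityServices highAvailabilityServices) {
        this.jobID = jobID;
        this.highAvailabilityServices = highAvailabilityServices;
    }

    /** Obtains the job's leader election service and starts contending for leadership. */
    void start() throws Exception {
        leaderElectionService = highAvailabilityServices.getJobMasterLeaderElectionService(jobID);
        leaderElectionService.start(new JobMasterLeaderContender());
    }

    /** Stops the election service; the in-memory service above also revokes leadership. */
    void shutDown() throws Exception {
        if (leaderElectionService != null) {
            leaderElectionService.stop();
        }
    }

    UUID getLeaderSessionID() {
        return leaderSessionID;
    }

    // Inner class, so the JobMaster itself does not have to implement LeaderContender.
    private class JobMasterLeaderContender implements LeaderContender {
        @Override public void grantLeadership(UUID id) { leaderSessionID = id; }
        @Override public void revokeLeadership() { leaderSessionID = null; }
        @Override public String getAddress() { return "jobmaster-" + jobID; }
        @Override public void handleError(Exception exception) {
            // A real JobMaster would shut down here; this sketch only drops its session ID.
            leaderSessionID = null;
        }
    }
}
```

Keeping the contender as an inner class lets it update the JobMaster's leader session ID without exposing the `LeaderContender` callbacks on the JobMaster's own public surface.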
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/alibaba/flink jira-4400

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2377.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2377
    
----
commit f9d5214ede8666824443fb68933893e9f98f8297
Author: xiaogang.sxg <xi...@alibaba-inc.com>
Date:   2016-08-17T05:46:00Z

    Implement leader contention prototypes in JobMaster

commit ed5cf7556991f3ed9a8a09d8b34c4fe76c3f8a96
Author: xiaogang.sxg <xi...@alibaba-inc.com>
Date:   2016-08-17T07:35:23Z

    reformat the code and remove the test code

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2377: [Flink-4400][cluster management]Leadership Electio...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2377#discussion_r75321303
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java ---
    @@ -75,18 +82,132 @@
     	/** UUID to filter out old registration runs */
     	private UUID currentRegistrationRun;
     
    -	public JobMaster(RpcService rpcService, ExecutorService executorService) {
    +	/** Logical representation of the job */
    +	private JobGraph jobGraph;
    +	private JobID jobID;
    +
    +	/** Configuration of the job */
    +	private Configuration configuration;
    +	private RecoveryMode recoveryMode;
    +
    +	/** Service to contend for and retrieve the leadership of JM and RM */
    +	private HighAvailabilityServices highAvailabilityServices;
    +
    +	/** Leader Management */
    +	private LeaderElectionService leaderElectionService = null;
    +	private UUID leaderSessionID;
    +
    +	public JobMaster(
    +		JobGraph jobGraph,
    +		Configuration configuration,
    +		RpcService rpcService,
    +		ExecutorService executorService,
    +		HighAvailabilityServices highAvailabilityService
    +	) {
     		super(rpcService);
    -		executionContext = ExecutionContext$.MODULE$.fromExecutor(
    +
    +		this.jobGraph = Preconditions.checkNotNull(jobGraph);
    +		this.jobID = Preconditions.checkNotNull(jobGraph.getJobID());
    +
    +		this.configuration = configuration;
    --- End diff --
    
    `checkNotNull` missing
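A minimal sketch of the requested fix: every constructor argument is null-checked before it is stored, and the fields are `final`. It uses `java.util.Objects.requireNonNull`, which plays the same role here as Flink's `Preconditions.checkNotNull`. The `JobGraphStub`, `ConfigurationStub`, `HaServicesStub` and `JobMasterFields` types are hypothetical placeholders so that the snippet compiles on its own.

```java
import java.util.Objects;

// Hypothetical placeholders for JobGraph, Configuration and HighAvailabilityServices.
final class JobGraphStub {
    private final String jobID;
    JobGraphStub(String jobID) { this.jobID = jobID; }
    String getJobID() { return jobID; }
}

final class ConfigurationStub { }

interface HaServicesStub { }

class JobMasterFields {
    private final JobGraphStub jobGraph;
    private final String jobID;
    private final ConfigurationStub configuration;
    private final HaServicesStub highAvailabilityServices;

    JobMasterFields(
            JobGraphStub jobGraph,
            ConfigurationStub configuration,
            HaServicesStub highAvailabilityServices) {
        // Fail fast with a named argument instead of a later, harder-to-trace NullPointerException.
        this.jobGraph = Objects.requireNonNull(jobGraph, "jobGraph");
        this.jobID = Objects.requireNonNull(jobGraph.getJobID(), "jobID");
        this.configuration = Objects.requireNonNull(configuration, "configuration");
        this.highAvailabilityServices =
                Objects.requireNonNull(highAvailabilityServices, "highAvailabilityServices");
    }

    String getJobID() {
        return jobID;
    }
}
```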


---

[GitHub] flink pull request #2377: [Flink-4400][cluster management]Leadership Electio...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2377#discussion_r75485459
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java ---
    @@ -248,4 +374,40 @@ void finishResourceManagerRegistration(ResourceManagerGateway resourceManager, I
     	public boolean isConnected() {
     		return resourceManager != null;
     	}
    +
    +
    +	/**
    +	 * Cancel the current job and notify all listeners the job's cancellation.
    +	 *
    +	 * @param cause Cause for the cancelling.
    +	 */
    +	private void cancelAndClearEverything(Throwable cause) {
    +		// currently, nothing to do here
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Utility classes
    +	// ------------------------------------------------------------------------
    +	private class JobMasterLeaderContender implements LeaderContender {
    --- End diff --
    
    You're right, @shixiaogang. Then let's not do it and keep it as it is :-)


---

[GitHub] flink pull request #2377: [Flink-4400][cluster management]Leadership Electio...

Posted by shixiaogang <gi...@git.apache.org>.
Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2377#discussion_r75481109
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java ---
    @@ -248,4 +374,40 @@ void finishResourceManagerRegistration(ResourceManagerGateway resourceManager, I
     	public boolean isConnected() {
     		return resourceManager != null;
     	}
    +
    +
    +	/**
    +	 * Cancel the current job and notify all listeners the job's cancellation.
    +	 *
    +	 * @param cause Cause for the cancelling.
    +	 */
    +	private void cancelAndClearEverything(Throwable cause) {
    +		// currently, nothing to do here
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Utility classes
    +	// ------------------------------------------------------------------------
    +	private class JobMasterLeaderContender implements LeaderContender {
    --- End diff --
    
    I originally made JobMaster implement the `LeaderContender` interface, but after checking Stephan's implementation of `TaskExecutor`, I changed my implementation :)
    
    JobMaster is not only a contender for the JM leadership but also a listener for the RM leader. If JM implemented the `LeaderContender` interface directly, it would also have to implement the `LeaderRetrievalListener` interface. Both interfaces declare a method called `handleError`, so a single implementation would have to inspect the cause of each exception: if the JM's contention for leadership fails, the JM should shut itself down, but if the listener for the RM fails, the JM can simply wait for the RM to recover.
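The design argument above can be sketched as follows: the JobMaster keeps one inner class per role, so each `handleError` callback knows which role failed. The two interfaces are simplified stand-ins for Flink's `LeaderContender` and `LeaderRetrievalListener`; `JobMasterRolesSketch`, `ResourceManagerLeaderListener` and the boolean flags are hypothetical, and "shut down" versus "wait for the RM" are reduced to flags for illustration.

```java
import java.util.UUID;

// Simplified stand-ins for Flink's LeaderContender and LeaderRetrievalListener.
interface LeaderContender {
    void grantLeadership(UUID leaderSessionID);
    void revokeLeadership();
    String getAddress();
    void handleError(Exception exception);
}

interface LeaderRetrievalListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);
    void handleError(Exception exception);
}

class JobMasterRolesSketch {
    private volatile boolean shutDown = false;
    private volatile boolean waitingForResourceManager = false;
    private volatile String resourceManagerAddress;

    // One object per role; each is registered with its own service.
    final LeaderContender leaderContender = new JobMasterLeaderContender();
    final LeaderRetrievalListener resourceManagerListener = new ResourceManagerLeaderListener();

    boolean isShutDown() { return shutDown; }
    boolean isWaitingForResourceManager() { return waitingForResourceManager; }

    /** Contends for the JM leadership; an error here is fatal for this JobMaster. */
    private class JobMasterLeaderContender implements LeaderContender {
        @Override public void grantLeadership(UUID leaderSessionID) { /* start or resume the job */ }
        @Override public void revokeLeadership() { /* suspend the job */ }
        @Override public String getAddress() { return "jobmaster"; }
        @Override public void handleError(Exception exception) {
            // The leader contention failed: the JM should shut itself down.
            shutDown = true;
        }
    }

    /** Listens for the RM leader; an error here is recoverable. */
    private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
        @Override public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
            resourceManagerAddress = leaderAddress;
            waitingForResourceManager = false;
        }

        @Override public void handleError(Exception exception) {
            // Retrieving the RM leader failed: keep running and wait for the RM to recover.
            resourceManagerAddress = null;
            waitingForResourceManager = true;
        }
    }
}
```

If a single class implemented both interfaces, the two `handleError(Exception)` methods would collapse into one, since their signatures are identical, and that method could only tell the roles apart by inspecting the exception.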


---

[GitHub] flink pull request #2377: [Flink-4400][cluster management]Leadership Electio...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2377#discussion_r75691504
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java ---
    @@ -59,62 +42,52 @@
      * It offers the following methods as part of its rpc interface to interact with the JobMaster
      * remotely:
      * <ul>
    - *     <li>{@link #registerAtResourceManager(String)} triggers the registration at the resource manager</li>
      *     <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for
      * given task</li>
      * </ul>
      */
     public class JobMaster extends RpcEndpoint<JobMasterGateway> {
    -	/** Execution context for future callbacks */
    -	private final ExecutionContext executionContext;
    -
    -	/** Execution context for scheduled runnables */
    -	private final ScheduledExecutorService scheduledExecutorService;
    -
    -	private final FiniteDuration initialRegistrationTimeout = new FiniteDuration(500, TimeUnit.MILLISECONDS);
    -	private final FiniteDuration maxRegistrationTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
    -	private final FiniteDuration registrationDuration = new FiniteDuration(365, TimeUnit.DAYS);
    -	private final long failedRegistrationDelay = 10000;
     
     	/** Gateway to connected resource manager, null iff not connected */
     	private ResourceManagerGateway resourceManager = null;
     
    -	/** UUID to filter out old registration runs */
    -	private UUID currentRegistrationRun;
    -
     	/** Logical representation of the job */
    -	private JobGraph jobGraph;
    -	private JobID jobID;
    +	private final JobGraph jobGraph;
    +	private final JobID jobID;
     
     	/** Configuration of the job */
    -	private Configuration configuration;
    -	private RecoveryMode recoveryMode;
    +	private final Configuration configuration;
    +	private final RecoveryMode recoveryMode;
     
     	/** Service to contend for and retrieve the leadership of JM and RM */
    -	private HighAvailabilityServices highAvailabilityServices;
    +	private final HighAvailabilityServices highAvailabilityServices;
     
     	/** Leader Management */
     	private LeaderElectionService leaderElectionService = null;
     	private UUID leaderSessionID;
     
    +	/**
    +	 * The JM's Constructor
    --- End diff --
    
    Javadocs usually say what the method/constructor does and not what it is. At the moment, it just states the obvious.


---

[GitHub] flink pull request #2377: [Flink-4400][cluster management]Leadership Electio...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2377#discussion_r75323128
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java ---
    @@ -36,4 +38,9 @@
     	 * Gets the leader retriever for the cluster's resource manager.
     	 */
     	LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception;
    +
    +	/**
    +	 * Gets the leader election service for the given job.
    +	 */
    +	LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception;
    --- End diff --
    
    Param description is missing here.
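For illustration, the method with its parameter documented might look like this. The Javadoc wording, the `@return`/`@throws` lines and the `JobID`/`LeaderElectionService` placeholders are suggestions made only so the snippet compiles standalone; they are not taken from the PR.

```java
// Simplified placeholders for Flink's JobID and LeaderElectionService.
final class JobID {
    private final String id;
    JobID(String id) { this.id = id; }
    @Override public String toString() { return id; }
}

interface LeaderElectionService { }

interface HighAvailabilityServices {
    /**
     * Gets the leader election service for the given job.
     *
     * @param jobID The identifier of the job whose JobMasters contend for leadership
     * @return The leader election service used by the JobMasters of that job
     * @throws Exception if the leader election service cannot be created
     */
    LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception;
}
```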


---

[GitHub] flink pull request #2377: [Flink-4400][cluster management]Leadership Electio...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2377#discussion_r75321263
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java ---
    @@ -75,18 +82,132 @@
     	/** UUID to filter out old registration runs */
     	private UUID currentRegistrationRun;
     
    -	public JobMaster(RpcService rpcService, ExecutorService executorService) {
    +	/** Logical representation of the job */
    +	private JobGraph jobGraph;
    +	private JobID jobID;
    +
    +	/** Configuration of the job */
    +	private Configuration configuration;
    +	private RecoveryMode recoveryMode;
    +
    +	/** Service to contend for and retrieve the leadership of JM and RM */
    +	private HighAvailabilityServices highAvailabilityServices;
    --- End diff --
    
    Can these fields be final?


---

[GitHub] flink pull request #2377: [Flink-4400][cluster management]Leadership Electio...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2377#discussion_r75322999
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java ---
    @@ -248,4 +374,40 @@ void finishResourceManagerRegistration(ResourceManagerGateway resourceManager, I
     	public boolean isConnected() {
     		return resourceManager != null;
     	}
    +
    +
    +	/**
    +	 * Cancel the current job and notify all listeners the job's cancellation.
    +	 *
    +	 * @param cause Cause for the cancelling.
    +	 */
    +	private void cancelAndClearEverything(Throwable cause) {
    +		// currently, nothing to do here
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Utility classes
    +	// ------------------------------------------------------------------------
    +	private class JobMasterLeaderContender implements LeaderContender {
    --- End diff --
    
    I'm undecided here: maybe we could let the JobMaster implement the `LeaderContender` interface directly. Then we could get rid of one layer of indirection (basically the `grantJobMasterleadership`, `revokeJobMasterLeadership` and `onJobMasterElectionError` methods). Would that make sense?


---

[GitHub] flink issue #2377: [Flink-4400][cluster management]Leadership Election among...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2377
  
    Changes look good to me, @shixiaogang. Thanks a lot for your work. I will merge your PR.


---

[GitHub] flink pull request #2377: [Flink-4400][cluster management]Leadership Electio...

Posted by shixiaogang <gi...@git.apache.org>.
Github user shixiaogang closed the pull request at:

    https://github.com/apache/flink/pull/2377


---

[GitHub] flink pull request #2377: [Flink-4400][cluster management]Leadership Electio...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2377#discussion_r75321520
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java ---
    @@ -75,18 +82,132 @@
     	/** UUID to filter out old registration runs */
     	private UUID currentRegistrationRun;
     
    -	public JobMaster(RpcService rpcService, ExecutorService executorService) {
    +	/** Logical representation of the job */
    +	private JobGraph jobGraph;
    +	private JobID jobID;
    +
    +	/** Configuration of the job */
    +	private Configuration configuration;
    +	private RecoveryMode recoveryMode;
    +
    +	/** Service to contend for and retrieve the leadership of JM and RM */
    +	private HighAvailabilityServices highAvailabilityServices;
    +
    +	/** Leader Management */
    +	private LeaderElectionService leaderElectionService = null;
    +	private UUID leaderSessionID;
    +
    +	public JobMaster(
    +		JobGraph jobGraph,
    +		Configuration configuration,
    +		RpcService rpcService,
    +		ExecutorService executorService,
    +		HighAvailabilityServices highAvailabilityService
    +	) {
     		super(rpcService);
    -		executionContext = ExecutionContext$.MODULE$.fromExecutor(
    +
    +		this.jobGraph = Preconditions.checkNotNull(jobGraph);
    +		this.jobID = Preconditions.checkNotNull(jobGraph.getJobID());
    +
    +		this.configuration = configuration;
    +		this.recoveryMode = RecoveryMode.fromConfig(configuration);
    +
    +		this.executionContext = ExecutionContext$.MODULE$.fromExecutor(
     			Preconditions.checkNotNull(executorService));
    -		scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    +		this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    --- End diff --
    
    We might be able to get rid of these execution contexts, because the `RpcService` now offers some as well.
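A rough sketch of the idea: instead of each JobMaster creating its own `ScheduledThreadPoolExecutor(1)`, which it must also remember to shut down, the JobMaster borrows a scheduler owned by the RPC service. `ScheduledExecutorProvider`, `SharedRpcServiceStub` and `JobMasterSchedulingSketch` are hypothetical names; the provider interface stands in for whatever executor access the `RpcService` exposes and is not its real API.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the executor access offered by the RpcService.
interface ScheduledExecutorProvider {
    ScheduledExecutorService getScheduledExecutor();
}

// Hypothetical service that owns one shared scheduler for all endpoints.
class SharedRpcServiceStub implements ScheduledExecutorProvider, AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    @Override
    public ScheduledExecutorService getScheduledExecutor() {
        return scheduler;
    }

    @Override
    public void close() {
        // The service, not each endpoint, is responsible for releasing the thread.
        scheduler.shutdownNow();
    }
}

class JobMasterSchedulingSketch {
    private final ScheduledExecutorProvider rpcService;

    JobMasterSchedulingSketch(ScheduledExecutorProvider rpcService) {
        // No private thread pool: the JobMaster has nothing of its own to shut down.
        this.rpcService = rpcService;
    }

    /** Schedules a delayed action, e.g. a registration retry, on the shared scheduler. */
    ScheduledFuture<?> scheduleRetry(Runnable retry, long delayMillis) {
        return rpcService.getScheduledExecutor().schedule(retry, delayMillis, TimeUnit.MILLISECONDS);
    }
}
```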


---

[GitHub] flink pull request #2377: [Flink-4400][cluster management]Leadership Electio...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2377#discussion_r75323262
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java ---
    @@ -135,12 +256,12 @@ public void registerAtResourceManager(final String address) {
     	 *
     	 * @param jobMasterRegistration Job master registration info which is sent to the resource
     	 *                              manager
    -	 * @param attemptNumber Registration attempt number
    +	 * @param attemptNumber         Registration attempt number
     	 * @param resourceManagerFuture Future of the resource manager gateway
    -	 * @param registrationRun UUID describing the current registration run
    -	 * @param timeout Timeout of the last registration attempt
    -	 * @param maxTimeout Maximum timeout between registration attempts
    -	 * @param deadline Deadline for the registration
    +	 * @param registrationRun       UUID describing the current registration run
    +	 * @param timeout               Timeout of the last registration attempt
    +	 * @param maxTimeout            Maximum timeout between registration attempts
    +	 * @param deadline              Deadline for the registration
     	 */
     	void handleResourceManagerRegistration(
    --- End diff --
    
    That's actually a follow-up task and should not be part of this PR. 


---

[GitHub] flink issue #2377: [Flink-4400][cluster management]Leadership Election among...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2377
  
    Good work, @shixiaogang. I like your changes. I only had some minor inline comments. Once they are addressed, the PR is good to be merged :-)


---

[GitHub] flink pull request #2377: [Flink-4400][cluster management]Leadership Electio...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2377#discussion_r75322074
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java ---
    @@ -135,12 +256,12 @@ public void registerAtResourceManager(final String address) {
     	 *
     	 * @param jobMasterRegistration Job master registration info which is sent to the resource
     	 *                              manager
    -	 * @param attemptNumber Registration attempt number
    +	 * @param attemptNumber         Registration attempt number
     	 * @param resourceManagerFuture Future of the resource manager gateway
    -	 * @param registrationRun UUID describing the current registration run
    -	 * @param timeout Timeout of the last registration attempt
    -	 * @param maxTimeout Maximum timeout between registration attempts
    -	 * @param deadline Deadline for the registration
    +	 * @param registrationRun       UUID describing the current registration run
    +	 * @param timeout               Timeout of the last registration attempt
    +	 * @param maxTimeout            Maximum timeout between registration attempts
    +	 * @param deadline              Deadline for the registration
     	 */
     	void handleResourceManagerRegistration(
    --- End diff --
    
    Maybe we could reuse Stephan's implementation for the `TaskExecutor` <--> `ResourceManager` registration here.
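The registration code under discussion retries with a growing timeout; the fields removed from `JobMaster` in this PR set an initial timeout of 500 ms and a maximum of 30 s. A minimal sketch of such a schedule follows, assuming the timeout doubles after each failed attempt. The doubling rule and the `RegistrationBackoff` class are assumptions for illustration, not Stephan's `TaskExecutor` implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper computing per-attempt registration timeouts with capped doubling. */
final class RegistrationBackoff {
    private final long initialTimeoutMillis;
    private final long maxTimeoutMillis;

    RegistrationBackoff(long initialTimeoutMillis, long maxTimeoutMillis) {
        if (initialTimeoutMillis <= 0 || maxTimeoutMillis < initialTimeoutMillis) {
            throw new IllegalArgumentException("invalid timeouts");
        }
        this.initialTimeoutMillis = initialTimeoutMillis;
        this.maxTimeoutMillis = maxTimeoutMillis;
    }

    /** Timeout for the given 1-based attempt: initial * 2^(attempt - 1), capped at the maximum. */
    long timeoutForAttempt(int attempt) {
        long timeout = initialTimeoutMillis;
        for (int i = 1; i < attempt && timeout < maxTimeoutMillis; i++) {
            timeout = Math.min(timeout * 2, maxTimeoutMillis);
        }
        return timeout;
    }

    /** Timeouts of the first n attempts, e.g. for logging or tests. */
    List<Long> firstTimeouts(int n) {
        List<Long> result = new ArrayList<>();
        for (int attempt = 1; attempt <= n; attempt++) {
            result.add(timeoutForAttempt(attempt));
        }
        return result;
    }
}
```

With the JobMaster's former values, `new RegistrationBackoff(500, 30_000).firstTimeouts(8)` yields 500, 1000, 2000, 4000, 8000, 16000, 30000, 30000 milliseconds: the cap keeps a long-unavailable ResourceManager from pushing retries arbitrarily far apart.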


---

[GitHub] flink pull request #2377: [Flink-4400][cluster management]Leadership Electio...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2377#discussion_r75691680
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java ---
    @@ -230,153 +203,14 @@ public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionStat
     	 */
     	@RpcMethod
     	public void registerAtResourceManager(final String address) {
    -		currentRegistrationRun = UUID.randomUUID();
    -
    -		Future<ResourceManagerGateway> resourceManagerFuture = getRpcService().connect(address, ResourceManagerGateway.class);
    -
    -		handleResourceManagerRegistration(
    -			new JobMasterRegistration(getAddress()),
    -			1,
    -			resourceManagerFuture,
    -			currentRegistrationRun,
    -			initialRegistrationTimeout,
    -			maxRegistrationTimeout,
    -			registrationDuration.fromNow());
    +		//TODO:: register at the RM
    --- End diff --
    
    We usually avoid TODOs in our commits; experience shows that they tend to be forgotten.


---

[GitHub] flink pull request #2377: [Flink-4400][cluster management]Leadership Electio...

Posted by shixiaogang <gi...@git.apache.org>.
Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2377#discussion_r75479936
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java ---
    @@ -135,12 +256,12 @@ public void registerAtResourceManager(final String address) {
     	 *
     	 * @param jobMasterRegistration Job master registration info which is sent to the resource
     	 *                              manager
    -	 * @param attemptNumber Registration attempt number
    +	 * @param attemptNumber         Registration attempt number
     	 * @param resourceManagerFuture Future of the resource manager gateway
    -	 * @param registrationRun UUID describing the current registration run
    -	 * @param timeout Timeout of the last registration attempt
    -	 * @param maxTimeout Maximum timeout between registration attempts
    -	 * @param deadline Deadline for the registration
    +	 * @param registrationRun       UUID describing the current registration run
    +	 * @param timeout               Timeout of the last registration attempt
    +	 * @param maxTimeout            Maximum timeout between registration attempts
    +	 * @param deadline              Deadline for the registration
     	 */
     	void handleResourceManagerRegistration(
    --- End diff --
    
    The differences here are due to automatic code reformatting. I focused on implementing leader election in this PR, so I did not delete your registration code here.
    
    Since the code will not be used in the following PR, I will delete it :)


---