Posted to issues@flink.apache.org by KurtYoung <gi...@git.apache.org> on 2016/09/29 04:00:11 UTC

[GitHub] flink pull request #2565: [FLINK-4406] [cluster management] Implement job ma...

GitHub user KurtYoung opened a pull request:

    https://github.com/apache/flink/pull/2565

    [FLINK-4406] [cluster management] Implement job master registration at resource manager

     

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/KurtYoung/flink flink-4406

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2565.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2565
    
----
commit 72fe93d0c0eab9ce0688184c3b2c60d19f9ad085
Author: Kurt Young <yk...@gmail.com>
Date:   2016-09-29T00:56:27Z

    [FLINK-4406] [cluster management] Implement job master registration at resource manager

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2565: [FLINK-4406] [cluster management] Implement job ma...

Posted by KurtYoung <gi...@git.apache.org>.
Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2565#discussion_r81141545
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -457,14 +505,94 @@ public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionStat
     		return Acknowledge.get();
     	}
     
    -	/**
    -	 * Triggers the registration of the job master at the resource manager.
    -	 *
    -	 * @param address Address of the resource manager
    -	 */
    -	@RpcMethod
    -	public void registerAtResourceManager(final String address) {
    -		//TODO:: register at the RM
    +	//----------------------------------------------------------------------------------------------
    +	// Internal methods
    +	// ----------------------------------------------------------------------------------------------
    +
    +	private void handleFatalError(final Throwable cause) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
    +				shutDown();
    +				jobCompletionActions.onFatalError(cause);
    +			}
    +		});
    +	}
    +
    +	private void notifyOfNewResourceManagerLeader(
    +		final String resourceManagerAddress, final UUID resourceManagerLeaderId)
    +	{
    +		// IMPORTANT: executed by main thread to avoid concurrence
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				if (resourceManagerConnection != null) {
    +					if (resourceManagerAddress != null) {
    +						if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
    +							&& resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId()))
    +						{
    +							// both address and leader id are not changed, we can keep the old connection
    +							return;
    +						}
    +						log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
    +							resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
    +					}
    +					else {
    +						log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
    +							resourceManagerConnection.getTargetAddress());
    +					}
    +				}
    +
    +				closeResourceManagerConnection();
    +
    +				if (resourceManagerAddress != null) {
    +					log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
    +					resourceManagerConnection = new ResourceManagerConnection(
    +						log, jobGraph.getJobID(), leaderSessionID,
    +						resourceManagerAddress, resourceManagerLeaderId, executionContext);
    +					resourceManagerConnection.start();
    +				}
    +			}
    +		});
    +	}
    +
    +	private void onResourceManagerRegistrationSuccess(final JobMasterRegistrationSuccess success) {
    +		// IMPORTANT: executed by main thread to avoid concurrence
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				// only process if we haven't been connected in the meantime
    +				if (resourceManagerGateway == null) {
    --- End diff --
    
    Good catch, there are indeed some conflicts. Consider the following sequence of events:
    
    a. register at rm1
    b. rm1 success notify
    c. leader changes to rm2, register at rm2
    d. rm2 success notify
    
    (a) and (c) happen in order, but (b) and (d) are not guaranteed to.
    
    So if the order is a, c, b, d, we end up keeping the wrong gateway. It seems we should attach the resource manager's leader id to the response so that we can verify it.
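The leader-id verification proposed above could look roughly like the following minimal sketch. It assumes the registration response echoes back the resource manager's leader id, and that every method runs on the job master's main thread. `RegistrationResponse`, `ResourceManagerRegistrationState`, and the string "gateway" are hypothetical stand-ins, not Flink's actual classes. The demo replays the a, c, b, d ordering: the late success from rm1 is rejected because its leader id no longer matches the current target.

```java
import java.util.UUID;

// Hypothetical registration success that echoes the RM leader id
// (the change proposed in the comment); not Flink's real response class.
final class RegistrationResponse {
    final UUID resourceManagerLeaderId;
    final String gateway; // placeholder for the RM gateway handle

    RegistrationResponse(UUID resourceManagerLeaderId, String gateway) {
        this.resourceManagerLeaderId = resourceManagerLeaderId;
        this.gateway = gateway;
    }
}

// Hypothetical job-master-side state; every method is assumed to run on the main thread.
final class ResourceManagerRegistrationState {
    private UUID targetLeaderId; // leader we are currently registering at
    private String gateway;      // null until the *current* registration succeeds

    // Events (a) and (c): a new RM leader is announced and we (re-)register.
    void registerAt(UUID leaderId) {
        targetLeaderId = leaderId;
        gateway = null; // forget any gateway of the previous leader
    }

    // Events (b) and (d): some registration attempt reports success.
    boolean onRegistrationSuccess(RegistrationResponse response) {
        if (gateway != null) {
            return false; // already connected to the current leader
        }
        if (!response.resourceManagerLeaderId.equals(targetLeaderId)) {
            return false; // stale success from a previous leader
        }
        gateway = response.gateway;
        return true;
    }

    String gateway() {
        return gateway;
    }
}

class RegistrationOrderDemo {
    public static void main(String[] args) {
        UUID rm1 = UUID.randomUUID();
        UUID rm2 = UUID.randomUUID();
        ResourceManagerRegistrationState state = new ResourceManagerRegistrationState();

        state.registerAt(rm1); // (a)
        state.registerAt(rm2); // (c)
        boolean b = state.onRegistrationSuccess(new RegistrationResponse(rm1, "rm1-gateway")); // (b)
        boolean d = state.onRegistrationSuccess(new RegistrationResponse(rm2, "rm2-gateway")); // (d)

        System.out.println("b accepted: " + b + ", d accepted: " + d + ", gateway: " + state.gateway());
    }
}
```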


[GitHub] flink pull request #2565: [FLINK-4406] [cluster management] Implement job ma...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2565#discussion_r81122520
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -457,14 +505,88 @@ public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionStat
     		return Acknowledge.get();
     	}
     
    -	/**
    -	 * Triggers the registration of the job master at the resource manager.
    -	 *
    -	 * @param address Address of the resource manager
    -	 */
    -	@RpcMethod
    -	public void registerAtResourceManager(final String address) {
    -		//TODO:: register at the RM
    +	//----------------------------------------------------------------------------------------------
    +	// Internal methods
    +	// ----------------------------------------------------------------------------------------------
    +
    +	private void handleFatalError(final Throwable cause) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
    +				shutDown();
    +				jobCompletionActions.onFatalError(cause);
    +			}
    +		});
    +	}
    +
    +	private void notifyOfNewResourceManagerLeader(
    +		final String resourceManagerAddress, final UUID resourceManagerLeaderId)
    +	{
    +		// IMPORTANT: executed by main thread to avoid concurrence
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				if (resourceManagerConnection != null) {
    +					if (resourceManagerAddress != null) {
    --- End diff --
    
    What if the resource manager address and the leader id haven't changed since the last registration?
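One way to handle this case is to make leader notifications idempotent: if the announced address and leader id match the target of the existing connection, keep that connection instead of tearing it down and re-registering. A minimal sketch under that assumption; `ResourceManagerTarget`, `LeaderNotificationHandler`, and the address string are hypothetical, and a counter stands in for starting a real registration.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical value describing which RM a connection targets.
final class ResourceManagerTarget {
    final String address;
    final UUID leaderId;

    ResourceManagerTarget(String address, UUID leaderId) {
        this.address = address;
        this.leaderId = leaderId;
    }

    boolean matches(String otherAddress, UUID otherLeaderId) {
        return address.equals(otherAddress) && Objects.equals(leaderId, otherLeaderId);
    }
}

// Hypothetical handler; assumed to be invoked only from the main thread.
final class LeaderNotificationHandler {
    private ResourceManagerTarget current; // null when there is no connection
    private int connectionsStarted;        // stands in for starting a registration

    void notifyOfNewLeader(String address, UUID leaderId) {
        if (current != null && address != null && current.matches(address, leaderId)) {
            return; // same address and leader id: keep the existing connection
        }
        current = null; // close the old connection, if any
        if (address != null) {
            current = new ResourceManagerTarget(address, leaderId);
            connectionsStarted++;
        }
    }

    boolean isConnected() {
        return current != null;
    }

    int connectionsStarted() {
        return connectionsStarted;
    }
}

class LeaderNotificationDemo {
    public static void main(String[] args) {
        LeaderNotificationHandler handler = new LeaderNotificationHandler();
        UUID leader = UUID.randomUUID();
        handler.notifyOfNewLeader("rm-address", leader);
        handler.notifyOfNewLeader("rm-address", leader); // duplicate notification: no reconnect
        handler.notifyOfNewLeader(null, null);           // leader lost: connection closed
        System.out.println(handler.connectionsStarted() + " " + handler.isConnected());
    }
}
```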


[GitHub] flink issue #2565: [FLINK-4406] [cluster management] Implement job master re...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2565
  
    Changes look good to me. +1 for merging.


[GitHub] flink pull request #2565: [FLINK-4406] [cluster management] Implement job ma...

Posted by KurtYoung <gi...@git.apache.org>.
Github user KurtYoung closed the pull request at:

    https://github.com/apache/flink/pull/2565


[GitHub] flink issue #2565: [FLINK-4406] [cluster management] Implement job master re...

Posted by KurtYoung <gi...@git.apache.org>.
Github user KurtYoung commented on the issue:

    https://github.com/apache/flink/pull/2565
  
    Thanks for the reviews, @tillrohrmann and @mxm; you both raised some very good points.


[GitHub] flink pull request #2565: [FLINK-4406] [cluster management] Implement job ma...

Posted by KurtYoung <gi...@git.apache.org>.
Github user KurtYoung commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2565#discussion_r81139389
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -457,14 +505,94 @@ public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionStat
     		return Acknowledge.get();
     	}
     
    -	/**
    -	 * Triggers the registration of the job master at the resource manager.
    -	 *
    -	 * @param address Address of the resource manager
    -	 */
    -	@RpcMethod
    -	public void registerAtResourceManager(final String address) {
    -		//TODO:: register at the RM
    +	//----------------------------------------------------------------------------------------------
    +	// Internal methods
    +	// ----------------------------------------------------------------------------------------------
    +
    +	private void handleFatalError(final Throwable cause) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
    +				shutDown();
    +				jobCompletionActions.onFatalError(cause);
    +			}
    +		});
    +	}
    +
    +	private void notifyOfNewResourceManagerLeader(
    +		final String resourceManagerAddress, final UUID resourceManagerLeaderId)
    +	{
    +		// IMPORTANT: executed by main thread to avoid concurrence
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				if (resourceManagerConnection != null) {
    +					if (resourceManagerAddress != null) {
    +						if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
    +							&& resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId()))
    +						{
    +							// both address and leader id are not changed, we can keep the old connection
    +							return;
    +						}
    +						log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
    +							resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
    +					}
    +					else {
    +						log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
    +							resourceManagerConnection.getTargetAddress());
    +					}
    +				}
    +
    +				closeResourceManagerConnection();
    +
    +				if (resourceManagerAddress != null) {
    +					log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
    +					resourceManagerConnection = new ResourceManagerConnection(
    +						log, jobGraph.getJobID(), leaderSessionID,
    +						resourceManagerAddress, resourceManagerLeaderId, executionContext);
    +					resourceManagerConnection.start();
    +				}
    +			}
    +		});
    +	}
    +
    +	private void onResourceManagerRegistrationSuccess(final JobMasterRegistrationSuccess success) {
    +		// IMPORTANT: executed by main thread to avoid concurrence
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				// only process if we haven't been connected in the meantime
    +				if (resourceManagerGateway == null) {
    +					// double check the connection is still effective
    +					if (resourceManagerConnection != null) {
    --- End diff --
    
    This will not happen because all changes to the connection are executed in the main thread.
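The argument relies on thread confinement: every read and write of the connection field happens inside a task run by the endpoint's single main thread (via `runAsync`), so a check-then-act sequence within one task cannot interleave with another mutation. A minimal sketch of that pattern, assuming a single-threaded `ExecutorService` as a stand-in for the RPC endpoint's main thread; `ConfinedEndpoint` and its methods are hypothetical. Confinement rules out data races on the field, but on its own it does not tell a success callback whether its connection was already replaced by an earlier task.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical endpoint whose mutable state is confined to one "main thread".
final class ConfinedEndpoint {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    // Only accessed from tasks running on mainThread, so no locks are needed.
    private Object connection;
    private int connectionReplacements;

    // Stand-in for runAsync: hand work to the main thread.
    void runAsync(Runnable task) {
        mainThread.execute(task);
    }

    // May be called from any thread; the mutation itself runs on the main thread.
    void replaceConnection() {
        runAsync(() -> {
            connection = new Object();
            connectionReplacements++;
        });
    }

    // Reads the counter on the main thread; Future#get publishes the value to the caller.
    int connectionReplacements() throws InterruptedException, ExecutionException {
        return mainThread.submit(() -> connectionReplacements).get();
    }

    void shutDown() {
        mainThread.shutdown();
    }
}

class ConfinementDemo {
    public static void main(String[] args) throws Exception {
        ConfinedEndpoint endpoint = new ConfinedEndpoint();
        Thread[] callers = new Thread[4];
        for (int i = 0; i < callers.length; i++) {
            callers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    endpoint.replaceConnection();
                }
            });
            callers[i].start();
        }
        for (Thread caller : callers) {
            caller.join();
        }
        // Every increment ran on the single main thread, so none were lost.
        System.out.println(endpoint.connectionReplacements());
        endpoint.shutDown();
    }
}
```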


[GitHub] flink issue #2565: [FLINK-4406] [cluster management] Implement job master re...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2565
  
    Thanks for your contribution @KurtYoung. I've merged the code. You can close the PR now.


[GitHub] flink pull request #2565: [FLINK-4406] [cluster management] Implement job ma...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2565#discussion_r81135329
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -457,14 +505,94 @@ public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionStat
     		return Acknowledge.get();
     	}
     
    -	/**
    -	 * Triggers the registration of the job master at the resource manager.
    -	 *
    -	 * @param address Address of the resource manager
    -	 */
    -	@RpcMethod
    -	public void registerAtResourceManager(final String address) {
    -		//TODO:: register at the RM
    +	//----------------------------------------------------------------------------------------------
    +	// Internal methods
    +	// ----------------------------------------------------------------------------------------------
    +
    +	private void handleFatalError(final Throwable cause) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
    +				shutDown();
    +				jobCompletionActions.onFatalError(cause);
    +			}
    +		});
    +	}
    +
    +	private void notifyOfNewResourceManagerLeader(
    +		final String resourceManagerAddress, final UUID resourceManagerLeaderId)
    +	{
    +		// IMPORTANT: executed by main thread to avoid concurrence
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				if (resourceManagerConnection != null) {
    +					if (resourceManagerAddress != null) {
    +						if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
    +							&& resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId()))
    +						{
    +							// both address and leader id are not changed, we can keep the old connection
    +							return;
    +						}
    +						log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
    +							resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
    +					}
    +					else {
    +						log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
    +							resourceManagerConnection.getTargetAddress());
    +					}
    +				}
    +
    +				closeResourceManagerConnection();
    +
    +				if (resourceManagerAddress != null) {
    +					log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
    +					resourceManagerConnection = new ResourceManagerConnection(
    +						log, jobGraph.getJobID(), leaderSessionID,
    +						resourceManagerAddress, resourceManagerLeaderId, executionContext);
    +					resourceManagerConnection.start();
    +				}
    +			}
    +		});
    +	}
    +
    +	private void onResourceManagerRegistrationSuccess(final JobMasterRegistrationSuccess success) {
    +		// IMPORTANT: executed by main thread to avoid concurrence
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				// only process if we haven't been connected in the meantime
    +				if (resourceManagerGateway == null) {
    --- End diff --
    
    What if we have been connected in the meantime through an old registration attempt that is no longer valid?


[GitHub] flink pull request #2565: [FLINK-4406] [cluster management] Implement job ma...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2565#discussion_r81134995
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -457,14 +505,94 @@ public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionStat
     		return Acknowledge.get();
     	}
     
    -	/**
    -	 * Triggers the registration of the job master at the resource manager.
    -	 *
    -	 * @param address Address of the resource manager
    -	 */
    -	@RpcMethod
    -	public void registerAtResourceManager(final String address) {
    -		//TODO:: register at the RM
    +	//----------------------------------------------------------------------------------------------
    +	// Internal methods
    +	// ----------------------------------------------------------------------------------------------
    +
    +	private void handleFatalError(final Throwable cause) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				log.error("Fatal error occurred on JobManager, cause: {}", cause.getMessage(), cause);
    +				shutDown();
    +				jobCompletionActions.onFatalError(cause);
    +			}
    +		});
    +	}
    +
    +	private void notifyOfNewResourceManagerLeader(
    +		final String resourceManagerAddress, final UUID resourceManagerLeaderId)
    +	{
    +		// IMPORTANT: executed by main thread to avoid concurrence
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				if (resourceManagerConnection != null) {
    +					if (resourceManagerAddress != null) {
    +						if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
    +							&& resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId()))
    +						{
    +							// both address and leader id are not changed, we can keep the old connection
    +							return;
    +						}
    +						log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
    +							resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
    +					}
    +					else {
    +						log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
    +							resourceManagerConnection.getTargetAddress());
    +					}
    +				}
    +
    +				closeResourceManagerConnection();
    +
    +				if (resourceManagerAddress != null) {
    +					log.info("Attempting to register at ResourceManager {}", resourceManagerAddress);
    +					resourceManagerConnection = new ResourceManagerConnection(
    +						log, jobGraph.getJobID(), leaderSessionID,
    +						resourceManagerAddress, resourceManagerLeaderId, executionContext);
    +					resourceManagerConnection.start();
    +				}
    +			}
    +		});
    +	}
    +
    +	private void onResourceManagerRegistrationSuccess(final JobMasterRegistrationSuccess success) {
    +		// IMPORTANT: executed by main thread to avoid concurrence
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				// only process if we haven't been connected in the meantime
    +				if (resourceManagerGateway == null) {
    +					// double check the connection is still effective
    +					if (resourceManagerConnection != null) {
    --- End diff --
    
    Couldn't this resource manager connection be replaced by a new one in the meantime?
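One way to rule this out without changing the registration protocol is to have each connection report success together with a reference to itself, and to accept the success on the main thread only if that exact connection is still the current one. A minimal sketch under that assumption; `PendingConnection`, `ConnectionTrackingJobMaster`, and the string "gateway" are hypothetical names, not Flink classes, and all methods are assumed to run on the main thread.

```java
// Hypothetical connection object; only its identity matters here.
final class PendingConnection {
    final String targetAddress;

    PendingConnection(String targetAddress) {
        this.targetAddress = targetAddress;
    }
}

// Hypothetical job-master-side state; all methods assumed to run on the main thread.
final class ConnectionTrackingJobMaster {
    private PendingConnection currentConnection; // replaced on every leader change
    private String gateway;                      // set only by a success from currentConnection

    PendingConnection connectTo(String address) {
        currentConnection = new PendingConnection(address); // old connection is dropped
        gateway = null;
        return currentConnection;
    }

    boolean onRegistrationSuccess(PendingConnection origin, String reportedGateway) {
        // Reference comparison: successes from replaced connections are ignored.
        if (origin != currentConnection || gateway != null) {
            return false;
        }
        gateway = reportedGateway;
        return true;
    }

    String gateway() {
        return gateway;
    }
}

class ConnectionTrackingDemo {
    public static void main(String[] args) {
        ConnectionTrackingJobMaster jobMaster = new ConnectionTrackingJobMaster();
        PendingConnection first = jobMaster.connectTo("rm1");
        PendingConnection second = jobMaster.connectTo("rm2"); // leader changed before rm1 answered
        jobMaster.onRegistrationSuccess(first, "rm1-gateway");  // ignored: stale connection
        jobMaster.onRegistrationSuccess(second, "rm2-gateway"); // accepted
        System.out.println(jobMaster.gateway());
    }
}
```

Compared with echoing the leader id in the response, the identity check leaves the protocol unchanged, but it only works if each success callback can be tied to the connection object that issued the request.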

