You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by beyond1920 <gi...@git.apache.org> on 2016/09/23 07:33:26 UTC

[GitHub] flink pull request #2540: [FLINK-4606] [Client] Integrate the new ResourceMa...

GitHub user beyond1920 opened a pull request:

    https://github.com/apache/flink/pull/2540

    [FLINK-4606] [Client] Integrate the new ResourceManager with the existed FlinkResourceManager

    This pr aims  to integrate the new ResourceManager with the existed FlinkResourceManager, the main difference including:
    1.  Move the useful rpc communication in existed FlinkResourceManager to new ResourceManager, e.g : register infoMessageListener, unregister infoMessageListener, shutDownCluster
    2. Make ResourceManager to be an abstract class, extract framework specific behavior 
    3. Implement standalone resourceManager based on the new base ResourceManager class.
    4. Modify testcases which are effected by abstract resourceManager class.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/alibaba/flink jira-4606

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2540.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2540
    
----
commit d0e0cea949a8bd2e94c333b2204d90f6ed9e740d
Author: beyond1920 <be...@126.com>
Date:   2016-09-09T01:11:24Z

    yarn slot manager

commit 78a3b44040b73d288f67a7b3491ab6abaf673bdb
Author: beyond1920 <be...@126.com>
Date:   2016-09-10T03:29:40Z

    integrate with existing FlinkResourceManager

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2540: [FLINK-4606] [cluster management] Integrate the ne...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 closed the pull request at:

    https://github.com/apache/flink/pull/2540


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2540: [FLINK-4606] [cluster management] Integrate the new Resou...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2540
  
    This has been merged. Thank you. Could you close the PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2540: [FLINK-4606] [cluster management] Integrate the ne...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2540#discussion_r80612002
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -66,15 +67,16 @@
      *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
      * </ul>
      */
    -public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {
    +public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint implements LeaderContender {
    --- End diff --
    
    @mxm , I adopt `ResourceManager<WorkerType extends TaskExecutorRegistration>
            extends RpcEndpoint<ResourceManagerGateway> ` at first time, but it would fail because of an Exception when I wanna to start a subClass of this ResourceManager. For example, `public class StandaloneResourceManager extends ResourceManager<TaskExecutorRegistration>`, when I start this ResourceManager, it would call AkkaRpcService.#startServer, an exception would be thrown here because selfGatewayType was mistake for TaskExecutorRegistration class. So I change it to `ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2540: [FLINK-4606] [cluster management] Integrate the ne...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2540#discussion_r80642652
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -66,15 +67,16 @@
      *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
      * </ul>
      */
    -public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {
    +public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint implements LeaderContender {
    --- End diff --
    
    Yes, I see. I'll modify `RpcEndpoint` and `RpcCompletnessTest` for this to work.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2540: [FLINK-4606] [cluster management] Integrate the ne...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2540#discussion_r80473835
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -66,15 +67,16 @@
      *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
      * </ul>
      */
    -public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {
    +public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint implements LeaderContender {
    --- End diff --
    
    I believe this should be 
    
    ```java
     ResourceManager<WorkerType extends TaskExecutorRegistration>
    		extends RpcEndpoint<ResourceManagerGateway>
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2540: [FLINK-4606] [cluster management] Integrate the new Resou...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on the issue:

    https://github.com/apache/flink/pull/2540
  
    @mxm , thanks for your review, I modified the pr based on your advices:
    1. fIx checkstyle error, `AkkaRpcActorTest` testcase and `RpcCompletenessTest` testcase. Sorry for those mistakes, I would take care of it next time.
    2. About resourceManager, I adopt ResourceManager<WorkerType extends TaskExecutorRegistration> extends RpcEndpoint<ResourceManagerGateway> at first time, but it would fail because of an Exception when I wanna to start a subClass of this ResourceManager. For example, public class StandaloneResourceManager extends ResourceManager<TaskExecutorRegistration>, when I start this ResourceManager, it would call AkkaRpcService.#startServer, an exception would be thrown here because selfGatewayType was mistake for TaskExecutorRegistration class. So I change it to ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2540: [FLINK-4606] [cluster management] Integrate the ne...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2540#discussion_r80836325
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -324,6 +337,158 @@ public void handleError(final Exception exception) {
     		shutDown();
     	}
     
    +	/**
    +	 * Registers an infoMessage listener
    +	 *
    +	 * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager
    +	 */
    +	@RpcMethod
    +	public void registerInfoMessageListener(final String infoMessageListenerAddress) {
    +		if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
    +			log.warn("Receive a duplicate registration from info message listener on ({})", infoMessageListenerAddress);
    +		} else {
    +			Future<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class);
    +
    +			infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() {
    +				@Override
    +				public void accept(InfoMessageListenerRpcGateway gateway) {
    +					log.info("Receive a registration from info message listener on ({})", infoMessageListenerAddress);
    +					infoMessageListeners.put(infoMessageListenerAddress, gateway);
    +				}
    +			}, getMainThreadExecutor());
    +
    +			infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
    +				@Override
    +				public Void apply(Throwable failure) {
    +					log.warn("Receive a registration from unreachable info message listener on ({})", infoMessageListenerAddress);
    +					return null;
    +				}
    +			}, getMainThreadExecutor());
    +		}
    +	}
    +
    +	/**
    +	 * Unregisters an infoMessage listener
    +	 *
    +	 * @param infoMessageListenerAddress address of infoMessage listener to unregister from this resource manager
    +	 *
    +	 */
    +	@RpcMethod
    +	public void unRegisterInfoMessageListener(final String infoMessageListenerAddress) {
    +		infoMessageListeners.remove(infoMessageListenerAddress);
    +	}
    +
    +	/**
    +	 * Shutdowns cluster
    +	 *
    +	 * @param finalStatus
    +	 * @param optionalDiagnostics
    +	 */
    +	@RpcMethod
    +	public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) {
    +		log.info("shut down cluster because application is in {}, diagnostics {}", finalStatus, optionalDiagnostics);
    +		shutDownApplication(finalStatus, optionalDiagnostics);
    +	}
    +
    +	/**
    +	 * This method should be called by the framework once it detects that a currently registered task executor has failed.
    +	 *
    +	 * @param resourceID Id of the worker that has failed.
    +	 * @param message An informational message that explains why the worker failed.
    +	 */
    +	public void notifyWorkerFailed(final ResourceID resourceID, String message) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				WorkerType worker = taskExecutorGateways.remove(resourceID);
    +				if (worker != null) {
    +					// TODO :: suggest failed task executor to stop itself
    +					slotManager.notifyTaskManagerFailure(resourceID);
    +				}
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Gets the number of currently started TaskManagers.
    +	 *
    +	 * @return The number of currently started TaskManagers.
    +	 */
    +	public int getNumberOfStartedTaskManagers() {
    +		return taskExecutorGateways.size();
    +	}
    +
    +	/**
    +	 * Notifies the resource manager of a fatal error.
    +	 *
    +	 * <p><b>IMPORTANT:</b> This should not cleanly shut down this master, but exit it in
    +	 * such a way that a high-availability setting would restart this or fail over
    +	 * to another master.
    +	 */
    +	public void onFatalError(final String message, final Throwable error) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				fatalError(message, error);
    +			}
    +		});
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Framework specific behavior
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Initializes the framework specific components.
    +	 *
    +	 * @throws Exception Exceptions during initialization cause the resource manager to fail.
    +	 */
    +	protected abstract void initialize() throws Exception;
    +
    +	/**
    +	 * Callback when a task executor register.
    +	 *
    +	 * @param resourceID The worker resource id
    +	 * @param taskExecutorGateway the task executor gateway
    +	 */
    +	protected abstract WorkerType workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway);
    --- End diff --
    
    @mxm, I omit these method before because of following reasons:
    
    1. Maybe we need   `requestNewWorkers` method. But numWorkers parameter is not enough to allocate a certain number of new workers, the expected ResourceProfile of each worker also needed to pass in.
    
    2. Maybe we need `releaseStartedWorker` method. I omit the method because it is used to release started taskExecutors when ResourceManager receives RemoveResource request, but I could not find any places where sends this request. So I omitted this method before. 
    
    3. We don't need `reacceptRegisteredWorkers` method. Because the method was used to consolidate the taskExecutor view between resourceManager and jobManager when resourceManager reconnects to jobManager after resourceManager restart. But in the new cluster management mode, JobManager doesn't kept the view of live taskExecutors. ResourceManager is responsible for receiving taskExecutors' registration and maintain the taskExecutor view.  So we don't need this method.
    
    4. We don't need `releasePendingWorker` method. Because the method was only used to release pending requests when resourceManager consolidates the taskExecutor view with jobManager after resourceManager restart.  As we said before, this process is not needed in new cluster management mode.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2540: [FLINK-4606] [cluster management] Integrate the ne...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2540#discussion_r80478098
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -66,15 +67,16 @@
      *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li>
      * </ul>
      */
    -public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender {
    +public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint implements LeaderContender {
    --- End diff --
    
    The `RpcCompletnessTest` might have to be adapted for this to work.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2540: [FLINK-4606] [cluster management] Integrate the ne...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2540#discussion_r80663028
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -324,6 +337,158 @@ public void handleError(final Exception exception) {
     		shutDown();
     	}
     
    +	/**
    +	 * Registers an infoMessage listener
    +	 *
    +	 * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager
    +	 */
    +	@RpcMethod
    +	public void registerInfoMessageListener(final String infoMessageListenerAddress) {
    +		if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
    +			log.warn("Receive a duplicate registration from info message listener on ({})", infoMessageListenerAddress);
    +		} else {
    +			Future<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class);
    +
    +			infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() {
    +				@Override
    +				public void accept(InfoMessageListenerRpcGateway gateway) {
    +					log.info("Receive a registration from info message listener on ({})", infoMessageListenerAddress);
    +					infoMessageListeners.put(infoMessageListenerAddress, gateway);
    +				}
    +			}, getMainThreadExecutor());
    +
    +			infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
    +				@Override
    +				public Void apply(Throwable failure) {
    +					log.warn("Receive a registration from unreachable info message listener on ({})", infoMessageListenerAddress);
    +					return null;
    +				}
    +			}, getMainThreadExecutor());
    +		}
    +	}
    +
    +	/**
    +	 * Unregisters an infoMessage listener
    +	 *
    +	 * @param infoMessageListenerAddress address of infoMessage listener to unregister from this resource manager
    +	 *
    +	 */
    +	@RpcMethod
    +	public void unRegisterInfoMessageListener(final String infoMessageListenerAddress) {
    +		infoMessageListeners.remove(infoMessageListenerAddress);
    +	}
    +
    +	/**
    +	 * Shutdowns cluster
    +	 *
    +	 * @param finalStatus
    +	 * @param optionalDiagnostics
    +	 */
    +	@RpcMethod
    +	public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) {
    +		log.info("shut down cluster because application is in {}, diagnostics {}", finalStatus, optionalDiagnostics);
    +		shutDownApplication(finalStatus, optionalDiagnostics);
    +	}
    +
    +	/**
    +	 * This method should be called by the framework once it detects that a currently registered task executor has failed.
    +	 *
    +	 * @param resourceID Id of the worker that has failed.
    +	 * @param message An informational message that explains why the worker failed.
    +	 */
    +	public void notifyWorkerFailed(final ResourceID resourceID, String message) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				WorkerType worker = taskExecutorGateways.remove(resourceID);
    +				if (worker != null) {
    +					// TODO :: suggest failed task executor to stop itself
    +					slotManager.notifyTaskManagerFailure(resourceID);
    +				}
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Gets the number of currently started TaskManagers.
    +	 *
    +	 * @return The number of currently started TaskManagers.
    +	 */
    +	public int getNumberOfStartedTaskManagers() {
    +		return taskExecutorGateways.size();
    +	}
    +
    +	/**
    +	 * Notifies the resource manager of a fatal error.
    +	 *
    +	 * <p><b>IMPORTANT:</b> This should not cleanly shut down this master, but exit it in
    +	 * such a way that a high-availability setting would restart this or fail over
    +	 * to another master.
    +	 */
    +	public void onFatalError(final String message, final Throwable error) {
    +		runAsync(new Runnable() {
    +			@Override
    +			public void run() {
    +				fatalError(message, error);
    +			}
    +		});
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  Framework specific behavior
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Initializes the framework specific components.
    +	 *
    +	 * @throws Exception Exceptions during initialization cause the resource manager to fail.
    +	 */
    +	protected abstract void initialize() throws Exception;
    +
    +	/**
    +	 * Callback when a task executor register.
    +	 *
    +	 * @param resourceID The worker resource id
    +	 * @param taskExecutorGateway the task executor gateway
    +	 */
    +	protected abstract WorkerType workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway);
    --- End diff --
    
    This is missing all the other abstract methods of the old ResourceManager. We will need `requestNewWorkers`, `releasePendingWorker`, `releaseStartedWorker`, and `reacceptRegisteredWorkers`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2540: [FLINK-4606] [cluster management] Integrate the new Resou...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2540
  
    Thank you for your changes. I'm trying to incorporate them in `flip-6` now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---