Posted to issues@flink.apache.org by wangzhijiang999 <gi...@git.apache.org> on 2016/09/02 10:07:44 UTC

[GitHub] flink pull request #2461: [FLINK-4505][Cluster Management] Implement TaskMan...

GitHub user wangzhijiang999 opened a pull request:

    https://github.com/apache/flink/pull/2461

    [FLINK-4505][Cluster Management] Implement TaskManagerFactory to brin…

    Implement TaskExecutorFactory as an abstract class with helper methods to bring up the TaskManager. Concrete subclasses can implement the factory to start a TaskManager in different modes (testing, standalone, yarn).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/alibaba/flink jira-4505

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2461.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2461
    
----
commit 212222c4b408bb0138b41e49a439f6d07f9a0ca9
Author: 淘江 <ta...@alibaba-inc.com>
Date:   2016-09-02T10:00:49Z

    [FLINK-4505][Cluster Management] Implement TaskManagerFactory to bring up TaskManager for different modes
    
    Summary: above
    
    Test Plan: NA
    
    Reviewers: kete.yangkt
    
    Subscribers: #blink
    
    Differential Revision: http://phabricator.taobao.net/D5606

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2461
  
    I've merged your PR @wangzhijiang999. Thanks for your work :-) You can close this PR now.



[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 commented on the issue:

    https://github.com/apache/flink/pull/2461
  
    @mxm, I think the `StandaloneTaskExecutorFactory` can be used for mini cluster or testing mode, and the `YarnTaskExecutorFactory` for yarn mode. After you confirm the implementation, I can add some tests for the factory if needed.



[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2461
  
    I think you should have at least another method `startComponents` which starts the different components. Everything else can be added later when we see that it would be helpful.



[GitHub] flink pull request #2461: [FLINK-4505][Cluster Management] Implement TaskMan...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 closed the pull request at:

    https://github.com/apache/flink/pull/2461



[GitHub] flink pull request #2461: [FLINK-4505][Cluster Management] Implement TaskMan...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2461#discussion_r77329068
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/YarnTaskExecutorFactory.java ---
    @@ -0,0 +1,198 @@
    +package org.apache.flink.runtime.taskexecutor;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.IllegalConfigurationException;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
    +import org.apache.flink.runtime.taskmanager.MemoryLogger;
    +import org.apache.flink.runtime.util.LeaderRetrievalUtils;
    +import org.apache.flink.util.NetUtils;
    +
    +import akka.actor.ActorSystem;
    +import akka.util.Timeout;
    +
    +import scala.Some;
    +import scala.Tuple2;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import com.typesafe.config.Config;
    +
    +import java.io.IOException;
    +import java.net.InetAddress;
    +import java.net.InetSocketAddress;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A factory for creating {@link TaskExecutor} and starting it in yarn mode.
    + */
    +public class YarnTaskExecutorFactory extends TaskExecutorFactory {
    +
    +	public YarnTaskExecutorFactory(Configuration configuration, ResourceID resourceID) {
    +		super(configuration, resourceID);
    +	}
    +
    +	@Override
    +	public TaskExecutor createAndStartTaskExecutor() throws Exception {
    +		return selectNetworkInterfaceAndRunTaskManager(configuration, resourceID);
    +	}
    +
    +	/**
    +	 * Starts and runs the TaskManager.
    +	 * <p/>
    +	 * This method first tries to select the network interface to use for the TaskManager
    +	 * communication. The network interface is used both for the actor communication
    +	 * (coordination) as well as for the data exchange between task managers. Unless
    +	 * the hostname/interface is explicitly configured in the configuration, this
    +	 * method will try out various interfaces and methods to connect to the JobManager
    +	 * and select the one where the connection attempt is successful.
    +	 * <p/>
    +	 * After selecting the network interface, this method brings up an actor system
    +	 * for the TaskManager and its actors, starts the TaskManager's services
    +	 * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
    +	 *
    +	 * @param configuration    The configuration for the TaskManager.
    +	 * @param resourceID       The id of the resource which the task manager will run on.
    +	 */
    +	private TaskExecutor selectNetworkInterfaceAndRunTaskManager(
    --- End diff --
    
    The parameters differ between modes because I followed the previous implementation of **TaskManager**. For example, **YarnTaskManagerRunner** previously invoked the method "selectNetworkInterfaceAndRunTaskManager", while **ForkableFlinkMiniCluster** invoked the method "startTaskManagerComponentsAndActor" to start the **TaskManager**. So I retained the previous approach for each mode.



[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2461
  
    Hi @wangzhijiang999, maybe we should start simple without introducing a factory method, because there might not actually be many cases to distinguish. Maybe we could rename the `TaskManagerFactory` to `TaskManagerRunner`, which has static methods to create the `TaskManager`'s components and does the network selection. That way we keep the initialization and the actual `TaskManager` logic separated.
    
    For testing purposes I guess we don't need to set up any components because they are usually mocked or one is using testing components. Passing these components to the constructor of the `TaskManager` should not be a big deal.
    
    Does this make sense?
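
One way to picture the separation suggested above, as a minimal sketch: the runner owns the static set-up code, and the task manager only receives finished components through its constructor, so a test can hand in fakes directly. Every name here (`TaskManagerSketch`, `TaskManagerRunnerSketch`, the two component interfaces, and the `memory.pages` / `io.tmp.dir` keys) is a hypothetical stand-in and not the Flink API.

```java
import java.util.Map;

// Hypothetical stand-ins for TaskManager components; not the real Flink types.
interface MemoryComponent { int pageCount(); }
interface IoComponent { String spillDirectory(); }

/** Holds only task manager logic; every component arrives through the constructor. */
final class TaskManagerSketch {
    private final MemoryComponent memory;
    private final IoComponent io;

    TaskManagerSketch(MemoryComponent memory, IoComponent io) {
        this.memory = memory;
        this.io = io;
    }

    String describe() {
        return memory.pageCount() + " pages, spilling to " + io.spillDirectory();
    }
}

/** Holds only initialization: a static helper that builds components from configuration. */
final class TaskManagerRunnerSketch {
    private TaskManagerRunnerSketch() {}

    static TaskManagerSketch createTaskManager(Map<String, String> configuration) {
        // Assumed keys and defaults, chosen only for this illustration.
        int pages = Integer.parseInt(configuration.getOrDefault("memory.pages", "128"));
        String directory = configuration.getOrDefault("io.tmp.dir", "/tmp");
        return new TaskManagerSketch(() -> pages, () -> directory);
    }
}
```

A test would skip the runner entirely and call `new TaskManagerSketch(() -> 1, () -> "in-memory")` with whatever fakes it needs.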



[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2461
  
    Thanks for the contribution @wangzhijiang999.
    
    I think the abstraction is not right. The `StandaloneFactory` and the `YarnFactory` should actually only differ in the class of the `TaskExecutor` they start. Furthermore, I think that the network interface selection should not be part of the factory. Instead I would pass the hostname and port to the `createTaskManager(hostname, port)` method which constructs the `TaskManager`. Maybe it even makes sense to pass the `RpcService` via `createTaskManager(RpcService)` and defer the `RpcService` creation to the outside method that runs the interface selection.
    
    The difference between the Standalone/Yarn and Testing factory is that the first two create the TM components whereas the `TestingFactory` is initialized with the `TaskManager` components.
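
The split described above could look roughly like the following sketch. The creating factory builds the components itself and receives the `RpcService` from outside, while the testing factory is simply handed components prepared by the test. All types are hypothetical stand-ins (`RpcServiceStub`, `ComponentsStub`, `TaskManagerStub`, and the factory names); the real signatures are not shown in this thread.

```java
// Hypothetical stand-ins; none of these mirror real Flink signatures.
final class RpcServiceStub {
    final String address;
    RpcServiceStub(String address) { this.address = address; }
}

final class ComponentsStub {
    final int numberOfSlots;
    ComponentsStub(int numberOfSlots) { this.numberOfSlots = numberOfSlots; }
}

final class TaskManagerStub {
    final RpcServiceStub rpcService;
    final ComponentsStub components;

    TaskManagerStub(RpcServiceStub rpcService, ComponentsStub components) {
        this.rpcService = rpcService;
        this.components = components;
    }
}

/** The RpcService is created outside and passed in, so address selection stays out of the factory. */
interface TaskManagerFactoryStub {
    TaskManagerStub createTaskManager(RpcServiceStub rpcService);
}

/** Standalone/Yarn flavour: creates the components itself. */
final class CreatingFactoryStub implements TaskManagerFactoryStub {
    private final int configuredSlots;

    CreatingFactoryStub(int configuredSlots) { this.configuredSlots = configuredSlots; }

    @Override
    public TaskManagerStub createTaskManager(RpcServiceStub rpcService) {
        return new TaskManagerStub(rpcService, new ComponentsStub(configuredSlots));
    }
}

/** Testing flavour: is initialized with components prepared by the test. */
final class TestingFactoryStub implements TaskManagerFactoryStub {
    private final ComponentsStub prepared;

    TestingFactoryStub(ComponentsStub prepared) { this.prepared = prepared; }

    @Override
    public TaskManagerStub createTaskManager(RpcServiceStub rpcService) {
        return new TaskManagerStub(rpcService, prepared);
    }
}
```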



[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 commented on the issue:

    https://github.com/apache/flink/pull/2461
  
    @tillrohrmann, I tried to understand your idea as follows:
    1. Provide a concrete TaskExecutorFactory class instead of an abstract factory for both standalone and yarn mode.
    2. The network selection and RPC service creation methods should be pulled out of the factory, maybe moved to some utility class (for reuse by the JM)?
    3. The constructor of TaskExecutorFactory supports several different parameter sets, such as:
    TaskExecutorFactory(Configuration, ResourceID)
    TaskExecutorFactory(RpcService, HighAvailabilityServices)
    TaskExecutorFactory(hostname, port)
    TaskExecutorFactory(Configuration, ResourceID, RpcService, HighAvailabilityServices, hostname, port)
    The first three constructors, which take partial parameters, can delegate to the fourth, full constructor, with all missing parameters filled in by internal default values.
    4. The TaskExecutorFactory provides the method "createTaskManager" to bring up the TaskManager for the outside world, and this method constructs the related components (TaskManagerConfiguration, NetworkManager, IOManager, MemoryManager).
    5. For testing mode, construct the TestingTaskExecutorFactory and pass all the components explicitly, including (ResourceID, MemoryManager, IOManager, NetworkEnvironment, numberOfSlots, RpcService, HighAvailabilityServices). Should the TaskManagerConfiguration be passed from outside or generated implicitly?
    6. In addition, the localTaskManagerCommunication parameter was previously needed to decide whether to create NettyConfig for standalone or yarn mode. Now I will remove this parameter and always create ~~NettyConfig~~.
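
Point 3 describes constructor chaining, which might be sketched as below. Plain Java types stand in for `Configuration` and `ResourceID`, the `RpcService` and `HighAvailabilityServices` parameters are left out to keep the example short, and the default values (`"localhost"`, port `0`, a random resource id) are assumptions made for illustration only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical factory; plain Java types stand in for Configuration and ResourceID. */
final class TelescopingFactorySketch {
    final Map<String, String> configuration;
    final String resourceId;
    final String hostname;
    final int port;

    /** Partial constructor: the address falls back to assumed defaults. */
    TelescopingFactorySketch(Map<String, String> configuration, String resourceId) {
        this(configuration, resourceId, "localhost", 0);
    }

    /** Partial constructor: configuration and resource id are generated internally. */
    TelescopingFactorySketch(String hostname, int port) {
        this(new HashMap<String, String>(), UUID.randomUUID().toString(), hostname, port);
    }

    /** Full constructor that every partial constructor delegates to. */
    TelescopingFactorySketch(
            Map<String, String> configuration, String resourceId, String hostname, int port) {
        this.configuration = configuration;
        this.resourceId = resourceId;
        this.hostname = hostname;
        this.port = port;
    }
}
```

Keeping all field assignments in the full constructor means a new parameter only has to be handled in one place.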



[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 commented on the issue:

    https://github.com/apache/flink/pull/2461
  
    @tillrohrmann, thank you for merging and for your help. If there is any follow-up work related to the TaskManager, you can assign it to me and I will be happy to do it.



[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2461
  
    Thanks for your contribution @wangzhijiang999. I'll merge it.



[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2461
  
    I think we don't need the distinction between standalone and yarn in the TaskManager startup. They should actually be the same. The difference will be the `StandaloneRunner`, `YarnRunner` and `MesosRunner`.
    
    I think the factory should be responsible for creating the `TaskManager` components. I would pull out the `RpcService` creation and, thus, also the network interface selection, because this is code which can be re-used across different components (TM and JM, for example).
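
Pulling the network selection out of the factory, as proposed above, could leave a small helper like this sketch that any component can call before it creates its `RpcService`. The selection rule follows the Javadoc quoted earlier in the thread (an explicitly configured hostname wins, otherwise the first candidate whose connection attempt succeeds); the class name, the method signature, and the `Predicate` used as the connection probe are assumptions for illustration.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical shared helper, usable by both TM and JM start-up code. */
final class RpcAddressSelectorSketch {
    private RpcAddressSelectorSketch() {}

    static String selectHostname(
            Optional<String> configuredHostname,
            List<String> candidates,
            Predicate<String> canReachJobManager) {
        // An explicitly configured hostname is used without probing.
        if (configuredHostname.isPresent()) {
            return configuredHostname.get();
        }
        // Otherwise try each candidate in order and keep the first that connects.
        for (String candidate : candidates) {
            if (canReachJobManager.test(candidate)) {
                return candidate;
            }
        }
        throw new IllegalStateException("no candidate address could reach the JobManager");
    }
}
```

Passing the probe in as a `Predicate` keeps the helper free of any TaskManager-specific code, which is what would make it reusable.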



[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 commented on the issue:

    https://github.com/apache/flink/pull/2461
  
    @tillrohrmann, I created the `TaskExecutorRunner` for constructing the components needed to start the `TaskExecutor`, and removed the factory class. Currently only the `ResourceID` and `Configuration` parameters must be passed to the method `startComponents`; the other parameters are optional. Once you confirm this approach, I will write some tests based on it.
    The `MemoryLogger` is temporarily removed from `TaskExecutorRunner`. I think the `ActorGateway` should not be passed directly to the constructor of `MemoryLogger`. If you agree, I will restructure the `MemoryLogger` in another JIRA issue.



[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 commented on the issue:

    https://github.com/apache/flink/pull/2461
  
    Yes, that also makes sense. For testing purposes it is very clear, so we do not need to do anything currently; all the components in the `TaskManager` constructor can be mocked implicitly.
    
    For `TaskManagerRunner`, the purpose is to pull the initialization of the related components out of `TaskManager` to make its logic clearer. Just one issue to confirm: should we provide several static methods with different parameter sets for the outside world, or just one static method such as `selectNetworkInterfaceAndRunTaskManager(Configuration configuration, ResourceID resourceID)`?
    I think providing several methods with different parameters may be reasonable, because the caller may want to pass in some parameters such as `hostname`, `port`, `RpcService`, and `HighAvailabilityServices`.



[GitHub] flink pull request #2461: [FLINK-4505][Cluster Management] Implement TaskMan...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2461#discussion_r77327741
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/YarnTaskExecutorFactory.java ---
    @@ -0,0 +1,198 @@
    +package org.apache.flink.runtime.taskexecutor;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.IllegalConfigurationException;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
    +import org.apache.flink.runtime.taskmanager.MemoryLogger;
    +import org.apache.flink.runtime.util.LeaderRetrievalUtils;
    +import org.apache.flink.util.NetUtils;
    +
    +import akka.actor.ActorSystem;
    +import akka.util.Timeout;
    +
    +import scala.Some;
    +import scala.Tuple2;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import com.typesafe.config.Config;
    +
    +import java.io.IOException;
    +import java.net.InetAddress;
    +import java.net.InetSocketAddress;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A factory for creating {@link TaskExecutor} and starting it in yarn mode.
    + */
    +public class YarnTaskExecutorFactory extends TaskExecutorFactory {
    +
    +	public YarnTaskExecutorFactory(Configuration configuration, ResourceID resourceID) {
    +		super(configuration, resourceID);
    +	}
    +
    +	@Override
    +	public TaskExecutor createAndStartTaskExecutor() throws Exception {
    +		return selectNetworkInterfaceAndRunTaskManager(configuration, resourceID);
    +	}
    +
    +	/**
    +	 * Starts and runs the TaskManager.
    +	 * <p/>
    +	 * This method first tries to select the network interface to use for the TaskManager
    +	 * communication. The network interface is used both for the actor communication
    +	 * (coordination) as well as for the data exchange between task managers. Unless
    +	 * the hostname/interface is explicitly configured in the configuration, this
    +	 * method will try out various interfaces and methods to connect to the JobManager
    +	 * and select the one where the connection attempt is successful.
    +	 * <p/>
    +	 * After selecting the network interface, this method brings up an actor system
    +	 * for the TaskManager and its actors, starts the TaskManager's services
    +	 * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
    +	 *
    +	 * @param configuration    The configuration for the TaskManager.
    +	 * @param resourceID       The id of the resource which the task manager will run on.
    +	 */
    +	private TaskExecutor selectNetworkInterfaceAndRunTaskManager(
    --- End diff --
    
    Why does only the Yarn factory select the network interface and not the standalone implementation?



[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

Posted by wangzhijiang999 <gi...@git.apache.org>.
Github user wangzhijiang999 commented on the issue:

    https://github.com/apache/flink/pull/2461
  
    @tillrohrmann, the current implementation just follows the previous process for yarn and standalone modes. I agree with your opinion; the current approach is indeed not very clear. I think we should first re-define and confirm the parameters passed to the "createTaskManager" method for the different factories, then I can fix the inner processes based on the input parameters. Could you suggest the specific parameters to pass for the different factories? Thank you!

