Posted to issues@flink.apache.org by mxm <gi...@git.apache.org> on 2016/05/10 17:20:47 UTC

[GitHub] flink pull request: [FLINK-3667] refactor client communication

GitHub user mxm opened a pull request:

    https://github.com/apache/flink/pull/1978

    [FLINK-3667] refactor client communication

    This is mainly an effort to refactor the client side of cluster instantiation and communication. This pull request moves around many things that were already in place but, hopefully, makes it easier and more explicit for new cluster clients/frameworks to integrate.
    
    The main changes: 
    
    Client
    - `Client` becomes the abstract base class for client<->cluster communication
    - It enforces a stricter life cycle for cluster communication via abstract methods that clients need to implement
    - It shares resources, e.g. the ActorSystem, with its subclasses
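    A minimal sketch of the lifecycle idea above, in plain Java. The names `AbstractClusterClient`, `finalizeCluster` and `StandaloneClientSketch` are hypothetical and not the actual API of this PR. A shared `ExecutorService` stands in for the shared ActorSystem. The base class fixes the shutdown order and makes shutdown idempotent, so subclasses only implement the cluster-specific step.

    ```java
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Hypothetical base class (not Flink's actual Client API): it owns a shared
    // resource and fixes the order of lifecycle steps for all subclasses.
    abstract class AbstractClusterClient implements AutoCloseable {

        // Stand-in for a shared resource such as the ActorSystem.
        protected final ExecutorService sharedExecutor = Executors.newSingleThreadExecutor();

        private boolean shutDown = false;

        /** Cluster-specific: a human-readable identifier of the cluster. */
        public abstract String getClusterIdentifier();

        /** Cluster-specific cleanup, e.g. stopping a YARN session. */
        protected abstract void finalizeCluster();

        /** Fixed lifecycle: cluster-specific cleanup first, then shared resources. Safe to call twice. */
        @Override
        public final void close() {
            if (shutDown) {
                return;
            }
            shutDown = true;
            try {
                finalizeCluster();
            } finally {
                sharedExecutor.shutdownNow();
            }
        }

        public boolean isShutDown() {
            return shutDown;
        }
    }

    // Hypothetical subclass that only provides the cluster-specific parts.
    class StandaloneClientSketch extends AbstractClusterClient {
        int finalizeCalls = 0;

        @Override
        public String getClusterIdentifier() {
            return "standalone";
        }

        @Override
        protected void finalizeCluster() {
            finalizeCalls++; // a standalone cluster has nothing to tear down
        }
    }
    ```

    Marking `close()` as `final` is what enforces the stricter life cycle: a subclass cannot skip releasing the shared resources.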
    
    Yarn
    - All Yarn dependencies have been moved to `flink-yarn`
    - The Yarn client (`YarnClusterClient`) has been refactored as a subclass of the `Client` class
    - Yarn-specific configuration (`YarnClusterDescriptor`) has been implemented as a subclass of the `ClusterDescriptor`
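    The descriptor/client split can be sketched as follows. `ClusterDescriptorSketch`, `YarnDescriptorSketch` and `YarnClientSketch` are hypothetical stand-ins for `ClusterDescriptor`, `YarnClusterDescriptor` and `YarnClusterClient`. The real `deploy` would submit the YARN application master, and that step is omitted here. The descriptor only carries configuration and hands back a client once the cluster is up.

    ```java
    // Hypothetical interface (not the PR's actual ClusterDescriptor): describes a
    // cluster and deploys it, returning a client of type C for talking to it.
    interface ClusterDescriptorSketch<C> {
        String getClusterDescription();

        C deploy();
    }

    // Hypothetical client returned after deployment.
    final class YarnClientSketch {
        final int taskManagerCount;

        YarnClientSketch(int taskManagerCount) {
            this.taskManagerCount = taskManagerCount;
        }
    }

    // Hypothetical YARN-specific descriptor holding only configuration.
    final class YarnDescriptorSketch implements ClusterDescriptorSketch<YarnClientSketch> {
        private int taskManagerCount = 1;
        private int taskManagerMemoryMb = 1024;

        YarnDescriptorSketch setTaskManagerCount(int count) {
            if (count < 1) {
                throw new IllegalArgumentException("at least one TaskManager is required");
            }
            this.taskManagerCount = count;
            return this;
        }

        YarnDescriptorSketch setTaskManagerMemoryMb(int memoryMb) {
            this.taskManagerMemoryMb = memoryMb;
            return this;
        }

        @Override
        public String getClusterDescription() {
            return "YARN session with " + taskManagerCount + " TaskManagers of " + taskManagerMemoryMb + " MB";
        }

        @Override
        public YarnClientSketch deploy() {
            // A real implementation would submit the application master to YARN here.
            return new YarnClientSketch(taskManagerCount);
        }
    }
    ```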
    
    CliFrontend
    - `CliFrontend` interfaces with custom command-line parsers via the `CustomCommandLine` interface
    - `CliFrontend` doesn't use any special logic for Yarn
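    One way to read the `CustomCommandLine` idea is as follows. The frontend asks each registered command line whether it is active for the given arguments and uses the first match, so it needs no YARN-specific branches. The interface and method names below (`isActive`, `getId`) are hypothetical. The `-m yarn-cluster` trigger is only an illustrative assumption.

    ```java
    import java.util.Arrays;
    import java.util.List;

    // Hypothetical plug-in interface for cluster-specific command-line handling.
    interface CustomCommandLineSketch {
        String getId();

        boolean isActive(List<String> args);
    }

    // Hypothetical YARN command line: active when the arguments contain "-m yarn-cluster".
    final class YarnCommandLineSketch implements CustomCommandLineSketch {
        @Override
        public String getId() {
            return "yarn-cluster";
        }

        @Override
        public boolean isActive(List<String> args) {
            int i = args.indexOf("-m");
            return i >= 0 && i + 1 < args.size() && args.get(i + 1).equals(getId());
        }
    }

    // Fallback that is always active; it must be registered last.
    final class DefaultCommandLineSketch implements CustomCommandLineSketch {
        @Override
        public String getId() {
            return "default";
        }

        @Override
        public boolean isActive(List<String> args) {
            return true;
        }
    }

    // The frontend only iterates over the registered command lines; it knows nothing about YARN.
    final class FrontendSketch {
        private final List<CustomCommandLineSketch> commandLines;

        FrontendSketch(List<CustomCommandLineSketch> commandLines) {
            this.commandLines = commandLines;
        }

        CustomCommandLineSketch select(String... args) {
            List<String> argList = Arrays.asList(args);
            for (CustomCommandLineSketch commandLine : commandLines) {
                if (commandLine.isActive(argList)) {
                    return commandLine;
                }
            }
            throw new IllegalStateException("no active command line for " + argList);
        }
    }
    ```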
    
    Other
    - A few bug fixes regarding the `ApplicationClient` communication
    
    ------
    
    - [X] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [X] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [X] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink FLINK-3667

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1978.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1978
    


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1978#discussion_r63681528
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---
    @@ -211,14 +191,41 @@ public void run() {
     		Runtime.getRuntime().addShutdownHook(clientShutdownHook);
     
     		isConnected = true;
    +
    +		logAndSysout("Waiting until all TaskManagers have connected");
    +
    +		while(true) {
    +			GetClusterStatusResponse status = getClusterStatus();
    +			if (status != null) {
    +				if (status.numRegisteredTaskManagers() < clusterDescriptor.getTaskManagerCount()) {
    +					logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/"
    +						+ clusterDescriptor.getTaskManagerCount() + ")");
    +				} else {
    +					logAndSysout("All TaskManagers are connected");
    +					break;
    +				}
    +			} else {
    +				logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
    +			}
    +
    +			try {
    +				Thread.sleep(500);
    +			}
    +			catch (InterruptedException e) {
    +				LOG.error("Interrupted while waiting for TaskManagers");
    +				System.err.println("Thread is interrupted");
    +				Thread.currentThread().interrupt();
    --- End diff --
    
    I guess I wrote this code. If I'm not mistaken, this means that the while(true) loop is not interruptible, right?
    I think we should change that and break out of the loop if it has been interrupted.
    
    What do you think?
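    The following sketch shows the change suggested above. `Thread.currentThread().interrupt()` re-sets the interrupt flag, so the next `Thread.sleep` throws again immediately and the loop keeps polling without any delay instead of stopping. Returning from the `catch` block fixes that. `waitForTaskManagers` and its `Supplier<Integer>` parameter are hypothetical stand-ins for `getClusterStatus()` and `numRegisteredTaskManagers()`. A `null` value plays the role of a missing status response.

    ```java
    import java.util.function.Supplier;

    final class TaskManagerWaiter {

        /**
         * Polls until the expected number of TaskManagers has registered.
         *
         * @param registeredTaskManagers returns the current count, or null if no status has arrived yet
         * @return true once all TaskManagers are connected, false if the waiting thread was interrupted
         */
        static boolean waitForTaskManagers(Supplier<Integer> registeredTaskManagers, int expected, long pollMillis) {
            while (true) {
                Integer registered = registeredTaskManagers.get();
                if (registered == null) {
                    System.out.println("No status updates from the YARN cluster received so far. Waiting ...");
                } else if (registered < expected) {
                    System.out.println("TaskManager status (" + registered + "/" + expected + ")");
                } else {
                    System.out.println("All TaskManagers are connected");
                    return true;
                }

                try {
                    Thread.sleep(pollMillis);
                } catch (InterruptedException e) {
                    // Keep the interrupt flag set for callers, but leave the loop instead of
                    // spinning: with the flag set, every further sleep() would throw immediately.
                    Thread.currentThread().interrupt();
                    System.err.println("Interrupted while waiting for TaskManagers");
                    return false;
                }
            }
        }
    }
    ```

    Throwing an exception from the `catch` block would work as well. Returning a boolean simply keeps the sketch small.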



[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1978#issuecomment-219992759
  
    Thank you for working on this! 
    The flink-client is pretty complicated and I like this pull request as it is reducing the complexity.
    Maybe (depending on the implications) we can further simplify the code and introduce some nice abstractions to make the code more maintainable.
    I'll think about it a bit and get back to the PR once all the issues from the first pass have been addressed.



[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on the pull request:

    https://github.com/apache/flink/pull/1978#issuecomment-222285524
  
    This PR dovetails nicely with the Mesos work and I'll be sure to build on it. Here are a few suggestions to align it even further.
    
    The problem of _managing_ a Flink cluster is mostly independent from _using_ a cluster to submit and manage jobs. I would like to see the two concerns cleanly separated. In this PR, the `ClusterDescriptor` handles creating the cluster, then produces a `Client` with which to manage jobs and to handle shutdown. I suggest introducing a new component, the `YarnDispatcher`, to handle all lifecycle operations for a cluster, and making the `ClusterDescriptor` an entity class that is given to the dispatcher.
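    Here is a minimal sketch of the proposed separation. It assumes that a dispatcher owns every lifecycle operation while the descriptor stays a plain entity. `ClusterSpec`, `ClusterDispatcher`, `JobClientHandle` and `InMemoryDispatcher` are hypothetical names. A `retrieve` operation is included because it would also cover attaching to a pre-existing session. A real YARN or Mesos dispatcher would talk to the resource manager instead of a map.

    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical entity describing the desired cluster; it has no behavior of its own.
    final class ClusterSpec {
        final int taskManagers;
        final int taskManagerMemoryMb;

        ClusterSpec(int taskManagers, int taskManagerMemoryMb) {
            this.taskManagers = taskManagers;
            this.taskManagerMemoryMb = taskManagerMemoryMb;
        }
    }

    // Hypothetical handle used only for job-related calls; it cannot shut the cluster down.
    final class JobClientHandle {
        final String clusterId;

        JobClientHandle(String clusterId) {
            this.clusterId = clusterId;
        }
    }

    // Hypothetical dispatcher owning all lifecycle operations.
    interface ClusterDispatcher {
        String deploy(ClusterSpec spec);

        JobClientHandle retrieve(String clusterId);

        void stop(String clusterId);
    }

    // In-memory stand-in; a YARN or Mesos dispatcher would contact the resource manager instead.
    final class InMemoryDispatcher implements ClusterDispatcher {
        private final Map<String, ClusterSpec> running = new HashMap<>();
        private int nextId = 1;

        @Override
        public String deploy(ClusterSpec spec) {
            String clusterId = "cluster-" + nextId++;
            running.put(clusterId, spec);
            return clusterId;
        }

        @Override
        public JobClientHandle retrieve(String clusterId) {
            if (!running.containsKey(clusterId)) {
                throw new IllegalArgumentException("unknown cluster " + clusterId);
            }
            return new JobClientHandle(clusterId);
        }

        @Override
        public void stop(String clusterId) {
            running.remove(clusterId);
        }
    }
    ```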
    
    A related issue is that it's only possible to use the `YarnClusterClient` to interact with a newly created YARN session, not a pre-existing one. When submitting a job to an existing YARN session, it seems the `StandaloneClusterClient` is used (by supplying a JobManager endpoint). Is that true?
    
    Eventually the CLI should provide a nice way to discover and use existing YARN sessions.
    
    The `detached` flags could use clarification. In the `Client` context, the detached concept seems related to interactivity with the job (tailing the status messages, etc.). I don't think it should imply anything about the lifecycle of the cluster; leave that to the dispatcher. Accordingly, the `stopAfterJob` method should move to the dispatcher.
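    This sketch illustrates the suggested meaning of `detached`. The flag only decides whether the client waits for the job's result, while the cluster's lifecycle is untouched. `JobClientSketch` is hypothetical, and an `ExecutorService` stands in for the running cluster. Tearing the cluster down would be a separate dispatcher call, not a side effect of submission.

    ```java
    import java.util.Optional;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Hypothetical job client: "detached" only controls interactivity with the job.
    final class JobClientSketch implements AutoCloseable {
        // Stand-in for the running cluster; submit() never changes its lifecycle.
        private final ExecutorService cluster = Executors.newSingleThreadExecutor();

        Optional<String> submit(Callable<String> job, boolean detached) {
            Future<String> result = cluster.submit(job);
            if (detached) {
                return Optional.empty(); // return right away; the job keeps running
            }
            try {
                return Optional.of(result.get()); // attached: block until the job finishes
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the job", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("job failed", e.getCause());
            }
        }

        boolean clusterRunning() {
            return !cluster.isShutdown();
        }

        // In the proposed design this would be a dispatcher operation, not a job-client one.
        @Override
        public void close() {
            cluster.shutdown();
        }
    }
    ```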
    
    As for how this relates to Mesos: the `MesosDispatcher` component will run in the Mesos cluster and be accessed remotely by the CLI. The `ClusterDescriptor` will be passed to it via REST. Everything will fit nicely. :)




[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1978#issuecomment-219986951
  
    I tried the change on a cluster, but the YARN session is not deploying:
    
    
    ```
    2016-05-18 10:25:12,181 INFO  org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider  - Failing over to rm75
    2016-05-18 10:25:12,300 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Trying to associate with JobManager leader akka://flink/user/jobmanager#49723024
    2016-05-18 10:25:12,307 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager#49723024] - leader session null
    2016-05-18 10:25:12,308 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Requesting new TaskManager container with 1024 megabytes memory. Pending requests: 1
    2016-05-18 10:25:12,317 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Requesting new TaskManager container with 1024 megabytes memory. Pending requests: 2
    2016-05-18 10:25:13,369 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Received new container: container_1463565924024_0001_01_000002 - Remaining pending container requests: 1
    2016-05-18 10:25:13,369 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Launching TaskManager in container ResourceID{resourceId='container_1463565924024_0001_01_000002'} on host cdh544-worker-5.c.astral-sorter-757.internal
    2016-05-18 10:25:13,372 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - Opening proxy : cdh544-worker-5.c.astral-sorter-757.internal:8041
    2016-05-18 10:25:13,752 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Received new container: container_1463565924024_0001_01_000003 - Remaining pending container requests: 0
    2016-05-18 10:25:13,752 INFO  org.apache.flink.yarn.YarnFlinkResourceManager                - Launching TaskManager in container ResourceID{resourceId='container_1463565924024_0001_01_000003'} on host cdh544-worker-2.c.astral-sorter-757.internal
    2016-05-18 10:25:13,753 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  - Opening proxy : cdh544-worker-2.c.astral-sorter-757.internal:8041
    2016-05-18 10:25:17,738 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:17,740 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:18,101 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:18,101 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:18,621 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:18,621 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:18,977 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:18,978 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:19,120 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:19,121 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:19,997 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:19,998 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:21,140 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:21,141 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:22,016 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:22,016 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:25,161 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:25,161 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:26,037 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:26,037 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:27,831 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:27,831 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:28,131 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:28,132 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:28,351 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:28,351 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:28,651 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:28,651 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:28,737 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:28,737 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:29,007 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:29,007 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:29,151 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:29,152 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:29,256 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:29,257 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:29,370 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:29,371 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:29,527 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:29,528 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:29,671 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:29,672 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:29,672 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:29,672 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:30,026 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:30,027 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:30,276 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:30,277 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:30,547 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:30,548 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:30,548 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:30,549 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:30,691 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:30,691 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:31,170 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:31,171 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:31,391 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:31,392 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:31,567 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:31,567 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:31,692 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:31,693 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:31,693 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:31,694 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:32,047 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:32,047 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:32,298 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:32,298 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:32,569 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:32,570 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:32,570 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:32,571 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:32,711 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:32,712 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:32,712 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:32,713 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:33,181 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:33,181 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:33,587 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:33,587 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:33,588 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:33,589 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:34,056 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:34,057 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:34,731 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:34,731 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:35,191 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:35,192 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:35,410 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:35,411 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:35,607 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:35,607 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:35,711 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:35,711 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:35,712 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:35,712 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:36,068 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:36,069 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:36,316 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:36,317 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:36,586 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:36,587 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:36,587 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:36,587 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:36,731 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:36,731 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:36,733 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:36,733 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:37,607 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:37,608 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:37,608 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'}
    2016-05-18 10:25:37,609 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000002'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:37,860 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:37,861 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:38,161 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:38,161 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:38,380 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    2016-05-18 10:25:38,381 WARN  org.apache.flink.yarn.YarnJobManager                          - TaskManager's resource id ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'} is not registered with ResourceManager. Refusing registration.
    2016-05-18 10:25:38,382 WARN  org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager resource registration failed for ResourceID{resourceId='container_e58_1463565924024_0001_01_000003'}
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1978#issuecomment-219993662
  
    Thanks for the review. I'll try to incorporate your suggestions. I'll ping you again for a second look :)


[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1978#discussion_r63679526
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
    @@ -100,6 +102,9 @@
     	 */
     	private JobID lastJobID;
     
    +	/** Switch for blocking/detached job submission of the client */
    +	private boolean detachedJobSubmission = false;
    --- End diff --
    
    There is a flag to toggle detached submission, but there are also separate detached and blocking methods.


[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1978#discussion_r63677456
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -20,8 +20,6 @@
     
    --- End diff --
    
    The CliFrontend still contains YARN-specific code, such as the YARN properties file parsing.
    
    I wonder if there's a way to completely remove everything that mentions YARN from the CLI.
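    One way to keep every YARN mention out of the CLI, roughly in the spirit of the `CustomCommandLine` interface described in this PR, is to have the frontend ask each registered command line whether it is active and delegate client creation to the first one that is. The following is a simplified, hypothetical sketch: the interface, its methods (`isActive`, `createClient`), and the classes `YarnCommandLine`, `StandaloneCommandLine`, and `Frontend` are illustrative assumptions rather than Flink's actual API, and a plain `Map` stands in for the parsed options.
    
    ```java
    import java.util.List;
    import java.util.Map;
    
    /** Hypothetical, simplified pluggable command line (not Flink's real interface). */
    interface CustomCommandLine {
        /** Returns true if this command line should handle the given parsed options. */
        boolean isActive(Map<String, String> options);
    
        /** Returns a label for the client to use; a real implementation would return a client object. */
        String createClient(Map<String, String> options);
    }
    
    /** Hypothetical YARN command line: active when "-m yarn-cluster" was given. */
    class YarnCommandLine implements CustomCommandLine {
        public boolean isActive(Map<String, String> options) {
            return "yarn-cluster".equals(options.get("m"));
        }
    
        public String createClient(Map<String, String> options) {
            return "yarn";
        }
    }
    
    /** Hypothetical fallback for a standalone cluster; always active. */
    class StandaloneCommandLine implements CustomCommandLine {
        public boolean isActive(Map<String, String> options) {
            return true;
        }
    
        public String createClient(Map<String, String> options) {
            return "standalone";
        }
    }
    
    /** The frontend only knows the interface, so it contains no YARN-specific code. */
    class Frontend {
        private final List<CustomCommandLine> commandLines;
    
        Frontend(List<CustomCommandLine> commandLines) {
            this.commandLines = commandLines;
        }
    
        String getClient(Map<String, String> options) {
            // The first active command line wins, so more specific ones are registered first.
            for (CustomCommandLine cli : commandLines) {
                if (cli.isActive(options)) {
                    return cli.createClient(options);
                }
            }
            throw new IllegalStateException("No active command line found");
        }
    }
    ```
    
    Registering the YARN command line before the always-active fallback would let YARN-specific parsing, such as reading the YARN properties file, live entirely in the YARN module while the frontend just iterates over the list.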


[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by EronWright <gi...@git.apache.org>.
Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1978#discussion_r64975479
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -393,7 +373,7 @@ protected AbstractFlinkYarnCluster deployInternal() throws Exception {
     			}
     		}
     
    -		// ------------------ Check if the YARN Cluster has the requested resources --------------
    +		// ------------------ Check if the YARN ClusterClient has the requested resources --------------
    --- End diff --
    
    This should remain "Cluster".


[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1978#discussion_r63683193
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---
    @@ -211,14 +191,41 @@ public void run() {
     		Runtime.getRuntime().addShutdownHook(clientShutdownHook);
     
     		isConnected = true;
    +
    +		logAndSysout("Waiting until all TaskManagers have connected");
    +
    +		while(true) {
    +			GetClusterStatusResponse status = getClusterStatus();
    +			if (status != null) {
    +				if (status.numRegisteredTaskManagers() < clusterDescriptor.getTaskManagerCount()) {
    +					logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/"
    +						+ clusterDescriptor.getTaskManagerCount() + ")");
    +				} else {
    +					logAndSysout("All TaskManagers are connected");
    +					break;
    +				}
    +			} else {
    +				logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
    +			}
    +
    +			try {
    +				Thread.sleep(500);
    +			}
    +			catch (InterruptedException e) {
    +				LOG.error("Interrupted while waiting for TaskManagers");
    +				System.err.println("Thread is interrupted");
    +				Thread.currentThread().interrupt();
    --- End diff --
    
    This is the old code that was used for deployment, though it lived in a different place. It would be good to make it interruptible and clean everything up.
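    A minimal sketch of how the wait loop could be made interruptible, assuming the number of registered TaskManagers can be polled through an `IntSupplier` (returning -1 while no status is available) and that the client exposes a "stopped" flag. Instead of looping on after an interrupt, it restores the interrupt flag and returns `false`; it also stops polling once the client has been shut down, so the caller can release its resources. `TaskManagerWaiter` and its parameters are hypothetical names, not Flink API.
    
    ```java
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.function.IntSupplier;
    
    /** Hypothetical helper that waits until the expected number of TaskManagers has registered. */
    class TaskManagerWaiter {
        private final IntSupplier registeredTaskManagers; // -1 while no status is available yet
        private final AtomicBoolean stopped;              // set when the client shuts down
        private final long pollIntervalMillis;
    
        TaskManagerWaiter(IntSupplier registeredTaskManagers, AtomicBoolean stopped, long pollIntervalMillis) {
            this.registeredTaskManagers = registeredTaskManagers;
            this.stopped = stopped;
            this.pollIntervalMillis = pollIntervalMillis;
        }
    
        /**
         * Blocks until {@code expected} TaskManagers are registered. Returns false if the
         * client was stopped or the thread was interrupted, so the caller can clean up.
         */
        boolean awaitTaskManagers(int expected) {
            while (!stopped.get()) {
                int registered = registeredTaskManagers.getAsInt();
                if (registered < 0) {
                    System.out.println("No status updates from the YARN cluster received so far. Waiting ...");
                } else if (registered < expected) {
                    System.out.println("TaskManager status (" + registered + "/" + expected + ")");
                } else {
                    System.out.println("All TaskManagers are connected");
                    return true;
                }
                try {
                    Thread.sleep(pollIntervalMillis);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and give up instead of looping forever.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            // The client was stopped concurrently; do not poll it any further.
            return false;
        }
    }
    ```
    
    A caller would then check the return value, e.g. `if (!waiter.awaitTaskManagers(expected)) { shutdown(); }`, where `shutdown()` stands for a hypothetical cleanup method on the client.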


[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1978#discussion_r65348898
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
    @@ -100,6 +102,9 @@
     	 */
     	private JobID lastJobID;
     
    +	/** Switch for blocking/detached job submission of the client */
    +	private boolean detachedJobSubmission = false;
    --- End diff --
    
    Done.


[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1978#discussion_r65348868
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -20,8 +20,6 @@
     
    --- End diff --
    
    Done.


[GitHub] flink pull request #1978: [FLINK-3667] refactor client communication

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1978


[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1978#discussion_r63680659
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
    @@ -100,6 +102,9 @@
     	 */
     	private JobID lastJobID;
     
    +	/** Switch for blocking/detached job submission of the client */
    +	private boolean detachedJobSubmission = false;
    --- End diff --
    
    Yes, this flag is actually just an indicator that can be set by the cluster client. Users may still submit jobs in blocking or detached mode. I know that's not ideal; I'll see if we can get rid of this. I think we could remove the detached/blocking methods and make this flag the only point where the user controls the job submission mode.
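    A minimal sketch of that direction: a single `run` method whose behavior depends on a detached flag set on the client, instead of separate blocking and detached methods. Because a blocking run yields more information than a detached one, the sketch returns a common base type that blocking callers downcast. All class names here (`SubmissionResult`, `ExecutionResult`, `SketchClient`) are hypothetical stand-ins, loosely modeled on the relationship between Flink's `JobSubmissionResult` and `JobExecutionResult`, and not the actual `Client` API.
    
    ```java
    /** Hypothetical result of submitting a job; a detached run only knows the job id. */
    class SubmissionResult {
        private final String jobId;
    
        SubmissionResult(String jobId) {
            this.jobId = jobId;
        }
    
        String getJobId() {
            return jobId;
        }
    }
    
    /** Hypothetical result of a blocking run, which additionally carries the runtime. */
    class ExecutionResult extends SubmissionResult {
        private final long netRuntimeMillis;
    
        ExecutionResult(String jobId, long netRuntimeMillis) {
            super(jobId);
            this.netRuntimeMillis = netRuntimeMillis;
        }
    
        long getNetRuntimeMillis() {
            return netRuntimeMillis;
        }
    }
    
    /** Hypothetical client where one flag is the only switch for the submission mode. */
    abstract class SketchClient {
        private boolean detached = false;
    
        void setDetached(boolean detached) {
            this.detached = detached;
        }
    
        /** Single entry point: detached runs return right after submission, blocking runs wait. */
        SubmissionResult run(String jobName) {
            String jobId = submit(jobName);
            if (detached) {
                return new SubmissionResult(jobId);
            }
            return waitForCompletion(jobId);
        }
    
        /** Cluster-specific submission, e.g. to a standalone or YARN cluster. */
        protected abstract String submit(String jobName);
    
        /** Cluster-specific wait until the job has finished. */
        protected abstract ExecutionResult waitForCompletion(String jobId);
    }
    ```
    
    A blocking caller would check `result instanceof ExecutionResult` before casting, while a detached caller only needs `getJobId()`.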


[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1978#issuecomment-222480544
  
    Thanks for your comments @EronWright. This is the first refactoring of the client code, covering both cluster-related and job-submission-related matters. It is a very good suggestion to separate the two. I would like to build upon this PR in a follow-up.
    
    I'll push a revised version of this PR later which clarifies the `detached` flag so that it solely indicates detached job submission. You're right: the dispatcher can have its own logic to decide whether the cluster should be kept alive or shut down after job completion. The update will also completely get rid of Yarn dependencies in the CliFrontend (e.g. the Yarn properties file).


[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1978#discussion_r63680057
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
    @@ -65,31 +73,25 @@
     /**
      * Encapsulates the functionality necessary to submit a program to a remote cluster.
      */
    -public class Client {
    +public abstract class Client {
    --- End diff --
    
    I think whoever introduced the savepoint method did so for API compatibility reasons.
    
    I was wondering the same about the many run methods (I actually tried to remove some). We need to duplicate them because `runBlocking` returns a `JobExecutionResult` and `runDetached` returns a `JobSubmissionResult`. We could get rid of the duplication if we did some naughty casting depending on the result returned.
    
    Optimized plans are only created for batch. The problem is that there really are that many ways to submit Flink programs. I think we could tackle this in a follow-up PR.


[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1978#discussion_r65349622
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---
    @@ -211,14 +191,41 @@ public void run() {
     		Runtime.getRuntime().addShutdownHook(clientShutdownHook);
     
     		isConnected = true;
    +
    +		logAndSysout("Waiting until all TaskManagers have connected");
    +
    +		while(true) {
    +			GetClusterStatusResponse status = getClusterStatus();
    +			if (status != null) {
    +				if (status.numRegisteredTaskManagers() < clusterDescriptor.getTaskManagerCount()) {
    +					logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/"
    +						+ clusterDescriptor.getTaskManagerCount() + ")");
    +				} else {
    +					logAndSysout("All TaskManagers are connected");
    +					break;
    +				}
    +			} else {
    +				logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
    +			}
    +
    +			try {
    +				Thread.sleep(500);
    +			}
    +			catch (InterruptedException e) {
    +				LOG.error("Interrupted while waiting for TaskManagers");
    +				System.err.println("Thread is interrupted");
    +				Thread.currentThread().interrupt();
    --- End diff --
    
    Done.


[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1978#issuecomment-219987643
  
    Forgetting an argument leads to an NPE:
    ```
    robert@cdh544-master:~/flink/build-target$ ./bin/flink run  -m yarn-cluster ./examples/batch/WordCount.jar 
    YARN cluster mode detected. Switching Log4j output to console
    2016-05-18 10:30:12,762 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Missing required argument yn
    2016-05-18 10:30:12,762 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Missing required argument yn
    Usage:
       Required
         -yn,--yarncontainer <arg>   Number of YARN container to allocate (=Number of Task Managers)
       Optional
         -yD <arg>                            Dynamic properties
         -yd,--yarndetached                   Start detached
         -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container [in MB]
         -ynm,--yarnname <arg>                Set a custom name for the application on YARN
         -yq,--yarnquery                      Display available YARN resources (memory, cores)
         -yqu,--yarnqueue <arg>               Specify YARN queue.
         -ys,--yarnslots <arg>                Number of slots per TaskManager
         -yst,--yarnstreaming                 Start Flink in streaming mode
         -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container [in MB]
    
    ------------------------------------------------------------
     The program finished with the following exception:
    
    java.lang.NullPointerException
    	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClient(FlinkYarnSessionCli.java:370)
    	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createClient(FlinkYarnSessionCli.java:55)
    	at org.apache.flink.client.CliFrontend.getClient(CliFrontend.java:948)
    	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:302)
    	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1067)
    	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1118)
    ```
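    The trace suggests that `createClient` carries on after the usage text for the missing `yn` argument has been printed. A minimal sketch of failing fast instead, assuming the parsed options are available as a simple map: check the required options up front and throw an exception with a readable message before any client is created. `RequiredOptions` and `requireAll` are hypothetical names; the real CLI uses its own option parser.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    /** Hypothetical up-front validation of required command-line options. */
    final class RequiredOptions {
        private RequiredOptions() {}
    
        /**
         * Throws an IllegalArgumentException naming every missing option, so callers
         * never proceed with a null value (the likely source of the NPE above).
         */
        static void requireAll(Map<String, String> parsed, String... required) {
            List<String> missing = new ArrayList<>();
            for (String name : required) {
                String value = parsed.get(name);
                if (value == null || value.isEmpty()) {
                    missing.add(name);
                }
            }
            if (!missing.isEmpty()) {
                throw new IllegalArgumentException("Missing required argument(s): " + String.join(", ", missing));
            }
        }
    }
    ```
    
    The frontend could catch the `IllegalArgumentException`, print the usage text once, and exit with a non-zero code instead of printing a stack trace.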



[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1978#issuecomment-218712827
  
    One test run failed, but the failure is unrelated to the changes.


[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1978#discussion_r63680404
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -830,53 +797,30 @@ else if (result instanceof DisposeSavepointFailure) {
     	//  Interaction with programs and JobManager
     	// --------------------------------------------------------------------------------------------
     
    -	protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
    -		LOG.info("Starting execution of program");
    +	protected int executeProgram(PackagedProgram program, Client client, int parallelism) {
    +		logAndSysout("Starting execution of program");
     
     		JobSubmissionResult result;
     		try {
    -			result = client.runDetached(program, parallelism);
    --- End diff --
    
    It seems that `runDetached` is only used in the `DetachedEnvironment`. Maybe we can use the setter variant there as well and remove the method completely.


[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1978
  
    @rmetzger I've updated the pull request and would like to get your opinion on it again. I've addressed all your comments. Specifically, I've removed all Yarn code from the CliFrontend. I've clarified the detached flag and moved `stopAfterJob` to the cluster implementation. The exceptions you were getting upon cluster startup have been resolved in #2013.


[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1978#issuecomment-219989249
  
    > I tried the change on a cluster, but the YARN session is not deploying:
    
    I also tried out the changes on a (local) YARN cluster, and everything worked. I have to look into what exactly causes the registration issue.
    
    



[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1978#discussion_r63678293
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
    @@ -65,31 +73,25 @@
     /**
      * Encapsulates the functionality necessary to submit a program to a remote cluster.
      */
    -public class Client {
    +public abstract class Client {
    --- End diff --
    
    I wonder if we can get rid of some of the overloaded methods here.
    
    For example
    ```
    public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException
    public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, String savepointPath) throws ProgramInvocationException
    ```
    
    both seem unused (at first sight; I'm not sure).
    
    ![image](https://cloud.githubusercontent.com/assets/89049/15355314/301e8e9a-1cf2-11e6-84ef-592609320f9f.png)
    
    
    Ideally, we would have one `runBlocking`, one `runDetached`, and one plan optimization method. I'm not sure if it's really necessary to have so many variants ;)


[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1978#issuecomment-219987024
  
    After the shutdown, I got the following exception:
    ```
    2016-05-18 10:26:07,076 INFO  org.apache.flink.yarn.ApplicationClient                       - Disconnect from JobManager Actor[akka.tcp://flink@10.240.221.7:41286/user/jobmanager#49723024].
    2016-05-18 10:26:07,083 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
    2016-05-18 10:26:07,083 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
    2016-05-18 10:26:07,112 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
    2016-05-18 10:26:07,128 INFO  org.apache.flink.yarn.YarnClusterClient                       - Deleting files in hdfs://nameservice1/user/robert/.flink/application_1463565924024_0001
    2016-05-18 10:26:07,130 INFO  org.apache.flink.yarn.YarnClusterClient                       - Application application_1463565924024_0001 finished with state FINISHED and final state SUCCEEDED at 1463567167072
    Error while deploying YARN cluster: The YarnClusterClient has already been stopped
    java.lang.RuntimeException: The YarnClusterClient has already been stopped
    	at org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:304)
    	at org.apache.flink.yarn.YarnClusterClient.connectToCluster(YarnClusterClient.java:198)
    	at org.apache.flink.yarn.YarnClusterClient.<init>(YarnClusterClient.java:132)
    	at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:676)
    	at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:312)
    	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:435)
    	at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:345)
    2016-05-18 10:26:07,445 INFO  org.apache.flink.yarn.YarnClusterClient                       - YARN Client is shutting down
    ```



[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1978#discussion_r63682705
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
    @@ -65,31 +73,25 @@
     /**
      * Encapsulates the functionality necessary to submit a program to a remote cluster.
      */
    -public class Client {
    +public abstract class Client {
    --- End diff --
    
    Okay, I see. Maybe we can reduce the number a bit by cutting down the savepoint-compatibility methods.
    If these methods are only called in one or two places, it should be fine to just pass `null` for the savepoint path.
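    A sketch of that consolidation, assuming the savepoint path is the only difference between the overloads: keep a single method with a nullable savepoint parameter and have the one or two call sites pass `null`. `JobGraphSketch` and `buildJobGraph` are hypothetical, and a plain `String` stands in for the `PackagedProgram`/`FlinkPlan` arguments that the real methods take.
    
    ```java
    /** Hypothetical stand-in for the overloaded getJobGraph(...) variants. */
    final class JobGraphSketch {
        final String programName;
        final String savepointPath; // null means: do not restore from a savepoint
    
        private JobGraphSketch(String programName, String savepointPath) {
            this.programName = programName;
            this.savepointPath = savepointPath;
        }
    
        /**
         * Single entry point replacing the overloads with and without a savepoint.
         * Callers without a savepoint simply pass null.
         */
        static JobGraphSketch buildJobGraph(String programName, String savepointPath) {
            if (programName == null) {
                throw new IllegalArgumentException("programName must not be null");
            }
            return new JobGraphSketch(programName, savepointPath);
        }
    
        boolean restoresFromSavepoint() {
            return savepointPath != null;
        }
    }
    ```
    
    The trade-off is a nullable parameter in exchange for one method less per variant; with only a few call sites, passing `null` explicitly keeps the intent visible.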


[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1978#discussion_r63681079
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -830,53 +797,30 @@ else if (result instanceof DisposeSavepointFailure) {
     	//  Interaction with programs and JobManager
     	// --------------------------------------------------------------------------------------------
     
    -	protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
    -		LOG.info("Starting execution of program");
    +	protected int executeProgram(PackagedProgram program, Client client, int parallelism) {
    +		logAndSysout("Starting execution of program");
     
     		JobSubmissionResult result;
     		try {
    -			result = client.runDetached(program, parallelism);
    --- End diff --
    
    I'll look into it.


[GitHub] flink pull request: [FLINK-3667] refactor client communication

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1978#discussion_r63677960
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -20,8 +20,6 @@
     
    --- End diff --
    
    You're right; these should also go. I'll remove them in another commit in this PR.

