Posted to issues@flink.apache.org by mxm <gi...@git.apache.org> on 2015/06/22 16:31:28 UTC

[GitHub] flink pull request: [FLINK-2097] Implement job session management

GitHub user mxm opened a pull request:

    https://github.com/apache/flink/pull/858

    [FLINK-2097] Implement job session management

    This is a joint effort by @StephanEwen and me to introduce session management in Flink. Sessions are used to keep a copy of the ExecutionGraph in the job manager for the lifetime of the session. It is important not to keep the ExecutionGraph around any longer than that, because it consumes a lot of memory; once the session ends, its intermediate results can be freed as well. To integrate sessions properly into Flink, some refactoring was necessary. In particular:
    
    - The JobID is created by the ExecutionEnvironment and passed down from there
    - Sessions can be terminated by the ExecutionEnvironment or directly through the executor
    - Sessions are cancelled implicitly through "reapers" or shutdown hooks in the ExecutionEnvironment; otherwise they time out
    - LocalExecutor and RemoteExecutor manage sessions
    - The Client only handles communication with the job manager and is agnostic of session management
    
    With session management in place, we will be able to properly support backtracking of produced intermediate results. This makes calls to count()/collect()/print() efficient and makes it possible to write incremental/interactive jobs.
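
    A minimal sketch of the environment-side flow described above, assuming hypothetical `SketchExecutor` and `SessionEnvironment` types (not Flink's actual classes) and using `java.util.UUID` in place of Flink's `JobID`: the environment generates one ID per session, passes it along with every plan it executes, and ends the old session explicitly when a new one is started.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;

    // Hypothetical executor: records which session each plan was submitted under
    // and which sessions were ended.
    class SketchExecutor {
        final List<UUID> submitted = new ArrayList<>();
        final List<UUID> ended = new ArrayList<>();

        void execute(String plan, UUID sessionId) {
            submitted.add(sessionId);
        }

        void endSession(UUID sessionId) {
            ended.add(sessionId);
        }
    }

    // Hypothetical environment: owns the session ID and passes it down on every execution.
    class SessionEnvironment {
        private final SketchExecutor executor;
        private UUID sessionId = UUID.randomUUID();

        SessionEnvironment(SketchExecutor executor) {
            this.executor = executor;
        }

        UUID getSessionId() {
            return sessionId;
        }

        // Every execution within one session reuses the same ID.
        void execute(String plan) {
            executor.execute(plan, sessionId);
        }

        // Ends the current session and starts a fresh one with a new ID.
        void startNewSession() {
            executor.endSession(sessionId);
            sessionId = UUID.randomUUID();
        }
    }
    ```

    Reusing one ID across executions is what would let the job manager attach later work to the ExecutionGraph it kept for that session.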

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mxm/flink session-dev

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/858.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #858
    
----
commit 9852d392bfe69b056596acfba001ab0a574f0ac0
Author: Maximilian Michels <mx...@apache.org>
Date:   2015-05-13T15:06:47Z

    [FLINK-2097] [core] Implement job session management.
    
    Sessions make sure that the JobManager does not immediately discard a JobGraph after execution, but keeps it
    around so that further operations can be attached to the graph. That is the basis of interactive sessions.
    
    This pull request implements rudimentary session management. Together with backtracking (#640),
    this will enable users to submit jobs to the cluster and access intermediate results. Session handling ensures that the results are eventually cleared.
    
    ExecutionGraphs are kept as long as
    - no timeout has occurred and
    - the session has not been explicitly ended
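
    The retention rule above can be sketched as a small registry, assuming a hypothetical `SessionRegistry` keyed by job ID, with the current time passed in explicitly so the logic is easy to test. An entry survives only while its session has neither timed out nor been ended; the `Object` value stands in for an ExecutionGraph, and refreshing the timeout on attach is an assumption of this sketch.

    ```java
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    // Hypothetical sketch: keeps an ExecutionGraph-like object per job ID until the
    // session times out or is ended explicitly. Times are in milliseconds.
    class SessionRegistry {
        private static final class Entry {
            final Object graph;
            final long timeoutMillis;
            long lastAccessMillis;

            Entry(Object graph, long timeoutMillis, long nowMillis) {
                this.graph = graph;
                this.timeoutMillis = timeoutMillis;
                this.lastAccessMillis = nowMillis;
            }
        }

        private final Map<String, Entry> entries = new HashMap<>();

        void register(String jobId, Object graph, long timeoutMillis, long nowMillis) {
            entries.put(jobId, new Entry(graph, timeoutMillis, nowMillis));
        }

        // Attaching to a live session returns the kept graph and refreshes its timeout.
        Object attach(String jobId, long nowMillis) {
            Entry e = entries.get(jobId);
            if (e == null) {
                return null;
            }
            e.lastAccessMillis = nowMillis;
            return e.graph;
        }

        // Explicit end: the graph is released immediately.
        void endSession(String jobId) {
            entries.remove(jobId);
        }

        // Periodic cleanup: drop every session whose timeout has elapsed.
        void expire(long nowMillis) {
            Iterator<Entry> it = entries.values().iterator();
            while (it.hasNext()) {
                Entry e = it.next();
                if (nowMillis - e.lastAccessMillis >= e.timeoutMillis) {
                    it.remove();
                }
            }
        }

        boolean contains(String jobId) {
            return entries.containsKey(jobId);
        }
    }
    ```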

commit 65464ad19d39a29d41d071b2a4524b414e297147
Author: Stephan Ewen <se...@apache.org>
Date:   2015-05-29T12:35:33Z

    [FLINK-2097] [core] Improve session management.
    
     - The Client only manages connections to the JobManager; it is not job-specific
     - Executors provide a more explicit life cycle and methods to start new sessions
     - Sessions are handled by the environments
     - The environments use reapers (local) and shutdown hooks (remote) to ensure session termination
       when the environment goes out of scope
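
    One way to realize both cleanup paths, sketched with standard JDK facilities rather than Flink's actual reaper code: a `java.lang.ref.Cleaner` (Java 9+) runs a session-ending action once the environment becomes unreachable, and `Runtime.addShutdownHook` additionally covers JVM exit. `SessionEnder` and `ReapedEnvironment` are hypothetical names. The sketch relies on `Cleanable.clean()` invoking its action at most once, so explicit close, garbage collection, and JVM exit cannot end a session twice.

    ```java
    import java.lang.ref.Cleaner;
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical cleanup action. It holds only the session ID, never the
    // environment, so the environment can still become unreachable.
    class SessionEnder implements Runnable {
        static final AtomicInteger ENDED = new AtomicInteger();
        private final String sessionId;

        SessionEnder(String sessionId) {
            this.sessionId = sessionId;
        }

        @Override
        public void run() {
            // A real implementation would ask the JobManager to end sessionId here.
            ENDED.incrementAndGet();
        }
    }

    // Hypothetical environment whose session is ended explicitly, by the reaper,
    // or (optionally) by a shutdown hook, whichever happens first.
    class ReapedEnvironment implements AutoCloseable {
        private static final Cleaner REAPER = Cleaner.create();
        private final Cleaner.Cleanable cleanable;

        ReapedEnvironment(String sessionId, boolean useShutdownHook) {
            // Reaper: runs the action once this object becomes phantom reachable.
            Cleaner.Cleanable c = REAPER.register(this, new SessionEnder(sessionId));
            this.cleanable = c;
            if (useShutdownHook) {
                // Shutdown hook: also end the session when the JVM exits.
                Runtime.getRuntime().addShutdownHook(new Thread(c::clean));
            }
        }

        // Explicit termination; later reaper or shutdown triggers become no-ops.
        @Override
        public void close() {
            cleanable.clean();
        }
    }
    ```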

commit 6d89edd4a63fa3971c0246f46c7b8c98f3fc6c30
Author: Maximilian Michels <mx...@apache.org>
Date:   2015-06-18T14:38:09Z

    [FLINK-2097] [core] Finalize session management

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2097] Implement job session management

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/858#issuecomment-140405615
  
    I've rebased again. If nobody objects, I will merge this soon. The new API-facing methods on `ExecutionEnvironment` will be disabled until we implement the first applications of session management; I've added a separate commit that does that.


[GitHub] flink pull request: [FLINK-2097] Implement job session management

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/858#discussion_r38951273
  
    --- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala ---
    @@ -53,41 +54,41 @@ class JobManagerFailsITCase(_system: ActorSystem)
       }
     
       "A TaskManager" should {
    -    "detect a lost connection to the JobManager and try to reconnect to it" in {
    -
    -      val num_slots = 13
    -      val cluster = startDeathwatchCluster(num_slots, 1)
    -
    -      val tm = cluster.getTaskManagers(0)
    -      val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
    -
    -      // disable disconnect message to test death watch
    -      tm ! DisableDisconnect
    -
    -      try {
    -        within(TestingUtils.TESTING_DURATION) {
    -          jmGateway.tell(RequestNumberRegisteredTaskManager, self)
    -          expectMsg(1)
    -
    -          tm ! NotifyWhenJobManagerTerminated(jmGateway.actor)
    -
    -          jmGateway.tell(PoisonPill, self)
    -
    -          expectMsgClass(classOf[JobManagerTerminated])
    -
    -          cluster.restartLeadingJobManager()
    -
    -          cluster.waitForTaskManagersToBeRegistered()
    -
    -          cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
    -            .tell(RequestNumberRegisteredTaskManager, self)
    -
    -          expectMsg(1)
    -        }
    -      } finally {
    -        cluster.stop()
    -      }
    -    }
    +//    "detect a lost connection to the JobManager and try to reconnect to it" in {
    --- End diff --
    
    Re-enable the test?


[GitHub] flink pull request: [FLINK-2097] Implement job session management

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/858


[GitHub] flink pull request: [FLINK-2097] Implement job session management

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/858#issuecomment-138860661
  
    Thanks Max for the detailed description.
    



[GitHub] flink pull request: [FLINK-2097] Implement job session management

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/858#issuecomment-123755792
  
    It should just add more nodes to the ExecutionGraph; existing ones should not be modified. For batch, I think the assumption is that the existing ExecutionGraph needs to be finished. For streaming, I could also picture attaching nodes at runtime, but this would have to be implemented carefully.
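
    The behaviour described above can be sketched as an append-only graph, under the stated batch assumption that the existing graph must be finished before new work is attached. `AppendOnlyGraph` and its string vertices are hypothetical and far simpler than Flink's ExecutionGraph; marking the grown graph as unfinished again is an assumption of this sketch.

    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Hypothetical append-only graph: attaching adds new vertices but never
    // replaces existing ones, so already produced results stay valid.
    class AppendOnlyGraph {
        private final Map<String, String> vertices = new LinkedHashMap<>();
        private boolean finished;

        void markFinished() {
            finished = true;
        }

        int size() {
            return vertices.size();
        }

        String vertex(String id) {
            return vertices.get(id);
        }

        // newVertices maps vertex ID to a description of the operation.
        void attach(Map<String, String> newVertices) {
            if (!vertices.isEmpty() && !finished) {
                throw new IllegalStateException(
                        "batch: the existing graph must be finished before attaching");
            }
            // Validate first so a rejected attach leaves the graph unchanged.
            for (String id : newVertices.keySet()) {
                if (vertices.containsKey(id)) {
                    throw new IllegalArgumentException("vertex already exists: " + id);
                }
            }
            vertices.putAll(newVertices);
            // The grown graph now has unexecuted vertices, so it is not finished.
            finished = false;
        }
    }
    ```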


[GitHub] flink pull request: [FLINK-2097] Implement job session management

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/858#discussion_r39017774
  
    
    Thanks.


[GitHub] flink pull request: [FLINK-2097] Implement job session management

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/858#issuecomment-138625172
  
    I've ported this pull request to the latest master. It was a lot more work than I anticipated because some classes had diverged significantly and merging them was a bit hard.
    
    Due to some refactoring, the changes have grown quite large again, and I know that makes reviewing hard. Despite that, I wouldn't delay merging this pull request much further. We can disable session management until it is integrated with the rest of the system (intermediate results) by throwing an exception in the interface methods. If we later decide to delay this feature, we could also remove the session code. Even in that case, it would still make sense to merge this pull request because it contains a lot of nice refactoring.
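
    Disabling the interface methods until sessions are wired up could look like the following sketch. `SessionApi`, `setSessionTimeout`, and `startNewSession` are hypothetical names here, and the choice of `UnsupportedOperationException` is an assumption; the point is only that the internal session code stays in place while the user-facing entry points refuse to run.

    ```java
    // Hypothetical user-facing session methods, disabled until session management
    // is integrated with intermediate results.
    class SessionApi {
        static final String DISABLED =
                "Session management is not yet enabled in this version.";

        // Internal default used by the runtime; unaffected by the disabled API.
        private final long sessionTimeoutMillis = 0L;

        long getSessionTimeout() {
            return sessionTimeoutMillis;
        }

        void setSessionTimeout(long timeoutMillis) {
            throw new UnsupportedOperationException(DISABLED);
        }

        void startNewSession() {
            throw new UnsupportedOperationException(DISABLED);
        }
    }
    ```

    Re-enabling the feature later would then only mean replacing the method bodies, without touching callers.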
    
    With session management in place, we can reuse already computed intermediate results without too much effort. In fact, only a few API changes remain before session management can be exposed to users in production.


[GitHub] flink pull request: [FLINK-2097] Implement job session management

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/858#issuecomment-138848287
  
    Of course! The following classes have been refactored in the course of integrating them with the session management:
    
    **Client**
    - Establish connection to JobManager on creation
    - Refactor run method into `runBlocking` and `runDetached`
    - Extract helper classes to generate the Plan
    - Make Optimizer and JobGraph generation methods `static`
    - Pass `ClassLoader` correctly (do not keep one per Client; instead, pass it in before submission)
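
    The split between `runBlocking` and `runDetached`, together with passing the `ClassLoader` per submission instead of storing it in the Client, can be sketched as follows. Only the two method names come from the list above; `SketchClient`, `JobResult`, and the `CompletableFuture`-based stand-in for the JobManager round trip are hypothetical.

    ```java
    import java.util.UUID;
    import java.util.concurrent.CompletableFuture;

    // Hypothetical result type returned once a job has finished.
    class JobResult {
        final UUID jobId;
        final ClassLoader classLoader;

        JobResult(UUID jobId, ClassLoader classLoader) {
            this.jobId = jobId;
            this.classLoader = classLoader;
        }
    }

    // Hypothetical client: a real one would connect to the JobManager in its
    // constructor. It holds no per-job state, not even a ClassLoader.
    class SketchClient {
        // Stand-in for sending the job to the JobManager; completes asynchronously.
        private CompletableFuture<JobResult> submit(UUID jobId, ClassLoader userCodeLoader) {
            return CompletableFuture.supplyAsync(() -> new JobResult(jobId, userCodeLoader));
        }

        // Waits until the job has finished and returns its result.
        JobResult runBlocking(UUID jobId, ClassLoader userCodeLoader) {
            return submit(jobId, userCodeLoader).join();
        }

        // Returns as soon as the job is handed off; the caller keeps only the ID.
        UUID runDetached(UUID jobId, ClassLoader userCodeLoader) {
            submit(jobId, userCodeLoader);
            return jobId;
        }
    }
    ```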
    
    **CliFrontend**
    - `runBlocking` and `runDetached` methods, analogous to the Client class
    
    **ExecutionEnvironment**, **LocalEnvironment**, **RemoteEnvironment**
    - modified abstract class to support sessions (timeout and JobID generation)
    - handle session management via Reapers and ShutdownHooks
    
    **PlanExecutor**, **LocalExecutor**, **RemoteExecutor**
    - modified interface
    - support session termination
    - set JobID on Plan
    
    **JobManager**
    - keep the ExecutionGraph as long as the session has not expired
    
    Future issues:
    - Support for sessions in streaming. Currently streaming jobs are agnostic of sessions.
    - Representation of sessions in the JobManager web frontend. How do we represent updates to the ExecutionGraph in sessions?
    - Build features on top of session management (e.g. intermediate results)


[GitHub] flink pull request: [FLINK-2097] Implement job session management

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/858#issuecomment-123754538
  
    I think right now, it pretty much behaves as if someone started a new job with the "grown" execution graph.


[GitHub] flink pull request: [FLINK-2097] Implement job session management

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/858#issuecomment-138825995
  
    Could you elaborate a little bit on what you refactored and which components would be important to review?


[GitHub] flink pull request: [FLINK-2097] Implement job session management

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/858#issuecomment-123726241
  
    I just had a look at the JobManager in a different context and thought about the following, which might be relevant here: when submitting a new JobGraph that is attached to an existing ExecutionGraph, some ExecutionGraph state is overwritten by the new JobGraph. Because of this, you might run into (possibly) unexpected behaviour, such as resetting the number of remaining execution retries or creating a new CheckpointCoordinator for the ExecutionGraph.
    
    What's the intended behaviour of attaching to an existing ExecutionGraph? Is there an implicit assumption that the existing ExecutionGraph needs to be finished already?

