Posted to issues@flink.apache.org by sachingoel0101 <gi...@git.apache.org> on 2015/08/10 12:59:00 UTC

[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...

GitHub user sachingoel0101 opened a pull request:

    https://github.com/apache/flink/pull/1003

    Parameter Server: Distributed Key-Value store, using Akka messaging

    This PR adds a basic parameter server implementation built using Akka messaging.
    *Interface:*
    Access: Inside `RuntimeContext`
    Operations:
    1. `registerBatch(Key, Value)` and `registerAsync(Key, Value)` where key and value signify the Key-Value pair. 
    2. `updateParameter(key, value)`
    3. `fetchParameter(key)`
    
    Under the `Batch` strategy, updates are applied only after every registered client has sent its update.
    Under the `Async` strategy, every update is applied immediately.
    [I intend to support the Stale synchronous strategy too, but I want feedback on the overall design of the work so far first.]
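
The difference between the two strategies can be sketched in a single process. This is only an illustration under stated assumptions, not the code in this PR: the class `StrategySketch`, the `String`/`double` types, the additive-delta update rule, and the fixed client count are all hypothetical, and the batch case simply counts one update per client per round.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical single-process store; illustrates update timing only, not the PR's code. */
class StrategySketch {
    enum Strategy { BATCH, ASYNC }

    private final int numClients; // registered clients, assumed fixed and known
    private final Map<String, Strategy> strategies = new HashMap<>();
    private final Map<String, Double> values = new HashMap<>();
    private final Map<String, List<Double>> pending = new HashMap<>();

    StrategySketch(int numClients) {
        this.numClients = numClients;
    }

    void registerBatch(String key, double initial) {
        register(key, initial, Strategy.BATCH);
    }

    void registerAsync(String key, double initial) {
        register(key, initial, Strategy.ASYNC);
    }

    private void register(String key, double initial, Strategy strategy) {
        strategies.put(key, strategy);
        values.put(key, initial);
        pending.put(key, new ArrayList<>());
    }

    /** Assumption: an update is an additive delta on a double value. */
    void updateParameter(String key, double delta) {
        Strategy strategy = strategies.get(key);
        if (strategy == null) {
            throw new IllegalArgumentException("unregistered key: " + key);
        }
        if (strategy == Strategy.ASYNC) {
            values.merge(key, delta, Double::sum); // applied immediately
            return;
        }
        List<Double> buffer = pending.get(key);
        buffer.add(delta);
        if (buffer.size() == numClients) { // one update from every client has arrived
            for (double d : buffer) {
                values.merge(key, d, Double::sum);
            }
            buffer.clear();
        }
    }

    /** Returns the current value; the key must have been registered. */
    double fetchParameter(String key) {
        return values.get(key);
    }
}
```

With two clients, a batch-registered key keeps its old value after the first `updateParameter` call and changes only once the second arrives, whereas an async-registered key changes on every call.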
    
    Messaging and Actor systems:
    1. A server is started within the actor system of every task manager.
    2. All servers register with the Job Manager.
    3. Job Manager determines on which server a particular key-value pair will be stored and lets every registered server know as part of a regular heartbeat message.
    4. `RuntimeContext` employs a `Patterns.ask` and `Await` with infinite timeout to query the server on the same task manager.
    5. Servers forward their clients' requests to the appropriate servers based on the key.
    6. After performing the desired operations, results are sent back to the originating server which forwards them back to the client.
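
The ownership and forwarding steps above can be sketched without Akka. This is a rough illustration, not the PR's implementation: `RoutingSketch`, `Directory`, and `Server` are hypothetical names, direct method calls stand in for actor messages, and hash-based placement is an assumption, since the description does not say how the Job Manager picks the owner of a key.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of key ownership and request forwarding between per-node servers. */
class RoutingSketch {

    /** Stands in for the Job Manager's key-to-server assignment. */
    static class Directory {
        private final List<String> serverIds;

        Directory(List<String> serverIds) {
            this.serverIds = serverIds;
        }

        /** Assumption: hash-based placement; the actual policy is not stated. */
        String ownerOf(String key) {
            return serverIds.get(Math.floorMod(key.hashCode(), serverIds.size()));
        }
    }

    /** One server per task manager: serves local requests, forwards the rest. */
    static class Server {
        final String id;
        private final Directory directory;
        private final Map<String, Server> peers; // all servers by id, including this one
        private final Map<String, Double> store = new HashMap<>();
        int forwarded = 0; // number of requests handed to another server

        Server(String id, Directory directory, Map<String, Server> peers) {
            this.id = id;
            this.directory = directory;
            this.peers = peers;
            peers.put(id, this);
        }

        void update(String key, double value) {
            Server owner = peers.get(directory.ownerOf(key));
            if (owner == this) {
                store.put(key, value);
            } else {
                forwarded++;
                owner.update(key, value); // in the PR this would be a message
            }
        }

        /** Returns the stored value, or null if the key was never written. */
        Double fetch(String key) {
            Server owner = peers.get(directory.ownerOf(key));
            if (owner == this) {
                return store.get(key);
            }
            forwarded++;
            return owner.fetch(key); // the result travels back through this server
        }
    }
}
```

A client always talks to the server on its own task manager; only requests for keys owned elsewhere cross the network.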
    
    * There is scope for adding fault tolerance by maintaining two servers for every key-value pair.
    * If a task has to restart, it can resume from a different iteration number (by reading a clock value) without resending all earlier updates, because every update is guaranteed to have been applied to the value.
    * There are two examples demonstrating the usage of the Batch and Async strategies in the Java examples package.
    * The Travis build doesn't pass yet, since there are some new messages from `ParameterServer` which need to be handled in the unit tests.
    
    I would love feedback about the overall design.
    This is mostly inspired by the machine-learning view of a parameter server and doesn't touch the actual internals of iterative tasks. Instead, all operations on the Parameter Server are blocking and have an infinite timeout. However, the server itself retries an operation only up to a maximum number of times; if it still does not succeed, the operation fails.
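
The bounded-retry behaviour mentioned above could look roughly like the following. It is a sketch only: `RetrySketch`, `withRetries`, and the `maxAttempts` parameter are hypothetical names, and how the server in this PR actually counts and reports failed attempts is not shown in the description.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: run an operation until it succeeds or the attempt budget is spent. */
class RetrySketch {

    static <T> T withRetries(Callable<T> operation, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.call(); // success ends the loop
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        // Every attempt failed: surface the last cause to the caller.
        throw new IllegalStateException(
                "operation failed after " + maxAttempts + " attempts", last);
    }
}
```

The caller still blocks for as long as the attempts take; the retry limit only bounds how many times a failing operation is repeated.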

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sachingoel0101/flink flink_server

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1003.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1003
    
----
commit 539319a30e8d2d841bf1deb1b78de4c4f404dfb1
Author: Sachin Goel <sa...@gmail.com>
Date:   2015-08-05T04:04:38Z

    Reorganized the server framework to runtime itself. Added asynchronous
    blocking calls to the server.

commit 06daf1630afba005401826a68a646859383f4f9c
Author: Sachin Goel <sa...@gmail.com>
Date:   2015-08-09T00:29:30Z

    Integrating into the Flink Runtime

commit 22933416e57cd63331a93134f56c44bea0395b24
Author: Sachin Goel <sa...@gmail.com>
Date:   2015-08-10T01:25:54Z

    Added a server for each task manager

commit f3f524b9ecfbeba876c8a5946ac3b491df1b0b6b
Author: Sachin Goel <sa...@gmail.com>
Date:   2015-08-10T02:32:32Z

    Added JobManager as a manager of servers

commit 988309ac4bbfb928b32b08fcecfab2e03d3a3feb
Author: Sachin Goel <sa...@gmail.com>
Date:   2015-08-10T04:15:08Z

    Added a parameter store

commit 51c31078482e7c77de8beb90030094ea81a95384
Author: Sachin Goel <sa...@gmail.com>
Date:   2015-08-10T10:32:10Z

    Added examples to demonstrate the usage

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...

Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on the pull request:

    https://github.com/apache/flink/pull/1003#issuecomment-131477326
  
    A stand-alone parameter server service will require setting up and tearing down the client every time the user, say, opens and closes a Rich function while using it. Further, it means we have to add another dependency when the same could be accomplished using Akka.
    In this implementation, the parameter *server* is at every task manager [effectively it acts as a client to serve all running tasks at one node. In fact, in this sense, there are no servers, just clients at every worker, which are managed by the Job Manager]. This in itself means less data transfer over the network, since every *server* will usually be the owner of a key and can serve its *clients* faster than if every request went over the network.
    Further, it is completely distributed: every task manager maintains its own *server* and sets it up or tears it down along with itself.
    As for including it in the core itself, very little is involved: only the three or four functions added directly to the runtime context, which effectively serve as an interface.
    @tillrohrmann, could you weigh in on whether this is the intended use of a PS in ML algorithms? I can easily see this working with, for example, the regression algorithm.
    The reason I included it in the runtime is that the server then cannot fail on its own: if the TaskManager is alive, the Parameter Server on that TaskManager is alive. Further, the Job Manager manages the servers and determines where each key will go [which will be crucial to recovery], something which can be very hard to determine in a completely decentralized manner (I couldn't think of a foolproof way). This ensures that the server is running only on the workers where it's needed, and only if it is needed. Keeping the Job Manager in the loop also ensures that recovery is easy. If a Task Manager fails, the Job Manager knows which server failed by matching the `InstanceID`s and can kick off the recovery process from the duplicate server. [This is not implemented yet.]
    A stand-alone PS would add another master-node system in parallel to the JobManager-TaskManager system, which can already be used efficiently for this purpose. Of course, this doesn't matter if we use an external key-value store.
    I will have a look at #967 and see how the two can be integrated.
    
    I also had a look at an open implementation done for Spark: https://github.com/apache/spark/compare/branch-1.3...chouqin:ps-on-spark-1.3
    This adds a separate context and a function on RDD to access the PS and does require running a service inside the core environment.
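
The recovery idea described in this comment (the Job Manager knows where each key lives and falls back to the duplicate server when a Task Manager fails) is not implemented in the PR. The following is therefore purely a hypothetical sketch of the bookkeeping involved: `RecoverySketch`, `Placement`, and the plain-string instance IDs are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical manager-side bookkeeping: a primary and a duplicate server per key. */
class RecoverySketch {

    /** Where a key lives, identified by (assumed) string instance IDs. */
    static class Placement {
        String primary;
        String duplicate; // null when no duplicate is currently assigned

        Placement(String primary, String duplicate) {
            this.primary = primary;
            this.duplicate = duplicate;
        }
    }

    private final Map<String, Placement> placements = new HashMap<>();

    void assign(String key, String primary, String duplicate) {
        placements.put(key, new Placement(primary, duplicate));
    }

    /** Returns the instance currently serving the key; the key must be assigned. */
    String ownerOf(String key) {
        return placements.get(key).primary;
    }

    /** Promotes the duplicate for every key whose primary was lost; returns how many. */
    int onServerFailed(String failedInstanceId) {
        int promoted = 0;
        for (Placement p : placements.values()) {
            if (failedInstanceId.equals(p.primary) && p.duplicate != null) {
                p.primary = p.duplicate;
                p.duplicate = null; // choosing a fresh duplicate is left out of this sketch
                promoted++;
            } else if (failedInstanceId.equals(p.duplicate)) {
                p.duplicate = null; // the key stays available but loses its backup
            }
        }
        return promoted;
    }
}
```

Because a single component holds the key placement, deciding which keys are affected by a failure is a simple lookup, which is the argument made above for keeping a manager in the loop.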



[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1003#issuecomment-145854816
  
    @sachingoel0101 Sounds good! Can you open a new PR once you are done with the Parameter Server interface and close this PR? Thanks



[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...

Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 closed the pull request at:

    https://github.com/apache/flink/pull/1003



[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1003#issuecomment-131403660
  
    This is again baked into the Flink runtime. Is there a way to keep this separate?
    
    I am still a bit puzzled whether this is the pattern with which people use parameter servers: embedded in the workers that operate on the training data. My impression so far was that the parameter servers are a separate component, usually running on different machines.
    
    Also, could you comment on how this could work with the changes proposed in #967?



[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1003#issuecomment-139742195
  
    Having interfaces for a Parameter Server service in Flink is a very good idea, IMO. This interface can be implemented for different backends, such as Ignite or a lightweight implementation of our own.
    
    However, I doubt that it is really necessary to bake the Parameter Server master into the JobManager. Can't this be a completely stand-alone service that Flink programs write to and read from via the provided interfaces?
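
The separation suggested here, user code written against an interface with the backend chosen independently, might be sketched as follows. Everything in it is hypothetical: `ParameterStore`, `InMemoryParameterStore`, and `ModelCode` are invented names, the interface proposed in #967 may look quite different, and an Ignite-backed implementation is not shown.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical client-facing interface; the one proposed in #967 may differ. */
interface ParameterStore<K, V> {
    void update(K key, V value);

    /** Returns the stored value, or null if the key is absent. */
    V fetch(K key);
}

/** A lightweight backend kept in local memory, standing in for a real service. */
class InMemoryParameterStore<K, V> implements ParameterStore<K, V> {
    private final Map<K, V> values = new ConcurrentHashMap<>();

    @Override
    public void update(K key, V value) {
        values.put(key, value);
    }

    @Override
    public V fetch(K key) {
        return values.get(key);
    }
}

/** User code depends only on the interface, so the backend can be swapped. */
class ModelCode {
    /** One illustrative step: read weight "w", move it against the gradient, write it back. */
    static double step(ParameterStore<String, Double> store, double gradient) {
        Double current = store.fetch("w");
        double next = (current == null ? 0.0 : current) - 0.1 * gradient;
        store.update("w", next);
        return next;
    }
}
```

Swapping the backend then means constructing a different `ParameterStore` implementation and passing it to the same user code.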



[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...

Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on the pull request:

    https://github.com/apache/flink/pull/1003#issuecomment-142190932
  
    Yes. I agree. I'm currently working on finalizing the interface. Just waiting for another PR to get in.
    
    And involving the Job Manager is not strictly necessary. I can create another standalone actor to manage the servers.



[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...

Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on the pull request:

    https://github.com/apache/flink/pull/1003#issuecomment-138419004
  
    @StephanEwen, I had a look at #967 and I quite agree with the interface designed by @nltran. I believe I can separate out my parameter server implementation from the core runtime itself.
    The only component involved from core will be the Job Manager, which will act as master for all Parameter Servers started.
    And as you suggested, we can let the user choose between the Ignite-based implementation and the native one.



[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...

Posted by sachingoel0101 <gi...@git.apache.org>.
Github user sachingoel0101 commented on the pull request:

    https://github.com/apache/flink/pull/1003#issuecomment-146178033
  
    Sure. However, any further work will depend on finalizing the interface in #967 which has been idle for quite some time. 

