Posted to issues@spark.apache.org by "Qiping Li (JIRA)" <ji...@apache.org> on 2015/04/16 04:47:59 UTC

[jira] [Comment Edited] (SPARK-6932) A Prototype of Parameter Server

    [ https://issues.apache.org/jira/browse/SPARK-6932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497483#comment-14497483 ] 

Qiping Li edited comment on SPARK-6932 at 4/16/15 2:47 AM:
-----------------------------------------------------------

[~srowen], thanks for your comments. 

{quote}
Yes I'm asking how these two relate. Is it a continuation? then one of these should be closed. Is it an unrelated implementation?
{quote}

[SPARK-4590|https://issues.apache.org/jira/browse/SPARK-4590] is just a brainstorming issue and this is a small step towards resolving it. We want a separate issue to discuss our implementation, so we can improve our code and hopefully merge our implementation into Spark to help others. This is a continuation of [SPARK-4590|https://issues.apache.org/jira/browse/SPARK-4590], but others may have different ideas and implementations, so I don't know if [SPARK-4590|https://issues.apache.org/jira/browse/SPARK-4590] should be closed.

{quote}
I think you want to coordinate with  [~rezazadeh] 
{quote}

I have asked [~rezazadeh] in [SPARK-4590|https://issues.apache.org/jira/browse/SPARK-4590] and he wanted to use IndexRDD as a distributed array to store parameters. As I described in the description above, I don't think IndexRDD is a good option.

{quote}
It's not clear why this needs a change to core; for example you certainly don't add a method to all RDDs that is specific to this parameter server. It sounds like an external process, so could start life externally and prove it's a good solution.
{quote}



Currently the parameter server and client run on the executors and are scheduled by the scheduler in core. They could run as an external service instead, but that has the following drawbacks:

1. It makes the parameter server a little bit harder to use. To write a machine learning algorithm, users have to create a new parameter server client in each partition (in `mapPartitions`) and use that client to communicate with the server. I don't know if creating an external connector in `map` or `mapPartitions` is a good idea (see the sketch below).
2. When a task has finished some iterations and then fails, it should not rerun the finished iterations after being rescheduled on another node. This can't be done without changes to core.
3. A separate parameter server cluster is needed to store the parameters, so users have to coordinate Spark and the parameter servers to run a machine learning algorithm.

So our implementation makes changes to core.
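
To make drawback 1 concrete, here is a rough sketch (with a hypothetical `ExternalPSClient` class and an RDD named `data`, not code from our patch) of what an algorithm looks like when the parameter server runs as an external service, compared with the integrated `runWithPS` described in this issue:

{code}
// External service: every task creates and manages its own connection.
data.mapPartitions { iter =>
  // `ExternalPSClient` is a hypothetical connector to a separately deployed PS cluster.
  val client = new ExternalPSClient("ps-master:9000")
  val points = iter.toArray
  val weights = client.get[Array[Double]]("w")
  // ... compute deltas from `points` and push them back ...
  client.close()
  Iterator.single(weights)
}

// Integrated (this proposal): the scheduler hands each task a ready-to-use client.
data.runWithPS { (arr, client) =>
  val weights = client.get[Array[Double]]("w")
  // ... compute deltas from `arr` and push them back ...
}
{code}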





> A Prototype of Parameter Server
> -------------------------------
>
>                 Key: SPARK-6932
>                 URL: https://issues.apache.org/jira/browse/SPARK-6932
>             Project: Spark
>          Issue Type: New Feature
>          Components: ML, MLlib
>            Reporter: Qiping Li
>
> h2. Introduction
> As specified in [SPARK-4590|https://issues.apache.org/jira/browse/SPARK-4590], it would be very helpful to integrate a parameter server into Spark for machine learning algorithms, especially for those with ultra-high-dimensional features. 
> After carefully studying the design doc of [Parameter Servers|https://docs.google.com/document/d/1SX3nkmF41wFXAAIr9BgqvrHSS5mW362fJ7roBXJm06o/edit?usp=sharing] and the paper on [Factorbird|http://stanford.edu/~rezab/papers/factorbird.pdf], we propose a prototype of Parameter Server on Spark (PS-on-Spark), with several key design concerns:
> * *User friendly interface*
> 	We carefully investigated most existing Parameter Server systems (including [petuum|http://petuum.github.io], [parameter server|http://parameterserver.org], and [paracel|https://github.com/douban/paracel]) and designed a user-friendly interface by absorbing the essence of all these systems. 
> * *Prototype of distributed array*
>     IndexRDD (see [SPARK-4590|https://issues.apache.org/jira/browse/SPARK-4590]) doesn't seem to be a good option for a distributed array, because in most cases the number of key updates per second is not very high. 
>     So we implemented a distributed HashMap to store the parameters, which can easily be extended for better performance.
>     
> * *Minimal code change*
> 	Quite a lot of effort has gone into avoiding code changes to Spark core. Tasks which need the parameter server are still created and scheduled by Spark's scheduler. Tasks communicate with the parameter server through a client object, over *akka* or *netty*.
> With all these concerns in mind, we propose the following architecture:
> h2. Architecture
> !https://cloud.githubusercontent.com/assets/1285855/7158179/f2d25cc4-e3a9-11e4-835e-89681596c478.jpg!
> Data is stored in an RDD and is partitioned across workers. During each iteration, each worker gets parameters from the parameter server, then computes new parameters based on the old parameters and the data in its partition. Finally, each worker pushes the updated parameters to the parameter server. A worker communicates with the parameter server through a parameter server client, which is initialized in the `TaskContext` of that worker.
> The current implementation is based on YARN cluster mode, but it should not be a problem to port it to other modes. 
> h3. Interface
> We referred to existing parameter server systems (petuum, parameter server, paracel) when designing the interface of the parameter server. 
> *`PSClient` provides the following interface for workers to use:*
> {code}
> //  get parameter indexed by key from parameter server
> def get[T](key: String): T
> // get multiple parameters from parameter server
> def multiGet[T](keys: Array[String]): Array[T]
> // add `delta` to the parameter indexed by `key`; 
> // if there are multiple `delta`s to apply to the same parameter,
> // use `reduceFunc` to reduce these `delta`s first.
> def update[T](key: String, delta: T, reduceFunc: (T, T) => T): Unit
> // update multiple parameters at the same time, using the same `reduceFunc`.    
> def multiUpdate[T](keys: Array[String], delta: Array[T], reduceFunc: (T, T) => T): Unit
>     
> // advance clock to indicate that current iteration is finished.
> def clock(): Unit
>  
> // block until all workers have reached this line of code.
> def sync(): Unit
> {code}
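> As a rough illustration of the intended call pattern (not part of the interface itself; `computeDelta` and `localData` are hypothetical, and a parameter `w` is assumed to be loaded already), one BSP iteration inside a worker looks roughly like:
> {code}
> // sketch only: one BSP iteration inside a worker
> val w = client.get[Array[Double]]("w")       // pull current parameters
> val delta = computeDelta(w, localData)       // local computation (hypothetical helper)
> client.update[Array[Double]]("w", delta,
>   (d1, d2) => d1.zip(d2).map { case (a, b) => a + b })  // deltas for the same key are reduced
> client.clock()                               // mark the end of this iteration
> client.sync()                                // optionally wait for all workers
> {code}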
> *`PSContext` provides the following functions for use on the driver:*
> {code}
> // load parameters from existing rdd.
> def loadPSModel[T](model: RDD[(String, T)]) 
> // fetch parameters from parameter server to construct model.
> def fetchPSModel[T](keys: Array[String]): Array[T]
> {code} 
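> As a minimal driver-side sketch (assuming an existing `SparkContext` named `sc`), loading and later fetching a single weight vector looks like:
> {code}
> val pssc = new PSContext(sc)
> // push an initial 10-dimensional weight vector into the parameter server
> pssc.loadPSModel(sc.parallelize(Array(("w", new Array[Double](10))), 1))
> // ... run `runWithPS` jobs that update "w" ...
> // pull the final value back to the driver
> val w = pssc.fetchPSModel[Array[Double]](Array("w"))(0)
> {code}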
>     
> *A new function has been added to `RDD` to run parameter server tasks:*
> {code}
> // run the provided `func` on each partition of this RDD. 
> // The function can use the data of this partition (the first argument) 
> // and a parameter server client (the second argument). 
> // See the Logistic Regression example below.
> def runWithPS[U: ClassTag](func: (Array[T], PSClient) => U): Array[U]
>    
> {code}
> h2. Example
> Here is an example of using our prototype to implement logistic regression:
> {code:title=LogisticRegression.scala|borderStyle=solid}
> def train(
>     sc: SparkContext,
>     input: RDD[LabeledPoint],
>     numIterations: Int,
>     stepSize: Double,
>     miniBatchFraction: Double): LogisticRegressionModel = {
>     
>     // initialize weights
>     val numFeatures = input.map(_.features.size).first()
>     val initialWeights = new Array[Double](numFeatures)
>     // initialize parameter server context
>     val pssc = new PSContext(sc)
>     // load initialized weights into parameter server
>     val initialModelRDD = sc.parallelize(Array(("w", initialWeights)), 1)
>     pssc.loadPSModel(initialModelRDD)
>     // run logistic regression algorithm on input data   
>     input.runWithPS((arr, client) => {
>       val sampler = new BernoulliSampler[LabeledPoint](miniBatchFraction)
>       
>       // for each iteration, compute delta and update weights
>       for (i <- 0 until numIterations) {
>         // get weights from parameter server
>         val weights = Vectors.dense(client.get[Array[Double]]("w"))
>         sampler.setSeed(i + 42)
>         // for each sample point, compute delta and update weights
>         sampler.sample(arr.toIterator).foreach { point =>
>           // compute delta
>           val data = point.features
>           val label = point.label
>           val margin = -1.0 * dot(data, weights)
>           val multiplier = (1.0 / (1.0 + math.exp(margin))) - label
>           val delta = Vectors.dense(new Array[Double](numFeatures))
>           axpy((-1) * stepSize / math.sqrt(i + 1) * multiplier, data, delta)
>           // update weights
>           client.update[Array[Double]]("w", delta.toArray, (d1, d2) => {
>             d1.zip(d2).map { case (a, b) => a + b }
>           })
>         }
>         
>         // end of current iteration
>         client.clock()
>       }
>     })
>     // fetch weights from parameter server
>     val weights = Vectors.dense(pssc.fetchPSModel[Array[Double]](Array("w"))(0))
>     val intercept = 0.0
>     // construct LogisticRegressionModel
>     new LogisticRegressionModel(weights, intercept).clearThreshold()
> }
> {code}
> The above code runs on the current PS-on-Spark implementation.
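> For example, a driver program could invoke it roughly as follows (a sketch assuming `sc` and an `RDD[LabeledPoint]` named `trainingData` already exist):
> {code}
> val model = train(sc, trainingData, numIterations = 100, stepSize = 1.0, miniBatchFraction = 0.1)
> {code}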
> h2. Other considerations
> The current implementation is just a prototype and we will try to improve it in the following directions: 
> h3. Consistency protocol
> Currently we have only implemented the BSP protocol; SSP consistency will be added soon.
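> To sketch the difference (hypothetical names, not actual code): under SSP with a staleness bound `s`, a worker at clock `c` only blocks when the slowest worker is more than `s` iterations behind, which degenerates to BSP when `s = 0`:
> {code}
> // sketch of the bounded-staleness check an SSP server would perform
> def mayProceed(workerClock: Int, minClockAcrossWorkers: Int, staleness: Int): Boolean =
>   workerClock - minClockAcrossWorkers <= staleness
> {code}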
> h3. Model partition across servers
> Currently all the parameters are stored on a single server. Parameters should be partitioned across multiple servers when the parameter size gets large, and the parameter server client should route each request to the appropriate server. 
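> A simple routing scheme (a sketch, not what is implemented yet) is to hash each key to a server:
> {code}
> // sketch: map a parameter key to one of `numServers` servers
> def serverFor(key: String, numServers: Int): Int =
>   (key.hashCode % numServers + numServers) % numServers  // non-negative modulo
> {code}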
> h3. Performance optimization
> To get better performance, the client can cache parameters locally and record updates in an operation log (as petuum does). There may be other ways to improve performance.
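> For instance (a sketch of the idea, with hypothetical helper names), the client could buffer deltas in an operation log and flush them once per clock:
> {code}
> import scala.collection.mutable
> // sketch: operation log of pending deltas, flushed at the end of each iteration
> val opLog = mutable.ArrayBuffer.empty[(String, Array[Double])]
> def logUpdate(key: String, delta: Array[Double]): Unit = opLog += ((key, delta))
> def flush(client: PSClient): Unit = {
>   opLog.foreach { case (k, d) =>
>     client.update[Array[Double]](k, d, (a, b) => a.zip(b).map { case (x, y) => x + y })
>   }
>   opLog.clear()
> }
> {code}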
> h3. Fault Recovery
> When a parameter server crashes, it should be restarted on another node. The data of a parameter server should be checkpointed periodically so that it can be transferred when a server is restarted. When a task is restarted, it should not rerun finished iterations. 
> We would like to see a parameter server integrated into Spark soon and hope this helps other Spark users who need one. As noted above, there is still much work to be done, so any comments are welcome.


