Posted to reviews@spark.apache.org by viirya <gi...@git.apache.org> on 2014/08/30 21:08:07 UTC

[GitHub] spark pull request: [SPARK-3327] Make broadcasted value mutable fo...

GitHub user viirya opened a pull request:

    https://github.com/apache/spark/pull/2217

    [SPARK-3327] Make broadcasted value mutable for caching useful information

    This PR makes the broadcasted value mutable so that algorithms that run iteratively can cache useful information and reuse it across iterations.
    
    Some algorithms can be implemented more efficiently if we can update broadcasted variables and use them in later iterations.
    
    Specifically, we would like to perform an operation "A" on each partition of the data, during which some variables are updated. Then we want to run an operation "B" on the same data; "B" uses the variables updated by "A".
    
    One of the examples is the Liblinear on Spark from Dr. Lin. They discuss the problem in Section IV.D of the paper "Large-scale Logistic Regression and Linear Support Vector Machines Using Spark."
    
    This implementation ensures that updated broadcasted variables stay consistent after parallel data operations (such as map) are performed on them on the same node.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 mutable_broadcast

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2217.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2217
    
----
commit 274b7daccb9986e1de68a6669a8893d3a3818cd0
Author: Liang-Chi Hsieh <vi...@gmail.com>
Date:   2014-08-30T18:37:41Z

    Make broadcasted value mutable for caching useful information when implementing some algorithms.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3327] Make broadcasted value mutable fo...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2217#issuecomment-54344150
  
    Thanks for the PR. Do you mind closing it?
    
    In any case, this changes a fundamental assumption in broadcast variables, and can bring potentially unsafe consequences and break existing code somewhere else in Spark or in other frameworks built on top of Spark. I believe you should be able to find workarounds even without changing the mutability of broadcast variables. It is best not to do this right now.
    
    If you think we need a new primitive for doing this, you are welcome to write up a design doc that discusses the benefits and the long term maintainability of the primitive.




[GitHub] spark pull request: [SPARK-3327] Make broadcasted value mutable fo...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the pull request:

    https://github.com/apache/spark/pull/2217#issuecomment-54279202
  
    @rxin, I understand that immutability makes the whole thing safer for broadcasted variables. So I am just wondering whether it is worth providing such mutability for the use case I mentioned.
    
    If the re-broadcasted variables are large, such as a very long vector or a huge matrix, the communication cost would be high for large-scale data.





[GitHub] spark pull request: [SPARK-3327] Make broadcasted value mutable fo...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2217#issuecomment-53967605
  
    Can one of the admins verify this patch?




[GitHub] spark pull request: [SPARK-3327] Make broadcasted value mutable fo...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on the pull request:

    https://github.com/apache/spark/pull/2217#issuecomment-54381921
  
    This discussion probably belongs in the dev/user list, but a simple workaround is to create an RDD of your 'mutable' state in the first iteration, cache it while computing the partition and call zipPartitions on this RDD with the data for the second iteration. Not sure if this completely covers your use case, but I've seen this pattern work for a bunch of ML algorithms implemented in the AMPLab.
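
The workaround described in this comment can be illustrated without Spark. The following is a plain-Python sketch, not Spark code: partitions are modeled as lists, and the hypothetical helper `zip_partitions` only mimics the shape of the RDD `zipPartitions` operation by pairing the i-th data partition with the i-th state partition. The function names and the toy statistic (a per-partition mean) are assumptions made for illustration.

```python
from typing import Callable, List, TypeVar

A = TypeVar("A")
B = TypeVar("B")
C = TypeVar("C")


def zip_partitions(
    left: List[List[A]],
    right: List[List[B]],
    f: Callable[[List[A], List[B]], List[C]],
) -> List[List[C]]:
    """Apply f to co-indexed partitions (hypothetical stand-in for zipPartitions)."""
    if len(left) != len(right):
        raise ValueError("both sides must have the same number of partitions")
    return [f(l, r) for l, r in zip(left, right)]


def first_iteration(partition: List[float]) -> List[float]:
    """Operation "A": derive per-partition state (here, just the partition mean)."""
    return [sum(partition) / len(partition)]


def second_iteration(partition: List[float], state: List[float]) -> List[float]:
    """Operation "B": reuse the state computed by "A" for the same partition."""
    mean = state[0]
    return [x - mean for x in partition]


data = [[1.0, 2.0, 3.0], [10.0, 20.0]]

# Iteration 1: the state is its own partitioned collection, one entry per data partition.
state = [first_iteration(p) for p in data]

# Iteration 2: each data partition is combined with its own state partition.
centered = zip_partitions(data, state, second_iteration)
```

In this model the "mutable" state is never modified in place: each iteration produces a new state collection that is partitioned the same way as the data.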




[GitHub] spark pull request: [SPARK-3327] Make broadcasted value mutable fo...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the pull request:

    https://github.com/apache/spark/pull/2217#issuecomment-54396114
  
    Thanks for your comments. I agree with Reynold that this is a rough idea, and that it needs much more consideration and a clear design doc if I think we really need that primitive. I will close this PR.
    
    I know that there are many possible workarounds for this purpose, such as @shivaram's suggestion, a global static variable, etc. But these workarounds still need to collect data from the executors and re-send (re-broadcast, etc.) it to the executors in the next iteration. That is the main obstacle I want to overcome in order to save the communication cost, especially when the total amount of data to collect and re-send is huge.
    





[GitHub] spark pull request: [SPARK-3327] Make broadcasted value mutable fo...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2217#issuecomment-54261213
  
    Why do you need to change the mutability of broadcast variable in order to enable that? In general, immutability makes the whole thing safer.
    
    You should still be able to accomplish what you are trying to do by simply checking whether the value has changed; and if it hasn't yet, just reuse the same broadcast variable object.
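
One possible reading of this suggestion is sketched below in plain Python rather than Spark code. `BroadcastHandle`, `BroadcastCache` and `fake_broadcast` are hypothetical names introduced only for this illustration; in particular `fake_broadcast` stands in for whatever call actually ships the value to the executors. The sketch keeps the handle immutable and creates a new one only when the value differs from the one already shipped.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


@dataclass(frozen=True)
class BroadcastHandle:
    """Immutable stand-in for a broadcast variable (hypothetical, not Spark's Broadcast)."""
    broadcast_id: int
    value: Any


class BroadcastCache:
    """Hand out the existing handle unless the value has changed."""

    def __init__(self, broadcast_fn: Callable[[Any], BroadcastHandle]) -> None:
        self._broadcast_fn = broadcast_fn
        self._current: Optional[BroadcastHandle] = None

    def get(self, value: Any) -> BroadcastHandle:
        # Pay the broadcast cost only for the first value or for a changed value.
        if self._current is None or self._current.value != value:
            self._current = self._broadcast_fn(value)
        return self._current


calls: List[Any] = []  # records every simulated broadcast


def fake_broadcast(value: Any) -> BroadcastHandle:
    calls.append(value)
    return BroadcastHandle(broadcast_id=len(calls), value=value)


cache = BroadcastCache(fake_broadcast)
h1 = cache.get((1.0, 2.0))
h2 = cache.get((1.0, 2.0))  # unchanged value: the same handle is reused
h3 = cache.get((1.0, 3.0))  # changed value: a new handle is created
```

The equality check itself costs time proportional to the size of the value, so whether this pays off depends on how often the value actually changes between iterations.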





[GitHub] spark pull request: [SPARK-3327] Make broadcasted value mutable fo...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the pull request:

    https://github.com/apache/spark/pull/2217#issuecomment-54295028
  
    @srowen Thanks for the comment. In fact, I want some persistent mutable state per data partition; I just achieved that goal with mutable broadcasted variables. I know that a broadcasted variable is shared across many tasks, so I use a Map to keep separate values for different data partitions, which I can tell apart by the partition index given by mapPartitionsWithIndex.
    
    It might look at odds with Spark's design, then. However, if a mutable Broadcast variable is a bad idea, can we have a mechanism for persistent mutable state per data partition? Or is that a bad idea too for Spark's design?
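
The Map-per-partition idea in this comment can be modeled in a single process. This is a minimal plain-Python sketch, not the PR's implementation: `partition_state`, `map_partitions_with_index`, `compute_p` and `learn` are hypothetical names, and the "parameter P" is a toy value (the partition maximum).

```python
from typing import Callable, Dict, Iterator, List

# Process-local mutable state keyed by partition index (hypothetical name).
partition_state: Dict[int, float] = {}


def map_partitions_with_index(
    partitions: List[List[float]],
    f: Callable[[int, Iterator[float]], List[float]],
) -> List[List[float]]:
    """Single-process stand-in that passes each partition and its index to f."""
    return [f(i, iter(p)) for i, p in enumerate(partitions)]


def compute_p(index: int, rows: Iterator[float]) -> List[float]:
    """First stage: compute a toy parameter P and store it under the partition index."""
    values = list(rows)
    partition_state[index] = max(values)
    return values


def learn(index: int, rows: Iterator[float]) -> List[float]:
    """Later stage: look up the P stored for this partition and use it."""
    p = partition_state[index]
    return [x / p for x in rows]


data = [[1.0, 2.0, 4.0], [5.0, 10.0]]
map_partitions_with_index(data, compute_p)
scaled = map_partitions_with_index(data, learn)
```

The lookup in `learn` succeeds here only because both stages run in the same process. On a cluster, the state would exist only in the JVM that ran the first stage, which is the scheduling and fault-tolerance cost raised elsewhere in this thread.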





[GitHub] spark pull request: [SPARK-3327] Make broadcasted value mutable fo...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/2217#issuecomment-54281202
  
    @viirya I also don't quite get why mutability helps the use case you describe. You seem to want some persistent mutable state per executor. But that isn't the purpose of a Broadcast variable. I think it sort of looks related since a Broadcast variable may be shared across many tasks within one JVM, but that works exactly because it is immutable and it doesn't matter which copy of the immutable data you look at.
    
    Persistent state would have its own costs. You can no longer freely schedule tasks on any available executor. You have to deal with loss of the executor and that state. You could always implement that yourself with an external storage system if it made sense. It seems at odds with Spark's design though.




[GitHub] spark pull request: [SPARK-3327] Make broadcasted value mutable fo...

Posted by viirya <gi...@git.apache.org>.
Github user viirya closed the pull request at:

    https://github.com/apache/spark/pull/2217




[GitHub] spark pull request: [SPARK-3327] Make broadcasted value mutable fo...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the pull request:

    https://github.com/apache/spark/pull/2217#issuecomment-54278826
  
    @rxin. I need a way to modify broadcasted variables locally and keep those variables for later use. The locally modified variables are used to store some values calculated at an earlier stage of a machine learning algorithm. Those values would be used at later stages.
    
    In particular, the algorithm calculates a different parameter P for each data partition using mapPartitionsWithIndex at its first stage. In a later stage, the algorithm uses parameter P to perform learning.
    
    With the current broadcasted variables, I need to collect the values calculated in the earlier stage and re-broadcast them to later stages.
    
    Since current broadcasted variables are immutable, the earlier stage cannot modify these variables locally for different partitions of data. So I am wondering if we can provide a mechanism to allow tasks to have locally mutable values for different partitions. That is why I modified the broadcast interface to provide such a function. However, maybe it should be separated from the broadcast module.





[GitHub] spark pull request: [SPARK-3327] Make broadcasted value mutable fo...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2217#issuecomment-54342643
  
    Why don't you just use a global static variable?




[GitHub] spark pull request: [SPARK-3327] Make broadcasted value mutable fo...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/2217#issuecomment-53977801
  
    Hi there,
    
    The immutability of broadcast variables might be assumed in other places in the code base. Since this approach requires re-broadcasting the entire contents anyways... why not just create a new broadcast variable? Old ones get cleaned up now when they go out of scope anyways.
    
    Also, currently this approach will destroy a broadcast variable even if it is still in scope. For instance, what if the broadcast variable is referenced on an executor between when `doDestroy` is called and when `putSingle` succeeds? This could lead to an exception on the executor.
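
The window described in this comment can be reproduced deterministically with a toy model. The sketch below is plain Python and does not use Spark's `BlockManager`: `BlockStore`, its methods, and `update_in_two_steps` are hypothetical names, with `remove` and `put` only loosely playing the roles of the destroy and put steps mentioned above. The reader is invoked by hand between the two steps to stand for a task that happens to run at that moment.

```python
import threading
from typing import Any, Callable, Dict, List


class BlockStore:
    """Toy stand-in for an executor-local block store (not Spark's BlockManager)."""

    def __init__(self) -> None:
        self._blocks: Dict[str, Any] = {}
        self._lock = threading.Lock()

    def put(self, block_id: str, value: Any) -> None:
        # Overwrites any existing value in a single locked step.
        with self._lock:
            self._blocks[block_id] = value

    def remove(self, block_id: str) -> None:
        with self._lock:
            self._blocks.pop(block_id, None)

    def get(self, block_id: str) -> Any:
        with self._lock:
            return self._blocks[block_id]  # raises KeyError if the block is missing


def update_in_two_steps(
    store: BlockStore, block_id: str, value: Any, between: Callable[[], None]
) -> None:
    """Remove, then put: each step is locked, but the pair is not atomic."""
    store.remove(block_id)
    between()  # a task reading the block at this point finds nothing
    store.put(block_id, value)


store = BlockStore()
store.put("broadcast_0", [1.0, 2.0])
errors: List[KeyError] = []


def reader() -> None:
    try:
        store.get("broadcast_0")
    except KeyError as exc:
        errors.append(exc)


update_in_two_steps(store, "broadcast_0", [3.0, 4.0], reader)

# A single overwrite leaves no moment at which the block is absent.
store.put("broadcast_0", [5.0, 6.0])
```

In this model, locking each step separately does not close the window; the removal and the replacement have to happen under the same lock, or be a single overwrite, for readers never to observe a missing block.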




[GitHub] spark pull request: [SPARK-3327] Make broadcasted value mutable fo...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2217#issuecomment-54369065
  
    I agree with Reynold; mutability seems like it would add a lot of potential for confusion in exchange for not much benefit.  Even if we did decide that mutability was a good idea, I think we would need design docs to very precisely specify the expected behavior and communicate our guarantees / semantics to end-users.  




[GitHub] spark pull request: [SPARK-3327] Make broadcasted value mutable fo...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the pull request:

    https://github.com/apache/spark/pull/2217#issuecomment-53983398
  
    Thanks for your comments. As for the possible exception on an executor, it can happen only when `synchronized` is not there. As `setValue` is wrapped in `TorrentBroadcast.synchronized`, I think that the exception caused by the conflict between `doDestroy` and `putSingle` is solved.
    
    I would like to know if the immutability of broadcasted variables is necessary. Re-broadcasting would not happen, because the new value is immediately filled in to replace the old value in a broadcasted variable, so the broadcasted variable will not request the corresponding block from the `BlockManager` of other remote executors or of the driver.
    
    In fact, this PR is intended to reduce re-broadcasting when running some algorithms. For some algorithms, we may calculate a parameter in the first step with `mapPartitions` on each partition of data. Then in later steps, the parameter can be reused without re-calculation if we can keep it. If we collect the calculated values back from all tasks and broadcast them again in later steps, there is a redundant communication cost involved.
    
    So I would like to make broadcasted variables mutable on each executor. Then, in later steps of such algorithms, we can reuse those calculated values.
    
    Suggestions are welcome. Maybe we can find a better solution for that.
    
    I just found that calling `doDestroy` would remove remote blocks and that is not what I want. So I added another commit to remove blocks locally instead.


