Posted to reviews@spark.apache.org by suyanNone <gi...@git.apache.org> on 2014/12/03 12:22:03 UTC

[GitHub] spark pull request: [SPARK-4721][CORE] Improve logic while first t...

GitHub user suyanNone opened a pull request:

    https://github.com/apache/spark/pull/3582

    [SPARK-4721][CORE] Improve logic while first thread put block failed

    1. Make threads that are waiting on an existing block info attempt the put one at a time when the first thread that created that block fails.
    2. Use a ReentrantLock instead of this.synchronized {} in BlockInfo.
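Point 2 could look roughly like the minimal sketch below. LockBasedBlockInfo, markReady and markFailure are hypothetical names, and the pending/failed sentinels are assumptions modeled on the BLOCK_PENDING / BLOCK_FAILED constants in BlockInfo. One practical reason to prefer ReentrantLock here is that a single lock can own several Condition objects, so different kinds of waiters can be woken separately, which point 1 relies on.

```scala
import java.util.concurrent.locks.ReentrantLock

// Hypothetical sketch (not Spark's BlockInfo): the "wait until the block write
// finishes" protocol using an explicit ReentrantLock and Condition instead of
// this.synchronized / wait() / notifyAll().
class LockBasedBlockInfo {
  private val Pending = -1L // assumed sentinel, mirrors BLOCK_PENDING
  private val Failed = -2L  // assumed sentinel, mirrors BLOCK_FAILED

  private val lock = new ReentrantLock()
  private val ready = lock.newCondition() // signalled when the write finishes
  private var size: Long = Pending        // guarded by `lock`

  /** Blocks until the write finishes; true on success, false on failure. */
  def waitForReady(): Boolean = {
    lock.lock()
    try {
      while (size == Pending) ready.await() // loop guards against spurious wakeups
      size != Failed
    } finally {
      lock.unlock()
    }
  }

  def markReady(sizeInBytes: Long): Unit = complete(sizeInBytes)
  def markFailure(): Unit = complete(Failed)

  private def complete(newSize: Long): Unit = {
    lock.lock()
    try {
      size = newSize
      ready.signalAll() // wake every waiter, like notifyAll()
    } finally {
      lock.unlock()
    }
  }
}
```

With a single Condition this behaves like the monitor version; the benefit only appears once a second Condition is added for a different class of waiter.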

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/suyanNone/spark refine-block-put

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/3582.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3582
    
----
commit 740e479da941d9f213e3d65825e60d73ff70d8aa
Author: hushan[胡珊] <hu...@xiaomi.com>
Date:   2014-12-03T11:18:23Z

    Make wait thread try to put one by one if first thread failed

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-4721][CORE] Improve logic while first t...

Posted by suyanNone <gi...@git.apache.org>.
Github user suyanNone commented on the pull request:

    https://github.com/apache/spark/pull/3582#issuecomment-65891778
  
    Sorry for my poor comments and English.
    
    In summary:
    1. Put threads attempt the put one at a time until one of them succeeds.
    2. Multiple doGetLocal threads and the single dropFromMemory thread each wait only once, whether the put succeeds or fails. If the put fails, doGetLocal returns None, and dropFromMemory also returns None.
    
    There are three places that call info.waitForReady():
    1. doGetLocal
    2. dropFromMemory
    3. doPut
    
    Now suppose many threads try to put the same block.
    For 1 (doGetLocal), I think the thread should wait only once (Wait1Condition, now renamed OtherCondition), whether the put succeeds or fails.
    For 2 (dropFromMemory), in practice we should never call dropFromMemory on a block that is not ready yet. However, the current code does call info.waitForReady() in dropFromMemory, so for compatibility it also waits only once (Wait1Condition) for the put to succeed or fail. I also think that if we find a thread running dropFromMemory, we should cancel all pending put threads.
    For 3 (doPut), the put threads run one at a time until one succeeds, or until a thread wants to drop the block from memory as described in 2. Because the put may fail many times, these threads wait on WaitNCondition (now renamed PutCondition).
    
    All I want WaitType (now renamed BlockWaitCondition) to do is reuse the convenience of an enum to dispatch method calls, plus a variable that records how many threads are waiting for that block's put to finish. Each block object has its own wait count, which is why WaitType is a class that extends Enumeration (one instance per block).
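The behavior described in points 1 to 3 can be sketched as follows. This is a hypothetical illustration, not the PR's BlockWaitCondition code: SerializedPutBlockInfo, acquireWrite, finishWrite and waitOnceForOutcome are invented names. Writers (doPut) queue on one Condition and, after a failed attempt, exactly one of them is released; readers (doGetLocal / dropFromMemory) wait on a second Condition and return after the first outcome they observe.

```scala
import java.util.concurrent.locks.ReentrantLock

// Hypothetical sketch of the scheme described above (not the PR's actual code):
// one lock, two conditions. Writers queue on putCondition and are released one
// at a time after a failed attempt; readers wait on otherCondition for a single
// outcome only.
class SerializedPutBlockInfo {
  private val lock = new ReentrantLock()
  private val putCondition = lock.newCondition()   // cf. "PutCondition"
  private val otherCondition = lock.newCondition() // cf. "OtherCondition"
  private var writerActive = false                 // all state guarded by `lock`
  private var succeeded = false
  private var finishedAttempts = 0

  /** Called before a put. Returns true if the caller should attempt the write,
   *  or false if another writer has already stored the block. */
  def acquireWrite(): Boolean = {
    lock.lock()
    try {
      while (writerActive) putCondition.await()
      if (succeeded) false
      else {
        writerActive = true
        true
      }
    } finally lock.unlock()
  }

  /** The active writer reports its outcome. */
  def finishWrite(success: Boolean): Unit = {
    lock.lock()
    try {
      writerActive = false
      finishedAttempts += 1
      if (success) {
        succeeded = true
        putCondition.signalAll() // no other writer needs to try
      } else {
        putCondition.signal()    // hand the next attempt to exactly one writer
      }
      otherCondition.signalAll() // every reader sees this one outcome
    } finally lock.unlock()
  }

  /** Readers wait for at most one in-flight attempt; true if the block is stored. */
  def waitOnceForOutcome(): Boolean = {
    lock.lock()
    try {
      val seen = finishedAttempts
      while (writerActive && finishedAttempts == seen) otherCondition.await()
      succeeded
    } finally lock.unlock()
  }
}
```

A writer whose put throws would need to call finishWrite(success = false) from a finally block; otherwise the queued writers would never be released.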


[GitHub] spark pull request: [SPARK-4721][CORE] Improve logic while first t...

Posted by suyanNone <gi...@git.apache.org>.
Github user suyanNone closed the pull request at:

    https://github.com/apache/spark/pull/3582


[GitHub] spark pull request: [SPARK-4721][CORE] Improve logic while first t...

Posted by suyanNone <gi...@git.apache.org>.
Github user suyanNone commented on the pull request:

    https://github.com/apache/spark/pull/3582#issuecomment-66031430
  
    @JoshRosen 
    
    My guess is that it can happen in these cases:
    1. Two threads in the same executor.
    1.1 The executor has 4 cores, and each task uses 1 CPU.
                                 RDDC <-- other:  TaskSet1
    RDDA.cache <--
                                 RDDB <-- other:  TaskSet2
           Thread A is assigned a task from TaskSet1.
           Thread B is assigned a task from TaskSet2, because A cannot get any more tasks, either because of locality or because all of A's tasks are already scheduled.
           It so happens that threads A and B both process the same partition at the same time.
    
    2. Two threads in different executors.
    2.1 blockA is replicated to ExecutorB while ExecutorB itself is about to cache blockA after calling cacheManager.putInBlockManager.
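Case 1 can be modeled with a small, hypothetical program: whichever thread registers the block's info first becomes its writer, and every other thread gets the existing entry back and has to wait on it. ConcurrentHashMap.putIfAbsent stands in for the block-info map here, SameBlockRace and PendingBlock are invented names, and the id "rdd_0_0" is purely illustrative.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical model (not Spark code) of the case above: several task threads
// cache the same partition at the same moment and race to register its block.
object SameBlockRace {
  final class PendingBlock // stand-in for a per-block info object in the pending state

  /** Starts `threads` threads together; returns how many of them won the registration. */
  def countWriters(threads: Int): Int = {
    val blockInfo = new ConcurrentHashMap[String, PendingBlock]()
    val startGate = new CountDownLatch(1)
    val writers = new AtomicInteger(0)
    val workers = (1 to threads).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = {
          startGate.await() // release all threads at once to maximize contention
          // putIfAbsent returns null only to the thread whose insert won.
          val existing = blockInfo.putIfAbsent("rdd_0_0", new PendingBlock)
          if (existing == null) writers.incrementAndGet()
          // Every other thread gets `existing` back and would have to wait on it,
          // which is where a failure of the winning writer becomes visible.
        }
      })
    }
    workers.foreach(_.start())
    startGate.countDown()
    workers.foreach(_.join())
    writers.get
  }
}
```

Exactly one thread wins per block id no matter how many race, so the open question is only what the losing threads should do when that single writer fails.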



[GitHub] spark pull request: [SPARK-4721][CORE] Improve logic while first t...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3582#discussion_r21352570
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockInfo.scala ---
    @@ -81,3 +101,45 @@ private object BlockInfo {
       private val BLOCK_PENDING: Long = -1L
       private val BLOCK_FAILED: Long = -2L
     }
    +
    +class WaitType(conditions: mutable.HashMap[String, Condition]) extends Enumeration {
    --- End diff --
    
    I'm having a hard time understanding this class.  Can you please add comments to explain it?


[GitHub] spark pull request: [SPARK-4721][CORE] Improve logic while first t...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/3582#issuecomment-65911726
  
    To clarify a bit further: I think that `BlockInfo.waitForReady()` is designed to allow callers to block until a block write has completed. If we have many threads (readers) waiting for a block to be written, then I think we should be okay because `notifyAll()` will wake all of those threads when the block becomes ready.
    
    From your description, it sounds like you're worried about a multiple writer-threads case, where we have many threads attempting to write the same block and a failed write attempt from _one_ of them wakes up the waiting threads and notifies them of a failure even though there's another write in progress which might succeed.  Is your goal to wait until _all_ of the pending writes have failed before notifying a reader that the write has failed and to wait for _one_ of them to succeed before notifying the reader that the write succeeded?
    
    I'll have to dig into the BlockManager internals to see whether we can ever have multiple in-progress writes for the same block.  Do you have an example of when this can happen?
    
    I'd be happy to look over the code and provide more feedback / suggestions, but I want to make sure that I understand the motivation and confirm that this is fixing an actual bug, since it seems like this adds a moderate amount of complexity.
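To make the behavior described in this comment concrete, here is a minimal sketch of the monitor pattern: readers loop on wait() until the state leaves "pending", and the writer calls notifyAll() on success or failure. MonitorBlockInfo, markReady and markFailure are hypothetical names; only waitForReady() and the sentinel values (taken from the BLOCK_PENDING / BLOCK_FAILED constants quoted in the diff) mirror this thread.

```scala
// Hypothetical sketch of the existing monitor-based pattern (not Spark's code).
class MonitorBlockInfo {
  private val Pending = -1L        // same value as BLOCK_PENDING in the diff
  private val Failed = -2L         // same value as BLOCK_FAILED in the diff
  private var size: Long = Pending // guarded by `this`

  /** Blocks until the write completes; true on success, false on failure. */
  def waitForReady(): Boolean = this.synchronized {
    while (size == Pending) this.wait() // re-check after every wakeup
    size != Failed
  }

  /** Writer succeeded: record the size and wake every waiting reader. */
  def markReady(sizeInBytes: Long): Unit = this.synchronized {
    size = sizeInBytes
    this.notifyAll()
  }

  /** Writer failed: every waiting reader wakes up and sees the failure. */
  def markFailure(): Unit = this.synchronized {
    size = Failed
    this.notifyAll()
  }
}
```

With many readers and one writer this is sufficient, since notifyAll() releases all of them at once; the open question in this thread is only the multiple-writer case.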


[GitHub] spark pull request: [SPARK-4721][CORE] Improve logic while first t...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3582#issuecomment-65393427
  
    Can one of the admins verify this patch?


[GitHub] spark pull request: [SPARK-4721][CORE] Improve logic while first t...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/3582#issuecomment-65740233
  
    Hi @suyanNone,
    
    I took a brief look through this PR, but I'm having a hard time understanding how this code works. I could spend some time looking more closely, but it would be very helpful if you could add some comments to explain what purpose the `WaitType` class serves and what its methods do. For tricky synchronization / thread-safety logic like this, it's better to err on the side of more comments.
    
    Also, something as complex as `WaitType` should have its own tests.


[GitHub] spark pull request: [SPARK-4721][CORE] Improve logic while first t...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3582#discussion_r21352462
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockInfo.scala ---
    @@ -26,6 +29,17 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea
       private def failed: Boolean = size == BlockInfo.BLOCK_FAILED
       private def initThread: Thread = BlockInfo.blockInfoInitThreads.get(this)
     
    +  val lock = new ReentrantLock()
    +  private val waitNCondition = lock.newCondition()
    +  private val wait1Condition = lock.newCondition()
    +
    +  val waitTypes = new WaitType(new mutable.HashMap +=
    +                            ("PUT" -> waitNCondition,
    +                            "DROP" -> wait1Condition,
    +                            "GET" -> wait1Condition))
    +
    +  var removed = false
    --- End diff --
    
    This should probably be placed next to the previous group of variables (the one that holds `failed`, `pending`, etc).  It should also be private.


[GitHub] spark pull request: [SPARK-4721][CORE] Improve logic while first t...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3582#discussion_r21352490
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockInfo.scala ---
    @@ -26,6 +29,17 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea
       private def failed: Boolean = size == BlockInfo.BLOCK_FAILED
       private def initThread: Thread = BlockInfo.blockInfoInitThreads.get(this)
     
    +  val lock = new ReentrantLock()
    +  private val waitNCondition = lock.newCondition()
    +  private val wait1Condition = lock.newCondition()
    +
    +  val waitTypes = new WaitType(new mutable.HashMap +=
    --- End diff --
    
    I think you can just do `Map("PUT" -> waitNCondition, "DROP" -> wait1Condition, "GET" -> wait1Condition)` instead of using a mutable map.
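Assuming the surrounding fields stay as in the quoted diff, the suggestion would look roughly like this. ConditionTable is a hypothetical wrapper introduced only to make the snippet self-contained; the map keys and condition names come from the diff.

```scala
import java.util.concurrent.locks.{Condition, ReentrantLock}

// Hypothetical wrapper: the wait-type table built as an immutable Map instead
// of appending entries to a mutable.HashMap.
class ConditionTable {
  private val lock = new ReentrantLock()
  private val waitNCondition: Condition = lock.newCondition() // put threads
  private val wait1Condition: Condition = lock.newCondition() // drop/get threads

  // Fully initialized at construction and never modified, so it is safe to share.
  val conditions: Map[String, Condition] =
    Map("PUT" -> waitNCondition, "DROP" -> wait1Condition, "GET" -> wait1Condition)
}
```

Besides being shorter, an immutable Map rules out accidental later mutation of a table that several threads read.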


[GitHub] spark pull request: [SPARK-4721][CORE] Improve logic while first t...

Posted by suyanNone <gi...@git.apache.org>.
Github user suyanNone commented on the pull request:

    https://github.com/apache/spark/pull/3582#issuecomment-65970969
  
    @JoshRosen 
    Okay, I will check whether there can be multiple writer threads. I wrote this code because, when reading the current code, I thought multiple threads could put the same block, so I started from that assumption to improve it. I will comment on that later.
    
    As for my code, a reader thread waits only once, whether the put succeeds or fails.
    



[GitHub] spark pull request: [SPARK-4721][CORE] Improve logic while first t...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3582#discussion_r21352510
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockInfo.scala ---
    @@ -26,6 +29,17 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea
       private def failed: Boolean = size == BlockInfo.BLOCK_FAILED
       private def initThread: Thread = BlockInfo.blockInfoInitThreads.get(this)
     
    +  val lock = new ReentrantLock()
    +  private val waitNCondition = lock.newCondition()
    --- End diff --
    
    These variables could use comments to explain what they're used for.

