Posted to reviews@spark.apache.org by sarutak <gi...@git.apache.org> on 2014/07/29 08:35:50 UTC

[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

GitHub user sarutak opened a pull request:

    https://github.com/apache/spark/pull/1632

    [SPARK-2677] BasicBlockFetchIterator#next can wait forever

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sarutak/spark SPARK-2677

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1632.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1632
    
----
commit f9a25db2f6881123188bf522d7ae0d5ceff73264
Author: Kousuke Saruta <sa...@oss.nttdata.co.jp>
Date:   2014-07-28T23:30:56Z

    temporary commit

commit f1dcd5a58476c213b135e6f17c079e5e7209157b
Author: sarutak <sarutak@hd106.(none)>
Date:   2014-07-29T06:31:46Z

    Modified to enable Remote Fetch Timeout

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r16095403
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -72,6 +73,7 @@ private[spark] class ConnectionManager(
     
       // default to 30 second timeout waiting for authentication
       private val authTimeout = conf.getInt("spark.core.connection.auth.wait.timeout", 30)
    +  private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 30)
    --- End diff --
    
    60 is better



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by sarutak <gi...@git.apache.org>.
Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r16323483
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -652,19 +654,21 @@ private[spark] class ConnectionManager(
               }
             }
             if (bufferMessage.hasAckId()) {
    -          val sentMessageStatus = messageStatuses.synchronized {
    +          messageStatuses.synchronized {
                 messageStatuses.get(bufferMessage.ackId) match {
                   case Some(status) => {
                     messageStatuses -= bufferMessage.ackId
    -                status
    +                status.markDone(Some(message))
                   }
                   case None => {
    +                if (isAckTimeout) {
    +                  logWarning(s"Ack message ${message.id} maybe received after timeout")
    +                }
    --- End diff --
    
    The original code (before ack timeouts were considered) threw an Exception when it received a message with no matching reference. So I decided to keep throwing the Exception even when an ack timeout has occurred, because we can't tell whether a non-referenced message was caused by an ack timeout.
    
    On second thought, do we fundamentally need to throw an exception here at all?
    When we receive a non-referenced message, shouldn't we simply log a warning?




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by sarutak <gi...@git.apache.org>.
Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r16323528
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -652,19 +654,21 @@ private[spark] class ConnectionManager(
               }
             }
             if (bufferMessage.hasAckId()) {
    -          val sentMessageStatus = messageStatuses.synchronized {
    +          messageStatuses.synchronized {
                 messageStatuses.get(bufferMessage.ackId) match {
                   case Some(status) => {
                     messageStatuses -= bufferMessage.ackId
    -                status
    +                status.markDone(Some(message))
                   }
                   case None => {
    +                if (isAckTimeout) {
    +                  logWarning(s"Ack message ${message.id} maybe received after timeout")
    +                }
    --- End diff --
    
    O.K. I'll remove it.




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-52386455
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18669/consoleFull) for   PR 1632 at commit [`cddbc7b`](https://github.com/apache/spark/commit/cddbc7b534e68e6d4b50a1c327de5c85ac977844).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r16094928
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -836,9 +845,14 @@ private[spark] class ConnectionManager(
       def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
           : Future[Message] = {
         val promise = Promise[Message]()
    +
    +    val ackTimeoutMonitor =  new Timer(s"Ack Timeout Monitor-" +
    +      "${connectionManagerId}-MessageId(${message.id})", true)
    +
         val status = new MessageStatus(message, connectionManagerId, s => {
    +      ackTimeoutMonitor.cancel()
    --- End diff --
    
    On closer inspection, I guess the race only sets `isAckTimeout` because the `foreach` in `messageStatuses.remove(message.id).foreach` will guard against the race, so we won't end up failing the promise after calling success on it from the other thread.
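
The guard described in this comment can be illustrated with a small sketch. It is not the ConnectionManager code: `AckRaceSketch`, `pending`, `register`, `onAck`, and `onTimeout` are hypothetical names, and a plain `Promise[String]` stands in for the message status. Assuming both the ACK path and the timeout path take the same lock and call `remove(id)` first, only the path that actually removes the entry gets to complete the promise.

```scala
import scala.collection.mutable
import scala.concurrent.Promise

object AckRaceSketch {
  // Hypothetical stand-in for the messageStatuses map:
  // message id -> promise the sender is waiting on.
  private val pending = mutable.HashMap[Int, Promise[String]]()

  def register(id: Int): Promise[String] = pending.synchronized {
    val p = Promise[String]()
    pending(id) = p
    p
  }

  // ACK path: completes the promise only if the entry is still present.
  // Returns true if this call was the one that completed it.
  def onAck(id: Int, ack: String): Boolean = pending.synchronized {
    val removed = pending.remove(id)
    removed.foreach(_.success(ack))
    removed.isDefined
  }

  // Timeout path: fails the promise only if the entry is still present.
  def onTimeout(id: Int): Boolean = pending.synchronized {
    val removed = pending.remove(id)
    removed.foreach(_.failure(new java.io.IOException(s"ack timeout for message $id")))
    removed.isDefined
  }
}
```

Whichever path runs second finds `None`, so its `foreach` body never executes and the promise is never completed twice.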




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-51861681
  
    Jenkins, test this please.




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-52385383
  
    Actually, I retract my earlier LGTM; this needs a bit of user-facing configuration documentation and I think there's a corner-case bug in how we handle late-arriving ACKs.  I'd like for this to make it into the next 1.1.0 preview release, so I may fix these issues myself in a new PR.




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r16095159
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -652,19 +655,25 @@ private[spark] class ConnectionManager(
               }
             }
             if (bufferMessage.hasAckId()) {
    -          val sentMessageStatus = messageStatuses.synchronized {
    +          messageStatuses.synchronized {
                 messageStatuses.get(bufferMessage.ackId) match {
                   case Some(status) => {
                     messageStatuses -= bufferMessage.ackId
    -                status
    +                status.markDone(Some(message))
                   }
                   case None => {
    -                throw new Exception("Could not find reference for received ack message " +
    -                  message.id)
    +                /**
    +                 * If isAckTimeout == true, Future returned from sendMessageReliably should fail
    +                 * and finally, FetchFailedException is thrown so in this case, we don't need
    +                 * to throw Exception here
    +                 */
    +                if (!isAckTimeout) {
    --- End diff --
    
    Since we delete the MessageStatuses of messages that fail with timeouts, I guess we have no way to distinguish between an ACK for a message that we didn't send and an ACK for a timed-out message.  I guess that `isAckTimeout` is used to strike a reasonable compromise in which we'll detect errors if no timeouts occur.
    
    It might be a good idea to `logWarning` if we receive an ACK after we've timed out; this might help when debugging: if the ack timeout is set too low, warning messages would appear in the logs.
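
A minimal sketch of the compromise discussed in this comment, under stated assumptions: `LateAckSketch`, `pending`, `ackTimeoutSeen`, and `warnings` are hypothetical names, `ackTimeoutSeen` plays the role of `isAckTimeout` from the diff, and a buffer stands in for `logWarning`. An unknown ACK is treated as an error only while no timeout has ever fired; after that it is logged as a possible late arrival.

```scala
import scala.collection.mutable

object LateAckSketch {
  // Hypothetical stand-in for messageStatuses: ids of messages awaiting an ACK.
  private val pending = mutable.HashSet[Int]()
  // Mirrors the role of isAckTimeout in the diff: set once any timeout fires.
  private var ackTimeoutSeen = false
  // Stand-in for logWarning so the behaviour can be inspected.
  val warnings = mutable.ArrayBuffer[String]()

  def send(id: Int): Unit = pending.synchronized {
    pending += id
    ()
  }

  // Timeout path: forget the message and remember that a timeout happened.
  def timeOut(id: Int): Unit = pending.synchronized {
    ackTimeoutSeen = true
    pending -= id
    ()
  }

  def receiveAck(id: Int): Unit = pending.synchronized {
    if (pending.remove(id)) {
      () // normal case: the ACK matches an in-flight message
    } else if (ackTimeoutSeen) {
      // Cannot tell a late ACK from a bogus one, so only warn.
      warnings += s"Ack for message $id may have arrived after its timeout"
    } else {
      throw new IllegalStateException(s"Could not find reference for received ack message $id")
    }
  }
}
```

If warnings of this kind show up often, that would suggest the configured ack timeout is too low for the workload.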




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r16308568
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -836,9 +842,26 @@ private[spark] class ConnectionManager(
       def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
           : Future[Message] = {
         val promise = Promise[Message]()
    +
    +    val timeoutTask = new TimerTask {
    +      override def run(): Unit = {
    +        messageStatuses.synchronized {
    +          isAckTimeout = true
    +          messageStatuses.remove(message.id).foreach ( s => {
    +            s.synchronized {
    --- End diff --
    
    Why is this `synchronized` needed?  Aren't all writes to the promise already guarded by the `messageStatuses.synchronized` lock?




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-51865034
  
    QA results for PR 1632:
    - This patch PASSES unit tests.
    - This patch merges cleanly
    - This patch adds no public classes
    
    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18346/consoleFull




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-51866549
  
    @shivaram That's a really good suggestion.  I'll try to write a failing unit test that directly uses BasicBlockFetcherIterator so that we can test your approach.




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by sarutak <gi...@git.apache.org>.
Github user sarutak commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-51867662
  
    Hi @shivaram, @JoshRosen
    
    At first, I had the idea of using poll; I thought it was the easy way.
    But if we use poll and catch TimeoutException, I think ConnectionManager's state is not reset.
    That means the Future object would wait on the Promise forever.
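
The concern raised here can be sketched as follows. This is an illustration only: `PollTimeoutSketch`, `results`, `pendingRequests`, and `nextResult` are hypothetical names, not part of BlockFetcherIterator or ConnectionManager. The one library fact relied on is that `java.util.concurrent.LinkedBlockingQueue.poll(timeout, unit)` returns `null` when the wait elapses. The consumer stops waiting, but nothing on the sending side is told about it.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable

object PollTimeoutSketch {
  // Queue the consumer (the iterator's next()) reads fetched results from.
  val results = new LinkedBlockingQueue[String]()
  // Hypothetical stand-in for sender-side bookkeeping of in-flight requests.
  val pendingRequests = mutable.Set[Int]()

  // Waits at most timeoutMs for a result; None means the wait timed out.
  def nextResult(timeoutMs: Long): Option[String] =
    Option(results.poll(timeoutMs, TimeUnit.MILLISECONDS))
}
```

After a timed-out `nextResult`, any entry in `pendingRequests` is still there, which is the "state is not reset" problem: the consumer has given up while the sender-side record (and the promise behind it) is left unresolved unless it is cleaned up separately.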




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by lianhuiwang <gi...@git.apache.org>.
Github user lianhuiwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r15573389
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
    @@ -117,31 +121,45 @@ object BlockFetcherIterator {
           })
           bytesInFlight += req.size
           val sizeMap = req.blocks.toMap  // so we can look up the size of each blockID
    -      val future = connectionManager.sendMessageReliably(cmId, blockMessageArray.toBufferMessage)
    -      future.onSuccess {
    -        case Some(message) => {
    -          val bufferMessage = message.asInstanceOf[BufferMessage]
    -          val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
    -          for (blockMessage <- blockMessageArray) {
    -            if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) {
    -              throw new SparkException(
    -                "Unexpected message " + blockMessage.getType + " received from " + cmId)
    +
    +      val sendRequestThread = new Thread(s"sendRequestThread(${req.address.host}:${req.address.port})") {
    --- End diff --
    
    How about limiting the number of connections used in a block fetch? That would reduce the number of connections on the server.



[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-51868710
  
    I think the current solution is better. `LinkedBlockingQueue.poll` would bring a lot of problems.




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-52387365
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18669/consoleFull) for   PR 1632 at commit [`cddbc7b`](https://github.com/apache/spark/commit/cddbc7b534e68e6d4b50a1c327de5c85ac977844).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ShuffleBlockManager(blockManager: BlockManager,`





[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r16091154
  
    --- Diff: core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala ---
    @@ -255,5 +260,42 @@ class ConnectionManagerSuite extends FunSuite {
     
       }
     
    +  test("sendMessageRelyably timeout") {
    --- End diff --
    
    Spelling: should be `sendMessageReliably`




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r16094973
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -836,9 +845,14 @@ private[spark] class ConnectionManager(
       def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
           : Future[Message] = {
         val promise = Promise[Message]()
    +
    +    val ackTimeoutMonitor =  new Timer(s"Ack Timeout Monitor-" +
    --- End diff --
    
    I think we can create a ConnectionManager-wide Timer and submit multiple tasks to it.  MessageStatus could store a reference to the TimerTasks and we could cancel those individual tasks when we receive acks.
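
A rough sketch of this suggestion, assuming a much simplified status record: `SharedTimerSketch`, `Status`, `send`, and `onAck` are hypothetical names, and `Promise[String]` stands in for the real message types. A single daemon `java.util.Timer` serves every in-flight message, each message gets its own `TimerTask`, and receiving an ACK cancels only that task.

```scala
import java.util.{Timer, TimerTask}
import scala.concurrent.Promise

object SharedTimerSketch {
  // One daemon timer thread shared by all in-flight messages.
  private val ackTimeoutMonitor = new Timer("AckTimeoutMonitor", true)

  // Hypothetical status record: keeps the promise together with its timeout task.
  final class Status(val promise: Promise[String], val timeoutTask: TimerTask)

  def send(id: Int, timeoutMs: Long): Status = {
    val promise = Promise[String]()
    val task = new TimerTask {
      override def run(): Unit = {
        // tryFailure is a no-op if the ACK already completed the promise.
        promise.tryFailure(
          new java.io.IOException(s"no ack for message $id within $timeoutMs ms"))
        ()
      }
    }
    ackTimeoutMonitor.schedule(task, timeoutMs)
    new Status(promise, task)
  }

  def onAck(status: Status, ack: String): Unit = {
    // Cancel just this message's task; the shared timer keeps running.
    status.timeoutTask.cancel()
    status.promise.trySuccess(ack)
    ()
  }
}
```

Compared with one `Timer` per message, this avoids creating a thread for every send; the trade-off is that all timeout tasks run on the same thread, so they should stay short.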




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r16323402
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -61,17 +62,17 @@ private[spark] class ConnectionManager(
         var ackMessage: Option[Message] = None
     
         def markDone(ackMessage: Option[Message]) {
    -      this.synchronized {
    -        this.ackMessage = ackMessage
    -        completionHandler(this)
    -      }
    +      this.ackMessage = ackMessage
    +      completionHandler(this)
         }
       }
     
       private val selector = SelectorProvider.provider.openSelector()
    +  private val ackTimeoutMonitor = new Timer("AckTimeoutMonitor", true)
     
       // default to 30 second timeout waiting for authentication
       private val authTimeout = conf.getInt("spark.core.connection.auth.wait.timeout", 30)
    +  private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60)
    --- End diff --
    
    This should be documented in docs/configuration.md.  When users expect long GC pauses, they may wish to increase timeouts across the board, so it's important that all timeout options are documented.
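
To make the effect of the setting concrete, here is a small sketch of how such a timeout might be resolved. It does not use SparkConf: `AckTimeoutConfigSketch` and its members are hypothetical, a plain `Map` stands in for `conf.getInt(key, default)`, and the unit is assumed to be seconds by analogy with the auth timeout comment in the diff.

```scala
object AckTimeoutConfigSketch {
  // Key and default taken from the diff above.
  val AckTimeoutKey = "spark.core.connection.ack.wait.timeout"
  val DefaultAckTimeoutSeconds = 60

  // Hypothetical stand-in for conf.getInt(key, default), backed by a plain Map.
  def ackTimeoutSeconds(settings: Map[String, String]): Int =
    settings.get(AckTimeoutKey).map(_.trim.toInt).getOrElse(DefaultAckTimeoutSeconds)

  // java.util.Timer.schedule takes a delay in milliseconds.
  def ackTimeoutMillis(settings: Map[String, String]): Long =
    ackTimeoutSeconds(settings) * 1000L
}
```

A user expecting long GC pauses would raise the value, for example `Map(AckTimeoutKey -> "120")`, which this sketch resolves to a 120000 ms delay.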




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by sarutak <gi...@git.apache.org>.
Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r16095425
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -72,6 +73,7 @@ private[spark] class ConnectionManager(
     
       // default to 30 second timeout waiting for authentication
       private val authTimeout = conf.getInt("spark.core.connection.auth.wait.timeout", 30)
    +  private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 30)
    --- End diff --
    
    Yeah, maybe you're right.




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-52385166
  
    Jenkins, retest this please.




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-51861905
  
    QA tests have started for PR 1632. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18346/consoleFull




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r16323399
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -652,19 +654,21 @@ private[spark] class ConnectionManager(
               }
             }
             if (bufferMessage.hasAckId()) {
    -          val sentMessageStatus = messageStatuses.synchronized {
    +          messageStatuses.synchronized {
                 messageStatuses.get(bufferMessage.ackId) match {
                   case Some(status) => {
                     messageStatuses -= bufferMessage.ackId
    -                status
    +                status.markDone(Some(message))
                   }
                   case None => {
    +                if (isAckTimeout) {
    +                  logWarning(s"Ack message ${message.id} maybe received after timeout")
    +                }
    --- End diff --
    
    I think there should be an `else` block here so that we throw an exception only if we haven't hit the ack timeout.
    
    This current code looks wrong because it will fall-through and throw an exception if we receive late-arriving messages that we've already timed out on and marked as failures.




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by sarutak <gi...@git.apache.org>.
Github user sarutak commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-51868341
  
    O.K. I'll try to resolve using poll somehow.




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-52385196
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18662/consoleFull) for   PR 1632 at commit [`e85f88b`](https://github.com/apache/spark/commit/e85f88bba721f6ef89521a64dd9afd78d86ef964).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r16323524
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -652,19 +654,21 @@ private[spark] class ConnectionManager(
               }
             }
             if (bufferMessage.hasAckId()) {
    -          val sentMessageStatus = messageStatuses.synchronized {
    +          messageStatuses.synchronized {
                 messageStatuses.get(bufferMessage.ackId) match {
                   case Some(status) => {
                     messageStatuses -= bufferMessage.ackId
    -                status
    +                status.markDone(Some(message))
                   }
                   case None => {
    +                if (isAckTimeout) {
    +                  logWarning(s"Ack message ${message.id} maybe received after timeout")
    +                }
    --- End diff --
    
    Yeah, that's fine, too.  Let's just drop the exception for now and remove the `isAckTimeout` variable.


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-52348963
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18623/consoleFull) for   PR 1632 at commit [`7ed48be`](https://github.com/apache/spark/commit/7ed48be337f469b75a1ba0c85b6817e5beb9f3a6).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-51864387
  
    Another way to solve this is to change the BasicBlockFetcher to use `poll` with a timeout in LinkedBlockingQueue [1]
    
    [1] http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html#poll(long,%20java.util.concurrent.TimeUnit)
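
As an illustration of the `poll`-with-timeout idea, here is a minimal sketch. `FetchResult` and `PollWithTimeout` are hypothetical stand-ins, not the classes used in BlockFetcherIterator; only `LinkedBlockingQueue.poll(long, TimeUnit)` is the real JDK call.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical stand-in for the results that the fetch iterator consumes.
final case class FetchResult(blockId: String, size: Long)

object PollWithTimeout {
  // Wait up to `timeoutMs` for the next result; None means the wait timed out.
  def nextResult(results: LinkedBlockingQueue[FetchResult], timeoutMs: Long): Option[FetchResult] =
    Option(results.poll(timeoutMs, TimeUnit.MILLISECONDS)) // poll returns null on timeout
}
```

A caller would treat `None` as a fetch failure instead of blocking forever. This bounds only the consumer's wait; it does not by itself clean up any per-message state held by the sender.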


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-50440763
  
    Can one of the admins verify this patch?


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r15508224
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala ---
    @@ -117,31 +121,45 @@ object BlockFetcherIterator {
           })
           bytesInFlight += req.size
           val sizeMap = req.blocks.toMap  // so we can look up the size of each blockID
    -      val future = connectionManager.sendMessageReliably(cmId, blockMessageArray.toBufferMessage)
    -      future.onSuccess {
    -        case Some(message) => {
    -          val bufferMessage = message.asInstanceOf[BufferMessage]
    -          val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
    -          for (blockMessage <- blockMessageArray) {
    -            if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) {
    -              throw new SparkException(
    -                "Unexpected message " + blockMessage.getType + " received from " + cmId)
    +
    +      val sendRequestThread = new Thread(s"sendRequestThread(${req.address.host}:${req.address.port})") {
    --- End diff --
    
    Won't creating a new thread here affect performance?


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by sarutak <gi...@git.apache.org>.
Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r16323512
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -652,19 +654,21 @@ private[spark] class ConnectionManager(
               }
             }
             if (bufferMessage.hasAckId()) {
    -          val sentMessageStatus = messageStatuses.synchronized {
    +          messageStatuses.synchronized {
                 messageStatuses.get(bufferMessage.ackId) match {
                   case Some(status) => {
                     messageStatuses -= bufferMessage.ackId
    -                status
    +                status.markDone(Some(message))
                   }
                   case None => {
    +                if (isAckTimeout) {
    +                  logWarning(s"Ack message ${message.id} maybe received after timeout")
    +                }
    --- End diff --
    
    I don't think receiving a non-referenced message is critical, so we should at least not throw an Exception. I think logging a warning is better whenever we receive a non-referenced message, whether or not it is a late-arriving ack.


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r16323500
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -652,19 +654,21 @@ private[spark] class ConnectionManager(
               }
             }
             if (bufferMessage.hasAckId()) {
    -          val sentMessageStatus = messageStatuses.synchronized {
    +          messageStatuses.synchronized {
                 messageStatuses.get(bufferMessage.ackId) match {
                   case Some(status) => {
                     messageStatuses -= bufferMessage.ackId
    -                status
    +                status.markDone(Some(message))
                   }
                   case None => {
    +                if (isAckTimeout) {
    +                  logWarning(s"Ack message ${message.id} maybe received after timeout")
    +                }
    --- End diff --
    
    Before, it was undoubtedly a bug if we received an ack for a message and didn't have a corresponding SentMessageStatus.
    
    Now that we have timeouts, we have no way to distinguish between a late-arriving ack for a SentMessageStatus that we've already deleted and a bogus ack sent due to buggy code.  As I commented upthread, one option would be to simply convert this into a warning.  But another option is to keep it as an error _unless_ we've timed out at least once, in which case we treat it as a warning.
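
The second option (error unless a timeout has occurred at least once) could be sketched roughly as follows. `AckTracker`, `AckOutcome`, and the method names are hypothetical and exist only for illustration; the real code keeps `MessageStatus` objects in `messageStatuses` rather than a set of ids.

```scala
import scala.collection.mutable

object AckHandling {
  sealed trait AckOutcome
  case object Completed extends AckOutcome       // ack matched a pending message
  case object LateAckWarning extends AckOutcome  // no match, but a timeout happened earlier
  case object UnknownAckError extends AckOutcome // no match and no timeout has ever occurred

  // Hypothetical tracker: pending message ids plus a "timed out at least once" flag.
  final class AckTracker {
    private val pending = mutable.HashSet[Int]()
    private var timedOutOnce = false

    def sent(id: Int): Unit = synchronized { pending += id }

    // Drop the pending entry and remember that a timeout has happened.
    def timedOut(id: Int): Unit = synchronized {
      if (pending.remove(id)) timedOutOnce = true
    }

    def ackReceived(id: Int): AckOutcome = synchronized {
      if (pending.remove(id)) Completed
      else if (timedOutOnce) LateAckWarning // could be a late ack; warn only
      else UnknownAckError                  // cannot be a late ack; treat as a bug
    }
  }
}
```

The flag is deliberately coarse: once any message has timed out, an unmatched ack can no longer be told apart from a late one, so it is downgraded to a warning.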


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-51873138
  
    @sarutak You are right that using poll wouldn't clear up the internal state in ConnectionManager. I think @JoshRosen's idea of using a shared timer pool or re-using some of the existing thread pools (the future execution context?) might be fine.


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-52279573
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18600/consoleFull) for   PR 1632 at commit [`66cfff7`](https://github.com/apache/spark/commit/66cfff765f44aea7ff2bf5afe1a29403e79b7951).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r16091512
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -836,9 +845,14 @@ private[spark] class ConnectionManager(
       def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
           : Future[Message] = {
         val promise = Promise[Message]()
    +
    +    val ackTimeoutMonitor =  new Timer(s"Ack Timeout Monitor-" +
    +      "${connectionManagerId}-MessageId(${message.id})", true)
    +
         val status = new MessageStatus(message, connectionManagerId, s => {
    +      ackTimeoutMonitor.cancel()
    --- End diff --
    
    Even though access to `isAckTimeout` is guarded by the `messageStatuses` lock, I think there's still a race condition here.  According to the [Timer.cancel()](http://docs.oracle.com/javase/7/docs/api/java/util/Timer.html#cancel()) docs, `cancel()` "does not interfere with a currently executing task (if it exists)."  So, there's still the potential for the timeout task to race with the message, start running and get blocked waiting to synchronize, and eventually run and throw an exception.
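
One way to make such a race harmless, shown here only as a sketch and not as what this patch does, is to let both the ack path and the timeout path complete the promise with the non-throwing `trySuccess` / `tryFailure` methods of `scala.concurrent.Promise`. The `FirstCompletionWins` object and its method names are hypothetical.

```scala
import java.io.IOException
import scala.concurrent.Promise

object FirstCompletionWins {
  // Called by a (hypothetical) ack handler; returns false if the timeout won the race.
  def onAck(promise: Promise[String], reply: String): Boolean =
    promise.trySuccess(reply)

  // Called by a (hypothetical) timeout task; returns false if the ack won the race.
  def onTimeout(promise: Promise[String]): Boolean =
    promise.tryFailure(new IOException("ack timed out"))
}
```

Whichever side runs second simply gets `false` back instead of an `IllegalStateException`, so a timeout task that was already running when `cancel()` was called cannot overwrite or fail a message that has been acknowledged.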


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-52343198
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18623/consoleFull) for   PR 1632 at commit [`7ed48be`](https://github.com/apache/spark/commit/7ed48be337f469b75a1ba0c85b6817e5beb9f3a6).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r16095492
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -22,6 +22,7 @@ import java.nio._
     import java.nio.channels._
     import java.nio.channels.spi._
     import java.net._
    +import java.util.{Timer, TimerTask}
    --- End diff --
    
    [HashedWheelTimer](http://netty.io/4.0/api/io/netty/util/HashedWheelTimer.html) performs better than `java.util.Timer`.


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-52406146
  
    I've merged this to `master` and `branch-1.1`.  Thanks!


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-51872227
  
    @sarutak I left updates on a couple of my earlier comments.  This solution can work and I have a few suggestions for minor cleanup (e.g. re-using a Timer).


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r16091329
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -134,6 +136,7 @@ private[spark] class ConnectionManager(
       // to be able to track asynchronous messages
       private val idCount: AtomicInteger = new AtomicInteger(1)
     
    +  private var isAckTimeout = false
    --- End diff --
    
    Logically, this seems like per-message state, so it feels wrong to have it as a ConnectionManager-wide variable.


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by sarutak <gi...@git.apache.org>.
Github user sarutak commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-51709077
  
    In #1758, @JoshRosen fixed ConnectionManager to handle the case where a remote executor returns an error message.
    However, the case where a remote executor hangs is not handled, so if the remote executor cannot return any message, the fetching executor still waits forever.
    
    The latest version of this PR fixes that issue.


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-52342830
  
    Jenkins, retest this please.


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by sarutak <gi...@git.apache.org>.
Github user sarutak commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-51872672
  
    @JoshRosen Thanks!
    I'll try it.


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r16091409
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -836,9 +845,14 @@ private[spark] class ConnectionManager(
       def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
           : Future[Message] = {
         val promise = Promise[Message]()
    +
    +    val ackTimeoutMonitor =  new Timer(s"Ack Timeout Monitor-" +
    --- End diff --
    
    Creating a new watchdog thread for each message seems like it could be really expensive.


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/1632


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by sarutak <gi...@git.apache.org>.
Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r16282025
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -836,9 +845,14 @@ private[spark] class ConnectionManager(
       def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
           : Future[Message] = {
         val promise = Promise[Message]()
    +
    +    val ackTimeoutMonitor =  new Timer(s"Ack Timeout Monitor-" +
    --- End diff --
    
    I've modified it to use a ConnectionManager-wide timer.
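
For readers following along, a shared timer of this kind might look like the sketch below. `SharedAckTimer`, `track`, and `stop` are hypothetical names, and the sketch is not the code in this patch; it only shows one daemon `java.util.Timer` thread serving every outstanding message instead of one thread per message.

```scala
import java.io.IOException
import java.util.{Timer, TimerTask}
import scala.concurrent.Promise

// Hypothetical class: a single daemon timer thread shared by all messages.
final class SharedAckTimer(ackTimeoutMs: Long) {
  private val timer = new Timer("AckTimeoutMonitor", true)

  // Schedule a timeout for one message; the caller cancels the returned task on ack.
  def track(promise: Promise[String]): TimerTask = {
    val task = new TimerTask {
      override def run(): Unit = {
        // tryFailure does nothing if the promise was already completed by an ack.
        promise.tryFailure(new IOException(s"no ack within $ackTimeoutMs ms"))
      }
    }
    timer.schedule(task, ackTimeoutMs)
    task
  }

  // Stop the timer thread when the owning component shuts down.
  def stop(): Unit = timer.cancel()
}
```

Each message costs one small `TimerTask` object rather than a thread, and cancelling the task on ack leaves the shared timer running for the other messages.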


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by sarutak <gi...@git.apache.org>.
Github user sarutak commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-51867803
  
    The reason I didn't use `Await.ready` and `Await.result` is that they are blocking methods. The current approach, which uses an `onComplete` callback, is non-blocking.
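
The non-blocking style referred to here can be sketched as follows. `CallbackDelivery` and `deliver` are hypothetical names; the point is only that `onComplete` registers a callback and returns immediately, with the outcome handed to a queue later.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

object CallbackDelivery {
  // Register a callback and return at once; the outcome lands in `results` later.
  def deliver(future: Future[String], results: LinkedBlockingQueue[Try[String]])
             (implicit ec: ExecutionContext): Unit =
    future.onComplete(outcome => results.put(outcome))
}
```

The calling thread is never parked on the future itself. If the future is never completed, though, nothing is ever put on the queue, which is why some separate timeout mechanism is still needed.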


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-51825773
  
    Jenkins, this is ok to test.  Jenkins, test this please.


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by sarutak <gi...@git.apache.org>.
Github user sarutak commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-52347790
  
    @JoshRosen Exactly, thanks.


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-52385237
  
    Pending Jenkins, this looks good to me.  Committers, feel free to merge (or I'll do it).


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by loveconan1988 <gi...@git.apache.org>.
Github user loveconan1988 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1632#discussion_r16153410
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---
    @@ -652,19 +655,25 @@ private[spark] class ConnectionManager(
               }
             }
             if (bufferMessage.hasAckId()) {
    -          val sentMessageStatus = messageStatuses.synchronized {
    +          messageStatuses.synchronized {
                 messageStatuses.get(bufferMessage.ackId) match {
                   case Some(status) => {
                     messageStatuses -= bufferMessage.ackId
    -                status
    +                status.markDone(Some(message))
                   }
                   case None => {
    -                throw new Exception("Could not find reference for received ack message " +
    -                  message.id)
    +                /**
    +                 * If isAckTimeout == true, Future returned from sendMessageReliably should fail
    +                 * and finally, FetchFailedException is thrown so in this case, we don't need
    +                 * to throw Exception here
    +                 */
    +                if (!isAckTimeout) {
    --- End diff --
    
    ------------------ Original Message ------------------
    From: "Josh Rosen" <no...@github.com>
    Sent: Tuesday, August 12, 2014, 12:24 PM
    To: "apache/spark" <sp...@noreply.github.com>
    Subject: Re: [spark] [SPARK-2677] BasicBlockFetchIterator#next can wait forever (#1632)
    
    In core/src/main/scala/org/apache/spark/network/ConnectionManager.scala:
    >                }
    >                case None => {
    > -                throw new Exception("Could not find reference for received ack message " +
    > -                  message.id)
    > +                /**
    > +                 * If isAckTimeout == true, Future returned from sendMessageReliably should fail
    > +                 * and finally, FetchFailedException is thrown so in this case, we don't need
    > +                 * to throw Exception here
    > +                 */
    > +                if (!isAckTimeout) {
    
    Since we delete the MessageStatuses of messages that fail with timeouts, I guess we have no way to distinguish between an ACK for a message that we didn't send and an ACK for a timed-out message. I guess that isAckTimeout is used to strike a reasonable compromise in which we'll detect errors if no timeouts occur.
    
    It might be a good idea to logWarning if we receive an ACK after we've timed out; this might help when debugging: if the ack timeout is set too low, warning messages would appear in the logs.



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-51863140
  
    This approach seems very complicated, race-prone, and hard to understand.
    
    `Await.ready` and `Await.result` already support timeouts, so why not just add timeout logic at those call sites?
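
    A minimal sketch of what a call-site timeout could look like, assuming the
    caller already holds a `Future` for the fetch. `awaitFetch` and its
    `Either` result are hypothetical names chosen for illustration and are not
    part of Spark; only `Await.result` and its `TimeoutException` come from the
    Scala standard library.

    ```scala
    import scala.concurrent.{Await, Future, Promise, TimeoutException}
    import scala.concurrent.duration._

    object AwaitTimeoutSketch {
      // Hypothetical helper: block on the future, but give up after `timeout`.
      // A failure of the future itself is rethrown by Await.result unchanged.
      def awaitFetch[T](f: Future[T], timeout: FiniteDuration): Either[String, T] =
        try {
          Right(Await.result(f, timeout))
        } catch {
          case _: TimeoutException => Left(s"fetch timed out after $timeout")
        }

      def main(args: Array[String]): Unit = {
        // A future that never completes models a remote fetch that hangs.
        val neverCompleted = Promise[String]().future
        println(awaitFetch(neverCompleted, 100.millis))
        // An already-completed future returns immediately.
        println(awaitFetch(Future.successful("block-data"), 100.millis))
      }
    }
    ```

    With this shape the waiting thread is released after the timeout, and the
    caller decides how to report the failure.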




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-52385869
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18662/consoleFull) for   PR 1632 at commit [`e85f88b`](https://github.com/apache/spark/commit/e85f88bba721f6ef89521a64dd9afd78d86ef964).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-2677] BasicBlockFetchIterator#next can ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1632#issuecomment-52280997
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18605/consoleFull) for   PR 1632 at commit [`7ed48be`](https://github.com/apache/spark/commit/7ed48be337f469b75a1ba0c85b6817e5beb9f3a6).
     * This patch merges cleanly.

