Posted to reviews@spark.apache.org by dmcguire81 <gi...@git.apache.org> on 2015/04/17 19:52:04 UTC

[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

GitHub user dmcguire81 opened a pull request:

    https://github.com/apache/spark/pull/5559

    [SPARK-6985][streaming] Receiver maxRate over 1000 causes a StackOverflowError

    A simple truncation in integer division (on rates over 1000 messages / second) causes the existing implementation to sleep for 0 milliseconds, then call itself recursively; this causes what is essentially an infinite recursion, since the base case of the calculated amount of time having elapsed can't be reached before available stack space is exhausted. A fix to this truncation error is included in this patch.
    
    However, even with the defect patched, the accuracy of the existing implementation is poor (the error bounds of the original test were effectively [-30%, +10%], although this fact was obscured by hard-coded error margins); when the error bounds were tightened to [-5%, +5%], the existing implementation failed to meet the new, tightened requirements. Therefore, an industry-vetted solution (from Guava) was used to get the adapted tests to pass.
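
For readers following along, the truncation described above can be reproduced in isolation. The sketch below mirrors the `messagesWrittenSinceSync * 1000 / desiredRate` expression from the original `RateLimiter`; the object and method names are hypothetical, and the rounded-up variant is only one possible way to avoid a zero target, not necessarily what this patch does.

```scala
object TruncationSketch {
  // Same shape as the original expression: Long / Int division truncates toward zero.
  def targetMillisTruncated(messages: Long, desiredRate: Int): Long =
    messages * 1000 / desiredRate

  // Illustrative alternative (an assumption, not the patch itself):
  // compute in floating point and round up so the target is never 0 for messages > 0.
  def targetMillisRoundedUp(messages: Long, desiredRate: Int): Long =
    math.ceil(messages * 1000.0 / desiredRate).toLong

  def main(args: Array[String]): Unit = {
    // With one message written and a rate just over 1000/s, the truncated target
    // is 0 ms, so the computed sleep time is never positive and the method
    // calls itself again immediately.
    println(targetMillisTruncated(1, 1001))
    println(targetMillisRoundedUp(1, 1001))
  }
}
```

At exactly 1000 messages / second both variants agree; they only diverge once the rate exceeds 1000.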

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dmcguire81/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/5559.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5559
    
----
commit d6e107977e791ca70d7b118739e7160f4dec1867
Author: David McGuire <da...@nike.com>
Date:   2015-04-17T16:43:57Z

    Stack overflow error in RateLimiter on rates over 1000/s

commit 24b1bc02ecd458d5b34f32fe51a221cad621b26d
Author: David McGuire <da...@nike.com>
Date:   2015-04-17T17:00:07Z

    Fix truncation in integer division causing infinite recursion

commit 38f3ca8a4fb7845aebd855ccaa73339fba5ca091
Author: David McGuire <da...@nike.com>
Date:   2015-04-17T17:05:30Z

    Ratchet down the error rate to +/- 5%; tests fail

commit 27947177802c340e6e465f23006b592b6deb43b5
Author: David McGuire <da...@nike.com>
Date:   2015-04-17T17:11:02Z

    Replace the RateLimiter with the Guava implementation

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94575083
  
    Jenkins, add to whitelist




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94268379
  
      [Test build #30556 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30556/consoleFull) for   PR 5559 at commit [`29011bd`](https://github.com/apache/spark/commit/29011bd3e0190c2f6da68c503f659bf31e11b8ab).




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94575454
  
      [Test build #30606 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30606/consoleFull) for   PR 5559 at commit [`d29d2e0`](https://github.com/apache/spark/commit/d29d2e060fe48e8a3f1e506bf2bf2cc13d99d751).




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by dmcguire81 <gi...@git.apache.org>.
Github user dmcguire81 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5559#discussion_r28618212
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala ---
    @@ -176,7 +176,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
           blockGenerator.addData(count)
           generatedData += count
           count += 1
    -      Thread.sleep(1)
    +      Thread.sleep(0)
    --- End diff --
    
    It's not clear that this is a no-op (because of system time granularity - see discussion [here](http://stackoverflow.com/questions/1600572/are-thread-sleep0-and-thread-yield-statements-equivalent)); however, this can't sleep for 1ms because the rate specified is *greater* than 1000 messages / second (1 message / ms), so the test would not actually trigger the throttling. It seems intuitive to replace this with Thread.yield(), since it's presumably necessary for a background thread to get a chance to process receiving messages, but performing that refactoring does not succeed, nor does simply removing the statement.
    
    Any suggestions? Am I understanding the threading model correctly?




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by dmcguire81 <gi...@git.apache.org>.
Github user dmcguire81 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5559#discussion_r28617758
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala ---
    @@ -33,37 +33,13 @@ import java.util.concurrent.TimeUnit._
       */
     private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
     
    -  private var lastSyncTime = System.nanoTime
    -  private var messagesWrittenSinceSync = 0L
       private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
    -  private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
    +  private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate)
     
       def waitToPush() {
         if( desiredRate <= 0 ) {
           return
         }
    -    val now = System.nanoTime
    -    val elapsedNanosecs = math.max(now - lastSyncTime, 1)
    -    val rate = messagesWrittenSinceSync.toDouble * 1000000000 / elapsedNanosecs
    -    if (rate < desiredRate) {
    -      // It's okay to write; just update some variables and return
    -      messagesWrittenSinceSync += 1
    -      if (now > lastSyncTime + SYNC_INTERVAL) {
    -        // Sync interval has passed; let's resync
    -        lastSyncTime = now
    -        messagesWrittenSinceSync = 1
    -      }
    -    } else {
    -      // Calculate how much time we should sleep to bring ourselves to the desired rate.
    -      val targetTimeInMillis = messagesWrittenSinceSync * 1000 / desiredRate
    -      val elapsedTimeInMillis = elapsedNanosecs / 1000000
    -      val sleepTimeInMillis = targetTimeInMillis - elapsedTimeInMillis
    -      if (sleepTimeInMillis > 0) {
    -        logTrace("Natural rate is " + rate + " per second but desired rate is " +
    -          desiredRate + ", sleeping for " + sleepTimeInMillis + " ms to compensate.")
    -        Thread.sleep(sleepTimeInMillis)
    -      }
    -      waitToPush()
    -    }
    +    rateLimiter.acquire()
    --- End diff --
    
    Will change.
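
As background on why the removed `waitToPush()` could exhaust the stack: scalac only rewrites a self-recursive tail call into a loop when the method cannot be overridden (final, private, local, or a member of an object), and `waitToPush` in the diff above is a public, non-final method. A minimal sketch of a stack-safe polling loop follows; `pollUntil` and its `ready` parameter are hypothetical names used purely for illustration.

```scala
import scala.annotation.tailrec

object StackSafeWaitSketch {
  // Polls `ready` until it returns true and reports how many polls came back false.
  // Because this is a member of an object it cannot be overridden, so the
  // self-call is compiled into a loop; @tailrec makes the compiler verify that.
  @tailrec
  def pollUntil(ready: () => Boolean, polls: Long = 0L): Long =
    if (ready()) polls else pollUntil(ready, polls + 1)

  def main(args: Array[String]): Unit = {
    // A million iterations would typically overflow the stack with
    // plain (non-optimized) recursion.
    var remaining = 1000000
    val polls = pollUntil(() => { remaining -= 1; remaining <= 0 })
    println(s"finished after $polls unsuccessful polls")
  }
}
```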




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by dmcguire81 <gi...@git.apache.org>.
Github user dmcguire81 commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94059773
  
    @srowen, @harishreedharan: Can one of you take a second pass?




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94510003
  
      [Test build #30594 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30594/consoleFull) for   PR 5559 at commit [`8be6934`](https://github.com/apache/spark/commit/8be6934030ac2a0ce6b05de1670a5e5c0e771c8a).




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94575102
  
    Jenkins, retest this please




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94526771
  
      [Test build #30594 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30594/consoleFull) for   PR 5559 at commit [`8be6934`](https://github.com/apache/spark/commit/8be6934030ac2a0ce6b05de1670a5e5c0e771c8a).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.
     * This patch does not change any dependencies.




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5559#discussion_r28641709
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala ---
    @@ -176,7 +176,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
           blockGenerator.addData(count)
           generatedData += count
           count += 1
    -      Thread.sleep(1)
    +      Thread.`yield`()
    --- End diff --
    
    Oh wow, didn't know that. That's certainly right.




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94509437
  
    Jenkins, retest this please.




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94590849
  
      [Test build #30606 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30606/consoleFull) for   PR 5559 at commit [`d29d2e0`](https://github.com/apache/spark/commit/d29d2e060fe48e8a3f1e506bf2bf2cc13d99d751).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.
     * This patch does not change any dependencies.




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by dmcguire81 <gi...@git.apache.org>.
Github user dmcguire81 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5559#discussion_r28641760
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala ---
    @@ -176,7 +176,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
           blockGenerator.addData(count)
           generatedData += count
           count += 1
    -      Thread.sleep(1)
    +      Thread.`yield`()
    --- End diff --
    
    I've tried any number of micro-waits, and nothing seems to do the trick like Thread.sleep(0). My hunch is that, given a longer test, the preemptive multithreading would even out and each thread would see sufficient CPU time, but, for such a short run, the main test thread needs to yield voluntarily.
    
    At any rate, there doesn't seem to be a good compromise between platform independence, intuitive appeal, and speed and repeatability of the tests, so I'm at a bit of a loss. Currently this defect is hamstringing our entire stream-processing operation (rate limiting doesn't work reliably at any rate, but just crashes obviously and violently for the documented rates), so I'd appreciate any guidance or shepherding you can provide.
    
    Cheers!




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by dmcguire81 <gi...@git.apache.org>.
Github user dmcguire81 commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94071092
  
    Thanks for the feedback!




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94068713
  
    Looks good. Thread.yield is also not a guarantee, though I think this is the best we can do for now.




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5559#discussion_r28615710
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala ---
    @@ -176,7 +176,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
           blockGenerator.addData(count)
           generatedData += count
           count += 1
    -      Thread.sleep(1)
    +      Thread.sleep(0)
    --- End diff --
    
    Why sleep(0) -- it's a no-op?




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94590862
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30606/
    Test PASSed.




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94268421
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30556/
    Test FAILed.




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94267950
  
    Jenkins, retest this please.




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5559#discussion_r28641435
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala ---
    @@ -176,7 +176,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
           blockGenerator.addData(count)
           generatedData += count
           count += 1
    -      Thread.sleep(1)
    +      Thread.`yield`()
    --- End diff --
    
    Unless I'm about to learn some crazy new syntax, this won't compile. `Thread.yield()`? But if the point is that this loop needs to happen "really fast", then why not just remove the call? I get that this will flood the block generator much, much more rapidly, but that might already have been happening with `yield()`. In any event, if you just want to wait less than a millisecond, you can call `Thread.sleep(0, [# nanoseconds])`.
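
A minimal sketch of the two-argument sleep mentioned above, assuming the goal is to pace a producer at a rate above 1000 messages / second. `Thread.sleep(long millis, int nanos)` is a standard JDK method whose `nanos` argument must be in the range 0 to 999999; the helper names here are hypothetical. Actual granularity depends on the JVM and OS, and some JDK versions round a sub-millisecond request up to a full millisecond, so this is not a guarantee of sub-millisecond pacing.

```scala
object SubMillisecondSleepSketch {
  private val NanosPerMilli = 1000000L
  private val NanosPerSecond = 1000000000L

  // Interval between messages, in nanoseconds, for a target rate.
  def intervalNanosFor(messagesPerSecond: Int): Long = {
    require(messagesPerSecond > 0, "rate must be positive")
    NanosPerSecond / messagesPerSecond
  }

  // Splits an interval into the (millis, nanos) pair that
  // Thread.sleep(long, int) expects, keeping nanos within 0..999999.
  def sleepArgs(intervalNanos: Long): (Long, Int) = {
    require(intervalNanos >= 0, "interval must be non-negative")
    (intervalNanos / NanosPerMilli, (intervalNanos % NanosPerMilli).toInt)
  }

  def main(args: Array[String]): Unit = {
    // 2000 messages / second is an arbitrary example rate above the 1000/s threshold.
    val (millis, nanos) = sleepArgs(intervalNanosFor(2000))
    Thread.sleep(millis, nanos)
    println(s"requested sleep of $millis ms and $nanos ns")
  }
}
```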




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by dmcguire81 <gi...@git.apache.org>.
Github user dmcguire81 commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94539314
  
    @srowen, @harishreedharan: I think I loosened the constraints sufficiently to account for the thread timing differences on the build server. Could one of you kick off another build, please?




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5559#discussion_r28642064
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala ---
    @@ -176,7 +176,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
           blockGenerator.addData(count)
           generatedData += count
           count += 1
    -      Thread.sleep(1)
    +      Thread.`yield`()
    --- End diff --
    
    What about no wait? I suppose I'm not sure why this depends deterministically on something like `yield` or `sleep(0)` since it doesn't necessarily do anything. If the problem is not going fast enough then I'd expect removing this line goes the fastest of all.
    
    I don't think anyone's arguing against the fix of course; this is about the test implementation only. In the worst case, `Thread.sleep(0)` with a comment isn't so bad but I want to fully explore this first.




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by dmcguire81 <gi...@git.apache.org>.
Github user dmcguire81 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5559#discussion_r28641631
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala ---
    @@ -176,7 +176,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
           blockGenerator.addData(count)
           generatedData += count
           count += 1
    -      Thread.sleep(1)
    +      Thread.`yield`()
    --- End diff --
    
    Thanks for the suggestion; I'll experiment with a sleep time that's a scaled version of the original specification, since I'm not trying to meaningfully change the test, just tighten it up a bit.
    





[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/5559




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94038579
  
    Can one of the admins verify this patch?




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5559#discussion_r28615875
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala ---
    @@ -33,37 +33,13 @@ import java.util.concurrent.TimeUnit._
       */
     private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
     
    -  private var lastSyncTime = System.nanoTime
    -  private var messagesWrittenSinceSync = 0L
       private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
    -  private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
    +  private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate)
     
       def waitToPush() {
         if( desiredRate <= 0 ) {
           return
         }
    -    val now = System.nanoTime
    -    val elapsedNanosecs = math.max(now - lastSyncTime, 1)
    -    val rate = messagesWrittenSinceSync.toDouble * 1000000000 / elapsedNanosecs
    -    if (rate < desiredRate) {
    -      // It's okay to write; just update some variables and return
    -      messagesWrittenSinceSync += 1
    -      if (now > lastSyncTime + SYNC_INTERVAL) {
    -        // Sync interval has passed; let's resync
    -        lastSyncTime = now
    -        messagesWrittenSinceSync = 1
    -      }
    -    } else {
    -      // Calculate how much time we should sleep to bring ourselves to the desired rate.
    -      val targetTimeInMillis = messagesWrittenSinceSync * 1000 / desiredRate
    -      val elapsedTimeInMillis = elapsedNanosecs / 1000000
    -      val sleepTimeInMillis = targetTimeInMillis - elapsedTimeInMillis
    -      if (sleepTimeInMillis > 0) {
    -        logTrace("Natural rate is " + rate + " per second but desired rate is " +
    -          desiredRate + ", sleeping for " + sleepTimeInMillis + " ms to compensate.")
    -        Thread.sleep(sleepTimeInMillis)
    -      }
    -      waitToPush()
    -    }
    +    rateLimiter.acquire()
    --- End diff --
    
    I had tried to rewrite this iteratively and succeeded, but I like this a lot better! Looks good, since the Guava one is also in events per second.
    
    Nit: can this just be:
    
    ```
    if (desiredRate > 0) {
     rateLimiter.acquire()
    }
    ```
    ?
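
    For reference, the truncation in the removed `targetTimeInMillis` line can be reproduced in isolation. This is only a minimal sketch: `TruncationDemo` and `targetTimeInNanos` are hypothetical names introduced for illustration, and the nanosecond variant is one possible way to keep the fraction, not necessarily what the patch does.

    ```scala
    object TruncationDemo {
      // Same expression as the removed line: Long * Int / Int is integer division,
      // so any result below one millisecond truncates to 0.
      def targetTimeInMillis(messagesWrittenSinceSync: Long, desiredRate: Int): Long =
        messagesWrittenSinceSync * 1000 / desiredRate

      // Hypothetical alternative (an assumption, not the patch): work in nanoseconds
      // so that sub-millisecond targets survive the division.
      def targetTimeInNanos(messagesWrittenSinceSync: Long, desiredRate: Int): Long =
        messagesWrittenSinceSync * 1000000000L / desiredRate
    }
    ```

    With one message written and a rate of 1500, the millisecond target is 0, so the old code never sleeps and goes straight into the recursive `waitToPush()` call.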





[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94269563
  
    Yeah, I thought some of those lines looked long. Rewrap smartly to keep each line under 100 characters. (And maybe you can fix the `if` statement spacing while you're at it.)




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5559#discussion_r28620711
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala ---
    @@ -176,7 +176,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
           blockGenerator.addData(count)
           generatedData += count
           count += 1
    -      Thread.sleep(1)
    +      Thread.sleep(0)
    --- End diff --
    
    Yeah, at best it is equivalent to `Thread.yield()`, but that wouldn't guarantee any particular behavior in this case. The thread may or may not yield, and other threads may or may not progress at all.




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by dmcguire81 <gi...@git.apache.org>.
Github user dmcguire81 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5559#discussion_r28641649
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala ---
    @@ -176,7 +176,7 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
           blockGenerator.addData(count)
           generatedData += count
           count += 1
    -      Thread.sleep(1)
    +      Thread.`yield`()
    --- End diff --
    
    PS: It most certainly will compile! From the relevant section of the Scala language specification, 2.9 (p. 5):
    "
    Example 1.1.2 Backquote-enclosed strings are a solution when one needs to access Java identifiers that are reserved words in Scala. For instance, the statement Thread.yield() is illegal, since yield is a reserved word in Scala. However, here’s a work-around:
    Thread.`yield`()
    "
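
    A minimal sketch of the backquoted call in use, assuming a hypothetical `spinFor` helper that is not part of the patch or the test suite:

    ```scala
    object YieldDemo {
      // Hypothetical helper: busy-waits for roughly `millis` milliseconds,
      // hinting to the scheduler on every pass, and returns the pass count.
      def spinFor(millis: Long): Long = {
        val deadline = System.nanoTime + millis * 1000000L
        var iterations = 0L
        while (System.nanoTime < deadline) {
          Thread.`yield`() // backquotes are required because `yield` is reserved in Scala
          iterations += 1
        }
        iterations
      }
    }
    ```

    As with the test change itself, the yield is only a hint, so the number of passes will vary from run to run.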
    
    





[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94044200
  
    +1. This looks good. 
    
    Apart from the couple of things which Sean already pointed out (the Thread.sleep and the if condition), this looks good to be merged! 




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94268419
  
      [Test build #30556 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30556/consoleFull) for PR 5559 at commit [`29011bd`](https://github.com/apache/spark/commit/29011bd3e0190c2f6da68c503f659bf31e11b8ab).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.
     * This patch does not change any dependencies.




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94526789
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30594/
    Test FAILed.




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/5559#issuecomment-94350661
  
    Jenkins, this is okay to test.




[GitHub] spark pull request: [SPARK-6985][streaming] Receiver maxRate over ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5559#discussion_r28652021
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala ---
    @@ -33,37 +33,12 @@ import java.util.concurrent.TimeUnit._
       */
     private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
     
    -  private var lastSyncTime = System.nanoTime
    -  private var messagesWrittenSinceSync = 0L
       private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
    -  private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
    +  private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate)
     
       def waitToPush() {
    -    if( desiredRate <= 0 ) {
    -      return
    -    }
    -    val now = System.nanoTime
    -    val elapsedNanosecs = math.max(now - lastSyncTime, 1)
    -    val rate = messagesWrittenSinceSync.toDouble * 1000000000 / elapsedNanosecs
    -    if (rate < desiredRate) {
    -      // It's okay to write; just update some variables and return
    -      messagesWrittenSinceSync += 1
    -      if (now > lastSyncTime + SYNC_INTERVAL) {
    -        // Sync interval has passed; let's resync
    -        lastSyncTime = now
    -        messagesWrittenSinceSync = 1
    -      }
    -    } else {
    -      // Calculate how much time we should sleep to bring ourselves to the desired rate.
    -      val targetTimeInMillis = messagesWrittenSinceSync * 1000 / desiredRate
    -      val elapsedTimeInMillis = elapsedNanosecs / 1000000
    -      val sleepTimeInMillis = targetTimeInMillis - elapsedTimeInMillis
    -      if (sleepTimeInMillis > 0) {
    -        logTrace("Natural rate is " + rate + " per second but desired rate is " +
    -          desiredRate + ", sleeping for " + sleepTimeInMillis + " ms to compensate.")
    -        Thread.sleep(sleepTimeInMillis)
    -      }
    -      waitToPush()
    +    if( desiredRate > 0 ) {
    --- End diff --
    
    Don't change this unless we have to push more changes (and this wasn't your change), but the spacing is off here: there should be no space inside the parens and a space outside, i.e. `if (desiredRate > 0) {`. Don't bother with it now.
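
    Put together, the guarded call with conventional spacing might look like the sketch below. It assumes Guava is on the classpath, and `GuardedLimiter` is a hypothetical stand-in that takes the rate directly rather than reading it from a `SparkConf`.

    ```scala
    import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}

    // Hypothetical stand-in for the patched class, for illustration only.
    class GuardedLimiter(desiredRate: Int) {
      // Lazy so that create() is never called with a non-positive rate,
      // which Guava rejects with an IllegalArgumentException.
      private lazy val rateLimiter = GuavaRateLimiter.create(desiredRate)

      def waitToPush(): Unit = {
        if (desiredRate > 0) {
          rateLimiter.acquire() // blocks until a permit is available
        }
      }
    }
    ```

    With a rate of 0 the limiter is never constructed, so `waitToPush()` returns immediately.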

