You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2014/07/11 01:01:15 UTC

git commit: [SPARK-1341] [Streaming] Throttle BlockGenerator to limit rate of data consumption.

Repository: spark
Updated Branches:
  refs/heads/master 40a8fef4e -> 2dd672485


[SPARK-1341] [Streaming] Throttle BlockGenerator to limit rate of data consumption.

Author: Issac Buenrostro <bu...@ooyala.com>

Closes #945 from ibuenros/SPARK-1341-throttle and squashes the following commits:

5514916 [Issac Buenrostro] Formatting changes, added documentation for streaming throttling, stricter unit tests for throttling.
62f395f [Issac Buenrostro] Add comments and license to streaming RateLimiter.scala
7066438 [Issac Buenrostro] Moved throttle code to RateLimiter class, smoother pushing when throttling active
ccafe09 [Issac Buenrostro] Throttle BlockGenerator to limit rate of data consumption.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2dd67248
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2dd67248
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2dd67248

Branch: refs/heads/master
Commit: 2dd67248503306bb08946b1796821e9f9ed4d00e
Parents: 40a8fef
Author: Issac Buenrostro <bu...@ooyala.com>
Authored: Thu Jul 10 16:01:08 2014 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Jul 10 16:01:08 2014 -0700

----------------------------------------------------------------------
 docs/configuration.md                           |  9 +++
 .../streaming/receiver/BlockGenerator.scala     |  3 +-
 .../spark/streaming/receiver/RateLimiter.scala  | 69 ++++++++++++++++++++
 .../spark/streaming/NetworkReceiverSuite.scala  | 38 +++++++++++
 4 files changed, 118 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2dd67248/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index b84104c..0aea23a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -774,6 +774,15 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.streaming.receiver.maxRate</code></td>
+  <td>infinite</td>
+  <td>
+    Maximum rate (per second) at which each receiver will push data into blocks. Effectively,
+    each stream will consume at most this number of records per second.
+    Setting this configuration to 0 or a negative number will put no limit on the rate.
+  </td>
+</tr>
+<tr>
   <td><code>spark.streaming.unpersist</code></td>
   <td>true</td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/2dd67248/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 78cc2da..0316b68 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -44,7 +44,7 @@ private[streaming] class BlockGenerator(
     listener: BlockGeneratorListener,
     receiverId: Int,
     conf: SparkConf
-  ) extends Logging {
+  ) extends RateLimiter(conf) with Logging {
 
   private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])
 
@@ -81,6 +81,7 @@ private[streaming] class BlockGenerator(
    * will be periodically pushed into BlockManager.
    */
   def += (data: Any): Unit = synchronized {
+    waitToPush()
     currentBuffer += data
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2dd67248/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
new file mode 100644
index 0000000..e4f6ba6
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.receiver
+
+import org.apache.spark.{Logging, SparkConf}
+import java.util.concurrent.TimeUnit._
+
+/** Provides waitToPush() method to limit the rate at which receivers consume data.
+  *
+  * waitToPush method will block the thread if too many messages have been pushed too quickly,
+  * and only return when a new message has been pushed. It assumes that only one message is
+  * pushed at a time.
+  *
+  * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
+  * per second that each receiver will accept.
+  *
+  * @param conf spark configuration
+  */
+private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
+
+  private var lastSyncTime = System.nanoTime
+  private var messagesWrittenSinceSync = 0L
+  private val desiredRate = conf.getInt("spark.streaming.receiver.maxRate", 0)
+  private val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
+
+  def waitToPush() {
+    if( desiredRate <= 0 ) {
+      return
+    }
+    val now = System.nanoTime
+    val elapsedNanosecs = math.max(now - lastSyncTime, 1)
+    val rate = messagesWrittenSinceSync.toDouble * 1000000000 / elapsedNanosecs
+    if (rate < desiredRate) {
+      // It's okay to write; just update some variables and return
+      messagesWrittenSinceSync += 1
+      if (now > lastSyncTime + SYNC_INTERVAL) {
+        // Sync interval has passed; let's resync
+        lastSyncTime = now
+        messagesWrittenSinceSync = 1
+      }
+    } else {
+      // Calculate how much time we should sleep to bring ourselves to the desired rate.
+      val targetTimeInMillis = messagesWrittenSinceSync * 1000 / desiredRate
+      val elapsedTimeInMillis = elapsedNanosecs / 1000000
+      val sleepTimeInMillis = targetTimeInMillis - elapsedTimeInMillis
+      if (sleepTimeInMillis > 0) {
+        logTrace("Natural rate is " + rate + " per second but desired rate is " +
+          desiredRate + ", sleeping for " + sleepTimeInMillis + " ms to compensate.")
+        Thread.sleep(sleepTimeInMillis)
+      }
+      waitToPush()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2dd67248/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
index d9ac3c9..f4e11f9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
@@ -145,6 +145,44 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
     assert(recordedData.toSet === generatedData.toSet)
   }
 
+  test("block generator throttling") {
+    val blockGeneratorListener = new FakeBlockGeneratorListener
+    val blockInterval = 50
+    val maxRate = 200
+    val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString).
+      set("spark.streaming.receiver.maxRate", maxRate.toString)
+    val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
+    val expectedBlocks = 20
+    val waitTime = expectedBlocks * blockInterval
+    val expectedMessages = maxRate * waitTime / 1000
+    val expectedMessagesPerBlock = maxRate * blockInterval / 1000
+    val generatedData = new ArrayBuffer[Int]
+
+    // Generate blocks
+    val startTime = System.currentTimeMillis()
+    blockGenerator.start()
+    var count = 0
+    while(System.currentTimeMillis - startTime < waitTime) {
+      blockGenerator += count
+      generatedData += count
+      count += 1
+      Thread.sleep(1)
+    }
+    blockGenerator.stop()
+
+    val recordedData = blockGeneratorListener.arrayBuffers
+    assert(blockGeneratorListener.arrayBuffers.size > 0)
+    assert(recordedData.flatten.toSet === generatedData.toSet)
+    // recordedData size should be close to the expected rate
+    assert(recordedData.flatten.size >= expectedMessages * 0.9 &&
+      recordedData.flatten.size <= expectedMessages * 1.1 )
+    // the first and last block may be incomplete, so we slice them out
+    recordedData.slice(1, recordedData.size - 1).foreach { block =>
+      assert(block.size >= expectedMessagesPerBlock * 0.8 &&
+        block.size <= expectedMessagesPerBlock * 1.2 )
+    }
+  }
+
   /**
    * An implementation of NetworkReceiver that is used for testing a receiver's life cycle.
    */