You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/02/17 04:43:23 UTC

spark git commit: [SPARK-11627] Add initial input rate limit for spark streaming backpressure mechanism.

Repository: spark
Updated Branches:
  refs/heads/master 5f37aad48 -> 7218c0eba


[SPARK-11627] Add initial input rate limit for spark streaming backpressure mechanism.

https://issues.apache.org/jira/browse/SPARK-11627

The Spark Streaming backpressure mechanism has no initial input rate limit, which might cause an OOM exception.
In the first batch, receivers receive data at the maximum speed they can reach, which might exhaust executor memory resources. Adding an initial input rate limit value can make sure the streaming job executes successfully in the first batch; then the backpressure mechanism can adjust the receiving rate adaptively.

Author: junhao <ju...@mogujie.com>

Closes #9593 from junhaoMg/junhao-dev.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7218c0eb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7218c0eb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7218c0eb

Branch: refs/heads/master
Commit: 7218c0eba957e0a079a407b79c3a050cce9647b2
Parents: 5f37aad
Author: junhao <ju...@mogujie.com>
Authored: Tue Feb 16 19:43:17 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Feb 16 19:43:17 2016 -0800

----------------------------------------------------------------------
 docs/configuration.md                                       | 8 ++++++++
 .../org/apache/spark/streaming/receiver/RateLimiter.scala   | 9 ++++++++-
 2 files changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7218c0eb/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 0dbfe3b..a2c0dfe 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1471,6 +1471,14 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.streaming.backpressure.initialRate</code></td>
+  <td>not set</td>
+  <td>
+    This is the initial maximum receiving rate at which each receiver will receive data for the
+    first batch when the backpressure mechanism is enabled.
+  </td>
+</tr>
+<tr>
   <td><code>spark.streaming.blockInterval</code></td>
   <td>200ms</td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/7218c0eb/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index bca1fbc..6a1b672 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -36,7 +36,7 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
 
   // treated as an upper limit
   private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
-  private lazy val rateLimiter = GuavaRateLimiter.create(maxRateLimit.toDouble)
+  private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)
 
   def waitToPush() {
     rateLimiter.acquire()
@@ -61,4 +61,11 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
         rateLimiter.setRate(newRate)
       }
     }
+
+  /**
+   * Get the initial rateLimit to initial rateLimiter
+   */
+  private def getInitialRateLimit(): Long = {
+    math.min(conf.getLong("spark.streaming.backpressure.initialRate", maxRateLimit), maxRateLimit)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org