You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/06/16 08:16:14 UTC

spark git commit: [SPARK-8367] [STREAMING] Add a limit for 'spark.streaming.blockInterval` since a data loss bug.

Repository: spark
Updated Branches:
  refs/heads/master bc76a0f75 -> ccf010f27


[SPARK-8367] [STREAMING] Add a limit for 'spark.streaming.blockInterval` since a data loss bug.

Bug had reported in the jira [SPARK-8367](https://issues.apache.org/jira/browse/SPARK-8367)
The relution is limitting the configuration `spark.streaming.blockInterval` to a positive number.

Author: huangzhaowei <ca...@gmail.com>
Author: huangzhaowei <Sa...@users.noreply.github.com>

Closes #6818 from SaintBacchus/SPARK-8367 and squashes the following commits:

c9d1927 [huangzhaowei] Update BlockGenerator.scala
bd3f71a [huangzhaowei] Use requre instead of if
3d17796 [huangzhaowei] [SPARK_8367][Streaming]Add a limit for 'spark.streaming.blockInterval' since a data loss bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ccf010f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ccf010f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ccf010f2

Branch: refs/heads/master
Commit: ccf010f27bc62f7e7f409c6eef7488ab476de609
Parents: bc76a0f
Author: huangzhaowei <ca...@gmail.com>
Authored: Tue Jun 16 08:16:09 2015 +0200
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Jun 16 08:16:09 2015 +0200

----------------------------------------------------------------------
 .../org/apache/spark/streaming/receiver/BlockGenerator.scala     | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ccf010f2/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 8d73593..92b51ce 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.util.RecurringTimer
-import org.apache.spark.util.{SystemClock, Utils}
+import org.apache.spark.util.SystemClock
 
 /** Listener object for BlockGenerator events */
 private[streaming] trait BlockGeneratorListener {
@@ -80,6 +80,8 @@ private[streaming] class BlockGenerator(
 
   private val clock = new SystemClock()
   private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
+  require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")
+
   private val blockIntervalTimer =
     new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
   private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org