You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/31 19:13:10 UTC

[07/16] git commit: Updated test suites to work with the slack time of the file stream.

Updated test suites to work with the slack time of the file stream.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/8ca14a1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/8ca14a1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/8ca14a1e

Branch: refs/heads/master
Commit: 8ca14a1e5157a449d1fa7dc0657079ed82c3c4be
Parents: b31e91f
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Dec 23 16:27:00 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Dec 23 16:27:00 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/streaming/dstream/FileInputDStream.scala | 10 +++++++++-
 .../org/apache/spark/streaming/CheckpointSuite.scala      |  6 ++++++
 .../org/apache/spark/streaming/InputStreamsSuite.scala    |  8 +++++++-
 3 files changed, 22 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8ca14a1e/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index fb52bcf..b526a43 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -201,7 +201,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     // Creating an RDD from a HDFS file immediately after the file is created sometime returns
     // an RDD with 0 partitions. To avoid that, we introduce a slack time - files that are older
     // than slack time from current time is considered for processing.
-    val slackTime = System.getProperty("spark.streaming.filestream.slackTime", "2000").toLong
+    val slackTime = System.getProperty("spark.streaming.fileStream.slackTime", "2000").toLong
     val maxModTime = currentTime - slackTime
 
     def accept(path: Path): Boolean = {
@@ -237,4 +237,12 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
 private[streaming]
 object FileInputDStream {
   def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
+
+  private[streaming] def disableSlackTime() {
+    System.setProperty("spark.streaming.fileStream.slackTime", "0")
+  }
+
+  private[streaming] def restoreSlackTime() {
+    System.clearProperty("spark.streaming.fileStream.slackTime")
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8ca14a1e/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 4e25c95..0347cc1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -200,6 +200,9 @@ class CheckpointSuite extends TestSuiteBase {
     val clockProperty = System.getProperty("spark.streaming.clock")
     System.clearProperty("spark.streaming.clock")
 
+    // Disable slack time of file stream when testing with local file system
+    FileInputDStream.disableSlackTime()
+
     // Set up the streaming context and input streams
     val testDir = Files.createTempDir()
     var ssc = new StreamingContext(master, framework, Seconds(1))
@@ -300,6 +303,9 @@ class CheckpointSuite extends TestSuiteBase {
     // Enable manual clock back again for other tests
     if (clockProperty != null)
       System.setProperty("spark.streaming.clock", clockProperty)
+
+    // Restore the default slack time
+    FileInputDStream.restoreSlackTime()
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8ca14a1e/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 62a9f12..e506c95 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -23,7 +23,7 @@ import akka.actor.IOManager
 import akka.actor.Props
 import akka.util.ByteString
 
-import org.apache.spark.streaming.dstream.{NetworkReceiver, SparkFlumeEvent}
+import org.apache.spark.streaming.dstream.{FileInputDStream, NetworkReceiver, SparkFlumeEvent}
 import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
 import java.io.{File, BufferedWriter, OutputStreamWriter}
 import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
@@ -152,6 +152,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     // Disable manual clock as FileInputDStream does not work with manual clock
     System.clearProperty("spark.streaming.clock")
 
+    // Disable slack time of file stream when testing with local file system
+    FileInputDStream.disableSlackTime()
+
     // Set up the streaming context and input streams
     val testDir = Files.createTempDir()
     val ssc = new StreamingContext(master, framework, batchDuration)
@@ -196,6 +199,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
     // Enable manual clock back again for other tests
     System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+
+    // Restore the default slack time
+    FileInputDStream.restoreSlackTime()
   }