Posted to reviews@spark.apache.org by JoshRosen <gi...@git.apache.org> on 2014/12/25 11:22:41 UTC

[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

GitHub user JoshRosen opened a pull request:

    https://github.com/apache/spark/pull/3801

    [SPARK-1600] Refactor FileInputStream tests to remove Thread.sleep() calls and SystemClock usage

    This PR refactors Spark Streaming's FileInputStream tests to remove uses of Thread.sleep() and SystemClock, which should hopefully resolve some longstanding flakiness in these tests (see SPARK-1600).
    
    Key changes:
    
    - Modify FileInputDStream to use the scheduler's Clock instead of System.currentTimeMillis(); this allows it to be tested using ManualClock.
    - Fix a synchronization issue in ManualClock's `currentTime` method.
    - Add a StreamingTestWaiter class which allows callers to block until a certain number of batches have finished.
    - Change the FileInputStream tests so that files' modification times are manually set based off of ManualClock; this eliminates many Thread.sleep calls.
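
To illustrate the first two bullets, here is a minimal sketch of why reading time through an injectable clock makes file-selection logic testable without sleeping. Everything in it is an assumption made for illustration: `Clock`, `WallClock`, `ManualClock`, and `NewFileSelector` are simplified stand-ins, not Spark's actual classes, and the "new file" rule is a placeholder rather than FileInputDStream's real selection logic.

```scala
// Simplified stand-ins for illustration only; these are not Spark's classes.
trait Clock {
  def currentTime(): Long
}

// Production-style clock backed by the wall clock.
object WallClock extends Clock {
  def currentTime(): Long = System.currentTimeMillis()
}

// Test clock whose time only moves when the test advances it.
class ManualClock(private var time: Long = 0L) extends Clock {
  // Reads and writes synchronize on the same monitor, so a reader on
  // another thread never observes a stale value.
  def currentTime(): Long = this.synchronized { time }

  def addToTime(ms: Long): Unit = this.synchronized {
    time += ms
    this.notifyAll() // wake any thread waiting on this clock's monitor
  }
}

// Hypothetical selection rule: a file is "new" if its modification time
// falls inside the window that ends at the clock's current time.
class NewFileSelector(clock: Clock, windowMs: Long) {
  def isNew(modTime: Long): Boolean = {
    val now = clock.currentTime()
    modTime <= now && modTime > now - windowMs
  }
}
```

With a `ManualClock`, a test can stamp a file with `clock.currentTime()`, advance the clock by one window, and assert on the result deterministically, with no dependence on how fast the machine runs.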

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JoshRosen/spark SPARK-1600

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/3801.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3801
    
----
commit a95ddc41f2b10b57fa18e75c865d7ef4507cd771
Author: Josh Rosen <jo...@databricks.com>
Date:   2014-12-16T08:50:01Z

    Modify FileInputDStream to use Clock class.

commit 3c3efc3f75521020f482d56b41465a6373448cf5
Author: Josh Rosen <jo...@databricks.com>
Date:   2014-12-17T01:40:35Z

    Synchronize `currentTime` in ManualClock

commit dda1403f3eaabe9125b87ac45ac3e7b0d667e9de
Author: Josh Rosen <jo...@databricks.com>
Date:   2014-12-25T09:03:00Z

    Add StreamingTestWaiter class.

commit d4f2d87729b20f1060d456a6074f2da6a4e79cb3
Author: Josh Rosen <jo...@databricks.com>
Date:   2014-12-25T09:03:54Z

    Refactor file input stream tests to not rely on SystemClock.

commit c8f06b10431c555dab3be461622d5d96aa807685
Author: Josh Rosen <jo...@databricks.com>
Date:   2014-12-25T10:14:06Z

    Remove Thread.sleep calls in FileInputStream CheckpointSuite test.
    
    Hopefully this will fix SPARK-1600.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22449487
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---
    @@ -104,6 +106,77 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
     }
     
     /**
    + * This is an interface that can be used to block until certain events occur, such as
    + * the start/completion of batches.  This is much less brittle than waiting on wall-clock time.
    + * Internally, this is implemented using a StreamingListener.  Constructing a new instance of this
    + * class automatically registers a StreamingListener on the given StreamingContext.
    + */
    +class StreamingTestWaiter(ssc: StreamingContext) {
    +
    +  // All access to this state should be guarded by `StreamingTestWaiter.this.synchronized`
    +  private var numCompletedBatches = 0
    +  private var numStartedBatches = 0
    +
    +  private val listener = new StreamingListener {
    +    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit =
    +      StreamingTestWaiter.this.synchronized {
    +        numStartedBatches += 1
    +        StreamingTestWaiter.this.notifyAll()
    +      }
    +    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit =
    +      StreamingTestWaiter.this.synchronized {
    +        numCompletedBatches += 1
    +        StreamingTestWaiter.this.notifyAll()
    +      }
    +  }
    +  ssc.addStreamingListener(listener)
    +
    +  def getNumCompletedBatches: Int = this.synchronized {
    +    numCompletedBatches
    +  }
    +
    +  def getNumStartedBatches: Int = this.synchronized {
    +    numStartedBatches
    +  }
    +
    +  /**
    +   * Block until the number of completed batches reaches the given threshold.
    +   */
    +  def waitForTotalBatchesCompleted(
    --- End diff --
    
    Actually, even this might be overkill: I can just use vanilla `eventually`, since we'd still be blocking until a condition occurs rather than relying on real-clock time. I guess using `wait` instead of `sleep` is just an optimization that might save a small amount of test time, but it's not related to flakiness (the goal of this PR).  Therefore, I'll just remove all of this in favor of `eventually`, since then I'll benefit from ScalaTest's nice assertion macros.
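
The polling approach described in this comment can be sketched as follows. This is a hand-rolled, hypothetical helper (`Retry.eventually`) written only to show the idea of retrying an assertion until a deadline; it is not ScalaTest's `eventually`, and its signature and the choice to catch `AssertionError` are assumptions made to keep the example self-contained.

```scala
// Hypothetical helper for illustration; not ScalaTest's implementation.
object Retry {
  // Re-runs `block` until it stops throwing AssertionError or the timeout
  // elapses, sleeping `intervalMs` between attempts. The last failure is
  // rethrown once the deadline has passed.
  def eventually[T](timeoutMs: Long, intervalMs: Long)(block: => T): T = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    var result: Option[T] = None
    while (result.isEmpty) {
      try {
        result = Some(block)
      } catch {
        case e: AssertionError =>
          if (System.nanoTime() >= deadline) throw e
          Thread.sleep(intervalMs)
      }
    }
    result.get
  }
}
```

The short sleep between attempts only controls how often the condition is re-checked. The test passes as soon as the condition holds and fails only at the timeout, which is why polling is not the source of flakiness that fixed wall-clock sleeps are.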




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/3801#issuecomment-68097807
  
    These changes are split off from #3687, a larger PR of mine which tried to remove all uses of Thread.sleep() in the streaming tests.
    
    It may look like there are a lot of changes here, but most of that is due to indentation changes when I modified tests to use the `withStreamingContext` fixture.
    
    /cc @tdas for review.




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22436115
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---
    @@ -104,6 +106,77 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
     }
     
     /**
    + * This is an interface that can be used to block until certain events occur, such as
    --- End diff --
    
    This is not an interface.




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22447724
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
    +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
    -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
    -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
    +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    +      val file = new File(testDir, i.toString)
    +      Files.write(i + "\n", file, Charsets.UTF_8)
    +      assert(file.setLastModified(clock.currentTime()))
    +      // Check that the file's modification date is actually the value we wrote, since rounding or
    +      // truncation will break the test:
    +      assert(file.lastModified() === clock.currentTime())
         }
    -    logInfo("Output = " + outputStream.output.mkString(","))
    -    assert(outputStream.output.size > 0, "No files processed before restart")
    -    ssc.stop()
     
    -    // Verify whether files created have been recorded correctly or not
    -    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    -
    -    // Create files while the master is down
    -    for (i <- Seq(4, 5, 6)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    +    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
    +      val fileInputDStream =
    +        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    +      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    +      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
         }
     
    -    // Recover context from checkpoint file and verify whether the files that were
    -    // recorded before failure were saved and successfully recovered
    -    logInfo("*********** RESTARTING ************")
    -    ssc = new StreamingContext(checkpointDir)
    -    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    +    try {
    +      // This is a var because it's re-assigned when we restart from a checkpoint:
    +      var clock: ManualClock = null
    +      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
    +        ssc.checkpoint(checkpointDir)
    +        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +        val waiter = new StreamingTestWaiter(ssc)
    +        val fileStream = ssc.textFileStream(testDir.toString)
    +        // Making value 3 take a large time to process, to ensure that the driver
    +        // shuts down in the middle of processing the 3rd batch
    +        val mappedStream = fileStream.map(s => {
    +          val i = s.toInt
    +          if (i == 3) Thread.sleep(4000)
    +          i
    +        })
    +
    +        // Reducing over a large window to ensure that recovery from driver failure
    +        // requires reprocessing of all the files seen before the failure
    +        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
    +        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
    +        outputStream.register()
    +        ssc.start()
    +
    +        clock.addToTime(batchDuration.milliseconds)
    +        // Create files and advance manual clock to process them
    +        for (i <- Seq(1, 2, 3)) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          if (i != 3) {
    +            // Since we want to shut down while the 3rd batch is processing
    +            waiter.waitForTotalBatchesCompleted(i, batchDuration * 5)
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
    +        waiter.waitForTotalBatchesStarted(3, batchDuration * 5)
    +        Thread.sleep(1000) // To wait for execution to actually begin
    +        logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
    +        assert(outputStream.output.size > 0, "No files processed after restart")
    +        ssc.stop()
    +
    +        // Verify whether files created have been recorded correctly or not
    +        assert(recordedFiles(ssc) === Seq(1, 2, 3))
    +      }
     
    -    // Restart stream computation
    -    ssc.start()
    -    for (i <- Seq(7, 8, 9)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    -    }
    -    Thread.sleep(1000)
    -    logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
    -    assert(outputStream.output.size > 0, "No files processed after restart")
    -    ssc.stop()
    +      // Create files while the streaming driver is down
    +      for (i <- Seq(4, 5, 6)) {
    +        writeFile(i, clock)
    +        clock.addToTime(1000)
    +      }
     
    -    // Verify whether files created while the driver was down have been recorded or not
    -    assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
    -
    -    // Verify whether new files created after recover have been recorded or not
    -    assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
    -
    -    // Append the new output to the old buffer
    -    outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
    -    outputBuffer ++= outputStream.output
    -
    -    val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
    -    logInfo("--------------------------------")
    -    logInfo("output, size = " + outputBuffer.size)
    -    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
    -    logInfo("expected output, size = " + expectedOutput.size)
    -    expectedOutput.foreach(x => logInfo("[" + x + "]"))
    -    logInfo("--------------------------------")
    -
    -    // Verify whether all the elements received are as expected
    -    val output = outputBuffer.flatMap(x => x)
    -    assert(output.contains(6))  // To ensure that the 3rd input (i.e., 3) was processed
    -    output.foreach(o =>         // To ensure all the inputs are correctly added cumulatively
    -      assert(expectedOutput.contains(o), "Expected value " + o + " not found")
    -    )
    -    // To ensure that all the inputs were received correctly
    -    assert(expectedOutput.last === output.last)
    -    Utils.deleteRecursively(testDir)
    +      // Recover context from checkpoint file and verify whether the files that were
    +      // recorded before failure were saved and successfully recovered
    +      logInfo("*********** RESTARTING ************")
    +      withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
    +        // Copy over the time from the old clock so that we don't appear to have time-traveled:
    --- End diff --
    
    Oh, a bad style habit of mine that I still sometimes fall into.  I'll fix this.




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22436087
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
    +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
    -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
    -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
    +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    +      val file = new File(testDir, i.toString)
    +      Files.write(i + "\n", file, Charsets.UTF_8)
    +      assert(file.setLastModified(clock.currentTime()))
    +      // Check that the file's modification date is actually the value we wrote, since rounding or
    +      // truncation will break the test:
    +      assert(file.lastModified() === clock.currentTime())
         }
    -    logInfo("Output = " + outputStream.output.mkString(","))
    -    assert(outputStream.output.size > 0, "No files processed before restart")
    -    ssc.stop()
     
    -    // Verify whether files created have been recorded correctly or not
    -    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    -
    -    // Create files while the master is down
    -    for (i <- Seq(4, 5, 6)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    +    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
    +      val fileInputDStream =
    +        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    +      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    +      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
         }
     
    -    // Recover context from checkpoint file and verify whether the files that were
    -    // recorded before failure were saved and successfully recovered
    -    logInfo("*********** RESTARTING ************")
    -    ssc = new StreamingContext(checkpointDir)
    -    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    +    try {
    +      // This is a var because it's re-assigned when we restart from a checkpoint:
    +      var clock: ManualClock = null
    +      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
    +        ssc.checkpoint(checkpointDir)
    +        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +        val waiter = new StreamingTestWaiter(ssc)
    +        val fileStream = ssc.textFileStream(testDir.toString)
    +        // Making value 3 take a large time to process, to ensure that the driver
    +        // shuts down in the middle of processing the 3rd batch
    +        val mappedStream = fileStream.map(s => {
    +          val i = s.toInt
    +          if (i == 3) Thread.sleep(4000)
    +          i
    +        })
    +
    +        // Reducing over a large window to ensure that recovery from driver failure
    +        // requires reprocessing of all the files seen before the failure
    +        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
    +        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
    +        outputStream.register()
    +        ssc.start()
    +
    +        clock.addToTime(batchDuration.milliseconds)
    +        // Create files and advance manual clock to process them
    +        for (i <- Seq(1, 2, 3)) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          if (i != 3) {
    +            // Since we want to shut down while the 3rd batch is processing
    +            waiter.waitForTotalBatchesCompleted(i, batchDuration * 5)
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
    +        waiter.waitForTotalBatchesStarted(3, batchDuration * 5)
    +        Thread.sleep(1000) // To wait for execution to actually begin
    +        logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
    +        assert(outputStream.output.size > 0, "No files processed after restart")
    --- End diff --
    
    I think this error message is wrong. It's not after a restart; it's the first start.




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22447683
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
    +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
    -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
    -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
    +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    +      val file = new File(testDir, i.toString)
    +      Files.write(i + "\n", file, Charsets.UTF_8)
    +      assert(file.setLastModified(clock.currentTime()))
    +      // Check that the file's modification date is actually the value we wrote, since rounding or
    +      // truncation will break the test:
    +      assert(file.lastModified() === clock.currentTime())
         }
    -    logInfo("Output = " + outputStream.output.mkString(","))
    -    assert(outputStream.output.size > 0, "No files processed before restart")
    -    ssc.stop()
     
    -    // Verify whether files created have been recorded correctly or not
    -    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    -
    -    // Create files while the master is down
    -    for (i <- Seq(4, 5, 6)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    +    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
    +      val fileInputDStream =
    +        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    +      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    +      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
         }
     
    -    // Recover context from checkpoint file and verify whether the files that were
    -    // recorded before failure were saved and successfully recovered
    -    logInfo("*********** RESTARTING ************")
    -    ssc = new StreamingContext(checkpointDir)
    -    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    +    try {
    +      // This is a var because it's re-assigned when we restart from a checkpoint:
    +      var clock: ManualClock = null
    +      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
    +        ssc.checkpoint(checkpointDir)
    +        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +        val waiter = new StreamingTestWaiter(ssc)
    +        val fileStream = ssc.textFileStream(testDir.toString)
    +        // Making value 3 take a large time to process, to ensure that the driver
    +        // shuts down in the middle of processing the 3rd batch
    +        val mappedStream = fileStream.map(s => {
    +          val i = s.toInt
    +          if (i == 3) Thread.sleep(4000)
    +          i
    +        })
    +
    +        // Reducing over a large window to ensure that recovery from driver failure
    +        // requires reprocessing of all the files seen before the failure
    +        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
    +        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
    +        outputStream.register()
    +        ssc.start()
    +
    +        clock.addToTime(batchDuration.milliseconds)
    +        // Create files and advance manual clock to process them
    +        for (i <- Seq(1, 2, 3)) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          if (i != 3) {
    +            // Since we want to shut down while the 3rd batch is processing
    +            waiter.waitForTotalBatchesCompleted(i, batchDuration * 5)
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
    +        waiter.waitForTotalBatchesStarted(3, batchDuration * 5)
    +        Thread.sleep(1000) // To wait for execution to actually begin
    +        logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
    +        assert(outputStream.output.size > 0, "No files processed after restart")
    +        ssc.stop()
    +
    +        // Verify whether files created have been recorded correctly or not
    +        assert(recordedFiles(ssc) === Seq(1, 2, 3))
    +      }
     
    -    // Restart stream computation
    -    ssc.start()
    -    for (i <- Seq(7, 8, 9)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    -    }
    -    Thread.sleep(1000)
    -    logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
    -    assert(outputStream.output.size > 0, "No files processed after restart")
    -    ssc.stop()
    +      // Create files while the streaming driver is down
    +      for (i <- Seq(4, 5, 6)) {
    +        writeFile(i, clock)
    +        clock.addToTime(1000)
    +      }
     
    -    // Verify whether files created while the driver was down have been recorded or not
    -    assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
    -
    -    // Verify whether new files created after recover have been recorded or not
    -    assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
    -
    -    // Append the new output to the old buffer
    -    outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
    -    outputBuffer ++= outputStream.output
    -
    -    val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
    -    logInfo("--------------------------------")
    -    logInfo("output, size = " + outputBuffer.size)
    -    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
    -    logInfo("expected output, size = " + expectedOutput.size)
    -    expectedOutput.foreach(x => logInfo("[" + x + "]"))
    -    logInfo("--------------------------------")
    -
    -    // Verify whether all the elements received are as expected
    -    val output = outputBuffer.flatMap(x => x)
    -    assert(output.contains(6))  // To ensure that the 3rd input (i.e., 3) was processed
    -    output.foreach(o =>         // To ensure all the inputs are correctly added cumulatively
    -      assert(expectedOutput.contains(o), "Expected value " + o + " not found")
    -    )
    -    // To ensure that all the inputs were received correctly
    -    assert(expectedOutput.last === output.last)
    -    Utils.deleteRecursively(testDir)
    +      // Recover context from checkpoint file and verify whether the files that were
    +      // recorded before failure were saved and successfully recovered
    +      logInfo("*********** RESTARTING ************")
    +      withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
    +        // Copy over the time from the old clock so that we don't appear to have time-traveled:
    +        clock = {
    +          val newClock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +          newClock.setTime(clock.currentTime())
    +          newClock
    +        }
    +        val waiter = new StreamingTestWaiter(ssc)
    --- End diff --
    
    If you're referring to 334-336 in the original diff, that got rewritten to the more compact
    
    ```
     assert(recordedFiles(ssc) === Seq(1, 2, 3))
    ```
    
    which appears below.  It looks like the test on line 334 checks right after creating the restarted streaming context and I think I've preserved that here on line 363 in the new file.  Am I overlooking something?
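
For illustration, the compact check relies on mapping each recorded path to the integer in its file name and sorting the result. The sketch below is a Spark-free approximation, not the PR's helper: `RecordedFiles` and `toSortedInts` are hypothetical names. It also uses `java.io.File.getName` instead of `split(File.separator)`, because `String.split` treats its argument as a regular expression and a lone backslash separator would be rejected as a pattern.

```scala
import java.io.File

// Hypothetical helper: turns recorded file paths into a sorted list of the
// integers encoded in their file names.
object RecordedFiles {
  def toSortedInts(paths: Iterable[String]): Seq[Int] =
    paths.map(p => new File(p).getName.toInt).toSeq.sorted
}
```

With this shape, one equality assertion against `Seq(1, 2, 3)` replaces three separate `endsWith` checks and also fails if an unexpected file was recorded.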


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/3801#issuecomment-68838737
  
    I am merging this. Thanks @JoshRosen for this humongous effort!




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22447733
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---
    @@ -104,6 +106,77 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
     }
     
     /**
    + * This is an interface that can be used to block until certain events occur, such as
    --- End diff --
    
    I'll fix this up, along with my "two spaces after periods" style in the block comments.




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22271205
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
    +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
    -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
    -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
    +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    --- End diff --
    
    I factored some of the common code out here.
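
A rough, Spark-free sketch of what such a helper does: write the file, stamp it with a caller-supplied time, and fail loudly if the file system rounded the timestamp. `FileTimes` and `writeFileAt` are hypothetical names, and a plain `Long` stands in for the `ManualClock` reading used in the diff.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

object FileTimes {
  /** Writes `i` into a file named after it and stamps the file with `timeMs`. */
  def writeFileAt(dir: File, i: Int, timeMs: Long): File = {
    val file = new File(dir, i.toString)
    Files.write(file.toPath, (i.toString + "\n").getBytes(StandardCharsets.UTF_8))
    // setLastModified returns false if the timestamp could not be applied.
    require(file.setLastModified(timeMs), s"could not set modification time on $file")
    // Some file systems only store whole seconds, so verify the round trip.
    require(file.lastModified() == timeMs, s"modification time was rounded on $file")
    file
  }
}
```

Passing whole-second times, as a 2-second batch interval would produce if the clock starts on a whole second, avoids tripping the second check on file systems with 1-second resolution.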




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22449238
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---
    @@ -104,6 +106,77 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
     }
     
     /**
    + * This is an interface that can be used to block until certain events occur, such as
    + * the start/completion of batches.  This is much less brittle than waiting on wall-clock time.
    + * Internally, this is implemented using a StreamingListener.  Constructing a new instance of this
    + * class automatically registers a StreamingListener on the given StreamingContext.
    + */
    +class StreamingTestWaiter(ssc: StreamingContext) {
    +
    +  // All access to this state should be guarded by `StreamingTestWaiter.this.synchronized`
    +  private var numCompletedBatches = 0
    +  private var numStartedBatches = 0
    +
    +  private val listener = new StreamingListener {
    +    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit =
    +      StreamingTestWaiter.this.synchronized {
    +        numStartedBatches += 1
    +        StreamingTestWaiter.this.notifyAll()
    +      }
    +    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit =
    +      StreamingTestWaiter.this.synchronized {
    +        numCompletedBatches += 1
    +        StreamingTestWaiter.this.notifyAll()
    +      }
    +  }
    +  ssc.addStreamingListener(listener)
    +
    +  def getNumCompletedBatches: Int = this.synchronized {
    +    numCompletedBatches
    +  }
    +
    +  def getNumStartedBatches: Int = this.synchronized {
    +    numStartedBatches
    +  }
    +
    +  /**
    +   * Block until the number of completed batches reaches the given threshold.
    +   */
    +  def waitForTotalBatchesCompleted(
    --- End diff --
    
    Actually, something even simpler (since this is just for test code): I can just copy ScalaTest's `eventually` and modify it to use synchronize / notify instead of polling.
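
A minimal sketch of the notification-based idea, assuming nothing about ScalaTest's internals: a waiter blocks on a monitor and is woken by `notifyAll()` whenever the counter changes, rather than polling on a sleep interval. `BatchCounter`, `recordCompleted`, and `waitUntilCompleted` are hypothetical names, not part of Spark or ScalaTest.

```scala
// Hypothetical, Spark-free counter that lets a test thread block until
// a target number of events has been recorded or a timeout expires.
class BatchCounter {
  // Guarded by this object's monitor.
  private var completed = 0

  def recordCompleted(): Unit = synchronized {
    completed += 1
    notifyAll() // wake every thread blocked in waitUntilCompleted
  }

  /** Returns true if `target` events were recorded within `timeoutMs`. */
  def waitUntilCompleted(target: Int, timeoutMs: Long): Boolean = synchronized {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    var remainingMs = timeoutMs
    // Loop to handle spurious wakeups and wakeups caused by other events.
    while (completed < target && remainingMs > 0) {
      wait(remainingMs)
      remainingMs = (deadline - System.nanoTime()) / 1000000L
    }
    completed >= target
  }
}
```

Because the condition is rechecked under the same lock that guards the counter, a notification sent between the check and the wait cannot be lost.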




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3801#issuecomment-68813139
  
      [Test build #25071 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25071/consoleFull) for PR 3801 at commit [`8340bd0`](https://github.com/apache/spark/commit/8340bd0ab50209316202237a9aee7be619a0b922).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22447721
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
    +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
    -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
    -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
    +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    +      val file = new File(testDir, i.toString)
    +      Files.write(i + "\n", file, Charsets.UTF_8)
    +      assert(file.setLastModified(clock.currentTime()))
    +      // Check that the file's modification date is actually the value we wrote, since rounding or
    +      // truncation will break the test:
    +      assert(file.lastModified() === clock.currentTime())
         }
    -    logInfo("Output = " + outputStream.output.mkString(","))
    -    assert(outputStream.output.size > 0, "No files processed before restart")
    -    ssc.stop()
     
    -    // Verify whether files created have been recorded correctly or not
    -    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    -
    -    // Create files while the master is down
    -    for (i <- Seq(4, 5, 6)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    +    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
    +      val fileInputDStream =
    +        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    +      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    +      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
         }
     
    -    // Recover context from checkpoint file and verify whether the files that were
    -    // recorded before failure were saved and successfully recovered
    -    logInfo("*********** RESTARTING ************")
    -    ssc = new StreamingContext(checkpointDir)
    -    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    +    try {
    +      // This is a var because it's re-assigned when we restart from a checkpoint:
    +      var clock: ManualClock = null
    +      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
    +        ssc.checkpoint(checkpointDir)
    +        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +        val waiter = new StreamingTestWaiter(ssc)
    +        val fileStream = ssc.textFileStream(testDir.toString)
    +        // Make value 3 take a large time to process, to ensure that the driver
    +        // shuts down in the middle of processing the 3rd batch
    +        val mappedStream = fileStream.map(s => {
    +          val i = s.toInt
    +          if (i == 3) Thread.sleep(4000)
    +          i
    +        })
    +
    +        // Reducing over a large window to ensure that recovery from driver failure
    +        // requires reprocessing of all the files seen before the failure
    +        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
    +        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
    +        outputStream.register()
    +        ssc.start()
    +
    +        clock.addToTime(batchDuration.milliseconds)
    +        // Create files and advance manual clock to process them
    +        for (i <- Seq(1, 2, 3)) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          if (i != 3) {
    +            // Since we want to shut down while the 3rd batch is processing
    +            waiter.waitForTotalBatchesCompleted(i, batchDuration * 5)
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
    +        waiter.waitForTotalBatchesStarted(3, batchDuration * 5)
    +        Thread.sleep(1000) // To wait for execution to actually begin
    +        logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
    +        assert(outputStream.output.size > 0, "No files processed after restart")
    --- End diff --
    
    This actually looks like a typo in my refactoring; thanks for catching it.




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22271355
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
    +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
    -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
    -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
    +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    +      val file = new File(testDir, i.toString)
    +      Files.write(i + "\n", file, Charsets.UTF_8)
    +      assert(file.setLastModified(clock.currentTime()))
    +      // Check that the file's modification date is actually the value we wrote, since rounding or
    +      // truncation will break the test:
    +      assert(file.lastModified() === clock.currentTime())
         }
    -    logInfo("Output = " + outputStream.output.mkString(","))
    -    assert(outputStream.output.size > 0, "No files processed before restart")
    -    ssc.stop()
     
    -    // Verify whether files created have been recorded correctly or not
    -    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    -
    -    // Create files while the master is down
    -    for (i <- Seq(4, 5, 6)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    +    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
    +      val fileInputDStream =
    +        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    +      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    +      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
         }
     
    -    // Recover context from checkpoint file and verify whether the files that were
    -    // recorded before failure were saved and successfully recovered
    -    logInfo("*********** RESTARTING ************")
    -    ssc = new StreamingContext(checkpointDir)
    -    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    +    try {
    +      // This is a var because it's re-assigned when we restart from a checkpoint:
    +      var clock: ManualClock = null
    +      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
    +        ssc.checkpoint(checkpointDir)
    +        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +        val waiter = new StreamingTestWaiter(ssc)
    +        val fileStream = ssc.textFileStream(testDir.toString)
    +        // Make value 3 take a large time to process, to ensure that the driver
    +        // shuts down in the middle of processing the 3rd batch
    +        val mappedStream = fileStream.map(s => {
    +          val i = s.toInt
    +          if (i == 3) Thread.sleep(4000)
    +          i
    +        })
    +
    +        // Reducing over a large window to ensure that recovery from driver failure
    +        // requires reprocessing of all the files seen before the failure
    +        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
    +        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
    +        outputStream.register()
    +        ssc.start()
    +
    +        clock.addToTime(batchDuration.milliseconds)
    +        // Create files and advance manual clock to process them
    +        for (i <- Seq(1, 2, 3)) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          if (i != 3) {
    +            // Since we want to shut down while the 3rd batch is processing
    +            waiter.waitForTotalBatchesCompleted(i, batchDuration * 5)
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
    +        waiter.waitForTotalBatchesStarted(3, batchDuration * 5)
    +        Thread.sleep(1000) // To wait for execution to actually begin
    +        logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
    +        assert(outputStream.output.size > 0, "No files processed after restart")
    +        ssc.stop()
    +
    +        // Verify whether files created have been recorded correctly or not
    +        assert(recordedFiles(ssc) === Seq(1, 2, 3))
    +      }
     
    -    // Restart stream computation
    -    ssc.start()
    -    for (i <- Seq(7, 8, 9)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    -    }
    -    Thread.sleep(1000)
    -    logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
    -    assert(outputStream.output.size > 0, "No files processed after restart")
    --- End diff --
    
    I think that there was a bug here in the old tests: this is the `outputStream` from the stopped StreamingContext, not the restarted one.  The new tests' use of the `withStreamingContext` fixture narrows the scope of these variables and prevents this type of error.
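
The scoping argument can be sketched with a small loan-pattern fixture. This is an illustration under stated assumptions rather than Spark's actual helper: `FakeContext` is a hypothetical stand-in for a streaming context, and `Fixtures.withContext` is a hypothetical name.

```scala
// Hypothetical stand-in for a context that must be stopped after use.
class FakeContext(val name: String) {
  var stopped = false
  def stop(): Unit = { stopped = true }
}

object Fixtures {
  // Loan pattern: `body` borrows the context, which is always stopped afterwards.
  // Values created inside `body` go out of scope when it returns, so a later
  // block cannot accidentally read state that belongs to an earlier context.
  def withContext[R](ctx: FakeContext)(body: FakeContext => R): R = {
    try body(ctx) finally ctx.stop()
  }
}
```

Each phase of a test (before and after restart) then gets its own block, and anything such as an output stream defined in the first block is simply not nameable in the second.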




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/3801#issuecomment-68619619
  
    @JoshRosen I did a pass and I had some comments. Overall it looks pretty good. Addressing those comments should make it good enough. 
    
    Could you tell me how much of a speedup we get from converting the unit tests from the real clock to the manual clock?




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22491068
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -319,102 +318,141 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
    +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
    -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
    -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
    +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    +      val file = new File(testDir, i.toString)
    +      Files.write(i + "\n", file, Charsets.UTF_8)
    +      assert(file.setLastModified(clock.currentTime()))
    +      // Check that the file's modification date is actually the value we wrote, since rounding or
    +      // truncation will break the test:
    +      assert(file.lastModified() === clock.currentTime())
         }
    -    logInfo("Output = " + outputStream.output.mkString(","))
    -    assert(outputStream.output.size > 0, "No files processed before restart")
    -    ssc.stop()
     
    -    // Verify whether files created have been recorded correctly or not
    -    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    -
    -    // Create files while the master is down
    -    for (i <- Seq(4, 5, 6)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    +    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
    +      val fileInputDStream =
    +        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    +      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    +      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
         }
     
    -    // Recover context from checkpoint file and verify whether the files that were
    -    // recorded before failure were saved and successfully recovered
    -    logInfo("*********** RESTARTING ************")
    -    ssc = new StreamingContext(checkpointDir)
    -    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    +    try {
    +      // This is a var because it's re-assigned when we restart from a checkpoint
    +      var clock: ManualClock = null
    +      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
    +        ssc.checkpoint(checkpointDir)
    +        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +        val waiter = new StreamingTestWaiter(ssc)
    +        val fileStream = ssc.textFileStream(testDir.toString)
    +        // Make value 3 take a large time to process, to ensure that the driver
    +        // shuts down in the middle of processing the 3rd batch
    +        val mappedStream = fileStream.map(s => {
    +          val i = s.toInt
    +          if (i == 3) Thread.sleep(4000)
    +          i
    +        })
    +
    +        // Reducing over a large window to ensure that recovery from driver failure
    +        // requires reprocessing of all the files seen before the failure
    +        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
    +        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
    +        outputStream.register()
    +        ssc.start()
    +
    +        // Advance half a batch so that the first file is created after the StreamingContext starts
    +        clock.addToTime(batchDuration.milliseconds / 2)
    +        // Create files and advance manual clock to process them
    +        for (i <- Seq(1, 2, 3)) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          if (i != 3) {
    +            // Since we want to shut down while the 3rd batch is processing
    +            eventually(timeout(batchDuration * 5)) {
    +              assert(waiter.getNumCompletedBatches === i)
    +            }
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
    +        Thread.sleep(1000) // To wait for execution to actually begin
    +        eventually(timeout(batchDuration * 5)) {
    +          assert(waiter.getNumStartedBatches === 3)
    +        }
    +        assert(waiter.getNumCompletedBatches === 2)
    +        logInfo("Output after first start = " + outputStream.output.mkString("[", ", ", "]"))
    +        assert(outputStream.output.size > 0, "No files processed before restart")
    +        ssc.stop()
    +
    +        // Verify whether files created have been recorded correctly or not
    +        assert(recordedFiles(ssc) === Seq(1, 2, 3))
    +      }
     
    -    // Restart stream computation
    -    ssc.start()
    -    for (i <- Seq(7, 8, 9)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    -    }
    -    Thread.sleep(1000)
    -    logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
    -    assert(outputStream.output.size > 0, "No files processed after restart")
    -    ssc.stop()
    +      // The original StreamingContext has now been stopped.
     
    -    // Verify whether files created while the driver was down have been recorded or not
    -    assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
    -
    -    // Verify whether new files created after recover have been recorded or not
    -    assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
    -
    -    // Append the new output to the old buffer
    -    outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
    -    outputBuffer ++= outputStream.output
    -
    -    val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
    -    logInfo("--------------------------------")
    -    logInfo("output, size = " + outputBuffer.size)
    -    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
    -    logInfo("expected output, size = " + expectedOutput.size)
    -    expectedOutput.foreach(x => logInfo("[" + x + "]"))
    -    logInfo("--------------------------------")
    -
    -    // Verify whether all the elements received are as expected
    -    val output = outputBuffer.flatMap(x => x)
    -    assert(output.contains(6))  // To ensure that the 3rd input (i.e., 3) was processed
    -    output.foreach(o =>         // To ensure all the inputs are correctly added cumulatively
    -      assert(expectedOutput.contains(o), "Expected value " + o + " not found")
    -    )
    -    // To ensure that all the inputs were received correctly
    -    assert(expectedOutput.last === output.last)
    -    Utils.deleteRecursively(testDir)
    +      // Create files while the streaming driver is down
    +      for (i <- Seq(4, 5, 6)) {
    +        writeFile(i, clock)
    +        clock.addToTime(1000)
    +      }
    +
    +      // Recover context from checkpoint file and verify whether the files that were
    +      // recorded before failure were saved and successfully recovered
    +      logInfo("*********** RESTARTING ************")
    +      withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
    +        // Copy over the time from the old clock so that we don't appear to have time-traveled
    +        clock = {
    +          val newClock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +          newClock.setTime(clock.currentTime())
    +          newClock
    +        }
    +        val waiter = new StreamingTestWaiter(ssc)
    +        val outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
    +        // Check that we remember files that were recorded before the restart
    +        assert(recordedFiles(ssc) === Seq(1, 2, 3))
    +
    +        // Restart stream computation
    +        ssc.start()
    +        clock.addToTime(batchDuration.milliseconds)
    +        for ((i, index) <- Seq(7, 8, 9).zipWithIndex) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          eventually(timeout(batchDuration * 5)) {
    +            assert(waiter.getNumCompletedBatches === index + 1)
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
    +        logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]"))
    +        assert(outputStream.output.size > 0, "No files processed after restart")
    +        ssc.stop()
    +
    +        // Verify whether files created while the driver was down (4, 5, 6) and files created after
    +        // recovery (7, 8, 9) have been recorded
    +        assert(recordedFiles(ssc) === (1 to 9))
    +
    +        // Append the new output to the old buffer
    +        outputBuffer ++= outputStream.output
    +
    +        val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
    +        logInfo("--------------------------------")
    +        logInfo(s"output, size = ${outputBuffer.size}")
    +        outputBuffer.foreach(x => logInfo(s"[${x.mkString(",")}]"))
    +        logInfo(s"expected output, size = ${expectedOutput.size}")
    +        expectedOutput.foreach(x => logInfo(s"[$x]"))
    +        logInfo("--------------------------------")
    +
    +        // Verify whether all the elements received are as expected
    +        val output = outputBuffer.flatMap(x => x)
    +        assert(output.contains(6))  // To ensure that the 3rd input (i.e., 3) was processed
    +        output.foreach(o =>         // To ensure all the inputs are correctly added cumulatively
    --- End diff --
    
    Can use `toSet` and set comparison methods here.
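
A minimal sketch of what such a set-based check could look like, using only the Scala standard library. The object and method names (`OutputCheck`, `unexpectedValues`) and the sample values are hypothetical and are not part of the PR:

```scala
object OutputCheck {
  /** Returns the observed values that are not among the expected ones. */
  def unexpectedValues(output: Seq[Int], expected: Seq[Int]): Set[Int] =
    output.toSet.diff(expected.toSet)

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-ins for the test's flattened output and expectedOutput.
    val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
    val output = Seq(1, 3, 6, 6, 10, 15, 21, 28, 36, 45)

    // One assertion replaces the per-element `contains` loop, and the failure
    // message lists every offending value at once.
    val unexpected = unexpectedValues(output, expectedOutput)
    assert(unexpected.isEmpty, s"Unexpected values: $unexpected")

    // Equivalent formulation with subsetOf.
    assert(output.toSet.subsetOf(expectedOutput.toSet))
  }
}
```

Note that a set comparison ignores duplicates and ordering, so a check on the last element (as the test already does) would still be needed separately.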




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3801#issuecomment-68813146
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25071/




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22448279
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
    +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
    -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
    -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
    +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    +      val file = new File(testDir, i.toString)
    +      Files.write(i + "\n", file, Charsets.UTF_8)
    +      assert(file.setLastModified(clock.currentTime()))
    +      // Check that the file's modification date is actually the value we wrote, since rounding or
    +      // truncation will break the test:
    +      assert(file.lastModified() === clock.currentTime())
         }
    -    logInfo("Output = " + outputStream.output.mkString(","))
    -    assert(outputStream.output.size > 0, "No files processed before restart")
    -    ssc.stop()
     
    -    // Verify whether files created have been recorded correctly or not
    -    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    -
    -    // Create files while the master is down
    -    for (i <- Seq(4, 5, 6)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    +    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
    +      val fileInputDStream =
    +        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    +      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    +      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
         }
     
    -    // Recover context from checkpoint file and verify whether the files that were
    -    // recorded before failure were saved and successfully recovered
    -    logInfo("*********** RESTARTING ************")
    -    ssc = new StreamingContext(checkpointDir)
    -    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    +    try {
    +      // This is a var because it's re-assigned when we restart from a checkpoint:
    +      var clock: ManualClock = null
    +      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
    +        ssc.checkpoint(checkpointDir)
    +        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +        val waiter = new StreamingTestWaiter(ssc)
    +        val fileStream = ssc.textFileStream(testDir.toString)
    +        // Make value 3 take a large time to process, to ensure that the driver
    +        // shuts down in the middle of processing the 3rd batch
    +        val mappedStream = fileStream.map(s => {
    +          val i = s.toInt
    +          if (i == 3) Thread.sleep(4000)
    +          i
    +        })
    +
    +        // Reducing over a large window to ensure that recovery from driver failure
    +        // requires reprocessing of all the files seen before the failure
    +        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
    +        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
    +        outputStream.register()
    +        ssc.start()
    +
    +        clock.addToTime(batchDuration.milliseconds)
    +        // Create files and advance manual clock to process them
    +        for (i <- Seq(1, 2, 3)) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          if (i != 3) {
    +            // Since we want to shut down while the 3rd batch is processing
    +            waiter.waitForTotalBatchesCompleted(i, batchDuration * 5)
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
    +        waiter.waitForTotalBatchesStarted(3, batchDuration * 5)
    +        Thread.sleep(1000) // To wait for execution to actually begin
    +        logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
    +        assert(outputStream.output.size > 0, "No files processed before restart")
    +        ssc.stop()
    +
    +        // Verify whether files created have been recorded correctly or not
    +        assert(recordedFiles(ssc) === Seq(1, 2, 3))
    +      }
     
    -    // Restart stream computation
    -    ssc.start()
    -    for (i <- Seq(7, 8, 9)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    -    }
    -    Thread.sleep(1000)
    -    logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
    -    assert(outputStream.output.size > 0, "No files processed after restart")
    -    ssc.stop()
    +      // Create files while the streaming driver is down
    +      for (i <- Seq(4, 5, 6)) {
    +        writeFile(i, clock)
    +        clock.addToTime(1000)
    +      }
     
    -    // Verify whether files created while the driver was down have been recorded or not
    -    assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
    -
    -    // Verify whether new files created after recover have been recorded or not
    -    assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
    -
    -    // Append the new output to the old buffer
    -    outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
    -    outputBuffer ++= outputStream.output
    -
    -    val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
    -    logInfo("--------------------------------")
    -    logInfo("output, size = " + outputBuffer.size)
    -    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
    -    logInfo("expected output, size = " + expectedOutput.size)
    -    expectedOutput.foreach(x => logInfo("[" + x + "]"))
    -    logInfo("--------------------------------")
    -
    -    // Verify whether all the elements received are as expected
    -    val output = outputBuffer.flatMap(x => x)
    -    assert(output.contains(6))  // To ensure that the 3rd input (i.e., 3) was processed
    -    output.foreach(o =>         // To ensure all the inputs are correctly added cumulatively
    -      assert(expectedOutput.contains(o), "Expected value " + o + " not found")
    -    )
    -    // To ensure that all the inputs were received correctly
    -    assert(expectedOutput.last === output.last)
    -    Utils.deleteRecursively(testDir)
    +      // Recover context from checkpoint file and verify whether the files that were
    +      // recorded before failure were saved and successfully recovered
    +      logInfo("*********** RESTARTING ************")
    +      withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
    +        // Copy over the time from the old clock so that we don't appear to have time-traveled:
    +        clock = {
    +          val newClock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +          newClock.setTime(clock.currentTime())
    +          newClock
    +        }
    +        val waiter = new StreamingTestWaiter(ssc)
    --- End diff --
    
    Yeah, you are right! Mind adding a comment on that line saying so, so that I don't get confused again? :)




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3801#issuecomment-68683294
  
      [Test build #25052 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25052/consoleFull) for   PR 3801 at commit [`b4442c3`](https://github.com/apache/spark/commit/b4442c3538ad462a0a7d39f4b2049ed230e92665).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3801#issuecomment-68100836
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24820/




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22436099
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
    +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
    -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
    -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
    +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    +      val file = new File(testDir, i.toString)
    +      Files.write(i + "\n", file, Charsets.UTF_8)
    +      assert(file.setLastModified(clock.currentTime()))
    +      // Check that the file's modification date is actually the value we wrote, since rounding or
    +      // truncation will break the test:
    +      assert(file.lastModified() === clock.currentTime())
         }
    -    logInfo("Output = " + outputStream.output.mkString(","))
    -    assert(outputStream.output.size > 0, "No files processed before restart")
    -    ssc.stop()
     
    -    // Verify whether files created have been recorded correctly or not
    -    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    -
    -    // Create files while the master is down
    -    for (i <- Seq(4, 5, 6)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    +    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
    +      val fileInputDStream =
    +        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    +      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    +      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
         }
     
    -    // Recover context from checkpoint file and verify whether the files that were
    -    // recorded before failure were saved and successfully recovered
    -    logInfo("*********** RESTARTING ************")
    -    ssc = new StreamingContext(checkpointDir)
    -    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    +    try {
    +      // This is a var because it's re-assigned when we restart from a checkpoint:
    +      var clock: ManualClock = null
    +      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
    +        ssc.checkpoint(checkpointDir)
    +        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +        val waiter = new StreamingTestWaiter(ssc)
    +        val fileStream = ssc.textFileStream(testDir.toString)
    +        // Make value 3 take a large time to process, to ensure that the driver
    +        // shuts down in the middle of processing the 3rd batch
    +        val mappedStream = fileStream.map(s => {
    +          val i = s.toInt
    +          if (i == 3) Thread.sleep(4000)
    +          i
    +        })
    +
    +        // Reducing over a large window to ensure that recovery from driver failure
    +        // requires reprocessing of all the files seen before the failure
    +        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
    +        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
    +        outputStream.register()
    +        ssc.start()
    +
    +        clock.addToTime(batchDuration.milliseconds)
    +        // Create files and advance manual clock to process them
    +        for (i <- Seq(1, 2, 3)) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          if (i != 3) {
    +            // Since we want to shut down while the 3rd batch is processing
    +            waiter.waitForTotalBatchesCompleted(i, batchDuration * 5)
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
    +        waiter.waitForTotalBatchesStarted(3, batchDuration * 5)
    +        Thread.sleep(1000) // To wait for execution to actually begin
    --- End diff --
    
    Since the earlier unit test was running in real time, this Thread.sleep was to ensure that the time had progressed since the `ssc.start()` so that the first file that was generated has modtime > context start time. But that is not necessary any more since you are using manual clock and explicitly setting the mod time of the file. So I believe this can be removed.
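
The reasoning above can be illustrated with a small self-contained sketch. `FakeClock` is a hypothetical stand-in written for this example and is not Spark's `ManualClock`; `ModTimeSketch` and `writeStampedFile` are likewise illustrative names. The point is only that, once the modification time is taken from a manually advanced clock, "file is newer than the start time" holds without any real-time sleep:

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical manually advanced clock; NOT Spark's ManualClock.
final class FakeClock(private var now: Long) {
  def currentTime(): Long = synchronized { now }
  def addToTime(ms: Long): Unit = synchronized { now += ms }
}

object ModTimeSketch {
  /** Creates a file in `dir` and stamps it with the clock's current time. */
  def writeStampedFile(dir: File, name: String, clock: FakeClock): File = {
    val file = new File(dir, name)
    Files.write(file.toPath, "1\n".getBytes("UTF-8"))
    require(file.setLastModified(clock.currentTime()), "could not set modification time")
    file
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("modtime-sketch").toFile
    // Whole seconds only, since some filesystems store timestamps at 1-second resolution.
    val clock = new FakeClock(1000000000000L)
    val startTime = clock.currentTime() // plays the role of the context start time

    clock.addToTime(2000) // advance the clock instead of sleeping
    val file = writeStampedFile(dir, "1", clock)

    // The file is strictly newer than the start time, with no Thread.sleep involved.
    assert(file.lastModified() > startTime)
  }
}
```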




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22436103
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
    +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
    -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
    -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
    +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    +      val file = new File(testDir, i.toString)
    +      Files.write(i + "\n", file, Charsets.UTF_8)
    +      assert(file.setLastModified(clock.currentTime()))
    +      // Check that the file's modification date is actually the value we wrote, since rounding or
    +      // truncation will break the test:
    +      assert(file.lastModified() === clock.currentTime())
         }
    -    logInfo("Output = " + outputStream.output.mkString(","))
    -    assert(outputStream.output.size > 0, "No files processed before restart")
    -    ssc.stop()
     
    -    // Verify whether files created have been recorded correctly or not
    -    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    -
    -    // Create files while the master is down
    -    for (i <- Seq(4, 5, 6)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    +    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
    +      val fileInputDStream =
    +        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    +      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    +      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
         }
     
    -    // Recover context from checkpoint file and verify whether the files that were
    -    // recorded before failure were saved and successfully recovered
    -    logInfo("*********** RESTARTING ************")
    -    ssc = new StreamingContext(checkpointDir)
    -    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    +    try {
    +      // This is a var because it's re-assigned when we restart from a checkpoint:
    +      var clock: ManualClock = null
    +      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
    +        ssc.checkpoint(checkpointDir)
    +        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +        val waiter = new StreamingTestWaiter(ssc)
    +        val fileStream = ssc.textFileStream(testDir.toString)
    +        // Make value 3 take a large time to process, to ensure that the driver
    +        // shuts down in the middle of processing the 3rd batch
    +        val mappedStream = fileStream.map(s => {
    +          val i = s.toInt
    +          if (i == 3) Thread.sleep(4000)
    +          i
    +        })
    +
    +        // Reducing over a large window to ensure that recovery from driver failure
    +        // requires reprocessing of all the files seen before the failure
    +        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
    +        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
    +        outputStream.register()
    +        ssc.start()
    +
    +        clock.addToTime(batchDuration.milliseconds)
    +        // Create files and advance manual clock to process them
    +        for (i <- Seq(1, 2, 3)) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          if (i != 3) {
    +            // Since we want to shut down while the 3rd batch is processing
    +            waiter.waitForTotalBatchesCompleted(i, batchDuration * 5)
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
    +        waiter.waitForTotalBatchesStarted(3, batchDuration * 5)
    +        Thread.sleep(1000) // To wait for execution to actually begin
    +        logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
    +        assert(outputStream.output.size > 0, "No files processed before restart")
    +        ssc.stop()
    +
    +        // Verify whether files created have been recorded correctly or not
    +        assert(recordedFiles(ssc) === Seq(1, 2, 3))
    +      }
     
    -    // Restart stream computation
    -    ssc.start()
    -    for (i <- Seq(7, 8, 9)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    -    }
    -    Thread.sleep(1000)
    -    logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
    -    assert(outputStream.output.size > 0, "No files processed after restart")
    -    ssc.stop()
    +      // Create files while the streaming driver is down
    +      for (i <- Seq(4, 5, 6)) {
    +        writeFile(i, clock)
    +        clock.addToTime(1000)
    +      }
     
    -    // Verify whether files created while the driver was down have been recorded or not
    -    assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
    -
    -    // Verify whether new files created after recover have been recorded or not
    -    assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
    -
    -    // Append the new output to the old buffer
    -    outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
    -    outputBuffer ++= outputStream.output
    -
    -    val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
    -    logInfo("--------------------------------")
    -    logInfo("output, size = " + outputBuffer.size)
    -    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
    -    logInfo("expected output, size = " + expectedOutput.size)
    -    expectedOutput.foreach(x => logInfo("[" + x + "]"))
    -    logInfo("--------------------------------")
    -
    -    // Verify whether all the elements received are as expected
    -    val output = outputBuffer.flatMap(x => x)
    -    assert(output.contains(6))  // To ensure that the 3rd input (i.e., 3) was processed
    -    output.foreach(o =>         // To ensure all the inputs are correctly added cumulatively
    -      assert(expectedOutput.contains(o), "Expected value " + o + " not found")
    -    )
    -    // To ensure that all the inputs were received correctly
    -    assert(expectedOutput.last === output.last)
    -    Utils.deleteRecursively(testDir)
    +      // Recover context from checkpoint file and verify whether the files that were
    +      // recorded before failure were saved and successfully recovered
    +      logInfo("*********** RESTARTING ************")
    +      withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
    +        // Copy over the time from the old clock so that we don't appear to have time-traveled:
    --- End diff --
    
    Why `:` at the end?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3801#issuecomment-68688395
  
      [Test build #25052 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25052/consoleFull) for PR 3801 at commit [`b4442c3`](https://github.com/apache/spark/commit/b4442c3538ad462a0a7d39f4b2049ed230e92665).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22448937
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
    +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
    -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
    -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
    +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    +      val file = new File(testDir, i.toString)
    +      Files.write(i + "\n", file, Charsets.UTF_8)
    +      assert(file.setLastModified(clock.currentTime()))
    +      // Check that the file's modification date is actually the value we wrote, since rounding or
    +      // truncation will break the test:
    +      assert(file.lastModified() === clock.currentTime())
         }
    -    logInfo("Output = " + outputStream.output.mkString(","))
    -    assert(outputStream.output.size > 0, "No files processed before restart")
    -    ssc.stop()
     
    -    // Verify whether files created have been recorded correctly or not
    -    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    -
    -    // Create files while the master is down
    -    for (i <- Seq(4, 5, 6)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    +    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
    +      val fileInputDStream =
    +        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    +      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    +      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
         }
     
    -    // Recover context from checkpoint file and verify whether the files that were
    -    // recorded before failure were saved and successfully recovered
    -    logInfo("*********** RESTARTING ************")
    -    ssc = new StreamingContext(checkpointDir)
    -    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    +    try {
    +      // This is a var because it's re-assigned when we restart from a checkpoint:
    +      var clock: ManualClock = null
    +      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
    +        ssc.checkpoint(checkpointDir)
    +        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +        val waiter = new StreamingTestWaiter(ssc)
    +        val fileStream = ssc.textFileStream(testDir.toString)
    +        // Make value 3 take a large time to process, to ensure that the driver
    +        // shuts down in the middle of processing the 3rd batch
    +        val mappedStream = fileStream.map(s => {
    +          val i = s.toInt
    +          if (i == 3) Thread.sleep(4000)
    +          i
    +        })
    +
    +        // Reducing over a large window to ensure that recovery from driver failure
    +        // requires reprocessing of all the files seen before the failure
    +        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
    +        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
    +        outputStream.register()
    +        ssc.start()
    +
    +        clock.addToTime(batchDuration.milliseconds)
    +        // Create files and advance manual clock to process them
    +        for (i <- Seq(1, 2, 3)) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          if (i != 3) {
    +            // Since we want to shut down while the 3rd batch is processing
    +            waiter.waitForTotalBatchesCompleted(i, batchDuration * 5)
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
    +        waiter.waitForTotalBatchesStarted(3, batchDuration * 5)
    +        Thread.sleep(1000) // To wait for execution to actually begin
    --- End diff --
    
    My original thinking here was that it mattered whether the underlying Spark job had started, since the batch start event could fire before the corresponding Spark core events.  From Streaming's perspective, though, this doesn't really make sense: batch-level metadata and bookkeeping operations would be tied to the Streaming start events, not the Spark ones.
    
    As you've suggested, I can just replace this with an assert on the number of completed batches.
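
    For illustration, asserting on batch counts instead of sleeping might look roughly like the sketch below. `BatchCounter` and its method names are hypothetical stand-ins, not the `StreamingTestWaiter` from this PR, and the sketch assumes the `onBatch...` methods would be called from a streaming listener.

```scala
import java.util.concurrent.TimeoutException

// Hypothetical helper (not the PR's StreamingTestWaiter): counts batch events
// and lets a test block until a target count is reached or a timeout elapses.
class BatchCounter {
  private var started = 0
  private var completed = 0

  // Assumption: a real test would invoke these from a streaming listener.
  def onBatchStarted(): Unit = synchronized { started += 1; notifyAll() }
  def onBatchCompleted(): Unit = synchronized { completed += 1; notifyAll() }

  def numStarted: Int = synchronized { started }
  def numCompleted: Int = synchronized { completed }

  def waitForStarted(target: Int, timeoutMs: Long): Unit =
    waitUntil(started >= target, timeoutMs, s"$target batches to start")

  def waitForCompleted(target: Int, timeoutMs: Long): Unit =
    waitUntil(completed >= target, timeoutMs, s"$target batches to complete")

  // Re-checks the condition under the lock each time a notification arrives.
  private def waitUntil(cond: => Boolean, timeoutMs: Long, what: String): Unit = synchronized {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (!cond) {
      val remaining = deadline - System.currentTimeMillis()
      if (remaining <= 0) throw new TimeoutException(s"Timed out waiting for $what")
      wait(remaining)
    }
  }
}
```

    With something along these lines, the test could call `waitForStarted(3, timeout)` and then assert that `numCompleted == 2`, rather than sleeping for a fixed second.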


[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/3801


[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/3801#issuecomment-68684233
  
    Pushed some commits addressing most of the feedback, but I'm still struggling to remove that last `Thread.sleep(1000)`.  I think the problem is that the checkpoint is written asynchronously: without the sleep, we wind up in a state where batch 3 has started processing but has not finished, and the StreamingContext shuts down before a snapshot including batch 3's file info is written.  I plan to dig into this tomorrow to see whether this is actually the case.


[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/3801#issuecomment-68756791
  
    Right, that is probably the case. The checkpointing is asynchronous, and there isn't any callback hook that we can wait on. We could instead wait for the expected checkpoint file to appear in the directory, which would make the test deterministic.
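
    A minimal sketch of that kind of wait is below. `CheckpointFileWait` and `waitForFile` are hypothetical names, and the file-name check is passed in as a predicate because the sketch makes no assumption about how checkpoint files are named. It polls with a short sleep, but the sleep only bounds the polling rate; the test proceeds as soon as the condition holds. It also does not check that the file has been fully written.

```scala
import java.io.File
import java.util.concurrent.TimeoutException

// Hypothetical test helper: polls a directory until a matching file shows up.
object CheckpointFileWait {
  def waitForFile(
      dir: File,
      matches: String => Boolean,
      timeoutMs: Long,
      pollMs: Long = 10L): File = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    var found: Option[String] = None
    while (found.isEmpty) {
      // File.list() returns null if `dir` does not exist or is not a directory.
      found = Option(dir.list()).getOrElse(Array.empty[String]).find(matches)
      if (found.isEmpty) {
        if (System.nanoTime() >= deadline) {
          throw new TimeoutException(s"No matching file in $dir after $timeoutMs ms")
        }
        Thread.sleep(pollMs)
      }
    }
    new File(dir, found.get)
  }
}
```

    The test would call this right before `ssc.stop()`, with a predicate identifying the checkpoint expected for batch 3.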


[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22436093
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
    +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
    -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
    -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
    +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    +      val file = new File(testDir, i.toString)
    +      Files.write(i + "\n", file, Charsets.UTF_8)
    +      assert(file.setLastModified(clock.currentTime()))
    +      // Check that the file's modification date is actually the value we wrote, since rounding or
    +      // truncation will break the test:
    +      assert(file.lastModified() === clock.currentTime())
         }
    -    logInfo("Output = " + outputStream.output.mkString(","))
    -    assert(outputStream.output.size > 0, "No files processed before restart")
    -    ssc.stop()
     
    -    // Verify whether files created have been recorded correctly or not
    -    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    -
    -    // Create files while the master is down
    -    for (i <- Seq(4, 5, 6)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    +    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
    +      val fileInputDStream =
    +        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    +      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    +      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
         }
     
    -    // Recover context from checkpoint file and verify whether the files that were
    -    // recorded before failure were saved and successfully recovered
    -    logInfo("*********** RESTARTING ************")
    -    ssc = new StreamingContext(checkpointDir)
    -    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    +    try {
    +      // This is a var because it's re-assigned when we restart from a checkpoint:
    +      var clock: ManualClock = null
    +      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
    +        ssc.checkpoint(checkpointDir)
    +        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +        val waiter = new StreamingTestWaiter(ssc)
    +        val fileStream = ssc.textFileStream(testDir.toString)
    +        // Make value 3 take a large time to process, to ensure that the driver
    +        // shuts down in the middle of processing the 3rd batch
    +        val mappedStream = fileStream.map(s => {
    +          val i = s.toInt
    +          if (i == 3) Thread.sleep(4000)
    +          i
    +        })
    +
    +        // Reducing over a large window to ensure that recovery from driver failure
    +        // requires reprocessing of all the files seen before the failure
    +        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
    +        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
    +        outputStream.register()
    +        ssc.start()
    +
    +        clock.addToTime(batchDuration.milliseconds)
    +        // Create files and advance manual clock to process them
    +        for (i <- Seq(1, 2, 3)) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          if (i != 3) {
    +            // Since we want to shut down while the 3rd batch is processing
    +            waiter.waitForTotalBatchesCompleted(i, batchDuration * 5)
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
    +        waiter.waitForTotalBatchesStarted(3, batchDuration * 5)
    +        Thread.sleep(1000) // To wait for execution to actually begin
    +        logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
    --- End diff --
    
    nit: Please rename this "Output after first start = "


[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22271345
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
    +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
    -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
    -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
    +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    +      val file = new File(testDir, i.toString)
    +      Files.write(i + "\n", file, Charsets.UTF_8)
    +      assert(file.setLastModified(clock.currentTime()))
    +      // Check that the file's modification date is actually the value we wrote, since rounding or
    +      // truncation will break the test:
    +      assert(file.lastModified() === clock.currentTime())
         }
    -    logInfo("Output = " + outputStream.output.mkString(","))
    -    assert(outputStream.output.size > 0, "No files processed before restart")
    -    ssc.stop()
     
    -    // Verify whether files created have been recorded correctly or not
    -    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    -
    -    // Create files while the master is down
    -    for (i <- Seq(4, 5, 6)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    +    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
    +      val fileInputDStream =
    +        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    +      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    +      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
         }
     
    -    // Recover context from checkpoint file and verify whether the files that were
    -    // recorded before failure were saved and successfully recovered
    -    logInfo("*********** RESTARTING ************")
    -    ssc = new StreamingContext(checkpointDir)
    -    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    +    try {
    +      // This is a var because it's re-assigned when we restart from a checkpoint:
    +      var clock: ManualClock = null
    +      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
    +        ssc.checkpoint(checkpointDir)
    +        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +        val waiter = new StreamingTestWaiter(ssc)
    +        val fileStream = ssc.textFileStream(testDir.toString)
    +        // Make value 3 take a large time to process, to ensure that the driver
    +        // shuts down in the middle of processing the 3rd batch
    +        val mappedStream = fileStream.map(s => {
    +          val i = s.toInt
    +          if (i == 3) Thread.sleep(4000)
    +          i
    +        })
    +
    +        // Reducing over a large window to ensure that recovery from driver failure
    +        // requires reprocessing of all the files seen before the failure
    +        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
    +        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
    +        outputStream.register()
    +        ssc.start()
    +
    +        clock.addToTime(batchDuration.milliseconds)
    +        // Create files and advance manual clock to process them
    +        for (i <- Seq(1, 2, 3)) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          if (i != 3) {
    +            // Since we want to shut down while the 3rd batch is processing
    +            waiter.waitForTotalBatchesCompleted(i, batchDuration * 5)
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
    +        waiter.waitForTotalBatchesStarted(3, batchDuration * 5)
    +        Thread.sleep(1000) // To wait for execution to actually begin
    --- End diff --
    
    This `Thread.sleep` call is there to ensure that we're actually inside a `fileStream.map` task when the context shuts down.  It might not be necessary given the `waitForTotalBatchesStarted` call on the previous line.  AFAIK, the reason I had it here was that `waitForTotalBatchesStarted` might fire after streaming starts the batch but before the underlying Spark jobs start.  If we do need to block until Spark itself actually begins processing, that could be a little trickier.  I suppose I could just use a SparkListener for that.
    
    I toyed around with the idea of using a semaphore, but that's tricky because you want to ensure that the task and the caller have the same Semaphore object in the JVM.  Since the task is serialized, you effectively have to have a global object with a hashmap that holds the semaphore, which is a bit messy.
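    
    For illustration, here is a minimal sketch of that global-registry approach, assuming a local-mode test where the task and the caller run in the same JVM. `SemaphoreRegistry`, its `get` and `remove` methods, and the key name are hypothetical and not part of Spark or this PR; a plain thread stands in for the serialized task:
    
    ```scala
    import java.util.concurrent.{ConcurrentHashMap, Semaphore, TimeUnit}

    // Hypothetical JVM-wide registry: the closure captures only a String key, so
    // serializing the task never copies the Semaphore itself.
    object SemaphoreRegistry {
      private val semaphores = new ConcurrentHashMap[String, Semaphore]()

      // Return the semaphore registered under `key`, creating it with zero permits on first use.
      def get(key: String): Semaphore = {
        val fresh = new Semaphore(0)
        val existing = semaphores.putIfAbsent(key, fresh)
        if (existing == null) fresh else existing
      }

      def remove(key: String): Unit = semaphores.remove(key)
    }

    object SemaphoreRegistryExample {
      def main(args: Array[String]): Unit = {
        val key = "batch-3-started" // hypothetical key name

        // Stand-in for the body of `fileStream.map`: it looks the semaphore up by key.
        val task = new Thread(new Runnable {
          override def run(): Unit = {
            SemaphoreRegistry.get(key).release() // signal that execution has begun
            Thread.sleep(200)                    // simulate the slow record
          }
        })
        task.start()

        // Stand-in for the test thread: block until the task is actually running.
        val started = SemaphoreRegistry.get(key).tryAcquire(5, TimeUnit.SECONDS)
        assert(started, "task did not start within the timeout")
        task.join()
        SemaphoreRegistry.remove(key)
      }
    }
    ```
    
    The messiness shows up in the lifecycle: every test has to pick a unique key and remember to remove it, and the trick only works while the driver and the tasks share a JVM.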


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22436035
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
    +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
    -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
    -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
    +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    +      val file = new File(testDir, i.toString)
    +      Files.write(i + "\n", file, Charsets.UTF_8)
    +      assert(file.setLastModified(clock.currentTime()))
    +      // Check that the file's modification date is actually the value we wrote, since rounding or
    +      // truncation will break the test:
    +      assert(file.lastModified() === clock.currentTime())
         }
    -    logInfo("Output = " + outputStream.output.mkString(","))
    -    assert(outputStream.output.size > 0, "No files processed before restart")
    -    ssc.stop()
     
    -    // Verify whether files created have been recorded correctly or not
    -    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    -
    -    // Create files while the master is down
    -    for (i <- Seq(4, 5, 6)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    +    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
    +      val fileInputDStream =
    +        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    +      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    +      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
         }
     
    -    // Recover context from checkpoint file and verify whether the files that were
    -    // recorded before failure were saved and successfully recovered
    -    logInfo("*********** RESTARTING ************")
    -    ssc = new StreamingContext(checkpointDir)
    -    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    +    try {
    +      // This is a var because it's re-assigned when we restart from a checkpoint:
    +      var clock: ManualClock = null
    +      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
    +        ssc.checkpoint(checkpointDir)
    +        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +        val waiter = new StreamingTestWaiter(ssc)
    +        val fileStream = ssc.textFileStream(testDir.toString)
    +        // Making value 3 take a large time to process, to ensure that the driver
    +        // shuts down in the middle of processing the 3rd batch
    +        val mappedStream = fileStream.map(s => {
    +          val i = s.toInt
    +          if (i == 3) Thread.sleep(4000)
    +          i
    +        })
    +
    +        // Reducing over a large window to ensure that recovery from driver failure
    +        // requires reprocessing of all the files seen before the failure
    +        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
    +        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
    +        outputStream.register()
    +        ssc.start()
    +
    +        clock.addToTime(batchDuration.milliseconds)
    +        // Create files and advance manual clock to process them
    +        for (i <- Seq(1, 2, 3)) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          if (i != 3) {
    +            // Since we want to shut down while the 3rd batch is processing
    +            waiter.waitForTotalBatchesCompleted(i, batchDuration * 5)
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
    +        waiter.waitForTotalBatchesStarted(3, batchDuration * 5)
    +        Thread.sleep(1000) // To wait for execution to actually begin
    +        logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
    +        assert(outputStream.output.size > 0, "No files processed after restart")
    +        ssc.stop()
    +
    +        // Verify whether files created have been recorded correctly or not
    +        assert(recordedFiles(ssc) === Seq(1, 2, 3))
    +      }
     
    -    // Restart stream computation
    -    ssc.start()
    -    for (i <- Seq(7, 8, 9)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    -    }
    -    Thread.sleep(1000)
    -    logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
    -    assert(outputStream.output.size > 0, "No files processed after restart")
    -    ssc.stop()
    +      // Create files while the streaming driver is down
    +      for (i <- Seq(4, 5, 6)) {
    +        writeFile(i, clock)
    +        clock.addToTime(1000)
    +      }
     
    -    // Verify whether files created while the driver was down have been recorded or not
    -    assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
    -
    -    // Verify whether new files created after recover have been recorded or not
    -    assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
    -
    -    // Append the new output to the old buffer
    -    outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
    -    outputBuffer ++= outputStream.output
    -
    -    val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
    -    logInfo("--------------------------------")
    -    logInfo("output, size = " + outputBuffer.size)
    -    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
    -    logInfo("expected output, size = " + expectedOutput.size)
    -    expectedOutput.foreach(x => logInfo("[" + x + "]"))
    -    logInfo("--------------------------------")
    -
    -    // Verify whether all the elements received are as expected
    -    val output = outputBuffer.flatMap(x => x)
    -    assert(output.contains(6))  // To ensure that the 3rd input (i.e., 3) was processed
    -    output.foreach(o =>         // To ensure all the inputs are correctly added cumulatively
    -      assert(expectedOutput.contains(o), "Expected value " + o + " not found")
    -    )
    -    // To ensure that all the inputs were received correctly
    -    assert(expectedOutput.last === output.last)
    -    Utils.deleteRecursively(testDir)
    +      // Recover context from checkpoint file and verify whether the files that were
    +      // recorded before failure were saved and successfully recovered
    +      logInfo("*********** RESTARTING ************")
    +      withStreamingContext(new StreamingContext(checkpointDir)) { ssc =>
    +        // Copy over the time from the old clock so that we don't appear to have time-traveled:
    +        clock = {
    +          val newClock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +          newClock.setTime(clock.currentTime())
    +          newClock
    +        }
    +        val waiter = new StreamingTestWaiter(ssc)
    --- End diff --
    
    I don't see the checks from the earlier lines 334-336 here. Those test whether the files are still remembered after recovering from the checkpoint. Although this eventually gets checked via the output, those assertions fail fast and pinpoint the problem. It would be good to put them back.




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3801#issuecomment-68821073
  
      [Test build #25086 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25086/consoleFull) for   PR 3801 at commit [`e4494f4`](https://github.com/apache/spark/commit/e4494f41ed5931c01b2845418e7705dfd19ba9fb).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22436118
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
    +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
    -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
    -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
    +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    +      val file = new File(testDir, i.toString)
    +      Files.write(i + "\n", file, Charsets.UTF_8)
    +      assert(file.setLastModified(clock.currentTime()))
    +      // Check that the file's modification date is actually the value we wrote, since rounding or
    +      // truncation will break the test:
    +      assert(file.lastModified() === clock.currentTime())
         }
    -    logInfo("Output = " + outputStream.output.mkString(","))
    -    assert(outputStream.output.size > 0, "No files processed before restart")
    -    ssc.stop()
     
    -    // Verify whether files created have been recorded correctly or not
    -    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    -
    -    // Create files while the master is down
    -    for (i <- Seq(4, 5, 6)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    +    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
    +      val fileInputDStream =
    +        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    +      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    +      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
         }
     
    -    // Recover context from checkpoint file and verify whether the files that were
    -    // recorded before failure were saved and successfully recovered
    -    logInfo("*********** RESTARTING ************")
    -    ssc = new StreamingContext(checkpointDir)
    -    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    +    try {
    +      // This is a var because it's re-assigned when we restart from a checkpoint:
    +      var clock: ManualClock = null
    +      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
    +        ssc.checkpoint(checkpointDir)
    +        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +        val waiter = new StreamingTestWaiter(ssc)
    +        val fileStream = ssc.textFileStream(testDir.toString)
    +        // Making value 3 take a large time to process, to ensure that the driver
    +        // shuts down in the middle of processing the 3rd batch
    +        val mappedStream = fileStream.map(s => {
    +          val i = s.toInt
    +          if (i == 3) Thread.sleep(4000)
    +          i
    +        })
    +
    +        // Reducing over a large window to ensure that recovery from driver failure
    +        // requires reprocessing of all the files seen before the failure
    +        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
    +        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
    +        outputStream.register()
    +        ssc.start()
    +
    +        clock.addToTime(batchDuration.milliseconds)
    +        // Create files and advance manual clock to process them
    +        for (i <- Seq(1, 2, 3)) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          if (i != 3) {
    +            // Since we want to shut down while the 3rd batch is processing
    +            waiter.waitForTotalBatchesCompleted(i, batchDuration * 5)
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
    +        waiter.waitForTotalBatchesStarted(3, batchDuration * 5)
    +        Thread.sleep(1000) // To wait for execution to actually begin
    --- End diff --
    
    Also, I think it's better to use `waiter.waitForTotalBatchesCompleted()`.




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3801#issuecomment-68824923
  
      [Test build #25086 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25086/consoleFull) for   PR 3801 at commit [`e4494f4`](https://github.com/apache/spark/commit/e4494f41ed5931c01b2845418e7705dfd19ba9fb).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3801#issuecomment-68100834
  
      [Test build #24820 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24820/consoleFull) for   PR 3801 at commit [`c8f06b1`](https://github.com/apache/spark/commit/c8f06b10431c555dab3be461622d5d96aa807685).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3801#issuecomment-68688402
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25052/
    Test FAILed.




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3801#issuecomment-68097905
  
      [Test build #24820 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24820/consoleFull) for   PR 3801 at commit [`c8f06b1`](https://github.com/apache/spark/commit/c8f06b10431c555dab3be461622d5d96aa807685).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/3801#issuecomment-68824928
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25086/
    Test PASSed.




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/3801#issuecomment-68806849
  
      [Test build #25071 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25071/consoleFull) for   PR 3801 at commit [`8340bd0`](https://github.com/apache/spark/commit/8340bd0ab50209316202237a9aee7be619a0b922).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22449114
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---
    @@ -104,6 +106,77 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
     }
     
     /**
    + * This is an interface that can be used to block until certain events occur, such as
    + * the start/completion of batches.  This is much less brittle than waiting on wall-clock time.
    + * Internally, this is implemented using a StreamingListener.  Constructing a new instance of this
    + * class automatically registers a StreamingListener on the given StreamingContext.
    + */
    +class StreamingTestWaiter(ssc: StreamingContext) {
    +
    +  // All access to this state should be guarded by `StreamingTestWaiter.this.synchronized`
    +  private var numCompletedBatches = 0
    +  private var numStartedBatches = 0
    +
    +  private val listener = new StreamingListener {
    +    override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit =
    +      StreamingTestWaiter.this.synchronized {
    +        numStartedBatches += 1
    +        StreamingTestWaiter.this.notifyAll()
    +      }
    +    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit =
    +      StreamingTestWaiter.this.synchronized {
    +        numCompletedBatches += 1
    +        StreamingTestWaiter.this.notifyAll()
    +      }
    +  }
    +  ssc.addStreamingListener(listener)
    +
    +  def getNumCompletedBatches: Int = this.synchronized {
    +    numCompletedBatches
    +  }
    +
    +  def getNumStartedBatches: Int = this.synchronized {
    +    numStartedBatches
    +  }
    +
    +  /**
    +   * Block until the number of completed batches reaches the given threshold.
    +   */
    +  def waitForTotalBatchesCompleted(
    --- End diff --
    
    It occurred to me that this might be misleadingly named, since it waits until _at least_ that many batches have been processed.  To avoid this naming issue, plus a proliferation of similar methods, I might be able to just introduce a helper class that encapsulates this "synchronize on an object and wait for a condition involving it to become true" pattern.
    
    I'm imagining that it could look something vaguely like
    
    ```scala
    def waitUntil[T <: AnyRef](obj: T, condition: T => Boolean, timeout: Long): Unit = {
      obj.synchronized {
        while (!condition(obj)) {
          [...] // do the wait() logic here
        }
      }
    }
    ```
    
    that encapsulates this wait / notify pattern, so we could write something like
    
    ```scala
    waitUntil(waiter, _.completedBatches > 2, timeout = seconds(10))
    ```
    
    Or, with an implicit conversion, something like
    
    ```scala
    waiter.waitUntil(_.completedBatches > 2, timeout = seconds(10))
    ```
    
    which is a nice-looking syntax and avoids those issues of having to name inequalities.
    
    Similar to [your suggestion](https://github.com/apache/spark/pull/3868/files#r22447509) on another PR, we could add a `pollUntil` method that works for objects that don't support monitor notification / synchronization.
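    
    To make the idea concrete, here is a rough, self-contained sketch of both helpers using plain JVM monitors. Everything in it is an assumption for illustration: `WaitUtils`, the millisecond timeouts, the curried parameter lists, and `BatchCounter` (a hypothetical stand-in for the waiter's counter state) are not code from this PR.
    
    ```scala
    import java.util.concurrent.TimeoutException

    object WaitUtils {
      // Block on `obj`'s monitor until `condition(obj)` holds or `timeoutMillis` elapses.
      // Whoever mutates `obj` must do so while synchronized on it and call notifyAll().
      def waitUntil[T <: AnyRef](obj: T, timeoutMillis: Long)(condition: T => Boolean): Unit = {
        val deadline = System.currentTimeMillis() + timeoutMillis
        obj.synchronized {
          while (!condition(obj)) {
            val remaining = deadline - System.currentTimeMillis()
            if (remaining <= 0) {
              throw new TimeoutException(s"Condition not met within $timeoutMillis ms")
            }
            obj.wait(remaining) // the loop re-checks the condition after every wakeup
          }
        }
      }

      // Fallback for objects that never call notifyAll(): re-evaluate the condition periodically.
      def pollUntil(timeoutMillis: Long, intervalMillis: Long = 10)(condition: => Boolean): Unit = {
        val deadline = System.currentTimeMillis() + timeoutMillis
        while (!condition) {
          if (System.currentTimeMillis() >= deadline) {
            throw new TimeoutException(s"Condition not met within $timeoutMillis ms")
          }
          Thread.sleep(intervalMillis)
        }
      }
    }

    // Hypothetical stand-in for the waiter's counter state.
    class BatchCounter {
      private var completed = 0
      def completedBatches: Int = this.synchronized { completed }
      def batchCompleted(): Unit = this.synchronized { completed += 1; notifyAll() }
    }

    object WaitUtilsExample {
      def main(args: Array[String]): Unit = {
        val counter = new BatchCounter
        val producer = new Thread(new Runnable {
          override def run(): Unit = (1 to 3).foreach(_ => counter.batchCompleted())
        })
        producer.start()
        WaitUtils.waitUntil(counter, timeoutMillis = 5000)(_.completedBatches > 2)
        WaitUtils.pollUntil(timeoutMillis = 5000)(counter.completedBatches > 2)
        producer.join()
      }
    }
    ```
    
    The condition sits in its own parameter list so that Scala can infer `T` from `obj` before it type-checks the `_.completedBatches > 2` lambda.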




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22436357
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
    +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
    -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
    -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
    +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    +      val file = new File(testDir, i.toString)
    +      Files.write(i + "\n", file, Charsets.UTF_8)
    +      assert(file.setLastModified(clock.currentTime()))
    +      // Check that the file's modification date is actually the value we wrote, since rounding or
    +      // truncation will break the test:
    +      assert(file.lastModified() === clock.currentTime())
         }
    -    logInfo("Output = " + outputStream.output.mkString(","))
    -    assert(outputStream.output.size > 0, "No files processed before restart")
    -    ssc.stop()
     
    -    // Verify whether files created have been recorded correctly or not
    -    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    -
    -    // Create files while the master is down
    -    for (i <- Seq(4, 5, 6)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    +    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
    +      val fileInputDStream =
    +        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    +      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    +      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
         }
     
    -    // Recover context from checkpoint file and verify whether the files that were
    -    // recorded before failure were saved and successfully recovered
    -    logInfo("*********** RESTARTING ************")
    -    ssc = new StreamingContext(checkpointDir)
    -    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    +    try {
    +      // This is a var because it's re-assigned when we restart from a checkpoint:
    +      var clock: ManualClock = null
    +      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
    +        ssc.checkpoint(checkpointDir)
    +        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +        val waiter = new StreamingTestWaiter(ssc)
    +        val fileStream = ssc.textFileStream(testDir.toString)
    +        // Making value 3 take a large time to process, to ensure that the driver
    +        // shuts down in the middle of processing the 3rd batch
    +        val mappedStream = fileStream.map(s => {
    +          val i = s.toInt
    +          if (i == 3) Thread.sleep(4000)
    +          i
    +        })
    +
    +        // Reducing over a large window to ensure that recovery from driver failure
    +        // requires reprocessing of all the files seen before the failure
    +        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
    +        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
    +        outputStream.register()
    +        ssc.start()
    +
    +        clock.addToTime(batchDuration.milliseconds)
    +        // Create files and advance manual clock to process them
    +        for (i <- Seq(1, 2, 3)) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          if (i != 3) {
    +            // Since we want to shut down while the 3rd batch is processing
    +            waiter.waitForTotalBatchesCompleted(i, batchDuration * 5)
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
    +        waiter.waitForTotalBatchesStarted(3, batchDuration * 5)
    +        Thread.sleep(1000) // To wait for execution to actually begin
    --- End diff --
    
    Since we want to make sure that the stop happens before the 3rd batch has completed, let's put an assert after the stop that only 2 batches have been completed (maybe via another method in the waiter) and that the output has only two values (1 and 3).
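
    A minimal sketch of the kind of waiter method suggested here, assuming a hypothetical `BatchCounter` class that is not the PR's `StreamingTestWaiter`. In a real test its two callbacks would presumably be driven by a StreamingListener rather than called by hand:

```scala
// Hypothetical sketch: BatchCounter is NOT the PR's StreamingTestWaiter.
// It only illustrates tracking started and completed batches under one lock
// so that a test can assert on the completed count after stopping.
class BatchCounter {
  private var started = 0
  private var completed = 0

  // In a real test these would be invoked from listener callbacks.
  def onBatchStarted(): Unit = synchronized { started += 1; notifyAll() }
  def onBatchCompleted(): Unit = synchronized { completed += 1; notifyAll() }

  def numStarted: Int = synchronized { started }
  def numCompleted: Int = synchronized { completed }

  // Block until at least `target` batches have started or `timeoutMs` of
  // wall-clock time has elapsed. Returns true if the target was reached.
  def waitForStarted(target: Int, timeoutMs: Long): Boolean = synchronized {
    val deadline = System.currentTimeMillis() + timeoutMs
    var remaining = timeoutMs
    while (started < target && remaining > 0) {
      wait(remaining)
      remaining = deadline - System.currentTimeMillis()
    }
    started >= target
  }
}
```

    With an accessor like `numCompleted`, the test could stop the context and then assert that exactly 2 batches completed and that the output buffer holds only the windowed sums 1 and 3 (that is, 1 and 1 + 2).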


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---



[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22447710
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
    +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
    -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
    -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
    +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    +      val file = new File(testDir, i.toString)
    +      Files.write(i + "\n", file, Charsets.UTF_8)
    +      assert(file.setLastModified(clock.currentTime()))
    +      // Check that the file's modification date is actually the value we wrote, since rounding or
    +      // truncation will break the test:
    +      assert(file.lastModified() === clock.currentTime())
         }
    -    logInfo("Output = " + outputStream.output.mkString(","))
    -    assert(outputStream.output.size > 0, "No files processed before restart")
    -    ssc.stop()
     
    -    // Verify whether files created have been recorded correctly or not
    -    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    -
    -    // Create files while the master is down
    -    for (i <- Seq(4, 5, 6)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    +    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
    +      val fileInputDStream =
    +        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    +      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    +      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
         }
     
    -    // Recover context from checkpoint file and verify whether the files that were
    -    // recorded before failure were saved and successfully recovered
    -    logInfo("*********** RESTARTING ************")
    -    ssc = new StreamingContext(checkpointDir)
    -    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    +    try {
    +      // This is a var because it's re-assigned when we restart from a checkpoint:
    +      var clock: ManualClock = null
    +      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
    +        ssc.checkpoint(checkpointDir)
    +        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +        val waiter = new StreamingTestWaiter(ssc)
    +        val fileStream = ssc.textFileStream(testDir.toString)
    +        // Making value 3 take a large time to process, to ensure that the driver
    +        // shuts down in the middle of processing the 3rd batch
    +        val mappedStream = fileStream.map(s => {
    +          val i = s.toInt
    +          if (i == 3) Thread.sleep(4000)
    +          i
    +        })
    +
    +        // Reducing over a large window to ensure that recovery from driver failure
    +        // requires reprocessing of all the files seen before the failure
    +        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
    +        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
    +        outputStream.register()
    +        ssc.start()
    +
    +        clock.addToTime(batchDuration.milliseconds)
    +        // Create files and advance manual clock to process them
    +        for (i <- Seq(1, 2, 3)) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          if (i != 3) {
    +            // Since we want to shut down while the 3rd batch is processing
    +            waiter.waitForTotalBatchesCompleted(i, batchDuration * 5)
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
    +        waiter.waitForTotalBatchesStarted(3, batchDuration * 5)
    +        Thread.sleep(1000) // To wait for execution to actually begin
    +        logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
    --- End diff --
    
    Good idea; I'll take a pass through the logging lines to try to clean them up (I think I pretty much copied them verbatim from the old code).




[GitHub] spark pull request: [SPARK-1600] Refactor FileInputStream tests to...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3801#discussion_r22449971
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
    @@ -281,102 +279,130 @@ class CheckpointSuite extends TestSuiteBase {
       // failure, are re-processed or not.
       test("recovery with file input stream") {
         // Set up the streaming context and input streams
    +    val batchDuration = Seconds(2)  // Due to 1-second resolution of setLastModified() on some OS's.
         val testDir = Utils.createTempDir()
    -    var ssc = new StreamingContext(master, framework, Seconds(1))
    -    ssc.checkpoint(checkpointDir)
    -    val fileStream = ssc.textFileStream(testDir.toString)
    -    // Making value 3 take large time to process, to ensure that the master
    -    // shuts down in the middle of processing the 3rd batch
    -    val mappedStream = fileStream.map(s => {
    -      val i = s.toInt
    -      if (i == 3) Thread.sleep(2000)
    -      i
    -    })
    -
    -    // Reducing over a large window to ensure that recovery from master failure
    -    // requires reprocessing of all the files seen before the failure
    -    val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
    -    val outputBuffer = new ArrayBuffer[Seq[Int]]
    -    var outputStream = new TestOutputStream(reducedStream, outputBuffer)
    -    outputStream.register()
    -    ssc.start()
    -
    -    // Create files and advance manual clock to process them
    -    // var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    -    Thread.sleep(1000)
    -    for (i <- Seq(1, 2, 3)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      // wait to make sure that the file is written such that it gets shown in the file listings
    -      Thread.sleep(1000)
    +    val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
    +
    +    def writeFile(i: Int, clock: ManualClock): Unit = {
    +      val file = new File(testDir, i.toString)
    +      Files.write(i + "\n", file, Charsets.UTF_8)
    +      assert(file.setLastModified(clock.currentTime()))
    +      // Check that the file's modification date is actually the value we wrote, since rounding or
    +      // truncation will break the test:
    +      assert(file.lastModified() === clock.currentTime())
         }
    -    logInfo("Output = " + outputStream.output.mkString(","))
    -    assert(outputStream.output.size > 0, "No files processed before restart")
    -    ssc.stop()
     
    -    // Verify whether files created have been recorded correctly or not
    -    var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    def recordedFiles = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    -
    -    // Create files while the master is down
    -    for (i <- Seq(4, 5, 6)) {
    -      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
    -      Thread.sleep(1000)
    +    def recordedFiles(ssc: StreamingContext): Seq[Int] = {
    +      val fileInputDStream =
    +        ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    +      val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
    +      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
         }
     
    -    // Recover context from checkpoint file and verify whether the files that were
    -    // recorded before failure were saved and successfully recovered
    -    logInfo("*********** RESTARTING ************")
    -    ssc = new StreamingContext(checkpointDir)
    -    fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
    -    assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
    -    assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
    +    try {
    +      // This is a var because it's re-assigned when we restart from a checkpoint:
    +      var clock: ManualClock = null
    +      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
    +        ssc.checkpoint(checkpointDir)
    +        clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    +        val waiter = new StreamingTestWaiter(ssc)
    +        val fileStream = ssc.textFileStream(testDir.toString)
    +        // Making value 3 take a large time to process, to ensure that the driver
    +        // shuts down in the middle of processing the 3rd batch
    +        val mappedStream = fileStream.map(s => {
    +          val i = s.toInt
    +          if (i == 3) Thread.sleep(4000)
    +          i
    +        })
    +
    +        // Reducing over a large window to ensure that recovery from driver failure
    +        // requires reprocessing of all the files seen before the failure
    +        val reducedStream = mappedStream.reduceByWindow(_ + _, batchDuration * 30, batchDuration)
    +        val outputStream = new TestOutputStream(reducedStream, outputBuffer)
    +        outputStream.register()
    +        ssc.start()
    +
    +        clock.addToTime(batchDuration.milliseconds)
    +        // Create files and advance manual clock to process them
    +        for (i <- Seq(1, 2, 3)) {
    +          writeFile(i, clock)
    +          clock.addToTime(batchDuration.milliseconds)
    +          if (i != 3) {
    +            // Since we want to shut down while the 3rd batch is processing
    +            waiter.waitForTotalBatchesCompleted(i, batchDuration * 5)
    +          }
    +        }
    +        clock.addToTime(batchDuration.milliseconds)
    +        waiter.waitForTotalBatchesStarted(3, batchDuration * 5)
    +        Thread.sleep(1000) // To wait for execution to actually begin
    --- End diff --
    
    I tried removing this sleep and it broke the test; my guess is that there is a delay between the time that we get the batch started event and the time that the file is recorded.
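
    One possible alternative to a fixed sleep, sketched here under assumptions: poll on wall-clock time until the condition of interest holds. `Poll.until` is a hypothetical helper, not part of Spark or this PR, and whether the recorded-files check is the right condition to wait on is itself a guess:

```scala
// Hypothetical helper, not part of Spark or the PR. It re-evaluates
// `condition` on real (wall-clock) time until it holds or `timeoutMs`
// elapses, and returns the last observed value of the condition.
object Poll {
  def until(timeoutMs: Long, intervalMs: Long = 10L)(condition: => Boolean): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    var ok = condition
    while (!ok && System.currentTimeMillis() < deadline) {
      Thread.sleep(intervalMs)
      ok = condition
    }
    ok
  }
}
```

    In the test above this might read `assert(Poll.until(5000L)(recordedFiles(ssc).contains(3)))`, which returns as soon as the file shows up instead of always paying the full second. ScalaTest's `Eventually` trait offers a similar retry construct.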

