Posted to reviews@spark.apache.org by brkyvz <gi...@git.apache.org> on 2015/10/30 07:19:48 UTC

[GitHub] spark pull request: [SPARK-11419][STREAMING] Par recovery

GitHub user brkyvz opened a pull request:

    https://github.com/apache/spark/pull/9373

    [SPARK-11419][STREAMING] Par recovery

    Support for closing WriteAheadLog files after each write was just merged. Closing a file after every write is a very expensive operation because it creates many small files on S3, and it isn't necessary to enable it on HDFS anyway.
    
    However, with many small files on S3, recovery takes a very long time. Files also accumulate quickly and deletes may not be able to keep up, so deletes can be parallelized as well.
    
    This PR adds support for both of these parallelization steps, and also fixes a couple of other failures I encountered during recovery.
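The parallel-delete step can be illustrated with plain JDK and Scala concurrency primitives. This is a minimal sketch, not the code from this PR: `deleteFilesInParallel` is a hypothetical helper, local temp files stand in for WAL files on S3, and the thread count is an arbitrary assumption.

```scala
import java.nio.file.{Files, Path}
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper (not a Spark API): delete many small log files on a
// bounded thread pool instead of one at a time, returning how many were removed.
def deleteFilesInParallel(files: Seq[Path], numThreads: Int): Int = {
  val pool = Executors.newFixedThreadPool(numThreads)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
  try {
    // Each deletion is an independent task; at most `numThreads` run at once.
    val deletions = files.map(f => Future(Files.deleteIfExists(f)))
    val deleted = Await.result(Future.sequence(deletions), 1.minute)
    deleted.count(wasDeleted => wasDeleted)
  } finally {
    pool.shutdown()
  }
}

// Usage: create some throwaway files and delete them with 4 threads.
val tmpDir = Files.createTempDirectory("wal-sketch")
val logFiles = (1 to 20).map(i => Files.createFile(tmpDir.resolve(s"log-$i")))
val numDeleted = deleteFilesInParallel(logFiles, numThreads = 4)
```

Deletions do not depend on each other or on ordering, which is what makes this step straightforward to parallelize.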

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/brkyvz/spark par-recovery

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/9373.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #9373
    
----
commit 573b657bca5a77297cafbde489ba380b3372c81c
Author: Burak Yavuz <br...@gmail.com>
Date:   2015-10-29T20:54:02Z

    progress

commit 655f4bff61f1ffa21565c73eb0fe732c8ffada3e
Author: Burak Yavuz <br...@gmail.com>
Date:   2015-10-29T23:53:23Z

    ready for PR

commit 06da0d1389f658a1eb3e9aaaf1750fd7ad85567a
Author: Burak Yavuz <br...@gmail.com>
Date:   2015-10-30T06:12:41Z

    ready for PR

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155976711
  
    Merged build started.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44596014
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -251,4 +261,23 @@ private[streaming] object FileBasedWriteAheadLog {
           }
         }.sortBy { _.startTime }
       }
    +
    +  /**
    +   * This creates an iterator from a parallel collection, by keeping at most `n` objects in memory
    +   * at any given time, where `n` is the size of the thread pool. This is crucial for use cases
    +   * where we create `FileBasedWriteAheadLogReader`s during parallel recovery. We don't want to
    +   * open up `k` streams altogether where `k` is the size of the Seq that we want to parallelize.
    +   */
    +  def seqToParIterator[I, O](
    +      tpool: ThreadPoolExecutor,
    +      source: Seq[I],
    +      handler: I => Iterator[O]): Iterator[O] = {
    +    val taskSupport = new ThreadPoolTaskSupport(tpool)
    +    val groupSize = math.max(math.max(tpool.getCorePoolSize, tpool.getPoolSize), 8)
    --- End diff --
    
    And you would need to use `ThreadUtils.newDaemonCachedThreadPool(..., maxThreadNumber)` to set it. Actually, I think you can just set `maxThreadNumber` to 8 and use it here.
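For context, a cached daemon pool capped at `maxThreadNumber` threads can be built directly from `java.util.concurrent.ThreadPoolExecutor`. This is a sketch under assumptions: `newBoundedDaemonPool` is a hypothetical name, and it is not a copy of Spark's `ThreadUtils`, whose exact signature may differ between versions.

```scala
import java.util.concurrent.{Callable, LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: a pool of at most `maxThreadNumber` daemon threads
// whose idle threads exit after `keepAliveSeconds`.
def newBoundedDaemonPool(prefix: String, maxThreadNumber: Int, keepAliveSeconds: Long = 60): ThreadPoolExecutor = {
  val counter = new AtomicInteger(0)
  val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
      t.setDaemon(true) // daemon threads do not keep the JVM alive
      t
    }
  }
  // core == max: with an unbounded queue a ThreadPoolExecutor never grows past
  // its core size, so the cap has to be the core size itself.
  val pool = new ThreadPoolExecutor(
    maxThreadNumber, maxThreadNumber, keepAliveSeconds, TimeUnit.SECONDS,
    new LinkedBlockingQueue[Runnable](), factory)
  // Let core threads time out too, so an idle pool shrinks to zero threads.
  pool.allowCoreThreadTimeOut(true)
  pool
}

// Usage: cap the pool at 8 threads and derive the group size from that fixed
// cap rather than from getPoolSize, which changes as threads come and go.
val walPool = newBoundedDaemonPool("wal-recovery", maxThreadNumber = 8)
val groupSize = walPool.getMaximumPoolSize
val runsOnDaemon = walPool.submit(new Callable[Boolean] {
  override def call(): Boolean = Thread.currentThread().isDaemon
}).get()
walPool.shutdown()
```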



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155692990
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45614/



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155269577
  
    **[Test build #45467 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45467/consoleFull)** for PR 9373 at commit [`c2cafe1`](https://github.com/apache/spark/commit/c2cafe1e948568bfc0e25657d21db0aebc3f32e4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155273239
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45456/



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155252359
  
    @harishreedharan I couldn't test this properly on HDFS, so instead I enabled the parallelization only when `closeFileAfterWrite` is enabled, which is when you actually need it. Does that sound okay to you?



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155556798
  
    **[Test build #45534 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45534/consoleFull)** for PR 9373 at commit [`83aa28e`](https://github.com/apache/spark/commit/83aa28e05de4874eebc89be11dce0a0b8213007e).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155253537
  
     Merged build triggered.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44601858
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -198,6 +197,45 @@ class FileBasedWriteAheadLogSuite
     
       import WriteAheadLogSuite._
     
    +  test("FileBasedWriteAheadLog - seqToParIterator") {
    +    /*
    +     If the setting `closeFileAfterWrite` is enabled, we start generating a very large number of
    +     files. This causes recovery to take a very long time. In order to make it quicker, we
    +     parallelized the reading of these files. This test makes sure that we limit the number of
    +     open files to the size of the number of threads in our thread pool rather than the size of
    +     the list of files.
    +     */
    +    val numThreads = 8
    +    val tpool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "wal-test-thread-pool")
    +    class GetMaxCounter {
    +      private val value = new AtomicInteger()
    +      @volatile private var max: Int = 0
    +      def increment(): Unit = synchronized {
    +        val atInstant = value.incrementAndGet()
    +        if (atInstant > max) max = atInstant
    +      }
    +      def decrement(): Unit = synchronized { value.decrementAndGet() }
    +      def get(): Int = synchronized { value.get() }
    +      def getMax(): Int = synchronized { max }
    +    }
    +    try {
    +      val testSeq = 1 to 64
    +      val counter = new GetMaxCounter()
    +      def handle(value: Int): Iterator[Int] = {
    +        new CompletionIterator[Int, Iterator[Int]](Iterator(value)) {
    +          counter.increment()
    +          override def completion() { counter.decrement() }
    +        }
    +      }
    +      val iterator = FileBasedWriteAheadLog.seqToParIterator[Int, Int](tpool, testSeq, handle)
    +      assert(iterator.toSeq === testSeq)
    +      assert(counter.getMax() > 1) // make sure we are doing a parallel computation!
    --- End diff --
    
    Here is the fix we discussed:
    ```Scala
        try {
          val latch = new CountDownLatch(1)
          val testSeq = 1 to 1000
          val counter = new GetMaxCounter()
          def handle(value: Int): Iterator[Int] = {
            new CompletionIterator[Int, Iterator[Int]](Iterator(value)) {
              counter.increment()
              latch.await(10, TimeUnit.SECONDS)
              override def completion() { counter.decrement() }
            }
          }
          @volatile var collected: Seq[Int] = Nil
          val t = new Thread() {
            override def run() {
              val iterator = FileBasedWriteAheadLog.seqToParIterator[Int, Int](tpool, testSeq, handle)
              collected = iterator.toSeq
            }
          }
          t.start()
          eventually(Eventually.timeout(10.seconds)) {
            // make sure we are doing a parallel computation!
            assert(counter.getMax() > 1)
          }
          latch.countDown()
          t.join(10000)
          assert(collected === testSeq)
          // make sure we didn't open too many Iterators
          assert(counter.getMax() <= numThreads)
        } finally {
          tpool.shutdownNow()
        }
    ```
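The property this test checks, that no more than `n` handlers are open at once, can be obtained by processing the source in groups of `n`. Below is a minimal sketch of that idea using `Future`s rather than Scala parallel collections with `ThreadPoolTaskSupport` (parallel collections moved to a separate module in Scala 2.13). `boundedParIterator` and the toy `handle` are hypothetical names, and this is not necessarily the code that was merged.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Hypothetical sketch: run `handler` over `source` with at most `groupSize`
// calls in flight, yielding outputs in the original input order.
def boundedParIterator[I, O](ec: ExecutionContext, groupSize: Int, source: Seq[I])(
    handler: I => Iterator[O]): Iterator[O] = {
  implicit val executor: ExecutionContext = ec
  // Iterator.flatMap is lazy, so a group starts only when the consumer reaches it.
  source.grouped(groupSize).flatMap { group =>
    val opened = group.map(item => Future(handler(item)))
    // Wait for the whole group; Future.sequence preserves element order.
    val iterators = Await.result(Future.sequence(opened), Duration.Inf)
    iterators.iterator.flatMap(it => it)
  }
}

// Usage: a toy handler that records how many calls overlap.
val pool = Executors.newFixedThreadPool(8)
val lock = new Object
var inFlight = 0
var maxInFlight = 0

def handle(value: Int): Iterator[Int] = {
  lock.synchronized { inFlight += 1; maxInFlight = math.max(maxInFlight, inFlight) }
  Thread.sleep(5) // simulate opening and reading a log file
  lock.synchronized { inFlight -= 1 }
  Iterator(value)
}

val collected = boundedParIterator(ExecutionContext.fromExecutor(pool), 8, 1 to 64)(v => handle(v)).toList
pool.shutdown()
```

Waiting for a whole group before starting the next leaves threads idle while the slowest item in a group finishes; the trade-off is a hard cap on how many readers exist at any moment.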



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155845851
  
    Merged build started.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r43475406
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala ---
    @@ -88,8 +88,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
         if (eventLoop == null) return // scheduler has already been stopped
         logDebug("Stopping JobScheduler")
     
    -    // First, stop receiving
    -    receiverTracker.stop(processAllReceivedData)
    +    if (receiverTracker != null) {
    +      // First, stop receiving
    +      receiverTracker.stop(processAllReceivedData)
    --- End diff --
    
    This prevents an NPE that is thrown when the streaming context is stopped before recovery is complete.
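The underlying pattern is a component that is only created later (here, after recovery) while `stop()` assumes it already exists. A minimal sketch of the guard, using hypothetical `Tracker` and `Scheduler` classes rather than Spark's actual `JobScheduler` and `ReceiverTracker`:

```scala
// Hypothetical stand-in for ReceiverTracker.
class Tracker {
  @volatile var stopped = false
  def stop(processAllReceivedData: Boolean): Unit = { stopped = true }
}

// Hypothetical stand-in for JobScheduler.
class Scheduler {
  // Assigned only in start(); stays null if stop() runs first.
  @volatile private var tracker: Tracker = null

  def start(): Unit = { tracker = new Tracker }

  def stop(processAllReceivedData: Boolean): Unit = {
    // Option(null) is None, so stopping before start() is a no-op instead of an NPE.
    Option(tracker).foreach(_.stop(processAllReceivedData))
  }

  def currentTracker: Option[Tracker] = Option(tracker)
}

val early = new Scheduler
early.stop(processAllReceivedData = false) // would throw an NPE without the guard

val normal = new Scheduler
normal.start()
normal.stop(processAllReceivedData = true)
```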



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155905167
  
    Merged build finished. Test PASSed.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155904930
  
    **[Test build #45668 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45668/consoleFull)** for PR 9373 at commit [`ccf7f5b`](https://github.com/apache/spark/commit/ccf7f5b56f822cc41345e29321106afbe5670b7f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155253553
  
    Merged build started.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155254915
  
    **[Test build #45457 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45457/consoleFull)** for PR 9373 at commit [`0b7279f`](https://github.com/apache/spark/commit/0b7279fdda081e8f4557cc0fc0366331380e79e0).



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-156270843
  
    **[Test build #45781 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45781/consoleFull)** for PR 9373 at commit [`79e9b03`](https://github.com/apache/spark/commit/79e9b03e55382d64607ee39ac1a66a102574409a).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44595545
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -582,6 +620,9 @@ object WriteAheadLogSuite {
           allowBatching: Boolean): Seq[String] = {
         val wal = createWriteAheadLog(logDirectory, closeFileAfterWrite, allowBatching)
         val data = wal.readAll().asScala.map(byteBufferToString).toSeq
    +    // The thread pool for parallel recovery gets killed with wal.close(). Therefore we need to
    +    // eagerly compute data, otherwise the lazy computation will fail.
    +    data.length
    --- End diff --
    
    Could you just change `toSeq` to `toArray`? `toArray` will drain the Iterator at once.
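Why draining matters: an `Iterator` built with `map` does no work until it is consumed, so if the resource behind it is closed first, consumption fails. A toy sketch; the `ToyLog` class is hypothetical and only mimics a WAL whose `close()` tears down what `readAll()` depends on.

```scala
import scala.util.Try

// Hypothetical stand-in for a WAL whose readAll() depends on a resource
// (in the PR, the recovery thread pool) that close() shuts down.
class ToyLog(records: Seq[String]) {
  @volatile private var closed = false
  def readAll(): Iterator[String] = records.iterator.map { r =>
    // This check runs only when an element is actually pulled.
    if (closed) throw new IllegalStateException("log already closed")
    r
  }
  def close(): Unit = { closed = true }
}

// Eager: toArray pulls every element before close() runs.
val eagerLog = new ToyLog(Seq("a", "b", "c"))
val eagerData = eagerLog.readAll().toArray
eagerLog.close()

// Lazy: the iterator is consumed only after close(), so reading fails.
val lazyLog = new ToyLog(Seq("a", "b", "c"))
val lazyData = lazyLog.readAll()
lazyLog.close()
val lazyResult = Try(lazyData.toList)
```

In the Scala versions Spark used at the time, `Iterator.toSeq` returned a `Stream`, which evaluates elements after the first only on demand; an explicitly strict `toArray` or `toList` avoids that ambiguity.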



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-156013324
  
    **[Test build #45712 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45712/consoleFull)** for PR 9373 at commit [`dbb31e3`](https://github.com/apache/spark/commit/dbb31e372d178c70bb3a6f8b18c931ce0867d4b2).



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155976702
  
     Merged build triggered.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-156020249
  
    Merged build finished. Test FAILed.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155516461
  
    Merged build started.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/9373



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-152867985
  
    @harishreedharan Here are some benchmark results:
    For reference, the driver was an r3.2xlarge EC2 instance.
    
    ![image](https://cloud.githubusercontent.com/assets/5243515/10871515/54c14846-809e-11e5-91e6-2ac3605d98b7.png)
    
    | Num Threads | Rate (ms / file) | Speed-up    |
    |------------:|-----------------:|------------:|
    | 50          | 5.556101934      | 9.004997951 |
    | 25          | 5.99898194       | 8.340196225 |
    | 8           | 8.692144733      | 5.756080699 |
    | 4           | 14.1162362       | 3.544336169 |
    | 1           | 50.03268653      | 1           |
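The Speed-up column is the single-thread rate divided by the rate at `n` threads. The sketch below recomputes it from the benchmark's Rate column and adds parallel efficiency (speed-up divided by thread count), a derived metric that was not part of the original comment.

```scala
// Rates in ms per file, copied from the benchmark table.
val ratesMsPerFile: Map[Int, Double] = Map(
  1 -> 50.03268653,
  4 -> 14.1162362,
  8 -> 8.692144733,
  25 -> 5.99898194,
  50 -> 5.556101934
)

val baseline = ratesMsPerFile(1)

// speed-up(n) = rate(1 thread) / rate(n threads)
val speedUp: Map[Int, Double] = ratesMsPerFile.map { case (n, rate) => n -> baseline / rate }

// efficiency(n) = speed-up(n) / n, the fraction of ideal linear scaling achieved
val efficiency: Map[Int, Double] = speedUp.map { case (n, s) => n -> s / n }

for (n <- speedUp.keys.toSeq.sorted)
  println(f"$n%2d threads: speed-up ${speedUp(n)}%.2f, efficiency ${efficiency(n) * 100}%.0f%%")
```

By this measure efficiency falls from roughly 89% at 4 threads to about 18% at 50 threads, and going from 25 to 50 threads improves the speed-up only from about 8.3 to 9.0.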




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155690177
  
     Merged build triggered.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155516430
  
     Merged build triggered.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155677200
  
    Merged build started.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-152469638
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44666/



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155981153
  
    @brkyvz A few more comments, and one pending comment from before about adding more unit tests.
    @zsxwing please take a look once again.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155848316
  
    **[Test build #45648 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45648/consoleFull)** for PR 9373 at commit [`1ba8340`](https://github.com/apache/spark/commit/1ba834000b40f0d4cf39be5972cb585f9bbb9006).



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155266954
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45457/



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155677122
  
     Merged build triggered.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r43949361
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala ---
    @@ -207,6 +207,87 @@ class ReceivedBlockTrackerSuite
         tracker1.isWriteAheadLogEnabled should be (false)
       }
     
    +  test("parallel file deletion in FileBasedWriteAheadLog is robust to deletion error") {
    +    val manualClock = new ManualClock
    +    conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1")
    +    require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) === 1)
    +    val tracker = createTracker(clock = manualClock)
    +
    +    val addBlocks = generateBlockInfos()
    +    val batch1 = addBlocks.slice(0, 1)
    +    val batch2 = addBlocks.slice(1, 3)
    +    val batch3 = addBlocks.slice(3, 6)
    +
    +    def advanceTime(): Unit = manualClock.advance(1000)
    +
    +    assert(getWriteAheadLogFiles().length === 0)
    +
    --- End diff --
    
    Can you add inline comments to explain each step, so that the reader can understand what's going on?



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155690192
  
    Merged build started.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155266838
  
    **[Test build #45457 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45457/consoleFull)** for PR 9373 at commit [`0b7279f`](https://github.com/apache/spark/commit/0b7279fdda081e8f4557cc0fc0366331380e79e0).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155987716
  
    **[Test build #45700 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45700/consoleFull)** for PR 9373 at commit [`7e1829b`](https://github.com/apache/spark/commit/7e1829b87e4809a2096b969cdb8de73f03afd616).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-152469252
  
    **[Test build #44666 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44666/consoleFull)** for PR 9373 at commit [`7f8cfe3`](https://github.com/apache/spark/commit/7f8cfe340010e867ab73c40fc0ba39b0d0144695).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Par recovery

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-152441892
  
    **[Test build #44665 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44665/consoleFull)** for PR 9373 at commit [`be5a2ab`](https://github.com/apache/spark/commit/be5a2ab7f1dcd655eefea48918e445ed2bebad55).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155890842
  
     Merged build triggered.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-152893856
  
    Did you try HDFS? I am assuming we'd get similar speedups there too, but in
    that case there are far fewer files, so the cost of setting up the
    streams is paid only a handful of times.
    
    What I am wondering is whether we'd ever actually have to deal with that many
    files in the non-S3 case. Doesn't this add extra cost for HDFS or any
    other FS? In those cases the number of files would usually be pretty
    small, which may make this approach more expensive.
    
    If this adds only a small cost, or if it turns out to be faster, then let's keep
    this.
    
    
    On Sunday, November 1, 2015, Burak Yavuz <notifications@github.com> wrote:
    
    > @harishreedharan <https://github.com/harishreedharan> Here are some
    > benchmark results:
    > For reference, the driver was an r3.2xlarge EC2 instance.
    >
    > [image: https://cloud.githubusercontent.com/assets/5243515/10871515/54c14846-809e-11e5-91e6-2ac3605d98b7.png]
    >
    > Num Threads | Rate (ms / file) | Speed-up
    > ----------- | ---------------- | -----------
    > 50          | 5.556101934      | 9.004997951
    > 25          | 5.99898194       | 8.340196225
    > 8           | 8.692144733      | 5.756080699
    > 4           | 14.1162362       | 3.544336169
    > 1           | 50.03268653      | 1
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/spark/pull/9373#issuecomment-152867985>.
    >
    
    
    -- 
    
    Thanks,
    Hari
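The Speed-up column in the quoted benchmark is the single-thread rate divided by the rate at each thread count (for 50 threads, 50.03268653 / 5.556101934 gives about 9.005). A small sketch that recomputes it from the quoted rates; the values are copied from the table and the helper name `speedups` is purely illustrative.

```scala
// Rates (ms per file) copied from the quoted benchmark, keyed by thread count.
val ratesMsPerFile: Seq[(Int, Double)] = Seq(
  50 -> 5.556101934,
  25 -> 5.99898194,
  8 -> 8.692144733,
  4 -> 14.1162362,
  1 -> 50.03268653
)

// Speed-up relative to the single-threaded run: baselineRate / rate.
def speedups(rates: Seq[(Int, Double)]): Seq[(Int, Double)] = {
  val baseline = rates.collectFirst { case (1, r) => r }.getOrElse(throw new IllegalArgumentException("no 1-thread baseline"))
  rates.map { case (threads, rate) => threads -> baseline / rate }
}

speedups(ratesMsPerFile).foreach { case (t, s) => println(f"$t%3d threads: $s%.3fx") }
```

Going from 8 to 50 threads improves the speed-up only from about 5.8x to 9.0x, so the gains flatten well before the thread count does.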




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r43948269
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -149,27 +150,26 @@ private[streaming] class FileBasedWriteAheadLog(
         val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
         logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
           s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
    -
    -    def deleteFiles() {
    -      oldLogFiles.foreach { logInfo =>
    -        try {
    -          val path = new Path(logInfo.path)
    -          val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
    -          fs.delete(path, true)
    -          synchronized { pastLogs -= logInfo }
    -          logDebug(s"Cleared log file $logInfo")
    -        } catch {
    -          case ex: Exception =>
    -            logWarning(s"Error clearing write ahead log file $logInfo", ex)
    -        }
    +    synchronized { pastLogs --= oldLogFiles }
    +    def deleteFile(walInfo: LogInfo): Unit = {
    +      try {
    +        val path = new Path(walInfo.path)
    +        val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
    +        fs.delete(path, true)
    +        logDebug(s"Cleared log file $walInfo")
    +      } catch {
    +        case ex: Exception =>
    +          logWarning(s"Error clearing write ahead log file $walInfo", ex)
           }
           logInfo(s"Cleared log files in $logDirectory older than $threshTime")
         }
    -    if (!executionContext.isShutdown) {
    -      val f = Future { deleteFiles() }
    -      if (waitForCompletion) {
    -        import scala.concurrent.duration._
    -        Await.ready(f, 1 second)
    +    oldLogFiles.foreach { logInfo =>
    +      if (!executionContext.isShutdown) {
    +        val f = Future { deleteFile(logInfo) }
    --- End diff --
    
    Again, this should not use the default execution context. Please create a dedicated execution context for this.
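A minimal sketch of the shape being asked for, assuming plain `java.util.concurrent` executors rather than Spark's `ThreadUtils`, and a hypothetical `deleteFile` callback standing in for the Hadoop `FileSystem.delete` call. Expired entries are removed from the in-memory list under the lock, as in the later revision of `clean`, and each deletion is submitted to a dedicated, daemon-threaded `ExecutionContext`; failures are reported rather than propagated. `WalCleaner` and this `LogInfo` are illustrative stand-ins, not Spark classes.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
import scala.util.control.NonFatal

// Hypothetical stand-in for FileBasedWriteAheadLog's LogInfo record.
case class LogInfo(startTime: Long, endTime: Long, path: String)

class WalCleaner(deleteFile: String => Unit, numThreads: Int) {
  // Named daemon threads, so a slow delete never keeps the JVM alive.
  private val threadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"wal-cleaner-${counter.incrementAndGet()}")
      t.setDaemon(true)
      t
    }
  }

  // Dedicated context: deletions never compete with unrelated work on a shared pool.
  private val executionContext: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(numThreads, threadFactory))

  val pastLogs = ArrayBuffer.empty[LogInfo]

  def clean(threshTime: Long): Seq[Future[Unit]] = {
    // Remove expired entries under the lock, then delete the files outside it.
    val oldLogFiles = synchronized {
      val expired = pastLogs.filter(_.endTime < threshTime).toList
      pastLogs --= expired
      expired
    }
    if (executionContext.isShutdown) {
      Seq.empty
    } else {
      oldLogFiles.map { walInfo =>
        Future {
          try deleteFile(walInfo.path)
          catch { case NonFatal(e) => println(s"Error clearing WAL file ${walInfo.path}: $e") }
        }(executionContext)
      }
    }
  }

  def stop(): Unit = executionContext.shutdown()
}
```

Because each file gets its own task, one failed delete does not stop the others.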



[GitHub] spark pull request: [SPARK-11419][STREAMING] Par recovery

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-152441896
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44665/



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-156159147
  
     Merged build triggered.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155864044
  
    **[Test build #45650 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45650/consoleFull)** for PR 9373 at commit [`1ba8340`](https://github.com/apache/spark/commit/1ba834000b40f0d4cf39be5972cb585f9bbb9006).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155258652
  
    **[Test build #45467 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45467/consoleFull)** for PR 9373 at commit [`c2cafe1`](https://github.com/apache/spark/commit/c2cafe1e948568bfc0e25657d21db0aebc3f32e4).



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44617355
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -146,30 +154,32 @@ private[streaming] class FileBasedWriteAheadLog(
        * asynchronously.
        */
       def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
    -    val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
    +    val oldLogFiles = synchronized {
    +      val expiredLogs = pastLogs.filter { _.endTime < threshTime }
    +      pastLogs --= expiredLogs
    +      expiredLogs
    +    }
         logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
           s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
    -
    -    def deleteFiles() {
    -      oldLogFiles.foreach { logInfo =>
    -        try {
    -          val path = new Path(logInfo.path)
    -          val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
    -          fs.delete(path, true)
    -          synchronized { pastLogs -= logInfo }
    -          logDebug(s"Cleared log file $logInfo")
    -        } catch {
    -          case ex: Exception =>
    -            logWarning(s"Error clearing write ahead log file $logInfo", ex)
    -        }
    +    def deleteFile(walInfo: LogInfo): Unit = {
    --- End diff --
    
    `logInfo` is Spark's logging method, so a parameter named `logInfo` would shadow it; hence the rename to `walInfo`.
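To illustrate the shadowing problem: inside a closure whose parameter is named `logInfo`, a call like `logInfo("...")` resolves to the parameter (a `LogInfo` value with no `apply` method) instead of the inherited logger, and fails to compile. A self-contained illustration using a hypothetical `Logging` trait and `LogInfo` case class, not Spark's actual classes:

```scala
// Hypothetical stand-ins for Spark's Logging trait and the WAL's LogInfo record.
trait Logging {
  val messages = scala.collection.mutable.ArrayBuffer.empty[String]
  def logInfo(msg: String): Unit = messages += s"INFO $msg"
}

case class LogInfo(startTime: Long, endTime: Long, path: String)

class Cleaner extends Logging {
  def clean(files: Seq[LogInfo]): Unit = {
    files.foreach { walInfo =>
      // If this parameter were named `logInfo`, the call below would refer to the
      // LogInfo value instead of the logging method and would not compile.
      logInfo(s"Cleared log file ${walInfo.path}")
    }
  }
}
```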



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44616125
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -57,8 +59,8 @@ private[streaming] class FileBasedWriteAheadLog(
       private val callerNameTag = getCallerName.map(c => s" for $c").getOrElse("")
     
       private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
    -  implicit private val executionContext = ExecutionContext.fromExecutorService(
    -    ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName))
    +  private val threadpool = ThreadUtils.newDaemonCachedThreadPool(threadpoolName)
    --- End diff --
    
    Set the max number of threads.
    http://dev.bizo.com/2014/06/cached-thread-pool-considered-harmlful.html
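`Executors.newCachedThreadPool` has no upper bound on its thread count, so a burst of recovery or deletion tasks can spawn one thread per task. One common way to keep the "grow on demand, shrink when idle" behaviour while capping the thread count is a `ThreadPoolExecutor` whose core and maximum sizes are equal, backed by an unbounded queue, with `allowCoreThreadTimeOut(true)`. A sketch under those assumptions; the helper name `newBoundedCachedPool` and the 60-second keep-alive are illustrative choices, not necessarily what the PR adopted.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: a "cached" pool capped at maxThreads daemon threads.
def newBoundedCachedPool(prefix: String, maxThreads: Int, keepAliveSeconds: Long = 60L): ThreadPoolExecutor = {
  val counter = new AtomicInteger(0)
  val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
      t.setDaemon(true)
      t
    }
  }
  // core == max: with an unbounded queue the pool never grows past corePoolSize,
  // so tasks beyond maxThreads wait in the queue instead of creating new threads.
  val pool = new ThreadPoolExecutor(
    maxThreads, maxThreads, keepAliveSeconds, TimeUnit.SECONDS,
    new LinkedBlockingQueue[Runnable](), factory)
  // Let idle threads exit so the pool shrinks back to zero, like a cached pool.
  pool.allowCoreThreadTimeOut(true)
  pool
}
```

The trade-off is that excess tasks queue up instead of running immediately, which is usually the desired behaviour when each task holds an open stream to S3 or HDFS.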



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44577472
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -199,6 +198,45 @@ class FileBasedWriteAheadLogSuite
     
    --- End diff --
    
    Since some of the changes were in `FileBasedWriteAheadLog*Reader`, these should be unit tested as well. Probably add them to CommonWALTests: a simple unit test that writes to the log to get WAL segments, deletes the files, and then tries to read them (similar to the test "handling file errors while reading rotating logs").
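The shape of that test (write segments, delete some, then read without failing) can be sketched outside Spark with plain local files. This is a minimal sketch, assuming a hypothetical `readSegments` helper that treats a vanished segment as empty; the real `FileBasedWriteAheadLog*Reader` classes read Hadoop streams and are not reproduced here.

```scala
import java.io.FileNotFoundException
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, NoSuchFileException, Path}

// Hypothetical reader: yields the lines of each segment in order, treating a segment
// that disappeared (e.g. removed by a concurrent cleanup) as empty. Other I/O errors
// are still thrown.
def readSegments(segments: Seq[Path]): Iterator[String] =
  segments.iterator.flatMap { path =>
    try {
      val lines = Files.readAllLines(path, StandardCharsets.UTF_8)
      Iterator.tabulate(lines.size)(i => lines.get(i))
    } catch {
      case _: NoSuchFileException | _: FileNotFoundException => Iterator.empty
    }
  }

// Test helper: writes one file per segment and returns the paths in order.
def writeSegments(dir: Path, contents: Seq[Seq[String]]): Seq[Path] =
  contents.zipWithIndex.map { case (lines, i) =>
    val p = dir.resolve(s"log-$i")
    Files.write(p, lines.mkString("\n").getBytes(StandardCharsets.UTF_8))
    p
  }
```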



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44615768
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -146,30 +154,32 @@ private[streaming] class FileBasedWriteAheadLog(
        * asynchronously.
        */
       def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
    -    val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
    +    val oldLogFiles = synchronized {
    +      val expiredLogs = pastLogs.filter { _.endTime < threshTime }
    +      pastLogs --= expiredLogs
    +      expiredLogs
    +    }
         logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
           s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
    -
    -    def deleteFiles() {
    -      oldLogFiles.foreach { logInfo =>
    -        try {
    -          val path = new Path(logInfo.path)
    -          val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
    -          fs.delete(path, true)
    -          synchronized { pastLogs -= logInfo }
    -          logDebug(s"Cleared log file $logInfo")
    -        } catch {
    -          case ex: Exception =>
    -            logWarning(s"Error clearing write ahead log file $logInfo", ex)
    -        }
    +    def deleteFile(walInfo: LogInfo): Unit = {
    +      try {
    +        val path = new Path(walInfo.path)
    +        val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
    +        fs.delete(path, true)
    +        logDebug(s"Cleared log file $walInfo")
    +      } catch {
    +        case ex: Exception =>
    +          logWarning(s"Error clearing write ahead log file $walInfo", ex)
           }
           logInfo(s"Cleared log files in $logDirectory older than $threshTime")
         }
    -    if (!executionContext.isShutdown) {
    -      val f = Future { deleteFiles() }
    -      if (waitForCompletion) {
    -        import scala.concurrent.duration._
    -        Await.ready(f, 1 second)
    +    oldLogFiles.foreach { logInfo =>
    --- End diff --
    
    nit: When waitForCompletion is true, the deletions are awaited one by one.
    Might be better to compose all the Futures into a single Future and wait on that.
    Not really a problem, as waitForCompletion is true only in tests. Do this only if there is other critical feedback.
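The suggestion can be sketched with the standard library. `Future.sequence` turns the per-file `Seq[Future[Unit]]` into a single `Future[Seq[Unit]]`, so the caller waits once, with one deadline, instead of file by file. A minimal sketch, assuming a hypothetical `deleteAll` helper, a caller-supplied `ExecutionContext`, and the same one-second budget as the original `Await.ready(f, 1 second)`. `Await.ready` throws a `TimeoutException` if the deletions do not finish in time.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper: start every deletion, then optionally wait on all of them at once.
def deleteAll(paths: Seq[String], deleteFile: String => Unit, waitForCompletion: Boolean)(implicit ec: ExecutionContext): Future[Seq[Unit]] = {
  // All futures are created up front, so the deletions run concurrently.
  val futures: Seq[Future[Unit]] = paths.map(p => Future(deleteFile(p)))
  // One combined future that completes when every deletion has completed.
  val all = Future.sequence(futures)
  if (waitForCompletion) {
    // A single wait with one overall deadline instead of one wait per file.
    Await.ready(all, 1.second)
  }
  all
}
```

If any deletion throws, the combined future fails, which is one reason the PR's `deleteFile` catches and logs its own exceptions.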



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-154995112
  
    @brkyvz I think there have been issues with Hadoop 2-related stuff in the master branch. Let's talk offline about how to fix it.



[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-154861924
  
    @harishreedharan I've been trying to test this patch, but I couldn't set up HDFS to work with Spark using the spark-ec2 scripts. Could you please help me set up a cluster with HDFS so that I can benchmark this?
    Basically, I can get HDFS up and running on the cluster, but Spark can't access it. I get the following exception when I use Hadoop 2:
    ```
    scala> sc.parallelize(1 to 5).saveAsTextFile("hdfs:///trial")
    java.io.IOException: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).; Host Details : local host is: "ip-172-31-13-113.us-west-2.compute.internal/172.31.13.113"; destination host is: "ec2-52-32-160-227.us-west-2.compute.amazonaws.com":9000;
    ```
    That looks like a Protobuf version incompatibility.
    
    I launched the ec2 instances using:
    ```
    ec2/spark-ec2 --spark-git-repo=https://github.com/brkyvz/spark --spark-version=fc2951f6530bde932a0bc97f430c6c360eb03209 -s 2 --spot-price=0.2 -t m4.large --no-ganglia -i ... -k ... -r us-west-2 --hadoop-major-version 2 launch burak-streaming-test-2
    ```
    
    I used to get the following when using `--hadoop-major-version 1`:
    ```
    java.io.IOException: Failed on local exception: java.io.EOFException; Host Details : local host is: "ip-172-31-30-170.us-west-2.compute.internal/172.31.30.170"; destination host is: "ec2-52-32-211-30.us-west-2.compute.amazonaws.com":9000; 
    ```




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44595798
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -251,4 +261,23 @@ private[streaming] object FileBasedWriteAheadLog {
           }
         }.sortBy { _.startTime }
       }
    +
    +  /**
    +   * This creates an iterator from a parallel collection, by keeping at most `n` objects in memory
    +   * at any given time, where `n` is the size of the thread pool. This is crucial for use cases
    +   * where we create `FileBasedWriteAheadLogReader`s during parallel recovery. We don't want to
    +   * open up `k` streams altogether where `k` is the size of the Seq that we want to parallelize.
    +   */
    +  def seqToParIterator[I, O](
    +      tpool: ThreadPoolExecutor,
    +      source: Seq[I],
    +      handler: I => Iterator[O]): Iterator[O] = {
    +    val taskSupport = new ThreadPoolTaskSupport(tpool)
    +    val groupSize = math.max(math.max(tpool.getCorePoolSize, tpool.getPoolSize), 8)
    --- End diff --
    
    You can just use `tpool.getMaximumPoolSize.max(8)`
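A minimal sketch contrasting the two group-size expressions, assuming a plain `java.util.concurrent.ThreadPoolExecutor` like the `tpool` argument in the diff. `getPoolSize` reports how many threads are currently alive, which is 0 for a pool that has not run any task yet, so the expression in the diff depends on the core size and on timing rather than on the configured maximum; `getMaximumPoolSize` reads the configured upper bound directly. The object name, method names, and the 2-to-16-thread pool are hypothetical.

```scala
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

object GroupSizeSketch {
  // Expression from the diff: core size vs. number of threads alive right now
  // (0 before the pool has run any task), with 8 as a floor.
  def groupSizeFromDiff(tpool: ThreadPoolExecutor): Int =
    math.max(math.max(tpool.getCorePoolSize, tpool.getPoolSize), 8)

  // Suggested expression: only the configured maximum matters, with 8 as a floor.
  def groupSizeSuggested(tpool: ThreadPoolExecutor): Int =
    tpool.getMaximumPoolSize.max(8)

  // Hypothetical pool that starts with 2 core threads and may grow to 16.
  def growablePool(): ThreadPoolExecutor =
    new ThreadPoolExecutor(2, 16, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())
}
```

For a fixed-size pool (core == max) both expressions agree; they only diverge when the core size is smaller than the maximum, as in `growablePool()`, where the diff's expression yields 8 and the suggested one yields 16.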




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155273200
  
    **[Test build #45456 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45456/consoleFull)** for PR 9373 at commit [`98da092`](https://github.com/apache/spark/commit/98da092d14dfa3f9f95261fe1b307f0dda7a785f).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44609375
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -198,6 +197,45 @@ class FileBasedWriteAheadLogSuite
     
       import WriteAheadLogSuite._
     
    +  test("FileBasedWriteAheadLog - seqToParIterator") {
    +    /*
    +     If the setting `closeFileAfterWrite` is enabled, we start generating a very large number of
    +     files. This causes recovery to take a very long time. In order to make it quicker, we
    +     parallelized the reading of these files. This test makes sure that we limit the number of
    +     open files to the size of the number of threads in our thread pool rather than the size of
    +     the list of files.
    +     */
    +    val numThreads = 8
    +    val tpool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "wal-test-thread-pool")
    +    class GetMaxCounter {
    +      private val value = new AtomicInteger()
    +      @volatile private var max: Int = 0
    +      def increment(): Unit = synchronized {
    +        val atInstant = value.incrementAndGet()
    +        if (atInstant > max) max = atInstant
    +      }
    +      def decrement(): Unit = synchronized { value.decrementAndGet() }
    +      def get(): Int = synchronized { value.get() }
    +      def getMax(): Int = synchronized { max }
    +    }
    +    try {
    +      val testSeq = 1 to 64
    +      val counter = new GetMaxCounter()
    +      def handle(value: Int): Iterator[Int] = {
    +        new CompletionIterator[Int, Iterator[Int]](Iterator(value)) {
    +          counter.increment()
    +          override def completion() { counter.decrement() }
    +        }
    +      }
    +      val iterator = FileBasedWriteAheadLog.seqToParIterator[Int, Int](tpool, testSeq, handle)
    +      assert(iterator.toSeq === testSeq)
    +      assert(counter.getMax() > 1) // make sure we are doing a parallel computation!
    --- End diff --
    
    Please add comments explaining what the latch does. This is a complicated test and needs inline comments so readers can understand what's going on.
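The revision of the test that contains the latch is not part of this excerpt, so what follows is only a hypothetical, self-contained sketch of the general pattern such comments would describe: a `java.util.concurrent.CountDownLatch` sized to the number of threads keeps every task "open" until all of them have started, which forces the overlap that a counter like `GetMaxCounter` is meant to record. `LatchSketch` and `peakOpen` are illustrative names, not code from the PR.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object LatchSketch {
  // Same idea as GetMaxCounter in the test: track the current and peak "open" count.
  final class GetMaxCounter {
    private var value = 0
    private var max = 0
    def increment(): Unit = synchronized { value += 1; if (value > max) max = value }
    def decrement(): Unit = synchronized { value -= 1 }
    def getMax: Int = synchronized { max }
  }

  /** Runs `numThreads` tasks on a pool of `numThreads` threads and returns the
    * peak number of tasks that were open at the same time. */
  def peakOpen(numThreads: Int): Int = {
    val pool = Executors.newFixedThreadPool(numThreads)
    val counter = new GetMaxCounter
    // Each task counts down once after "opening". No task "closes" until the
    // count reaches zero, i.e. until all `numThreads` tasks are open at once.
    // Without the latch a fast task could finish before the next one starts,
    // and the observed peak could be as low as 1.
    val latch = new CountDownLatch(numThreads)
    try {
      val futures = (1 to numThreads).map { _ =>
        pool.submit(new Runnable {
          override def run(): Unit = {
            counter.increment()               // "open a stream"
            latch.countDown()
            latch.await(10, TimeUnit.SECONDS) // block until every task is open
            counter.decrement()               // "close the stream"
          }
        })
      }
      futures.foreach(_.get())                // propagate any task failure
      counter.getMax
    } finally {
      pool.shutdown()
    }
  }
}
```

The 10-second timeout only guards against a hung test; in the normal case the peak equals `numThreads` deterministically.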




[GitHub] spark pull request: [SPARK-11419][STREAMING] Par recovery

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-152441407
  
    Merged build triggered.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155556909
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45534/




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-156181873
  
    **[Test build #45747 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45747/consoleFull)** for PR 9373 at commit [`a31822c`](https://github.com/apache/spark/commit/a31822c1762383f17f379c4794c29c8bbe3203fa).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Par recovery

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-152441415
  
    Merged build started.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155905168
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45668/




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44608862
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -251,4 +261,23 @@ private[streaming] object FileBasedWriteAheadLog {
           }
         }.sortBy { _.startTime }
       }
    +
    +  /**
    +   * This creates an iterator from a parallel collection, by keeping at most `n` objects in memory
    +   * at any given time, where `n` is the size of the thread pool. This is crucial for use cases
    +   * where we create `FileBasedWriteAheadLogReader`s during parallel recovery. We don't want to
    +   * open up `k` streams altogether where `k` is the size of the Seq that we want to parallelize.
    +   */
    +  def seqToParIterator[I, O](
    +      tpool: ThreadPoolExecutor,
    +      source: Seq[I],
    +      handler: I => Iterator[O]): Iterator[O] = {
    +    val taskSupport = new ThreadPoolTaskSupport(tpool)
    +    val groupSize = math.max(math.max(tpool.getCorePoolSize, tpool.getPoolSize), 8)
    --- End diff --
    
    It's not safe for the rest of the system to allow an unbounded number of threads. I think there should be a limit, which can be a SparkConf setting that is not exposed publicly.
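One way to follow this suggestion, sketched under assumptions: read the thread limit from a configuration key that is not documented publicly, fall back to a small default, and size a fixed pool from it. In Spark itself the natural source would be `SparkConf` (for example `SparkConf.getInt(key, default)`); here a plain `Map[String, String]` stands in so the sketch is self-contained, and the key name, the default of 8, and the object name are all hypothetical.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

object RecoveryThreadLimit {
  // Hypothetical, intentionally undocumented key; not a real Spark setting.
  val MaxThreadsKey = "spark.streaming.hypothetical.walRecovery.maxThreads"
  // Hypothetical default, mirroring the floor of 8 used in the diff.
  val DefaultMaxThreads = 8

  /** Reads the limit, failing fast on non-positive values. */
  def maxThreads(conf: Map[String, String]): Int = {
    val n = conf.get(MaxThreadsKey).map(_.trim.toInt).getOrElse(DefaultMaxThreads)
    require(n > 0, s"$MaxThreadsKey must be positive, got $n")
    n
  }

  /** Fixed-size pool: core == max, so the thread count never exceeds the limit. */
  def newRecoveryPool(conf: Map[String, String]): ThreadPoolExecutor = {
    val n = maxThreads(conf)
    new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable]())
  }
}
```

Because the pool uses an unbounded queue with core == max, extra work waits in the queue instead of spawning more threads.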




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155867577
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45648/




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-156020207
  
    **[Test build #45712 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45712/consoleFull)** for PR 9373 at commit [`dbb31e3`](https://github.com/apache/spark/commit/dbb31e372d178c70bb3a6f8b18c931ce0867d4b2).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44577308
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -199,6 +198,45 @@ class FileBasedWriteAheadLogSuite
     
       import WriteAheadLogSuite._
     
    +  test("FileBasedWriteAheadLog - seqToParIterator") {
    +    /*
    +     If the setting `closeFileAfterWrite` is enabled, we start generating a very large number of
    +     files. This causes recovery to take a very long time. In order to make it quicker, we
    +     parallelized the reading of these files. This test makes sure that we limit the number of
    +     open files to the size of the number of threads in our thread pool rather than the size of
    +     the list of files.
    +     */
    +    val numThreads = 8
    +    val tpool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "wal-test-thread-pool")
    +    class GetMaxCounter {
    +      private val value = new AtomicInteger()
    +      @volatile private var max: Int = 0
    +      def increment(): Unit = synchronized {
    +        val atInstant = value.incrementAndGet()
    +        if (atInstant > max) max = atInstant
    +      }
    +      def decrement(): Unit = synchronized { value.decrementAndGet() }
    +      def get(): Int = synchronized { value.get() }
    +      def getMax(): Int = synchronized { max }
    +    }
    +    try {
    +      val testSeq = 1 to 64
    --- End diff --
    
    Can you make this 1000 instead of 8 * 8, just to make sure that we are splitting things right?
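To see why a larger input helps, here is a hypothetical stand-in for `seqToParIterator` (not the code in this PR): it cuts the input into groups of `groupSize`, opens the readers of one group concurrently on a thread pool, and only starts the next group once the caller has drained the current one. With 1000 inputs and a group size of 8 the iteration crosses 125 group boundaries instead of 8, so a grouping or ordering bug is far more likely to surface. As in the test, each "stream" is simulated by an iterator that bumps a counter on creation and decrements it once drained; all names here are illustrative.

```scala
import java.util.concurrent.{Callable, ExecutorService}

object GroupedParIteratorSketch {
  /** Tracks how many simulated streams are open now and at peak. */
  final class OpenCounter {
    private var open = 0
    private var peak = 0
    def onOpen(): Unit = synchronized { open += 1; if (open > peak) peak = open }
    def onClose(): Unit = synchronized { open -= 1 }
    def openNow: Int = synchronized { open }
    def maxOpen: Int = synchronized { peak }
  }

  /** Simulated reader over one value: "opens" on creation, "closes" once drained. */
  final class SimulatedReader(value: Int, counter: OpenCounter) extends Iterator[Int] {
    counter.onOpen()
    private var remaining = true
    private var closed = false
    override def hasNext: Boolean = {
      if (!remaining && !closed) { closed = true; counter.onClose() }
      remaining
    }
    override def next(): Int = {
      if (!remaining) throw new NoSuchElementException("reader exhausted")
      remaining = false
      value
    }
  }

  /** Hypothetical stand-in: handlers of one group run in parallel; the next
    * group is submitted only after the current one has been fully drained. */
  def seqToParIterator[I, O](
      pool: ExecutorService,
      groupSize: Int,
      source: Seq[I],
      handler: I => Iterator[O]): Iterator[O] =
    source.iterator.grouped(groupSize).flatMap { group =>
      // Open every reader of this group concurrently on the pool.
      val futures = group.map { i =>
        pool.submit(new Callable[Iterator[O]] { override def call(): Iterator[O] = handler(i) })
      }
      // Drain in input order; Iterator.flatMap only asks for the next group
      // once this inner iterator's hasNext is false, i.e. all readers closed.
      futures.iterator.flatMap(_.get())
    }
}
```

Because `Iterator.flatMap` is lazy, at most `groupSize` simulated streams are open at any time, regardless of how long the input is.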




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155252022
  
    Merged build started.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155255570
  
    @brkyvz Could you update this PR with master? The batching PR got merged, creating conflicts.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Par recovery

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-152442342
  
    Merged build triggered.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Par recovery

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-152442357
  
    Merged build started.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155678727
  
    **[Test build #45601 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45601/consoleFull)** for PR 9373 at commit [`c9ea423`](https://github.com/apache/spark/commit/c9ea4238e63b579f42179b1da6c00140d9aa0c0f).




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155257362
  
    @brkyvz Sounds good, sir. The issue you saw looks like a protobuf incompatibility. Did you compile and run against the same Hadoop 2 version (2.2+?)?
    This patch now LGTM.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155685296
  
    **[Test build #45601 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45601/consoleFull)** for PR 9373 at commit [`c9ea423`](https://github.com/apache/spark/commit/c9ea4238e63b579f42179b1da6c00140d9aa0c0f).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44507814
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -251,4 +261,22 @@ private[streaming] object FileBasedWriteAheadLog {
           }
         }.sortBy { _.startTime }
       }
    +
    +  /**
    +   * This creates an iterator from a parallel collection, by keeping at most `n` objects in memory
    +   * at any given time, where `n` is the size of the thread pool. This is crucial for use cases
    +   * where we create `FileBasedWriteAheadLogReader`s during parallel recovery. We don't want to
    +   * open up `k` streams altogether where `k` is the size of the Seq that we want to parallelize.
    +   */
    +  def parallelIteratorCreator[I, O](
    --- End diff --
    
    Rename to `seqToParIterator`




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155685330
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45601/




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r43578032
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -17,6 +17,7 @@
     package org.apache.spark.streaming.util
     
     import java.nio.ByteBuffer
    +import java.util.concurrent.ConcurrentSkipListSet
    --- End diff --
    
    NoteToSelf: Remove




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155675180
  
    @zsxwing Added the "limited parallel reader"




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44552651
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -199,6 +198,63 @@ class FileBasedWriteAheadLogSuite
     
       import WriteAheadLogSuite._
     
    +  test("FileBasedWriteAheadLog - parallel readAll opens at most 'numThreads' files") {
    +    /*
    +     If the setting `closeFileAfterWrite` is enabled, we start generating a very large number of
    +     files. This causes recovery to take a very long time. In order to make it quicker, we
    +     parallelized the reading of these files. This test makes sure that we limit the number of
    +     open files to the size of the number of threads in our thread pool rather than the size of
    +     the list of files.
    +     */
    +    val numThreads = 8
    +    val tpool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "wal-test-thread-pool")
    +    class GetMaxCounter {
    +      private val value = new AtomicInteger()
    +      @volatile private var max: Int = 0
    +      def increment(): Unit = {
    +        val atInstant = value.incrementAndGet()
    +        if (atInstant > max) max = atInstant
    +      }
    +      def decrement(): Unit = { value.decrementAndGet() }
    +      def get(): Int = value.get()
    +      def getMax(): Int = max
    +    }
    +    /**
    +     * We need an object that can be iterated through, which will increment our counter once
    +     * initialized, and decrement once closed. This way we can simulate how many "streams" will
    +     * be opened during a real use case.
    +     */
    +    class ReaderObject(cnt: GetMaxCounter, value: Int) extends Iterator[Int] with Closeable {
    +      cnt.increment()
    +      private var returnedValue: Boolean = false
    +      override def hasNext(): Boolean = !returnedValue
    +      override def next(): Int = {
    +        if (!returnedValue) {
    +          returnedValue = true
    +          value
    +        } else {
    +          -1
    +        }
    +      }
    +      override def close(): Unit = {
    +        cnt.decrement()
    +      }
    +    }
    +    try {
    +      val testSeq = 1 to 64
    +      val counter = new GetMaxCounter()
    +      def handle(value: Int): Iterator[Int] = {
    +        val reader = new ReaderObject(counter, value)
    +        CompletionIterator[Int, Iterator[Int]](reader, reader.close)
    --- End diff --
    
    It's not only feasible, it's much more awesome!
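The snippet under discussion wraps each reader so that `reader.close` runs once the iterator is exhausted, using Spark's internal `CompletionIterator`. Since that class is internal to Spark, here is a minimal hypothetical equivalent that shows the idea: delegate `hasNext`/`next` to the wrapped iterator and invoke a callback exactly once, the first time `hasNext` returns false. `CompletionIteratorSketch` is an illustrative name, not a Spark API.

```scala
/** Hypothetical stand-in for Spark's internal CompletionIterator: runs
  * `onComplete` exactly once, the first time the wrapped iterator reports
  * that it has no more elements. */
final class CompletionIteratorSketch[A](sub: Iterator[A], onComplete: () => Unit)
    extends Iterator[A] {
  private var completed = false

  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !completed) {
      completed = true
      onComplete() // e.g. close the underlying reader
    }
    more
  }

  override def next(): A = sub.next()
}

object CompletionIteratorSketch {
  def apply[A](sub: Iterator[A])(onComplete: => Unit): CompletionIteratorSketch[A] =
    new CompletionIteratorSketch(sub, () => onComplete)
}
```

In a test like the one above, a handler could return `CompletionIteratorSketch(reader)(reader.close())`, so the counter is decremented as soon as each simulated stream is drained, without extra bookkeeping in the caller.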




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-156162313
  
    **[Test build #45747 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45747/consoleFull)** for PR 9373 at commit [`a31822c`](https://github.com/apache/spark/commit/a31822c1762383f17f379c4794c29c8bbe3203fa).




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-156012247
  
    Merged build triggered.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155685327
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155852068
  
    test this please




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155890877
  
    Merged build started.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155864114
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155692988
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-152443118
  
    **[Test build #44666 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44666/consoleFull)** for PR 9373 at commit [`7f8cfe3`](https://github.com/apache/spark/commit/7f8cfe340010e867ab73c40fc0ba39b0d0144695).




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155269679
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155269682
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45467/
    Test PASSed.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r43948149
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -149,27 +150,26 @@ private[streaming] class FileBasedWriteAheadLog(
         val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
         logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
           s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
    -
    -    def deleteFiles() {
    -      oldLogFiles.foreach { logInfo =>
    -        try {
    -          val path = new Path(logInfo.path)
    -          val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
    -          fs.delete(path, true)
    -          synchronized { pastLogs -= logInfo }
    -          logDebug(s"Cleared log file $logInfo")
    -        } catch {
    -          case ex: Exception =>
    -            logWarning(s"Error clearing write ahead log file $logInfo", ex)
    -        }
    +    synchronized { pastLogs --= oldLogFiles }
    --- End diff --
    
    You can merge these two synchronized blocks here and move the print later.
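Here is a minimal sketch of the restructuring suggested above. It assumes a simplified `LogInfo` and a hypothetical `LogTracker` wrapper, so it is not the actual `FileBasedWriteAheadLog` code. The filter and the removal share one `synchronized` block, and the log message is emitted after the lock is released. `println` stands in for `logInfo`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for the WAL's LogInfo record.
case class LogInfo(startTime: Long, endTime: Long, path: String)

class LogTracker {
  private val pastLogs = new ArrayBuffer[LogInfo]

  def add(info: LogInfo): Unit = synchronized { pastLogs += info }

  def remaining: Seq[LogInfo] = synchronized { pastLogs.toList }

  def removeOlderThan(threshTime: Long): Seq[LogInfo] = {
    // One lock acquisition: no other thread can modify pastLogs between the
    // selection of expired entries and their removal.
    val oldLogFiles = synchronized {
      val expired = pastLogs.filter(_.endTime < threshTime).toList
      pastLogs --= expired
      expired
    }
    // Logging happens outside the lock, after the shared state has been updated.
    println(s"Attempting to clear ${oldLogFiles.size} old log files older than $threshTime")
    oldLogFiles
  }
}

val tracker = new LogTracker
tracker.add(LogInfo(0, 10, "log-0-10"))
tracker.add(LogInfo(10, 20, "log-10-20"))
tracker.add(LogInfo(20, 30, "log-20-30"))
val removed = tracker.removeOlderThan(20)
```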




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-156270988
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45781/
    Test PASSed.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155867574
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-156255342
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155853773
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155867385
  
    **[Test build #45648 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45648/consoleFull)** for PR 9373 at commit [`1ba8340`](https://github.com/apache/spark/commit/1ba834000b40f0d4cf39be5972cb585f9bbb9006).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155844004
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155891434
  
    **[Test build #45668 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45668/consoleFull)** for PR 9373 at commit [`ccf7f5b`](https://github.com/apache/spark/commit/ccf7f5b56f822cc41345e29321106afbe5670b7f).




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155266950
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155844066
  
    Merged build started.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-156182096
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45747/
    Test PASSed.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155252000
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44353236
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -149,27 +150,26 @@ private[streaming] class FileBasedWriteAheadLog(
         val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
         logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
           s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
    -
    -    def deleteFiles() {
    -      oldLogFiles.foreach { logInfo =>
    -        try {
    -          val path = new Path(logInfo.path)
    -          val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
    -          fs.delete(path, true)
    -          synchronized { pastLogs -= logInfo }
    -          logDebug(s"Cleared log file $logInfo")
    -        } catch {
    -          case ex: Exception =>
    -            logWarning(s"Error clearing write ahead log file $logInfo", ex)
    -        }
    +    synchronized { pastLogs --= oldLogFiles }
    +    def deleteFile(walInfo: LogInfo): Unit = {
    +      try {
    +        val path = new Path(walInfo.path)
    +        val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
    +        fs.delete(path, true)
    +        logDebug(s"Cleared log file $walInfo")
    +      } catch {
    +        case ex: Exception =>
    +          logWarning(s"Error clearing write ahead log file $walInfo", ex)
           }
           logInfo(s"Cleared log files in $logDirectory older than $threshTime")
         }
    -    if (!executionContext.isShutdown) {
    -      val f = Future { deleteFiles() }
    -      if (waitForCompletion) {
    -        import scala.concurrent.duration._
    -        Await.ready(f, 1 second)
    +    oldLogFiles.foreach { logInfo =>
    +      if (!executionContext.isShutdown) {
    +        val f = Future { deleteFile(logInfo) }
    --- End diff --
    
    The execution context was defined implicitly in the class definition. I made it non-implicit for better readability.
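The diff above schedules one deletion per file on an explicitly named execution context. Here is a minimal sketch of that pattern under stated assumptions: `deleteFile` simulates a filesystem delete (any path containing "bad" fails), and `deleteAll` is a hypothetical helper rather than the PR's code. Failures are logged and swallowed per file, and nothing is scheduled once the pool has been shut down.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for a filesystem delete: paths containing "bad" fail.
def deleteFile(path: String, deleted: ConcurrentLinkedQueue[String]): Unit = {
  try {
    if (path.contains("bad")) throw new java.io.IOException(s"cannot delete $path")
    deleted.add(path)
    ()
  } catch {
    // Log and continue, so one failing file does not affect the others.
    case e: Exception => println(s"Error clearing write ahead log file $path: ${e.getMessage}")
  }
}

// One Future per file on an explicitly passed (non-implicit) ExecutionContext.
// When waitForCompletion is set, block on each deletion with a per-file timeout.
def deleteAll(
    paths: Seq[String],
    ec: ExecutionContextExecutorService,
    waitForCompletion: Boolean,
    deleted: ConcurrentLinkedQueue[String]): Unit = {
  if (!ec.isShutdown) {
    val futures = paths.map(p => Future(deleteFile(p, deleted))(ec))
    if (waitForCompletion) futures.foreach(f => Await.ready(f, 10.seconds))
  }
}

val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))
val deleted = new ConcurrentLinkedQueue[String]()
deleteAll(Seq("wal-1", "wal-2", "bad-3", "wal-4"), ec, waitForCompletion = true, deleted)
ec.shutdown()
// After shutdown, further calls schedule nothing.
deleteAll(Seq("wal-5"), ec, waitForCompletion = true, deleted)
```

Passing `ec` as an ordinary argument makes it visible at every call site which pool runs the deletions. With an implicit context in scope, that choice would be hidden.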




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155848421
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r43587988
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -126,11 +127,11 @@ private[streaming] class FileBasedWriteAheadLog(
         val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
         logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
     
    -    logFilesToRead.iterator.map { file =>
    +    logFilesToRead.par.map { file =>
    --- End diff --
    
    @harishreedharan I ran several benchmarks with this. In a setting with a 30-minute window operation, which corresponds to ~9000 files on S3 with `closeFileAfterWrite = true`, the current code takes about 15 minutes to recover. With this very simple change, recovery drops to 1.5 minutes (on a driver with 4 CPUs, probably with hyper-threading).
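The reported numbers work out to roughly 100 ms per file sequentially (900 s / 9000 files) versus about 10 ms per file in parallel (90 s / 9000 files), a ~10x speed-up. One plausible reading is that recovery is dominated by per-file I/O latency rather than CPU, so overlapping many small reads helps even on a small driver.

The sketch below illustrates that effect under stated assumptions. `readFile` sleeps to mimic a remote round trip, and `readAllParallel` is a hypothetical helper built on `Future.traverse`, which keeps results in input order.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Simulated latency-bound read of one small log file (hypothetical).
def readFile(name: String, latencyMs: Long): Seq[String] = {
  Thread.sleep(latencyMs)
  Seq(s"$name:record")
}

// Read all files on `numThreads` threads; Future.traverse preserves input order.
def readAllParallel(files: Seq[String], numThreads: Int, latencyMs: Long): Seq[String] = {
  val pool = Executors.newFixedThreadPool(numThreads)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
  try {
    val all = Future.traverse(files)(f => Future(readFile(f, latencyMs)))
    Await.result(all, 1.minute).flatten
  } finally {
    pool.shutdown()
  }
}

val files = (1 to 40).map(i => s"log-$i")
val start = System.nanoTime()
val records = readAllParallel(files, numThreads = 8, latencyMs = 20)
val elapsedMs = (System.nanoTime() - start) / 1000000
println(s"Read ${records.size} records in $elapsedMs ms")
```

With 40 files, 20 ms of simulated latency each, and 8 threads, the reads overlap. Wall time should therefore be a fraction of the ~800 ms a sequential loop would need, although exact numbers depend on the machine.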




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r43949180
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -366,6 +367,21 @@ object WriteAheadLogSuite {
         segments
       }
     
    +  /**
    +   * Write received block tracker events to a file using the writer class and return an array of
    +   * the file segments written.
    +   */
    +  def writeEventsUsingWriter(
    --- End diff --
    
    This method is related to `ReceivedBlockTracker` and should not be in this file, as this file is totally independent. Move this method to `ReceivedBlockTrackerSuite`.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155855744
  
    **[Test build #45650 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45650/consoleFull)** for PR 9373 at commit [`1ba8340`](https://github.com/apache/spark/commit/1ba834000b40f0d4cf39be5972cb585f9bbb9006).




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44509356
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -199,6 +198,63 @@ class FileBasedWriteAheadLogSuite
     
       import WriteAheadLogSuite._
     
    +  test("FileBasedWriteAheadLog - parallel readAll opens at most 'numThreads' files") {
    +    /*
    +     If the setting `closeFileAfterWrite` is enabled, we start generating a very large number of
    +     files. This causes recovery to take a very long time. In order to make it quicker, we
    +     parallelized the reading of these files. This test makes sure that we limit the number of
    +     open files to the size of the number of threads in our thread pool rather than the size of
    +     the list of files.
    +     */
    +    val numThreads = 8
    +    val tpool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "wal-test-thread-pool")
    +    class GetMaxCounter {
    +      private val value = new AtomicInteger()
    +      @volatile private var max: Int = 0
    +      def increment(): Unit = {
    +        val atInstant = value.incrementAndGet()
    +        if (atInstant > max) max = atInstant
    +      }
    +      def decrement(): Unit = { value.decrementAndGet() }
    +      def get(): Int = value.get()
    +      def getMax(): Int = max
    +    }
    +    /**
    +     * We need an object that can be iterated through, which will increment our counter once
    +     * initialized, and decrement once closed. This way we can simulate how many "streams" will
    +     * be opened during a real use case.
    +     */
    +    class ReaderObject(cnt: GetMaxCounter, value: Int) extends Iterator[Int] with Closeable {
    +      cnt.increment()
    +      private var returnedValue: Boolean = false
    +      override def hasNext(): Boolean = !returnedValue
    +      override def next(): Int = {
    +        if (!returnedValue) {
    +          returnedValue = true
    +          value
    +        } else {
    +          -1
    +        }
    +      }
    +      override def close(): Unit = {
    +        cnt.decrement()
    +      }
    +    }
    +    try {
    +      val testSeq = 1 to 64
    +      val counter = new GetMaxCounter()
    +      def handle(value: Int): Iterator[Int] = {
    +        val reader = new ReaderObject(counter, value)
    +        CompletionIterator[Int, Iterator[Int]](reader, reader.close)
    --- End diff --
    
    Why do you need the `ReaderObject`? Isn't something like this feasible?
    ```
    def handle(value: Int): Iterator[Int] = {
      new CompletionIterator[Int, Iterator[Int]](Iterator(value)) {
        counter.increment()
        override def completion(): Unit = { counter.decrement() }
      }
    }
    ```
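The test in this diff checks that parallel recovery never holds more than `numThreads` files open at once. One way to get that guarantee, assuming lazily read files modeled as iterators that count as "open" until drained, works as follows:

- Process the inputs in groups of `numThreads`.
- Open each group's readers in parallel on an explicit pool.
- Drain the whole group before opening the next one.

The names `boundedParIterator`, `openReader`, and `MaxCounter` are hypothetical, and this is a minimal sketch, not Spark's implementation.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Tracks the current and the maximum number of simultaneously open readers.
final class MaxCounter {
  private val current = new AtomicInteger(0)
  private val max = new AtomicInteger(0)
  def increment(): Unit = {
    val now = current.incrementAndGet()
    // CAS loop: a concurrent, larger maximum is never overwritten by a smaller one.
    var prev = max.get()
    while (now > prev && !max.compareAndSet(prev, now)) prev = max.get()
  }
  def decrement(): Unit = { current.decrementAndGet(); () }
  def get: Int = current.get()
  def getMax: Int = max.get()
}

// Simulated reader: counts as open from creation until it has been fully drained.
def openReader(value: Int, counter: MaxCounter): Iterator[Int] = {
  counter.increment()
  val underlying = Iterator(value)
  new Iterator[Int] {
    private var closed = false
    override def hasNext: Boolean = {
      val more = underlying.hasNext
      if (!more && !closed) { closed = true; counter.decrement() }
      more
    }
    override def next(): Int = underlying.next()
  }
}

// Open at most `groupSize` readers in parallel, drain them in order, then move on.
def boundedParIterator[I, O](source: Seq[I], groupSize: Int, ec: ExecutionContext)(
    handler: I => Iterator[O]): Iterator[O] =
  source.grouped(groupSize).flatMap { group =>
    val opened = group.map(item => Future(handler(item))(ec))
    opened.iterator.flatMap(f => Await.result(f, 30.seconds))
  }

val numThreads = 8
val pool = Executors.newFixedThreadPool(numThreads)
val counter = new MaxCounter
val result =
  try {
    val ec = ExecutionContext.fromExecutorService(pool)
    boundedParIterator(1 to 64, numThreads, ec)(v => openReader(v, counter)).toList
  } finally {
    pool.shutdown()
  }
```

Grouping gives up some parallelism at group boundaries in exchange for a hard cap on open streams. The CAS loop in `MaxCounter.increment` also avoids the lost updates that a plain `@volatile` check-then-set can suffer under contention.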




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155853826
  
    Merged build started.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-156012262
  
    Merged build started.




[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r43948049
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -126,11 +127,11 @@ private[streaming] class FileBasedWriteAheadLog(
         val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
         logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
     
    -    logFilesToRead.iterator.map { file =>
    +    logFilesToRead.par.map { file =>
    --- End diff --
    
    Which thread pool / execution context will this `par` operation run on? You must not use the default system execution context.
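Below is a minimal sketch of running the work on a dedicated pool instead of the default global context. It uses standard-library `Future`s rather than `.par`, so it needs no extra dependency. With parallel collections, the equivalent is to assign the collection's `tasksupport`, which is what the later `ThreadPoolTaskSupport` revision of this diff does. `DedicatedPoolSketch`, `newNamedPool` and `mapOnPool` are hypothetical names, not Spark APIs.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object DedicatedPoolSketch {
  // Hypothetical helper: a fixed-size pool of daemon threads named "<prefix>-<n>",
  // which makes it easy to see (and assert) which pool the work actually ran on.
  def newNamedPool(prefix: String, numThreads: Int): ExecutorService = {
    val counter = new AtomicInteger(0)
    Executors.newFixedThreadPool(numThreads, new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
        t.setDaemon(true)
        t
      }
    })
  }

  // Applies `work` to every input on `pool` rather than on the global
  // ExecutionContext. Future.sequence keeps the results in input order.
  def mapOnPool[I, O](pool: ExecutorService, inputs: Seq[I])(work: I => O): Seq[O] = {
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val futures = inputs.map(i => Future(work(i)))
    Await.result(Future.sequence(futures), 1.minute)
  }
}
```

A dedicated pool also bounds how many files recovery can touch at once, independently of whatever else shares the JVM.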


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155848423
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45647/
    Test FAILed.


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by harishreedharan <gi...@git.apache.org>.
Github user harishreedharan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r43578480
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -126,11 +127,11 @@ private[streaming] class FileBasedWriteAheadLog(
         val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
         logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
     
    -    logFilesToRead.iterator.map { file =>
    +    logFilesToRead.par.map { file =>
    --- End diff --
    
    This is an expensive operation - you'd end up running an O(n) operation to create a copy (in addition to the copy cost). Do we really need this? I am not entirely sure the copying adds a whole lot of value, considering that this array is not going to be very huge. Also note the additional cost to spin up threads (if the context does not already have them spun up). 


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44507368
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -251,4 +261,22 @@ private[streaming] object FileBasedWriteAheadLog {
           }
         }.sortBy { _.startTime }
       }
    +
    +  /**
    +   * This creates an iterator from a parallel collection, by keeping at most `n` objects in memory
    +   * at any given time, where `n` is the size of the thread pool. This is crucial for use cases
    +   * where we create `FileBasedWriteAheadLogReader`s during parallel recovery. We don't want to
    +   * open up `k` streams altogether where `k` is the size of the Seq that we want to parallelize.
    +   */
    +  def parallelIteratorCreator[I, O](
    +      threadpool: ThreadPoolExecutor,
    +      source: Seq[I],
    +      handler: I => Iterator[O]): Iterator[O] = {
    +    val taskSupport = new ThreadPoolTaskSupport(threadpool)
    +    source.grouped(threadpool.getCorePoolSize).flatMap { element =>
    --- End diff --
    
    Actually, my bad. `grouped` creates groups of n, not n groups. We want n groups.
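The following small sketch makes the distinction concrete. The helper names are hypothetical. `grouped(size)` yields chunks of `size` elements, so the number of chunks depends on the input length. Getting at most `n` groups instead requires deriving the chunk size from `n`. A later comment in this thread concludes that the chunk-per-pool-size approach in the diff is fine as-is.

```scala
object GroupingSketch {
  // `grouped(size)` yields consecutive chunks of at most `size` elements.
  def chunksOf[A](xs: Seq[A], size: Int): List[Seq[A]] = xs.grouped(size).toList

  // Hypothetical helper: split into at most `n` groups by choosing the chunk
  // size as ceil(xs.size / n).
  def intoNGroups[A](xs: Seq[A], n: Int): List[Seq[A]] = {
    require(n > 0, "n must be positive")
    if (xs.isEmpty) Nil
    else {
      val chunkSize = (xs.size + n - 1) / n // integer ceil(size / n)
      xs.grouped(chunkSize).toList
    }
  }
}
```

For 10 elements, `chunksOf(xs, 3)` gives four chunks of sizes 3, 3, 3, 1, while `intoNGroups(xs, 3)` gives three groups of sizes 4, 4, 2.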


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155978604
  
    **[Test build #45700 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45700/consoleFull)** for PR 9373 at commit [`7e1829b`](https://github.com/apache/spark/commit/7e1829b87e4809a2096b969cdb8de73f03afd616).


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-156020250
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45712/
    Test FAILed.


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-152469636
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-156159178
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-156270985
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155845811
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r43475449
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -149,27 +150,26 @@ private[streaming] class FileBasedWriteAheadLog(
         val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
         logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
           s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
    -
    -    def deleteFiles() {
    -      oldLogFiles.foreach { logInfo =>
    -        try {
    -          val path = new Path(logInfo.path)
    -          val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
    -          fs.delete(path, true)
    -          synchronized { pastLogs -= logInfo }
    -          logDebug(s"Cleared log file $logInfo")
    -        } catch {
    -          case ex: Exception =>
    -            logWarning(s"Error clearing write ahead log file $logInfo", ex)
    -        }
    +    synchronized { pastLogs --= oldLogFiles }
    --- End diff --
    
    I have a test to make sure that even if the delete is not successful, recovery is robust and the file is deleted once the next cleanup request is sent.


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155258460
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by brkyvz <gi...@git.apache.org>.
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44606410
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -251,4 +261,23 @@ private[streaming] object FileBasedWriteAheadLog {
           }
         }.sortBy { _.startTime }
       }
    +
    +  /**
    +   * This creates an iterator from a parallel collection, by keeping at most `n` objects in memory
    +   * at any given time, where `n` is the size of the thread pool. This is crucial for use cases
    +   * where we create `FileBasedWriteAheadLogReader`s during parallel recovery. We don't want to
    +   * open up `k` streams altogether where `k` is the size of the Seq that we want to parallelize.
    +   */
    +  def seqToParIterator[I, O](
    +      tpool: ThreadPoolExecutor,
    +      source: Seq[I],
    +      handler: I => Iterator[O]): Iterator[O] = {
    +    val taskSupport = new ThreadPoolTaskSupport(tpool)
    +    val groupSize = math.max(math.max(tpool.getCorePoolSize, tpool.getPoolSize), 8)
    --- End diff --
    
    I actually don't want to limit the number of threads in `ThreadUtils.newDaemonCachedThreadPool`. I believe it spawns as many threads as are needed (deletion may need many). But if there are none, I want at least `8` threads.
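The following sketch isolates the sizing rule from the diff. It assumes that `ThreadUtils.newDaemonCachedThreadPool` returns a `ThreadPoolExecutor` with zero core threads, like `java.util.concurrent.Executors.newCachedThreadPool`. `newCachedStylePool` is a hypothetical stand-in with that configuration, not the Spark helper.

```scala
import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}

object GroupSizeSketch {
  // Same expression as the diff: the larger of the core size and the number
  // of threads currently alive, but never fewer than 8.
  def groupSize(tpool: ThreadPoolExecutor): Int =
    math.max(math.max(tpool.getCorePoolSize, tpool.getPoolSize), 8)

  // Hypothetical cached-style pool: no core threads, unbounded maximum,
  // idle threads reaped after 60 seconds.
  def newCachedStylePool(): ThreadPoolExecutor =
    new ThreadPoolExecutor(0, Int.MaxValue, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())
}
```

`getPoolSize` reports the threads that are currently alive. For a cached pool, the result can therefore exceed 8 while many tasks are running, and it falls back to 8 once idle threads are reaped.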


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155273237
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44508920
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -251,4 +261,22 @@ private[streaming] object FileBasedWriteAheadLog {
           }
         }.sortBy { _.startTime }
       }
    +
    +  /**
    +   * This creates an iterator from a parallel collection, by keeping at most `n` objects in memory
    +   * at any given time, where `n` is the size of the thread pool. This is crucial for use cases
    +   * where we create `FileBasedWriteAheadLogReader`s during parallel recovery. We don't want to
    +   * open up `k` streams altogether where `k` is the size of the Seq that we want to parallelize.
    +   */
    +  def parallelIteratorCreator[I, O](
    +      threadpool: ThreadPoolExecutor,
    +      source: Seq[I],
    +      handler: I => Iterator[O]): Iterator[O] = {
    +    val taskSupport = new ThreadPoolTaskSupport(threadpool)
    +    source.grouped(threadpool.getCorePoolSize).flatMap { element =>
    --- End diff --
    
    Nvm, I see you have implemented it differently than what we discussed. Seems to work. In fact, this is more evenly load-balanced, I think. Just change `element` to `group`.
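Here is a sketch of the chunked approach being approved here, under stated assumptions. It replaces the diff's parallel collection plus `ThreadPoolTaskSupport` with standard-library `Future`s on a caller-supplied pool, and it takes the chunk size as a parameter. The handlers for one chunk start concurrently. The next chunk's handlers run only after the consumer has exhausted the current chunk's iterators. As a result, at most `groupSize` handler results (for example, open readers) are live at any time. `seqToParIteratorSketch` is a hypothetical name.

```scala
import java.util.concurrent.ExecutorService
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParIteratorSketch {
  def seqToParIteratorSketch[I, O](
      pool: ExecutorService,
      groupSize: Int,
      source: Seq[I],
      handler: I => Iterator[O]): Iterator[O] = {
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    // Iterator.flatMap is lazy: the body for the next group only runs once the
    // iterator returned for the previous group has been drained.
    source.grouped(groupSize).flatMap { group =>
      // Invoke (e.g. "open") every handler in this group concurrently...
      val opened = group.map(item => Future(handler(item)))
      // ...then hand their iterators to the consumer one after another, in order.
      Await.result(Future.sequence(opened), 1.minute).iterator.flatten
    }
  }
}
```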


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155252283
  
    **[Test build #45456 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45456/consoleFull)** for PR 9373 at commit [`98da092`](https://github.com/apache/spark/commit/98da092d14dfa3f9f95261fe1b307f0dda7a785f).


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155258437
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-156182094
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44508962
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -199,6 +198,63 @@ class FileBasedWriteAheadLogSuite
     
       import WriteAheadLogSuite._
     
    +  test("FileBasedWriteAheadLog - parallel readAll opens at most 'numThreads' files") {
    --- End diff --
    
    This really does not test file reading, just the utility function. Make the name only the name of the method.


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155987822
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44615524
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -146,30 +154,32 @@ private[streaming] class FileBasedWriteAheadLog(
        * asynchronously.
        */
       def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
    -    val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
    +    val oldLogFiles = synchronized {
    +      val expiredLogs = pastLogs.filter { _.endTime < threshTime }
    +      pastLogs --= expiredLogs
    +      expiredLogs
    +    }
         logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
           s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
    -
    -    def deleteFiles() {
    -      oldLogFiles.foreach { logInfo =>
    -        try {
    -          val path = new Path(logInfo.path)
    -          val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
    -          fs.delete(path, true)
    -          synchronized { pastLogs -= logInfo }
    -          logDebug(s"Cleared log file $logInfo")
    -        } catch {
    -          case ex: Exception =>
    -            logWarning(s"Error clearing write ahead log file $logInfo", ex)
    -        }
    +    def deleteFile(walInfo: LogInfo): Unit = {
    --- End diff --
    
    nit: why rename this to walInfo?
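The following sketch mirrors the `clean` flow in this diff. The file system is replaced by a hypothetical `deletePath` callback, and `LogInfo` by a hypothetical `LogInfoSketch`. Expired entries are filtered and removed from `pastLogs` under a single lock and then deleted on a pool. Each deletion failure is contained so that it does not stop the others. Logging and the Hadoop `FileSystem` calls are left out.

```scala
import java.util.concurrent.ExecutorService
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Hypothetical stand-in for the write ahead log's LogInfo.
final case class LogInfoSketch(startTime: Long, endTime: Long, path: String)

// Hypothetical cleaner: `deletePath` replaces the Hadoop FileSystem delete call.
class CleanerSketch(pool: ExecutorService, deletePath: String => Unit) {
  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
  private val pastLogs = ArrayBuffer.empty[LogInfoSketch]

  def add(info: LogInfoSketch): Unit = synchronized { pastLogs += info }
  def remaining: List[LogInfoSketch] = synchronized { pastLogs.toList }

  def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
    // Filter and remove in one critical section, so a concurrent clean() call
    // cannot pick up the same expired entries.
    val oldLogFiles = synchronized {
      val expiredLogs = pastLogs.filter(_.endTime < threshTime).toList
      pastLogs --= expiredLogs
      expiredLogs
    }

    def deleteFile(info: LogInfoSketch): Unit =
      try deletePath(info.path)
      catch { case NonFatal(_) => () } // the original code logs a warning here

    // One task per file, so slow deletes (e.g. on S3) overlap.
    val deletions = oldLogFiles.map(info => Future(deleteFile(info)))
    if (waitForCompletion) {
      Await.result(Future.sequence(deletions), 1.minute)
    }
  }
}
```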


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r43949437
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala ---
    @@ -55,6 +55,19 @@ private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Config
               logDebug("Error reading next item, EOF reached", e)
               close()
               false
    +        case e: IOException =>
    +          logWarning("Error while trying to read data. If the file was deleted, " +
    +            "this should be okay.", e)
    +          close()
    +          if (HdfsUtils.checkFileExists(path, conf)) {
    +            // if file exists, this could be a legitimate error
    +            throw e
    +          } else {
    +            // file was deleted. This can occur when the daemon cleanup thread takes time to
    --- End diff --
    
    super nit: file -> File
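This sketch extracts the error-handling rule in this diff into a hypothetical helper. `readNextOrEnd` is not a Spark method, and `java.nio.file.Files.exists` stands in for `HdfsUtils.checkFileExists`. An `IOException` counts as end-of-data only if the file is gone. An `EOFException` is handled as a normal end of file, matching the existing EOF branch shown in the diff.

```scala
import java.io.{EOFException, IOException}
import java.nio.file.{Files, Path}

object ReaderSketch {
  // Hypothetical helper: attempts one read and returns None at end-of-data.
  def readNextOrEnd[T](path: Path, close: () => Unit)(readNext: => T): Option[T] =
    try Some(readNext)
    catch {
      case _: EOFException =>
        // Normal end of the log file.
        close()
        None
      case e: IOException =>
        close()
        // File still there: a genuine read error, so rethrow. File gone: most
        // likely removed by the cleanup thread mid-read, so treat as end-of-data.
        if (Files.exists(path)) throw e else None
    }
}
```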


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-156256683
  
    **[Test build #45781 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45781/consoleFull)** for PR 9373 at commit [`79e9b03`](https://github.com/apache/spark/commit/79e9b03e55382d64607ee39ac1a66a102574409a).


[GitHub] spark pull request: [SPARK-11419][STREAMING] Par recovery

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-152441681
  
    **[Test build #44665 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44665/consoleFull)** for PR 9373 at commit [`be5a2ab`](https://github.com/apache/spark/commit/be5a2ab7f1dcd655eefea48918e445ed2bebad55).


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-156255395
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44507453
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -199,6 +198,63 @@ class FileBasedWriteAheadLogSuite
     
       import WriteAheadLogSuite._
     
    +  test("FileBasedWriteAheadLog - parallel readAll opens at most 'numThreads' files") {
    +    /*
    +     If the setting `closeFileAfterWrite` is enabled, we start generating a very large number of
    +     files. This causes recovery to take a very long time. In order to make it quicker, we
    +     parallelized the reading of these files. This test makes sure that we limit the number of
    +     open files to the size of the number of threads in our thread pool rather than the size of
    +     the list of files.
    +     */
    +    val numThreads = 8
    +    val tpool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "wal-test-thread-pool")
    +    class GetMaxCounter {
    +      private val value = new AtomicInteger()
    +      @volatile private var max: Int = 0
    +      def increment(): Unit = {
    --- End diff --
    
    I think this needs synchronization across both `max` and `value`. Otherwise there are race conditions. Put `increment` and `decrement` in `synchronized`.
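Here is a minimal sketch of the suggested fix. Once `value` and `max` are only updated inside `synchronized`, a peak cannot be missed between the increment and the comparison. The `AtomicInteger` and `@volatile` from the diff then become unnecessary. The `get()` accessor is a hypothetical addition, since the rest of the class is cut off in the diff.

```scala
// Tracks how many tasks are currently "open" and the highest count ever seen.
// Both fields change together under the object's lock.
class GetMaxCounter {
  private var value = 0
  private var max = 0

  def increment(): Unit = synchronized {
    value += 1
    if (value > max) max = value
  }

  def decrement(): Unit = synchronized {
    value -= 1
  }

  // Hypothetical accessor for the observed maximum.
  def get(): Int = synchronized { max }
}
```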


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44615540
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -146,30 +154,32 @@ private[streaming] class FileBasedWriteAheadLog(
        * asynchronously.
        */
       def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
    -    val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } }
    +    val oldLogFiles = synchronized {
    +      val expiredLogs = pastLogs.filter { _.endTime < threshTime }
    +      pastLogs --= expiredLogs
    +      expiredLogs
    +    }
         logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " +
           s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
    -
    -    def deleteFiles() {
    -      oldLogFiles.foreach { logInfo =>
    -        try {
    -          val path = new Path(logInfo.path)
    -          val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
    -          fs.delete(path, true)
    -          synchronized { pastLogs -= logInfo }
    -          logDebug(s"Cleared log file $logInfo")
    -        } catch {
    -          case ex: Exception =>
    -            logWarning(s"Error clearing write ahead log file $logInfo", ex)
    -        }
    +    def deleteFile(walInfo: LogInfo): Unit = {
    --- End diff --
    
    nit: empty line missing.
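
The change in this hunk removes the expired entries from `pastLogs` while holding the lock, and only afterwards deletes the files. The deletions themselves no longer touch shared state, so they can run in parallel. The following is a minimal sketch of that pattern, not the Spark implementation. It makes three assumptions. First, it deletes local files via `java.nio.file` instead of going through Hadoop's `FileSystem`. Second, it uses a plain `Executors` pool. Third, the `LogInfo` case class and `LogCleanerSketch` class are hypothetical stand-ins.

```scala
import java.nio.file.{Files, Path}
import java.util.concurrent.Executors
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for the WAL's LogInfo: a time range plus a file path.
case class LogInfo(startTime: Long, endTime: Long, path: Path)

class LogCleanerSketch(numThreads: Int) {
  private val pastLogs = ArrayBuffer.empty[LogInfo]
  private val pool = Executors.newFixedThreadPool(numThreads)
  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

  def add(info: LogInfo): Unit = synchronized { pastLogs += info }
  def remaining: Seq[LogInfo] = synchronized { pastLogs.toList }

  // Deletes one file; a failure is reported and does not stop the others.
  private def deleteFile(info: LogInfo): Unit = {
    try {
      Files.deleteIfExists(info.path)
    } catch {
      case e: Exception => Console.err.println(s"Error clearing log file ${info.path}: $e")
    }
  }

  def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
    // Take the expired entries out of the shared list under the lock first,
    // so a concurrent clean() cannot schedule the same files a second time.
    val oldLogFiles = synchronized {
      val expired = pastLogs.filter(_.endTime < threshTime).toList
      pastLogs --= expired
      expired
    }
    // Each deletion runs as its own task on the fixed-size pool.
    val deletions = oldLogFiles.map(info => Future(deleteFile(info)))
    if (waitForCompletion) {
      Await.result(Future.sequence(deletions), 1.minute)
    }
  }

  def stop(): Unit = pool.shutdown()
}
```

If you call `clean(threshTime, waitForCompletion = false)`, it returns as soon as the deletions are scheduled and the pool finishes them in the background. That roughly corresponds to the asynchronous mode the doc comment in the diff refers to.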


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155556908
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155864120
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45650/


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155987823
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45700/


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-155520722
  
    **[Test build #45534 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45534/consoleFull)** for PR 9373 at commit [`83aa28e`](https://github.com/apache/spark/commit/83aa28e05de4874eebc89be11dce0a0b8213007e).


[GitHub] spark pull request: [SPARK-11419][STREAMING] Par recovery

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-152441894
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-11419][STREAMING] Parallel recovery for...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/9373#issuecomment-156293316
  
    LGTM. Merging this to master and 1.6. Thanks @brkyvz, @zsxwing and @harishreedharan 

