Posted to reviews@spark.apache.org by sadhen <gi...@git.apache.org> on 2018/08/31 12:18:16 UTC

[GitHub] spark pull request #22304: [SPARK-25297][Streaming][Test] Fix blocking unit ...

GitHub user sadhen opened a pull request:

    https://github.com/apache/spark/pull/22304

    [SPARK-25297][Streaming][Test] Fix blocking unit tests for Scala 2.12

    ## What changes were proposed in this pull request?
    
    Customize the ExecutionContext's reporter to fix blocking unit tests for Scala 2.12.
    
    ## How was this patch tested?
    ```
    $ ./dev/change-scala-version.sh 2.12
    $ sbt -Dscala-2.12
    > ++2.12.6
    > project streaming
    > testOnly *FileBasedWriteAheadLogWithFileCloseAfterWriteSuite
    ```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sadhen/spark blocking_future

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22304.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22304
    
----
commit 60eec773c5dcc605e2491c6c77e60f3ca0d7b6a8
Author: 忍冬 <re...@...>
Date:   2018-08-31T12:13:56Z

    fix blocking unit tests by providing customized reporter

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22304: [SPARK-25297][Streaming][Test] Fix blocking unit ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22304#discussion_r214340859
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -65,7 +65,8 @@ private[streaming] class FileBasedWriteAheadLog(
         "WriteAheadLogManager" + callerName.map(c => s" for $c").getOrElse("")
       }
       private val forkJoinPool = ThreadUtils.newForkJoinPool(threadpoolName, 20)
    -  private val executionContext = ExecutionContext.fromExecutorService(forkJoinPool)
    +  private val executionContext = ExecutionContext
    +    .fromExecutorService(forkJoinPool, { e: Throwable => throw e })
    --- End diff --
    
    Do you mean that in Scala 2.12 the default reporter does not throw the exception?


---



[GitHub] spark pull request #22304: [SPARK-25297][Streaming][Test] Fix blocking unit ...

Posted by sadhen <gi...@git.apache.org>.
Github user sadhen closed the pull request at:

    https://github.com/apache/spark/pull/22304


---



[GitHub] spark issue #22304: [SPARK-25297][Streaming][Test] Fix blocking unit tests f...

Posted by sadhen <gi...@git.apache.org>.
Github user sadhen commented on the issue:

    https://github.com/apache/spark/pull/22304
  
    @srowen @cloud-fan The already-merged #22292 fixed it.


---



[GitHub] spark issue #22304: [SPARK-25297][Streaming][Test] Fix blocking unit tests f...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/22304
  
    The one concern is whether this behavior change impacts anything else. Some exceptions will now surface as an actual exception in callers of methods like Await, rather than just being logged while execution continues. That might even be a good thing, and it is crucial here. But that's what I have in mind. That said, the tests all pass.
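    
    A minimal sketch, not taken from the patch, of what "an actual exception in a caller to methods like Await" looks like: `Await.result` rethrows a failed future's exception in the calling thread instead of leaving it in a log. `AwaitSketch` and `awaitFailed` are hypothetical names used only for illustration.
    
    ```scala
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.util.Try

    // Hypothetical names; this only illustrates how Await reports a failure.
    object AwaitSketch {
      // Waits on an already-failed future and captures what the caller sees.
      def awaitFailed(): Try[Int] = {
        val failed: Future[Int] = Future.failed(new IllegalStateException("boom"))
        // Await.result rethrows the future's own exception in the calling thread.
        Try(Await.result(failed, 1.second))
      }
    }
    ```
    
    Wrapping the call in `Try` is only there to keep the sketch self-contained; a caller that does not catch the exception would see it propagate.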


---



[GitHub] spark issue #22304: [SPARK-25297][Streaming][Test] Fix blocking unit tests f...

Posted by sadhen <gi...@git.apache.org>.
Github user sadhen commented on the issue:

    https://github.com/apache/spark/pull/22304
  
    There is a simplified project that reproduces this specific problem:
    
    https://github.com/sadhen/blocking-future


---



[GitHub] spark issue #22304: [SPARK-25297][Streaming][Test] Fix blocking unit tests f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22304
  
    **[Test build #95540 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95540/testReport)** for PR 22304 at commit [`60eec77`](https://github.com/apache/spark/commit/60eec773c5dcc605e2491c6c77e60f3ca0d7b6a8).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22304: [SPARK-25297][Streaming][Test] Fix blocking unit tests f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22304
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95540/


---



[GitHub] spark issue #22304: [SPARK-25297][Streaming][Test] Fix blocking unit tests f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22304
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22304: [SPARK-25297][Streaming][Test] Fix blocking unit tests f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22304
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #22304: [SPARK-25297][Streaming][Test] Fix blocking unit ...

Posted by sadhen <gi...@git.apache.org>.
Github user sadhen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22304#discussion_r214342975
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -65,7 +65,8 @@ private[streaming] class FileBasedWriteAheadLog(
         "WriteAheadLogManager" + callerName.map(c => s" for $c").getOrElse("")
       }
       private val forkJoinPool = ThreadUtils.newForkJoinPool(threadpoolName, 20)
    -  private val executionContext = ExecutionContext.fromExecutorService(forkJoinPool)
    +  private val executionContext = ExecutionContext
    +    .fromExecutorService(forkJoinPool, { e: Throwable => throw e })
    --- End diff --
    
    The default reporter does not throw the exception in Scala 2.11 either.
    
    I suspect there are bugs in the Future implementation in Scala 2.12, since Future.scala changed substantially in that release.
    
    This is a temporary fix.
    
    I will dig into Future.scala, and hopefully propose a PR for Scala.


---



[GitHub] spark pull request #22304: [SPARK-25297][Streaming][Test] Fix blocking unit ...

Posted by viktorklang <gi...@git.apache.org>.
Github user viktorklang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22304#discussion_r214555205
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -65,7 +65,8 @@ private[streaming] class FileBasedWriteAheadLog(
         "WriteAheadLogManager" + callerName.map(c => s" for $c").getOrElse("")
       }
       private val forkJoinPool = ThreadUtils.newForkJoinPool(threadpoolName, 20)
    -  private val executionContext = ExecutionContext.fromExecutorService(forkJoinPool)
    +  private val executionContext = ExecutionContext
    +    .fromExecutorService(forkJoinPool, { e: Throwable => throw e })
    --- End diff --
    
    The suggested solution will not work as intended: it will prevent subsequent callbacks (especially those for other ExecutionContexts) from being submitted, particularly in the case of RejectedExecutionException.
    
    See: 
    https://github.com/scala/scala/blob/2.12.x/src/library/scala/concurrent/impl/Promise.scala#L68
    https://github.com/scala/scala/blob/2.12.x/src/library/scala/concurrent/impl/Promise.scala#L284
    
    Or, in other words, rethrowing is not "reporting" the error.
    
    The new Future encoding included in Scala 2.13.x deals much better with RejectedExecutionExceptions and InterruptedExceptions, but it is unclear right now what parts could be backported within the binary compatibility constraints.
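    
    A minimal sketch of a reporter that records a failure instead of rethrowing it, in line with the point that rethrowing is not reporting. This is not the fix that was eventually merged; `ReporterSketch` and `reported` are hypothetical names, a fixed thread pool stands in for Spark's `ForkJoinPool`, and the failure is simulated by calling `reportFailure` directly.
    
    ```scala
    import java.util.concurrent.{ConcurrentLinkedQueue, Executors}
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._

    // Hypothetical names; a sketch of a reporter that reports rather than rethrows.
    object ReporterSketch {
      // Failures handed to the reporter are collected here (a real reporter would log them).
      val reported = new ConcurrentLinkedQueue[Throwable]()

      def run(): Int = {
        val pool = Executors.newFixedThreadPool(2)
        // The reporter returns normally, so the code that called it can carry on.
        val ec = ExecutionContext.fromExecutorService(
          pool, (t: Throwable) => { reported.add(t); () })
        try {
          // Simulate the library reporting a failure to this context.
          ec.reportFailure(new RuntimeException("simulated failure"))
          // The context is still usable after the report.
          Await.result(Future(21)(ec).map(_ * 2)(ec), 5.seconds)
        } finally {
          ec.shutdown()
        }
      }
    }
    ```
    
    Because the reporter returns normally, whatever invoked it can go on to submit the remaining callbacks, which a rethrowing reporter would prevent.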


---



[GitHub] spark issue #22304: [SPARK-25297][Streaming][Test] Fix blocking unit tests f...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/22304
  
    @srowen are we targeting this patch to Spark 2.4?


---



[GitHub] spark issue #22304: [SPARK-25297][Streaming][Test] Fix blocking unit tests f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22304
  
    **[Test build #95540 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95540/testReport)** for PR 22304 at commit [`60eec77`](https://github.com/apache/spark/commit/60eec773c5dcc605e2491c6c77e60f3ca0d7b6a8).


---



[GitHub] spark issue #22304: [SPARK-25297][Streaming][Test] Fix blocking unit tests f...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/22304
  
    More info: the symptom of the test failure is that this test never completes and just keeps repeating a `RejectedExecutionException`:
    
    ```
    - FileBasedWriteAheadLog - handling file errors while reading rotating logs
    - FileBasedWriteAheadLog - do not create directories or files unless write
    java.util.concurrent.RejectedExecutionException
    	at java.util.concurrent.ForkJoinPool.externalSubmit(ForkJoinPool.java:2328)
    	at java.util.concurrent.ForkJoinPool.externalPush(ForkJoinPool.java:2419)
    	at java.util.concurrent.ForkJoinPool.execute(ForkJoinPool.java:2648)
    	at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:134)
    	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
    	at scala.concurrent.impl.Promise$KeptPromise$Kept.onComplete(Promise.scala:368)
    	at scala.concurrent.impl.Promise$KeptPromise$Kept.onComplete$(Promise.scala:367)
    	at scala.concurrent.impl.Promise$KeptPromise$Successful.onComplete(Promise.scala:375)
    	at scala.concurrent.impl.Promise.transform(Promise.scala:29)
    	at scala.concurrent.impl.Promise.transform$(Promise.scala:27)
    	at scala.concurrent.impl.Promise$KeptPromise$Successful.transform(Promise.scala:375)
    	at scala.concurrent.Future.map(Future.scala:288)
    	at scala.concurrent.Future.map$(Future.scala:288)
    	at scala.concurrent.impl.Promise$KeptPromise$Successful.map(Promise.scala:375)
    	at scala.concurrent.Future$.apply(Future.scala:654)
    	at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$1(ThreadUtils.scala:314)
    	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
    	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
    	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
    	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    	at scala.collection.TraversableLike.map(TraversableLike.scala:234)
    	at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
    	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    	at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:314)
    	at org.apache.spark.streaming.util.FileBasedWriteAheadLog$.$anonfun$seqToParIterator$1(FileBasedWriteAheadLog.scala:318)
    	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:482)
    	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
    	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:456)
    	at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:464)
    	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:456)
    	at scala.collection.Iterator.foreach(Iterator.scala:944)
    	at scala.collection.Iterator.foreach$(Iterator.scala:944)
    	at scala.collection.AbstractIterator.foreach(Iterator.scala:1432)
    	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:59)
    	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:50)
    	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    	at scala.collection.TraversableOnce.to(TraversableOnce.scala:310)
    	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:308)
    	at scala.collection.AbstractIterator.to(Iterator.scala:1432)
    	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:302)
    	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:302)
    	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1432)
    	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:289)
    	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:283)
    	at scala.collection.AbstractIterator.toArray(Iterator.scala:1432)
    	at org.apache.spark.streaming.util.CommonWriteAheadLogTests.$anonfun$new$16(WriteAheadLogSuite.scala:213)
    	at org.scalatest.Assertions.intercept(Assertions.scala:805)
    	at org.scalatest.Assertions.intercept$(Assertions.scala:802)
    	at org.scalatest.FunSuite.intercept(FunSuite.scala:1560)
    	at org.apache.spark.streaming.util.CommonWriteAheadLogTests.$anonfun$new$13(WriteAheadLogSuite.scala:213)
    ```
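    
    The top frames of the trace can be approximated outside Spark. A minimal sketch, assuming the `ForkJoinPool` had already been shut down when the future was created (the trace does not state this explicitly); `RejectedSketch` and its methods are hypothetical names. How the rejection surfaces to the caller depends on the Scala version, so the sketch only checks that no value is produced within the timeout.
    
    ```scala
    import java.util.concurrent.{ForkJoinPool, RejectedExecutionException}
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._
    import scala.util.Try

    // Hypothetical names; assumes the pool was shut down before the future was created.
    object RejectedSketch {
      // A shut-down ForkJoinPool rejects every newly submitted task.
      def rejectsDirectly(): Boolean = {
        val pool = new ForkJoinPool(2)
        pool.shutdown()
        try {
          pool.execute(new Runnable { def run(): Unit = () })
          false
        } catch {
          case _: RejectedExecutionException => true
        }
      }

      // Creating and awaiting a future on the shut-down pool never yields a value.
      def awaitOnDeadPool(): Try[Int] = {
        val pool = new ForkJoinPool(2)
        pool.shutdown()
        // Swallow reports to keep the sketch quiet; a real reporter should log them.
        val ec = ExecutionContext.fromExecutorService(pool, (_: Throwable) => ())
        // The timeout bounds the wait in case the future is never completed.
        Try(Await.result(Future(1)(ec), 1.second))
      }
    }
    ```
    
    The bounded `Await.result` is the important design choice here: without a timeout, a future that is never completed would block the caller indefinitely, which matches the hanging test described above.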


---



[GitHub] spark pull request #22304: [SPARK-25297][Streaming][Test] Fix blocking unit ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22304#discussion_r214360457
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---
    @@ -65,7 +65,8 @@ private[streaming] class FileBasedWriteAheadLog(
         "WriteAheadLogManager" + callerName.map(c => s" for $c").getOrElse("")
       }
       private val forkJoinPool = ThreadUtils.newForkJoinPool(threadpoolName, 20)
    -  private val executionContext = ExecutionContext.fromExecutorService(forkJoinPool)
    +  private val executionContext = ExecutionContext
    +    .fromExecutorService(forkJoinPool, { e: Throwable => throw e })
    --- End diff --
    
    Thanks @sadhen, I was just about to start looking into this. This seems like a reasonable solution.


---


