You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@pekko.apache.org by "mdedetrich (via GitHub)" <gi...@apache.org> on 2024/03/18 12:18:28 UTC

[PR] Fix uncaught exception in Split with Supervision.resumingDecider [incubator-pekko]

mdedetrich opened a new pull request, #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207

   Resolves: https://github.com/apache/incubator-pekko/issues/1205
   
   Turns out the error was quite simple, we weren't using a decider to caught the exception in `onPull`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich merged PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1528533530


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   I thought about this again, I may be wrong and the test is indeed valid as its written right now because its `splitAfter` so its its meant to be skipping the next element due to exception being thrown, in which case we have to drop one element and then do `pull(in)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "samueleresca (via GitHub)" <gi...@apache.org>.
samueleresca commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1531160341


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   Thanks folks, I appreciate the detailed explanation. And yes, I was diverting away (not on purpose) from the original goal of the test 
   
   > Note that at this point, we are dealing with the normal logic of SplitWhen. The new functionality of skipping 
   > exceptions with SupervisionDecider.resume occurs earlier when elem == 3 so we are past that and just testing happy > path logic of SplitAfter which is not changed at all (and it shouldn't be either!).
   
   looks good to me



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "samueleresca (via GitHub)" <gi...@apache.org>.
samueleresca commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529416183


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   I was reviewing this test. Isn't the `val substream2 = subscriber.expectNext()` expecting the new stream element (substream2) triggered by the `sendNext(6)` instruction? why do we need to move the `sendNext(7)` above?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1528439903


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   It seems like this is a genuine mistake when the test was originally written (remember that this test never passed, it was written for the future when supervision strategy propagation would be implemented which occurred later as a result of https://github.com/apache/incubator-pekko/pull/252).
   
   Logically when you read the test, its expected that when you do `expectNext` that previously an element should have been sent with `sendNext` and its also the same in  the equivalent [`groupBy` test](https://github.com/apache/incubator-pekko/blob/c44c0b7cbdab11d85176cfe062288fdcba16c56a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala#L306-L308)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529781161


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   @mdedetrich in the test, it requests 10, so the SubSource is expected to be there after 6.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529929645


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   > **1. `val substream2 = subscriber.expectNext()`, the `subscriber` is not expecting a `complete`, otherwise, the call `sendNext(7)` will cause issue because the origin source is already completed.
   > 
   > The problem here is when the `stream2` be generated, I expected the behavior the same, just after the `6 % 3 == 0`, wdyt @samueleresca @mdedetrich**
   
   I am not saying that this is necessarily wrong, just that its a completely separate issue from what this PR is changing/solving. Or to put it differently, this behaviour is the same as current Pekko since at this point i.e.
   
   ```scala
   upstreamSubscription.sendNext(6)
   substreamPuppet1.expectNext(6)
   substreamPuppet1.expectComplete()
   upstreamSubscription.sendNext(7)
   val substream2 = subscriber.expectNext()
   val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.asPublisher(false)))
   substreamPuppet2.request(10)
   substreamPuppet2.expectNext(7)
   
   upstreamSubscription.sendComplete()
   subscriber.expectComplete()
   substreamPuppet2.expectComplete()
   ```
   
   we are past recovering from an exception (furthermore if we are somehow changing some fundamental behaviour with `SplitAfter` then other tests would fail, but they are all passing without any changes). The critical part of the test specifically dealing with recovering from exception and resuming is
   
   ```scala
   upstreamSubscription.sendNext(3)
   upstreamSubscription.sendNext(4)
   substreamPuppet1.expectNext(4) // note that 3 was dropped
   ```
   
   And this part is completely unchanged from how the test was originally written.
   
   Given that, I think it makes sense to file a separate issue if this behaviour about completing after a split needs to change and hence to tackle it separately. The only exception to this that I can think of is that some state is not being reset correctly
   
   ```scala
   decider(ex) match {
     case Supervision.Resume  => pull(in)
     case Supervision.Stop    => onUpstreamFailure(ex)
     case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?
   }
   ``` 
   
   in the `case Supervision.Resume  => pull(in)` block, but if thats the case it would also error out earlier since that block is only executed when recovering from exceptions in `onPush` (and again thats only when `elem` is 3, not 6)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1528439903


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   It seems like this is a genuine mistake when the test was originally written (remember that this test never passed, it was written for the future when supervision strategy propagation would be implemented which occurred due to https://github.com/apache/incubator-pekko/pull/252).
   
   Logically when you read the test, its expected that when you do `expectNext` that a previously an element should have been sent with `sendNext` and its also the same in  the equivalent [`groupBy` test](https://github.com/apache/incubator-pekko/blob/c44c0b7cbdab11d85176cfe062288fdcba16c56a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala#L306-L308)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529789529


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   > @mdedetrich in the test, it requests 10, so the SubSource is expected to be there after 6.
   
   But its also sending a completion because of this condition `if (elem == 3) throw exc else elem % 3 == 0` (i.e. `6 % 3` == 0 so the split happens right after that) which is why the test is failing with
   
   ```
   Expected OnNext(_), yet no element signaled during 3 seconds
   java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 3 seconds
   	at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:398)
   	at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:389)
   	at org.apache.pekko.stream.scaladsl.FlowSplitAfterSpec.$anonfun$new$11(FlowSplitAfterSpec.scala:251)
   ```
   
   at `val substream2 = subscriber.expectNext()` because there isn't going to be a `expectNext` due to the completion?
   
   Note that at this point, we are dealing with the normal logic of `SplitWhen`. The new functionality of skipping exceptions with `SupervisionDecider.resume` occurs earlier when `elem == 3`, at this point in the test we are past that and just testing happy path logic of `SplitAfter` which is not changed at all (and it shouldn't be either).
   
   This is why I think that part of the test is written incorrectly. In fact you can argue it can even be removed since it has nothing to do with recovering from thrown exceptions with `SupervisionDecider.resume`, its testing something entirely different.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529929645


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   > **1. `val substream2 = subscriber.expectNext()`, the `subscriber` is not expecting a `complete`, otherwise, the call `sendNext(7)` will cause issue because the origin source is already completed.
   > 
   > The problem here is when the `stream2` be generated, I expected the behavior the same, just after the `6 % 3 == 0`, wdyt @samueleresca @mdedetrich**
   
   I am not saying that this is wrong, just that its a completely separate issue from what this PR is changing/solving. Or to put it differently, this behaviour is the same as current Pekko since at this point i.e.
   
   ```scala
   upstreamSubscription.sendNext(6)
   substreamPuppet1.expectNext(6)
   substreamPuppet1.expectComplete()
   upstreamSubscription.sendNext(7)
   val substream2 = subscriber.expectNext()
   val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.asPublisher(false)))
   substreamPuppet2.request(10)
   substreamPuppet2.expectNext(7)
   
   upstreamSubscription.sendComplete()
   subscriber.expectComplete()
   substreamPuppet2.expectComplete()
   ```
   
   we are past recovering from an exception. The critical part of the test specifically dealing with recovering from exception and resuming is
   
   ```scala
   upstreamSubscription.sendNext(3)
   upstreamSubscription.sendNext(4)
   substreamPuppet1.expectNext(4) // note that 3 was dropped
   ```
   
   And this part is completely unchanged from how the test was originally written.
   
   Given that, I think it makes sense to file a separate issue if this behaviour about completing after a split needs to change and hence to tackle it separately. The only exception to this that I can think of is that some state is not being reset correctly
   
   ```scala
   decider(ex) match {
     case Supervision.Resume  => pull(in)
     case Supervision.Stop    => onUpstreamFailure(ex)
     case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?
   }
   ``` 
   
   in the `case Supervision.Resume  => pull(in)` block, but if thats the case it would also error out earlier



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529789529


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   > @mdedetrich in the test, it requests 10, so the SubSource is expected to be there after 6.
   
   But its also sending a completion because of this condition `if (elem == 3) throw exc else elem % 3 == 0` (i.e. `6 % 3` == 0 so the split happens right after that) which is why the test is failing with
   
   ```
   Expected OnNext(_), yet no element signaled during 3 seconds
   java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 3 seconds
   	at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:398)
   	at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:389)
   	at org.apache.pekko.stream.scaladsl.FlowSplitAfterSpec.$anonfun$new$11(FlowSplitAfterSpec.scala:251)
   ```
   
   at `val substream2 = subscriber.expectNext()` because there isn't going to be a `expectNext` due to the completion?
   
   Note that at this point, we are dealing with the normal logic of `SplitWhen`. The new functionality of skipping exceptions with `SupervisionDecider.resume` occurs earlier when `elem == 3`, at this point in the test we are past that and just testing happy path logic of `SplitAfter` which should not be changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#issuecomment-2007528080

   Thanks, ill go ahead and merge this. With it being included in `-M1` it will get plenty of testing/usage before the final release incase there is any hypothetical issue


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1528765865


##########
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala:
##########
@@ -640,7 +640,12 @@ import pekko.util.ccompat.JavaConverters._
             else substreamSource.push(elem)
           }
         } catch {
-          case NonFatal(ex) => onUpstreamFailure(ex)
+          case NonFatal(ex) =>
+            decider(ex) match {
+              case Supervision.Resume  => pull(in)
+              case Supervision.Stop    => onUpstreamFailure(ex)
+              case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?

Review Comment:
   > We should add test for  at least ` Supervision.Resume ` and ` Supervision.Stop` strategy.
   
   These already exist, only the case of throwing exception with `Supervision.restart` being missed (check rest of file)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1528764028


##########
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala:
##########
@@ -640,7 +640,12 @@ import pekko.util.ccompat.JavaConverters._
             else substreamSource.push(elem)
           }
         } catch {
-          case NonFatal(ex) => onUpstreamFailure(ex)
+          case NonFatal(ex) =>
+            decider(ex) match {
+              case Supervision.Resume  => pull(in)
+              case Supervision.Stop    => onUpstreamFailure(ex)
+              case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?

Review Comment:
   > After I took a look at the code, I think we should handle `Supervision.Restart` as the `Supervision.Resume`,be cause it happens in `SubstreamHandler`
   
   Does it matter? The code will be updated in https://github.com/apache/incubator-pekko/pull/981? Currently it's a place holder.
   
   Or do you mean  that in https://github.com/apache/incubator-pekko/pull/981 it should behave as you say?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529789529


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   > @mdedetrich in the test, it requests 10, so the SubSource is expected to be there after 6.
   
   But its also sending a completion because of this condition `if (elem == 3) throw exc else elem % 3 == 0` (i.e. `6 % 3` == 0 so the split happens) which is why the test is failing with
   
   ```
   Expected OnNext(_), yet no element signaled during 3 seconds
   java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 3 seconds
   	at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:398)
   	at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:389)
   	at org.apache.pekko.stream.scaladsl.FlowSplitAfterSpec.$anonfun$new$11(FlowSplitAfterSpec.scala:251)
   ```
   
   i.e. at `val substream2 = subscriber.expectNext()` because there isn't going to be a `expectNext` due to the completion?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1528439903


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   It seems like this is a genuine mistake when the test was originally written (remember that this test never passed, it was written for the future when supervision strategy propagation would be implemented which occurred later as a result of https://github.com/apache/incubator-pekko/pull/252).
   
   Logically when you read the test, its expected that when you do `expectNext` that a previously an element should have been sent with `sendNext` and its also the same in  the equivalent [`groupBy` test](https://github.com/apache/incubator-pekko/blob/c44c0b7cbdab11d85176cfe062288fdcba16c56a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala#L306-L308)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1528765865


##########
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala:
##########
@@ -640,7 +640,12 @@ import pekko.util.ccompat.JavaConverters._
             else substreamSource.push(elem)
           }
         } catch {
-          case NonFatal(ex) => onUpstreamFailure(ex)
+          case NonFatal(ex) =>
+            decider(ex) match {
+              case Supervision.Resume  => pull(in)
+              case Supervision.Stop    => onUpstreamFailure(ex)
+              case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?

Review Comment:
   > We should add test for  at least ` Supervision.Resume ` and ` Supervision.Stop` strategy.
   
   These already exist, only the case of throwing exception with `Supervision.restart` being missed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "samueleresca (via GitHub)" <gi...@apache.org>.
samueleresca commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529416183


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   I was reviewing this test. Isn't the `val substream2 = subscriber.expectNext()` expecting the new stream element (substream2) triggered by the `sendNext(6)` instruction? why do we need to move the `sendNext(7)` above?
   
   As referred above:
   
   > but its a little strange now ,that we need send the next(7) to make the next sub source be ready.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1528759852


##########
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala:
##########
@@ -640,7 +640,12 @@ import pekko.util.ccompat.JavaConverters._
             else substreamSource.push(elem)
           }
         } catch {
-          case NonFatal(ex) => onUpstreamFailure(ex)
+          case NonFatal(ex) =>
+            decider(ex) match {
+              case Supervision.Resume  => pull(in)
+              case Supervision.Stop    => onUpstreamFailure(ex)
+              case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?

Review Comment:
   After I took a look at the code, I think we should handle `Supervision.Restart` as the `Supervision.Resume`,be cause it happens in `SubstreamHandler`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529929645


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   > **1. `val substream2 = subscriber.expectNext()`, the `subscriber` is not expecting a `complete`, otherwise, the call `sendNext(7)` will cause issue because the origin source is already completed.
   > 
   > The problem here is when the `stream2` be generated, I expected the behavior the same, just after the `6 % 3 == 0`, wdyt @samueleresca @mdedetrich**
   
   I am not saying that this is wrong, just that its a completely separate issue from what this PR is changing/solving. Or to put it differently, this behaviour is the same as current Pekko since at this point i.e.
   
   ```scala
   upstreamSubscription.sendNext(6)
   substreamPuppet1.expectNext(6)
   substreamPuppet1.expectComplete()
   upstreamSubscription.sendNext(7)
   val substream2 = subscriber.expectNext()
   val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.asPublisher(false)))
   substreamPuppet2.request(10)
   substreamPuppet2.expectNext(7)
   
   upstreamSubscription.sendComplete()
   subscriber.expectComplete()
   substreamPuppet2.expectComplete()
   ```
   
   we are past recovering from an exception. The critical part of the test specifically dealing with recovering from exception and resuming is
   
   ```scala
   upstreamSubscription.sendNext(3)
   upstreamSubscription.sendNext(4)
   substreamPuppet1.expectNext(4) // note that 3 was dropped
   ```
   
   And this part is completely unchanged from how the test was originally written.
   
   Given that, I think it makes sense to file a separate issue if this behaviour about completing after a split needs to change and hence to tackle it separately
         



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529105068


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   So if I change the logic to
   
   ```scala
             case NonFatal(ex) =>
               decider(ex) match {
                 case Supervision.Resume  =>
                   if (decision == SplitAfter)
                     pushSubstreamSource()
                   else
                     pull(in)
                 case Supervision.Stop    => onUpstreamFailure(ex)
                 case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?
               }
   ```
   
   This test now fails earlier on in the happy path, specifically at https://github.com/apache/incubator-pekko/blob/df3b9fccde6b7fbe7829703996379a055bc27b42/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala#L243
   
   Maybe the test really really was just originally written incorrectly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#issuecomment-2003774656

   @He-Pin @Roiocam @jxnu-liguobin @pjfanning @raboof Please take a note of https://github.com/apache/incubator-pekko/pull/1207#pullrequestreview-1942808650 , I might be missing something here


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529762768


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   > I was reviewing this test. Isn't the val substream2 = subscriber.expectNext() expecting the new stream element (substream2) triggered by the sendNext(6) instruction? why do we need to move the sendNext(7) above?
   
   Well the test doesn't pass otherwise and if you see the equivalent tests both in [`FlowSplitWhenSpec`](https://github.com/apache/incubator-pekko/blob/df3b9fccde6b7fbe7829703996379a055bc27b42/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitWhenSpec.scala#L246-L278) and [`FlowGroupBySpec`](https://github.com/apache/incubator-pekko/blob/c44c0b7cbdab11d85176cfe062288fdcba16c56a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala#L281-L323), its written in the same way that this PR changes it to.
   
   There also doesn't appear to be an obvious solution to make the test as it was originally written to pass, hence why considering everything it could just be that the test was mistakingly written incorrectly (which is quite probably considering the test was written just as an example, it was never executed/passing)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529789529


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   > @mdedetrich in the test, it requests 10, so the SubSource is expected to be there after 6.
   
   But its also sending a completion because of this condition `if (elem == 3) throw exc else elem % 3 == 0` (i.e. `6 % 3` == 0 so the split happens right after that) which is why the test is failing with
   
   ```
   Expected OnNext(_), yet no element signaled during 3 seconds
   java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 3 seconds
   	at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:398)
   	at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:389)
   	at org.apache.pekko.stream.scaladsl.FlowSplitAfterSpec.$anonfun$new$11(FlowSplitAfterSpec.scala:251)
   ```
   
   i.e. at `val substream2 = subscriber.expectNext()` because there isn't going to be a `expectNext` due to the completion?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1528440793


##########
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala:
##########
@@ -640,7 +640,12 @@ import pekko.util.ccompat.JavaConverters._
             else substreamSource.push(elem)
           }
         } catch {
-          case NonFatal(ex) => onUpstreamFailure(ex)
+          case NonFatal(ex) =>
+            decider(ex) match {
+              case Supervision.Resume  => pull(in)
+              case Supervision.Stop    => onUpstreamFailure(ex)
+              case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?

Review Comment:
   This will be implemented as a result of https://github.com/apache/incubator-pekko/pull/981



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1528762400


##########
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala:
##########
@@ -640,7 +640,12 @@ import pekko.util.ccompat.JavaConverters._
             else substreamSource.push(elem)
           }
         } catch {
-          case NonFatal(ex) => onUpstreamFailure(ex)
+          case NonFatal(ex) =>
+            decider(ex) match {
+              case Supervision.Resume  => pull(in)
+              case Supervision.Stop    => onUpstreamFailure(ex)
+              case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?

Review Comment:
   We should add test for  at least ` Supervision.Resume ` and ` Supervision.Stop` strategy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529105068


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   So if I change the logic to
   
   ```scala
             case NonFatal(ex) =>
               decider(ex) match {
                 case Supervision.Resume  =>
                   if (decision == SplitAfter)
                     pushSubstreamSource()
                   else
                     pull(in)
                 case Supervision.Stop    => onUpstreamFailure(ex)
                 case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?
               }
   ```
   
   This test now fails earlier on in the happy path, specifically at https://github.com/apache/incubator-pekko/blob/df3b9fccde6b7fbe7829703996379a055bc27b42/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala#L243
   
   Maybe the test really was just originally written incorrectly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529789529


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   > @mdedetrich in the test, it requests 10, so the SubSource is expected to be there after 6.
   
   But its also sending a completion because of this condition `if (elem == 3) throw exc else elem % 3 == 0` (i.e. `6 % 3` == 0 so the split happens right after that) which is why the test is failing with
   
   ```
   Expected OnNext(_), yet no element signaled during 3 seconds
   java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 3 seconds
   	at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:398)
   	at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:389)
   	at org.apache.pekko.stream.scaladsl.FlowSplitAfterSpec.$anonfun$new$11(FlowSplitAfterSpec.scala:251)
   ```
   
   at `val substream2 = subscriber.expectNext()` because there isn't going to be a `expectNext` due to the completion?
   
   Note that at this point, we are dealing with the normal logic of `SplitWhen`. The new functionality of skipping exceptions with `SupervisionDecider.resume` occurs earlier when `elem == 3`, at this point in the test we are past that and just testing happy path logic of `SplitAfter` which should not be changed.
   
   This is why I think that part of the test is written incorrectly. In fact you can argue it can even be removed since it has nothing to do with recovering from thrown exceptions with `SupervisionDecider.resume`, its testing something entirely different.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1528439903


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   It seems like this is a genuine mistake when the test was originally written (remember that this test never passed, it was written for the future when supervision strategy propagation would be implemented which occurred later as a result of https://github.com/apache/incubator-pekko/pull/252).
   
   Logically when you read the test, its expected that when you do `expectNext` that previously an element should have been sent with `sendNext` and its also the same in  the equivalent [`groupBy` test](https://github.com/apache/incubator-pekko/blob/c44c0b7cbdab11d85176cfe062288fdcba16c56a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGroupBySpec.scala#L306-L308) and also [`splitWhen` test](https://github.com/apache/incubator-pekko/blob/df3b9fccde6b7fbe7829703996379a055bc27b42/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitWhenSpec.scala#L364-L367)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1528856829


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   As the `downstreamSubscription.request(100)` , I think the orign test should be right too, but its a little strange now ,that we need send the `next(7)` to make the next sub source be ready.
   
   I think we should call 
   ```scala
         private def pushSubstreamSource(): Unit = {
           push(out, Source.fromGraph(substreamSource.source))
           scheduleOnce(SubscriptionTimer, timeout)
           substreamWaitingToBePushed = false
         }
   ```
   
   now inside the `SubstreamHandler#onPush` when the decision is `SplitAfter` too to keep the behavior the same, as it just a dummy Source.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529929645


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   > **1. `val substream2 = subscriber.expectNext()`, the `subscriber` is not expecting a `complete`, otherwise, the call `sendNext(7)` will cause issue because the origin source is already completed.
   > 
   > The problem here is when the `stream2` be generated, I expected the behavior the same, just after the `6 % 3 == 0`, wdyt @samueleresca @mdedetrich**
   
   I am not saying that this is wrong, just that its a completely separate issue from what this PR is changing/solving. Or to put it differently, this behaviour is the same as current Pekko since at this point i.e.
   
   ```scala
   upstreamSubscription.sendNext(6)
   substreamPuppet1.expectNext(6)
   substreamPuppet1.expectComplete()
   upstreamSubscription.sendNext(7)
   val substream2 = subscriber.expectNext()
   val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.asPublisher(false)))
   substreamPuppet2.request(10)
   substreamPuppet2.expectNext(7)
   
   upstreamSubscription.sendComplete()
   subscriber.expectComplete()
   substreamPuppet2.expectComplete()
   ```
   
   we are past recovering from an exception (furthermore if we are somehow changing some fundamental behaviour with `SplitAfter` then other tests would fail, but they are all passing without any changes). The critical part of the test specifically dealing with recovering from exception and resuming is
   
   ```scala
   upstreamSubscription.sendNext(3)
   upstreamSubscription.sendNext(4)
   substreamPuppet1.expectNext(4) // note that 3 was dropped
   ```
   
   And this part is completely unchanged from how the test was originally written.
   
   Given that, I think it makes sense to file a separate issue if this behaviour about completing after a split needs to change and hence to tackle it separately. The only exception to this that I can think of is that some state is not being reset correctly
   
   ```scala
   decider(ex) match {
     case Supervision.Resume  => pull(in)
     case Supervision.Stop    => onUpstreamFailure(ex)
     case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?
   }
   ``` 
   
   in the `case Supervision.Resume  => pull(in)` block, but if thats the case it would also error out earlier since that block is only executed when recovering from exceptions in `onPush`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1528533530


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   I thought about this again, I may be wrong and the test is indeed valid as its written right now because its `splitAfter` so its its meant to be skipping the next element due to exception being thrown, in which case we have to drop one element and then do `pull(in)`???



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1528769483


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   > Yes, the element which case the exception is dropped now.
   
   So to confirm, the current PRs implementation is correct and the test as originally written was wrong?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529789529


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   > @mdedetrich in the test, it requests 10, so the SubSource is expected to be there after 6.
   
   But its also sending a completion because of this condition `if (elem == 3) throw exc else elem % 3 == 0` (i.e. `6 % 3` == 0 so the split happens right after that) which is why the test is failing with
   
   ```
   Expected OnNext(_), yet no element signaled during 3 seconds
   java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 3 seconds
   	at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:398)
   	at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:389)
   	at org.apache.pekko.stream.scaladsl.FlowSplitAfterSpec.$anonfun$new$11(FlowSplitAfterSpec.scala:251)
   ```
   
   at `val substream2 = subscriber.expectNext()` because there isn't going to be a `expectNext` due to the completion?
   
   Note that at this point, we are dealing with the normal logic of `SplitWhen`. The new functionality of skipping exceptions with `SupervisionDecider.resume` occurs earlier when `elem == 3` so we are past that and just testing happy path logic of `SplitAfter` which is not changed at all (and it shouldn't be either!).
   
   This is why I think that part of the test is written incorrectly. In fact you can argue it can even be removed since it has nothing to do with recovering from thrown exceptions with `SupervisionDecider.resume`, its testing something entirely different (but I would personally leave it there now so its consistent with the other tests).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529900580


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   **1. `val substream2 = subscriber.expectNext()`, the `subscriber` is not expecting a `complete`,  otherwise, the call `sendNext(7)` will cause issue because the origin source is already completed.
   
   The problem here is when the `stream2` be generated, I expected the behavior the same, just after the `6 % 3 == 0`, wdyt @samueleresca @mdedetrich**



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1528761436


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   Yes, the element which case the exception is dropped now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529789529


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   > @mdedetrich in the test, it requests 10, so the SubSource is expected to be there after 6.
   
   But its also sending a completion because of this condition `if (elem == 3) throw exc else elem % 3 == 0` (i.e. `6 % 3` == 0 so the split happens right after that) which is why the test is failing with
   
   ```
   Expected OnNext(_), yet no element signaled during 3 seconds
   java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 3 seconds
   	at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:398)
   	at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:389)
   	at org.apache.pekko.stream.scaladsl.FlowSplitAfterSpec.$anonfun$new$11(FlowSplitAfterSpec.scala:251)
   ```
   
   at `val substream2 = subscriber.expectNext()` because there isn't going to be a `expectNext` due to the completion?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529789529


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   > @mdedetrich in the test, it requests 10, so the SubSource is expected to be there after 6.
   
   But its also sending a completion because of this condition `if (elem == 3) throw exc else elem % 3 == 0` (i.e. `6 % 3` == 0 so the split happens right after that) which is why the test is failing with
   
   ```
   Expected OnNext(_), yet no element signaled during 3 seconds
   java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 3 seconds
   	at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:398)
   	at org.apache.pekko.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:389)
   	at org.apache.pekko.stream.scaladsl.FlowSplitAfterSpec.$anonfun$new$11(FlowSplitAfterSpec.scala:251)
   ```
   
   at `val substream2 = subscriber.expectNext()` because there isn't going to be a `expectNext`, it's expecting a completion instead.
   
   Note that at this point, we are dealing with the normal logic of `SplitWhen`. The new functionality of skipping exceptions with `SupervisionDecider.resume` occurs earlier when `elem == 3` so we are past that and just testing happy path logic of `SplitAfter` which is not changed at all (and it shouldn't be either!).
   
   This is why I think that part of the test is written incorrectly. In fact you can argue it can even be removed since it has nothing to do with recovering from thrown exceptions with `SupervisionDecider.resume`, its testing something entirely different (but I would personally leave it there now so its consistent with the other tests).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] Fix uncaught decider exception in Split with Supervision.resumingDecider [incubator-pekko]

Posted by "mdedetrich (via GitHub)" <gi...@apache.org>.
mdedetrich commented on code in PR #1207:
URL: https://github.com/apache/incubator-pekko/pull/1207#discussion_r1529929645


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowSplitAfterSpec.scala:
##########
@@ -250,10 +248,10 @@ class FlowSplitAfterSpec extends StreamSpec("""
       upstreamSubscription.sendNext(6)
       substreamPuppet1.expectNext(6)
       substreamPuppet1.expectComplete()
+      upstreamSubscription.sendNext(7)

Review Comment:
   > **1. `val substream2 = subscriber.expectNext()`, the `subscriber` is not expecting a `complete`, otherwise, the call `sendNext(7)` will cause issue because the origin source is already completed.
   > 
   > The problem here is when the `stream2` be generated, I expected the behavior the same, just after the `6 % 3 == 0`, wdyt @samueleresca @mdedetrich**
   
   I am not saying that this is wrong, just that its a completely separate issue from what this PR is changing/solving. Or to put it differently, this behaviour is the same as current Pekko since at this point i.e.
   
   ```scala
   upstreamSubscription.sendNext(6)
   substreamPuppet1.expectNext(6)
   substreamPuppet1.expectComplete()
   upstreamSubscription.sendNext(7)
   val substream2 = subscriber.expectNext()
   val substreamPuppet2 = StreamPuppet(substream2.runWith(Sink.asPublisher(false)))
   substreamPuppet2.request(10)
   substreamPuppet2.expectNext(7)
   
   upstreamSubscription.sendComplete()
   subscriber.expectComplete()
   substreamPuppet2.expectComplete()
   ```
   
   we are past recovering from an exception (furthermore if we are somehow changing some fundamental behaviour with `SplitAfter` then other tests would fail, but they are all passing without any changes). The critical part of the test specifically dealing with recovering from exception and resuming is
   
   ```scala
   upstreamSubscription.sendNext(3)
   upstreamSubscription.sendNext(4)
   substreamPuppet1.expectNext(4) // note that 3 was dropped
   ```
   
   And this part is completely unchanged from how the test was originally written.
   
   Given that, I think it makes sense to file a separate issue if this behaviour about completing after a split needs to change and hence to tackle it separately. The only exception to this that I can think of is that some state is not being reset correctly
   
   ```scala
   decider(ex) match {
     case Supervision.Resume  => pull(in)
     case Supervision.Stop    => onUpstreamFailure(ex)
     case Supervision.Restart => onUpstreamFailure(ex) // TODO implement restart?
   }
   ``` 
   
   in the `case Supervision.Resume  => pull(in)` block, but if thats the case it would also error out earlier since that block is only executed when recovering from exceptions in `onPush` (and again thats only when `elem` is 3, not 6)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org