Posted to notifications@pekko.apache.org by "GreyPlane (via GitHub)" <gi...@apache.org> on 2023/08/31 16:02:26 UTC

[GitHub] [incubator-pekko] GreyPlane opened a new pull request, #603: Fix statefulMap may call onComplete twice if it throws

GreyPlane opened a new pull request, #603:
URL: https://github.com/apache/incubator-pekko/pull/603

   #596 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


[GitHub] [incubator-pekko] He-Pin commented on a diff in pull request #603: Fix statefulMap may call onComplete twice if it throws

Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin commented on code in PR #603:
URL: https://github.com/apache/incubator-pekko/pull/603#discussion_r1311897194


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapSpec.scala:
##########
@@ -35,6 +36,8 @@ class FlowStatefulMapSpec extends StreamSpec {
 
   val ex = new Exception("TEST") with NoStackTrace
 
+  val dtorException = new Exception("bad dtor") with NoStackTrace

Review Comment:
   Does `dtor` mean `destructor`?





[GitHub] [incubator-pekko] He-Pin closed pull request #603: Fix statefulMap may call onComplete twice if it throws

Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin closed pull request #603: Fix statefulMap may call onComplete twice if it throws
URL: https://github.com/apache/incubator-pekko/pull/603




[GitHub] [incubator-pekko] He-Pin commented on a diff in pull request #603: Fix statefulMap may call onComplete twice if it throws

Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin commented on code in PR #603:
URL: https://github.com/apache/incubator-pekko/pull/603#discussion_r1311905533


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapSpec.scala:
##########
@@ -280,4 +283,64 @@ class FlowStatefulMapSpec extends StreamSpec {
         .expectComplete()
     }
   }
+
+  "won't call onComplete twice when downstream cancelled even if onComplete throws" in {
+    var counter = 0
+
+    def badDtor() = {
+      counter += 1
+      throw dtorException
+    }
+
+    Source.never[String]
+      .statefulMap(() => None)((s, e) => s -> e,
+        _ => {
+          badDtor()
+          None
+        })
+      .runWith(Sink.cancelled)
+
+    StreamTestKit.assertAllStagesStopped(None)
+    counter shouldBe 1
+  }
+
+  "won't call onComplete twice when upstream failed even if onComplete throws" in {
+    var counter = 0
+
+    def badDtor() = {
+      counter += 1
+      throw dtorException
+    }
+
+    Source.failed[String](ex)
+      .statefulMap(() => None)((s, e) => s -> e,
+        _ => {
+          badDtor()
+          None
+        })
+      .runWith(Sink.ignore)
+
+    StreamTestKit.assertAllStagesStopped(None)
+    counter shouldBe 1
+  }
+
+  "won't call onComplete twice when upstream finished even if onComplete throws" in {

Review Comment:
   `in assertAllStagesStopped {`





[GitHub] [incubator-pekko] He-Pin commented on a diff in pull request #603: Fix statefulMap may call onComplete twice if it throws

Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin commented on code in PR #603:
URL: https://github.com/apache/incubator-pekko/pull/603#discussion_r1311905533


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapSpec.scala:
##########
@@ -280,4 +283,64 @@ class FlowStatefulMapSpec extends StreamSpec {
         .expectComplete()
     }
   }
+
+  "won't call onComplete twice when downstream cancelled even if onComplete throws" in {
+    var counter = 0
+
+    def badDtor() = {
+      counter += 1
+      throw dtorException
+    }
+
+    Source.never[String]
+      .statefulMap(() => None)((s, e) => s -> e,
+        _ => {
+          badDtor()
+          None
+        })
+      .runWith(Sink.cancelled)
+
+    StreamTestKit.assertAllStagesStopped(None)
+    counter shouldBe 1
+  }
+
+  "won't call onComplete twice when upstream failed even if onComplete throws" in {
+    var counter = 0
+
+    def badDtor() = {
+      counter += 1
+      throw dtorException
+    }
+
+    Source.failed[String](ex)
+      .statefulMap(() => None)((s, e) => s -> e,
+        _ => {
+          badDtor()
+          None
+        })
+      .runWith(Sink.ignore)
+
+    StreamTestKit.assertAllStagesStopped(None)
+    counter shouldBe 1
+  }
+
+  "won't call onComplete twice when upstream finished even if onComplete throws" in {

Review Comment:
   `in assertAllStagesStopped {` or just remove it.





[GitHub] [incubator-pekko] pjfanning commented on pull request #603: Fix statefulMap may call onComplete twice if it throws

Posted by "pjfanning (via GitHub)" <gi...@apache.org>.
pjfanning commented on PR #603:
URL: https://github.com/apache/incubator-pekko/pull/603#issuecomment-1701616999

   @kerr Let us do a clean room fix.




[GitHub] [incubator-pekko] He-Pin commented on pull request #603: Fix statefulMap may call onComplete twice if it throws

Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin commented on PR #603:
URL: https://github.com/apache/incubator-pekko/pull/603#issuecomment-1701674799

   @GreyPlane Sorry, I think this PR should be closed. I know you came up with this fix independently, but because you were involved in (and left comments on) the Akka PR, this PR should be closed and the fix should come from someone else. Thank you very much.




[GitHub] [incubator-pekko] He-Pin commented on a diff in pull request #603: Fix statefulMap may call onComplete twice if it throws

Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin commented on code in PR #603:
URL: https://github.com/apache/incubator-pekko/pull/603#discussion_r1311898120


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapSpec.scala:
##########
@@ -280,4 +283,64 @@ class FlowStatefulMapSpec extends StreamSpec {
         .expectComplete()
     }
   }
+
+  "won't call onComplete twice when downstream cancelled even if onComplete throws" in {
+    var counter = 0

Review Comment:
   Use an `AtomicInteger` here, as the stream can run on a separate thread.
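
A minimal sketch of the suggestion above, in plain Scala with no Pekko dependency. The object name `AtomicCounterSketch` and the helper `runOnOtherThread` are hypothetical and only stand in for a stream callback that executes on a dispatcher thread; this is not code from the PR.

```scala
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ CountDownLatch, TimeUnit }

object AtomicCounterSketch {

  // Hypothetical helper: runs `callback` on a separate thread and waits for it,
  // standing in for a stream callback that runs off the test thread.
  def runOnOtherThread(callback: () => Unit): Unit = {
    val done = new CountDownLatch(1)
    val worker = new Thread(() => {
      try callback()
      catch { case _: Exception => () } // the callback below throws on purpose
      finally done.countDown()
    })
    worker.start()
    require(done.await(5, TimeUnit.SECONDS), "callback did not finish in time")
  }

  def main(args: Array[String]): Unit = {
    // AtomicInteger instead of `var counter = 0`: the increment is atomic and
    // the value read by the test thread reflects the write from the worker.
    val counter = new AtomicInteger(0)

    runOnOtherThread { () =>
      counter.incrementAndGet()
      throw new Exception("bad dtor")
    }

    assert(counter.get() == 1)
  }
}
```

In the spec under review, the equivalent change would presumably be `counter.incrementAndGet()` inside `badDtor()` and `counter.get() shouldBe 1` in the assertion.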





[GitHub] [incubator-pekko] He-Pin commented on a diff in pull request #603: Fix statefulMap may call onComplete twice if it throws

Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin commented on code in PR #603:
URL: https://github.com/apache/incubator-pekko/pull/603#discussion_r1311927683


##########
stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowStatefulMapSpec.scala:
##########
@@ -280,4 +283,64 @@ class FlowStatefulMapSpec extends StreamSpec {
         .expectComplete()
     }
   }
+
+  "won't call onComplete twice when downstream cancelled even if onComplete throws" in {
+    var counter = 0
+
+    def badDtor() = {
+      counter += 1
+      throw dtorException
+    }
+
+    Source.never[String]
+      .statefulMap(() => None)((s, e) => s -> e,
+        _ => {
+          badDtor()
+          None
+        })
+      .runWith(Sink.cancelled)
+
+    StreamTestKit.assertAllStagesStopped(None)
+    counter shouldBe 1
+  }
+
+  "won't call onComplete twice when upstream failed even if onComplete throws" in {
+    var counter = 0
+
+    def badDtor() = {
+      counter += 1
+      throw dtorException
+    }
+
+    Source.failed[String](ex)
+      .statefulMap(() => None)((s, e) => s -> e,
+        _ => {
+          badDtor()
+          None
+        })
+      .runWith(Sink.ignore)
+
+    StreamTestKit.assertAllStagesStopped(None)
+    counter shouldBe 1
+  }
+
+  "won't call onComplete twice when upstream finished even if onComplete throws" in {

Review Comment:
   Or just remove it.





[GitHub] [incubator-pekko] He-Pin commented on pull request #603: Fix statefulMap may call onComplete twice if it throws

Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin commented on PR #603:
URL: https://github.com/apache/incubator-pekko/pull/603#issuecomment-1701610020

   @pjfanning My fix was written independently, but the tests are ones I copied for quick validation; let's make that clear. I then checked the Akka fix, which is not the same as mine.
   So it is the tests I took, not the fix.
   
   Many people in the WeChat group can confirm this.
   
   I originally wanted to make sure `onComplete` had been called before setting the boolean to false, but that would require `onComplete` to be idempotent, which would make it hard to use.
   
   
   




[GitHub] [incubator-pekko] He-Pin commented on pull request #603: Fix statefulMap may call onComplete twice if it throws

Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin commented on PR #603:
URL: https://github.com/apache/incubator-pekko/pull/603#issuecomment-1701618708

   You could even ask someone outside of tech to check and confirm that the two fixes are not the same.




[GitHub] [incubator-pekko] pjfanning commented on pull request #603: Fix statefulMap may call onComplete twice if it throws

Posted by "pjfanning (via GitHub)" <gi...@apache.org>.
pjfanning commented on PR #603:
URL: https://github.com/apache/incubator-pekko/pull/603#issuecomment-1701594984

   @GreyPlane could we get you to fill in an Apache CLA for this? https://www.apache.org/licenses/contributor-agreements.html
   
   For a comparatively small change like this, we might not always require the CLA but this issue/PR has become a potential legal issue since @kerr has looked at the Akka solution (and it is not licensed under a license that is compatible with use by us). We don't want any risk that anyone could claim that Category X code has been copied into our project.
   
   https://www.apache.org/legal/resolved.html#category-x




[GitHub] [incubator-pekko] He-Pin commented on pull request #603: Fix statefulMap may call onComplete twice if it throws

Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin commented on PR #603:
URL: https://github.com/apache/incubator-pekko/pull/603#issuecomment-1701383667

   LGTM, only two comments.




[GitHub] [incubator-pekko] pjfanning commented on a diff in pull request #603: Fix statefulMap may call onComplete twice if it throws

Posted by "pjfanning (via GitHub)" <gi...@apache.org>.
pjfanning commented on code in PR #603:
URL: https://github.com/apache/incubator-pekko/pull/603#discussion_r1311865142


##########
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala:
##########
@@ -2273,25 +2273,27 @@ private[pekko] final class StatefulMap[S, In, Out](create: () => S, f: (S, In) =
       }
 
       private def closeStateAndComplete(): Unit = {
+        needInvokeOnCompleteCallback = false
         onComplete(state) match {
           case Some(elem) => emit(out, elem, () => completeStage())
           case None       => completeStage()
         }
-        needInvokeOnCompleteCallback = false
       }
 
       private def closeStateAndFail(ex: Throwable): Unit = {
+        println("changed 1")
+        needInvokeOnCompleteCallback = false
         onComplete(state) match {
           case Some(elem) => emit(out, elem, () => failStage(ex))
           case None       => failStage(ex)
         }
-        needInvokeOnCompleteCallback = false
       }
 
       override def onPull(): Unit = pull(in)
 
       override def postStop(): Unit = {
         if (needInvokeOnCompleteCallback) {
+          println("changed 2")

Review Comment:
   Can you remove the `println`s?
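
The reordering in the diff above (clearing `needInvokeOnCompleteCallback` before calling `onComplete`) can be illustrated with a small plain-Scala sketch. The `Resource` class and `CleanupOnceSketch` object are hypothetical stand-ins for the stage logic, not the Pekko implementation, and the `postStop` body is an assumption since the diff is cut off at that point.

```scala
object CleanupOnceSketch {

  // Hypothetical stand-in for the stage logic; only the flag handling is modelled.
  final class Resource(onComplete: () => Unit) {
    private var needInvokeOnCompleteCallback = true

    // Mirrors closeStateAndComplete in the diff: clear the flag first, then
    // invoke the callback, so a throwing callback cannot leave the flag set.
    def close(): Unit = {
      needInvokeOnCompleteCallback = false
      onComplete()
    }

    // Assumed shape of postStop: invoke the callback only if it was never invoked.
    def postStop(): Unit =
      if (needInvokeOnCompleteCallback) onComplete()
  }

  // Returns how many times the callback ran when it throws during close().
  def callsWhenCloseThrows(): Int = {
    var calls = 0
    val resource = new Resource(() => { calls += 1; throw new Exception("bad dtor") })
    try resource.close()
    catch { case _: Exception => () } // swallow the deliberate failure
    resource.postStop() // flag is already cleared, so no second invocation
    calls
  }

  def main(args: Array[String]): Unit =
    assert(callsWhenCloseThrows() == 1)
}
```

With the original ordering (flag cleared after the callback), the exception would skip the assignment and `postStop` would invoke the callback a second time, which is the behaviour the new tests guard against.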


