You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@pekko.apache.org by "He-Pin (via GitHub)" <gi...@apache.org> on 2024/03/29 17:17:36 UTC

[PR] chore: Refactory UnfoldResourceSource. [pekko]

He-Pin opened a new pull request, #1239:
URL: https://github.com/apache/pekko/pull/1239

   Motivation:
   Refactory UnfoldResourceSource a little instead of https://github.com/apache/pekko/pull/615
   
   Result:
   Clean code.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] chore: Refactory UnfoldResourceSource. [pekko]

Posted by "jxnu-liguobin (via GitHub)" <gi...@apache.org>.
jxnu-liguobin commented on code in PR #1239:
URL: https://github.com/apache/pekko/pull/1239#discussion_r1545144014


##########
stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSource.scala:
##########
@@ -33,66 +34,69 @@ import pekko.stream.stage._
     extends GraphStage[SourceShape[T]] {
   val out = Outlet[T]("UnfoldResourceSource.out")
   override val shape = SourceShape(out)
-  override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSource
+  override def initialAttributes: Attributes =
+    DefaultAttributes.unfoldResourceSource and SourceLocation.forLambda(create)
 
-  def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
-    lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
-    var open = false
-    var blockingStream: S = _
-    setHandler(out, this)
+  def createLogic(inheritedAttributes: Attributes) =
+    new GraphStageLogic(shape) with OutHandler {
+      private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
+      private var open = false
+      private var resource: S = _
 
-    override def preStart(): Unit = {
-      blockingStream = create()
-      open = true
-    }
+      override def preStart(): Unit = {
+        resource = create()
+        open = true
+      }
 
-    @tailrec
-    final override def onPull(): Unit = {
-      var resumingMode = false
-      try {
-        readData(blockingStream) match {
-          case Some(data) => push(out, data)
-          case None       => closeStage()
-        }
-      } catch {
-        case NonFatal(ex) =>
-          decider(ex) match {
-            case Supervision.Stop =>
-              open = false
-              close(blockingStream)
-              failStage(ex)
-            case Supervision.Restart =>
-              restartState()
-              resumingMode = true
-            case Supervision.Resume =>
-              resumingMode = true
+      @tailrec
+      final override def onPull(): Unit = {
+        var resumingMode = false
+        try {
+          readData(resource) match {
+            case Some(data) => push(out, data)
+            case None       => closeStage()
           }
+        } catch {
+          case NonFatal(ex) =>
+            decider(ex) match {
+              case Supervision.Stop =>
+                open = false

Review Comment:
   indent 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] chore: Refactory UnfoldResourceSource. [pekko]

Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin commented on code in PR #1239:
URL: https://github.com/apache/pekko/pull/1239#discussion_r1545158988


##########
stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSource.scala:
##########
@@ -33,66 +34,69 @@ import pekko.stream.stage._
     extends GraphStage[SourceShape[T]] {
   val out = Outlet[T]("UnfoldResourceSource.out")
   override val shape = SourceShape(out)
-  override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSource
+  override def initialAttributes: Attributes =
+    DefaultAttributes.unfoldResourceSource and SourceLocation.forLambda(create)
 
-  def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
-    lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
-    var open = false
-    var blockingStream: S = _
-    setHandler(out, this)
+  def createLogic(inheritedAttributes: Attributes) =
+    new GraphStageLogic(shape) with OutHandler {
+      private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
+      private var open = false
+      private var resource: S = _
 
-    override def preStart(): Unit = {
-      blockingStream = create()
-      open = true
-    }
+      override def preStart(): Unit = {
+        resource = create()
+        open = true
+      }
 
-    @tailrec
-    final override def onPull(): Unit = {
-      var resumingMode = false
-      try {
-        readData(blockingStream) match {
-          case Some(data) => push(out, data)
-          case None       => closeStage()
-        }
-      } catch {
-        case NonFatal(ex) =>
-          decider(ex) match {
-            case Supervision.Stop =>
-              open = false
-              close(blockingStream)
-              failStage(ex)
-            case Supervision.Restart =>
-              restartState()
-              resumingMode = true
-            case Supervision.Resume =>
-              resumingMode = true
+      @tailrec
+      final override def onPull(): Unit = {
+        var resumingMode = false
+        try {
+          readData(resource) match {
+            case Some(data) => push(out, data)
+            case None       => closeStage()
           }
+        } catch {
+          case NonFatal(ex) =>
+            decider(ex) match {
+              case Supervision.Stop =>
+                open = false

Review Comment:
   It's formatted with scala-cli 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] chore: Refactory UnfoldResourceSource. [pekko]

Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin commented on code in PR #1239:
URL: https://github.com/apache/pekko/pull/1239#discussion_r1545571514


##########
stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSource.scala:
##########
@@ -33,66 +34,69 @@ import pekko.stream.stage._
     extends GraphStage[SourceShape[T]] {
   val out = Outlet[T]("UnfoldResourceSource.out")
   override val shape = SourceShape(out)
-  override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSource
+  override def initialAttributes: Attributes =
+    DefaultAttributes.unfoldResourceSource and SourceLocation.forLambda(create)
 
-  def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
-    lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
-    var open = false
-    var blockingStream: S = _
-    setHandler(out, this)
+  def createLogic(inheritedAttributes: Attributes) =
+    new GraphStageLogic(shape) with OutHandler {
+      private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
+      private var open = false
+      private var resource: S = _
 
-    override def preStart(): Unit = {
-      blockingStream = create()
-      open = true
-    }
+      override def preStart(): Unit = {
+        resource = create()
+        open = true
+      }
 
-    @tailrec
-    final override def onPull(): Unit = {
-      var resumingMode = false
-      try {
-        readData(blockingStream) match {
-          case Some(data) => push(out, data)
-          case None       => closeStage()
-        }
-      } catch {
-        case NonFatal(ex) =>
-          decider(ex) match {
-            case Supervision.Stop =>
-              open = false
-              close(blockingStream)
-              failStage(ex)
-            case Supervision.Restart =>
-              restartState()
-              resumingMode = true
-            case Supervision.Resume =>
-              resumingMode = true
+      @tailrec
+      final override def onPull(): Unit = {
+        var resumingMode = false
+        try {
+          readData(resource) match {
+            case Some(data) => push(out, data)
+            case None       => closeStage()
           }
+        } catch {
+          case NonFatal(ex) =>
+            decider(ex) match {
+              case Supervision.Stop =>
+                open = false

Review Comment:
   You can review it with `whitespace:off` in the review tab.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [PR] chore: Refactor UnfoldResourceSource. [pekko]

Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin merged PR #1239:
URL: https://github.com/apache/pekko/pull/1239


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org