You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@pekko.apache.org by "He-Pin (via GitHub)" <gi...@apache.org> on 2024/03/29 17:17:36 UTC
[PR] chore: Refactory UnfoldResourceSource. [pekko]
He-Pin opened a new pull request, #1239:
URL: https://github.com/apache/pekko/pull/1239
Motivation:
Refactory UnfoldResourceSource a little instead of https://github.com/apache/pekko/pull/615
Result:
Clean code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org
Re: [PR] chore: Refactory UnfoldResourceSource. [pekko]
Posted by "jxnu-liguobin (via GitHub)" <gi...@apache.org>.
jxnu-liguobin commented on code in PR #1239:
URL: https://github.com/apache/pekko/pull/1239#discussion_r1545144014
##########
stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSource.scala:
##########
@@ -33,66 +34,69 @@ import pekko.stream.stage._
extends GraphStage[SourceShape[T]] {
val out = Outlet[T]("UnfoldResourceSource.out")
override val shape = SourceShape(out)
- override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSource
+ override def initialAttributes: Attributes =
+ DefaultAttributes.unfoldResourceSource and SourceLocation.forLambda(create)
- def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
- lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- var open = false
- var blockingStream: S = _
- setHandler(out, this)
+ def createLogic(inheritedAttributes: Attributes) =
+ new GraphStageLogic(shape) with OutHandler {
+ private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
+ private var open = false
+ private var resource: S = _
- override def preStart(): Unit = {
- blockingStream = create()
- open = true
- }
+ override def preStart(): Unit = {
+ resource = create()
+ open = true
+ }
- @tailrec
- final override def onPull(): Unit = {
- var resumingMode = false
- try {
- readData(blockingStream) match {
- case Some(data) => push(out, data)
- case None => closeStage()
- }
- } catch {
- case NonFatal(ex) =>
- decider(ex) match {
- case Supervision.Stop =>
- open = false
- close(blockingStream)
- failStage(ex)
- case Supervision.Restart =>
- restartState()
- resumingMode = true
- case Supervision.Resume =>
- resumingMode = true
+ @tailrec
+ final override def onPull(): Unit = {
+ var resumingMode = false
+ try {
+ readData(resource) match {
+ case Some(data) => push(out, data)
+ case None => closeStage()
}
+ } catch {
+ case NonFatal(ex) =>
+ decider(ex) match {
+ case Supervision.Stop =>
+ open = false
Review Comment:
indent
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org
Re: [PR] chore: Refactory UnfoldResourceSource. [pekko]
Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin commented on code in PR #1239:
URL: https://github.com/apache/pekko/pull/1239#discussion_r1545158988
##########
stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSource.scala:
##########
@@ -33,66 +34,69 @@ import pekko.stream.stage._
extends GraphStage[SourceShape[T]] {
val out = Outlet[T]("UnfoldResourceSource.out")
override val shape = SourceShape(out)
- override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSource
+ override def initialAttributes: Attributes =
+ DefaultAttributes.unfoldResourceSource and SourceLocation.forLambda(create)
- def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
- lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- var open = false
- var blockingStream: S = _
- setHandler(out, this)
+ def createLogic(inheritedAttributes: Attributes) =
+ new GraphStageLogic(shape) with OutHandler {
+ private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
+ private var open = false
+ private var resource: S = _
- override def preStart(): Unit = {
- blockingStream = create()
- open = true
- }
+ override def preStart(): Unit = {
+ resource = create()
+ open = true
+ }
- @tailrec
- final override def onPull(): Unit = {
- var resumingMode = false
- try {
- readData(blockingStream) match {
- case Some(data) => push(out, data)
- case None => closeStage()
- }
- } catch {
- case NonFatal(ex) =>
- decider(ex) match {
- case Supervision.Stop =>
- open = false
- close(blockingStream)
- failStage(ex)
- case Supervision.Restart =>
- restartState()
- resumingMode = true
- case Supervision.Resume =>
- resumingMode = true
+ @tailrec
+ final override def onPull(): Unit = {
+ var resumingMode = false
+ try {
+ readData(resource) match {
+ case Some(data) => push(out, data)
+ case None => closeStage()
}
+ } catch {
+ case NonFatal(ex) =>
+ decider(ex) match {
+ case Supervision.Stop =>
+ open = false
Review Comment:
It's formatted with scala-cli
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org
Re: [PR] chore: Refactory UnfoldResourceSource. [pekko]
Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin commented on code in PR #1239:
URL: https://github.com/apache/pekko/pull/1239#discussion_r1545571514
##########
stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSource.scala:
##########
@@ -33,66 +34,69 @@ import pekko.stream.stage._
extends GraphStage[SourceShape[T]] {
val out = Outlet[T]("UnfoldResourceSource.out")
override val shape = SourceShape(out)
- override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSource
+ override def initialAttributes: Attributes =
+ DefaultAttributes.unfoldResourceSource and SourceLocation.forLambda(create)
- def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
- lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
- var open = false
- var blockingStream: S = _
- setHandler(out, this)
+ def createLogic(inheritedAttributes: Attributes) =
+ new GraphStageLogic(shape) with OutHandler {
+ private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
+ private var open = false
+ private var resource: S = _
- override def preStart(): Unit = {
- blockingStream = create()
- open = true
- }
+ override def preStart(): Unit = {
+ resource = create()
+ open = true
+ }
- @tailrec
- final override def onPull(): Unit = {
- var resumingMode = false
- try {
- readData(blockingStream) match {
- case Some(data) => push(out, data)
- case None => closeStage()
- }
- } catch {
- case NonFatal(ex) =>
- decider(ex) match {
- case Supervision.Stop =>
- open = false
- close(blockingStream)
- failStage(ex)
- case Supervision.Restart =>
- restartState()
- resumingMode = true
- case Supervision.Resume =>
- resumingMode = true
+ @tailrec
+ final override def onPull(): Unit = {
+ var resumingMode = false
+ try {
+ readData(resource) match {
+ case Some(data) => push(out, data)
+ case None => closeStage()
}
+ } catch {
+ case NonFatal(ex) =>
+ decider(ex) match {
+ case Supervision.Stop =>
+ open = false
Review Comment:
You can review it with `whitespace:off` in the review tab.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org
Re: [PR] chore: Refactor UnfoldResourceSource. [pekko]
Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin merged PR #1239:
URL: https://github.com/apache/pekko/pull/1239
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org