Posted to notifications@pekko.apache.org by "He-Pin (via GitHub)" <gi...@apache.org> on 2023/08/06 10:15:06 UTC

[GitHub] [incubator-pekko] He-Pin commented on a diff in pull request #408: =str Make use of ConucrrentHashMap instead of List to reduce contention.

He-Pin commented on code in PR #408:
URL: https://github.com/apache/incubator-pekko/pull/408#discussion_r1285189907


##########
stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala:
##########
@@ -1282,7 +1283,9 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
   private var callbacksWaitingForInterpreter: List[ConcurrentAsyncCallback[_]] = Nil
   // is used for two purposes: keep track of running callbacks and signal that the
   // stage has stopped to fail incoming async callback invocations by being set to null
-  private val asyncCallbacksInProgress = new AtomicReference[List[Promise[Done]]](Nil)
+  // Using ConcurrentHashMap's KeySetView as Set to track the inProgress async callbacks.
+  private val asyncCallbacksInProgress: AtomicReference[java.util.Set[Promise[Done]]] =
+    new AtomicReference(new ConcurrentHashMap[Promise[Done], NotUsed]().keySet(NotUsed))

Review Comment:
   The `AtomicReference` is needed because the callback can be invoked through an `AsyncCallback`, which can be accessed from another thread.
   Otherwise, I think it would be a `@volatile asyncCallbacksInProgress`, set to null to indicate that the stage has been stopped.
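
A minimal sketch of the pattern discussed above, not the actual Pekko implementation: a concurrent set tracks in-progress promises, and the reference holding it is swapped to null to signal that the stage has stopped. `InProgressTracker`, its method names, and the local `Done` stand-in are hypothetical and exist only for illustration; `ConcurrentHashMap.newKeySet` is used here in place of the `keySet(NotUsed)` call from the diff.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.Promise

object AsyncCallbackSketch {
  // Stand-in for org.apache.pekko.Done so the sketch compiles without Pekko.
  sealed trait Done
  case object Done extends Done

  // Hypothetical helper, not part of Pekko.
  final class InProgressTracker {
    // null means "stage stopped"; otherwise a thread-safe set of pending promises.
    private val inProgress: AtomicReference[java.util.Set[Promise[Done]]] =
      new AtomicReference[java.util.Set[Promise[Done]]](
        ConcurrentHashMap.newKeySet[Promise[Done]]())

    private def stopped = new IllegalStateException("stage stopped")

    // May be called from any thread. Returns false (and fails p) once stopped.
    def register(p: Promise[Done]): Boolean = {
      val set = inProgress.get()
      if (set eq null) {
        p.tryFailure(stopped)
        false
      } else {
        set.add(p)
        // Re-check: stop() may have swapped in null while we were adding.
        if (inProgress.get() eq null) {
          p.tryFailure(stopped)
          false
        } else true
      }
    }

    // Marks a callback as finished and removes it from the tracked set.
    def complete(p: Promise[Done]): Unit = {
      val set = inProgress.get()
      if (set ne null) set.remove(p)
      p.trySuccess(Done)
    }

    // Atomically signals "stopped" and fails everything still pending.
    def stop(): Unit = {
      val set = inProgress.getAndSet(null)
      if (set ne null) {
        val ex = stopped
        set.forEach { p => p.tryFailure(ex); () }
      }
    }
  }
}
```

In this sketch the set handles concurrent adds and removes without a compare-and-set loop over an immutable `List`, while the reference itself only needs cross-thread visibility plus the single atomic swap in `stop()`.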



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

