You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@pekko.apache.org by "pjfanning (via GitHub)" <gi...@apache.org> on 2023/08/04 08:41:24 UTC

[GitHub] [incubator-pekko] pjfanning commented on a diff in pull request #275: +str Add `startAfterNrOfConsumers` to BroadcastHub.

pjfanning commented on code in PR #275:
URL: https://github.com/apache/incubator-pekko/pull/275#discussion_r1284152938


##########
stream/src/main/scala/org/apache/pekko/stream/javadsl/Hub.scala:
##########
@@ -164,6 +164,34 @@ object BroadcastHub {
     pekko.stream.scaladsl.BroadcastHub.sink[T](bufferSize).mapMaterializedValue(_.asJava).asJava
   }
 
+  /**
+   * Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set
+   * of consumers. After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as materialized
+   * value. This [[Source]] can be materialized an arbitrary number of times and each materialization will receive the
+   * broadcast elements from the original [[Sink]].
+   *
+   * Every new materialization of the [[Sink]] results in a new, independent hub, which materializes to its own
+   * [[Source]] for consuming the [[Sink]] of that materialization.
+   *
+   * If the original [[Sink]] is failed, then the failure is immediately propagated to all of its materialized
+   * [[Source]]s (possibly jumping over already buffered elements). If the original [[Sink]] is completed, then
+   * all corresponding [[Source]]s are completed. Both failure and normal completion is "remembered" and later
+   * materializations of the [[Source]] will see the same (failure or completion) state. [[Source]]s that are
+   * cancelled are simply removed from the dynamic set of consumers.
+   *
+   * @param clazz                   Type of elements this hub emits and consumes
+   * @param startAfterNrOfConsumers Elements are buffered until this number of consumers have been connected.
+   *                                This is only used initially when the operator is starting up, i.e. it is not honored when consumers have
+   *                                been removed (canceled).
+   * @param bufferSize              Buffer size used by the producer. Gives an upper bound on how "far" from each other two
+   *                                concurrent consumers can be in terms of element. If the buffer is full, the producer
+   *                                is backpressured. Must be a power of two and less than 4096.
+   */

Review Comment:
   can you add `@since 1.1.0` to all the new functions added to the non-test source?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org