Posted to notifications@pekko.apache.org by "mdedetrich (via GitHub)" <gi...@apache.org> on 2023/12/24 00:27:33 UTC

[I] Add a `Sink.extract` function which pre-materializes data from a `Sink` and lets you replay that entire data stream into an existing Sink [incubator-pekko]

mdedetrich opened a new issue, #867:
URL: https://github.com/apache/incubator-pekko/issues/867

   This is another common problem when using pekko-streams: you have a `Sink` that takes some parameter (say, an `id`), but the data needed to create that `id` is in the stream being sent to the `Sink` itself.
   
   More concretely, let's assume we have a simple sink that stores data in some object storage, i.e. `def putData(id: String): Sink[ByteString, _]`. In order to call `putData` we need an `id`, but that `id` is contained within the data being sent to the sink itself, i.e.
   
   ```scala
   val source = Source.single(
      ByteString("id:1;moreData")
   )
   
   source.to(putData(???))
   ```
   In this case we want to extract the id (i.e. `???` in the code snippet above) from the actual data (i.e. `"id:1"`). Most importantly, assuming we do figure out a way to extract the id, we still want all of the data (i.e. `"id:1;moreData"`) to be sent to the `Sink`.
   
   Now while there are tools to do this, i.e. `Framing`, they aren't exactly ideal. Framing usually only works when you have a very basic/primitive structure (say, comma or newline delimited). If we have more complex data structures, say JSON, we do have [options available](https://pekko.apache.org/docs/pekko-connectors/current/data-transformations/json.html#example) but solving the issue is still messy because, honestly, `Framing` is not really the right tool. It's not that we want to frame the incoming data in a certain way; rather, we want to consume and buffer the data until we manage to extract some value (i.e. the `id` field) and then, once that `id` has been extracted, send the buffered data along with the rest of the incoming data to a supplied `Sink`, i.e.
   
   ```scala
   /**
    * Defers invoking the `create` function to create a sink until data from upstream is sent
    * to the `extractor` Sink and that `extractor` Sink completes. The materialized value
    * of completed `extractor` Sink is provided as a parameter to `create`.
    *
    * The primary use of this function is when you have a pre-existing `Sink` that requires
    * input which is derived from the data itself, i.e. the Sink requires an id as a parameter
    * however that id is contained within the upstream data that is being sent to the Sink
    *
    * @param extractor A Sink which upon completion sends the materialized value as
    *                  a parameter into the `create` function
    * @param extractorFinalizer A Sink which determines how to complete the `extractor` Flow.
    * @param inclusive Whether the data already sent to the `extractor` should be included
    *                  along with the rest of the upstream data that is yet to be processed
    *                  when sending to the `create` Sink.
    * @param create A function which creates the final Sink from the extracted value.
    */
   def extract[T, E, M](extractor: Graph[FlowShape[T, E], _], extractorFinalizer: Graph[SinkShape[E], _], inclusive: Boolean, create: E => Sink[T, M]): Sink[T, M] =
     ???
   
   def extract[T, E, M](extractor: Graph[FlowShape[T, E], _], extractorFinalizer: Graph[SinkShape[E], _], create: E => Sink[T, M]): Sink[T, M] =
     extract(extractor, extractorFinalizer, inclusive = true, create)
   ```
   
   The design of this is still debatable; of particular note is how to model the `extractor` part. My initial thought was to have `extractor` as a `Sink`, but I realized that in practice creating a `Sink` from a `Flow` (which is how extractors are typically going to be designed) is quite cumbersome, so I came up with this API instead. The usage would look like this:
   
   ```scala
   val source: Source[ByteString, NotUsed] = Source.single(ByteString(json))
   val extractorFlow: Flow[ByteString, ByteString, NotUsed] = JsonReader.select("$.id")
   source.to(Sink.extract(extractorFlow, Sink.head, id => putData(id.utf8String)))
   ```
   
   @He-Pin @pjfanning @jrudolph @raboof Thoughts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@pekko.apache.org
For additional commands, e-mail: notifications-help@pekko.apache.org


Re: [I] Add a `Sink.extract` function which pre-materializes data from a `Sink` and lets you replay that entire data stream into an existing Sink [incubator-pekko]

Posted by "He-Pin (via GitHub)" <gi...@apache.org>.
He-Pin commented on issue #867:
URL: https://github.com/apache/incubator-pekko/issues/867#issuecomment-1868443684

   What about `flatMapPrefix`? I think you can do the same with `flatMapPrefix(1)` and then prepend the first element to the flow.
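
   A minimal sketch of that suggestion, under stated assumptions: `putData` and `parseId` below are hypothetical stand-ins (the real `putData` would write to object storage), and the id is assumed to be fully contained in the first stream element, in the `id:1;moreData` shape from the issue. `flatMapPrefix` counts elements rather than bytes, so this would not cover the case where the id is spread across several chunks.

   ```scala
   import org.apache.pekko.Done
   import org.apache.pekko.actor.ActorSystem
   import org.apache.pekko.stream.scaladsl.{ Flow, Keep, Sink, Source }
   import org.apache.pekko.util.ByteString

   import scala.concurrent.{ Await, Future }
   import scala.concurrent.duration._

   object FlatMapPrefixSketch {
     // Hypothetical stand-in for the object-storage sink from the issue:
     // it only prints each chunk together with the id it was created with.
     def putData(id: String): Sink[ByteString, Future[Done]] =
       Sink.foreach[ByteString](bytes => println(s"[$id] ${bytes.utf8String}"))

     // Hypothetical parser: assumes the whole "id:<value>;" header
     // arrives in the first element of the stream.
     def parseId(first: ByteString): String =
       first.utf8String.takeWhile(_ != ';').stripPrefix("id:")

     val sinkWithExtractedId: Sink[ByteString, Future[Done]] =
       Flow[ByteString]
         .flatMapPrefix(1) { prefix =>
           prefix.headOption match {
             case Some(first) =>
               // Replay the consumed prefix ahead of the remaining elements,
               // so that putData still receives all of the data.
               Flow[ByteString].prepend(Source(prefix)).alsoTo(putData(parseId(first)))
             case None =>
               // Upstream completed without emitting anything: nothing to store.
               Flow[ByteString]
           }
         }
         .toMat(Sink.ignore)(Keep.right)

     def main(args: Array[String]): Unit = {
       implicit val system: ActorSystem = ActorSystem("flat-map-prefix-sketch")
       val done = Source(List(ByteString("id:1;moreData"), ByteString(";evenMoreData")))
         .runWith(sinkWithExtractedId)
       try Await.result(done, 5.seconds)
       finally system.terminate()
     }
   }
   ```

   Note that in this sketch the materialized value of the inner `putData` sink is dropped; the `Future[Done]` comes from `Sink.ignore`. If that value is needed, `flatMapPrefixMat` may be a starting point.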

