Posted to issues@beam.apache.org by "Ranjan Dahal (Jira)" <ji...@apache.org> on 2022/01/29 05:58:00 UTC

[jira] [Comment Edited] (BEAM-13298) Add "withResults()" for KafkaIO.write

    [ https://issues.apache.org/jira/browse/BEAM-13298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17484078#comment-17484078 ] 

Ranjan Dahal edited comment on BEAM-13298 at 1/29/22, 5:57 AM:
---------------------------------------------------------------

Happy to review the PR. 

I would like to understand the use case for returning an output collection from a Write transform. In general, a Write transform doesn't modify the PCollection, so I would imagine we are just looking to return the same PCollection that is passed to it. I do see the value of returning an output in cases where read/transformation/write happen in the same chain of transforms. However, I also think we should have an option to return void, because in some cases the user may not care about the output. JdbcIO has both implementations, which allows flexibility based on the user's use case.

To illustrate, see the code snippet below, with comments:
{code:java}
PCollection<T> readCollection = pipeline.apply(KafkaIO.<String, String>read()); // Initial Read transform

// Aren't readCollection and collectionAfterFirstWrite the same? Keeping both
// collections will increase the memory footprint, as a PCollection is immutable.
PCollection<T> collectionAfterFirstWrite = readCollection.apply(KafkaIO.WriteRecordsWithOutput());

readCollection.apply(Wait.on(collectionAfterFirstWrite))
     // Windows of this intermediate PCollection will be processed no earlier than when
     // the respective window of collectionAfterFirstWrite closes.
     .apply(ParDo.of(...write to other destination...));

{code}
My initial reasoning for getting a PCollection<Void> was to be able to wait on the prior transform, like so:
{code:java}
PCollection<T> readCollection = pipeline.apply(KafkaIO.<String, String>read()); // Initial Read transform

// readCollection and collectionAfterWrite are not the same. The memory footprint of
// the PCollection<Void> should be negligible compared to that of the PCollection<T>.
PCollection<Void> collectionAfterWrite = readCollection.apply(KafkaIO.WriteRecordsWithOutput());

readCollection.apply(Wait.on(collectionAfterWrite))
     // Windows of this intermediate PCollection will be processed no earlier than when
     // the respective window of collectionAfterWrite closes.
     .apply(ParDo.of(...write to other destination...));
{code}
 

With all that said, it would be ideal to have WriteRecordsWithOutput return an output collection and WriteRecordsWithOutput.withVoidResults() return a PCollection<Void>.
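
As a rough illustration of the two variants discussed above, here is a toy model in plain Java. It does not use Beam or Kafka: FakeSink, writeWithVoidResult and writeWithOutput are hypothetical names invented for this sketch, and a CompletableFuture stands in for the signal collection. Wait.on in Beam works per window, which this single-batch sketch does not model.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Toy model only: FakeSink and both write methods are hypothetical stand-ins,
// not Beam or KafkaIO classes.
final class WriteSignalSketch {

    static final class FakeSink {
        // Synchronized because the writes run on a pool thread.
        final List<String> log = Collections.synchronizedList(new ArrayList<>());

        // Variant 1: completion signal only (analogous to PCollection<Void>).
        CompletableFuture<Void> writeWithVoidResult(List<String> records) {
            return CompletableFuture.runAsync(
                () -> records.forEach(r -> log.add("write:" + r)));
        }

        // Variant 2: hands back the written records (analogous to PCollection<T>).
        CompletableFuture<List<String>> writeWithOutput(List<String> records) {
            return CompletableFuture.supplyAsync(() -> {
                records.forEach(r -> log.add("write:" + r));
                return records;
            });
        }
    }

    // The downstream step reuses the original input and is gated on the signal,
    // similar in spirit to readCollection.apply(Wait.on(signal)).
    static List<String> runVoidVariant(List<String> records) {
        FakeSink sink = new FakeSink();
        sink.writeWithVoidResult(records)
            .thenRun(() -> records.forEach(r -> sink.log.add("next:" + r)))
            .join();
        return new ArrayList<>(sink.log);
    }

    // The downstream step consumes the collection returned by the write itself.
    static List<String> runOutputVariant(List<String> records) {
        FakeSink sink = new FakeSink();
        sink.writeWithOutput(records)
            .thenAccept(written -> written.forEach(r -> sink.log.add("next:" + r)))
            .join();
        return new ArrayList<>(sink.log);
    }

    public static void main(String[] args) {
        List<String> records = List.of("a", "b");
        System.out.println(runVoidVariant(records));   // [write:a, write:b, next:a, next:b]
        System.out.println(runOutputVariant(records)); // [write:a, write:b, next:a, next:b]
    }
}
```

Both variants give the same ordering guarantee. The difference is only whether the downstream step has to hold on to the original input (Void signal) or can work from what the write returns.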

In addition, it would be nice to be consistent across IOs and use the same class or method *names* for similar behaviors.

 


was (Author: rdahal):
Happy to review the PR. 

I would like to understand the use case for returning an output collection from a Write transform. In general, a Write transform doesn't modify the PCollection, so I would imagine we are just looking to return the same PCollection that is passed to it. I do see the value of returning an output in cases where read/transformation/write happen in the same chain of transforms. However, I also think we should have an option to return void, because in some cases the user may not care about the output. JdbcIO has both implementations, which allows flexibility based on the user's use case.

To illustrate, see the code snippet below, with comments:
{code:java}
PCollection<T> readCollection = pipeline.apply(KafkaIO.<String, String>read()); // Initial Read transform

// Aren't readCollection and collectionAfterFirstWrite the same? Keeping both
// collections will increase the memory footprint, as a PCollection is immutable.
PCollection<T> collectionAfterFirstWrite = readCollection.apply(KafkaIO.WriteRecordsWithOutput());

readCollection.apply(Wait.on(collectionAfterFirstWrite))
     // Windows of this intermediate PCollection will be processed no earlier than when
     // the respective window of collectionAfterFirstWrite closes.
     .apply(ParDo.of(...write to other destination...));

{code}
My initial reasoning for getting a PCollection<void> was to be able to wait on the prior transform, like so:
{code:java}
PCollection<T> readCollection = pipeline.apply(KafkaIO.<String, String>read()); // Initial Read transform

// readCollection and collectionAfterWrite are not the same. The memory footprint of
// the PCollection<void> should be negligible compared to that of the PCollection<T>.
PCollection<void> collectionAfterWrite = readCollection.apply(KafkaIO.WriteRecordsWithOutput());

readCollection.apply(Wait.on(collectionAfterWrite))
     // Windows of this intermediate PCollection will be processed no earlier than when
     // the respective window of collectionAfterWrite closes.
     .apply(ParDo.of(...write to other destination...));
{code}
 

With all that said, it would be ideal to have WriteRecordsWithOutput return an output collection and WriteRecordsWithOutput.withVoidResults() return a PCollection<void>.

In addition, it would be nice to be consistent across IOs and use the same class or method *names* for similar behaviors.

 

> Add "withResults()" for KafkaIO.write
> -------------------------------------
>
>                 Key: BEAM-13298
>                 URL: https://issues.apache.org/jira/browse/BEAM-13298
>             Project: Beam
>          Issue Type: Sub-task
>          Components: io-ideas, io-java-kafka
>         Environment: Production
>            Reporter: Ranjan Dahal
>            Assignee: Wei Fong Hsia
>            Priority: P2
>              Labels: KafkaIO
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>
> I am looking at a use case where we have to wait until the Kafka write operation is completed before we can move forward with another transform. Currently, JdbcIO supports withResults(), which, used with Wait.on(Signal), waits for the previous transform to complete before moving on to the next.
> Similarly, it would be very beneficial to have this capability in KafkaIO (and in others like PubSubIO, BigQueryIO, etc.).



--
This message was sent by Atlassian Jira
(v8.20.1#820001)