You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@beam.apache.org by Ismaël Mejía <ie...@gmail.com> on 2021/03/24 21:35:37 UTC

Re: Write to multiple IOs in linear fashion

+dev

Since we all agree that we should return something different than
PDone the real question is what should we return.
As a reminder we had a pretty interesting discussion about this
already in the past but uniformization of our return values has not
happened.
This thread is worth reading for Vincent or anyone who wants to
contribute Write transforms that return.
https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E

> Returning PDone is an anti-pattern that should be avoided, but changing it now would be backwards incompatible.

Periodic reminder most IOs are still Experimental so I suppose it is
worth to the maintainers to judge if the upgrade to return someething
different of PDone is worth, in that case we can deprecate and remove
the previous signature in short time (2 releases was the average for
previous cases).


On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
<ar...@gmail.com> wrote:
>
> I thought that was said about returning a PCollection of write results as it’s done in other IOs (as I mentioned as examples) that have _additional_ write methods, like “withWriteResults()” etc, that return PTransform<…, PCollection<WriteResults>>.
> In this case, we keep backwards compatibility and just add new funtionality. Though, we need to follow the same pattern for user API and maybe even naming for this feature across different IOs (like we have for "readAll()” methods).
>
>  I agree that we have to avoid returning PDone for such cases.
>
> On 24 Mar 2021, at 20:05, Robert Bradshaw <ro...@google.com> wrote:
>
> Returning PDone is an anti-pattern that should be avoided, but changing it now would be backwards incompatible. PRs to add non-PDone returning variants (probably as another option to the builders) that compose well with Wait, etc. would be welcome.
>
> On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <ar...@gmail.com> wrote:
>>
>> In this way, I think “Wait” PTransform should work for you but, as it was mentioned before, it doesn’t work with PDone, only with PCollection as a signal.
>>
>> Since you already adjusted your own writer for that, it would be great to contribute it back to Beam in the way as it was done for other IOs (for example, JdbcIO [1] or BigtableIO [2])
>>
>> In general, I think we need to have it for all IOs, at least to use with “Wait” because this pattern it's quite often required.
>>
>> [1] https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>> [2] https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>
>> On 24 Mar 2021, at 18:01, Vincent Marquez <vi...@gmail.com> wrote:
>>
>> No, it only needs to ensure that one record seen on Pubsub has successfully written to a database.  So "record by record" is fine, or even "bundle".
>>
>> ~Vincent
>>
>>
>> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <ar...@gmail.com> wrote:
>>>
>>> Do you want to wait for ALL records are written for Cassandra and then write all successfully written records to PubSub or it should be performed "record by record"?
>>>
>>> On 24 Mar 2021, at 04:58, Vincent Marquez <vi...@gmail.com> wrote:
>>>
>>> I have a common use case where my pipeline looks like this:
>>> CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubSubIO.write
>>>
>>> I do NOT want my pipeline to look like the following:
>>>
>>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>                                                          |
>>>                                                           -> PubsubIO.write
>>>
>>> Because I need to ensure that only items written to Pubsub have successfully finished a (quorum) write.
>>>
>>> Since CassandraIO.write is a PTransform<A, PDone> I can't actually use it here so I often roll my own 'writer', but maybe there is a recommended way of doing this?
>>>
>>> Thanks in advance for any help.
>>>
>>> ~Vincent
>>>
>>>
>>
>

Re: Write to multiple IOs in linear fashion

Posted by Alexey Romanenko <ar...@gmail.com>.
I think you are right, since "writer.close()”  contains a business logic, it must be moved to @FinishBundle. The same thing about DeleteFn.
I’ll create a Jira for that.

> On 25 Mar 2021, at 00:49, Kenneth Knowles <ke...@apache.org> wrote:
> 
> Alex's idea sounds good and like what Vincent maybe implemented. I am just reading really quickly so sorry if I missed something...
> 
> Checking out the code for the WriteFn<T> I see a big problem:
> 
>     @Setup
>     public void setup() {
>       writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>     }
> 
>     @ProcessElement
>       public void processElement(ProcessContext c) throws ExecutionException, InterruptedException {
>       writer.mutate(c.element());
>     }
> 
>     @Teardown
>     public void teardown() throws Exception {
>       writer.close();
>       writer = null;
>     }
> 
> It is only in writer.close() that all async writes are waited on. This needs to happen in @FinishBundle.
> 
> Did you discover this when implementing your own Cassandra.Write?
> 
> Until you have waited on the future, you should not output the element as "has been written". And you cannot output from the @TearDown method which is just for cleaning up resources.
> 
> Am I reading this wrong?
> 
> Kenn
> 
> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <ajamato@google.com <ma...@google.com>> wrote:
> How about a PCollection containing every element which was successfully written?
> Basically the same things which were passed into it.
> 
> Then you could act on every element after its been successfully written to the sink.
> 
> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <robertwb@google.com <ma...@google.com>> wrote:
> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <iemejia@gmail.com <ma...@gmail.com>> wrote:
> +dev
> 
> Since we all agree that we should return something different than
> PDone the real question is what should we return.
> 
> My proposal is that one returns a PCollection<?> that consists, internally, of something contentless like nulls. This is future compatible with returning something more maningful based on the source source or write process itself, but at least this would be followable. 
>  
> As a reminder we had a pretty interesting discussion about this
> already in the past but uniformization of our return values has not
> happened.
> This thread is worth reading for Vincent or anyone who wants to
> contribute Write transforms that return.
> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E <https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E>
> 
> Yeah, we should go ahead and finally do something. 
>  
> 
> > Returning PDone is an anti-pattern that should be avoided, but changing it now would be backwards incompatible.
> 
> Periodic reminder most IOs are still Experimental so I suppose it is
> worth to the maintainers to judge if the upgrade to return someething
> different of PDone is worth, in that case we can deprecate and remove
> the previous signature in short time (2 releases was the average for
> previous cases).
> 
> 
> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
> <aromanenko.dev@gmail.com <ma...@gmail.com>> wrote:
> >
> > I thought that was said about returning a PCollection of write results as it’s done in other IOs (as I mentioned as examples) that have _additional_ write methods, like “withWriteResults()” etc, that return PTransform<…, PCollection<WriteResults>>.
> > In this case, we keep backwards compatibility and just add new funtionality. Though, we need to follow the same pattern for user API and maybe even naming for this feature across different IOs (like we have for "readAll()” methods).
> >
> >  I agree that we have to avoid returning PDone for such cases.
> >
> > On 24 Mar 2021, at 20:05, Robert Bradshaw <robertwb@google.com <ma...@google.com>> wrote:
> >
> > Returning PDone is an anti-pattern that should be avoided, but changing it now would be backwards incompatible. PRs to add non-PDone returning variants (probably as another option to the builders) that compose well with Wait, etc. would be welcome.
> >
> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <aromanenko.dev@gmail.com <ma...@gmail.com>> wrote:
> >>
> >> In this way, I think “Wait” PTransform should work for you but, as it was mentioned before, it doesn’t work with PDone, only with PCollection as a signal.
> >>
> >> Since you already adjusted your own writer for that, it would be great to contribute it back to Beam in the way as it was done for other IOs (for example, JdbcIO [1] or BigtableIO [2])
> >>
> >> In general, I think we need to have it for all IOs, at least to use with “Wait” because this pattern it's quite often required.
> >>
> >> [1] https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078 <https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078>
> >> [2] https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715 <https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715>
> >>
> >> On 24 Mar 2021, at 18:01, Vincent Marquez <vincent.marquez@gmail.com <ma...@gmail.com>> wrote:
> >>
> >> No, it only needs to ensure that one record seen on Pubsub has successfully written to a database.  So "record by record" is fine, or even "bundle".
> >>
> >> ~Vincent
> >>
> >>
> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <aromanenko.dev@gmail.com <ma...@gmail.com>> wrote:
> >>>
> >>> Do you want to wait for ALL records are written for Cassandra and then write all successfully written records to PubSub or it should be performed "record by record"?
> >>>
> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <vincent.marquez@gmail.com <ma...@gmail.com>> wrote:
> >>>
> >>> I have a common use case where my pipeline looks like this:
> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubSubIO.write
> >>>
> >>> I do NOT want my pipeline to look like the following:
> >>>
> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
> >>>                                                          |
> >>>                                                           -> PubsubIO.write
> >>>
> >>> Because I need to ensure that only items written to Pubsub have successfully finished a (quorum) write.
> >>>
> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't actually use it here so I often roll my own 'writer', but maybe there is a recommended way of doing this?
> >>>
> >>> Thanks in advance for any help.
> >>>
> >>> ~Vincent
> >>>
> >>>
> >>
> >


Re: Write to multiple IOs in linear fashion

Posted by Kenneth Knowles <ke...@apache.org>.
On Thu, Mar 25, 2021 at 12:55 PM Robert Bradshaw <ro...@google.com>
wrote:

> On Wed, Mar 24, 2021 at 7:29 PM Vincent Marquez <vi...@gmail.com>
> wrote:
>
>>
>> *~Vincent*
>>
>>
>> On Wed, Mar 24, 2021 at 6:07 PM Kenneth Knowles <ke...@apache.org> wrote:
>>
>>> The reason I was checking out the code is that sometimes a natural thing
>>> to output would be a summary of what was written. So each chunk of writes
>>> and the final chunk written in @FinishBundle. This is, for example, what
>>> SQL engines do (output # of rows written).
>>>
>>> You could output both the summary and the full list of written elements
>>> to different outputs, and users can choose. Outputs that are never consumed
>>> should be very low or zero cost.n
>>>
>>>
>> I like this approach.  I would much prefer two outputs (one of which is
>> all elements written) to returning an existential/wildcard PCollection.
>>
>
> +1, this would work well too. Returning a PCollectionTuple is extensible
> too, as one could add more (or better) outputs in the future without
> changing the signature.
>

This comment is dangerously close to sparking a philosophical conversation!

Kenn


>
>>
>>
>>> Kenn
>>>
>>> On Wed, Mar 24, 2021 at 5:36 PM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> Yeah, the entire input is not always what is needed, and can generally
>>>> be achieved via
>>>>
>>>>     input -> wait(side input of write) -> do something with the input
>>>>
>>>> Of course one could also do
>>>>
>>>> entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
>>>> CombineGlobally(TrivialCombineFn)
>>>>
>>>> to reduce this to a more minimal set with at least one element per
>>>> Window.
>>>>
>>>> The file writing operations emit the actual files that were written,
>>>> which can be handy. My suggestion of PCollection<?> was just so that we can
>>>> emit something usable, and decide exactly what is the most useful is later.
>>>>
>>>>
>>>> On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> I believe that the Wait transform turns this output into a side input,
>>>>> so outputting the input PCollection might be problematic.
>>>>>
>>>>> On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles <ke...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Alex's idea sounds good and like what Vincent maybe implemented. I am
>>>>>> just reading really quickly so sorry if I missed something...
>>>>>>
>>>>>> Checking out the code for the WriteFn<T> I see a big problem:
>>>>>>
>>>>>>     @Setup
>>>>>>     public void setup() {
>>>>>>       writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>>>>>>     }
>>>>>>
>>>>>>     @ProcessElement
>>>>>>       public void processElement(ProcessContext c) throws
>>>>>> ExecutionException, InterruptedException {
>>>>>>       writer.mutate(c.element());
>>>>>>     }
>>>>>>
>>>>>>     @Teardown
>>>>>>     public void teardown() throws Exception {
>>>>>>       writer.close();
>>>>>>       writer = null;
>>>>>>     }
>>>>>>
>>>>>> It is only in writer.close() that all async writes are waited on.
>>>>>> This needs to happen in @FinishBundle.
>>>>>>
>>>>>> Did you discover this when implementing your own Cassandra.Write?
>>>>>>
>>>>>> Until you have waited on the future, you should not output the
>>>>>> element as "has been written". And you cannot output from the @TearDown
>>>>>> method which is just for cleaning up resources.
>>>>>>
>>>>>> Am I reading this wrong?
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <aj...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> How about a PCollection containing every element which was
>>>>>>> successfully written?
>>>>>>> Basically the same things which were passed into it.
>>>>>>>
>>>>>>> Then you could act on every element after its been successfully
>>>>>>> written to the sink.
>>>>>>>
>>>>>>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <ro...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ie...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> +dev
>>>>>>>>>
>>>>>>>>> Since we all agree that we should return something different than
>>>>>>>>> PDone the real question is what should we return.
>>>>>>>>>
>>>>>>>>
>>>>>>>> My proposal is that one returns a PCollection<?> that consists,
>>>>>>>> internally, of something contentless like nulls. This is future compatible
>>>>>>>> with returning something more maningful based on the source source or write
>>>>>>>> process itself, but at least this would be followable.
>>>>>>>>
>>>>>>>>
>>>>>>>>> As a reminder we had a pretty interesting discussion about this
>>>>>>>>> already in the past but uniformization of our return values has not
>>>>>>>>> happened.
>>>>>>>>> This thread is worth reading for Vincent or anyone who wants to
>>>>>>>>> contribute Write transforms that return.
>>>>>>>>>
>>>>>>>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>>>>>>
>>>>>>>>
>>>>>>>> Yeah, we should go ahead and finally do something.
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>>>>>> changing it now would be backwards incompatible.
>>>>>>>>>
>>>>>>>>> Periodic reminder most IOs are still Experimental so I suppose it
>>>>>>>>> is
>>>>>>>>> worth to the maintainers to judge if the upgrade to return
>>>>>>>>> someething
>>>>>>>>> different of PDone is worth, in that case we can deprecate and
>>>>>>>>> remove
>>>>>>>>> the previous signature in short time (2 releases was the average
>>>>>>>>> for
>>>>>>>>> previous cases).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>>>>>>> <ar...@gmail.com> wrote:
>>>>>>>>> >
>>>>>>>>> > I thought that was said about returning a PCollection of write
>>>>>>>>> results as it’s done in other IOs (as I mentioned as examples) that have
>>>>>>>>> _additional_ write methods, like “withWriteResults()” etc, that return
>>>>>>>>> PTransform<…, PCollection<WriteResults>>.
>>>>>>>>> > In this case, we keep backwards compatibility and just add new
>>>>>>>>> funtionality. Though, we need to follow the same pattern for user API and
>>>>>>>>> maybe even naming for this feature across different IOs (like we have for
>>>>>>>>> "readAll()” methods).
>>>>>>>>> >
>>>>>>>>> >  I agree that we have to avoid returning PDone for such cases.
>>>>>>>>> >
>>>>>>>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <ro...@google.com>
>>>>>>>>> wrote:
>>>>>>>>> >
>>>>>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>>>>>> changing it now would be backwards incompatible. PRs to add non-PDone
>>>>>>>>> returning variants (probably as another option to the builders) that
>>>>>>>>> compose well with Wait, etc. would be welcome.
>>>>>>>>> >
>>>>>>>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>>>>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>>>>> >>
>>>>>>>>> >> In this way, I think “Wait” PTransform should work for you but,
>>>>>>>>> as it was mentioned before, it doesn’t work with PDone, only with
>>>>>>>>> PCollection as a signal.
>>>>>>>>> >>
>>>>>>>>> >> Since you already adjusted your own writer for that, it would
>>>>>>>>> be great to contribute it back to Beam in the way as it was done for other
>>>>>>>>> IOs (for example, JdbcIO [1] or BigtableIO [2])
>>>>>>>>> >>
>>>>>>>>> >> In general, I think we need to have it for all IOs, at least to
>>>>>>>>> use with “Wait” because this pattern it's quite often required.
>>>>>>>>> >>
>>>>>>>>> >> [1]
>>>>>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>>>>>>>> >> [2]
>>>>>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>>>>>>>> >>
>>>>>>>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez <
>>>>>>>>> vincent.marquez@gmail.com> wrote:
>>>>>>>>> >>
>>>>>>>>> >> No, it only needs to ensure that one record seen on Pubsub has
>>>>>>>>> successfully written to a database.  So "record by record" is fine, or even
>>>>>>>>> "bundle".
>>>>>>>>> >>
>>>>>>>>> >> ~Vincent
>>>>>>>>> >>
>>>>>>>>> >>
>>>>>>>>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>>>>>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>>>>> >>>
>>>>>>>>> >>> Do you want to wait for ALL records are written for Cassandra
>>>>>>>>> and then write all successfully written records to PubSub or it should be
>>>>>>>>> performed "record by record"?
>>>>>>>>> >>>
>>>>>>>>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <
>>>>>>>>> vincent.marquez@gmail.com> wrote:
>>>>>>>>> >>>
>>>>>>>>> >>> I have a common use case where my pipeline looks like this:
>>>>>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write ->
>>>>>>>>> PubSubIO.write
>>>>>>>>> >>>
>>>>>>>>> >>> I do NOT want my pipeline to look like the following:
>>>>>>>>> >>>
>>>>>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>>>>>>> >>>                                                          |
>>>>>>>>> >>>                                                           ->
>>>>>>>>> PubsubIO.write
>>>>>>>>> >>>
>>>>>>>>> >>> Because I need to ensure that only items written to Pubsub
>>>>>>>>> have successfully finished a (quorum) write.
>>>>>>>>> >>>
>>>>>>>>> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't
>>>>>>>>> actually use it here so I often roll my own 'writer', but maybe there is a
>>>>>>>>> recommended way of doing this?
>>>>>>>>> >>>
>>>>>>>>> >>> Thanks in advance for any help.
>>>>>>>>> >>>
>>>>>>>>> >>> ~Vincent
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> >>
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>

Re: Write to multiple IOs in linear fashion

Posted by Robert Bradshaw <ro...@google.com>.
On Wed, Mar 24, 2021 at 7:29 PM Vincent Marquez <vi...@gmail.com>
wrote:

>
> *~Vincent*
>
>
> On Wed, Mar 24, 2021 at 6:07 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>> The reason I was checking out the code is that sometimes a natural thing
>> to output would be a summary of what was written. So each chunk of writes
>> and the final chunk written in @FinishBundle. This is, for example, what
>> SQL engines do (output # of rows written).
>>
>> You could output both the summary and the full list of written elements
>> to different outputs, and users can choose. Outputs that are never consumed
>> should be very low or zero cost.n
>>
>>
> I like this approach.  I would much prefer two outputs (one of which is
> all elements written) to returning an existential/wildcard PCollection.
>

+1, this would work well too. Returning a PCollectionTuple is extensible
too, as one could add more (or better) outputs in the future without
changing the signature.


>
>
>
>> Kenn
>>
>> On Wed, Mar 24, 2021 at 5:36 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> Yeah, the entire input is not always what is needed, and can generally
>>> be achieved via
>>>
>>>     input -> wait(side input of write) -> do something with the input
>>>
>>> Of course one could also do
>>>
>>> entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
>>> CombineGlobally(TrivialCombineFn)
>>>
>>> to reduce this to a more minimal set with at least one element per
>>> Window.
>>>
>>> The file writing operations emit the actual files that were written,
>>> which can be handy. My suggestion of PCollection<?> was just so that we can
>>> emit something usable, and decide exactly what is the most useful is later.
>>>
>>>
>>> On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I believe that the Wait transform turns this output into a side input,
>>>> so outputting the input PCollection might be problematic.
>>>>
>>>> On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles <ke...@apache.org>
>>>> wrote:
>>>>
>>>>> Alex's idea sounds good and like what Vincent maybe implemented. I am
>>>>> just reading really quickly so sorry if I missed something...
>>>>>
>>>>> Checking out the code for the WriteFn<T> I see a big problem:
>>>>>
>>>>>     @Setup
>>>>>     public void setup() {
>>>>>       writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>>>>>     }
>>>>>
>>>>>     @ProcessElement
>>>>>       public void processElement(ProcessContext c) throws
>>>>> ExecutionException, InterruptedException {
>>>>>       writer.mutate(c.element());
>>>>>     }
>>>>>
>>>>>     @Teardown
>>>>>     public void teardown() throws Exception {
>>>>>       writer.close();
>>>>>       writer = null;
>>>>>     }
>>>>>
>>>>> It is only in writer.close() that all async writes are waited on. This
>>>>> needs to happen in @FinishBundle.
>>>>>
>>>>> Did you discover this when implementing your own Cassandra.Write?
>>>>>
>>>>> Until you have waited on the future, you should not output the element
>>>>> as "has been written". And you cannot output from the @TearDown method
>>>>> which is just for cleaning up resources.
>>>>>
>>>>> Am I reading this wrong?
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <aj...@google.com> wrote:
>>>>>
>>>>>> How about a PCollection containing every element which was
>>>>>> successfully written?
>>>>>> Basically the same things which were passed into it.
>>>>>>
>>>>>> Then you could act on every element after its been successfully
>>>>>> written to the sink.
>>>>>>
>>>>>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <ro...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ie...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> +dev
>>>>>>>>
>>>>>>>> Since we all agree that we should return something different than
>>>>>>>> PDone the real question is what should we return.
>>>>>>>>
>>>>>>>
>>>>>>> My proposal is that one returns a PCollection<?> that consists,
>>>>>>> internally, of something contentless like nulls. This is future compatible
>>>>>>> with returning something more maningful based on the source source or write
>>>>>>> process itself, but at least this would be followable.
>>>>>>>
>>>>>>>
>>>>>>>> As a reminder we had a pretty interesting discussion about this
>>>>>>>> already in the past but uniformization of our return values has not
>>>>>>>> happened.
>>>>>>>> This thread is worth reading for Vincent or anyone who wants to
>>>>>>>> contribute Write transforms that return.
>>>>>>>>
>>>>>>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>>>>>
>>>>>>>
>>>>>>> Yeah, we should go ahead and finally do something.
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>>>>> changing it now would be backwards incompatible.
>>>>>>>>
>>>>>>>> Periodic reminder most IOs are still Experimental so I suppose it is
>>>>>>>> worth to the maintainers to judge if the upgrade to return
>>>>>>>> someething
>>>>>>>> different of PDone is worth, in that case we can deprecate and
>>>>>>>> remove
>>>>>>>> the previous signature in short time (2 releases was the average for
>>>>>>>> previous cases).
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>>>>>> <ar...@gmail.com> wrote:
>>>>>>>> >
>>>>>>>> > I thought that was said about returning a PCollection of write
>>>>>>>> results as it’s done in other IOs (as I mentioned as examples) that have
>>>>>>>> _additional_ write methods, like “withWriteResults()” etc, that return
>>>>>>>> PTransform<…, PCollection<WriteResults>>.
>>>>>>>> > In this case, we keep backwards compatibility and just add new
>>>>>>>> funtionality. Though, we need to follow the same pattern for user API and
>>>>>>>> maybe even naming for this feature across different IOs (like we have for
>>>>>>>> "readAll()” methods).
>>>>>>>> >
>>>>>>>> >  I agree that we have to avoid returning PDone for such cases.
>>>>>>>> >
>>>>>>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <ro...@google.com>
>>>>>>>> wrote:
>>>>>>>> >
>>>>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>>>>> changing it now would be backwards incompatible. PRs to add non-PDone
>>>>>>>> returning variants (probably as another option to the builders) that
>>>>>>>> compose well with Wait, etc. would be welcome.
>>>>>>>> >
>>>>>>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>>>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>>>> >>
>>>>>>>> >> In this way, I think “Wait” PTransform should work for you but,
>>>>>>>> as it was mentioned before, it doesn’t work with PDone, only with
>>>>>>>> PCollection as a signal.
>>>>>>>> >>
>>>>>>>> >> Since you already adjusted your own writer for that, it would be
>>>>>>>> great to contribute it back to Beam in the way as it was done for other IOs
>>>>>>>> (for example, JdbcIO [1] or BigtableIO [2])
>>>>>>>> >>
>>>>>>>> >> In general, I think we need to have it for all IOs, at least to
>>>>>>>> use with “Wait” because this pattern it's quite often required.
>>>>>>>> >>
>>>>>>>> >> [1]
>>>>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>>>>>>> >> [2]
>>>>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>>>>>>> >>
>>>>>>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez <
>>>>>>>> vincent.marquez@gmail.com> wrote:
>>>>>>>> >>
>>>>>>>> >> No, it only needs to ensure that one record seen on Pubsub has
>>>>>>>> successfully written to a database.  So "record by record" is fine, or even
>>>>>>>> "bundle".
>>>>>>>> >>
>>>>>>>> >> ~Vincent
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>>>>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>>>> >>>
>>>>>>>> >>> Do you want to wait for ALL records are written for Cassandra
>>>>>>>> and then write all successfully written records to PubSub or it should be
>>>>>>>> performed "record by record"?
>>>>>>>> >>>
>>>>>>>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <
>>>>>>>> vincent.marquez@gmail.com> wrote:
>>>>>>>> >>>
>>>>>>>> >>> I have a common use case where my pipeline looks like this:
>>>>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write ->
>>>>>>>> PubSubIO.write
>>>>>>>> >>>
>>>>>>>> >>> I do NOT want my pipeline to look like the following:
>>>>>>>> >>>
>>>>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>>>>>> >>>                                                          |
>>>>>>>> >>>                                                           ->
>>>>>>>> PubsubIO.write
>>>>>>>> >>>
>>>>>>>> >>> Because I need to ensure that only items written to Pubsub have
>>>>>>>> successfully finished a (quorum) write.
>>>>>>>> >>>
>>>>>>>> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't
>>>>>>>> actually use it here so I often roll my own 'writer', but maybe there is a
>>>>>>>> recommended way of doing this?
>>>>>>>> >>>
>>>>>>>> >>> Thanks in advance for any help.
>>>>>>>> >>>
>>>>>>>> >>> ~Vincent
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>
>>>>>>>> >
>>>>>>>>
>>>>>>>

Re: Write to multiple IOs in linear fashion

Posted by Vincent Marquez <vi...@gmail.com>.
*~Vincent*


On Wed, Mar 24, 2021 at 6:07 PM Kenneth Knowles <ke...@apache.org> wrote:

> The reason I was checking out the code is that sometimes a natural thing
> to output would be a summary of what was written. So each chunk of writes
> and the final chunk written in @FinishBundle. This is, for example, what
> SQL engines do (output # of rows written).
>
> You could output both the summary and the full list of written elements to
> different outputs, and users can choose. Outputs that are never consumed
> should be very low or zero cost.n
>
>
I like this approach.  I would much prefer two outputs (one of which is all
elements written) to returning an existential/wildcard PCollection.



> Kenn
>
> On Wed, Mar 24, 2021 at 5:36 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> Yeah, the entire input is not always what is needed, and can generally be
>> achieved via
>>
>>     input -> wait(side input of write) -> do something with the input
>>
>> Of course one could also do
>>
>> entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
>> CombineGlobally(TrivialCombineFn)
>>
>> to reduce this to a more minimal set with at least one element per
>> Window.
>>
>> The file writing operations emit the actual files that were written,
>> which can be handy. My suggestion of PCollection<?> was just so that we can
>> emit something usable, and decide exactly what is the most useful is later.
>>
>>
>> On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax <re...@google.com> wrote:
>>
>>> I believe that the Wait transform turns this output into a side input,
>>> so outputting the input PCollection might be problematic.
>>>
>>> On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>
>>>> Alex's idea sounds good and like what Vincent maybe implemented. I am
>>>> just reading really quickly so sorry if I missed something...
>>>>
>>>> Checking out the code for the WriteFn<T> I see a big problem:
>>>>
>>>>     @Setup
>>>>     public void setup() {
>>>>       writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>>>>     }
>>>>
>>>>     @ProcessElement
>>>>       public void processElement(ProcessContext c) throws
>>>> ExecutionException, InterruptedException {
>>>>       writer.mutate(c.element());
>>>>     }
>>>>
>>>>     @Teardown
>>>>     public void teardown() throws Exception {
>>>>       writer.close();
>>>>       writer = null;
>>>>     }
>>>>
>>>> It is only in writer.close() that all async writes are waited on. This
>>>> needs to happen in @FinishBundle.
>>>>
>>>> Did you discover this when implementing your own Cassandra.Write?
>>>>
>>>> Until you have waited on the future, you should not output the element
>>>> as "has been written". And you cannot output from the @TearDown method
>>>> which is just for cleaning up resources.
>>>>
>>>> Am I reading this wrong?
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <aj...@google.com> wrote:
>>>>
>>>>> How about a PCollection containing every element which was
>>>>> successfully written?
>>>>> Basically the same things which were passed into it.
>>>>>
>>>>> Then you could act on every element after its been successfully
>>>>> written to the sink.
>>>>>
>>>>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <ro...@google.com>
>>>>> wrote:
>>>>>
>>>>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ie...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> +dev
>>>>>>>
>>>>>>> Since we all agree that we should return something different than
>>>>>>> PDone the real question is what should we return.
>>>>>>>
>>>>>>
>>>>>> My proposal is that one returns a PCollection<?> that consists,
>>>>>> internally, of something contentless like nulls. This is future compatible
>>>>>> with returning something more maningful based on the source source or write
>>>>>> process itself, but at least this would be followable.
>>>>>>
>>>>>>
>>>>>>> As a reminder we had a pretty interesting discussion about this
>>>>>>> already in the past but uniformization of our return values has not
>>>>>>> happened.
>>>>>>> This thread is worth reading for Vincent or anyone who wants to
>>>>>>> contribute Write transforms that return.
>>>>>>>
>>>>>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>>>>
>>>>>>
>>>>>> Yeah, we should go ahead and finally do something.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>>>> changing it now would be backwards incompatible.
>>>>>>>
>>>>>>> Periodic reminder most IOs are still Experimental so I suppose it is
>>>>>>> worth to the maintainers to judge if the upgrade to return someething
>>>>>>> different of PDone is worth, in that case we can deprecate and remove
>>>>>>> the previous signature in short time (2 releases was the average for
>>>>>>> previous cases).
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>>>>> <ar...@gmail.com> wrote:
>>>>>>> >
>>>>>>> > I thought that was said about returning a PCollection of write
>>>>>>> results as it’s done in other IOs (as I mentioned as examples) that have
>>>>>>> _additional_ write methods, like “withWriteResults()” etc, that return
>>>>>>> PTransform<…, PCollection<WriteResults>>.
>>>>>>> > In this case, we keep backwards compatibility and just add new
>>>>>>> funtionality. Though, we need to follow the same pattern for user API and
>>>>>>> maybe even naming for this feature across different IOs (like we have for
>>>>>>> "readAll()” methods).
>>>>>>> >
>>>>>>> >  I agree that we have to avoid returning PDone for such cases.
>>>>>>> >
>>>>>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <ro...@google.com>
>>>>>>> wrote:
>>>>>>> >
>>>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>>>> changing it now would be backwards incompatible. PRs to add non-PDone
>>>>>>> returning variants (probably as another option to the builders) that
>>>>>>> compose well with Wait, etc. would be welcome.
>>>>>>> >
>>>>>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>>> >>
>>>>>>> >> In this way, I think “Wait” PTransform should work for you but,
>>>>>>> as it was mentioned before, it doesn’t work with PDone, only with
>>>>>>> PCollection as a signal.
>>>>>>> >>
>>>>>>> >> Since you already adjusted your own writer for that, it would be
>>>>>>> great to contribute it back to Beam in the way as it was done for other IOs
>>>>>>> (for example, JdbcIO [1] or BigtableIO [2])
>>>>>>> >>
>>>>>>> >> In general, I think we need to have it for all IOs, at least to
>>>>>>> use with “Wait” because this pattern it's quite often required.
>>>>>>> >>
>>>>>>> >> [1]
>>>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>>>>>> >> [2]
>>>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>>>>>> >>
>>>>>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez <
>>>>>>> vincent.marquez@gmail.com> wrote:
>>>>>>> >>
>>>>>>> >> No, it only needs to ensure that one record seen on Pubsub has
>>>>>>> successfully written to a database.  So "record by record" is fine, or even
>>>>>>> "bundle".
>>>>>>> >>
>>>>>>> >> ~Vincent
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>>>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>>> >>>
>>>>>>> >>> Do you want to wait for ALL records are written for Cassandra
>>>>>>> and then write all successfully written records to PubSub or it should be
>>>>>>> performed "record by record"?
>>>>>>> >>>
>>>>>>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <
>>>>>>> vincent.marquez@gmail.com> wrote:
>>>>>>> >>>
>>>>>>> >>> I have a common use case where my pipeline looks like this:
>>>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write ->
>>>>>>> PubSubIO.write
>>>>>>> >>>
>>>>>>> >>> I do NOT want my pipeline to look like the following:
>>>>>>> >>>
>>>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>>>>> >>>                                                          |
>>>>>>> >>>                                                           ->
>>>>>>> PubsubIO.write
>>>>>>> >>>
>>>>>>> >>> Because I need to ensure that only items written to Pubsub have
>>>>>>> successfully finished a (quorum) write.
>>>>>>> >>>
>>>>>>> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't
>>>>>>> actually use it here so I often roll my own 'writer', but maybe there is a
>>>>>>> recommended way of doing this?
>>>>>>> >>>
>>>>>>> >>> Thanks in advance for any help.
>>>>>>> >>>
>>>>>>> >>> ~Vincent
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>
>>>>>>> >
>>>>>>>
>>>>>>

Re: Write to multiple IOs in linear fashion

Posted by Kenneth Knowles <ke...@apache.org>.
The reason I was checking out the code is that sometimes a natural thing to
output would be a summary of what was written. So each chunk of writes and
the final chunk written in @FinishBundle. This is, for example, what SQL
engines do (output # of rows written).

You could output both the summary and the full list of written elements to
different outputs, and users can choose. Outputs that are never consumed
should be very low or zero cost.

Kenn

On Wed, Mar 24, 2021 at 5:36 PM Robert Bradshaw <ro...@google.com> wrote:

> Yeah, the entire input is not always what is needed, and can generally be
> achieved via
>
>     input -> wait(side input of write) -> do something with the input
>
> Of course one could also do
>
> entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
> CombineGlobally(TrivialCombineFn)
>
> to reduce this to a more minimal set with at least one element per Window.
>
> The file writing operations emit the actual files that were written, which
> can be handy. My suggestion of PCollection<?> was just so that we can emit
> something usable, and decide exactly what is the most useful is later.
>
>
> On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax <re...@google.com> wrote:
>
>> I believe that the Wait transform turns this output into a side input, so
>> outputting the input PCollection might be problematic.
>>
>> On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles <ke...@apache.org> wrote:
>>
>>> Alex's idea sounds good and like what Vincent maybe implemented. I am
>>> just reading really quickly so sorry if I missed something...
>>>
>>> Checking out the code for the WriteFn<T> I see a big problem:
>>>
>>>     @Setup
>>>     public void setup() {
>>>       writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>>>     }
>>>
>>>     @ProcessElement
>>>       public void processElement(ProcessContext c) throws
>>> ExecutionException, InterruptedException {
>>>       writer.mutate(c.element());
>>>     }
>>>
>>>     @Teardown
>>>     public void teardown() throws Exception {
>>>       writer.close();
>>>       writer = null;
>>>     }
>>>
>>> It is only in writer.close() that all async writes are waited on. This
>>> needs to happen in @FinishBundle.
>>>
>>> Did you discover this when implementing your own Cassandra.Write?
>>>
>>> Until you have waited on the future, you should not output the element
>>> as "has been written". And you cannot output from the @TearDown method
>>> which is just for cleaning up resources.
>>>
>>> Am I reading this wrong?
>>>
>>> Kenn
>>>
>>> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <aj...@google.com> wrote:
>>>
>>>> How about a PCollection containing every element which was successfully
>>>> written?
>>>> Basically the same things which were passed into it.
>>>>
>>>> Then you could act on every element after its been successfully written
>>>> to the sink.
>>>>
>>>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ie...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> +dev
>>>>>>
>>>>>> Since we all agree that we should return something different than
>>>>>> PDone the real question is what should we return.
>>>>>>
>>>>>
>>>>> My proposal is that one returns a PCollection<?> that consists,
>>>>> internally, of something contentless like nulls. This is future compatible
>>>>> with returning something more maningful based on the source source or write
>>>>> process itself, but at least this would be followable.
>>>>>
>>>>>
>>>>>> As a reminder we had a pretty interesting discussion about this
>>>>>> already in the past but uniformization of our return values has not
>>>>>> happened.
>>>>>> This thread is worth reading for Vincent or anyone who wants to
>>>>>> contribute Write transforms that return.
>>>>>>
>>>>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>>>
>>>>>
>>>>> Yeah, we should go ahead and finally do something.
>>>>>
>>>>>
>>>>>>
>>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>>> changing it now would be backwards incompatible.
>>>>>>
>>>>>> Periodic reminder most IOs are still Experimental so I suppose it is
>>>>>> worth to the maintainers to judge if the upgrade to return someething
>>>>>> different of PDone is worth, in that case we can deprecate and remove
>>>>>> the previous signature in short time (2 releases was the average for
>>>>>> previous cases).
>>>>>>
>>>>>>
>>>>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>>>> <ar...@gmail.com> wrote:
>>>>>> >
>>>>>> > I thought that was said about returning a PCollection of write
>>>>>> results as it’s done in other IOs (as I mentioned as examples) that have
>>>>>> _additional_ write methods, like “withWriteResults()” etc, that return
>>>>>> PTransform<…, PCollection<WriteResults>>.
>>>>>> > In this case, we keep backwards compatibility and just add new
>>>>>> funtionality. Though, we need to follow the same pattern for user API and
>>>>>> maybe even naming for this feature across different IOs (like we have for
>>>>>> "readAll()” methods).
>>>>>> >
>>>>>> >  I agree that we have to avoid returning PDone for such cases.
>>>>>> >
>>>>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <ro...@google.com>
>>>>>> wrote:
>>>>>> >
>>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>>> changing it now would be backwards incompatible. PRs to add non-PDone
>>>>>> returning variants (probably as another option to the builders) that
>>>>>> compose well with Wait, etc. would be welcome.
>>>>>> >
>>>>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>> >>
>>>>>> >> In this way, I think “Wait” PTransform should work for you but, as
>>>>>> it was mentioned before, it doesn’t work with PDone, only with PCollection
>>>>>> as a signal.
>>>>>> >>
>>>>>> >> Since you already adjusted your own writer for that, it would be
>>>>>> great to contribute it back to Beam in the way as it was done for other IOs
>>>>>> (for example, JdbcIO [1] or BigtableIO [2])
>>>>>> >>
>>>>>> >> In general, I think we need to have it for all IOs, at least to
>>>>>> use with “Wait” because this pattern it's quite often required.
>>>>>> >>
>>>>>> >> [1]
>>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>>>>> >> [2]
>>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>>>>> >>
>>>>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez <
>>>>>> vincent.marquez@gmail.com> wrote:
>>>>>> >>
>>>>>> >> No, it only needs to ensure that one record seen on Pubsub has
>>>>>> successfully written to a database.  So "record by record" is fine, or even
>>>>>> "bundle".
>>>>>> >>
>>>>>> >> ~Vincent
>>>>>> >>
>>>>>> >>
>>>>>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>> >>>
>>>>>> >>> Do you want to wait for ALL records are written for Cassandra and
>>>>>> then write all successfully written records to PubSub or it should be
>>>>>> performed "record by record"?
>>>>>> >>>
>>>>>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <
>>>>>> vincent.marquez@gmail.com> wrote:
>>>>>> >>>
>>>>>> >>> I have a common use case where my pipeline looks like this:
>>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write ->
>>>>>> PubSubIO.write
>>>>>> >>>
>>>>>> >>> I do NOT want my pipeline to look like the following:
>>>>>> >>>
>>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>>>> >>>                                                          |
>>>>>> >>>                                                           ->
>>>>>> PubsubIO.write
>>>>>> >>>
>>>>>> >>> Because I need to ensure that only items written to Pubsub have
>>>>>> successfully finished a (quorum) write.
>>>>>> >>>
>>>>>> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't
>>>>>> actually use it here so I often roll my own 'writer', but maybe there is a
>>>>>> recommended way of doing this?
>>>>>> >>>
>>>>>> >>> Thanks in advance for any help.
>>>>>> >>>
>>>>>> >>> ~Vincent
>>>>>> >>>
>>>>>> >>>
>>>>>> >>
>>>>>> >
>>>>>>
>>>>>

Re: Write to multiple IOs in linear fashion

Posted by Kenneth Knowles <ke...@apache.org>.
The reason I was checking out the code is that sometimes a natural thing to
output would be a summary of what was written. So each chunk of writes and
the final chunk written in @FinishBundle. This is, for example, what SQL
engines do (output # of rows written).

You could output both the summary and the full list of written elements to
different outputs, and users can choose. Outputs that are never consumed
should be very low or zero cost.

Kenn

On Wed, Mar 24, 2021 at 5:36 PM Robert Bradshaw <ro...@google.com> wrote:

> Yeah, the entire input is not always what is needed, and can generally be
> achieved via
>
>     input -> wait(side input of write) -> do something with the input
>
> Of course one could also do
>
> entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
> CombineGlobally(TrivialCombineFn)
>
> to reduce this to a more minimal set with at least one element per Window.
>
> The file writing operations emit the actual files that were written, which
> can be handy. My suggestion of PCollection<?> was just so that we can emit
> something usable, and decide exactly what is the most useful is later.
>
>
> On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax <re...@google.com> wrote:
>
>> I believe that the Wait transform turns this output into a side input, so
>> outputting the input PCollection might be problematic.
>>
>> On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles <ke...@apache.org> wrote:
>>
>>> Alex's idea sounds good and like what Vincent maybe implemented. I am
>>> just reading really quickly so sorry if I missed something...
>>>
>>> Checking out the code for the WriteFn<T> I see a big problem:
>>>
>>>     @Setup
>>>     public void setup() {
>>>       writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>>>     }
>>>
>>>     @ProcessElement
>>>       public void processElement(ProcessContext c) throws
>>> ExecutionException, InterruptedException {
>>>       writer.mutate(c.element());
>>>     }
>>>
>>>     @Teardown
>>>     public void teardown() throws Exception {
>>>       writer.close();
>>>       writer = null;
>>>     }
>>>
>>> It is only in writer.close() that all async writes are waited on. This
>>> needs to happen in @FinishBundle.
>>>
>>> Did you discover this when implementing your own Cassandra.Write?
>>>
>>> Until you have waited on the future, you should not output the element
>>> as "has been written". And you cannot output from the @TearDown method
>>> which is just for cleaning up resources.
>>>
>>> Am I reading this wrong?
>>>
>>> Kenn
>>>
>>> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <aj...@google.com> wrote:
>>>
>>>> How about a PCollection containing every element which was successfully
>>>> written?
>>>> Basically the same things which were passed into it.
>>>>
>>>> Then you could act on every element after its been successfully written
>>>> to the sink.
>>>>
>>>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>>
>>>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ie...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> +dev
>>>>>>
>>>>>> Since we all agree that we should return something different than
>>>>>> PDone the real question is what should we return.
>>>>>>
>>>>>
>>>>> My proposal is that one returns a PCollection<?> that consists,
>>>>> internally, of something contentless like nulls. This is future compatible
>>>>> with returning something more maningful based on the source source or write
>>>>> process itself, but at least this would be followable.
>>>>>
>>>>>
>>>>>> As a reminder we had a pretty interesting discussion about this
>>>>>> already in the past but uniformization of our return values has not
>>>>>> happened.
>>>>>> This thread is worth reading for Vincent or anyone who wants to
>>>>>> contribute Write transforms that return.
>>>>>>
>>>>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>>>
>>>>>
>>>>> Yeah, we should go ahead and finally do something.
>>>>>
>>>>>
>>>>>>
>>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>>> changing it now would be backwards incompatible.
>>>>>>
>>>>>> Periodic reminder most IOs are still Experimental so I suppose it is
>>>>>> worth to the maintainers to judge if the upgrade to return someething
>>>>>> different of PDone is worth, in that case we can deprecate and remove
>>>>>> the previous signature in short time (2 releases was the average for
>>>>>> previous cases).
>>>>>>
>>>>>>
>>>>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>>>> <ar...@gmail.com> wrote:
>>>>>> >
>>>>>> > I thought that was said about returning a PCollection of write
>>>>>> results as it’s done in other IOs (as I mentioned as examples) that have
>>>>>> _additional_ write methods, like “withWriteResults()” etc, that return
>>>>>> PTransform<…, PCollection<WriteResults>>.
>>>>>> > In this case, we keep backwards compatibility and just add new
>>>>>> funtionality. Though, we need to follow the same pattern for user API and
>>>>>> maybe even naming for this feature across different IOs (like we have for
>>>>>> "readAll()” methods).
>>>>>> >
>>>>>> >  I agree that we have to avoid returning PDone for such cases.
>>>>>> >
>>>>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <ro...@google.com>
>>>>>> wrote:
>>>>>> >
>>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>>> changing it now would be backwards incompatible. PRs to add non-PDone
>>>>>> returning variants (probably as another option to the builders) that
>>>>>> compose well with Wait, etc. would be welcome.
>>>>>> >
>>>>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>> >>
>>>>>> >> In this way, I think “Wait” PTransform should work for you but, as
>>>>>> it was mentioned before, it doesn’t work with PDone, only with PCollection
>>>>>> as a signal.
>>>>>> >>
>>>>>> >> Since you already adjusted your own writer for that, it would be
>>>>>> great to contribute it back to Beam in the way as it was done for other IOs
>>>>>> (for example, JdbcIO [1] or BigtableIO [2])
>>>>>> >>
>>>>>> >> In general, I think we need to have it for all IOs, at least to
>>>>>> use with “Wait” because this pattern it's quite often required.
>>>>>> >>
>>>>>> >> [1]
>>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>>>>> >> [2]
>>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>>>>> >>
>>>>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez <
>>>>>> vincent.marquez@gmail.com> wrote:
>>>>>> >>
>>>>>> >> No, it only needs to ensure that one record seen on Pubsub has
>>>>>> successfully written to a database.  So "record by record" is fine, or even
>>>>>> "bundle".
>>>>>> >>
>>>>>> >> ~Vincent
>>>>>> >>
>>>>>> >>
>>>>>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>> >>>
>>>>>> >>> Do you want to wait for ALL records are written for Cassandra and
>>>>>> then write all successfully written records to PubSub or it should be
>>>>>> performed "record by record"?
>>>>>> >>>
>>>>>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <
>>>>>> vincent.marquez@gmail.com> wrote:
>>>>>> >>>
>>>>>> >>> I have a common use case where my pipeline looks like this:
>>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write ->
>>>>>> PubSubIO.write
>>>>>> >>>
>>>>>> >>> I do NOT want my pipeline to look like the following:
>>>>>> >>>
>>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>>>> >>>                                                          |
>>>>>> >>>                                                           ->
>>>>>> PubsubIO.write
>>>>>> >>>
>>>>>> >>> Because I need to ensure that only items written to Pubsub have
>>>>>> successfully finished a (quorum) write.
>>>>>> >>>
>>>>>> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't
>>>>>> actually use it here so I often roll my own 'writer', but maybe there is a
>>>>>> recommended way of doing this?
>>>>>> >>>
>>>>>> >>> Thanks in advance for any help.
>>>>>> >>>
>>>>>> >>> ~Vincent
>>>>>> >>>
>>>>>> >>>
>>>>>> >>
>>>>>> >
>>>>>>
>>>>>

Re: Write to multiple IOs in linear fashion

Posted by Robert Bradshaw <ro...@google.com>.
Yeah, the entire input is not always what is needed, and can generally be
achieved via

    input -> wait(side input of write) -> do something with the input

Of course one could also do

entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
CombineGlobally(TrivialCombineFn)

to reduce this to a more minimal set with at least one element per Window.

The file writing operations emit the actual files that were written, which
can be handy. My suggestion of PCollection<?> was just so that we can emit
something usable, and decide exactly what is the most useful is later.


On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax <re...@google.com> wrote:

> I believe that the Wait transform turns this output into a side input, so
> outputting the input PCollection might be problematic.
>
> On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>> Alex's idea sounds good and like what Vincent maybe implemented. I am
>> just reading really quickly so sorry if I missed something...
>>
>> Checking out the code for the WriteFn<T> I see a big problem:
>>
>>     @Setup
>>     public void setup() {
>>       writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>>     }
>>
>>     @ProcessElement
>>       public void processElement(ProcessContext c) throws
>> ExecutionException, InterruptedException {
>>       writer.mutate(c.element());
>>     }
>>
>>     @Teardown
>>     public void teardown() throws Exception {
>>       writer.close();
>>       writer = null;
>>     }
>>
>> It is only in writer.close() that all async writes are waited on. This
>> needs to happen in @FinishBundle.
>>
>> Did you discover this when implementing your own Cassandra.Write?
>>
>> Until you have waited on the future, you should not output the element as
>> "has been written". And you cannot output from the @TearDown method which
>> is just for cleaning up resources.
>>
>> Am I reading this wrong?
>>
>> Kenn
>>
>> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <aj...@google.com> wrote:
>>
>>> How about a PCollection containing every element which was successfully
>>> written?
>>> Basically the same things which were passed into it.
>>>
>>> Then you could act on every element after its been successfully written
>>> to the sink.
>>>
>>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ie...@gmail.com> wrote:
>>>>
>>>>> +dev
>>>>>
>>>>> Since we all agree that we should return something different than
>>>>> PDone the real question is what should we return.
>>>>>
>>>>
>>>> My proposal is that one returns a PCollection<?> that consists,
>>>> internally, of something contentless like nulls. This is future compatible
>>>> with returning something more maningful based on the source source or write
>>>> process itself, but at least this would be followable.
>>>>
>>>>
>>>>> As a reminder we had a pretty interesting discussion about this
>>>>> already in the past but uniformization of our return values has not
>>>>> happened.
>>>>> This thread is worth reading for Vincent or anyone who wants to
>>>>> contribute Write transforms that return.
>>>>>
>>>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>>
>>>>
>>>> Yeah, we should go ahead and finally do something.
>>>>
>>>>
>>>>>
>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>> changing it now would be backwards incompatible.
>>>>>
>>>>> Periodic reminder most IOs are still Experimental so I suppose it is
>>>>> worth to the maintainers to judge if the upgrade to return someething
>>>>> different of PDone is worth, in that case we can deprecate and remove
>>>>> the previous signature in short time (2 releases was the average for
>>>>> previous cases).
>>>>>
>>>>>
>>>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>>> <ar...@gmail.com> wrote:
>>>>> >
>>>>> > I thought that was said about returning a PCollection of write
>>>>> results as it’s done in other IOs (as I mentioned as examples) that have
>>>>> _additional_ write methods, like “withWriteResults()” etc, that return
>>>>> PTransform<…, PCollection<WriteResults>>.
>>>>> > In this case, we keep backwards compatibility and just add new
>>>>> funtionality. Though, we need to follow the same pattern for user API and
>>>>> maybe even naming for this feature across different IOs (like we have for
>>>>> "readAll()” methods).
>>>>> >
>>>>> >  I agree that we have to avoid returning PDone for such cases.
>>>>> >
>>>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <ro...@google.com>
>>>>> wrote:
>>>>> >
>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>> changing it now would be backwards incompatible. PRs to add non-PDone
>>>>> returning variants (probably as another option to the builders) that
>>>>> compose well with Wait, etc. would be welcome.
>>>>> >
>>>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>>>> aromanenko.dev@gmail.com> wrote:
>>>>> >>
>>>>> >> In this way, I think “Wait” PTransform should work for you but, as
>>>>> it was mentioned before, it doesn’t work with PDone, only with PCollection
>>>>> as a signal.
>>>>> >>
>>>>> >> Since you already adjusted your own writer for that, it would be
>>>>> great to contribute it back to Beam in the way as it was done for other IOs
>>>>> (for example, JdbcIO [1] or BigtableIO [2])
>>>>> >>
>>>>> >> In general, I think we need to have it for all IOs, at least to use
>>>>> with “Wait” because this pattern it's quite often required.
>>>>> >>
>>>>> >> [1]
>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>>>> >> [2]
>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>>>> >>
>>>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez <
>>>>> vincent.marquez@gmail.com> wrote:
>>>>> >>
>>>>> >> No, it only needs to ensure that one record seen on Pubsub has
>>>>> successfully written to a database.  So "record by record" is fine, or even
>>>>> "bundle".
>>>>> >>
>>>>> >> ~Vincent
>>>>> >>
>>>>> >>
>>>>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>>>>> aromanenko.dev@gmail.com> wrote:
>>>>> >>>
>>>>> >>> Do you want to wait for ALL records are written for Cassandra and
>>>>> then write all successfully written records to PubSub or it should be
>>>>> performed "record by record"?
>>>>> >>>
>>>>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <
>>>>> vincent.marquez@gmail.com> wrote:
>>>>> >>>
>>>>> >>> I have a common use case where my pipeline looks like this:
>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write ->
>>>>> PubSubIO.write
>>>>> >>>
>>>>> >>> I do NOT want my pipeline to look like the following:
>>>>> >>>
>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>>> >>>                                                          |
>>>>> >>>                                                           ->
>>>>> PubsubIO.write
>>>>> >>>
>>>>> >>> Because I need to ensure that only items written to Pubsub have
>>>>> successfully finished a (quorum) write.
>>>>> >>>
>>>>> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't actually
>>>>> use it here so I often roll my own 'writer', but maybe there is a
>>>>> recommended way of doing this?
>>>>> >>>
>>>>> >>> Thanks in advance for any help.
>>>>> >>>
>>>>> >>> ~Vincent
>>>>> >>>
>>>>> >>>
>>>>> >>
>>>>> >
>>>>>
>>>>

Re: Write to multiple IOs in linear fashion

Posted by Robert Bradshaw <ro...@google.com>.
Yeah, the entire input is not always what is needed, and can generally be
achieved via

    input -> wait(side input of write) -> do something with the input

Of course one could also do

entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
CombineGlobally(TrivialCombineFn)

to reduce this to a more minimal set with at least one element per Window.

The file writing operations emit the actual files that were written, which
can be handy. My suggestion of PCollection<?> was just so that we can emit
something usable, and decide exactly what is the most useful is later.


On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax <re...@google.com> wrote:

> I believe that the Wait transform turns this output into a side input, so
> outputting the input PCollection might be problematic.
>
> On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>> Alex's idea sounds good and like what Vincent maybe implemented. I am
>> just reading really quickly so sorry if I missed something...
>>
>> Checking out the code for the WriteFn<T> I see a big problem:
>>
>>     @Setup
>>     public void setup() {
>>       writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>>     }
>>
>>     @ProcessElement
>>       public void processElement(ProcessContext c) throws
>> ExecutionException, InterruptedException {
>>       writer.mutate(c.element());
>>     }
>>
>>     @Teardown
>>     public void teardown() throws Exception {
>>       writer.close();
>>       writer = null;
>>     }
>>
>> It is only in writer.close() that all async writes are waited on. This
>> needs to happen in @FinishBundle.
>>
>> Did you discover this when implementing your own Cassandra.Write?
>>
>> Until you have waited on the future, you should not output the element as
>> "has been written". And you cannot output from the @TearDown method which
>> is just for cleaning up resources.
>>
>> Am I reading this wrong?
>>
>> Kenn
>>
>> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <aj...@google.com> wrote:
>>
>>> How about a PCollection containing every element which was successfully
>>> written?
>>> Basically the same things which were passed into it.
>>>
>>> Then you could act on every element after its been successfully written
>>> to the sink.
>>>
>>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>>
>>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ie...@gmail.com> wrote:
>>>>
>>>>> +dev
>>>>>
>>>>> Since we all agree that we should return something different than
>>>>> PDone the real question is what should we return.
>>>>>
>>>>
>>>> My proposal is that one returns a PCollection<?> that consists,
>>>> internally, of something contentless like nulls. This is future compatible
>>>> with returning something more maningful based on the source source or write
>>>> process itself, but at least this would be followable.
>>>>
>>>>
>>>>> As a reminder we had a pretty interesting discussion about this
>>>>> already in the past but uniformization of our return values has not
>>>>> happened.
>>>>> This thread is worth reading for Vincent or anyone who wants to
>>>>> contribute Write transforms that return.
>>>>>
>>>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>>
>>>>
>>>> Yeah, we should go ahead and finally do something.
>>>>
>>>>
>>>>>
>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>> changing it now would be backwards incompatible.
>>>>>
>>>>> Periodic reminder most IOs are still Experimental so I suppose it is
>>>>> worth to the maintainers to judge if the upgrade to return someething
>>>>> different of PDone is worth, in that case we can deprecate and remove
>>>>> the previous signature in short time (2 releases was the average for
>>>>> previous cases).
>>>>>
>>>>>
>>>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>>> <ar...@gmail.com> wrote:
>>>>> >
>>>>> > I thought that was said about returning a PCollection of write
>>>>> results as it’s done in other IOs (as I mentioned as examples) that have
>>>>> _additional_ write methods, like “withWriteResults()” etc, that return
>>>>> PTransform<…, PCollection<WriteResults>>.
>>>>> > In this case, we keep backwards compatibility and just add new
>>>>> funtionality. Though, we need to follow the same pattern for user API and
>>>>> maybe even naming for this feature across different IOs (like we have for
>>>>> "readAll()” methods).
>>>>> >
>>>>> >  I agree that we have to avoid returning PDone for such cases.
>>>>> >
>>>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <ro...@google.com>
>>>>> wrote:
>>>>> >
>>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>>> changing it now would be backwards incompatible. PRs to add non-PDone
>>>>> returning variants (probably as another option to the builders) that
>>>>> compose well with Wait, etc. would be welcome.
>>>>> >
>>>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>>>> aromanenko.dev@gmail.com> wrote:
>>>>> >>
>>>>> >> In this way, I think “Wait” PTransform should work for you but, as
>>>>> it was mentioned before, it doesn’t work with PDone, only with PCollection
>>>>> as a signal.
>>>>> >>
>>>>> >> Since you already adjusted your own writer for that, it would be
>>>>> great to contribute it back to Beam in the way as it was done for other IOs
>>>>> (for example, JdbcIO [1] or BigtableIO [2])
>>>>> >>
>>>>> >> In general, I think we need to have it for all IOs, at least to use
>>>>> with “Wait” because this pattern it's quite often required.
>>>>> >>
>>>>> >> [1]
>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>>>> >> [2]
>>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>>>> >>
>>>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez <
>>>>> vincent.marquez@gmail.com> wrote:
>>>>> >>
>>>>> >> No, it only needs to ensure that one record seen on Pubsub has
>>>>> successfully written to a database.  So "record by record" is fine, or even
>>>>> "bundle".
>>>>> >>
>>>>> >> ~Vincent
>>>>> >>
>>>>> >>
>>>>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>>>>> aromanenko.dev@gmail.com> wrote:
>>>>> >>>
>>>>> >>> Do you want to wait for ALL records are written for Cassandra and
>>>>> then write all successfully written records to PubSub or it should be
>>>>> performed "record by record"?
>>>>> >>>
>>>>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <
>>>>> vincent.marquez@gmail.com> wrote:
>>>>> >>>
>>>>> >>> I have a common use case where my pipeline looks like this:
>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write ->
>>>>> PubSubIO.write
>>>>> >>>
>>>>> >>> I do NOT want my pipeline to look like the following:
>>>>> >>>
>>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>>> >>>                                                          |
>>>>> >>>                                                           ->
>>>>> PubsubIO.write
>>>>> >>>
>>>>> >>> Because I need to ensure that only items written to Pubsub have
>>>>> successfully finished a (quorum) write.
>>>>> >>>
>>>>> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't actually
>>>>> use it here so I often roll my own 'writer', but maybe there is a
>>>>> recommended way of doing this?
>>>>> >>>
>>>>> >>> Thanks in advance for any help.
>>>>> >>>
>>>>> >>> ~Vincent
>>>>> >>>
>>>>> >>>
>>>>> >>
>>>>> >
>>>>>
>>>>

Re: Write to multiple IOs in linear fashion

Posted by Reuven Lax <re...@google.com>.
I believe that the Wait transform turns this output into a side input, so
outputting the input PCollection might be problematic.

On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles <ke...@apache.org> wrote:

> Alex's idea sounds good and like what Vincent maybe implemented. I am just
> reading really quickly so sorry if I missed something...
>
> Checking out the code for the WriteFn<T> I see a big problem:
>
>     @Setup
>     public void setup() {
>       writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>     }
>
>     @ProcessElement
>       public void processElement(ProcessContext c) throws
> ExecutionException, InterruptedException {
>       writer.mutate(c.element());
>     }
>
>     @Teardown
>     public void teardown() throws Exception {
>       writer.close();
>       writer = null;
>     }
>
> It is only in writer.close() that all async writes are waited on. This
> needs to happen in @FinishBundle.
>
> Did you discover this when implementing your own Cassandra.Write?
>
> Until you have waited on the future, you should not output the element as
> "has been written". And you cannot output from the @TearDown method which
> is just for cleaning up resources.
>
> Am I reading this wrong?
>
> Kenn
>
> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <aj...@google.com> wrote:
>
>> How about a PCollection containing every element which was successfully
>> written?
>> Basically the same things which were passed into it.
>>
>> Then you could act on every element after its been successfully written
>> to the sink.
>>
>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ie...@gmail.com> wrote:
>>>
>>>> +dev
>>>>
>>>> Since we all agree that we should return something different than
>>>> PDone the real question is what should we return.
>>>>
>>>
>>> My proposal is that one returns a PCollection<?> that consists,
>>> internally, of something contentless like nulls. This is future compatible
>>> with returning something more maningful based on the source source or write
>>> process itself, but at least this would be followable.
>>>
>>>
>>>> As a reminder we had a pretty interesting discussion about this
>>>> already in the past but uniformization of our return values has not
>>>> happened.
>>>> This thread is worth reading for Vincent or anyone who wants to
>>>> contribute Write transforms that return.
>>>>
>>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>
>>>
>>> Yeah, we should go ahead and finally do something.
>>>
>>>
>>>>
>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>> changing it now would be backwards incompatible.
>>>>
>>>> Periodic reminder most IOs are still Experimental so I suppose it is
>>>> worth to the maintainers to judge if the upgrade to return someething
>>>> different of PDone is worth, in that case we can deprecate and remove
>>>> the previous signature in short time (2 releases was the average for
>>>> previous cases).
>>>>
>>>>
>>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>> <ar...@gmail.com> wrote:
>>>> >
>>>> > I thought that was said about returning a PCollection of write
>>>> results as it’s done in other IOs (as I mentioned as examples) that have
>>>> _additional_ write methods, like “withWriteResults()” etc, that return
>>>> PTransform<…, PCollection<WriteResults>>.
>>>> > In this case, we keep backwards compatibility and just add new
>>>> funtionality. Though, we need to follow the same pattern for user API and
>>>> maybe even naming for this feature across different IOs (like we have for
>>>> "readAll()” methods).
>>>> >
>>>> >  I agree that we have to avoid returning PDone for such cases.
>>>> >
>>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>> >
>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>> changing it now would be backwards incompatible. PRs to add non-PDone
>>>> returning variants (probably as another option to the builders) that
>>>> compose well with Wait, etc. would be welcome.
>>>> >
>>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>>> aromanenko.dev@gmail.com> wrote:
>>>> >>
>>>> >> In this way, I think “Wait” PTransform should work for you but, as
>>>> it was mentioned before, it doesn’t work with PDone, only with PCollection
>>>> as a signal.
>>>> >>
>>>> >> Since you already adjusted your own writer for that, it would be
>>>> great to contribute it back to Beam in the way as it was done for other IOs
>>>> (for example, JdbcIO [1] or BigtableIO [2])
>>>> >>
>>>> >> In general, I think we need to have it for all IOs, at least to use
>>>> with “Wait” because this pattern it's quite often required.
>>>> >>
>>>> >> [1]
>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>>> >> [2]
>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>>> >>
>>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez <vi...@gmail.com>
>>>> wrote:
>>>> >>
>>>> >> No, it only needs to ensure that one record seen on Pubsub has
>>>> successfully written to a database.  So "record by record" is fine, or even
>>>> "bundle".
>>>> >>
>>>> >> ~Vincent
>>>> >>
>>>> >>
>>>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>>>> aromanenko.dev@gmail.com> wrote:
>>>> >>>
>>>> >>> Do you want to wait for ALL records are written for Cassandra and
>>>> then write all successfully written records to PubSub or it should be
>>>> performed "record by record"?
>>>> >>>
>>>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <
>>>> vincent.marquez@gmail.com> wrote:
>>>> >>>
>>>> >>> I have a common use case where my pipeline looks like this:
>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write ->
>>>> PubSubIO.write
>>>> >>>
>>>> >>> I do NOT want my pipeline to look like the following:
>>>> >>>
>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>> >>>                                                          |
>>>> >>>                                                           ->
>>>> PubsubIO.write
>>>> >>>
>>>> >>> Because I need to ensure that only items written to Pubsub have
>>>> successfully finished a (quorum) write.
>>>> >>>
>>>> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't actually
>>>> use it here so I often roll my own 'writer', but maybe there is a
>>>> recommended way of doing this?
>>>> >>>
>>>> >>> Thanks in advance for any help.
>>>> >>>
>>>> >>> ~Vincent
>>>> >>>
>>>> >>>
>>>> >>
>>>> >
>>>>
>>>

Re: Write to multiple IOs in linear fashion

Posted by Vincent Marquez <vi...@gmail.com>.
On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles <ke...@apache.org> wrote:

> Alex's idea sounds good and like what Vincent maybe implemented. I am just
> reading really quickly so sorry if I missed something...
>
> Checking out the code for the WriteFn<T> I see a big problem:
>
>     @Setup
>     public void setup() {
>       writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>     }
>
>     @ProcessElement
>       public void processElement(ProcessContext c) throws
> ExecutionException, InterruptedException {
>       writer.mutate(c.element());
>     }
>
>     @Teardown
>     public void teardown() throws Exception {
>       writer.close();
>       writer = null;
>     }
>
> It is only in writer.close() that all async writes are waited on. This
> needs to happen in @FinishBundle.
>
> Did you discover this when implementing your own Cassandra.Write?
>
> Until you have waited on the future, you should not output the element as
> "has been written". And you cannot output from the @TearDown method which
> is just for cleaning up resources.
>
>
I didn't use an async call, I did a blocking write.  I actually think using
Futures/async to write here is an anti-pattern, as it prevents the ability
to linearize your writes, which is often necessary when doing high
throughput with many millions of updates so you don't overload a specific
shard/core, but eager to hear more if my reasoning isn't correct.




> Am I reading this wrong?
>
> Kenn
>
> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <aj...@google.com> wrote:
>
>> How about a PCollection containing every element which was successfully
>> written?
>> Basically the same things which were passed into it.
>>
>> Then you could act on every element after its been successfully written
>> to the sink.
>>
>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ie...@gmail.com> wrote:
>>>
>>>> +dev
>>>>
>>>> Since we all agree that we should return something different than
>>>> PDone the real question is what should we return.
>>>>
>>>
>>> My proposal is that one returns a PCollection<?> that consists,
>>> internally, of something contentless like nulls. This is future compatible
>>> with returning something more maningful based on the source source or write
>>> process itself, but at least this would be followable.
>>>
>>>
>>>> As a reminder we had a pretty interesting discussion about this
>>>> already in the past but uniformization of our return values has not
>>>> happened.
>>>> This thread is worth reading for Vincent or anyone who wants to
>>>> contribute Write transforms that return.
>>>>
>>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>
>>>
>>> Yeah, we should go ahead and finally do something.
>>>
>>>
>>>>
>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>> changing it now would be backwards incompatible.
>>>>
>>>> Periodic reminder most IOs are still Experimental so I suppose it is
>>>> worth to the maintainers to judge if the upgrade to return someething
>>>> different of PDone is worth, in that case we can deprecate and remove
>>>> the previous signature in short time (2 releases was the average for
>>>> previous cases).
>>>>
>>>>
>>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>> <ar...@gmail.com> wrote:
>>>> >
>>>> > I thought that was said about returning a PCollection of write
>>>> results as it’s done in other IOs (as I mentioned as examples) that have
>>>> _additional_ write methods, like “withWriteResults()” etc, that return
>>>> PTransform<…, PCollection<WriteResults>>.
>>>> > In this case, we keep backwards compatibility and just add new
>>>> funtionality. Though, we need to follow the same pattern for user API and
>>>> maybe even naming for this feature across different IOs (like we have for
>>>> "readAll()” methods).
>>>> >
>>>> >  I agree that we have to avoid returning PDone for such cases.
>>>> >
>>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>> >
>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>> changing it now would be backwards incompatible. PRs to add non-PDone
>>>> returning variants (probably as another option to the builders) that
>>>> compose well with Wait, etc. would be welcome.
>>>> >
>>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>>> aromanenko.dev@gmail.com> wrote:
>>>> >>
>>>> >> In this way, I think “Wait” PTransform should work for you but, as
>>>> it was mentioned before, it doesn’t work with PDone, only with PCollection
>>>> as a signal.
>>>> >>
>>>> >> Since you already adjusted your own writer for that, it would be
>>>> great to contribute it back to Beam in the way as it was done for other IOs
>>>> (for example, JdbcIO [1] or BigtableIO [2])
>>>> >>
>>>> >> In general, I think we need to have it for all IOs, at least to use
>>>> with “Wait” because this pattern it's quite often required.
>>>> >>
>>>> >> [1]
>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>>> >> [2]
>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>>> >>
>>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez <vi...@gmail.com>
>>>> wrote:
>>>> >>
>>>> >> No, it only needs to ensure that one record seen on Pubsub has
>>>> successfully written to a database.  So "record by record" is fine, or even
>>>> "bundle".
>>>> >>
>>>> >> ~Vincent
>>>> >>
>>>> >>
>>>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>>>> aromanenko.dev@gmail.com> wrote:
>>>> >>>
>>>> >>> Do you want to wait for ALL records are written for Cassandra and
>>>> then write all successfully written records to PubSub or it should be
>>>> performed "record by record"?
>>>> >>>
>>>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <
>>>> vincent.marquez@gmail.com> wrote:
>>>> >>>
>>>> >>> I have a common use case where my pipeline looks like this:
>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write ->
>>>> PubSubIO.write
>>>> >>>
>>>> >>> I do NOT want my pipeline to look like the following:
>>>> >>>
>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>> >>>                                                          |
>>>> >>>                                                           ->
>>>> PubsubIO.write
>>>> >>>
>>>> >>> Because I need to ensure that only items written to Pubsub have
>>>> successfully finished a (quorum) write.
>>>> >>>
>>>> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't actually
>>>> use it here so I often roll my own 'writer', but maybe there is a
>>>> recommended way of doing this?
>>>> >>>
>>>> >>> Thanks in advance for any help.
>>>> >>>
>>>> >>> ~Vincent
>>>> >>>
>>>> >>>
>>>> >>
>>>> >
>>>>
>>>

Re: Write to multiple IOs in linear fashion

Posted by Alexey Romanenko <ar...@gmail.com>.
I think you are right, since "writer.close()”  contains a business logic, it must be moved to @FinishBundle. The same thing about DeleteFn.
I’ll create a Jira for that.

> On 25 Mar 2021, at 00:49, Kenneth Knowles <ke...@apache.org> wrote:
> 
> Alex's idea sounds good and like what Vincent maybe implemented. I am just reading really quickly so sorry if I missed something...
> 
> Checking out the code for the WriteFn<T> I see a big problem:
> 
>     @Setup
>     public void setup() {
>       writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>     }
> 
>     @ProcessElement
>       public void processElement(ProcessContext c) throws ExecutionException, InterruptedException {
>       writer.mutate(c.element());
>     }
> 
>     @Teardown
>     public void teardown() throws Exception {
>       writer.close();
>       writer = null;
>     }
> 
> It is only in writer.close() that all async writes are waited on. This needs to happen in @FinishBundle.
> 
> Did you discover this when implementing your own Cassandra.Write?
> 
> Until you have waited on the future, you should not output the element as "has been written". And you cannot output from the @TearDown method which is just for cleaning up resources.
> 
> Am I reading this wrong?
> 
> Kenn
> 
> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <ajamato@google.com <ma...@google.com>> wrote:
> How about a PCollection containing every element which was successfully written?
> Basically the same things which were passed into it.
> 
> Then you could act on every element after its been successfully written to the sink.
> 
> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <robertwb@google.com <ma...@google.com>> wrote:
> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <iemejia@gmail.com <ma...@gmail.com>> wrote:
> +dev
> 
> Since we all agree that we should return something different than
> PDone the real question is what should we return.
> 
> My proposal is that one returns a PCollection<?> that consists, internally, of something contentless like nulls. This is future compatible with returning something more maningful based on the source source or write process itself, but at least this would be followable. 
>  
> As a reminder we had a pretty interesting discussion about this
> already in the past but uniformization of our return values has not
> happened.
> This thread is worth reading for Vincent or anyone who wants to
> contribute Write transforms that return.
> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E <https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E>
> 
> Yeah, we should go ahead and finally do something. 
>  
> 
> > Returning PDone is an anti-pattern that should be avoided, but changing it now would be backwards incompatible.
> 
> Periodic reminder most IOs are still Experimental so I suppose it is
> worth to the maintainers to judge if the upgrade to return someething
> different of PDone is worth, in that case we can deprecate and remove
> the previous signature in short time (2 releases was the average for
> previous cases).
> 
> 
> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
> <aromanenko.dev@gmail.com <ma...@gmail.com>> wrote:
> >
> > I thought that was said about returning a PCollection of write results as it’s done in other IOs (as I mentioned as examples) that have _additional_ write methods, like “withWriteResults()” etc, that return PTransform<…, PCollection<WriteResults>>.
> > In this case, we keep backwards compatibility and just add new funtionality. Though, we need to follow the same pattern for user API and maybe even naming for this feature across different IOs (like we have for "readAll()” methods).
> >
> >  I agree that we have to avoid returning PDone for such cases.
> >
> > On 24 Mar 2021, at 20:05, Robert Bradshaw <robertwb@google.com <ma...@google.com>> wrote:
> >
> > Returning PDone is an anti-pattern that should be avoided, but changing it now would be backwards incompatible. PRs to add non-PDone returning variants (probably as another option to the builders) that compose well with Wait, etc. would be welcome.
> >
> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <aromanenko.dev@gmail.com <ma...@gmail.com>> wrote:
> >>
> >> In this way, I think “Wait” PTransform should work for you but, as it was mentioned before, it doesn’t work with PDone, only with PCollection as a signal.
> >>
> >> Since you already adjusted your own writer for that, it would be great to contribute it back to Beam in the way as it was done for other IOs (for example, JdbcIO [1] or BigtableIO [2])
> >>
> >> In general, I think we need to have it for all IOs, at least to use with “Wait” because this pattern it's quite often required.
> >>
> >> [1] https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078 <https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078>
> >> [2] https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715 <https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715>
> >>
> >> On 24 Mar 2021, at 18:01, Vincent Marquez <vincent.marquez@gmail.com <ma...@gmail.com>> wrote:
> >>
> >> No, it only needs to ensure that one record seen on Pubsub has successfully written to a database.  So "record by record" is fine, or even "bundle".
> >>
> >> ~Vincent
> >>
> >>
> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <aromanenko.dev@gmail.com <ma...@gmail.com>> wrote:
> >>>
> >>> Do you want to wait for ALL records are written for Cassandra and then write all successfully written records to PubSub or it should be performed "record by record"?
> >>>
> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <vincent.marquez@gmail.com <ma...@gmail.com>> wrote:
> >>>
> >>> I have a common use case where my pipeline looks like this:
> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubSubIO.write
> >>>
> >>> I do NOT want my pipeline to look like the following:
> >>>
> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
> >>>                                                          |
> >>>                                                           -> PubsubIO.write
> >>>
> >>> Because I need to ensure that only items written to Pubsub have successfully finished a (quorum) write.
> >>>
> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't actually use it here so I often roll my own 'writer', but maybe there is a recommended way of doing this?
> >>>
> >>> Thanks in advance for any help.
> >>>
> >>> ~Vincent
> >>>
> >>>
> >>
> >


Re: Write to multiple IOs in linear fashion

Posted by Reuven Lax <re...@google.com>.
I believe that the Wait transform turns this output into a side input, so
outputting the input PCollection might be problematic.

On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles <ke...@apache.org> wrote:

> Alex's idea sounds good and like what Vincent maybe implemented. I am just
> reading really quickly so sorry if I missed something...
>
> Checking out the code for the WriteFn<T> I see a big problem:
>
>     @Setup
>     public void setup() {
>       writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>     }
>
>     @ProcessElement
>       public void processElement(ProcessContext c) throws
> ExecutionException, InterruptedException {
>       writer.mutate(c.element());
>     }
>
>     @Teardown
>     public void teardown() throws Exception {
>       writer.close();
>       writer = null;
>     }
>
> It is only in writer.close() that all async writes are waited on. This
> needs to happen in @FinishBundle.
>
> Did you discover this when implementing your own Cassandra.Write?
>
> Until you have waited on the future, you should not output the element as
> "has been written". And you cannot output from the @TearDown method which
> is just for cleaning up resources.
>
> Am I reading this wrong?
>
> Kenn
>
> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <aj...@google.com> wrote:
>
>> How about a PCollection containing every element which was successfully
>> written?
>> Basically the same things which were passed into it.
>>
>> Then you could act on every element after its been successfully written
>> to the sink.
>>
>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>>
>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ie...@gmail.com> wrote:
>>>
>>>> +dev
>>>>
>>>> Since we all agree that we should return something different than
>>>> PDone the real question is what should we return.
>>>>
>>>
>>> My proposal is that one returns a PCollection<?> that consists,
>>> internally, of something contentless like nulls. This is future compatible
>>> with returning something more maningful based on the source source or write
>>> process itself, but at least this would be followable.
>>>
>>>
>>>> As a reminder we had a pretty interesting discussion about this
>>>> already in the past but uniformization of our return values has not
>>>> happened.
>>>> This thread is worth reading for Vincent or anyone who wants to
>>>> contribute Write transforms that return.
>>>>
>>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>
>>>
>>> Yeah, we should go ahead and finally do something.
>>>
>>>
>>>>
>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>> changing it now would be backwards incompatible.
>>>>
>>>> Periodic reminder most IOs are still Experimental so I suppose it is
>>>> worth to the maintainers to judge if the upgrade to return someething
>>>> different of PDone is worth, in that case we can deprecate and remove
>>>> the previous signature in short time (2 releases was the average for
>>>> previous cases).
>>>>
>>>>
>>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>> <ar...@gmail.com> wrote:
>>>> >
>>>> > I thought that was said about returning a PCollection of write
>>>> results as it’s done in other IOs (as I mentioned as examples) that have
>>>> _additional_ write methods, like “withWriteResults()” etc, that return
>>>> PTransform<…, PCollection<WriteResults>>.
>>>> > In this case, we keep backwards compatibility and just add new
>>>> funtionality. Though, we need to follow the same pattern for user API and
>>>> maybe even naming for this feature across different IOs (like we have for
>>>> "readAll()” methods).
>>>> >
>>>> >  I agree that we have to avoid returning PDone for such cases.
>>>> >
>>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <ro...@google.com>
>>>> wrote:
>>>> >
>>>> > Returning PDone is an anti-pattern that should be avoided, but
>>>> changing it now would be backwards incompatible. PRs to add non-PDone
>>>> returning variants (probably as another option to the builders) that
>>>> compose well with Wait, etc. would be welcome.
>>>> >
>>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>>> aromanenko.dev@gmail.com> wrote:
>>>> >>
>>>> >> In this way, I think “Wait” PTransform should work for you but, as
>>>> it was mentioned before, it doesn’t work with PDone, only with PCollection
>>>> as a signal.
>>>> >>
>>>> >> Since you already adjusted your own writer for that, it would be
>>>> great to contribute it back to Beam in the way as it was done for other IOs
>>>> (for example, JdbcIO [1] or BigtableIO [2])
>>>> >>
>>>> >> In general, I think we need to have it for all IOs, at least to use
>>>> with “Wait” because this pattern it's quite often required.
>>>> >>
>>>> >> [1]
>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>>> >> [2]
>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>>> >>
>>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez <vi...@gmail.com>
>>>> wrote:
>>>> >>
>>>> >> No, it only needs to ensure that one record seen on Pubsub has
>>>> successfully written to a database.  So "record by record" is fine, or even
>>>> "bundle".
>>>> >>
>>>> >> ~Vincent
>>>> >>
>>>> >>
>>>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>>>> aromanenko.dev@gmail.com> wrote:
>>>> >>>
>>>> >>> Do you want to wait for ALL records are written for Cassandra and
>>>> then write all successfully written records to PubSub or it should be
>>>> performed "record by record"?
>>>> >>>
>>>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <
>>>> vincent.marquez@gmail.com> wrote:
>>>> >>>
>>>> >>> I have a common use case where my pipeline looks like this:
>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write ->
>>>> PubSubIO.write
>>>> >>>
>>>> >>> I do NOT want my pipeline to look like the following:
>>>> >>>
>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>> >>>                                                          |
>>>> >>>                                                           ->
>>>> PubsubIO.write
>>>> >>>
>>>> >>> Because I need to ensure that only items written to Pubsub have
>>>> successfully finished a (quorum) write.
>>>> >>>
>>>> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't actually
>>>> use it here so I often roll my own 'writer', but maybe there is a
>>>> recommended way of doing this?
>>>> >>>
>>>> >>> Thanks in advance for any help.
>>>> >>>
>>>> >>> ~Vincent
>>>> >>>
>>>> >>>
>>>> >>
>>>> >
>>>>
>>>

Re: Write to multiple IOs in linear fashion

Posted by Kenneth Knowles <ke...@apache.org>.
Alex's idea sounds good and like what Vincent maybe implemented. I am just
reading really quickly so sorry if I missed something...

Checking out the code for the WriteFn<T> I see a big problem:

    @Setup
    public void setup() {
      writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
    }

    @ProcessElement
      public void processElement(ProcessContext c) throws
ExecutionException, InterruptedException {
      writer.mutate(c.element());
    }

    @Teardown
    public void teardown() throws Exception {
      writer.close();
      writer = null;
    }

It is only in writer.close() that all async writes are waited on. This
needs to happen in @FinishBundle.

Did you discover this when implementing your own Cassandra.Write?

Until you have waited on the future, you should not output the element as
"has been written". And you cannot output from the @TearDown method which
is just for cleaning up resources.

Am I reading this wrong?

Kenn

On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <aj...@google.com> wrote:

> How about a PCollection containing every element which was successfully
> written?
> Basically the same things which were passed into it.
>
> Then you could act on every element after its been successfully written to
> the sink.
>
> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ie...@gmail.com> wrote:
>>
>>> +dev
>>>
>>> Since we all agree that we should return something different than
>>> PDone the real question is what should we return.
>>>
>>
>> My proposal is that one returns a PCollection<?> that consists,
>> internally, of something contentless like nulls. This is future compatible
>> with returning something more maningful based on the source source or write
>> process itself, but at least this would be followable.
>>
>>
>>> As a reminder we had a pretty interesting discussion about this
>>> already in the past but uniformization of our return values has not
>>> happened.
>>> This thread is worth reading for Vincent or anyone who wants to
>>> contribute Write transforms that return.
>>>
>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>
>>
>> Yeah, we should go ahead and finally do something.
>>
>>
>>>
>>> > Returning PDone is an anti-pattern that should be avoided, but
>>> changing it now would be backwards incompatible.
>>>
>>> Periodic reminder most IOs are still Experimental so I suppose it is
>>> worth to the maintainers to judge if the upgrade to return someething
>>> different of PDone is worth, in that case we can deprecate and remove
>>> the previous signature in short time (2 releases was the average for
>>> previous cases).
>>>
>>>
>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>> <ar...@gmail.com> wrote:
>>> >
>>> > I thought that was said about returning a PCollection of write results
>>> as it’s done in other IOs (as I mentioned as examples) that have
>>> _additional_ write methods, like “withWriteResults()” etc, that return
>>> PTransform<…, PCollection<WriteResults>>.
>>> > In this case, we keep backwards compatibility and just add new
>>> funtionality. Though, we need to follow the same pattern for user API and
>>> maybe even naming for this feature across different IOs (like we have for
>>> "readAll()” methods).
>>> >
>>> >  I agree that we have to avoid returning PDone for such cases.
>>> >
>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <ro...@google.com> wrote:
>>> >
>>> > Returning PDone is an anti-pattern that should be avoided, but
>>> changing it now would be backwards incompatible. PRs to add non-PDone
>>> returning variants (probably as another option to the builders) that
>>> compose well with Wait, etc. would be welcome.
>>> >
>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>> aromanenko.dev@gmail.com> wrote:
>>> >>
>>> >> In this way, I think “Wait” PTransform should work for you but, as it
>>> was mentioned before, it doesn’t work with PDone, only with PCollection as
>>> a signal.
>>> >>
>>> >> Since you already adjusted your own writer for that, it would be
>>> great to contribute it back to Beam in the way as it was done for other IOs
>>> (for example, JdbcIO [1] or BigtableIO [2])
>>> >>
>>> >> In general, I think we need to have it for all IOs, at least to use
>>> with “Wait” because this pattern it's quite often required.
>>> >>
>>> >> [1]
>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>> >> [2]
>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>> >>
>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez <vi...@gmail.com>
>>> wrote:
>>> >>
>>> >> No, it only needs to ensure that one record seen on Pubsub has
>>> successfully written to a database.  So "record by record" is fine, or even
>>> "bundle".
>>> >>
>>> >> ~Vincent
>>> >>
>>> >>
>>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>>> aromanenko.dev@gmail.com> wrote:
>>> >>>
>>> >>> Do you want to wait for ALL records are written for Cassandra and
>>> then write all successfully written records to PubSub or it should be
>>> performed "record by record"?
>>> >>>
>>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <vi...@gmail.com>
>>> wrote:
>>> >>>
>>> >>> I have a common use case where my pipeline looks like this:
>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write ->
>>> PubSubIO.write
>>> >>>
>>> >>> I do NOT want my pipeline to look like the following:
>>> >>>
>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>> >>>                                                          |
>>> >>>                                                           ->
>>> PubsubIO.write
>>> >>>
>>> >>> Because I need to ensure that only items written to Pubsub have
>>> successfully finished a (quorum) write.
>>> >>>
>>> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't actually
>>> use it here so I often roll my own 'writer', but maybe there is a
>>> recommended way of doing this?
>>> >>>
>>> >>> Thanks in advance for any help.
>>> >>>
>>> >>> ~Vincent
>>> >>>
>>> >>>
>>> >>
>>> >
>>>
>>

Re: Write to multiple IOs in linear fashion

Posted by Kenneth Knowles <ke...@apache.org>.
Alex's idea sounds good and like what Vincent maybe implemented. I am just
reading really quickly so sorry if I missed something...

Checking out the code for the WriteFn<T> I see a big problem:

    @Setup
    public void setup() {
      writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
    }

    @ProcessElement
      public void processElement(ProcessContext c) throws
ExecutionException, InterruptedException {
      writer.mutate(c.element());
    }

    @Teardown
    public void teardown() throws Exception {
      writer.close();
      writer = null;
    }

It is only in writer.close() that all async writes are waited on. This
needs to happen in @FinishBundle.

Did you discover this when implementing your own Cassandra.Write?

Until you have waited on the future, you should not output the element as
"has been written". And you cannot output from the @TearDown method which
is just for cleaning up resources.

Am I reading this wrong?

Kenn

On Wed, Mar 24, 2021 at 4:35 PM Alex Amato <aj...@google.com> wrote:

> How about a PCollection containing every element which was successfully
> written?
> Basically the same things which were passed into it.
>
> Then you could act on every element after its been successfully written to
> the sink.
>
> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ie...@gmail.com> wrote:
>>
>>> +dev
>>>
>>> Since we all agree that we should return something different than
>>> PDone the real question is what should we return.
>>>
>>
>> My proposal is that one returns a PCollection<?> that consists,
>> internally, of something contentless like nulls. This is future compatible
>> with returning something more maningful based on the source source or write
>> process itself, but at least this would be followable.
>>
>>
>>> As a reminder we had a pretty interesting discussion about this
>>> already in the past but uniformization of our return values has not
>>> happened.
>>> This thread is worth reading for Vincent or anyone who wants to
>>> contribute Write transforms that return.
>>>
>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>
>>
>> Yeah, we should go ahead and finally do something.
>>
>>
>>>
>>> > Returning PDone is an anti-pattern that should be avoided, but
>>> changing it now would be backwards incompatible.
>>>
>>> Periodic reminder most IOs are still Experimental so I suppose it is
>>> worth to the maintainers to judge if the upgrade to return someething
>>> different of PDone is worth, in that case we can deprecate and remove
>>> the previous signature in short time (2 releases was the average for
>>> previous cases).
>>>
>>>
>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>> <ar...@gmail.com> wrote:
>>> >
>>> > I thought that was said about returning a PCollection of write results
>>> as it’s done in other IOs (as I mentioned as examples) that have
>>> _additional_ write methods, like “withWriteResults()” etc, that return
>>> PTransform<…, PCollection<WriteResults>>.
>>> > In this case, we keep backwards compatibility and just add new
>>> funtionality. Though, we need to follow the same pattern for user API and
>>> maybe even naming for this feature across different IOs (like we have for
>>> "readAll()” methods).
>>> >
>>> >  I agree that we have to avoid returning PDone for such cases.
>>> >
>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <ro...@google.com> wrote:
>>> >
>>> > Returning PDone is an anti-pattern that should be avoided, but
>>> changing it now would be backwards incompatible. PRs to add non-PDone
>>> returning variants (probably as another option to the builders) that
>>> compose well with Wait, etc. would be welcome.
>>> >
>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>> aromanenko.dev@gmail.com> wrote:
>>> >>
>>> >> In this way, I think “Wait” PTransform should work for you but, as it
>>> was mentioned before, it doesn’t work with PDone, only with PCollection as
>>> a signal.
>>> >>
>>> >> Since you already adjusted your own writer for that, it would be
>>> great to contribute it back to Beam in the way as it was done for other IOs
>>> (for example, JdbcIO [1] or BigtableIO [2])
>>> >>
>>> >> In general, I think we need to have it for all IOs, at least to use
>>> with “Wait” because this pattern it's quite often required.
>>> >>
>>> >> [1]
>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>> >> [2]
>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>> >>
>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez <vi...@gmail.com>
>>> wrote:
>>> >>
>>> >> No, it only needs to ensure that one record seen on Pubsub has
>>> successfully written to a database.  So "record by record" is fine, or even
>>> "bundle".
>>> >>
>>> >> ~Vincent
>>> >>
>>> >>
>>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>>> aromanenko.dev@gmail.com> wrote:
>>> >>>
>>> >>> Do you want to wait for ALL records are written for Cassandra and
>>> then write all successfully written records to PubSub or it should be
>>> performed "record by record"?
>>> >>>
>>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <vi...@gmail.com>
>>> wrote:
>>> >>>
>>> >>> I have a common use case where my pipeline looks like this:
>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write ->
>>> PubSubIO.write
>>> >>>
>>> >>> I do NOT want my pipeline to look like the following:
>>> >>>
>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>> >>>                                                          |
>>> >>>                                                           ->
>>> PubsubIO.write
>>> >>>
>>> >>> Because I need to ensure that only items written to Pubsub have
>>> successfully finished a (quorum) write.
>>> >>>
>>> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't actually
>>> use it here so I often roll my own 'writer', but maybe there is a
>>> recommended way of doing this?
>>> >>>
>>> >>> Thanks in advance for any help.
>>> >>>
>>> >>> ~Vincent
>>> >>>
>>> >>>
>>> >>
>>> >
>>>
>>

Re: Write to multiple IOs in linear fashion

Posted by Alex Amato <aj...@google.com>.
How about a PCollection containing every element which was successfully
written?
Basically the same things which were passed into it.

Then you could act on every element after its been successfully written to
the sink.

On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw <ro...@google.com> wrote:

> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ie...@gmail.com> wrote:
>
>> +dev
>>
>> Since we all agree that we should return something different than
>> PDone the real question is what should we return.
>>
>
> My proposal is that one returns a PCollection<?> that consists,
> internally, of something contentless like nulls. This is future compatible
> with returning something more maningful based on the source source or write
> process itself, but at least this would be followable.
>
>
>> As a reminder we had a pretty interesting discussion about this
>> already in the past but uniformization of our return values has not
>> happened.
>> This thread is worth reading for Vincent or anyone who wants to
>> contribute Write transforms that return.
>>
>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>
>
> Yeah, we should go ahead and finally do something.
>
>
>>
>> > Returning PDone is an anti-pattern that should be avoided, but changing
>> it now would be backwards incompatible.
>>
>> Periodic reminder most IOs are still Experimental so I suppose it is
>> worth to the maintainers to judge if the upgrade to return someething
>> different of PDone is worth, in that case we can deprecate and remove
>> the previous signature in short time (2 releases was the average for
>> previous cases).
>>
>>
>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>> <ar...@gmail.com> wrote:
>> >
>> > I thought that was said about returning a PCollection of write results
>> as it’s done in other IOs (as I mentioned as examples) that have
>> _additional_ write methods, like “withWriteResults()” etc, that return
>> PTransform<…, PCollection<WriteResults>>.
>> > In this case, we keep backwards compatibility and just add new
>> funtionality. Though, we need to follow the same pattern for user API and
>> maybe even naming for this feature across different IOs (like we have for
>> "readAll()” methods).
>> >
>> >  I agree that we have to avoid returning PDone for such cases.
>> >
>> > On 24 Mar 2021, at 20:05, Robert Bradshaw <ro...@google.com> wrote:
>> >
>> > Returning PDone is an anti-pattern that should be avoided, but changing
>> it now would be backwards incompatible. PRs to add non-PDone returning
>> variants (probably as another option to the builders) that compose well
>> with Wait, etc. would be welcome.
>> >
>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>> aromanenko.dev@gmail.com> wrote:
>> >>
>> >> In this way, I think “Wait” PTransform should work for you but, as it
>> was mentioned before, it doesn’t work with PDone, only with PCollection as
>> a signal.
>> >>
>> >> Since you already adjusted your own writer for that, it would be great
>> to contribute it back to Beam in the way as it was done for other IOs (for
>> example, JdbcIO [1] or BigtableIO [2])
>> >>
>> >> In general, I think we need to have it for all IOs, at least to use
>> with “Wait” because this pattern it's quite often required.
>> >>
>> >> [1]
>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>> >> [2]
>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>> >>
>> >> On 24 Mar 2021, at 18:01, Vincent Marquez <vi...@gmail.com>
>> wrote:
>> >>
>> >> No, it only needs to ensure that one record seen on Pubsub has
>> successfully written to a database.  So "record by record" is fine, or even
>> "bundle".
>> >>
>> >> ~Vincent
>> >>
>> >>
>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>> aromanenko.dev@gmail.com> wrote:
>> >>>
>> >>> Do you want to wait for ALL records are written for Cassandra and
>> then write all successfully written records to PubSub or it should be
>> performed "record by record"?
>> >>>
>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <vi...@gmail.com>
>> wrote:
>> >>>
>> >>> I have a common use case where my pipeline looks like this:
>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write ->
>> PubSubIO.write
>> >>>
>> >>> I do NOT want my pipeline to look like the following:
>> >>>
>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>> >>>                                                          |
>> >>>                                                           ->
>> PubsubIO.write
>> >>>
>> >>> Because I need to ensure that only items written to Pubsub have
>> successfully finished a (quorum) write.
>> >>>
>> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't actually
>> use it here so I often roll my own 'writer', but maybe there is a
>> recommended way of doing this?
>> >>>
>> >>> Thanks in advance for any help.
>> >>>
>> >>> ~Vincent
>> >>>
>> >>>
>> >>
>> >
>>
>

Re: Write to multiple IOs in linear fashion

Posted by Robert Bradshaw <ro...@google.com>.
On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ie...@gmail.com> wrote:

> +dev
>
> Since we all agree that we should return something different than
> PDone the real question is what should we return.
>

My proposal is that one returns a PCollection<?> that consists, internally,
of something contentless like nulls. This is future compatible
with returning something more maningful based on the source source or write
process itself, but at least this would be followable.


> As a reminder we had a pretty interesting discussion about this
> already in the past but uniformization of our return values has not
> happened.
> This thread is worth reading for Vincent or anyone who wants to
> contribute Write transforms that return.
>
> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E


Yeah, we should go ahead and finally do something.


>
> > Returning PDone is an anti-pattern that should be avoided, but changing
> it now would be backwards incompatible.
>
> Periodic reminder most IOs are still Experimental so I suppose it is
> worth to the maintainers to judge if the upgrade to return someething
> different of PDone is worth, in that case we can deprecate and remove
> the previous signature in short time (2 releases was the average for
> previous cases).
>
>
> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
> <ar...@gmail.com> wrote:
> >
> > I thought that was said about returning a PCollection of write results
> as it’s done in other IOs (as I mentioned as examples) that have
> _additional_ write methods, like “withWriteResults()” etc, that return
> PTransform<…, PCollection<WriteResults>>.
> > In this case, we keep backwards compatibility and just add new
> funtionality. Though, we need to follow the same pattern for user API and
> maybe even naming for this feature across different IOs (like we have for
> "readAll()” methods).
> >
> >  I agree that we have to avoid returning PDone for such cases.
> >
> > On 24 Mar 2021, at 20:05, Robert Bradshaw <ro...@google.com> wrote:
> >
> > Returning PDone is an anti-pattern that should be avoided, but changing
> it now would be backwards incompatible. PRs to add non-PDone returning
> variants (probably as another option to the builders) that compose well
> with Wait, etc. would be welcome.
> >
> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
> aromanenko.dev@gmail.com> wrote:
> >>
> >> In this way, I think “Wait” PTransform should work for you but, as it
> was mentioned before, it doesn’t work with PDone, only with PCollection as
> a signal.
> >>
> >> Since you already adjusted your own writer for that, it would be great
> to contribute it back to Beam in the way as it was done for other IOs (for
> example, JdbcIO [1] or BigtableIO [2])
> >>
> >> In general, I think we need to have it for all IOs, at least to use
> with “Wait” because this pattern it's quite often required.
> >>
> >> [1]
> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
> >> [2]
> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
> >>
> >> On 24 Mar 2021, at 18:01, Vincent Marquez <vi...@gmail.com>
> wrote:
> >>
> >> No, it only needs to ensure that one record seen on Pubsub has
> successfully written to a database.  So "record by record" is fine, or even
> "bundle".
> >>
> >> ~Vincent
> >>
> >>
> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
> aromanenko.dev@gmail.com> wrote:
> >>>
> >>> Do you want to wait for ALL records are written for Cassandra and then
> write all successfully written records to PubSub or it should be performed
> "record by record"?
> >>>
> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <vi...@gmail.com>
> wrote:
> >>>
> >>> I have a common use case where my pipeline looks like this:
> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubSubIO.write
> >>>
> >>> I do NOT want my pipeline to look like the following:
> >>>
> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
> >>>                                                          |
> >>>                                                           ->
> PubsubIO.write
> >>>
> >>> Because I need to ensure that only items written to Pubsub have
> successfully finished a (quorum) write.
> >>>
> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't actually use
> it here so I often roll my own 'writer', but maybe there is a recommended
> way of doing this?
> >>>
> >>> Thanks in advance for any help.
> >>>
> >>> ~Vincent
> >>>
> >>>
> >>
> >
>

Re: Write to multiple IOs in linear fashion

Posted by Robert Bradshaw <ro...@google.com>.
On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía <ie...@gmail.com> wrote:

> +dev
>
> Since we all agree that we should return something different than
> PDone the real question is what should we return.
>

My proposal is that one returns a PCollection<?> that consists, internally,
of something contentless like nulls. This is future compatible
with returning something more maningful based on the source source or write
process itself, but at least this would be followable.


> As a reminder we had a pretty interesting discussion about this
> already in the past but uniformization of our return values has not
> happened.
> This thread is worth reading for Vincent or anyone who wants to
> contribute Write transforms that return.
>
> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E


Yeah, we should go ahead and finally do something.


>
> > Returning PDone is an anti-pattern that should be avoided, but changing
> it now would be backwards incompatible.
>
> Periodic reminder most IOs are still Experimental so I suppose it is
> worth to the maintainers to judge if the upgrade to return someething
> different of PDone is worth, in that case we can deprecate and remove
> the previous signature in short time (2 releases was the average for
> previous cases).
>
>
> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
> <ar...@gmail.com> wrote:
> >
> > I thought that was said about returning a PCollection of write results
> as it’s done in other IOs (as I mentioned as examples) that have
> _additional_ write methods, like “withWriteResults()” etc, that return
> PTransform<…, PCollection<WriteResults>>.
> > In this case, we keep backwards compatibility and just add new
> funtionality. Though, we need to follow the same pattern for user API and
> maybe even naming for this feature across different IOs (like we have for
> "readAll()” methods).
> >
> >  I agree that we have to avoid returning PDone for such cases.
> >
> > On 24 Mar 2021, at 20:05, Robert Bradshaw <ro...@google.com> wrote:
> >
> > Returning PDone is an anti-pattern that should be avoided, but changing
> it now would be backwards incompatible. PRs to add non-PDone returning
> variants (probably as another option to the builders) that compose well
> with Wait, etc. would be welcome.
> >
> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
> aromanenko.dev@gmail.com> wrote:
> >>
> >> In this way, I think “Wait” PTransform should work for you but, as it
> was mentioned before, it doesn’t work with PDone, only with PCollection as
> a signal.
> >>
> >> Since you already adjusted your own writer for that, it would be great
> to contribute it back to Beam in the way as it was done for other IOs (for
> example, JdbcIO [1] or BigtableIO [2])
> >>
> >> In general, I think we need to have it for all IOs, at least to use
> with “Wait” because this pattern it's quite often required.
> >>
> >> [1]
> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
> >> [2]
> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
> >>
> >> On 24 Mar 2021, at 18:01, Vincent Marquez <vi...@gmail.com>
> wrote:
> >>
> >> No, it only needs to ensure that one record seen on Pubsub has
> successfully written to a database.  So "record by record" is fine, or even
> "bundle".
> >>
> >> ~Vincent
> >>
> >>
> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
> aromanenko.dev@gmail.com> wrote:
> >>>
> >>> Do you want to wait for ALL records are written for Cassandra and then
> write all successfully written records to PubSub or it should be performed
> "record by record"?
> >>>
> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <vi...@gmail.com>
> wrote:
> >>>
> >>> I have a common use case where my pipeline looks like this:
> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubSubIO.write
> >>>
> >>> I do NOT want my pipeline to look like the following:
> >>>
> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
> >>>                                                          |
> >>>                                                           ->
> PubsubIO.write
> >>>
> >>> Because I need to ensure that only items written to Pubsub have
> successfully finished a (quorum) write.
> >>>
> >>> Since CassandraIO.write is a PTransform<A, PDone> I can't actually use
> it here so I often roll my own 'writer', but maybe there is a recommended
> way of doing this?
> >>>
> >>> Thanks in advance for any help.
> >>>
> >>> ~Vincent
> >>>
> >>>
> >>
> >
>