You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Simon Kitching <si...@unbelievable-machine.com> on 2018/04/05 12:01:02 UTC

BiqQueryIO.write and Wait.on

Hi All,

I need to write some data to BigQuery (batch-mode) and then send a Pubsub message to trigger further processing.

I found this thread titled "Callbacks/other functions run after a PDone/output transform" on the user-list which was very relevant:
  https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E

Thanks to the author of the Wait transform (Beam 2.4.0)!

Unfortunately, it appears that the Wait.on transform does not work with BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to work. Advice appreciated.

Here's (most of) the relevant test code:
        Pipeline p = Pipeline.create(options);
        PCollection<String> lines = p.apply("Read Input", Create.of("line1", "line2", "line3", "line4"));

        TableFieldSchema f1 = new TableFieldSchema().setName("value").setType("string");
        TableSchema s2 = new TableSchema().setFields(Collections.singletonList(f1));

        WriteResult writeResult = lines.apply("Write and load data", BigQueryIO.<String>write() //
                .to(options.getTableSpec()) //
                .withFormatFunction(new SlowFormatter()) //
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
//                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
                .withSchema(s2)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) //
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

        lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new OnCompletion()));

where
+ format-function "SlowFormatter" prints out each line and has a small sleep for testing purposes, and
+ DoFn OnCompletion just prints out the contents of each line

In production code, OnCompletion would be fed some collection derived from lines, eg min/max record id, and the operation would be "send pubsub message" rather than print..

My expectation is that the "SlowFormatter" would run for each line, then the data would be uploaded, then OnCompletion would print each line. And indeed that happens when STREAMING_INSERTS is used. However for FILE_LOADS, LinePrinter runs before the upload takes place.

I use WriteResult.getFailedInserts as that is the only "output" that BiqQueryIO.write() generates AFAICT. I don't expect any failed records, but believe that it can be used as a "signal" for the Wait.on - ie the output is "complete for window" only after all data has been uploaded, which is what I need. And that does seem to work for STREAMING_LOADS.

I suspect the reason that this does not work for FILE_LOADS is that method BatchLoads.writeResult returns a WriteResult that wraps an "empty" failedInserts collection, ie data which is not connected to the batch-load-job that is triggered:
  private WriteResult writeResult(Pipeline p) {
    PCollection<TableRow> empty =
        p.apply("CreateEmptyFailedInserts", Create.empty(TypeDescriptor.of(TableRow.class)));
    return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
  }

Note that BatchLoads does "synchronously" invoke BigQuery load jobs; once a job is submitted the code repeatedly polls the job status until it reaches DONE or FAILED. However that information does not appear to be exposed anywhere (unlike streaming which effectively exposes completion-state via the failedInserts stream).

If I have misunderstood something, corrections welcome! If not, suggestions for workarounds or alternate solutions are also welcome :-)

Thanks,
Simon


Re: BiqQueryIO.write and Wait.on

Posted by Reuven Lax <re...@google.com>.
I've been looking at this a bit, and I think it will be tricky to figure
out how to get this working with BigQueryIO. The base problem is that Wait
tries to sequence things by window, as it's hard to come up with a
different definition of sequencing with unbounded inputs. However
BigQueryIO rewindows all input data into the GlobalWindow first thing. This
is usually the right thing for a sink to do as each element should be sent
to external sink as quickly as possible without waiting for windows to
close. However this doesn't play nice with the Wait transform, as the
original windows are lost inside BigQueryIO.

The best solution I can think of is to allow Wait on BigQueryIO _only_ if
the original window is the global window (which is common for batch jobs),
and to disallow it otherwise. This is unfortunate because it means (among
other things) that this won't work at all in streaming. If anyone has a
better idea, I'd love to hear it.

Reuven

On Wed, Jul 25, 2018 at 9:57 AM Carlos Alonso <ca...@mrcalonso.com> wrote:

> Just opened this PR: https://github.com/apache/beam/pull/6055 to get
> feedback ASAP. Basically what it does is return the job status in a
> PCollection of BigQueryWriteResult objects
>
> On Fri, Jul 20, 2018 at 11:57 PM Reuven Lax <re...@google.com> wrote:
>
>> There already is a org.apache.beam.sdk.io.gcp.bigquery.WriteResult class.
>>
>> On Tue, Jul 17, 2018 at 9:44 AM Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>> Hmm, I think this approach has some complications:
>>> - Using JobStatus makes it tied to using BigQuery batch load jobs, but
>>> the return type ought to be the same regardless of which method of writing
>>> is used (including potential future BigQuery APIs - they are evolving), or
>>> how many BigQuery load jobs are involved in writing a given window (it can
>>> be multiple).
>>> - Returning a success/failure indicator makes it prone to users ignoring
>>> the failure: the default behavior should be that, if the pipeline succeeds,
>>> that means all data was successfully written - if users want different
>>> error handling, e.g. a deadletter queue, they should have to specify it
>>> explicitly.
>>>
>>> I would recommend to return a PCollection of a type that's invariant to
>>> which load method is used (streaming writes, load jobs, multiple load jobs
>>> etc.). If it's unclear what type that should be, you could introduce an
>>> empty type e.g. "class BigQueryWriteResult {}" just for the sake of
>>> signaling success, and later add something to it.
>>>
>>> On Tue, Jul 17, 2018 at 12:30 AM Carlos Alonso <ca...@mrcalonso.com>
>>> wrote:
>>>
>>>> All good so far. I've been a bit side tracked but more or less I have
>>>> the idea of using the JobStatus as part of the collection so that not only
>>>> the completion is signaled, but also the result (success/failure) can be
>>>> accessed, how does it sound?
>>>>
>>>> Regards
>>>>
>>>> On Tue, Jul 17, 2018 at 3:07 AM Eugene Kirpichov <ki...@google.com>
>>>> wrote:
>>>>
>>>>> Hi Carlos,
>>>>>
>>>>> Any updates / roadblocks you hit?
>>>>>
>>>>>
>>>>> On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov <ki...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Awesome!! Thanks for the heads up, very exciting, this is going to
>>>>>> make a lot of people happy :)
>>>>>>
>>>>>> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>>> wrote:
>>>>>>
>>>>>>> + dev@beam.apache.org
>>>>>>>
>>>>>>> Just a quick email to let you know that I'm starting developing this.
>>>>>>>
>>>>>>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <
>>>>>>> kirpichov@google.com> wrote:
>>>>>>>
>>>>>>>> Hi Carlos,
>>>>>>>>
>>>>>>>> Thank you for expressing interest in taking this on! Let me give
>>>>>>>> you a few pointers to start, and I'll be happy to help everywhere along the
>>>>>>>> way.
>>>>>>>>
>>>>>>>> Basically we want BigQueryIO.write() to return something (e.g. a
>>>>>>>> PCollection) that can be used as input to Wait.on().
>>>>>>>> Currently it returns a WriteResult, which only contains a
>>>>>>>> PCollection<TableRow> of failed inserts - that one can not be used
>>>>>>>> directly, instead we should add another component to WriteResult that
>>>>>>>> represents the result of successfully writing some data.
>>>>>>>>
>>>>>>>> Given that BQIO supports dynamic destination writes, I think it
>>>>>>>> makes sense for that to be a PCollection<KV<DestinationT, ???>> so that in
>>>>>>>> theory we could sequence different destinations independently (currently
>>>>>>>> Wait.on() does not provide such a feature, but it could); and it will
>>>>>>>> require changing WriteResult to be WriteResult<DestinationT>. As for what
>>>>>>>> the "???" might be - it is something that represents the result of
>>>>>>>> successfully writing a window of data. I think it can even be Void, or "?"
>>>>>>>> (wildcard type) for now, until we figure out something better.
>>>>>>>>
>>>>>>>> Implementing this would require roughly the following work:
>>>>>>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>>>>>>>> - Modify the BatchLoads transform to provide it on both codepaths:
>>>>>>>> expandTriggered() and expandUntriggered()
>>>>>>>> ...- expandTriggered() itself writes via 2 codepaths:
>>>>>>>> single-partition and multi-partition. Both need to be handled - we need to
>>>>>>>> get a PCollection<KV<DestinationT, ?>> from each of them, and Flatten these
>>>>>>>> two PCollections together to get the final result. The single-partition
>>>>>>>> codepath (writeSinglePartition) under the hood already uses WriteTables
>>>>>>>> that returns a KV<DestinationT, ...> so it's directly usable. The
>>>>>>>> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
>>>>>>>> codepath drops DestinationT along the way and will need to be refactored a
>>>>>>>> bit to keep it until the end.
>>>>>>>> ...- expandUntriggered() should be treated the same way.
>>>>>>>> - Modify the StreamingWriteTables transform to provide it
>>>>>>>> ...- Here also, the challenge is to propagate the DestinationT type
>>>>>>>> all the way until the end of StreamingWriteTables - it will need to be
>>>>>>>> refactored. After such a refactoring, returning a KV<DestinationT, ...>
>>>>>>>> should be easy.
>>>>>>>>
>>>>>>>> Another challenge with all of this is backwards compatibility in
>>>>>>>> terms of API and pipeline update.
>>>>>>>> Pipeline update is much less of a concern for the BatchLoads
>>>>>>>> codepath, because it's typically used in batch-mode pipelines that don't
>>>>>>>> get updated. I would recommend to start with this, perhaps even with only
>>>>>>>> the untriggered codepath (it is much more commonly used) - that will pave
>>>>>>>> the way for future work.
>>>>>>>>
>>>>>>>> Hope this helps, please ask more if something is unclear!
>>>>>>>>
>>>>>>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <
>>>>>>>> carlos@mrcalonso.com> wrote:
>>>>>>>>
>>>>>>>>> Hey Eugene!!
>>>>>>>>>
>>>>>>>>> I’d gladly take a stab on it although I’m not sure how much
>>>>>>>>> available time I might have to put into but... yeah, let’s try it.
>>>>>>>>>
>>>>>>>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <
>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Yes, you're both right - BigQueryIO.write() is currently not
>>>>>>>>>> implemented in a way that it can be used with Wait.on(). It would certainly
>>>>>>>>>> be a welcome contribution to change this - many people expressed interest
>>>>>>>>>> in specifically waiting for BigQuery writes. Is any of you interested in
>>>>>>>>>> helping out?
>>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <
>>>>>>>>>> carlos@mrcalonso.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Simon, I think your explanation was very accurate, at least
>>>>>>>>>>> to my understanding. I'd also be interested in getting batch load result's
>>>>>>>>>>> feedback on the pipeline... hopefully someone may suggest something,
>>>>>>>>>>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>>>>>>>>> simon.kitching@unbelievable-machine.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>
>>>>>>>>>>>> I need to write some data to BigQuery (batch-mode) and then
>>>>>>>>>>>> send a Pubsub message to trigger further processing.
>>>>>>>>>>>>
>>>>>>>>>>>> I found this thread titled "Callbacks/other functions run after
>>>>>>>>>>>> a PDone/output transform" on the user-list which was very relevant:
>>>>>>>>>>>>
>>>>>>>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>>>>>>
>>>>>>>>>>>> Unfortunately, it appears that the Wait.on transform does not
>>>>>>>>>>>> work with BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to
>>>>>>>>>>>> work. Advice appreciated.
>>>>>>>>>>>>
>>>>>>>>>>>> Here's (most of) the relevant test code:
>>>>>>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>>>>>>         PCollection<String> lines = p.apply("Read Input",
>>>>>>>>>>>> Create.of("line1", "line2", "line3", "line4"));
>>>>>>>>>>>>
>>>>>>>>>>>>         TableFieldSchema f1 = new
>>>>>>>>>>>> TableFieldSchema().setName("value").setType("string");
>>>>>>>>>>>>         TableSchema s2 = new
>>>>>>>>>>>> TableSchema().setFields(Collections.singletonList(f1));
>>>>>>>>>>>>
>>>>>>>>>>>>         WriteResult writeResult = lines.apply("Write and load
>>>>>>>>>>>> data", BigQueryIO.<String>write() //
>>>>>>>>>>>>                 .to(options.getTableSpec()) //
>>>>>>>>>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>>>>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>>>>>>> //
>>>>>>>>>>>> //
>>>>>>>>>>>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>>>>>>>>>                 .withSchema(s2)
>>>>>>>>>>>>
>>>>>>>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>>>> //
>>>>>>>>>>>>
>>>>>>>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>>>>>>>>>>> OnCompletion()));
>>>>>>>>>>>>
>>>>>>>>>>>> where
>>>>>>>>>>>> + format-function "SlowFormatter" prints out each line and has
>>>>>>>>>>>> a small sleep for testing purposes, and
>>>>>>>>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>>>>>>>>
>>>>>>>>>>>> In production code, OnCompletion would be fed some collection
>>>>>>>>>>>> derived from lines, eg min/max record id, and the operation would be "send
>>>>>>>>>>>> pubsub message" rather than print..
>>>>>>>>>>>>
>>>>>>>>>>>> My expectation is that the "SlowFormatter" would run for each
>>>>>>>>>>>> line, then the data would be uploaded, then OnCompletion would print each
>>>>>>>>>>>> line. And indeed that happens when STREAMING_INSERTS is used. However for
>>>>>>>>>>>> FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>>>>>>>>>
>>>>>>>>>>>> I use WriteResult.getFailedInserts as that is the only "output"
>>>>>>>>>>>> that BiqQueryIO.write() generates AFAICT. I don't expect any failed
>>>>>>>>>>>> records, but believe that it can be used as a "signal" for the Wait.on - ie
>>>>>>>>>>>> the output is "complete for window" only after all data has been uploaded,
>>>>>>>>>>>> which is what I need. And that does seem to work for STREAMING_LOADS.
>>>>>>>>>>>>
>>>>>>>>>>>> I suspect the reason that this does not work for FILE_LOADS is
>>>>>>>>>>>> that method BatchLoads.writeResult returns a WriteResult that wraps an
>>>>>>>>>>>> "empty" failedInserts collection, ie data which is not connected to the
>>>>>>>>>>>> batch-load-job that is triggered:
>>>>>>>>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>>>>>>>>     PCollection<TableRow> empty =
>>>>>>>>>>>>         p.apply("CreateEmptyFailedInserts",
>>>>>>>>>>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"),
>>>>>>>>>>>> empty);
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load
>>>>>>>>>>>> jobs; once a job is submitted the code repeatedly polls the job status
>>>>>>>>>>>> until it reaches DONE or FAILED. However that information does not appear
>>>>>>>>>>>> to be exposed anywhere (unlike streaming which effectively exposes
>>>>>>>>>>>> completion-state via the failedInserts stream).
>>>>>>>>>>>>
>>>>>>>>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>>>>>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Simon
>>>>>>>>>>>>
>>>>>>>>>>>>

Re: BiqQueryIO.write and Wait.on

Posted by Carlos Alonso <ca...@mrcalonso.com>.
Just opened this PR: https://github.com/apache/beam/pull/6055 to get
feedback ASAP. Basically what it does is return the job status in a
PCollection of BigQueryWriteResult objects

On Fri, Jul 20, 2018 at 11:57 PM Reuven Lax <re...@google.com> wrote:

> There already is a org.apache.beam.sdk.io.gcp.bigquery.WriteResult class.
>
> On Tue, Jul 17, 2018 at 9:44 AM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> Hmm, I think this approach has some complications:
>> - Using JobStatus makes it tied to using BigQuery batch load jobs, but
>> the return type ought to be the same regardless of which method of writing
>> is used (including potential future BigQuery APIs - they are evolving), or
>> how many BigQuery load jobs are involved in writing a given window (it can
>> be multiple).
>> - Returning a success/failure indicator makes it prone to users ignoring
>> the failure: the default behavior should be that, if the pipeline succeeds,
>> that means all data was successfully written - if users want different
>> error handling, e.g. a deadletter queue, they should have to specify it
>> explicitly.
>>
>> I would recommend to return a PCollection of a type that's invariant to
>> which load method is used (streaming writes, load jobs, multiple load jobs
>> etc.). If it's unclear what type that should be, you could introduce an
>> empty type e.g. "class BigQueryWriteResult {}" just for the sake of
>> signaling success, and later add something to it.
>>
>> On Tue, Jul 17, 2018 at 12:30 AM Carlos Alonso <ca...@mrcalonso.com>
>> wrote:
>>
>>> All good so far. I've been a bit side tracked but more or less I have
>>> the idea of using the JobStatus as part of the collection so that not only
>>> the completion is signaled, but also the result (success/failure) can be
>>> accessed, how does it sound?
>>>
>>> Regards
>>>
>>> On Tue, Jul 17, 2018 at 3:07 AM Eugene Kirpichov <ki...@google.com>
>>> wrote:
>>>
>>>> Hi Carlos,
>>>>
>>>> Any updates / roadblocks you hit?
>>>>
>>>>
>>>> On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov <ki...@google.com>
>>>> wrote:
>>>>
>>>>> Awesome!! Thanks for the heads up, very exciting, this is going to
>>>>> make a lot of people happy :)
>>>>>
>>>>> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>> wrote:
>>>>>
>>>>>> + dev@beam.apache.org
>>>>>>
>>>>>> Just a quick email to let you know that I'm starting developing this.
>>>>>>
>>>>>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <
>>>>>> kirpichov@google.com> wrote:
>>>>>>
>>>>>>> Hi Carlos,
>>>>>>>
>>>>>>> Thank you for expressing interest in taking this on! Let me give you
>>>>>>> a few pointers to start, and I'll be happy to help everywhere along the way.
>>>>>>>
>>>>>>> Basically we want BigQueryIO.write() to return something (e.g. a
>>>>>>> PCollection) that can be used as input to Wait.on().
>>>>>>> Currently it returns a WriteResult, which only contains a
>>>>>>> PCollection<TableRow> of failed inserts - that one can not be used
>>>>>>> directly, instead we should add another component to WriteResult that
>>>>>>> represents the result of successfully writing some data.
>>>>>>>
>>>>>>> Given that BQIO supports dynamic destination writes, I think it
>>>>>>> makes sense for that to be a PCollection<KV<DestinationT, ???>> so that in
>>>>>>> theory we could sequence different destinations independently (currently
>>>>>>> Wait.on() does not provide such a feature, but it could); and it will
>>>>>>> require changing WriteResult to be WriteResult<DestinationT>. As for what
>>>>>>> the "???" might be - it is something that represents the result of
>>>>>>> successfully writing a window of data. I think it can even be Void, or "?"
>>>>>>> (wildcard type) for now, until we figure out something better.
>>>>>>>
>>>>>>> Implementing this would require roughly the following work:
>>>>>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>>>>>>> - Modify the BatchLoads transform to provide it on both codepaths:
>>>>>>> expandTriggered() and expandUntriggered()
>>>>>>> ...- expandTriggered() itself writes via 2 codepaths:
>>>>>>> single-partition and multi-partition. Both need to be handled - we need to
>>>>>>> get a PCollection<KV<DestinationT, ?>> from each of them, and Flatten these
>>>>>>> two PCollections together to get the final result. The single-partition
>>>>>>> codepath (writeSinglePartition) under the hood already uses WriteTables
>>>>>>> that returns a KV<DestinationT, ...> so it's directly usable. The
>>>>>>> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
>>>>>>> codepath drops DestinationT along the way and will need to be refactored a
>>>>>>> bit to keep it until the end.
>>>>>>> ...- expandUntriggered() should be treated the same way.
>>>>>>> - Modify the StreamingWriteTables transform to provide it
>>>>>>> ...- Here also, the challenge is to propagate the DestinationT type
>>>>>>> all the way until the end of StreamingWriteTables - it will need to be
>>>>>>> refactored. After such a refactoring, returning a KV<DestinationT, ...>
>>>>>>> should be easy.
>>>>>>>
>>>>>>> Another challenge with all of this is backwards compatibility in
>>>>>>> terms of API and pipeline update.
>>>>>>> Pipeline update is much less of a concern for the BatchLoads
>>>>>>> codepath, because it's typically used in batch-mode pipelines that don't
>>>>>>> get updated. I would recommend to start with this, perhaps even with only
>>>>>>> the untriggered codepath (it is much more commonly used) - that will pave
>>>>>>> the way for future work.
>>>>>>>
>>>>>>> Hope this helps, please ask more if something is unclear!
>>>>>>>
>>>>>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey Eugene!!
>>>>>>>>
>>>>>>>> I’d gladly take a stab on it although I’m not sure how much
>>>>>>>> available time I might have to put into but... yeah, let’s try it.
>>>>>>>>
>>>>>>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <
>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Yes, you're both right - BigQueryIO.write() is currently not
>>>>>>>>> implemented in a way that it can be used with Wait.on(). It would certainly
>>>>>>>>> be a welcome contribution to change this - many people expressed interest
>>>>>>>>> in specifically waiting for BigQuery writes. Is any of you interested in
>>>>>>>>> helping out?
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>>
>>>>>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <
>>>>>>>>> carlos@mrcalonso.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Simon, I think your explanation was very accurate, at least to
>>>>>>>>>> my understanding. I'd also be interested in getting batch load result's
>>>>>>>>>> feedback on the pipeline... hopefully someone may suggest something,
>>>>>>>>>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>>>>>>>> simon.kitching@unbelievable-machine.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>>
>>>>>>>>>>> I need to write some data to BigQuery (batch-mode) and then send
>>>>>>>>>>> a Pubsub message to trigger further processing.
>>>>>>>>>>>
>>>>>>>>>>> I found this thread titled "Callbacks/other functions run after
>>>>>>>>>>> a PDone/output transform" on the user-list which was very relevant:
>>>>>>>>>>>
>>>>>>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>>>>>
>>>>>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>>>>>
>>>>>>>>>>> Unfortunately, it appears that the Wait.on transform does not
>>>>>>>>>>> work with BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to
>>>>>>>>>>> work. Advice appreciated.
>>>>>>>>>>>
>>>>>>>>>>> Here's (most of) the relevant test code:
>>>>>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>>>>>         PCollection<String> lines = p.apply("Read Input",
>>>>>>>>>>> Create.of("line1", "line2", "line3", "line4"));
>>>>>>>>>>>
>>>>>>>>>>>         TableFieldSchema f1 = new
>>>>>>>>>>> TableFieldSchema().setName("value").setType("string");
>>>>>>>>>>>         TableSchema s2 = new
>>>>>>>>>>> TableSchema().setFields(Collections.singletonList(f1));
>>>>>>>>>>>
>>>>>>>>>>>         WriteResult writeResult = lines.apply("Write and load
>>>>>>>>>>> data", BigQueryIO.<String>write() //
>>>>>>>>>>>                 .to(options.getTableSpec()) //
>>>>>>>>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>>>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>>>>>> //
>>>>>>>>>>> //
>>>>>>>>>>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>>>>>>>>                 .withSchema(s2)
>>>>>>>>>>>
>>>>>>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>>> //
>>>>>>>>>>>
>>>>>>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>>>>>>>>>> OnCompletion()));
>>>>>>>>>>>
>>>>>>>>>>> where
>>>>>>>>>>> + format-function "SlowFormatter" prints out each line and has a
>>>>>>>>>>> small sleep for testing purposes, and
>>>>>>>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>>>>>>>
>>>>>>>>>>> In production code, OnCompletion would be fed some collection
>>>>>>>>>>> derived from lines, eg min/max record id, and the operation would be "send
>>>>>>>>>>> pubsub message" rather than print..
>>>>>>>>>>>
>>>>>>>>>>> My expectation is that the "SlowFormatter" would run for each
>>>>>>>>>>> line, then the data would be uploaded, then OnCompletion would print each
>>>>>>>>>>> line. And indeed that happens when STREAMING_INSERTS is used. However for
>>>>>>>>>>> FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>>>>>>>>
>>>>>>>>>>> I use WriteResult.getFailedInserts as that is the only "output"
>>>>>>>>>>> that BiqQueryIO.write() generates AFAICT. I don't expect any failed
>>>>>>>>>>> records, but believe that it can be used as a "signal" for the Wait.on - ie
>>>>>>>>>>> the output is "complete for window" only after all data has been uploaded,
>>>>>>>>>>> which is what I need. And that does seem to work for STREAMING_LOADS.
>>>>>>>>>>>
>>>>>>>>>>> I suspect the reason that this does not work for FILE_LOADS is
>>>>>>>>>>> that method BatchLoads.writeResult returns a WriteResult that wraps an
>>>>>>>>>>> "empty" failedInserts collection, ie data which is not connected to the
>>>>>>>>>>> batch-load-job that is triggered:
>>>>>>>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>>>>>>>     PCollection<TableRow> empty =
>>>>>>>>>>>         p.apply("CreateEmptyFailedInserts",
>>>>>>>>>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"),
>>>>>>>>>>> empty);
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load
>>>>>>>>>>> jobs; once a job is submitted the code repeatedly polls the job status
>>>>>>>>>>> until it reaches DONE or FAILED. However that information does not appear
>>>>>>>>>>> to be exposed anywhere (unlike streaming which effectively exposes
>>>>>>>>>>> completion-state via the failedInserts stream).
>>>>>>>>>>>
>>>>>>>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>>>>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Simon
>>>>>>>>>>>
>>>>>>>>>>>

Re: BiqQueryIO.write and Wait.on

Posted by Carlos Alonso <ca...@mrcalonso.com>.
Just opened this PR: https://github.com/apache/beam/pull/6055 to get
feedback ASAP. Basically what it does is return the job status in a
PCollection of BigQueryWriteResult objects

On Fri, Jul 20, 2018 at 11:57 PM Reuven Lax <re...@google.com> wrote:

> There already is a org.apache.beam.sdk.io.gcp.bigquery.WriteResult class.
>
> On Tue, Jul 17, 2018 at 9:44 AM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> Hmm, I think this approach has some complications:
>> - Using JobStatus makes it tied to using BigQuery batch load jobs, but
>> the return type ought to be the same regardless of which method of writing
>> is used (including potential future BigQuery APIs - they are evolving), or
>> how many BigQuery load jobs are involved in writing a given window (it can
>> be multiple).
>> - Returning a success/failure indicator makes it prone to users ignoring
>> the failure: the default behavior should be that, if the pipeline succeeds,
>> that means all data was successfully written - if users want different
>> error handling, e.g. a deadletter queue, they should have to specify it
>> explicitly.
>>
>> I would recommend to return a PCollection of a type that's invariant to
>> which load method is used (streaming writes, load jobs, multiple load jobs
>> etc.). If it's unclear what type that should be, you could introduce an
>> empty type e.g. "class BigQueryWriteResult {}" just for the sake of
>> signaling success, and later add something to it.
>>
>> On Tue, Jul 17, 2018 at 12:30 AM Carlos Alonso <ca...@mrcalonso.com>
>> wrote:
>>
>>> All good so far. I've been a bit side tracked but more or less I have
>>> the idea of using the JobStatus as part of the collection so that not only
>>> the completion is signaled, but also the result (success/failure) can be
>>> accessed, how does it sound?
>>>
>>> Regards
>>>
>>> On Tue, Jul 17, 2018 at 3:07 AM Eugene Kirpichov <ki...@google.com>
>>> wrote:
>>>
>>>> Hi Carlos,
>>>>
>>>> Any updates / roadblocks you hit?
>>>>
>>>>
>>>> On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov <ki...@google.com>
>>>> wrote:
>>>>
>>>>> Awesome!! Thanks for the heads up, very exciting, this is going to
>>>>> make a lot of people happy :)
>>>>>
>>>>> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>> wrote:
>>>>>
>>>>>> + dev@beam.apache.org
>>>>>>
>>>>>> Just a quick email to let you know that I'm starting developing this.
>>>>>>
>>>>>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <
>>>>>> kirpichov@google.com> wrote:
>>>>>>
>>>>>>> Hi Carlos,
>>>>>>>
>>>>>>> Thank you for expressing interest in taking this on! Let me give you
>>>>>>> a few pointers to start, and I'll be happy to help everywhere along the way.
>>>>>>>
>>>>>>> Basically we want BigQueryIO.write() to return something (e.g. a
>>>>>>> PCollection) that can be used as input to Wait.on().
>>>>>>> Currently it returns a WriteResult, which only contains a
>>>>>>> PCollection<TableRow> of failed inserts - that one can not be used
>>>>>>> directly, instead we should add another component to WriteResult that
>>>>>>> represents the result of successfully writing some data.
>>>>>>>
>>>>>>> Given that BQIO supports dynamic destination writes, I think it
>>>>>>> makes sense for that to be a PCollection<KV<DestinationT, ???>> so that in
>>>>>>> theory we could sequence different destinations independently (currently
>>>>>>> Wait.on() does not provide such a feature, but it could); and it will
>>>>>>> require changing WriteResult to be WriteResult<DestinationT>. As for what
>>>>>>> the "???" might be - it is something that represents the result of
>>>>>>> successfully writing a window of data. I think it can even be Void, or "?"
>>>>>>> (wildcard type) for now, until we figure out something better.
>>>>>>>
>>>>>>> Implementing this would require roughly the following work:
>>>>>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>>>>>>> - Modify the BatchLoads transform to provide it on both codepaths:
>>>>>>> expandTriggered() and expandUntriggered()
>>>>>>> ...- expandTriggered() itself writes via 2 codepaths:
>>>>>>> single-partition and multi-partition. Both need to be handled - we need to
>>>>>>> get a PCollection<KV<DestinationT, ?>> from each of them, and Flatten these
>>>>>>> two PCollections together to get the final result. The single-partition
>>>>>>> codepath (writeSinglePartition) under the hood already uses WriteTables
>>>>>>> that returns a KV<DestinationT, ...> so it's directly usable. The
>>>>>>> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
>>>>>>> codepath drops DestinationT along the way and will need to be refactored a
>>>>>>> bit to keep it until the end.
>>>>>>> ...- expandUntriggered() should be treated the same way.
>>>>>>> - Modify the StreamingWriteTables transform to provide it
>>>>>>> ...- Here also, the challenge is to propagate the DestinationT type
>>>>>>> all the way until the end of StreamingWriteTables - it will need to be
>>>>>>> refactored. After such a refactoring, returning a KV<DestinationT, ...>
>>>>>>> should be easy.
>>>>>>>
>>>>>>> Another challenge with all of this is backwards compatibility in
>>>>>>> terms of API and pipeline update.
>>>>>>> Pipeline update is much less of a concern for the BatchLoads
>>>>>>> codepath, because it's typically used in batch-mode pipelines that don't
>>>>>>> get updated. I would recommend to start with this, perhaps even with only
>>>>>>> the untriggered codepath (it is much more commonly used) - that will pave
>>>>>>> the way for future work.
>>>>>>>
>>>>>>> Hope this helps, please ask more if something is unclear!
>>>>>>>
>>>>>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey Eugene!!
>>>>>>>>
>>>>>>>> I’d gladly take a stab on it although I’m not sure how much
>>>>>>>> available time I might have to put into but... yeah, let’s try it.
>>>>>>>>
>>>>>>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <
>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Yes, you're both right - BigQueryIO.write() is currently not
>>>>>>>>> implemented in a way that it can be used with Wait.on(). It would certainly
>>>>>>>>> be a welcome contribution to change this - many people expressed interest
>>>>>>>>> in specifically waiting for BigQuery writes. Is any of you interested in
>>>>>>>>> helping out?
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>>
>>>>>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <
>>>>>>>>> carlos@mrcalonso.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Simon, I think your explanation was very accurate, at least to
>>>>>>>>>> my understanding. I'd also be interested in getting batch load result's
>>>>>>>>>> feedback on the pipeline... hopefully someone may suggest something,
>>>>>>>>>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>>>>>>>> simon.kitching@unbelievable-machine.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>>
>>>>>>>>>>> I need to write some data to BigQuery (batch-mode) and then send
>>>>>>>>>>> a Pubsub message to trigger further processing.
>>>>>>>>>>>
>>>>>>>>>>> I found this thread titled "Callbacks/other functions run after
>>>>>>>>>>> a PDone/output transform" on the user-list which was very relevant:
>>>>>>>>>>>
>>>>>>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>>>>>
>>>>>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>>>>>
>>>>>>>>>>> Unfortunately, it appears that the Wait.on transform does not
>>>>>>>>>>> work with BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to
>>>>>>>>>>> work. Advice appreciated.
>>>>>>>>>>>
>>>>>>>>>>> Here's (most of) the relevant test code:
>>>>>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>>>>>         PCollection<String> lines = p.apply("Read Input",
>>>>>>>>>>> Create.of("line1", "line2", "line3", "line4"));
>>>>>>>>>>>
>>>>>>>>>>>         TableFieldSchema f1 = new
>>>>>>>>>>> TableFieldSchema().setName("value").setType("string");
>>>>>>>>>>>         TableSchema s2 = new
>>>>>>>>>>> TableSchema().setFields(Collections.singletonList(f1));
>>>>>>>>>>>
>>>>>>>>>>>         WriteResult writeResult = lines.apply("Write and load
>>>>>>>>>>> data", BigQueryIO.<String>write() //
>>>>>>>>>>>                 .to(options.getTableSpec()) //
>>>>>>>>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>>>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>>>>>> //
>>>>>>>>>>> //
>>>>>>>>>>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>>>>>>>>                 .withSchema(s2)
>>>>>>>>>>>
>>>>>>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>>> //
>>>>>>>>>>>
>>>>>>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>>>>>>>>>> OnCompletion()));
>>>>>>>>>>>
>>>>>>>>>>> where
>>>>>>>>>>> + format-function "SlowFormatter" prints out each line and has a
>>>>>>>>>>> small sleep for testing purposes, and
>>>>>>>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>>>>>>>
>>>>>>>>>>> In production code, OnCompletion would be fed some collection
>>>>>>>>>>> derived from lines, eg min/max record id, and the operation would be "send
>>>>>>>>>>> pubsub message" rather than print..
>>>>>>>>>>>
>>>>>>>>>>> My expectation is that the "SlowFormatter" would run for each
>>>>>>>>>>> line, then the data would be uploaded, then OnCompletion would print each
>>>>>>>>>>> line. And indeed that happens when STREAMING_INSERTS is used. However for
>>>>>>>>>>> FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>>>>>>>>
>>>>>>>>>>> I use WriteResult.getFailedInserts as that is the only "output"
>>>>>>>>>>> that BiqQueryIO.write() generates AFAICT. I don't expect any failed
>>>>>>>>>>> records, but believe that it can be used as a "signal" for the Wait.on - ie
>>>>>>>>>>> the output is "complete for window" only after all data has been uploaded,
>>>>>>>>>>> which is what I need. And that does seem to work for STREAMING_LOADS.
>>>>>>>>>>>
>>>>>>>>>>> I suspect the reason that this does not work for FILE_LOADS is
>>>>>>>>>>> that method BatchLoads.writeResult returns a WriteResult that wraps an
>>>>>>>>>>> "empty" failedInserts collection, ie data which is not connected to the
>>>>>>>>>>> batch-load-job that is triggered:
>>>>>>>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>>>>>>>     PCollection<TableRow> empty =
>>>>>>>>>>>         p.apply("CreateEmptyFailedInserts",
>>>>>>>>>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"),
>>>>>>>>>>> empty);
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load
>>>>>>>>>>> jobs; once a job is submitted the code repeatedly polls the job status
>>>>>>>>>>> until it reaches DONE or FAILED. However that information does not appear
>>>>>>>>>>> to be exposed anywhere (unlike streaming which effectively exposes
>>>>>>>>>>> completion-state via the failedInserts stream).
>>>>>>>>>>>
>>>>>>>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>>>>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Simon
>>>>>>>>>>>
>>>>>>>>>>>

Re: BiqQueryIO.write and Wait.on

Posted by Reuven Lax <re...@google.com>.
There already is a org.apache.beam.sdk.io.gcp.bigquery.WriteResult class.

On Tue, Jul 17, 2018 at 9:44 AM Eugene Kirpichov <ki...@google.com>
wrote:

> Hmm, I think this approach has some complications:
> - Using JobStatus makes it tied to using BigQuery batch load jobs, but the
> return type ought to be the same regardless of which method of writing is
> used (including potential future BigQuery APIs - they are evolving), or how
> many BigQuery load jobs are involved in writing a given window (it can be
> multiple).
> - Returning a success/failure indicator makes it prone to users ignoring
> the failure: the default behavior should be that, if the pipeline succeeds,
> that means all data was successfully written - if users want different
> error handling, e.g. a deadletter queue, they should have to specify it
> explicitly.
>
> I would recommend to return a PCollection of a type that's invariant to
> which load method is used (streaming writes, load jobs, multiple load jobs
> etc.). If it's unclear what type that should be, you could introduce an
> empty type e.g. "class BigQueryWriteResult {}" just for the sake of
> signaling success, and later add something to it.
>
> On Tue, Jul 17, 2018 at 12:30 AM Carlos Alonso <ca...@mrcalonso.com>
> wrote:
>
>> All good so far. I've been a bit side tracked but more or less I have the
>> idea of using the JobStatus as part of the collection so that not only the
>> completion is signaled, but also the result (success/failure) can be
>> accessed, how does it sound?
>>
>> Regards
>>
>> On Tue, Jul 17, 2018 at 3:07 AM Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>> Hi Carlos,
>>>
>>> Any updates / roadblocks you hit?
>>>
>>>
>>> On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov <ki...@google.com>
>>> wrote:
>>>
>>>> Awesome!! Thanks for the heads up, very exciting, this is going to make
>>>> a lot of people happy :)
>>>>
>>>> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <ca...@mrcalonso.com>
>>>> wrote:
>>>>
>>>>> + dev@beam.apache.org
>>>>>
>>>>> Just a quick email to let you know that I'm starting developing this.
>>>>>
>>>>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <
>>>>> kirpichov@google.com> wrote:
>>>>>
>>>>>> Hi Carlos,
>>>>>>
>>>>>> Thank you for expressing interest in taking this on! Let me give you
>>>>>> a few pointers to start, and I'll be happy to help everywhere along the way.
>>>>>>
>>>>>> Basically we want BigQueryIO.write() to return something (e.g. a
>>>>>> PCollection) that can be used as input to Wait.on().
>>>>>> Currently it returns a WriteResult, which only contains a
>>>>>> PCollection<TableRow> of failed inserts - that one can not be used
>>>>>> directly, instead we should add another component to WriteResult that
>>>>>> represents the result of successfully writing some data.
>>>>>>
>>>>>> Given that BQIO supports dynamic destination writes, I think it makes
>>>>>> sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory
>>>>>> we could sequence different destinations independently (currently Wait.on()
>>>>>> does not provide such a feature, but it could); and it will require
>>>>>> changing WriteResult to be WriteResult<DestinationT>. As for what the "???"
>>>>>> might be - it is something that represents the result of successfully
>>>>>> writing a window of data. I think it can even be Void, or "?" (wildcard
>>>>>> type) for now, until we figure out something better.
>>>>>>
>>>>>> Implementing this would require roughly the following work:
>>>>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>>>>>> - Modify the BatchLoads transform to provide it on both codepaths:
>>>>>> expandTriggered() and expandUntriggered()
>>>>>> ...- expandTriggered() itself writes via 2 codepaths:
>>>>>> single-partition and multi-partition. Both need to be handled - we need to
>>>>>> get a PCollection<KV<DestinationT, ?>> from each of them, and Flatten these
>>>>>> two PCollections together to get the final result. The single-partition
>>>>>> codepath (writeSinglePartition) under the hood already uses WriteTables
>>>>>> that returns a KV<DestinationT, ...> so it's directly usable. The
>>>>>> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
>>>>>> codepath drops DestinationT along the way and will need to be refactored a
>>>>>> bit to keep it until the end.
>>>>>> ...- expandUntriggered() should be treated the same way.
>>>>>> - Modify the StreamingWriteTables transform to provide it
>>>>>> ...- Here also, the challenge is to propagate the DestinationT type
>>>>>> all the way until the end of StreamingWriteTables - it will need to be
>>>>>> refactored. After such a refactoring, returning a KV<DestinationT, ...>
>>>>>> should be easy.
>>>>>>
>>>>>> Another challenge with all of this is backwards compatibility in
>>>>>> terms of API and pipeline update.
>>>>>> Pipeline update is much less of a concern for the BatchLoads
>>>>>> codepath, because it's typically used in batch-mode pipelines that don't
>>>>>> get updated. I would recommend to start with this, perhaps even with only
>>>>>> the untriggered codepath (it is much more commonly used) - that will pave
>>>>>> the way for future work.
>>>>>>
>>>>>> Hope this helps, please ask more if something is unclear!
>>>>>>
>>>>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey Eugene!!
>>>>>>>
>>>>>>> I’d gladly take a stab on it although I’m not sure how much
>>>>>>> available time I might have to put into but... yeah, let’s try it.
>>>>>>>
>>>>>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>>>>>
>>>>>>> Thanks!
>>>>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <ki...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Yes, you're both right - BigQueryIO.write() is currently not
>>>>>>>> implemented in a way that it can be used with Wait.on(). It would certainly
>>>>>>>> be a welcome contribution to change this - many people expressed interest
>>>>>>>> in specifically waiting for BigQuery writes. Is any of you interested in
>>>>>>>> helping out?
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Simon, I think your explanation was very accurate, at least to
>>>>>>>>> my understanding. I'd also be interested in getting batch load result's
>>>>>>>>> feedback on the pipeline... hopefully someone may suggest something,
>>>>>>>>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>>>>>>> simon.kitching@unbelievable-machine.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I need to write some data to BigQuery (batch-mode) and then send
>>>>>>>>>> a Pubsub message to trigger further processing.
>>>>>>>>>>
>>>>>>>>>> I found this thread titled "Callbacks/other functions run after a
>>>>>>>>>> PDone/output transform" on the user-list which was very relevant:
>>>>>>>>>>
>>>>>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>>>>
>>>>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>>>>
>>>>>>>>>> Unfortunately, it appears that the Wait.on transform does not
>>>>>>>>>> work with BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to
>>>>>>>>>> work. Advice appreciated.
>>>>>>>>>>
>>>>>>>>>> Here's (most of) the relevant test code:
>>>>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>>>>         PCollection<String> lines = p.apply("Read Input",
>>>>>>>>>> Create.of("line1", "line2", "line3", "line4"));
>>>>>>>>>>
>>>>>>>>>>         TableFieldSchema f1 = new
>>>>>>>>>> TableFieldSchema().setName("value").setType("string");
>>>>>>>>>>         TableSchema s2 = new
>>>>>>>>>> TableSchema().setFields(Collections.singletonList(f1));
>>>>>>>>>>
>>>>>>>>>>         WriteResult writeResult = lines.apply("Write and load
>>>>>>>>>> data", BigQueryIO.<String>write() //
>>>>>>>>>>                 .to(options.getTableSpec()) //
>>>>>>>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
>>>>>>>>>> //
>>>>>>>>>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>>>>>>>                 .withSchema(s2)
>>>>>>>>>>
>>>>>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>> //
>>>>>>>>>>
>>>>>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>>>>>>>>> OnCompletion()));
>>>>>>>>>>
>>>>>>>>>> where
>>>>>>>>>> + format-function "SlowFormatter" prints out each line and has a
>>>>>>>>>> small sleep for testing purposes, and
>>>>>>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>>>>>>
>>>>>>>>>> In production code, OnCompletion would be fed some collection
>>>>>>>>>> derived from lines, eg min/max record id, and the operation would be "send
>>>>>>>>>> pubsub message" rather than print..
>>>>>>>>>>
>>>>>>>>>> My expectation is that the "SlowFormatter" would run for each
>>>>>>>>>> line, then the data would be uploaded, then OnCompletion would print each
>>>>>>>>>> line. And indeed that happens when STREAMING_INSERTS is used. However for
>>>>>>>>>> FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>>>>>>>
>>>>>>>>>> I use WriteResult.getFailedInserts as that is the only "output"
>>>>>>>>>> that BiqQueryIO.write() generates AFAICT. I don't expect any failed
>>>>>>>>>> records, but believe that it can be used as a "signal" for the Wait.on - ie
>>>>>>>>>> the output is "complete for window" only after all data has been uploaded,
>>>>>>>>>> which is what I need. And that does seem to work for STREAMING_LOADS.
>>>>>>>>>>
>>>>>>>>>> I suspect the reason that this does not work for FILE_LOADS is
>>>>>>>>>> that method BatchLoads.writeResult returns a WriteResult that wraps an
>>>>>>>>>> "empty" failedInserts collection, ie data which is not connected to the
>>>>>>>>>> batch-load-job that is triggered:
>>>>>>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>>>>>>     PCollection<TableRow> empty =
>>>>>>>>>>         p.apply("CreateEmptyFailedInserts",
>>>>>>>>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"),
>>>>>>>>>> empty);
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load
>>>>>>>>>> jobs; once a job is submitted the code repeatedly polls the job status
>>>>>>>>>> until it reaches DONE or FAILED. However that information does not appear
>>>>>>>>>> to be exposed anywhere (unlike streaming which effectively exposes
>>>>>>>>>> completion-state via the failedInserts stream).
>>>>>>>>>>
>>>>>>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>>>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Simon
>>>>>>>>>>
>>>>>>>>>>

Re: BiqQueryIO.write and Wait.on

Posted by Eugene Kirpichov <ki...@google.com>.
Hmm, I think this approach has some complications:
- Using JobStatus makes it tied to using BigQuery batch load jobs, but the
return type ought to be the same regardless of which method of writing is
used (including potential future BigQuery APIs - they are evolving), or how
many BigQuery load jobs are involved in writing a given window (it can be
multiple).
- Returning a success/failure indicator makes it prone to users ignoring
the failure: the default behavior should be that, if the pipeline succeeds,
that means all data was successfully written - if users want different
error handling, e.g. a deadletter queue, they should have to specify it
explicitly.

I would recommend to return a PCollection of a type that's invariant to
which load method is used (streaming writes, load jobs, multiple load jobs
etc.). If it's unclear what type that should be, you could introduce an
empty type e.g. "class BigQueryWriteResult {}" just for the sake of
signaling success, and later add something to it.

On Tue, Jul 17, 2018 at 12:30 AM Carlos Alonso <ca...@mrcalonso.com> wrote:

> All good so far. I've been a bit side tracked but more or less I have the
> idea of using the JobStatus as part of the collection so that not only the
> completion is signaled, but also the result (success/failure) can be
> accessed, how does it sound?
>
> Regards
>
> On Tue, Jul 17, 2018 at 3:07 AM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> Hi Carlos,
>>
>> Any updates / roadblocks you hit?
>>
>>
>> On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>> Awesome!! Thanks for the heads up, very exciting, this is going to make
>>> a lot of people happy :)
>>>
>>> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <ca...@mrcalonso.com> wrote:
>>>
>>>> + dev@beam.apache.org
>>>>
>>>> Just a quick email to let you know that I'm starting developing this.
>>>>
>>>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <ki...@google.com>
>>>> wrote:
>>>>
>>>>> Hi Carlos,
>>>>>
>>>>> Thank you for expressing interest in taking this on! Let me give you a
>>>>> few pointers to start, and I'll be happy to help everywhere along the way.
>>>>>
>>>>> Basically we want BigQueryIO.write() to return something (e.g. a
>>>>> PCollection) that can be used as input to Wait.on().
>>>>> Currently it returns a WriteResult, which only contains a
>>>>> PCollection<TableRow> of failed inserts - that one can not be used
>>>>> directly, instead we should add another component to WriteResult that
>>>>> represents the result of successfully writing some data.
>>>>>
>>>>> Given that BQIO supports dynamic destination writes, I think it makes
>>>>> sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory
>>>>> we could sequence different destinations independently (currently Wait.on()
>>>>> does not provide such a feature, but it could); and it will require
>>>>> changing WriteResult to be WriteResult<DestinationT>. As for what the "???"
>>>>> might be - it is something that represents the result of successfully
>>>>> writing a window of data. I think it can even be Void, or "?" (wildcard
>>>>> type) for now, until we figure out something better.
>>>>>
>>>>> Implementing this would require roughly the following work:
>>>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>>>>> - Modify the BatchLoads transform to provide it on both codepaths:
>>>>> expandTriggered() and expandUntriggered()
>>>>> ...- expandTriggered() itself writes via 2 codepaths: single-partition
>>>>> and multi-partition. Both need to be handled - we need to get a
>>>>> PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two
>>>>> PCollections together to get the final result. The single-partition
>>>>> codepath (writeSinglePartition) under the hood already uses WriteTables
>>>>> that returns a KV<DestinationT, ...> so it's directly usable. The
>>>>> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
>>>>> codepath drops DestinationT along the way and will need to be refactored a
>>>>> bit to keep it until the end.
>>>>> ...- expandUntriggered() should be treated the same way.
>>>>> - Modify the StreamingWriteTables transform to provide it
>>>>> ...- Here also, the challenge is to propagate the DestinationT type
>>>>> all the way until the end of StreamingWriteTables - it will need to be
>>>>> refactored. After such a refactoring, returning a KV<DestinationT, ...>
>>>>> should be easy.
>>>>>
>>>>> Another challenge with all of this is backwards compatibility in terms
>>>>> of API and pipeline update.
>>>>> Pipeline update is much less of a concern for the BatchLoads codepath,
>>>>> because it's typically used in batch-mode pipelines that don't get updated.
>>>>> I would recommend to start with this, perhaps even with only the
>>>>> untriggered codepath (it is much more commonly used) - that will pave the
>>>>> way for future work.
>>>>>
>>>>> Hope this helps, please ask more if something is unclear!
>>>>>
>>>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>> wrote:
>>>>>
>>>>>> Hey Eugene!!
>>>>>>
>>>>>> I’d gladly take a stab on it although I’m not sure how much available
>>>>>> time I might have to put into but... yeah, let’s try it.
>>>>>>
>>>>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>>>>
>>>>>> Thanks!
>>>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <ki...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Yes, you're both right - BigQueryIO.write() is currently not
>>>>>>> implemented in a way that it can be used with Wait.on(). It would certainly
>>>>>>> be a welcome contribution to change this - many people expressed interest
>>>>>>> in specifically waiting for BigQuery writes. Is any of you interested in
>>>>>>> helping out?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Simon, I think your explanation was very accurate, at least to
>>>>>>>> my understanding. I'd also be interested in getting batch load result's
>>>>>>>> feedback on the pipeline... hopefully someone may suggest something,
>>>>>>>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>>>>>> simon.kitching@unbelievable-machine.com> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I need to write some data to BigQuery (batch-mode) and then send a
>>>>>>>>> Pubsub message to trigger further processing.
>>>>>>>>>
>>>>>>>>> I found this thread titled "Callbacks/other functions run after a
>>>>>>>>> PDone/output transform" on the user-list which was very relevant:
>>>>>>>>>
>>>>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>>>
>>>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>>>
>>>>>>>>> Unfortunately, it appears that the Wait.on transform does not work
>>>>>>>>> with BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to work.
>>>>>>>>> Advice appreciated.
>>>>>>>>>
>>>>>>>>> Here's (most of) the relevant test code:
>>>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>>>         PCollection<String> lines = p.apply("Read Input",
>>>>>>>>> Create.of("line1", "line2", "line3", "line4"));
>>>>>>>>>
>>>>>>>>>         TableFieldSchema f1 = new
>>>>>>>>> TableFieldSchema().setName("value").setType("string");
>>>>>>>>>         TableSchema s2 = new
>>>>>>>>> TableSchema().setFields(Collections.singletonList(f1));
>>>>>>>>>
>>>>>>>>>         WriteResult writeResult = lines.apply("Write and load
>>>>>>>>> data", BigQueryIO.<String>write() //
>>>>>>>>>                 .to(options.getTableSpec()) //
>>>>>>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
>>>>>>>>> //
>>>>>>>>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>>>>>>                 .withSchema(s2)
>>>>>>>>>
>>>>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>> //
>>>>>>>>>
>>>>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>>>>>>>> OnCompletion()));
>>>>>>>>>
>>>>>>>>> where
>>>>>>>>> + format-function "SlowFormatter" prints out each line and has a
>>>>>>>>> small sleep for testing purposes, and
>>>>>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>>>>>
>>>>>>>>> In production code, OnCompletion would be fed some collection
>>>>>>>>> derived from lines, eg min/max record id, and the operation would be "send
>>>>>>>>> pubsub message" rather than print..
>>>>>>>>>
>>>>>>>>> My expectation is that the "SlowFormatter" would run for each
>>>>>>>>> line, then the data would be uploaded, then OnCompletion would print each
>>>>>>>>> line. And indeed that happens when STREAMING_INSERTS is used. However for
>>>>>>>>> FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>>>>>>
>>>>>>>>> I use WriteResult.getFailedInserts as that is the only "output"
>>>>>>>>> that BiqQueryIO.write() generates AFAICT. I don't expect any failed
>>>>>>>>> records, but believe that it can be used as a "signal" for the Wait.on - ie
>>>>>>>>> the output is "complete for window" only after all data has been uploaded,
>>>>>>>>> which is what I need. And that does seem to work for STREAMING_LOADS.
>>>>>>>>>
>>>>>>>>> I suspect the reason that this does not work for FILE_LOADS is
>>>>>>>>> that method BatchLoads.writeResult returns a WriteResult that wraps an
>>>>>>>>> "empty" failedInserts collection, ie data which is not connected to the
>>>>>>>>> batch-load-job that is triggered:
>>>>>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>>>>>     PCollection<TableRow> empty =
>>>>>>>>>         p.apply("CreateEmptyFailedInserts",
>>>>>>>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"),
>>>>>>>>> empty);
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load
>>>>>>>>> jobs; once a job is submitted the code repeatedly polls the job status
>>>>>>>>> until it reaches DONE or FAILED. However that information does not appear
>>>>>>>>> to be exposed anywhere (unlike streaming which effectively exposes
>>>>>>>>> completion-state via the failedInserts stream).
>>>>>>>>>
>>>>>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Simon
>>>>>>>>>
>>>>>>>>>

Re: BiqQueryIO.write and Wait.on

Posted by Eugene Kirpichov <ki...@google.com>.
Hmm, I think this approach has some complications:
- Using JobStatus makes it tied to using BigQuery batch load jobs, but the
return type ought to be the same regardless of which method of writing is
used (including potential future BigQuery APIs - they are evolving), or how
many BigQuery load jobs are involved in writing a given window (it can be
multiple).
- Returning a success/failure indicator makes it prone to users ignoring
the failure: the default behavior should be that, if the pipeline succeeds,
that means all data was successfully written - if users want different
error handling, e.g. a deadletter queue, they should have to specify it
explicitly.

I would recommend to return a PCollection of a type that's invariant to
which load method is used (streaming writes, load jobs, multiple load jobs
etc.). If it's unclear what type that should be, you could introduce an
empty type e.g. "class BigQueryWriteResult {}" just for the sake of
signaling success, and later add something to it.

On Tue, Jul 17, 2018 at 12:30 AM Carlos Alonso <ca...@mrcalonso.com> wrote:

> All good so far. I've been a bit side tracked but more or less I have the
> idea of using the JobStatus as part of the collection so that not only the
> completion is signaled, but also the result (success/failure) can be
> accessed, how does it sound?
>
> Regards
>
> On Tue, Jul 17, 2018 at 3:07 AM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> Hi Carlos,
>>
>> Any updates / roadblocks you hit?
>>
>>
>> On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>> Awesome!! Thanks for the heads up, very exciting, this is going to make
>>> a lot of people happy :)
>>>
>>> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <ca...@mrcalonso.com> wrote:
>>>
>>>> + dev@beam.apache.org
>>>>
>>>> Just a quick email to let you know that I'm starting developing this.
>>>>
>>>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <ki...@google.com>
>>>> wrote:
>>>>
>>>>> Hi Carlos,
>>>>>
>>>>> Thank you for expressing interest in taking this on! Let me give you a
>>>>> few pointers to start, and I'll be happy to help everywhere along the way.
>>>>>
>>>>> Basically we want BigQueryIO.write() to return something (e.g. a
>>>>> PCollection) that can be used as input to Wait.on().
>>>>> Currently it returns a WriteResult, which only contains a
>>>>> PCollection<TableRow> of failed inserts - that one can not be used
>>>>> directly, instead we should add another component to WriteResult that
>>>>> represents the result of successfully writing some data.
>>>>>
>>>>> Given that BQIO supports dynamic destination writes, I think it makes
>>>>> sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory
>>>>> we could sequence different destinations independently (currently Wait.on()
>>>>> does not provide such a feature, but it could); and it will require
>>>>> changing WriteResult to be WriteResult<DestinationT>. As for what the "???"
>>>>> might be - it is something that represents the result of successfully
>>>>> writing a window of data. I think it can even be Void, or "?" (wildcard
>>>>> type) for now, until we figure out something better.
>>>>>
>>>>> Implementing this would require roughly the following work:
>>>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>>>>> - Modify the BatchLoads transform to provide it on both codepaths:
>>>>> expandTriggered() and expandUntriggered()
>>>>> ...- expandTriggered() itself writes via 2 codepaths: single-partition
>>>>> and multi-partition. Both need to be handled - we need to get a
>>>>> PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two
>>>>> PCollections together to get the final result. The single-partition
>>>>> codepath (writeSinglePartition) under the hood already uses WriteTables
>>>>> that returns a KV<DestinationT, ...> so it's directly usable. The
>>>>> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
>>>>> codepath drops DestinationT along the way and will need to be refactored a
>>>>> bit to keep it until the end.
>>>>> ...- expandUntriggered() should be treated the same way.
>>>>> - Modify the StreamingWriteTables transform to provide it
>>>>> ...- Here also, the challenge is to propagate the DestinationT type
>>>>> all the way until the end of StreamingWriteTables - it will need to be
>>>>> refactored. After such a refactoring, returning a KV<DestinationT, ...>
>>>>> should be easy.
>>>>>
>>>>> Another challenge with all of this is backwards compatibility in terms
>>>>> of API and pipeline update.
>>>>> Pipeline update is much less of a concern for the BatchLoads codepath,
>>>>> because it's typically used in batch-mode pipelines that don't get updated.
>>>>> I would recommend to start with this, perhaps even with only the
>>>>> untriggered codepath (it is much more commonly used) - that will pave the
>>>>> way for future work.
>>>>>
>>>>> Hope this helps, please ask more if something is unclear!
>>>>>
>>>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>> wrote:
>>>>>
>>>>>> Hey Eugene!!
>>>>>>
>>>>>> I’d gladly take a stab on it although I’m not sure how much available
>>>>>> time I might have to put into but... yeah, let’s try it.
>>>>>>
>>>>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>>>>
>>>>>> Thanks!
>>>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <ki...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Yes, you're both right - BigQueryIO.write() is currently not
>>>>>>> implemented in a way that it can be used with Wait.on(). It would certainly
>>>>>>> be a welcome contribution to change this - many people expressed interest
>>>>>>> in specifically waiting for BigQuery writes. Is any of you interested in
>>>>>>> helping out?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Simon, I think your explanation was very accurate, at least to
>>>>>>>> my understanding. I'd also be interested in getting batch load result's
>>>>>>>> feedback on the pipeline... hopefully someone may suggest something,
>>>>>>>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>>>>>> simon.kitching@unbelievable-machine.com> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I need to write some data to BigQuery (batch-mode) and then send a
>>>>>>>>> Pubsub message to trigger further processing.
>>>>>>>>>
>>>>>>>>> I found this thread titled "Callbacks/other functions run after a
>>>>>>>>> PDone/output transform" on the user-list which was very relevant:
>>>>>>>>>
>>>>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>>>
>>>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>>>
>>>>>>>>> Unfortunately, it appears that the Wait.on transform does not work
>>>>>>>>> with BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to work.
>>>>>>>>> Advice appreciated.
>>>>>>>>>
>>>>>>>>> Here's (most of) the relevant test code:
>>>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>>>         PCollection<String> lines = p.apply("Read Input",
>>>>>>>>> Create.of("line1", "line2", "line3", "line4"));
>>>>>>>>>
>>>>>>>>>         TableFieldSchema f1 = new
>>>>>>>>> TableFieldSchema().setName("value").setType("string");
>>>>>>>>>         TableSchema s2 = new
>>>>>>>>> TableSchema().setFields(Collections.singletonList(f1));
>>>>>>>>>
>>>>>>>>>         WriteResult writeResult = lines.apply("Write and load
>>>>>>>>> data", BigQueryIO.<String>write() //
>>>>>>>>>                 .to(options.getTableSpec()) //
>>>>>>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
>>>>>>>>> //
>>>>>>>>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>>>>>>                 .withSchema(s2)
>>>>>>>>>
>>>>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>> //
>>>>>>>>>
>>>>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>>>>>>>> OnCompletion()));
>>>>>>>>>
>>>>>>>>> where
>>>>>>>>> + format-function "SlowFormatter" prints out each line and has a
>>>>>>>>> small sleep for testing purposes, and
>>>>>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>>>>>
>>>>>>>>> In production code, OnCompletion would be fed some collection
>>>>>>>>> derived from lines, eg min/max record id, and the operation would be "send
>>>>>>>>> pubsub message" rather than print..
>>>>>>>>>
>>>>>>>>> My expectation is that the "SlowFormatter" would run for each
>>>>>>>>> line, then the data would be uploaded, then OnCompletion would print each
>>>>>>>>> line. And indeed that happens when STREAMING_INSERTS is used. However for
>>>>>>>>> FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>>>>>>
>>>>>>>>> I use WriteResult.getFailedInserts as that is the only "output"
>>>>>>>>> that BiqQueryIO.write() generates AFAICT. I don't expect any failed
>>>>>>>>> records, but believe that it can be used as a "signal" for the Wait.on - ie
>>>>>>>>> the output is "complete for window" only after all data has been uploaded,
>>>>>>>>> which is what I need. And that does seem to work for STREAMING_LOADS.
>>>>>>>>>
>>>>>>>>> I suspect the reason that this does not work for FILE_LOADS is
>>>>>>>>> that method BatchLoads.writeResult returns a WriteResult that wraps an
>>>>>>>>> "empty" failedInserts collection, ie data which is not connected to the
>>>>>>>>> batch-load-job that is triggered:
>>>>>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>>>>>     PCollection<TableRow> empty =
>>>>>>>>>         p.apply("CreateEmptyFailedInserts",
>>>>>>>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"),
>>>>>>>>> empty);
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load
>>>>>>>>> jobs; once a job is submitted the code repeatedly polls the job status
>>>>>>>>> until it reaches DONE or FAILED. However that information does not appear
>>>>>>>>> to be exposed anywhere (unlike streaming which effectively exposes
>>>>>>>>> completion-state via the failedInserts stream).
>>>>>>>>>
>>>>>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Simon
>>>>>>>>>
>>>>>>>>>

Re: BiqQueryIO.write and Wait.on

Posted by Carlos Alonso <ca...@mrcalonso.com>.
All good so far. I've been a bit side tracked but more or less I have the
idea of using the JobStatus as part of the collection so that not only the
completion is signaled, but also the result (success/failure) can be
accessed, how does it sound?

Regards

On Tue, Jul 17, 2018 at 3:07 AM Eugene Kirpichov <ki...@google.com>
wrote:

> Hi Carlos,
>
> Any updates / roadblocks you hit?
>
>
> On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> Awesome!! Thanks for the heads up, very exciting, this is going to make a
>> lot of people happy :)
>>
>> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <ca...@mrcalonso.com> wrote:
>>
>>> + dev@beam.apache.org
>>>
>>> Just a quick email to let you know that I'm starting developing this.
>>>
>>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <ki...@google.com>
>>> wrote:
>>>
>>>> Hi Carlos,
>>>>
>>>> Thank you for expressing interest in taking this on! Let me give you a
>>>> few pointers to start, and I'll be happy to help everywhere along the way.
>>>>
>>>> Basically we want BigQueryIO.write() to return something (e.g. a
>>>> PCollection) that can be used as input to Wait.on().
>>>> Currently it returns a WriteResult, which only contains a
>>>> PCollection<TableRow> of failed inserts - that one can not be used
>>>> directly, instead we should add another component to WriteResult that
>>>> represents the result of successfully writing some data.
>>>>
>>>> Given that BQIO supports dynamic destination writes, I think it makes
>>>> sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory
>>>> we could sequence different destinations independently (currently Wait.on()
>>>> does not provide such a feature, but it could); and it will require
>>>> changing WriteResult to be WriteResult<DestinationT>. As for what the "???"
>>>> might be - it is something that represents the result of successfully
>>>> writing a window of data. I think it can even be Void, or "?" (wildcard
>>>> type) for now, until we figure out something better.
>>>>
>>>> Implementing this would require roughly the following work:
>>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>>>> - Modify the BatchLoads transform to provide it on both codepaths:
>>>> expandTriggered() and expandUntriggered()
>>>> ...- expandTriggered() itself writes via 2 codepaths: single-partition
>>>> and multi-partition. Both need to be handled - we need to get a
>>>> PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two
>>>> PCollections together to get the final result. The single-partition
>>>> codepath (writeSinglePartition) under the hood already uses WriteTables
>>>> that returns a KV<DestinationT, ...> so it's directly usable. The
>>>> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
>>>> codepath drops DestinationT along the way and will need to be refactored a
>>>> bit to keep it until the end.
>>>> ...- expandUntriggered() should be treated the same way.
>>>> - Modify the StreamingWriteTables transform to provide it
>>>> ...- Here also, the challenge is to propagate the DestinationT type all
>>>> the way until the end of StreamingWriteTables - it will need to be
>>>> refactored. After such a refactoring, returning a KV<DestinationT, ...>
>>>> should be easy.
>>>>
>>>> Another challenge with all of this is backwards compatibility in terms
>>>> of API and pipeline update.
>>>> Pipeline update is much less of a concern for the BatchLoads codepath,
>>>> because it's typically used in batch-mode pipelines that don't get updated.
>>>> I would recommend to start with this, perhaps even with only the
>>>> untriggered codepath (it is much more commonly used) - that will pave the
>>>> way for future work.
>>>>
>>>> Hope this helps, please ask more if something is unclear!
>>>>
>>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <ca...@mrcalonso.com>
>>>> wrote:
>>>>
>>>>> Hey Eugene!!
>>>>>
>>>>> I’d gladly take a stab on it although I’m not sure how much available
>>>>> time I might have to put into but... yeah, let’s try it.
>>>>>
>>>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>>>
>>>>> Thanks!
>>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <ki...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Yes, you're both right - BigQueryIO.write() is currently not
>>>>>> implemented in a way that it can be used with Wait.on(). It would certainly
>>>>>> be a welcome contribution to change this - many people expressed interest
>>>>>> in specifically waiting for BigQuery writes. Is any of you interested in
>>>>>> helping out?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Simon, I think your explanation was very accurate, at least to my
>>>>>>> understanding. I'd also be interested in getting batch load result's
>>>>>>> feedback on the pipeline... hopefully someone may suggest something,
>>>>>>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>>>>> simon.kitching@unbelievable-machine.com> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I need to write some data to BigQuery (batch-mode) and then send a
>>>>>>>> Pubsub message to trigger further processing.
>>>>>>>>
>>>>>>>> I found this thread titled "Callbacks/other functions run after a
>>>>>>>> PDone/output transform" on the user-list which was very relevant:
>>>>>>>>
>>>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>>
>>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>>
>>>>>>>> Unfortunately, it appears that the Wait.on transform does not work
>>>>>>>> with BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to work.
>>>>>>>> Advice appreciated.
>>>>>>>>
>>>>>>>> Here's (most of) the relevant test code:
>>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>>         PCollection<String> lines = p.apply("Read Input",
>>>>>>>> Create.of("line1", "line2", "line3", "line4"));
>>>>>>>>
>>>>>>>>         TableFieldSchema f1 = new
>>>>>>>> TableFieldSchema().setName("value").setType("string");
>>>>>>>>         TableSchema s2 = new
>>>>>>>> TableSchema().setFields(Collections.singletonList(f1));
>>>>>>>>
>>>>>>>>         WriteResult writeResult = lines.apply("Write and load
>>>>>>>> data", BigQueryIO.<String>write() //
>>>>>>>>                 .to(options.getTableSpec()) //
>>>>>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
>>>>>>>> //
>>>>>>>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>>>>>                 .withSchema(s2)
>>>>>>>>
>>>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>> //
>>>>>>>>
>>>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>>
>>>>>>>>
>>>>>>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>>>>>>> OnCompletion()));
>>>>>>>>
>>>>>>>> where
>>>>>>>> + format-function "SlowFormatter" prints out each line and has a
>>>>>>>> small sleep for testing purposes, and
>>>>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>>>>
>>>>>>>> In production code, OnCompletion would be fed some collection
>>>>>>>> derived from lines, eg min/max record id, and the operation would be "send
>>>>>>>> pubsub message" rather than print..
>>>>>>>>
>>>>>>>> My expectation is that the "SlowFormatter" would run for each line,
>>>>>>>> then the data would be uploaded, then OnCompletion would print each line.
>>>>>>>> And indeed that happens when STREAMING_INSERTS is used. However for
>>>>>>>> FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>>>>>
>>>>>>>> I use WriteResult.getFailedInserts as that is the only "output"
>>>>>>>> that BiqQueryIO.write() generates AFAICT. I don't expect any failed
>>>>>>>> records, but believe that it can be used as a "signal" for the Wait.on - ie
>>>>>>>> the output is "complete for window" only after all data has been uploaded,
>>>>>>>> which is what I need. And that does seem to work for STREAMING_LOADS.
>>>>>>>>
>>>>>>>> I suspect the reason that this does not work for FILE_LOADS is that
>>>>>>>> method BatchLoads.writeResult returns a WriteResult that wraps an "empty"
>>>>>>>> failedInserts collection, ie data which is not connected to the
>>>>>>>> batch-load-job that is triggered:
>>>>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>>>>     PCollection<TableRow> empty =
>>>>>>>>         p.apply("CreateEmptyFailedInserts",
>>>>>>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"),
>>>>>>>> empty);
>>>>>>>>   }
>>>>>>>>
>>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load
>>>>>>>> jobs; once a job is submitted the code repeatedly polls the job status
>>>>>>>> until it reaches DONE or FAILED. However that information does not appear
>>>>>>>> to be exposed anywhere (unlike streaming which effectively exposes
>>>>>>>> completion-state via the failedInserts stream).
>>>>>>>>
>>>>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Simon
>>>>>>>>
>>>>>>>>

Re: BiqQueryIO.write and Wait.on

Posted by Carlos Alonso <ca...@mrcalonso.com>.
All good so far. I've been a bit side tracked but more or less I have the
idea of using the JobStatus as part of the collection so that not only the
completion is signaled, but also the result (success/failure) can be
accessed, how does it sound?

Regards

On Tue, Jul 17, 2018 at 3:07 AM Eugene Kirpichov <ki...@google.com>
wrote:

> Hi Carlos,
>
> Any updates / roadblocks you hit?
>
>
> On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> Awesome!! Thanks for the heads up, very exciting, this is going to make a
>> lot of people happy :)
>>
>> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <ca...@mrcalonso.com> wrote:
>>
>>> + dev@beam.apache.org
>>>
>>> Just a quick email to let you know that I'm starting developing this.
>>>
>>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <ki...@google.com>
>>> wrote:
>>>
>>>> Hi Carlos,
>>>>
>>>> Thank you for expressing interest in taking this on! Let me give you a
>>>> few pointers to start, and I'll be happy to help everywhere along the way.
>>>>
>>>> Basically we want BigQueryIO.write() to return something (e.g. a
>>>> PCollection) that can be used as input to Wait.on().
>>>> Currently it returns a WriteResult, which only contains a
>>>> PCollection<TableRow> of failed inserts - that one can not be used
>>>> directly, instead we should add another component to WriteResult that
>>>> represents the result of successfully writing some data.
>>>>
>>>> Given that BQIO supports dynamic destination writes, I think it makes
>>>> sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory
>>>> we could sequence different destinations independently (currently Wait.on()
>>>> does not provide such a feature, but it could); and it will require
>>>> changing WriteResult to be WriteResult<DestinationT>. As for what the "???"
>>>> might be - it is something that represents the result of successfully
>>>> writing a window of data. I think it can even be Void, or "?" (wildcard
>>>> type) for now, until we figure out something better.
>>>>
>>>> Implementing this would require roughly the following work:
>>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>>>> - Modify the BatchLoads transform to provide it on both codepaths:
>>>> expandTriggered() and expandUntriggered()
>>>> ...- expandTriggered() itself writes via 2 codepaths: single-partition
>>>> and multi-partition. Both need to be handled - we need to get a
>>>> PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two
>>>> PCollections together to get the final result. The single-partition
>>>> codepath (writeSinglePartition) under the hood already uses WriteTables
>>>> that returns a KV<DestinationT, ...> so it's directly usable. The
>>>> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
>>>> codepath drops DestinationT along the way and will need to be refactored a
>>>> bit to keep it until the end.
>>>> ...- expandUntriggered() should be treated the same way.
>>>> - Modify the StreamingWriteTables transform to provide it
>>>> ...- Here also, the challenge is to propagate the DestinationT type all
>>>> the way until the end of StreamingWriteTables - it will need to be
>>>> refactored. After such a refactoring, returning a KV<DestinationT, ...>
>>>> should be easy.
>>>>
>>>> Another challenge with all of this is backwards compatibility in terms
>>>> of API and pipeline update.
>>>> Pipeline update is much less of a concern for the BatchLoads codepath,
>>>> because it's typically used in batch-mode pipelines that don't get updated.
>>>> I would recommend to start with this, perhaps even with only the
>>>> untriggered codepath (it is much more commonly used) - that will pave the
>>>> way for future work.
>>>>
>>>> Hope this helps, please ask more if something is unclear!
>>>>
>>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <ca...@mrcalonso.com>
>>>> wrote:
>>>>
>>>>> Hey Eugene!!
>>>>>
>>>>> I’d gladly take a stab on it although I’m not sure how much available
>>>>> time I might have to put into but... yeah, let’s try it.
>>>>>
>>>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>>>
>>>>> Thanks!
>>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <ki...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Yes, you're both right - BigQueryIO.write() is currently not
>>>>>> implemented in a way that it can be used with Wait.on(). It would certainly
>>>>>> be a welcome contribution to change this - many people expressed interest
>>>>>> in specifically waiting for BigQuery writes. Is any of you interested in
>>>>>> helping out?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Simon, I think your explanation was very accurate, at least to my
>>>>>>> understanding. I'd also be interested in getting batch load result's
>>>>>>> feedback on the pipeline... hopefully someone may suggest something,
>>>>>>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>>>>> simon.kitching@unbelievable-machine.com> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I need to write some data to BigQuery (batch-mode) and then send a
>>>>>>>> Pubsub message to trigger further processing.
>>>>>>>>
>>>>>>>> I found this thread titled "Callbacks/other functions run after a
>>>>>>>> PDone/output transform" on the user-list which was very relevant:
>>>>>>>>
>>>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>>
>>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>>
>>>>>>>> Unfortunately, it appears that the Wait.on transform does not work
>>>>>>>> with BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to work.
>>>>>>>> Advice appreciated.
>>>>>>>>
>>>>>>>> Here's (most of) the relevant test code:
>>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>>         PCollection<String> lines = p.apply("Read Input",
>>>>>>>> Create.of("line1", "line2", "line3", "line4"));
>>>>>>>>
>>>>>>>>         TableFieldSchema f1 = new
>>>>>>>> TableFieldSchema().setName("value").setType("string");
>>>>>>>>         TableSchema s2 = new
>>>>>>>> TableSchema().setFields(Collections.singletonList(f1));
>>>>>>>>
>>>>>>>>         WriteResult writeResult = lines.apply("Write and load
>>>>>>>> data", BigQueryIO.<String>write() //
>>>>>>>>                 .to(options.getTableSpec()) //
>>>>>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
>>>>>>>> //
>>>>>>>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>>>>>                 .withSchema(s2)
>>>>>>>>
>>>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>> //
>>>>>>>>
>>>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>>
>>>>>>>>
>>>>>>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>>>>>>> OnCompletion()));
>>>>>>>>
>>>>>>>> where
>>>>>>>> + format-function "SlowFormatter" prints out each line and has a
>>>>>>>> small sleep for testing purposes, and
>>>>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>>>>
>>>>>>>> In production code, OnCompletion would be fed some collection
>>>>>>>> derived from lines, eg min/max record id, and the operation would be "send
>>>>>>>> pubsub message" rather than print..
>>>>>>>>
>>>>>>>> My expectation is that the "SlowFormatter" would run for each line,
>>>>>>>> then the data would be uploaded, then OnCompletion would print each line.
>>>>>>>> And indeed that happens when STREAMING_INSERTS is used. However for
>>>>>>>> FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>>>>>
>>>>>>>> I use WriteResult.getFailedInserts as that is the only "output"
>>>>>>>> that BiqQueryIO.write() generates AFAICT. I don't expect any failed
>>>>>>>> records, but believe that it can be used as a "signal" for the Wait.on - ie
>>>>>>>> the output is "complete for window" only after all data has been uploaded,
>>>>>>>> which is what I need. And that does seem to work for STREAMING_LOADS.
>>>>>>>>
>>>>>>>> I suspect the reason that this does not work for FILE_LOADS is that
>>>>>>>> method BatchLoads.writeResult returns a WriteResult that wraps an "empty"
>>>>>>>> failedInserts collection, ie data which is not connected to the
>>>>>>>> batch-load-job that is triggered:
>>>>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>>>>     PCollection<TableRow> empty =
>>>>>>>>         p.apply("CreateEmptyFailedInserts",
>>>>>>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"),
>>>>>>>> empty);
>>>>>>>>   }
>>>>>>>>
>>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load
>>>>>>>> jobs; once a job is submitted the code repeatedly polls the job status
>>>>>>>> until it reaches DONE or FAILED. However that information does not appear
>>>>>>>> to be exposed anywhere (unlike streaming which effectively exposes
>>>>>>>> completion-state via the failedInserts stream).
>>>>>>>>
>>>>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Simon
>>>>>>>>
>>>>>>>>

Re: BiqQueryIO.write and Wait.on

Posted by Eugene Kirpichov <ki...@google.com>.
Hi Carlos,

Any updates / roadblocks you hit?

On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov <ki...@google.com>
wrote:

> Awesome!! Thanks for the heads up, very exciting, this is going to make a
> lot of people happy :)
>
> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <ca...@mrcalonso.com> wrote:
>
>> + dev@beam.apache.org
>>
>> Just a quick email to let you know that I'm starting developing this.
>>
>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>> Hi Carlos,
>>>
>>> Thank you for expressing interest in taking this on! Let me give you a
>>> few pointers to start, and I'll be happy to help everywhere along the way.
>>>
>>> Basically we want BigQueryIO.write() to return something (e.g. a
>>> PCollection) that can be used as input to Wait.on().
>>> Currently it returns a WriteResult, which only contains a
>>> PCollection<TableRow> of failed inserts - that one can not be used
>>> directly, instead we should add another component to WriteResult that
>>> represents the result of successfully writing some data.
>>>
>>> Given that BQIO supports dynamic destination writes, I think it makes
>>> sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory
>>> we could sequence different destinations independently (currently Wait.on()
>>> does not provide such a feature, but it could); and it will require
>>> changing WriteResult to be WriteResult<DestinationT>. As for what the "???"
>>> might be - it is something that represents the result of successfully
>>> writing a window of data. I think it can even be Void, or "?" (wildcard
>>> type) for now, until we figure out something better.
>>>
>>> Implementing this would require roughly the following work:
>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>>> - Modify the BatchLoads transform to provide it on both codepaths:
>>> expandTriggered() and expandUntriggered()
>>> ...- expandTriggered() itself writes via 2 codepaths: single-partition
>>> and multi-partition. Both need to be handled - we need to get a
>>> PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two
>>> PCollections together to get the final result. The single-partition
>>> codepath (writeSinglePartition) under the hood already uses WriteTables
>>> that returns a KV<DestinationT, ...> so it's directly usable. The
>>> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
>>> codepath drops DestinationT along the way and will need to be refactored a
>>> bit to keep it until the end.
>>> ...- expandUntriggered() should be treated the same way.
>>> - Modify the StreamingWriteTables transform to provide it
>>> ...- Here also, the challenge is to propagate the DestinationT type all
>>> the way until the end of StreamingWriteTables - it will need to be
>>> refactored. After such a refactoring, returning a KV<DestinationT, ...>
>>> should be easy.
>>>
>>> Another challenge with all of this is backwards compatibility in terms
>>> of API and pipeline update.
>>> Pipeline update is much less of a concern for the BatchLoads codepath,
>>> because it's typically used in batch-mode pipelines that don't get updated.
>>> I would recommend to start with this, perhaps even with only the
>>> untriggered codepath (it is much more commonly used) - that will pave the
>>> way for future work.
>>>
>>> Hope this helps, please ask more if something is unclear!
>>>
>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <ca...@mrcalonso.com>
>>> wrote:
>>>
>>>> Hey Eugene!!
>>>>
>>>> I’d gladly take a stab on it although I’m not sure how much available
>>>> time I might have to put into but... yeah, let’s try it.
>>>>
>>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>>
>>>> Thanks!
>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <ki...@google.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Yes, you're both right - BigQueryIO.write() is currently not
>>>>> implemented in a way that it can be used with Wait.on(). It would certainly
>>>>> be a welcome contribution to change this - many people expressed interest
>>>>> in specifically waiting for BigQuery writes. Is any of you interested in
>>>>> helping out?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Simon, I think your explanation was very accurate, at least to my
>>>>>> understanding. I'd also be interested in getting batch load result's
>>>>>> feedback on the pipeline... hopefully someone may suggest something,
>>>>>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>>>> simon.kitching@unbelievable-machine.com> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I need to write some data to BigQuery (batch-mode) and then send a
>>>>>>> Pubsub message to trigger further processing.
>>>>>>>
>>>>>>> I found this thread titled "Callbacks/other functions run after a
>>>>>>> PDone/output transform" on the user-list which was very relevant:
>>>>>>>
>>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>
>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>
>>>>>>> Unfortunately, it appears that the Wait.on transform does not work
>>>>>>> with BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to work.
>>>>>>> Advice appreciated.
>>>>>>>
>>>>>>> Here's (most of) the relevant test code:
>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>         PCollection<String> lines = p.apply("Read Input",
>>>>>>> Create.of("line1", "line2", "line3", "line4"));
>>>>>>>
>>>>>>>         TableFieldSchema f1 = new
>>>>>>> TableFieldSchema().setName("value").setType("string");
>>>>>>>         TableSchema s2 = new
>>>>>>> TableSchema().setFields(Collections.singletonList(f1));
>>>>>>>
>>>>>>>         WriteResult writeResult = lines.apply("Write and load data",
>>>>>>> BigQueryIO.<String>write() //
>>>>>>>                 .to(options.getTableSpec()) //
>>>>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
>>>>>>> //
>>>>>>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>>>>                 .withSchema(s2)
>>>>>>>
>>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>> //
>>>>>>>
>>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>
>>>>>>>
>>>>>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>>>>>> OnCompletion()));
>>>>>>>
>>>>>>> where
>>>>>>> + format-function "SlowFormatter" prints out each line and has a
>>>>>>> small sleep for testing purposes, and
>>>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>>>
>>>>>>> In production code, OnCompletion would be fed some collection
>>>>>>> derived from lines, eg min/max record id, and the operation would be "send
>>>>>>> pubsub message" rather than print..
>>>>>>>
>>>>>>> My expectation is that the "SlowFormatter" would run for each line,
>>>>>>> then the data would be uploaded, then OnCompletion would print each line.
>>>>>>> And indeed that happens when STREAMING_INSERTS is used. However for
>>>>>>> FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>>>>
>>>>>>> I use WriteResult.getFailedInserts as that is the only "output" that
>>>>>>> BiqQueryIO.write() generates AFAICT. I don't expect any failed records, but
>>>>>>> believe that it can be used as a "signal" for the Wait.on - ie the output
>>>>>>> is "complete for window" only after all data has been uploaded, which is
>>>>>>> what I need. And that does seem to work for STREAMING_LOADS.
>>>>>>>
>>>>>>> I suspect the reason that this does not work for FILE_LOADS is that
>>>>>>> method BatchLoads.writeResult returns a WriteResult that wraps an "empty"
>>>>>>> failedInserts collection, ie data which is not connected to the
>>>>>>> batch-load-job that is triggered:
>>>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>>>     PCollection<TableRow> empty =
>>>>>>>         p.apply("CreateEmptyFailedInserts",
>>>>>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>>>>>   }
>>>>>>>
>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs;
>>>>>>> once a job is submitted the code repeatedly polls the job status until it
>>>>>>> reaches DONE or FAILED. However that information does not appear to be
>>>>>>> exposed anywhere (unlike streaming which effectively exposes
>>>>>>> completion-state via the failedInserts stream).
>>>>>>>
>>>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Simon
>>>>>>>
>>>>>>>

Re: BiqQueryIO.write and Wait.on

Posted by Eugene Kirpichov <ki...@google.com>.
Hi Carlos,

Any updates / roadblocks you hit?

On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov <ki...@google.com>
wrote:

> Awesome!! Thanks for the heads up, very exciting, this is going to make a
> lot of people happy :)
>
> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <ca...@mrcalonso.com> wrote:
>
>> + dev@beam.apache.org
>>
>> Just a quick email to let you know that I'm starting developing this.
>>
>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>> Hi Carlos,
>>>
>>> Thank you for expressing interest in taking this on! Let me give you a
>>> few pointers to start, and I'll be happy to help everywhere along the way.
>>>
>>> Basically we want BigQueryIO.write() to return something (e.g. a
>>> PCollection) that can be used as input to Wait.on().
>>> Currently it returns a WriteResult, which only contains a
>>> PCollection<TableRow> of failed inserts - that one can not be used
>>> directly, instead we should add another component to WriteResult that
>>> represents the result of successfully writing some data.
>>>
>>> Given that BQIO supports dynamic destination writes, I think it makes
>>> sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory
>>> we could sequence different destinations independently (currently Wait.on()
>>> does not provide such a feature, but it could); and it will require
>>> changing WriteResult to be WriteResult<DestinationT>. As for what the "???"
>>> might be - it is something that represents the result of successfully
>>> writing a window of data. I think it can even be Void, or "?" (wildcard
>>> type) for now, until we figure out something better.
>>>
>>> Implementing this would require roughly the following work:
>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>>> - Modify the BatchLoads transform to provide it on both codepaths:
>>> expandTriggered() and expandUntriggered()
>>> ...- expandTriggered() itself writes via 2 codepaths: single-partition
>>> and multi-partition. Both need to be handled - we need to get a
>>> PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two
>>> PCollections together to get the final result. The single-partition
>>> codepath (writeSinglePartition) under the hood already uses WriteTables
>>> that returns a KV<DestinationT, ...> so it's directly usable. The
>>> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
>>> codepath drops DestinationT along the way and will need to be refactored a
>>> bit to keep it until the end.
>>> ...- expandUntriggered() should be treated the same way.
>>> - Modify the StreamingWriteTables transform to provide it
>>> ...- Here also, the challenge is to propagate the DestinationT type all
>>> the way until the end of StreamingWriteTables - it will need to be
>>> refactored. After such a refactoring, returning a KV<DestinationT, ...>
>>> should be easy.
>>>
>>> Another challenge with all of this is backwards compatibility in terms
>>> of API and pipeline update.
>>> Pipeline update is much less of a concern for the BatchLoads codepath,
>>> because it's typically used in batch-mode pipelines that don't get updated.
>>> I would recommend to start with this, perhaps even with only the
>>> untriggered codepath (it is much more commonly used) - that will pave the
>>> way for future work.
>>>
>>> Hope this helps, please ask more if something is unclear!
>>>
>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <ca...@mrcalonso.com>
>>> wrote:
>>>
>>>> Hey Eugene!!
>>>>
>>>> I’d gladly take a stab on it although I’m not sure how much available
>>>> time I might have to put into but... yeah, let’s try it.
>>>>
>>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>>
>>>> Thanks!
>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <ki...@google.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Yes, you're both right - BigQueryIO.write() is currently not
>>>>> implemented in a way that it can be used with Wait.on(). It would certainly
>>>>> be a welcome contribution to change this - many people expressed interest
>>>>> in specifically waiting for BigQuery writes. Is any of you interested in
>>>>> helping out?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <ca...@mrcalonso.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Simon, I think your explanation was very accurate, at least to my
>>>>>> understanding. I'd also be interested in getting batch load result's
>>>>>> feedback on the pipeline... hopefully someone may suggest something,
>>>>>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>>>> simon.kitching@unbelievable-machine.com> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I need to write some data to BigQuery (batch-mode) and then send a
>>>>>>> Pubsub message to trigger further processing.
>>>>>>>
>>>>>>> I found this thread titled "Callbacks/other functions run after a
>>>>>>> PDone/output transform" on the user-list which was very relevant:
>>>>>>>
>>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>
>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>
>>>>>>> Unfortunately, it appears that the Wait.on transform does not work
>>>>>>> with BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to work.
>>>>>>> Advice appreciated.
>>>>>>>
>>>>>>> Here's (most of) the relevant test code:
>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>         PCollection<String> lines = p.apply("Read Input",
>>>>>>> Create.of("line1", "line2", "line3", "line4"));
>>>>>>>
>>>>>>>         TableFieldSchema f1 = new
>>>>>>> TableFieldSchema().setName("value").setType("string");
>>>>>>>         TableSchema s2 = new
>>>>>>> TableSchema().setFields(Collections.singletonList(f1));
>>>>>>>
>>>>>>>         WriteResult writeResult = lines.apply("Write and load data",
>>>>>>> BigQueryIO.<String>write() //
>>>>>>>                 .to(options.getTableSpec()) //
>>>>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
>>>>>>> //
>>>>>>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>>>>                 .withSchema(s2)
>>>>>>>
>>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>> //
>>>>>>>
>>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>
>>>>>>>
>>>>>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>>>>>> OnCompletion()));
>>>>>>>
>>>>>>> where
>>>>>>> + format-function "SlowFormatter" prints out each line and has a
>>>>>>> small sleep for testing purposes, and
>>>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>>>
>>>>>>> In production code, OnCompletion would be fed some collection
>>>>>>> derived from lines, eg min/max record id, and the operation would be "send
>>>>>>> pubsub message" rather than print..
>>>>>>>
>>>>>>> My expectation is that the "SlowFormatter" would run for each line,
>>>>>>> then the data would be uploaded, then OnCompletion would print each line.
>>>>>>> And indeed that happens when STREAMING_INSERTS is used. However for
>>>>>>> FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>>>>
>>>>>>> I use WriteResult.getFailedInserts as that is the only "output" that
>>>>>>> BiqQueryIO.write() generates AFAICT. I don't expect any failed records, but
>>>>>>> believe that it can be used as a "signal" for the Wait.on - ie the output
>>>>>>> is "complete for window" only after all data has been uploaded, which is
>>>>>>> what I need. And that does seem to work for STREAMING_LOADS.
>>>>>>>
>>>>>>> I suspect the reason that this does not work for FILE_LOADS is that
>>>>>>> method BatchLoads.writeResult returns a WriteResult that wraps an "empty"
>>>>>>> failedInserts collection, ie data which is not connected to the
>>>>>>> batch-load-job that is triggered:
>>>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>>>     PCollection<TableRow> empty =
>>>>>>>         p.apply("CreateEmptyFailedInserts",
>>>>>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>>>>>   }
>>>>>>>
>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs;
>>>>>>> once a job is submitted the code repeatedly polls the job status until it
>>>>>>> reaches DONE or FAILED. However that information does not appear to be
>>>>>>> exposed anywhere (unlike streaming which effectively exposes
>>>>>>> completion-state via the failedInserts stream).
>>>>>>>
>>>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Simon
>>>>>>>
>>>>>>>

Re: BiqQueryIO.write and Wait.on

Posted by Chaim Turkel <ch...@behalf.com>.
yes, i must say i have been waiting for this for over 6 months, it
would help a lot
chaim
On Tue, Jul 3, 2018 at 5:14 PM Eugene Kirpichov <ki...@google.com> wrote:
>
> Awesome!! Thanks for the heads up, very exciting, this is going to make a lot of people happy :)
>
> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <ca...@mrcalonso.com> wrote:
>>
>> + dev@beam.apache.org
>>
>> Just a quick email to let you know that I'm starting developing this.
>>
>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <ki...@google.com> wrote:
>>>
>>> Hi Carlos,
>>>
>>> Thank you for expressing interest in taking this on! Let me give you a few pointers to start, and I'll be happy to help everywhere along the way.
>>>
>>> Basically we want BigQueryIO.write() to return something (e.g. a PCollection) that can be used as input to Wait.on().
>>> Currently it returns a WriteResult, which only contains a PCollection<TableRow> of failed inserts - that one can not be used directly, instead we should add another component to WriteResult that represents the result of successfully writing some data.
>>>
>>> Given that BQIO supports dynamic destination writes, I think it makes sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory we could sequence different destinations independently (currently Wait.on() does not provide such a feature, but it could); and it will require changing WriteResult to be WriteResult<DestinationT>. As for what the "???" might be - it is something that represents the result of successfully writing a window of data. I think it can even be Void, or "?" (wildcard type) for now, until we figure out something better.
>>>
>>> Implementing this would require roughly the following work:
>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>>> - Modify the BatchLoads transform to provide it on both codepaths: expandTriggered() and expandUntriggered()
>>> ...- expandTriggered() itself writes via 2 codepaths: single-partition and multi-partition. Both need to be handled - we need to get a PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two PCollections together to get the final result. The single-partition codepath (writeSinglePartition) under the hood already uses WriteTables that returns a KV<DestinationT, ...> so it's directly usable. The multi-partition codepath ends in WriteRenameTriggered - unfortunately, this codepath drops DestinationT along the way and will need to be refactored a bit to keep it until the end.
>>> ...- expandUntriggered() should be treated the same way.
>>> - Modify the StreamingWriteTables transform to provide it
>>> ...- Here also, the challenge is to propagate the DestinationT type all the way until the end of StreamingWriteTables - it will need to be refactored. After such a refactoring, returning a KV<DestinationT, ...> should be easy.
>>>
>>> Another challenge with all of this is backwards compatibility in terms of API and pipeline update.
>>> Pipeline update is much less of a concern for the BatchLoads codepath, because it's typically used in batch-mode pipelines that don't get updated. I would recommend to start with this, perhaps even with only the untriggered codepath (it is much more commonly used) - that will pave the way for future work.
>>>
>>> Hope this helps, please ask more if something is unclear!
>>>
>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <ca...@mrcalonso.com> wrote:
>>>>
>>>> Hey Eugene!!
>>>>
>>>> I’d gladly take a stab on it although I’m not sure how much available time I might have to put into but... yeah, let’s try it.
>>>>
>>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>>
>>>> Thanks!
>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <ki...@google.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> Yes, you're both right - BigQueryIO.write() is currently not implemented in a way that it can be used with Wait.on(). It would certainly be a welcome contribution to change this - many people expressed interest in specifically waiting for BigQuery writes. Is any of you interested in helping out?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <ca...@mrcalonso.com> wrote:
>>>>>>
>>>>>> Hi Simon, I think your explanation was very accurate, at least to my understanding. I'd also be interested in getting batch load result's feedback on the pipeline... hopefully someone may suggest something, otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <si...@unbelievable-machine.com> wrote:
>>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I need to write some data to BigQuery (batch-mode) and then send a Pubsub message to trigger further processing.
>>>>>>>
>>>>>>> I found this thread titled "Callbacks/other functions run after a PDone/output transform" on the user-list which was very relevant:
>>>>>>>   https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>
>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>
>>>>>>> Unfortunately, it appears that the Wait.on transform does not work with BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to work. Advice appreciated.
>>>>>>>
>>>>>>> Here's (most of) the relevant test code:
>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>         PCollection<String> lines = p.apply("Read Input", Create.of("line1", "line2", "line3", "line4"));
>>>>>>>
>>>>>>>         TableFieldSchema f1 = new TableFieldSchema().setName("value").setType("string");
>>>>>>>         TableSchema s2 = new TableSchema().setFields(Collections.singletonList(f1));
>>>>>>>
>>>>>>>         WriteResult writeResult = lines.apply("Write and load data", BigQueryIO.<String>write() //
>>>>>>>                 .to(options.getTableSpec()) //
>>>>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
>>>>>>> //                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>>>>                 .withSchema(s2)
>>>>>>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) //
>>>>>>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>
>>>>>>>         lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new OnCompletion()));
>>>>>>>
>>>>>>> where
>>>>>>> + format-function "SlowFormatter" prints out each line and has a small sleep for testing purposes, and
>>>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>>>
>>>>>>> In production code, OnCompletion would be fed some collection derived from lines, eg min/max record id, and the operation would be "send pubsub message" rather than print..
>>>>>>>
>>>>>>> My expectation is that the "SlowFormatter" would run for each line, then the data would be uploaded, then OnCompletion would print each line. And indeed that happens when STREAMING_INSERTS is used. However for FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>>>>
>>>>>>> I use WriteResult.getFailedInserts as that is the only "output" that BiqQueryIO.write() generates AFAICT. I don't expect any failed records, but believe that it can be used as a "signal" for the Wait.on - ie the output is "complete for window" only after all data has been uploaded, which is what I need. And that does seem to work for STREAMING_LOADS.
>>>>>>>
>>>>>>> I suspect the reason that this does not work for FILE_LOADS is that method BatchLoads.writeResult returns a WriteResult that wraps an "empty" failedInserts collection, ie data which is not connected to the batch-load-job that is triggered:
>>>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>>>     PCollection<TableRow> empty =
>>>>>>>         p.apply("CreateEmptyFailedInserts", Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>>>>>   }
>>>>>>>
>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs; once a job is submitted the code repeatedly polls the job status until it reaches DONE or FAILED. However that information does not appear to be exposed anywhere (unlike streaming which effectively exposes completion-state via the failedInserts stream).
>>>>>>>
>>>>>>> If I have misunderstood something, corrections welcome! If not, suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Simon
>>>>>>>

-- 


Loans are funded by
FinWise Bank, a Utah-chartered bank located in Sandy, 
Utah, member FDIC, Equal
Opportunity Lender. Merchant Cash Advances are 
made by Behalf. For more
information on ECOA, click here 
<https://www.behalf.com/legal/ecoa/>. For important information about 
opening a new
account, review Patriot Act procedures here 
<https://www.behalf.com/legal/patriot/>.
Visit Legal 
<https://www.behalf.com/legal/> to
review our comprehensive program terms, 
conditions, and disclosures. 

Re: BiqQueryIO.write and Wait.on

Posted by Eugene Kirpichov <ki...@google.com>.
Awesome!! Thanks for the heads up, very exciting, this is going to make a
lot of people happy :)

On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <ca...@mrcalonso.com> wrote:

> + dev@beam.apache.org
>
> Just a quick email to let you know that I'm starting developing this.
>
> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> Hi Carlos,
>>
>> Thank you for expressing interest in taking this on! Let me give you a
>> few pointers to start, and I'll be happy to help everywhere along the way.
>>
>> Basically we want BigQueryIO.write() to return something (e.g. a
>> PCollection) that can be used as input to Wait.on().
>> Currently it returns a WriteResult, which only contains a
>> PCollection<TableRow> of failed inserts - that one can not be used
>> directly, instead we should add another component to WriteResult that
>> represents the result of successfully writing some data.
>>
>> Given that BQIO supports dynamic destination writes, I think it makes
>> sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory
>> we could sequence different destinations independently (currently Wait.on()
>> does not provide such a feature, but it could); and it will require
>> changing WriteResult to be WriteResult<DestinationT>. As for what the "???"
>> might be - it is something that represents the result of successfully
>> writing a window of data. I think it can even be Void, or "?" (wildcard
>> type) for now, until we figure out something better.
>>
>> Implementing this would require roughly the following work:
>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>> - Modify the BatchLoads transform to provide it on both codepaths:
>> expandTriggered() and expandUntriggered()
>> ...- expandTriggered() itself writes via 2 codepaths: single-partition
>> and multi-partition. Both need to be handled - we need to get a
>> PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two
>> PCollections together to get the final result. The single-partition
>> codepath (writeSinglePartition) under the hood already uses WriteTables
>> that returns a KV<DestinationT, ...> so it's directly usable. The
>> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
>> codepath drops DestinationT along the way and will need to be refactored a
>> bit to keep it until the end.
>> ...- expandUntriggered() should be treated the same way.
>> - Modify the StreamingWriteTables transform to provide it
>> ...- Here also, the challenge is to propagate the DestinationT type all
>> the way until the end of StreamingWriteTables - it will need to be
>> refactored. After such a refactoring, returning a KV<DestinationT, ...>
>> should be easy.
>>
>> Another challenge with all of this is backwards compatibility in terms of
>> API and pipeline update.
>> Pipeline update is much less of a concern for the BatchLoads codepath,
>> because it's typically used in batch-mode pipelines that don't get updated.
>> I would recommend to start with this, perhaps even with only the
>> untriggered codepath (it is much more commonly used) - that will pave the
>> way for future work.
>>
>> Hope this helps, please ask more if something is unclear!
>>
>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <ca...@mrcalonso.com>
>> wrote:
>>
>>> Hey Eugene!!
>>>
>>> I’d gladly take a stab on it although I’m not sure how much available
>>> time I might have to put into but... yeah, let’s try it.
>>>
>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>
>>> Thanks!
>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <ki...@google.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Yes, you're both right - BigQueryIO.write() is currently not
>>>> implemented in a way that it can be used with Wait.on(). It would certainly
>>>> be a welcome contribution to change this - many people expressed interest
>>>> in specifically waiting for BigQuery writes. Is any of you interested in
>>>> helping out?
>>>>
>>>> Thanks.
>>>>
>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <ca...@mrcalonso.com>
>>>> wrote:
>>>>
>>>>> Hi Simon, I think your explanation was very accurate, at least to my
>>>>> understanding. I'd also be interested in getting batch load result's
>>>>> feedback on the pipeline... hopefully someone may suggest something,
>>>>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>>> simon.kitching@unbelievable-machine.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I need to write some data to BigQuery (batch-mode) and then send a
>>>>>> Pubsub message to trigger further processing.
>>>>>>
>>>>>> I found this thread titled "Callbacks/other functions run after a
>>>>>> PDone/output transform" on the user-list which was very relevant:
>>>>>>
>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>
>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>
>>>>>> Unfortunately, it appears that the Wait.on transform does not work
>>>>>> with BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to work.
>>>>>> Advice appreciated.
>>>>>>
>>>>>> Here's (most of) the relevant test code:
>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>         PCollection<String> lines = p.apply("Read Input",
>>>>>> Create.of("line1", "line2", "line3", "line4"));
>>>>>>
>>>>>>         TableFieldSchema f1 = new
>>>>>> TableFieldSchema().setName("value").setType("string");
>>>>>>         TableSchema s2 = new
>>>>>> TableSchema().setFields(Collections.singletonList(f1));
>>>>>>
>>>>>>         WriteResult writeResult = lines.apply("Write and load data",
>>>>>> BigQueryIO.<String>write() //
>>>>>>                 .to(options.getTableSpec()) //
>>>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
>>>>>> //
>>>>>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>>>                 .withSchema(s2)
>>>>>>
>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>> //
>>>>>>
>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>
>>>>>>
>>>>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>>>>> OnCompletion()));
>>>>>>
>>>>>> where
>>>>>> + format-function "SlowFormatter" prints out each line and has a
>>>>>> small sleep for testing purposes, and
>>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>>
>>>>>> In production code, OnCompletion would be fed some collection derived
>>>>>> from lines, eg min/max record id, and the operation would be "send pubsub
>>>>>> message" rather than print..
>>>>>>
>>>>>> My expectation is that the "SlowFormatter" would run for each line,
>>>>>> then the data would be uploaded, then OnCompletion would print each line.
>>>>>> And indeed that happens when STREAMING_INSERTS is used. However for
>>>>>> FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>>>
>>>>>> I use WriteResult.getFailedInserts as that is the only "output" that
>>>>>> BiqQueryIO.write() generates AFAICT. I don't expect any failed records, but
>>>>>> believe that it can be used as a "signal" for the Wait.on - ie the output
>>>>>> is "complete for window" only after all data has been uploaded, which is
>>>>>> what I need. And that does seem to work for STREAMING_LOADS.
>>>>>>
>>>>>> I suspect the reason that this does not work for FILE_LOADS is that
>>>>>> method BatchLoads.writeResult returns a WriteResult that wraps an "empty"
>>>>>> failedInserts collection, ie data which is not connected to the
>>>>>> batch-load-job that is triggered:
>>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>>     PCollection<TableRow> empty =
>>>>>>         p.apply("CreateEmptyFailedInserts",
>>>>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>>>>   }
>>>>>>
>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs;
>>>>>> once a job is submitted the code repeatedly polls the job status until it
>>>>>> reaches DONE or FAILED. However that information does not appear to be
>>>>>> exposed anywhere (unlike streaming which effectively exposes
>>>>>> completion-state via the failedInserts stream).
>>>>>>
>>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>
>>>>>> Thanks,
>>>>>> Simon
>>>>>>
>>>>>>

Re: BiqQueryIO.write and Wait.on

Posted by Eugene Kirpichov <ki...@google.com>.
Awesome!! Thanks for the heads up, very exciting, this is going to make a
lot of people happy :)

On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <ca...@mrcalonso.com> wrote:

> + dev@beam.apache.org
>
> Just a quick email to let you know that I'm starting developing this.
>
> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> Hi Carlos,
>>
>> Thank you for expressing interest in taking this on! Let me give you a
>> few pointers to start, and I'll be happy to help everywhere along the way.
>>
>> Basically we want BigQueryIO.write() to return something (e.g. a
>> PCollection) that can be used as input to Wait.on().
>> Currently it returns a WriteResult, which only contains a
>> PCollection<TableRow> of failed inserts - that one can not be used
>> directly, instead we should add another component to WriteResult that
>> represents the result of successfully writing some data.
>>
>> Given that BQIO supports dynamic destination writes, I think it makes
>> sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory
>> we could sequence different destinations independently (currently Wait.on()
>> does not provide such a feature, but it could); and it will require
>> changing WriteResult to be WriteResult<DestinationT>. As for what the "???"
>> might be - it is something that represents the result of successfully
>> writing a window of data. I think it can even be Void, or "?" (wildcard
>> type) for now, until we figure out something better.
>>
>> Implementing this would require roughly the following work:
>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>> - Modify the BatchLoads transform to provide it on both codepaths:
>> expandTriggered() and expandUntriggered()
>> ...- expandTriggered() itself writes via 2 codepaths: single-partition
>> and multi-partition. Both need to be handled - we need to get a
>> PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two
>> PCollections together to get the final result. The single-partition
>> codepath (writeSinglePartition) under the hood already uses WriteTables
>> that returns a KV<DestinationT, ...> so it's directly usable. The
>> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
>> codepath drops DestinationT along the way and will need to be refactored a
>> bit to keep it until the end.
>> ...- expandUntriggered() should be treated the same way.
>> - Modify the StreamingWriteTables transform to provide it
>> ...- Here also, the challenge is to propagate the DestinationT type all
>> the way until the end of StreamingWriteTables - it will need to be
>> refactored. After such a refactoring, returning a KV<DestinationT, ...>
>> should be easy.
>>
>> Another challenge with all of this is backwards compatibility in terms of
>> API and pipeline update.
>> Pipeline update is much less of a concern for the BatchLoads codepath,
>> because it's typically used in batch-mode pipelines that don't get updated.
>> I would recommend to start with this, perhaps even with only the
>> untriggered codepath (it is much more commonly used) - that will pave the
>> way for future work.
>>
>> Hope this helps, please ask more if something is unclear!
>>
>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <ca...@mrcalonso.com>
>> wrote:
>>
>>> Hey Eugene!!
>>>
>>> I’d gladly take a stab on it although I’m not sure how much available
>>> time I might have to put into but... yeah, let’s try it.
>>>
>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>
>>> Thanks!
>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <ki...@google.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Yes, you're both right - BigQueryIO.write() is currently not
>>>> implemented in a way that it can be used with Wait.on(). It would certainly
>>>> be a welcome contribution to change this - many people expressed interest
>>>> in specifically waiting for BigQuery writes. Is any of you interested in
>>>> helping out?
>>>>
>>>> Thanks.
>>>>
>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <ca...@mrcalonso.com>
>>>> wrote:
>>>>
>>>>> Hi Simon, I think your explanation was very accurate, at least to my
>>>>> understanding. I'd also be interested in getting batch load result's
>>>>> feedback on the pipeline... hopefully someone may suggest something,
>>>>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>>> simon.kitching@unbelievable-machine.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I need to write some data to BigQuery (batch-mode) and then send a
>>>>>> Pubsub message to trigger further processing.
>>>>>>
>>>>>> I found this thread titled "Callbacks/other functions run after a
>>>>>> PDone/output transform" on the user-list which was very relevant:
>>>>>>
>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>
>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>
>>>>>> Unfortunately, it appears that the Wait.on transform does not work
>>>>>> with BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to work.
>>>>>> Advice appreciated.
>>>>>>
>>>>>> Here's (most of) the relevant test code:
>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>         PCollection<String> lines = p.apply("Read Input",
>>>>>> Create.of("line1", "line2", "line3", "line4"));
>>>>>>
>>>>>>         TableFieldSchema f1 = new
>>>>>> TableFieldSchema().setName("value").setType("string");
>>>>>>         TableSchema s2 = new
>>>>>> TableSchema().setFields(Collections.singletonList(f1));
>>>>>>
>>>>>>         WriteResult writeResult = lines.apply("Write and load data",
>>>>>> BigQueryIO.<String>write() //
>>>>>>                 .to(options.getTableSpec()) //
>>>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
>>>>>> //
>>>>>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>>>                 .withSchema(s2)
>>>>>>
>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>> //
>>>>>>
>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>
>>>>>>
>>>>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>>>>> OnCompletion()));
>>>>>>
>>>>>> where
>>>>>> + format-function "SlowFormatter" prints out each line and has a
>>>>>> small sleep for testing purposes, and
>>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>>
>>>>>> In production code, OnCompletion would be fed some collection derived
>>>>>> from lines, eg min/max record id, and the operation would be "send pubsub
>>>>>> message" rather than print..
>>>>>>
>>>>>> My expectation is that the "SlowFormatter" would run for each line,
>>>>>> then the data would be uploaded, then OnCompletion would print each line.
>>>>>> And indeed that happens when STREAMING_INSERTS is used. However for
>>>>>> FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>>>
>>>>>> I use WriteResult.getFailedInserts as that is the only "output" that
>>>>>> BiqQueryIO.write() generates AFAICT. I don't expect any failed records, but
>>>>>> believe that it can be used as a "signal" for the Wait.on - ie the output
>>>>>> is "complete for window" only after all data has been uploaded, which is
>>>>>> what I need. And that does seem to work for STREAMING_LOADS.
>>>>>>
>>>>>> I suspect the reason that this does not work for FILE_LOADS is that
>>>>>> method BatchLoads.writeResult returns a WriteResult that wraps an "empty"
>>>>>> failedInserts collection, ie data which is not connected to the
>>>>>> batch-load-job that is triggered:
>>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>>     PCollection<TableRow> empty =
>>>>>>         p.apply("CreateEmptyFailedInserts",
>>>>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>>>>   }
>>>>>>
>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs;
>>>>>> once a job is submitted the code repeatedly polls the job status until it
>>>>>> reaches DONE or FAILED. However that information does not appear to be
>>>>>> exposed anywhere (unlike streaming which effectively exposes
>>>>>> completion-state via the failedInserts stream).
>>>>>>
>>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>
>>>>>> Thanks,
>>>>>> Simon
>>>>>>
>>>>>>

Re: BiqQueryIO.write and Wait.on

Posted by Carlos Alonso <ca...@mrcalonso.com>.
+ dev@beam.apache.org

Just a quick email to let you know that I'm starting developing this.

On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <ki...@google.com>
wrote:

> Hi Carlos,
>
> Thank you for expressing interest in taking this on! Let me give you a few
> pointers to start, and I'll be happy to help everywhere along the way.
>
> Basically we want BigQueryIO.write() to return something (e.g. a
> PCollection) that can be used as input to Wait.on().
> Currently it returns a WriteResult, which only contains a
> PCollection<TableRow> of failed inserts - that one can not be used
> directly, instead we should add another component to WriteResult that
> represents the result of successfully writing some data.
>
> Given that BQIO supports dynamic destination writes, I think it makes
> sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory
> we could sequence different destinations independently (currently Wait.on()
> does not provide such a feature, but it could); and it will require
> changing WriteResult to be WriteResult<DestinationT>. As for what the "???"
> might be - it is something that represents the result of successfully
> writing a window of data. I think it can even be Void, or "?" (wildcard
> type) for now, until we figure out something better.
>
> Implementing this would require roughly the following work:
> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
> - Modify the BatchLoads transform to provide it on both codepaths:
> expandTriggered() and expandUntriggered()
> ...- expandTriggered() itself writes via 2 codepaths: single-partition and
> multi-partition. Both need to be handled - we need to get a
> PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two
> PCollections together to get the final result. The single-partition
> codepath (writeSinglePartition) under the hood already uses WriteTables
> that returns a KV<DestinationT, ...> so it's directly usable. The
> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
> codepath drops DestinationT along the way and will need to be refactored a
> bit to keep it until the end.
> ...- expandUntriggered() should be treated the same way.
> - Modify the StreamingWriteTables transform to provide it
> ...- Here also, the challenge is to propagate the DestinationT type all
> the way until the end of StreamingWriteTables - it will need to be
> refactored. After such a refactoring, returning a KV<DestinationT, ...>
> should be easy.
>
> Another challenge with all of this is backwards compatibility in terms of
> API and pipeline update.
> Pipeline update is much less of a concern for the BatchLoads codepath,
> because it's typically used in batch-mode pipelines that don't get updated.
> I would recommend to start with this, perhaps even with only the
> untriggered codepath (it is much more commonly used) - that will pave the
> way for future work.
>
> Hope this helps, please ask more if something is unclear!
>
> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <ca...@mrcalonso.com>
> wrote:
>
>> Hey Eugene!!
>>
>> I’d gladly take a stab on it although I’m not sure how much available
>> time I might have to put into but... yeah, let’s try it.
>>
>> Where should I begin? Is there a Jira issue or shall I file one?
>>
>> Thanks!
>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Yes, you're both right - BigQueryIO.write() is currently not implemented
>>> in a way that it can be used with Wait.on(). It would certainly be a
>>> welcome contribution to change this - many people expressed interest in
>>> specifically waiting for BigQuery writes. Is any of you interested in
>>> helping out?
>>>
>>> Thanks.
>>>
>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <ca...@mrcalonso.com>
>>> wrote:
>>>
>>>> Hi Simon, I think your explanation was very accurate, at least to my
>>>> understanding. I'd also be interested in getting batch load result's
>>>> feedback on the pipeline... hopefully someone may suggest something,
>>>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>
>>>> Thanks!
>>>>
>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>> simon.kitching@unbelievable-machine.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I need to write some data to BigQuery (batch-mode) and then send a
>>>>> Pubsub message to trigger further processing.
>>>>>
>>>>> I found this thread titled "Callbacks/other functions run after a
>>>>> PDone/output transform" on the user-list which was very relevant:
>>>>>
>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>
>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>
>>>>> Unfortunately, it appears that the Wait.on transform does not work
>>>>> with BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to work.
>>>>> Advice appreciated.
>>>>>
>>>>> Here's (most of) the relevant test code:
>>>>>         Pipeline p = Pipeline.create(options);
>>>>>         PCollection<String> lines = p.apply("Read Input",
>>>>> Create.of("line1", "line2", "line3", "line4"));
>>>>>
>>>>>         TableFieldSchema f1 = new
>>>>> TableFieldSchema().setName("value").setType("string");
>>>>>         TableSchema s2 = new
>>>>> TableSchema().setFields(Collections.singletonList(f1));
>>>>>
>>>>>         WriteResult writeResult = lines.apply("Write and load data",
>>>>> BigQueryIO.<String>write() //
>>>>>                 .to(options.getTableSpec()) //
>>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
>>>>> //
>>>>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>>                 .withSchema(s2)
>>>>>
>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>> //
>>>>>
>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>
>>>>>
>>>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>>>> OnCompletion()));
>>>>>
>>>>> where
>>>>> + format-function "SlowFormatter" prints out each line and has a small
>>>>> sleep for testing purposes, and
>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>
>>>>> In production code, OnCompletion would be fed some collection derived
>>>>> from lines, eg min/max record id, and the operation would be "send pubsub
>>>>> message" rather than print..
>>>>>
>>>>> My expectation is that the "SlowFormatter" would run for each line,
>>>>> then the data would be uploaded, then OnCompletion would print each line.
>>>>> And indeed that happens when STREAMING_INSERTS is used. However for
>>>>> FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>>
>>>>> I use WriteResult.getFailedInserts as that is the only "output" that
>>>>> BiqQueryIO.write() generates AFAICT. I don't expect any failed records, but
>>>>> believe that it can be used as a "signal" for the Wait.on - ie the output
>>>>> is "complete for window" only after all data has been uploaded, which is
>>>>> what I need. And that does seem to work for STREAMING_LOADS.
>>>>>
>>>>> I suspect the reason that this does not work for FILE_LOADS is that
>>>>> method BatchLoads.writeResult returns a WriteResult that wraps an "empty"
>>>>> failedInserts collection, ie data which is not connected to the
>>>>> batch-load-job that is triggered:
>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>     PCollection<TableRow> empty =
>>>>>         p.apply("CreateEmptyFailedInserts",
>>>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>>>   }
>>>>>
>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs;
>>>>> once a job is submitted the code repeatedly polls the job status until it
>>>>> reaches DONE or FAILED. However that information does not appear to be
>>>>> exposed anywhere (unlike streaming which effectively exposes
>>>>> completion-state via the failedInserts stream).
>>>>>
>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>
>>>>> Thanks,
>>>>> Simon
>>>>>
>>>>>

Re: BiqQueryIO.write and Wait.on

Posted by Carlos Alonso <ca...@mrcalonso.com>.
+ dev@beam.apache.org

Just a quick email to let you know that I'm starting developing this.

On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <ki...@google.com>
wrote:

> Hi Carlos,
>
> Thank you for expressing interest in taking this on! Let me give you a few
> pointers to start, and I'll be happy to help everywhere along the way.
>
> Basically we want BigQueryIO.write() to return something (e.g. a
> PCollection) that can be used as input to Wait.on().
> Currently it returns a WriteResult, which only contains a
> PCollection<TableRow> of failed inserts - that one can not be used
> directly, instead we should add another component to WriteResult that
> represents the result of successfully writing some data.
>
> Given that BQIO supports dynamic destination writes, I think it makes
> sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory
> we could sequence different destinations independently (currently Wait.on()
> does not provide such a feature, but it could); and it will require
> changing WriteResult to be WriteResult<DestinationT>. As for what the "???"
> might be - it is something that represents the result of successfully
> writing a window of data. I think it can even be Void, or "?" (wildcard
> type) for now, until we figure out something better.
>
> Implementing this would require roughly the following work:
> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
> - Modify the BatchLoads transform to provide it on both codepaths:
> expandTriggered() and expandUntriggered()
> ...- expandTriggered() itself writes via 2 codepaths: single-partition and
> multi-partition. Both need to be handled - we need to get a
> PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two
> PCollections together to get the final result. The single-partition
> codepath (writeSinglePartition) under the hood already uses WriteTables
> that returns a KV<DestinationT, ...> so it's directly usable. The
> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
> codepath drops DestinationT along the way and will need to be refactored a
> bit to keep it until the end.
> ...- expandUntriggered() should be treated the same way.
> - Modify the StreamingWriteTables transform to provide it
> ...- Here also, the challenge is to propagate the DestinationT type all
> the way until the end of StreamingWriteTables - it will need to be
> refactored. After such a refactoring, returning a KV<DestinationT, ...>
> should be easy.
>
> Another challenge with all of this is backwards compatibility in terms of
> API and pipeline update.
> Pipeline update is much less of a concern for the BatchLoads codepath,
> because it's typically used in batch-mode pipelines that don't get updated.
> I would recommend to start with this, perhaps even with only the
> untriggered codepath (it is much more commonly used) - that will pave the
> way for future work.
>
> Hope this helps, please ask more if something is unclear!
>
> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <ca...@mrcalonso.com>
> wrote:
>
>> Hey Eugene!!
>>
>> I’d gladly take a stab on it although I’m not sure how much available
>> time I might have to put into but... yeah, let’s try it.
>>
>> Where should I begin? Is there a Jira issue or shall I file one?
>>
>> Thanks!
>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <ki...@google.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Yes, you're both right - BigQueryIO.write() is currently not implemented
>>> in a way that it can be used with Wait.on(). It would certainly be a
>>> welcome contribution to change this - many people expressed interest in
>>> specifically waiting for BigQuery writes. Is any of you interested in
>>> helping out?
>>>
>>> Thanks.
>>>
>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <ca...@mrcalonso.com>
>>> wrote:
>>>
>>>> Hi Simon, I think your explanation was very accurate, at least to my
>>>> understanding. I'd also be interested in getting batch load result's
>>>> feedback on the pipeline... hopefully someone may suggest something,
>>>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>
>>>> Thanks!
>>>>
>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>> simon.kitching@unbelievable-machine.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I need to write some data to BigQuery (batch-mode) and then send a
>>>>> Pubsub message to trigger further processing.
>>>>>
>>>>> I found this thread titled "Callbacks/other functions run after a
>>>>> PDone/output transform" on the user-list which was very relevant:
>>>>>
>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>
>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>
>>>>> Unfortunately, it appears that the Wait.on transform does not work
>>>>> with BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to work.
>>>>> Advice appreciated.
>>>>>
>>>>> Here's (most of) the relevant test code:
>>>>>         Pipeline p = Pipeline.create(options);
>>>>>         PCollection<String> lines = p.apply("Read Input",
>>>>> Create.of("line1", "line2", "line3", "line4"));
>>>>>
>>>>>         TableFieldSchema f1 = new
>>>>> TableFieldSchema().setName("value").setType("string");
>>>>>         TableSchema s2 = new
>>>>> TableSchema().setFields(Collections.singletonList(f1));
>>>>>
>>>>>         WriteResult writeResult = lines.apply("Write and load data",
>>>>> BigQueryIO.<String>write() //
>>>>>                 .to(options.getTableSpec()) //
>>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
>>>>> //
>>>>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>>                 .withSchema(s2)
>>>>>
>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>> //
>>>>>
>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>
>>>>>
>>>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>>>> OnCompletion()));
>>>>>
>>>>> where
>>>>> + format-function "SlowFormatter" prints out each line and has a small
>>>>> sleep for testing purposes, and
>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>
>>>>> In production code, OnCompletion would be fed some collection derived
>>>>> from lines, eg min/max record id, and the operation would be "send pubsub
>>>>> message" rather than print..
>>>>>
>>>>> My expectation is that the "SlowFormatter" would run for each line,
>>>>> then the data would be uploaded, then OnCompletion would print each line.
>>>>> And indeed that happens when STREAMING_INSERTS is used. However for
>>>>> FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>>
>>>>> I use WriteResult.getFailedInserts as that is the only "output" that
>>>>> BiqQueryIO.write() generates AFAICT. I don't expect any failed records, but
>>>>> believe that it can be used as a "signal" for the Wait.on - ie the output
>>>>> is "complete for window" only after all data has been uploaded, which is
>>>>> what I need. And that does seem to work for STREAMING_LOADS.
>>>>>
>>>>> I suspect the reason that this does not work for FILE_LOADS is that
>>>>> method BatchLoads.writeResult returns a WriteResult that wraps an "empty"
>>>>> failedInserts collection, ie data which is not connected to the
>>>>> batch-load-job that is triggered:
>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>     PCollection<TableRow> empty =
>>>>>         p.apply("CreateEmptyFailedInserts",
>>>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>>>   }
>>>>>
>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs;
>>>>> once a job is submitted the code repeatedly polls the job status until it
>>>>> reaches DONE or FAILED. However that information does not appear to be
>>>>> exposed anywhere (unlike streaming which effectively exposes
>>>>> completion-state via the failedInserts stream).
>>>>>
>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>
>>>>> Thanks,
>>>>> Simon
>>>>>
>>>>>

Re: BiqQueryIO.write and Wait.on

Posted by Eugene Kirpichov <ki...@google.com>.
Hi Carlos,

Thank you for expressing interest in taking this on! Let me give you a few
pointers to start, and I'll be happy to help everywhere along the way.

Basically we want BigQueryIO.write() to return something (e.g. a
PCollection) that can be used as input to Wait.on().
Currently it returns a WriteResult, which only contains a
PCollection<TableRow> of failed inserts - that one can not be used
directly, instead we should add another component to WriteResult that
represents the result of successfully writing some data.

Given that BQIO supports dynamic destination writes, I think it makes sense
for that to be a PCollection<KV<DestinationT, ???>> so that in theory we
could sequence different destinations independently (currently Wait.on()
does not provide such a feature, but it could); and it will require
changing WriteResult to be WriteResult<DestinationT>. As for what the "???"
might be - it is something that represents the result of successfully
writing a window of data. I think it can even be Void, or "?" (wildcard
type) for now, until we figure out something better.

Implementing this would require roughly the following work:
- Add this PCollection<KV<DestinationT, ?>> to WriteResult
- Modify the BatchLoads transform to provide it on both codepaths:
expandTriggered() and expandUntriggered()
...- expandTriggered() itself writes via 2 codepaths: single-partition and
multi-partition. Both need to be handled - we need to get a
PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two
PCollections together to get the final result. The single-partition
codepath (writeSinglePartition) under the hood already uses WriteTables
that returns a KV<DestinationT, ...> so it's directly usable. The
multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
codepath drops DestinationT along the way and will need to be refactored a
bit to keep it until the end.
...- expandUntriggered() should be treated the same way.
- Modify the StreamingWriteTables transform to provide it
...- Here also, the challenge is to propagate the DestinationT type all the
way until the end of StreamingWriteTables - it will need to be refactored.
After such a refactoring, returning a KV<DestinationT, ...> should be easy.

Another challenge with all of this is backwards compatibility in terms of
API and pipeline update.
Pipeline update is much less of a concern for the BatchLoads codepath,
because it's typically used in batch-mode pipelines that don't get updated.
I would recommend to start with this, perhaps even with only the
untriggered codepath (it is much more commonly used) - that will pave the
way for future work.

Hope this helps, please ask more if something is unclear!

On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <ca...@mrcalonso.com> wrote:

> Hey Eugene!!
>
> I’d gladly take a stab on it although I’m not sure how much available time
> I might have to put into but... yeah, let’s try it.
>
> Where should I begin? Is there a Jira issue or shall I file one?
>
> Thanks!
> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <ki...@google.com>
> wrote:
>
>> Hi,
>>
>> Yes, you're both right - BigQueryIO.write() is currently not implemented
>> in a way that it can be used with Wait.on(). It would certainly be a
>> welcome contribution to change this - many people expressed interest in
>> specifically waiting for BigQuery writes. Is any of you interested in
>> helping out?
>>
>> Thanks.
>>
>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <ca...@mrcalonso.com>
>> wrote:
>>
>>> Hi Simon, I think your explanation was very accurate, at least to my
>>> understanding. I'd also be interested in getting batch load result's
>>> feedback on the pipeline... hopefully someone may suggest something,
>>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>
>>> Thanks!
>>>
>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>> simon.kitching@unbelievable-machine.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I need to write some data to BigQuery (batch-mode) and then send a
>>>> Pubsub message to trigger further processing.
>>>>
>>>> I found this thread titled "Callbacks/other functions run after a
>>>> PDone/output transform" on the user-list which was very relevant:
>>>>
>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>
>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>
>>>> Unfortunately, it appears that the Wait.on transform does not work with
>>>> BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to work. Advice
>>>> appreciated.
>>>>
>>>> Here's (most of) the relevant test code:
>>>>         Pipeline p = Pipeline.create(options);
>>>>         PCollection<String> lines = p.apply("Read Input",
>>>> Create.of("line1", "line2", "line3", "line4"));
>>>>
>>>>         TableFieldSchema f1 = new
>>>> TableFieldSchema().setName("value").setType("string");
>>>>         TableSchema s2 = new
>>>> TableSchema().setFields(Collections.singletonList(f1));
>>>>
>>>>         WriteResult writeResult = lines.apply("Write and load data",
>>>> BigQueryIO.<String>write() //
>>>>                 .to(options.getTableSpec()) //
>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
>>>> //
>>>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>                 .withSchema(s2)
>>>>
>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>> //
>>>>
>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>
>>>>
>>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>>> OnCompletion()));
>>>>
>>>> where
>>>> + format-function "SlowFormatter" prints out each line and has a small
>>>> sleep for testing purposes, and
>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>
>>>> In production code, OnCompletion would be fed some collection derived
>>>> from lines, eg min/max record id, and the operation would be "send pubsub
>>>> message" rather than print..
>>>>
>>>> My expectation is that the "SlowFormatter" would run for each line,
>>>> then the data would be uploaded, then OnCompletion would print each line.
>>>> And indeed that happens when STREAMING_INSERTS is used. However for
>>>> FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>
>>>> I use WriteResult.getFailedInserts as that is the only "output" that
>>>> BiqQueryIO.write() generates AFAICT. I don't expect any failed records, but
>>>> believe that it can be used as a "signal" for the Wait.on - ie the output
>>>> is "complete for window" only after all data has been uploaded, which is
>>>> what I need. And that does seem to work for STREAMING_LOADS.
>>>>
>>>> I suspect the reason that this does not work for FILE_LOADS is that
>>>> method BatchLoads.writeResult returns a WriteResult that wraps an "empty"
>>>> failedInserts collection, ie data which is not connected to the
>>>> batch-load-job that is triggered:
>>>>   private WriteResult writeResult(Pipeline p) {
>>>>     PCollection<TableRow> empty =
>>>>         p.apply("CreateEmptyFailedInserts",
>>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>>   }
>>>>
>>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs;
>>>> once a job is submitted the code repeatedly polls the job status until it
>>>> reaches DONE or FAILED. However that information does not appear to be
>>>> exposed anywhere (unlike streaming which effectively exposes
>>>> completion-state via the failedInserts stream).
>>>>
>>>> If I have misunderstood something, corrections welcome! If not,
>>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>>
>>>> Thanks,
>>>> Simon
>>>>
>>>>

Re: BiqQueryIO.write and Wait.on

Posted by Carlos Alonso <ca...@mrcalonso.com>.
Hey Eugene!!

I’d gladly take a stab on it although I’m not sure how much available time
I might have to put into but... yeah, let’s try it.

Where should I begin? Is there a Jira issue or shall I file one?

Thanks!
On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <ki...@google.com> wrote:

> Hi,
>
> Yes, you're both right - BigQueryIO.write() is currently not implemented
> in a way that it can be used with Wait.on(). It would certainly be a
> welcome contribution to change this - many people expressed interest in
> specifically waiting for BigQuery writes. Is any of you interested in
> helping out?
>
> Thanks.
>
> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <ca...@mrcalonso.com>
> wrote:
>
>> Hi Simon, I think your explanation was very accurate, at least to my
>> understanding. I'd also be interested in getting batch load result's
>> feedback on the pipeline... hopefully someone may suggest something,
>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>
>> Thanks!
>>
>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>> simon.kitching@unbelievable-machine.com> wrote:
>>
>>> Hi All,
>>>
>>> I need to write some data to BigQuery (batch-mode) and then send a
>>> Pubsub message to trigger further processing.
>>>
>>> I found this thread titled "Callbacks/other functions run after a
>>> PDone/output transform" on the user-list which was very relevant:
>>>
>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>
>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>
>>> Unfortunately, it appears that the Wait.on transform does not work with
>>> BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to work. Advice
>>> appreciated.
>>>
>>> Here's (most of) the relevant test code:
>>>         Pipeline p = Pipeline.create(options);
>>>         PCollection<String> lines = p.apply("Read Input",
>>> Create.of("line1", "line2", "line3", "line4"));
>>>
>>>         TableFieldSchema f1 = new
>>> TableFieldSchema().setName("value").setType("string");
>>>         TableSchema s2 = new
>>> TableSchema().setFields(Collections.singletonList(f1));
>>>
>>>         WriteResult writeResult = lines.apply("Write and load data",
>>> BigQueryIO.<String>write() //
>>>                 .to(options.getTableSpec()) //
>>>                 .withFormatFunction(new SlowFormatter()) //
>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
>>> //                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>>> //
>>>                 .withSchema(s2)
>>>
>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>> //
>>>
>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>
>>>
>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>> OnCompletion()));
>>>
>>> where
>>> + format-function "SlowFormatter" prints out each line and has a small
>>> sleep for testing purposes, and
>>> + DoFn OnCompletion just prints out the contents of each line
>>>
>>> In production code, OnCompletion would be fed some collection derived
>>> from lines, eg min/max record id, and the operation would be "send pubsub
>>> message" rather than print..
>>>
>>> My expectation is that the "SlowFormatter" would run for each line, then
>>> the data would be uploaded, then OnCompletion would print each line. And
>>> indeed that happens when STREAMING_INSERTS is used. However for FILE_LOADS,
>>> LinePrinter runs before the upload takes place.
>>>
>>> I use WriteResult.getFailedInserts as that is the only "output" that
>>> BiqQueryIO.write() generates AFAICT. I don't expect any failed records, but
>>> believe that it can be used as a "signal" for the Wait.on - ie the output
>>> is "complete for window" only after all data has been uploaded, which is
>>> what I need. And that does seem to work for STREAMING_LOADS.
>>>
>>> I suspect the reason that this does not work for FILE_LOADS is that
>>> method BatchLoads.writeResult returns a WriteResult that wraps an "empty"
>>> failedInserts collection, ie data which is not connected to the
>>> batch-load-job that is triggered:
>>>   private WriteResult writeResult(Pipeline p) {
>>>     PCollection<TableRow> empty =
>>>         p.apply("CreateEmptyFailedInserts",
>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>   }
>>>
>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs;
>>> once a job is submitted the code repeatedly polls the job status until it
>>> reaches DONE or FAILED. However that information does not appear to be
>>> exposed anywhere (unlike streaming which effectively exposes
>>> completion-state via the failedInserts stream).
>>>
>>> If I have misunderstood something, corrections welcome! If not,
>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>
>>> Thanks,
>>> Simon
>>>
>>>

Re: BiqQueryIO.write and Wait.on

Posted by Eugene Kirpichov <ki...@google.com>.
Hi,

Yes, you're both right - BigQueryIO.write() is currently not implemented in
a way that it can be used with Wait.on(). It would certainly be a welcome
contribution to change this - many people expressed interest in
specifically waiting for BigQuery writes. Is any of you interested in
helping out?

Thanks.

On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <ca...@mrcalonso.com> wrote:

> Hi Simon, I think your explanation was very accurate, at least to my
> understanding. I'd also be interested in getting batch load result's
> feedback on the pipeline... hopefully someone may suggest something,
> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>
> Thanks!
>
> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
> simon.kitching@unbelievable-machine.com> wrote:
>
>> Hi All,
>>
>> I need to write some data to BigQuery (batch-mode) and then send a Pubsub
>> message to trigger further processing.
>>
>> I found this thread titled "Callbacks/other functions run after a
>> PDone/output transform" on the user-list which was very relevant:
>>
>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>
>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>
>> Unfortunately, it appears that the Wait.on transform does not work with
>> BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to work. Advice
>> appreciated.
>>
>> Here's (most of) the relevant test code:
>>         Pipeline p = Pipeline.create(options);
>>         PCollection<String> lines = p.apply("Read Input",
>> Create.of("line1", "line2", "line3", "line4"));
>>
>>         TableFieldSchema f1 = new
>> TableFieldSchema().setName("value").setType("string");
>>         TableSchema s2 = new
>> TableSchema().setFields(Collections.singletonList(f1));
>>
>>         WriteResult writeResult = lines.apply("Write and load data",
>> BigQueryIO.<String>write() //
>>                 .to(options.getTableSpec()) //
>>                 .withFormatFunction(new SlowFormatter()) //
>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
>> //                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>> //
>>                 .withSchema(s2)
>>
>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>> //
>>
>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>
>>
>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>> OnCompletion()));
>>
>> where
>> + format-function "SlowFormatter" prints out each line and has a small
>> sleep for testing purposes, and
>> + DoFn OnCompletion just prints out the contents of each line
>>
>> In production code, OnCompletion would be fed some collection derived
>> from lines, eg min/max record id, and the operation would be "send pubsub
>> message" rather than print..
>>
>> My expectation is that the "SlowFormatter" would run for each line, then
>> the data would be uploaded, then OnCompletion would print each line. And
>> indeed that happens when STREAMING_INSERTS is used. However for FILE_LOADS,
>> LinePrinter runs before the upload takes place.
>>
>> I use WriteResult.getFailedInserts as that is the only "output" that
>> BiqQueryIO.write() generates AFAICT. I don't expect any failed records, but
>> believe that it can be used as a "signal" for the Wait.on - ie the output
>> is "complete for window" only after all data has been uploaded, which is
>> what I need. And that does seem to work for STREAMING_LOADS.
>>
>> I suspect the reason that this does not work for FILE_LOADS is that
>> method BatchLoads.writeResult returns a WriteResult that wraps an "empty"
>> failedInserts collection, ie data which is not connected to the
>> batch-load-job that is triggered:
>>   private WriteResult writeResult(Pipeline p) {
>>     PCollection<TableRow> empty =
>>         p.apply("CreateEmptyFailedInserts",
>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>   }
>>
>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs; once
>> a job is submitted the code repeatedly polls the job status until it
>> reaches DONE or FAILED. However that information does not appear to be
>> exposed anywhere (unlike streaming which effectively exposes
>> completion-state via the failedInserts stream).
>>
>> If I have misunderstood something, corrections welcome! If not,
>> suggestions for workarounds or alternate solutions are also welcome :-)
>>
>> Thanks,
>> Simon
>>
>>

Re: BiqQueryIO.write and Wait.on

Posted by Carlos Alonso <ca...@mrcalonso.com>.
Hi Simon, I think your explanation was very accurate, at least to my
understanding. I'd also be interested in getting batch load result's
feedback on the pipeline... hopefully someone may suggest something,
otherwise we could propose submitting a Jira, or even better, a PR!! :)

Thanks!

On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
simon.kitching@unbelievable-machine.com> wrote:

> Hi All,
>
> I need to write some data to BigQuery (batch-mode) and then send a Pubsub
> message to trigger further processing.
>
> I found this thread titled "Callbacks/other functions run after a
> PDone/output transform" on the user-list which was very relevant:
>
> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>
> Thanks to the author of the Wait transform (Beam 2.4.0)!
>
> Unfortunately, it appears that the Wait.on transform does not work with
> BiqQueryIO in FILE_LOADS mode - or at least I cannot get it to work. Advice
> appreciated.
>
> Here's (most of) the relevant test code:
>         Pipeline p = Pipeline.create(options);
>         PCollection<String> lines = p.apply("Read Input",
> Create.of("line1", "line2", "line3", "line4"));
>
>         TableFieldSchema f1 = new
> TableFieldSchema().setName("value").setType("string");
>         TableSchema s2 = new
> TableSchema().setFields(Collections.singletonList(f1));
>
>         WriteResult writeResult = lines.apply("Write and load data",
> BigQueryIO.<String>write() //
>                 .to(options.getTableSpec()) //
>                 .withFormatFunction(new SlowFormatter()) //
>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS) //
> //                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>                 .withSchema(s2)
>
> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> //
>
> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>
>
> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
> OnCompletion()));
>
> where
> + format-function "SlowFormatter" prints out each line and has a small
> sleep for testing purposes, and
> + DoFn OnCompletion just prints out the contents of each line
>
> In production code, OnCompletion would be fed some collection derived from
> lines, eg min/max record id, and the operation would be "send pubsub
> message" rather than print..
>
> My expectation is that the "SlowFormatter" would run for each line, then
> the data would be uploaded, then OnCompletion would print each line. And
> indeed that happens when STREAMING_INSERTS is used. However for FILE_LOADS,
> LinePrinter runs before the upload takes place.
>
> I use WriteResult.getFailedInserts as that is the only "output" that
> BiqQueryIO.write() generates AFAICT. I don't expect any failed records, but
> believe that it can be used as a "signal" for the Wait.on - ie the output
> is "complete for window" only after all data has been uploaded, which is
> what I need. And that does seem to work for STREAMING_LOADS.
>
> I suspect the reason that this does not work for FILE_LOADS is that method
> BatchLoads.writeResult returns a WriteResult that wraps an "empty"
> failedInserts collection, ie data which is not connected to the
> batch-load-job that is triggered:
>   private WriteResult writeResult(Pipeline p) {
>     PCollection<TableRow> empty =
>         p.apply("CreateEmptyFailedInserts",
> Create.empty(TypeDescriptor.of(TableRow.class)));
>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>   }
>
> Note that BatchLoads does "synchronously" invoke BigQuery load jobs; once
> a job is submitted the code repeatedly polls the job status until it
> reaches DONE or FAILED. However that information does not appear to be
> exposed anywhere (unlike streaming which effectively exposes
> completion-state via the failedInserts stream).
>
> If I have misunderstood something, corrections welcome! If not,
> suggestions for workarounds or alternate solutions are also welcome :-)
>
> Thanks,
> Simon
>
>