Posted to dev@beam.apache.org by Chaim Turkel <ch...@behalf.com> on 2017/09/07 12:10:32 UTC

BigQueryIO withSchemaFromView

Hi,
  I have a pipeline that, based on documents from mongo, updates the
schema and then adds the records to BigQuery. Since I want a partitioned
table, I have a daily window.
How do I get the schema view to be per window? I get the exception:

Attempted to get side input window for GlobalWindow from non-global WindowFn

chaim

Re: BigQueryIO withSchemaFromView

Posted by Eugene Kirpichov <ki...@google.com.INVALID>.
To expand on that: the batch runner's work scheduling and ordering does not
depend on windowing *at all.* It would not be too much of an exaggeration
to say that it treats windows simply as another grouping key - because it
assumes that input data is completely out of order, so optimizing for
latency and delivering results for earlier windows earlier is impossible or
at least not required - instead it processes the full data as a batch.

On Sun, Sep 10, 2017 at 8:32 AM Reuven Lax <re...@google.com.invalid> wrote:

> In that case I can say unequivocally that Dataflow (in batch mode) does not
> produce results for a stage until it has processed that entire stage. The
> reason for this is that the batch runner is optimized for throughput, not
> latency; it wants to minimize the time for the entire job to finish, not
> the time till first output. The side input will not be materialized until
> all of the data for all of the windows of the side input have been
> processed. The streaming runner on the other hand will produce windows as
> they finish. So for the batch runner, there is no performance advantage you
> get for windowing the side input.
>
> The fact that BigQueryIO needs the schema side input to be globally
> windowed is a bit confusing and not well documented. We should add better
> javadoc explaining this.
>
> Reuven
>
> On Sun, Sep 10, 2017 at 12:50 AM, Chaim Turkel <ch...@behalf.com> wrote:
>
> > batch on dataflow
> >
> > On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax <re...@google.com.invalid>
> > wrote:
> > > Which runner are you using? And is this a batch pipeline?
> > >
> > > On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <ch...@behalf.com> wrote:
> > >
> > >> Thanks for the answer, but I don't think that is the case. From
> > >> what I have seen, since I have other code to update status based on
> > >> the window, it does get called before all the windows are calculated.
> > >> There is no logical reason to wait: once a window has finished, the
> > >> rest of the pipeline should run and BigQuery should start to write
> > >> the results.
> > >>
> > >>
> > >>
> > >> On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax <relax@google.com.invalid> wrote:
> > >> > Logically the BigQuery write does not depend on windows, and writing
> > >> > it windowed would result in incorrect output. For this reason,
> > >> > BigQueryIO rewindows into global windows before actually writing to
> > >> > BigQuery.
> > >> >
> > >> > If you are running in batch mode, there is no performance difference
> > >> > between windowed and unwindowed side inputs. I believe that all of the
> > >> > batch runners wait until all windows are calculated before
> > >> > materializing the output.
> > >> >
> > >> > Reuven
> > >> >
> > >> > On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <ch...@behalf.com> wrote:
> > >> >
> > >> >> the schema depends on the data per window.
> > >> >> when I added the global window it works, but then I lose the
> > >> >> performance, since the second stage of writing will begin only after
> > >> >> the side input has read all the data and updated the schema.
> > >> >> The batch mode of BigQueryIO seems to use a global window, and I
> > >> >> don't know why.
> > >> >>
> > >> >> chaim
> > >> >>
> > >> >> On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov
> > >> >> <ki...@google.com.invalid> wrote:
> > >> >> > Are your schemas actually supposed to be different between
> > >> >> > different windows, or do they depend only on data?
> > >> >> > I see you have a commented-out Window.into(new GlobalWindows()) for
> > >> >> > your side input - did that work when it wasn't commented out?
> > >> >> >
> > >> >> > On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <ch...@behalf.com> wrote:
> > >> >> >
> > >> >> >> my code is:
> > >> >> >>
> > >> >> >> //read docs from mongo
> > >> >> >> final PCollection<Document> docs = pipeline
> > >> >> >>         .apply(table.getTableName(), MongoDbIO.read()
> > >> >> >>                 .withUri("mongodb://" + connectionParams)
> > >> >> >>                 .withFilter(filter)
> > >> >> >>                 .withDatabase(options.getDBName())
> > >> >> >>                 .withCollection(table.getTableName()))
> > >> >> >>         .apply("AddEventTimestamps", WithTimestamps.of(
> > >> >> >>                 (Document doc) -> new Instant(MongodbManagment.docTimeToLong(doc))))
> > >> >> >>         .apply("Window Daily", Window.into(CalendarWindows.days(1)));
> > >> >> >>
> > >> >> >> //update bq schema based on window
> > >> >> >> final PCollectionView<Map<String, String>> tableSchemas = docs
> > >> >> >> //        .apply("Global Window", Window.into(new GlobalWindows()))
> > >> >> >>         .apply("extract schema " + table.getTableName(),
> > >> >> >>                 new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
> > >> >> >>         .apply("getTableSchemaMemory " + table.getTableName(),
> > >> >> >>                 ParDo.of(getTableSchemaMemory(table.getTableName())))
> > >> >> >>         .apply(View.asMap());
> > >> >> >>
> > >> >> >> final PCollection<TableRow> docsRows = docs
> > >> >> >>         .apply("doc to row " + table.getTableName(),
> > >> >> >>                 ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
> > >> >> >>                         .withSideInputs(tableSchemas));
> > >> >> >>
> > >> >> >> final WriteResult apply = docsRows
> > >> >> >>         .apply("insert data table - " + table.getTableName(),
> > >> >> >>                 BigQueryIO.writeTableRows()
> > >> >> >>                         .to(TableRefPartition.perDay(options.getBQProject(),
> > >> >> >>                                 options.getDatasetId(), table.getBqTableName()))
> > >> >> >>                         .withSchemaFromView(tableSchemas)
> > >> >> >>                         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
> > >> >> >>                         .withWriteDisposition(WRITE_APPEND));
> > >> >> >>
> > >> >> >>
> > >> >> >> exception is:
> > >> >> >>
> > >> >> >> Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> > >> >> >> INFO: Opening TableRowWriter to
> > >> >> >> gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
> > >> >> >> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> > >> >> >> java.lang.IllegalArgumentException: Attempted to get side input window
> > >> >> >> for GlobalWindow from non-global WindowFn
> > >> >> >> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
> > >> >> >> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
> > >> >> >> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> > >> >> >> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> > >> >> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> > >> >> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> > >> >> >> at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
> > >> >> >> at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
> > >> >> >> Caused by: java.lang.IllegalArgumentException: Attempted to get side
> > >> >> >> input window for GlobalWindow from non-global WindowFn
> > >> >> >> at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
> > >> >> >> at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
> > >> >> >> at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
> > >> >> >> Sep 08, 2017 12:16:58 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> > >> >> >>
> > >> >> >> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
> > >> >> >> <ki...@google.com.invalid> wrote:
> > >> >> >> > Please include the full exception and please show the code that
> > >> >> >> > produces it.
> > >> >> >> > See also
> > >> >> >> > https://beam.apache.org/documentation/programming-guide/#transforms-sideio
> > >> >> >> > section "Side inputs and windowing" - that might be sufficient to
> > >> >> >> > resolve your problem.
> > >> >> >> >
> > >> >> >> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <chaim@behalf.com> wrote:
> > >> >> >> >
> > >> >> >> >> Hi,
> > >> >> >> >>   I have a pipeline that, based on documents from mongo, updates
> > >> >> >> >> the schema and then adds the records to BigQuery. Since I want a
> > >> >> >> >> partitioned table, I have a daily window.
> > >> >> >> >> How do I get the schema view to be per window? I get the exception:
> > >> >> >> >>
> > >> >> >> >> Attempted to get side input window for GlobalWindow from
> > >> >> >> >> non-global WindowFn
> > >> >> >> >>
> > >> >> >> >> chaim
> > >> >> >> >>
> > >> >> >>
> > >> >>
> > >>
> >
>
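
A minimal sketch of the fix discussed in this thread - re-windowing only the
schema side input into the global window, which is what withSchemaFromView()
expects, while the main input keeps its daily windows. It assumes the schema
map is keyed by table name, as in the View.asMap() step of Chaim's code:

import java.util.Map;

import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class GlobalSchemaViewSketch {

    // Re-window the (table name -> schema) pairs into the global window
    // before materializing the map that withSchemaFromView() consumes. Per
    // Reuven's point above, in batch mode this costs nothing extra, since
    // the side input is only materialized once all windows are processed.
    public static PCollectionView<Map<String, String>> globalSchemaView(
            PCollection<KV<String, String>> tableSchemas) {
        return tableSchemas
                .apply("Global Window", Window.<KV<String, String>>into(new GlobalWindows()))
                .apply(View.<String, String>asMap());
    }
}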

Re: BigQueryIO withSchemaFromView

Posted by Chaim Turkel <ch...@behalf.com>.
Any idea how I can debug it or find the issue?

On Tue, Sep 12, 2017 at 11:25 PM, Reuven Lax <re...@google.com.invalid> wrote:
> Ok, something is going wrong then. It appears that your job created over
> 14,000 BigQuery load jobs, which is not expected (and probably why things
> were so slow).
>
> On Tue, Sep 12, 2017 at 8:50 AM, Chaim Turkel <ch...@behalf.com> wrote:
>
>> no that specific job created only 2 tables
>>
>> On Tue, Sep 12, 2017 at 4:36 PM, Reuven Lax <re...@google.com.invalid>
>> wrote:
>> > It looks like your job is creating about 1,445 distinct BigQuery tables.
>> > Does that sound correct to you?
>> >
>> > Reuven
>> >
>> > On Tue, Sep 12, 2017 at 6:22 AM, Chaim Turkel <ch...@behalf.com> wrote:
>> >
>> >> the job id is 2017-09-12_02_57_55-5233544151932101752
>> >> as you can see, the majority of the time is inserting into BigQuery.
>> >> Is there any way to parallelize this?
>> >>
>> >> My feeling for the windowing is that writing should be done per window
>> >> (my window is daily), or at least that this should be configurable.
>> >>
>> >> chaim
>> >>
>> >> On Tue, Sep 12, 2017 at 4:10 PM, Reuven Lax <re...@google.com.invalid> wrote:
>> >> > So the problem is you are running on Dataflow, and it's taking longer
>> >> > than you think it should? If you provide the Dataflow job id we can
>> >> > help you debug why it's taking 30 minutes. (And as an aside, if this
>> >> > turns into a Dataflow debugging session, we should move it off of the
>> >> > Beam list and onto a Dataflow-specific thread.)
>> >> >
>> >> > Reuven
>> >> >
>> >> > On Tue, Sep 12, 2017 at 3:28 AM, Chaim Turkel <ch...@behalf.com> wrote:
>> >> >
>> >> >> is there a way around this? my time for 13GB is now close to 30
>> >> >> minutes, while it should be around 15 minutes.
>> >> >> Do I need to chunk the data myself into windows, and run in parallel?
>> >> >> chaim

Re: BigQueryIO withSchemaFromView

Posted by Chaim Turkel <ch...@behalf.com>.
I just went over the changes for the streaming method.
That looks great.
How about adding the option to continue the apply after success, with
statistics or something, like in the failure case?
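
For context, the failure-side hook Chaim refers to already exists on
WriteResult: with streaming inserts, getFailedInserts() carries the rows that
BigQuery rejected and can feed further transforms. A minimal sketch of
consuming it; the "_id" field is an assumption based on the mongo documents
in this thread:

import com.google.api.services.bigquery.model.TableRow;

import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class FailedInsertsSketch {

    // Extract an identifier from each rejected row, e.g. for logging or for
    // routing to a dead-letter table.
    public static PCollection<String> failedRowIds(WriteResult result) {
        return result.getFailedInserts()
                .apply("failed row to id", MapElements
                        .into(TypeDescriptors.strings())
                        .via((TableRow row) -> String.valueOf(row.get("_id"))));
    }
}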

On Wed, Sep 13, 2017 at 7:19 PM, Reuven Lax <re...@google.com.invalid> wrote:
> Ah, so you are loading each window into a separate BigQuery table? That
> might be the reason things are slow. Remember that a batch job doesn't
> return until everything finishes, and if you are loading that many tables
> it's entirely possible that BigQuery will throttle you, causing the slowdown.
>
> A couple of options:
>
> 1. Instead of loading into separate BigQuery tables, you could load into
> separate partitions of the same table. See this page for more info:
> https://cloud.google.com/bigquery/docs/partitioned-tables
>
> 2. If you have a streaming unbounded source for your data, you can run
> using a streaming runner. That will load each window as it becomes
> available instead of waiting for everything to load.
>
> Reuven
>
> On Tue, Sep 12, 2017 at 11:48 PM, Chaim Turkel <ch...@behalf.com> wrote:
>
>> From what I found, if I have the windowing with BigQuery partitions (per
>> day - 1545 partitions) the insert can take 5 hours, whereas if there are
>> no partitions it takes about 12 minutes.
>>
>> I have 13,843,080 records, 6.76 GB.
>> Any ideas how to get the partitioned write to work faster?
>>
>> Is there a way to get BigQueryIO to use streaming instead of load jobs?
>>
>> chaim
>>
>> On Tue, Sep 12, 2017 at 11:32 PM, Chaim Turkel <ch...@behalf.com> wrote:
>> > I am using windowing for the partitioning of the table, maybe that has
>> > to do with it?
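
On Chaim's question about getting BigQueryIO to use streaming instead of load
jobs: in the 2.x SDK the write method is selectable via withMethod(), though
streaming inserts are billed per row and have their own quotas. A minimal
sketch, assuming the destination table already exists (streaming with
CREATE_IF_NEEDED would also require a schema):

import com.google.api.services.bigquery.model.TableRow;

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.values.PCollection;

public class StreamingWriteSketch {

    // Force the streaming-insert API instead of batch load jobs. This trades
    // load-job quotas (the thousands-of-load-jobs problem above) for per-row
    // insert quotas and cost.
    public static WriteResult writeViaStreaming(PCollection<TableRow> rows, String tableSpec) {
        return rows.apply("stream to BigQuery",
                BigQueryIO.writeTableRows()
                        .to(tableSpec)
                        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    }
}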

Re: BigQueryIO withSchemaFromView

Posted by Chaim Turkel <ch...@behalf.com>.
.apply("insert data table - " + table.getTableName(),
        BigQueryIO.writeTableRows()
                .to(TableRefPartition.perDay(options.getBQProject(),
options.getDatasetId(), table.getBqTableName()))
                .withSchemaFromView(tableSchemas)

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)


import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;

import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class TableRefPartition implements
        SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {

    private final String projectId;
    private final String datasetId;
    private final String pattern;
    private final String table;

    // Routes each element to "<tablePrefix>$yyyyMMdd" - the partition
    // decorator for the day of the element's window.
    public static TableRefPartition perDay(String projectId, String datasetId, String tablePrefix) {
        return new TableRefPartition(projectId, datasetId, "yyyyMMdd", tablePrefix + "$");
    }

    private TableRefPartition(String projectId, String datasetId, String pattern, String table) {
        this.projectId = projectId;
        this.datasetId = datasetId;
        this.pattern = pattern;
        this.table = table;
    }

    @Override
    public TableDestination apply(ValueInSingleWindow<TableRow> input) {
        DateTimeFormatter partition = DateTimeFormat.forPattern(pattern).withZoneUTC();

        TableReference reference = new TableReference();
        reference.setProjectId(this.projectId);
        reference.setDatasetId(this.datasetId);

        // The window's max timestamp falls inside the window's day, so
        // formatting it as "yyyyMMdd" yields that day's partition suffix.
        reference.setTableId(table + input.getWindow().maxTimestamp().toString(partition));
        return new TableDestination(reference, null);
    }
}
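
So with tablePrefix "mytable", a row whose daily window covers 2017-09-12
resolves to the table id "mytable$20170912" - BigQuery's partition-decorator
syntax for loading into a single day's partition - since the window's
maximum timestamp falls inside that day.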

On Wed, Sep 13, 2017 at 9:40 PM, Reuven Lax <re...@google.com.invalid> wrote:
> Can you show us some of the code you are using? How are you loading into
> separate partitions?
>
> Reuven
>
> On Wed, Sep 13, 2017 at 10:13 AM, Chaim Turkel <ch...@behalf.com> wrote:
>
>> I am loading into separate partitions of the same table.
>> I want to see if streaming will be faster.
>>
>> Is there a repository where I can use the snapshot version?
>>
>>
>> On Wed, Sep 13, 2017 at 7:19 PM, Reuven Lax <re...@google.com.invalid>
>> wrote:
>> > Ah, so you are loading each window into a separate BigQuery table? That
>> > might be the reason things are slow. Remembert a batch job doesn't return
>> > until everything finishes, and if you are loading that many tables it's
>> > entirely possible that BigQuery will throttle you, causing the slowdown.
>> >
>> > A couple of options:
>> >
>> > 1. Instead of loading into separate BigQuery tables, you could load into
>> > separate partitions of the same table. See this page for more info:
>> > https://cloud.google.com/bigquery/docs/partitioned-tables
>> >
>> > 2. If you have a streaming unbounded source for your data, you can run
>> > using a streaming runner. That will load each window as it becomes
>> > available instead of waiting for everything to load.
>> >
>> > Reuven
>> >
>> > On Tue, Sep 12, 2017 at 11:48 PM, Chaim Turkel <ch...@behalf.com> wrote:
>> >
>> >> from what i found it I have the windowing with bigquery partition (per
>> >> day - 1545 partitions) the insert can take 5 hours, where if there is
>> >> no partitions then it takes about 12 minutes
>> >>
>> >> I have 13,843,080 recrods 6.76 GB.
>> >> Any ideas how to get the partition to work faster.
>> >>
>> >> Is there a way to get the BigQueryIO to use streaming and not jobs?
>> >>
>> >> chaim
>> >>
>> >> On Tue, Sep 12, 2017 at 11:32 PM, Chaim Turkel <ch...@behalf.com>
>> wrote:
>> >> > i am using windowing for the partion of the table, maybe that has to
>> do
>> >> with it?
>> >> >
>> >> > On Tue, Sep 12, 2017 at 11:25 PM, Reuven Lax <relax@google.com.invalid
>> >
>> >> wrote:
>> >> >> Ok, something is going wrong then. It appears that your job created
>> over
>> >> >> 14,000 BigQuery load jobs, which is not expected (and probably why
>> >> things
>> >> >> were so slow).
>> >> >>
>> >> >> On Tue, Sep 12, 2017 at 8:50 AM, Chaim Turkel <ch...@behalf.com>
>> wrote:
>> >> >>
>> >> >>> no that specific job created only 2 tables
>> >> >>>
>> >> >>> On Tue, Sep 12, 2017 at 4:36 PM, Reuven Lax
>> <re...@google.com.invalid>
>> >> >>> wrote:
>> >> >>> > It looks like your job is creating about 14,45 distinct BigQuery
>> >> tables.
>> >> >>> > Does that sound correct to you?
>> >> >>> >
>> >> >>> > Reuven
>> >> >>> >
>> >> >>> > On Tue, Sep 12, 2017 at 6:22 AM, Chaim Turkel <ch...@behalf.com>
>> >> wrote:
>> >> >>> >
>> >> >>> >> the job id is 2017-09-12_02_57_55-5233544151932101752
>> >> >>> >> as you can see the majority of the time is inserting into
>> bigquery.
>> >> >>> >> is there any way to parallel this?
>> >> >>> >>
>> >> >>> >> My feeling for the windowing is that writing should be done per
>> >> window
>> >> >>> >> (my window is daily) or at least to be able to configure it
>> >> >>> >>
>> >> >>> >> chaim
>> >> >>> >>
>> >> >>> >> On Tue, Sep 12, 2017 at 4:10 PM, Reuven Lax
>> >> <re...@google.com.invalid>
>> >> >>> >> wrote:
>> >> >>> >> > So the problem is you are running on Dataflow, and it's taking
>> >> longer
>> >> >>> >> than
>> >> >>> >> > you think it should? If you provide the Dataflow job id we can
>> >> help
>> >> >>> you
>> >> >>> >> > debug why it's taking 30 minutes. (and as an aside, if this
>> turns
>> >> >>> into a
>> >> >>> >> > Dataflow debugging session we should move it off of the Beam
>> list
>> >> and
>> >> >>> >> onto
>> >> >>> >> > a Dataflow-specific tread)
>> >> >>> >> >
>> >> >>> >> > Reuven
>> >> >>> >> >
>> >> >>> >> > On Tue, Sep 12, 2017 at 3:28 AM, Chaim Turkel <
>> chaim@behalf.com>
>> >> >>> wrote:
>> >> >>> >> >
>> >> >>> >> >> is there a way around this, my time for 13gb is not close to
>> 30
>> >> >>> >> >> minutes, while it should be around 15 minutes.
>> >> >>> >> >> Do i need to chunk the code myself to windows, and run in
>> >> parallel?
>> >> >>> >> >> chaim
>> >> >>> >> >>
>> >> >>> >> >> On Sun, Sep 10, 2017 at 6:32 PM, Reuven Lax
>> >> <relax@google.com.invalid
>> >> >>> >
>> >> >>> >> >> wrote:
>> >> >>> >> >> > In that case I can say unequivocally that Dataflow (in batch
>> >> mode)
>> >> >>> >> does
>> >> >>> >> >> not
>> >> >>> >> >> > produce results for a stage until it has processed that
>> entire
>> >> >>> stage.
>> >> >>> >> The
>> >> >>> >> >> > reason for this is that the batch runner is optimized for
>> >> >>> throughput,
>> >> >>> >> not
>> >> >>> >> >> > latency; it wants to minimize the time for the entire job to
>> >> >>> finish,
>> >> >>> >> not
>> >> >>> >> >> > the time till first output. The side input will not be
>> >> materialized
>> >> >>> >> until
>> >> >>> >> >> > all of the data for all of the windows of the side input
>> have
>> >> been
>> >> >>> >> >> > processed. The streaming runner on the other hand will
>> produce
>> >> >>> >> windows as
>> >> >>> >> >> > they finish. So for the batch runner, there is no
>> performance
>> >> >>> >> advantage
>> >> >>> >> >> you
>> >> >>> >> >> > get for windowing the side input.
>> >> >>> >> >> >
>> >> >>> >> >> > The fact that BigQueryIO needs the schema side input to be
>> >> globally
>> >> >>> >> >> > windowed is a bit confusing and not well documented. We
>> should
>> >> add
>> >> >>> >> better
>> >> >>> >> >> > javadoc explaining this.
>> >> >>> >> >> >
>> >> >>> >> >> > Reuven
>> >> >>> >> >> >
>> >> >>> >> >> > On Sun, Sep 10, 2017 at 12:50 AM, Chaim Turkel <
>> >> chaim@behalf.com>
>> >> >>> >> wrote:
>> >> >>> >> >> >
>> >> >>> >> >> >> batch on dataflow
>> >> >>> >> >> >>
>> >> >>> >> >> >> On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax
>> >> >>> <relax@google.com.invalid
>> >> >>> >> >
>> >> >>> >> >> >> wrote:
>> >> >>> >> >> >> > Which runner are you using? And is this a batch pipeline?
>> >> >>> >> >> >> >
>> >> >>> >> >> >> > On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <
>> >> chaim@behalf.com
>> >> >>> >
>> >> >>> >> >> wrote:
>> >> >>> >> >> >> >
>> >> >>> >> >> >> >> Thank for the answer, but i don't think that that is the
>> >> case.
>> >> >>> >> From
>> >> >>> >> >> >> >> what i have seen, since i have other code to update
>> status
>> >> >>> based
>> >> >>> >> on
>> >> >>> >> >> >> >> the window, it does get called before all the windows
>> are
>> >> >>> >> calculated.
>> >> >>> >> >> >> >> There is no logical reason to wait, once the window has
>> >> >>> finished,
>> >> >>> >> the
>> >> >>> >> >> >> >> rest of the pipeline should run and the BigQuery should
>> >> start
>> >> >>> to
>> >> >>> >> >> write
>> >> >>> >> >> >> >> the results.
>> >> >>> >> >> >> >>
>> >> >>> >> >> >> >>
>> >> >>> >> >> >> >>
>> >> >>> >> >> >> >> On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax
>> >> >>> >> <relax@google.com.invalid
>> >> >>> >> >> >
>> >> >>> >> >> >> >> wrote:
>> >> >>> >> >> >> >> > Logically the BigQuery write does not depend on
>> windows,
>> >> and
>> >> >>> >> >> writing
>> >> >>> >> >> >> it
>> >> >>> >> >> >> >> > windowed would result in incorrect output. For this
>> >> reason,
>> >> >>> >> >> BigQueryIO
>> >> >>> >> >> >> >> > rewindows int global windows before actually writing
>> to
>> >> >>> >> BigQuery.
>> >> >>> >> >> >> >> >
>> >> >>> >> >> >> >> > If you are running in batch mode, there is no
>> performance
>> >> >>> >> >> difference
>> >> >>> >> >> >> >> > between windowed and unwindowed side inputs. I believe
>> >> that
>> >> >>> all
>> >> >>> >> of
>> >> >>> >> >> the
>> >> >>> >> >> >> >> > batch runners wait until all windows are calculated
>> >> before
>> >> >>> >> >> >> materializing
>> >> >>> >> >> >> >> > the output.
>> >> >>> >> >> >> >> >
>> >> >>> >> >> >> >> > Reuven
>> >> >>> >> >> >> >> >
>> >> >>> >> >> >> >> > On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <
>> >> >>> chaim@behalf.com
>> >> >>> >> >
>> >> >>> >> >> >> wrote:
>> >> >>> >> >> >> >> >
>> >> >>> >> >> >> >> >> the schema depends on the data per window.
>> >> >>> >> >> >> >> >> when i added the global window it works, but then i
>> >> loose
>> >> >>> the
>> >> >>> >> >> >> >> >> performance, since the secound stage of writing will
>> >> begin
>> >> >>> only
>> >> >>> >> >> after
>> >> >>> >> >> >> >> >> the side input has read all the data and updated the
>> >> schema
>> >> >>> >> >> >> >> >> The batchmode of the BigqueryIO seems to use a global
>> >> window
>> >> >>> >> that
>> >> >>> >> >> i
>> >> >>> >> >> >> >> >> don't know why?
>> >> >>> >> >> >> >> >>
>> >> >>> >> >> >> >> >> chaim
>> >> >>> >> >> >> >> >>
>> >> >>> >> >> >> >> >> On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov
>> >> >>> >> >> >> >> >> <ki...@google.com.invalid> wrote:
>> >> >>> >> >> >> >> >> > Are your schemas actually supposed to be different
>> >> between
>> >> >>> >> >> >> different
>> >> >>> >> >> >> >> >> > windows, or do they depend only on data?
>> >> >>> >> >> >> >> >> > I see you have a commented-out Window.into(new
>> >> >>> >> GlobalWindows())
>> >> >>> >> >> for
>> >> >>> >> >> >> >> your
>> >> >>> >> >> >> >> >> > side input - did that work when it wasn't commented
>> >> out?
>> >> >>> >> >> >> >> >> >
>> >> >>> >> >> >> >> >> > On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <
>> >> >>> >> chaim@behalf.com>
>> >> >>> >> >> >> wrote:
>> >> >>> >> >> >> >> >> >
>> >> >>> >> >> >> >> >> >> my code is:
>> >> >>> >> >> >> >> >> >>
>> >> >>> >> >> >> >> >> >>                     //read docs from mongo
>> >> >>> >> >> >> >> >> >>                     final PCollection<Document>
>> docs
>> >> =
>> >> >>> >> pipeline
>> >> >>> >> >> >> >> >> >>
>> >>  .apply(table.getTableName(),
>> >> >>> >> >> >> >> >> MongoDbIO.read()
>> >> >>> >> >> >> >> >> >>
>> >> >>>  .withUri("mongodb://" +
>> >> >>> >> >> >> >> >> >> connectionParams)
>> >> >>> >> >> >> >> >> >>
>> >>  .withFilter(filter)
>> >> >>> >> >> >> >> >> >>
>> >> >>>  .withDatabase(options.
>> >> >>> >> >> >> >> getDBName())
>> >> >>> >> >> >> >> >> >>
>> >> >>>  .withCollection(table.
>> >> >>> >> >> >> >> >> getTableName()))
>> >> >>> >> >> >> >> >> >>
>> >>  .apply("AddEventTimestamps",
>> >> >>> >> >> >> >> >> >> WithTimestamps.of((Document doc) -> new
>> >> >>> >> >> >> >> >> >> Instant(MongodbManagment.docTimeToLong(doc))))
>> >> >>> >> >> >> >> >> >>                             .apply("Window Daily",
>> >> >>> >> >> >> >> >> >> Window.into(CalendarWindows.days(1)));
>> >> >>> >> >> >> >> >> >>
>> >> >>> >> >> >> >> >> >>                     //update bq schema based on
>> >> window
>> >> >>> >> >> >> >> >> >>                     final
>> PCollectionView<Map<String,
>> >> >>> >> String>>
>> >> >>> >> >> >> >> >> >> tableSchemas = docs
>> >> >>> >> >> >> >> >> >> //                            .apply("Global
>> >> >>> >> >> >> Window",Window.into(new
>> >> >>> >> >> >> >> >> >> GlobalWindows()))
>> >> >>> >> >> >> >> >> >>                             .apply("extract
>> schema "
>> >> +
>> >> >>> >> >> >> >> >> >> table.getTableName(), new
>> >> >>> >> >> >> >> >> >> LoadMongodbSchemaPipeline.
>> >> DocsToSchemaTransform(table))
>> >> >>> >> >> >> >> >> >>
>> >>  .apply("getTableSchemaMemory
>> >> >>> " +
>> >> >>> >> >> >> >> >> >> table.getTableName(),
>> >> >>> >> >> >> >> >> >> ParDo.of(getTableSchemaMemory(
>> >> table.getTableName())))
>> >> >>> >> >> >> >> >> >>                             .apply(View.asMap());
>> >> >>> >> >> >> >> >> >>
>> >> >>> >> >> >> >> >> >>                     final PCollection<TableRow>
>> >> docsRows
>> >> >>> =
>> >> >>> >> docs
>> >> >>> >> >> >> >> >> >>                             .apply("doc to row " +
>> >> >>> >> >> >> >> >> >> table.getTableName(),
>> ParDo.of(docToTableRow(table.
>> >> >>> >> >> >> getBqTableName(),
>> >> >>> >> >> >> >> >> >> tableSchemas))
>> >> >>> >> >> >> >> >> >>
>> >> >>> >> >>  .withSideInputs(tableSchemas))
>> >> >>> >> >> >> ;
>> >> >>> >> >> >> >> >> >>
>> >> >>> >> >> >> >> >> >>                     final WriteResult apply =
>> >> docsRows
>> >> >>> >> >> >> >> >> >>                             .apply("insert data
>> >> table -
>> >> >>> " +
>> >> >>> >> >> >> >> >> >> table.getTableName(),
>> >> >>> >> >> >> >> >> >>
>> >> >>> >>  BigQueryIO.writeTableRows()
>> >> >>> >> >> >> >> >> >>
>> >> >>> >> >> >> >> >> >> .to(TableRefPartition.perDay(
>> options.getBQProject(),
>> >> >>> >> >> >> >> >> >> options.getDatasetId(), table.getBqTableName()))
>> >> >>> >> >> >> >> >> >>
>> >> >>> >> >> >> >> >> >> .withSchemaFromView(tableSchemas)
>> >> >>> >> >> >> >> >> >>
>> >> >>> >> >> >> >> >> >> .withCreateDisposition(BigQueryIO.Write.
>> >> >>> >> >> >> CreateDisposition.CREATE_IF_
>> >> >>> >> >> >> >> >> NEEDED)
>> >> >>> >> >> >> >> >> >>
>> >> >>> >> >> >> >> >> >> .withWriteDisposition(WRITE_APPEND));
>> >> >>> >> >> >> >> >> >>
>> >> >>> >> >> >> >> >> >>
exception is:

Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
INFO: Opening TableRowWriter to gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
	at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
	at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
Caused by: java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
	at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
	at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
	at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
Sep 08, 2017 12:16:58 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
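
The fix discussed elsewhere in this thread is to build the schema view in the global window, so that a side-input window can be resolved for the daily-windowed main input. A minimal sketch: it is simply the commented-out "Global Window" step from the code above, re-enabled (all names come from the quoted pipeline):

    final PCollectionView<Map<String, String>> tableSchemas = docs
            // Re-window into the global window before creating the view, so
            // any main-input window can be mapped to a side-input window.
            .apply("Global Window", Window.into(new GlobalWindows()))
            .apply("extract schema " + table.getTableName(),
                    new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
            .apply("getTableSchemaMemory " + table.getTableName(),
                    ParDo.of(getTableSchemaMemory(table.getTableName())))
            .apply(View.asMap());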

Re: BigQueryIO withSchemaFromView

Posted by Reuven Lax <re...@google.com.INVALID>.
Can you show us some of the code you are using? How are you loading into
separate partitions?

Reuven

On Wed, Sep 13, 2017 at 10:13 AM, Chaim Turkel <ch...@behalf.com> wrote:

> I am loading into separate partitions of the same table.
> I want to see if streaming will be faster.
>
> Is there a repository where I can use the snapshot version?

Re: BigQueryIO withSchemaFromView

Posted by Chaim Turkel <ch...@behalf.com>.
I am loading into separate partitions of the same table.
I want to see if streaming will be faster.

Is there a repository where I can use the snapshot version?
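
For reference: Beam SNAPSHOT artifacts are generally published to the Apache snapshot repository. A sketch of the Maven coordinates (the repository URL and the in-development version number here are assumptions, not confirmed in this thread):

    repository: https://repository.apache.org/content/repositories/snapshots/
    groupId:    org.apache.beam
    artifactId: beam-sdks-java-io-google-cloud-platform
    version:    2.2.0-SNAPSHOT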


On Wed, Sep 13, 2017 at 7:19 PM, Reuven Lax <re...@google.com.invalid> wrote:
> Ah, so you are loading each window into a separate BigQuery table? That
> might be the reason things are slow. Remember, a batch job doesn't return
> until everything finishes, and if you are loading that many tables it's
> entirely possible that BigQuery will throttle you, causing the slowdown.
>
> A couple of options:
>
> 1. Instead of loading into separate BigQuery tables, you could load into
> separate partitions of the same table. See this page for more info:
> https://cloud.google.com/bigquery/docs/partitioned-tables
>
> 2. If you have a streaming unbounded source for your data, you can run
> using a streaming runner. That will load each window as it becomes
> available instead of waiting for everything to load.
>
> Reuven

Re: BigQueryIO withSchemaFromView

Posted by Reuven Lax <re...@google.com.INVALID>.
Ah, so you are loading each window into a separate BigQuery table? That
might be the reason things are slow. Remember, a batch job doesn't return
until everything finishes, and if you are loading that many tables it's
entirely possible that BigQuery will throttle you, causing the slowdown.

A couple of options:

1. Instead of loading into separate BigQuery tables, you could load into
separate partitions of the same table (see the sketch after this message).
See this page for more info:
https://cloud.google.com/bigquery/docs/partitioned-tables

2. If you have a streaming unbounded source for your data, you can run
using a streaming runner. That will load each window as it becomes
available instead of waiting for everything to load.

Reuven
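
A sketch of option 1 in Beam 2.x terms, assuming the dynamic-destinations overload of to() that takes a SerializableFunction. The project, dataset, and table names are placeholders, and docsRows is the daily-windowed collection from the quoted pipeline:

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.apache.beam.sdk.values.ValueInSingleWindow;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    // Route each element of a daily window to the matching partition
    // decorator ("table$yyyyMMdd") of one date-partitioned table, instead
    // of creating a separate table per window.
    final DateTimeFormatter DAY = DateTimeFormat.forPattern("yyyyMMdd");

    docsRows.apply("write to daily partitions",
        BigQueryIO.writeTableRows()
            .to((ValueInSingleWindow<TableRow> value) -> {
                IntervalWindow w = (IntervalWindow) value.getWindow();
                String day = DAY.print(w.start());
                return new TableDestination(
                    "my-project:my_dataset.my_table$" + day,  // partition decorator
                    "partition for " + day);
            })
            .withSchemaFromView(tableSchemas)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));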

On Tue, Sep 12, 2017 at 11:48 PM, Chaim Turkel <ch...@behalf.com> wrote:

> From what I found, if I have the windowing with BigQuery partitions (per
> day - 1545 partitions) the insert can take 5 hours, whereas with no
> partitions it takes about 12 minutes.
>
> I have 13,843,080 records, 6.76 GB.
> Any ideas how to get the partition to work faster?
>
> Is there a way to get the BigQueryIO to use streaming and not jobs?
>
> chaim

Re: BigQueryIO withSchemaFromView

Posted by Chaim Turkel <ch...@behalf.com>.
Just found that with BigQuery you cannot stream to partitions older
than 30 days (so I can't use it anyway to load old data) :(

On Wed, Sep 13, 2017 at 7:08 PM, Lukasz Cwik <lc...@google.com.invalid> wrote:
> Support was added to expose how users want to load their data with
> https://github.com/apache/beam/commit/075d4d45a9cd398f3b4023b6efd495cc58eb9bdd
> It is planned to be released in 2.2.0
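
With the change Lukasz references, once on 2.2.0 (or a snapshot of it), selecting streaming inserts instead of load jobs should look roughly like this. A sketch against the then-unreleased API, with a placeholder table spec:

    // Choose streaming inserts explicitly instead of batch load jobs
    // (the default for bounded input). Note the caveat above: streaming
    // cannot write to partitions older than about 30 days.
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")
        .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);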

Re: BigQueryIO withSchemaFromView

Posted by Lukasz Cwik <lc...@google.com.INVALID>.
Support was added to expose how users want to load their data with
https://github.com/apache/beam/commit/075d4d45a9cd398f3b4023b6efd495cc58eb9bdd
It is planned to be released in 2.2.0
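
For reference, a minimal sketch of how that option is expected to look
once 2.2.0 is out - hedged, based on the commit above; the destination is
a placeholder, and "rows" and "tableSchemas" stand in for the collections
discussed earlier in the thread:

    // Sketch only: explicitly pick streaming inserts over file load jobs
    // via the Method enum added to BigQueryIO.Write. Note the ~30-day
    // limit on streaming into old partitions mentioned elsewhere in this
    // thread.
    rows.apply("write via streaming inserts",
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table")
            .withSchemaFromView(tableSchemas)
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));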

On Tue, Sep 12, 2017 at 11:48 PM, Chaim Turkel <ch...@behalf.com> wrote:

> from what i found it I have the windowing with bigquery partition (per
> day - 1545 partitions) the insert can take 5 hours, where if there is
> no partitions then it takes about 12 minutes
>
> I have 13,843,080 recrods 6.76 GB.
> Any ideas how to get the partition to work faster.
>
> Is there a way to get the BigQueryIO to use streaming and not jobs?
>
> chaim

Re: BigQueryIO withSchemaFromView

Posted by Chaim Turkel <ch...@behalf.com>.
From what I found, with the windowing plus BigQuery partitions (per
day - 1545 partitions) the insert can take 5 hours, whereas with no
partitions it takes about 12 minutes.

I have 13,843,080 records (6.76 GB).
Any ideas how to get the partitioned insert to work faster?

Is there a way to get the BigQueryIO to use streaming and not load jobs?

chaim

On Tue, Sep 12, 2017 at 11:32 PM, Chaim Turkel <ch...@behalf.com> wrote:
> i am using windowing for the partion of the table, maybe that has to do with it?

Re: BigQueryIO withSchemaFromView

Posted by Chaim Turkel <ch...@behalf.com>.
I am using windowing for the partitioning of the table; maybe that has to do with it?

On Tue, Sep 12, 2017 at 11:25 PM, Reuven Lax <re...@google.com.invalid> wrote:
> Ok, something is going wrong then. It appears that your job created over
> 14,000 BigQuery load jobs, which is not expected (and probably why things
> were so slow).

Re: BigQueryIO withSchemaFromView

Posted by Reuven Lax <re...@google.com.INVALID>.
Ok, something is going wrong then. It appears that your job created over
14,000 BigQuery load jobs, which is not expected (and probably why things
were so slow).
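
One way to sanity-check that count from the pipeline side is to count the
distinct day partitions the job is about to write, since the batch-mode
BigQuery sink issues at least one load job per destination table or
partition. A minimal sketch, reusing names from Chaim's earlier snippet
(docs and MongodbManagment.docTimeToLong are this pipeline's own helpers,
not Beam APIs):

import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.bson.Document;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;

// Count the distinct day partitions (i.e. BigQuery destinations) this
// run will write; each destination costs at least one load job.
PCollection<Long> destinationCount = docs
    .apply("day of doc", MapElements.into(TypeDescriptors.strings())
        .via((Document doc) -> DateTimeFormat.forPattern("yyyyMMdd")
            .print(new Instant(MongodbManagment.docTimeToLong(doc)))))
    .apply("distinct days", Distinct.create())
    .apply("count destinations", Count.globally());

If that count comes out near 14,000 rather than the handful of day
partitions you expect, the timestamps feeding the windows are the first
thing to check.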

Re: BigQueryIO withSchemaFromView

Posted by Chaim Turkel <ch...@behalf.com>.
No, that specific job created only 2 tables.

Re: BigQueryIO withSchemaFromView

Posted by Reuven Lax <re...@google.com.INVALID>.
It looks like your job is creating about 14,45 distinct BigQuery tables.
Does that sound correct to you?

Reuven

Re: BigQueryIO withSchemaFromView

Posted by Chaim Turkel <ch...@behalf.com>.
The job id is 2017-09-12_02_57_55-5233544151932101752.
As you can see, the majority of the time is spent inserting into BigQuery.
Is there any way to parallelize this?

My feeling on the windowing is that writing should be done per window
(my window is daily), or at least that this should be configurable.

chaim
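
For context, here is a minimal sketch of what a per-day destination router
like the TableRefPartition.perDay(...) helper used earlier in this thread
might look like. The real helper isn't shown anywhere in the thread, so
this is an assumption built on the SerializableFunction overload of
BigQueryIO.Write.to(...) and BigQuery's table$yyyyMMdd partition decorator
syntax:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.format.DateTimeFormat;

class TableRefPartition {
  // Route each element to the $yyyyMMdd partition of the daily window it
  // falls in. The cast assumes daily CalendarWindows, i.e. every window
  // is an IntervalWindow starting at midnight.
  static SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> perDay(
      final String project, final String dataset, final String table) {
    return value -> {
      IntervalWindow window = (IntervalWindow) value.getWindow();
      String day = DateTimeFormat.forPattern("yyyyMMdd").print(window.start());
      return new TableDestination(
          project + ":" + dataset + "." + table + "$" + day,
          "partition for " + day);
    };
  }
}

A router like this splits the rows by destination, but the write itself is
still a single globally windowed stage, so on its own it does not start
the writes one window at a time.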

Re: BigQueryIO withSchemaFromView

Posted by Reuven Lax <re...@google.com.INVALID>.
So the problem is you are running on Dataflow, and it's taking longer than
you think it should? If you provide the Dataflow job id we can help you
debug why it's taking 30 minutes. (And as an aside, if this turns into a
Dataflow debugging session, we should move it off of the Beam list and onto
a Dataflow-specific thread.)

Reuven

Re: BigQueryIO withSchemaFromView

Posted by Chaim Turkel <ch...@behalf.com>.
Is there a way around this? My run time for 13GB is now close to 30
minutes, while it should be around 15 minutes.
Do I need to chunk the data into daily windows myself, and run the
chunks in parallel?
chaim
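
One hedged reading of "chunk it myself" (a sketch, not code from this
thread): fan the load out into one branch per day inside the same batch
job, each branch reading its own day from Mongo with a per-day filter.
Independent branches can be scheduled concurrently, though whether this
actually cuts wall-clock time depends on the runner. dayFilter(),
daysToLoad(), partitionSuffix() and schemaFor() are assumed helpers, and
docToTableRow is adapted here so it no longer needs the schema side input:

    for (java.time.LocalDate day : daysToLoad()) {
        pipeline
            .apply("read " + day, MongoDbIO.read()
                .withUri("mongodb://" + connectionParams)
                // hypothetical filter, e.g. {"ts": {"$gte": <day>, "$lt": <day+1>}}
                .withFilter(dayFilter(day))
                .withDatabase(options.getDBName())
                .withCollection(table.getTableName()))
            // docToTableRow adapted (assumption) to take the day, not the view
            .apply("doc to row " + day,
                ParDo.of(docToTableRow(table.getBqTableName(), day)))
            .apply("insert " + day, BigQueryIO.writeTableRows()
                // write straight to that day's partition; schema computed up front
                .to(options.getBQProject() + ":" + options.getDatasetId() + "."
                        + table.getBqTableName() + "$" + partitionSuffix(day))
                .withSchema(schemaFor(day))
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    }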

On Sun, Sep 10, 2017 at 6:32 PM, Reuven Lax <re...@google.com.invalid> wrote:
> In that case I can say unequivocally that Dataflow (in batch mode) does not
> produce results for a stage until it has processed that entire stage. The
> reason for this is that the batch runner is optimized for throughput, not
> latency; it wants to minimize the time for the entire job to finish, not
> the time till first output. The side input will not be materialized until
> all of the data for all of the windows of the side input have been
> processed. The streaming runner on the other hand will produce windows as
> they finish. So for the batch runner, there is no performance advantage you
> get for windowing the side input.
>
> The fact that BigQueryIO needs the schema side input to be globally
> windowed is a bit confusing and not well documented. We should add better
> javadoc explaining this.
>
> Reuven
>
> On Sun, Sep 10, 2017 at 12:50 AM, Chaim Turkel <ch...@behalf.com> wrote:
>
>> batch on dataflow
>>
>> On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax <re...@google.com.invalid>
>> wrote:
>> > Which runner are you using? And is this a batch pipeline?
>> >
>> > On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <ch...@behalf.com> wrote:
>> >
>> >> Thank for the answer, but i don't think that that is the case. From
>> >> what i have seen, since i have other code to update status based on
>> >> the window, it does get called before all the windows are calculated.
>> >> There is no logical reason to wait, once the window has finished, the
>> >> rest of the pipeline should run and the BigQuery should start to write
>> >> the results.
>> >>
>> >>
>> >>
>> >> On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax <re...@google.com.invalid>
>> >> wrote:
>> >> > Logically the BigQuery write does not depend on windows, and writing
>> it
>> >> > windowed would result in incorrect output. For this reason, BigQueryIO
>> >> > rewindows int global windows before actually writing to BigQuery.
>> >> >
>> >> > If you are running in batch mode, there is no performance difference
>> >> > between windowed and unwindowed side inputs. I believe that all of the
>> >> > batch runners wait until all windows are calculated before
>> materializing
>> >> > the output.
>> >> >
>> >> > Reuven
>> >> >
>> >> > On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <ch...@behalf.com>
>> wrote:
>> >> >
>> >> >> the schema depends on the data per window.
>> >> >> when i added the global window it works, but then i loose the
>> >> >> performance, since the secound stage of writing will begin only after
>> >> >> the side input has read all the data and updated the schema
>> >> >> The batchmode of the BigqueryIO seems to use a global window that i
>> >> >> don't know why?
>> >> >>
>> >> >> chaim
>> >> >>
>> >> >> On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov
>> >> >> <ki...@google.com.invalid> wrote:
>> >> >> > Are your schemas actually supposed to be different between
>> different
>> >> >> > windows, or do they depend only on data?
>> >> >> > I see you have a commented-out Window.into(new GlobalWindows()) for
>> >> your
>> >> >> > side input - did that work when it wasn't commented out?
>> >> >> >
>> >> >> > On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <ch...@behalf.com>
>> wrote:
>> >> >> >
>> >> >> >> my code is:
>> >> >> >>
>> >> >> >>                     //read docs from mongo
>> >> >> >>                     final PCollection<Document> docs = pipeline
>> >> >> >>                             .apply(table.getTableName(),
>> >> >> MongoDbIO.read()
>> >> >> >>                                     .withUri("mongodb://" +
>> >> >> >> connectionParams)
>> >> >> >>                                     .withFilter(filter)
>> >> >> >>                                     .withDatabase(options.
>> >> getDBName())
>> >> >> >>                                     .withCollection(table.
>> >> >> getTableName()))
>> >> >> >>                             .apply("AddEventTimestamps",
>> >> >> >> WithTimestamps.of((Document doc) -> new
>> >> >> >> Instant(MongodbManagment.docTimeToLong(doc))))
>> >> >> >>                             .apply("Window Daily",
>> >> >> >> Window.into(CalendarWindows.days(1)));
>> >> >> >>
>> >> >> >>                     //update bq schema based on window
>> >> >> >>                     final PCollectionView<Map<String, String>>
>> >> >> >> tableSchemas = docs
>> >> >> >> //                            .apply("Global
>> Window",Window.into(new
>> >> >> >> GlobalWindows()))
>> >> >> >>                             .apply("extract schema " +
>> >> >> >> table.getTableName(), new
>> >> >> >> LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
>> >> >> >>                             .apply("getTableSchemaMemory " +
>> >> >> >> table.getTableName(),
>> >> >> >> ParDo.of(getTableSchemaMemory(table.getTableName())))
>> >> >> >>                             .apply(View.asMap());
>> >> >> >>
>> >> >> >>                     final PCollection<TableRow> docsRows = docs
>> >> >> >>                             .apply("doc to row " +
>> >> >> >> table.getTableName(), ParDo.of(docToTableRow(table.getBqTableName(),
>> >> >> >> tableSchemas))
>> >> >> >>                                     .withSideInputs(tableSchemas));
>> >> >> >>
>> >> >> >>                     final WriteResult apply = docsRows
>> >> >> >>                             .apply("insert data table - " +
>> >> >> >> table.getTableName(),
>> >> >> >>                                     BigQueryIO.writeTableRows()
>> >> >> >>
>> >> >> >> .to(TableRefPartition.perDay(options.getBQProject(),
>> >> >> >> options.getDatasetId(), table.getBqTableName()))
>> >> >> >>
>> >> >> >> .withSchemaFromView(tableSchemas)
>> >> >> >>
>> >> >> >> .withCreateDisposition(BigQueryIO.Write.
>> CreateDisposition.CREATE_IF_
>> >> >> NEEDED)
>> >> >> >>
>> >> >> >> .withWriteDisposition(WRITE_APPEND));
>> >> >> >>
>> >> >> >>
>> >> >> >> exception is:
>> >> >> >>
>> >> >> >> Sep 08, 2017 12:16:55 PM
>> >> >> >> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>> >> >> >> INFO: Opening TableRowWriter to
>> >> >> >>
>> >> >> >> gs://bq-migration/tempMongo/BigQueryWriteTemp/
>> >> >> 7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-
>> d9a12e4fdcfb.
>> >> >> >> Exception in thread "main"
>> >> >> >> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> >> >> >> java.lang.IllegalArgumentException: Attempted to get side input
>> >> window
>> >> >> >> for GlobalWindow from non-global WindowFn
>> >> >> >> at
>> >> >> >> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.
>> >> >> waitUntilFinish(DirectRunner.java:331)
>> >> >> >> at
>> >> >> >> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.
>> >> >> waitUntilFinish(DirectRunner.java:301)
>> >> >> >> at org.apache.beam.runners.direct.DirectRunner.run(
>> >> >> DirectRunner.java:200)
>> >> >> >> at org.apache.beam.runners.direct.DirectRunner.run(
>> >> >> DirectRunner.java:63)
>> >> >> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>> >> >> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
>> >> >> >> at
>> >> >> >> com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.
>> >> >> runPipeline(LoadMongodbDataPipeline.java:347)
>> >> >> >> at
>> >> >> >> com.behalf.migration.dataflow.mongodb.
>> LoadMongodbDataPipeline.main(
>> >> >> LoadMongodbDataPipeline.java:372)
>> >> >> >> Caused by: java.lang.IllegalArgumentException: Attempted to get
>> side
>> >> >> >> input window for GlobalWindow from non-global WindowFn
>> >> >> >> at
>> >> >> >> org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.
>> >> >> getSideInputWindow(PartitioningWindowFn.java:49)
>> >> >> >> at
>> >> >> >> org.apache.beam.runners.direct.repackaged.runners.core.
>> >> >> SimplePushbackSideInputDoFnRunner.isReady(
>> >> SimplePushbackSideInputDoFnRun
>> >> >> ner.java:94)
>> >> >> >> at
>> >> >> >> org.apache.beam.runners.direct.repackaged.runners.core.
>> >> >> SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(
>> >> >> SimplePushbackSideInputDoFnRunner.java:76)
>> >> >> >> Sep 08, 2017 12:16:58 PM
>> >> >> >> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>> >> >> >>
>> >> >> >> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
>> >> >> >> <ki...@google.com.invalid> wrote:
>> >> >> >> > Please include the full exception and please show the code that
>> >> >> produces
>> >> >> >> it.
>> >> >> >> > See also
>> >> >> >> >
>> >> >> >> https://beam.apache.org/documentation/programming-
>> >> >> guide/#transforms-sideio
>> >> >> >> > section
>> >> >> >> > "Side inputs and windowing" - that might be sufficient to
>> resolve
>> >> your
>> >> >> >> > problem.
>> >> >> >> >
>> >> >> >> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <ch...@behalf.com>
>> >> wrote:
>> >> >> >> >
>> >> >> >> >> Hi,
>> >> >> >> >>   I have a pipline that bases on documents from mongo updates
>> the
>> >> >> >> >> schema and then adds the records to mongo. Since i want a
>> >> partitioned
>> >> >> >> >> table, i have a dally window.
>> >> >> >> >> How do i get the schema view to be a window, i get the
>> exception
>> >> of:
>> >> >> >> >>
>> >> >> >> >> Attempted to get side input window for GlobalWindow from
>> >> non-global
>> >> >> >> >> WindowFn"
>> >> >> >> >>
>> >> >> >> >> chaim
>> >> >> >> >>
>> >> >> >>
>> >> >>
>> >>
>>

Re: BigQueryIO withSchemaFromView

Posted by Reuven Lax <re...@google.com.INVALID>.
In that case I can say unequivocally that Dataflow (in batch mode) does not
produce results for a stage until it has processed that entire stage. The
reason for this is that the batch runner is optimized for throughput, not
latency; it wants to minimize the time for the entire job to finish, not
the time till first output. The side input will not be materialized until
all of the data for all of the windows of the side input have been
processed. The streaming runner on the other hand will produce windows as
they finish. So for the batch runner, there is no performance advantage
to windowing the side input.

The fact that BigQueryIO needs the schema side input to be globally
windowed is a bit confusing and not well documented. We should add better
javadoc explaining this.

Reuven
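
A minimal sketch of the workaround this implies, assuming the pipeline
from the quoted code: leave the main input daily-windowed, and re-window
only the schema branch back into the global window before building the
view, so withSchemaFromView() gets the globally windowed side input it
expects (this is the commented-out line from the quoted code, made
unconditional):

    // docs stays windowed by CalendarWindows.days(1); only the side input
    // branch is re-windowed, which avoids "Attempted to get side input
    // window for GlobalWindow from non-global WindowFn".
    final PCollectionView<Map<String, String>> tableSchemas = docs
        .apply("Global Window", Window.<Document>into(new GlobalWindows()))
        .apply("extract schema " + table.getTableName(),
                new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
        .apply("getTableSchemaMemory " + table.getTableName(),
                ParDo.of(getTableSchemaMemory(table.getTableName())))
        .apply(View.asMap());

The trade-off, as noted elsewhere in the thread, is that in batch the
write only starts once this globally windowed view is fully materialized.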

On Sun, Sep 10, 2017 at 12:50 AM, Chaim Turkel <ch...@behalf.com> wrote:

> batch on dataflow
>
> On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax <re...@google.com.invalid>
> wrote:
> > Which runner are you using? And is this a batch pipeline?
> >
> > On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <ch...@behalf.com> wrote:
> >
> >> Thank for the answer, but i don't think that that is the case. From
> >> what i have seen, since i have other code to update status based on
> >> the window, it does get called before all the windows are calculated.
> >> There is no logical reason to wait, once the window has finished, the
> >> rest of the pipeline should run and the BigQuery should start to write
> >> the results.
> >>
> >>
> >>
> >> On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax <re...@google.com.invalid>
> >> wrote:
> >> > Logically the BigQuery write does not depend on windows, and writing
> it
> >> > windowed would result in incorrect output. For this reason, BigQueryIO
> >> > rewindows int global windows before actually writing to BigQuery.
> >> >
> >> > If you are running in batch mode, there is no performance difference
> >> > between windowed and unwindowed side inputs. I believe that all of the
> >> > batch runners wait until all windows are calculated before
> materializing
> >> > the output.
> >> >
> >> > Reuven
> >> >
> >> > On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <ch...@behalf.com>
> wrote:
> >> >
> >> >> the schema depends on the data per window.
> >> >> when i added the global window it works, but then i loose the
> >> >> performance, since the secound stage of writing will begin only after
> >> >> the side input has read all the data and updated the schema
> >> >> The batchmode of the BigqueryIO seems to use a global window that i
> >> >> don't know why?
> >> >>
> >> >> chaim
> >> >>
> >> >> On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov
> >> >> <ki...@google.com.invalid> wrote:
> >> >> > Are your schemas actually supposed to be different between
> different
> >> >> > windows, or do they depend only on data?
> >> >> > I see you have a commented-out Window.into(new GlobalWindows()) for
> >> your
> >> >> > side input - did that work when it wasn't commented out?
> >> >> >
> >> >> > On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <ch...@behalf.com>
> wrote:
> >> >> >
> >> >> >> my code is:
> >> >> >>
> >> >> >>                     //read docs from mongo
> >> >> >>                     final PCollection<Document> docs = pipeline
> >> >> >>                             .apply(table.getTableName(),
> >> >> MongoDbIO.read()
> >> >> >>                                     .withUri("mongodb://" +
> >> >> >> connectionParams)
> >> >> >>                                     .withFilter(filter)
> >> >> >>                                     .withDatabase(options.
> >> getDBName())
> >> >> >>                                     .withCollection(table.
> >> >> getTableName()))
> >> >> >>                             .apply("AddEventTimestamps",
> >> >> >> WithTimestamps.of((Document doc) -> new
> >> >> >> Instant(MongodbManagment.docTimeToLong(doc))))
> >> >> >>                             .apply("Window Daily",
> >> >> >> Window.into(CalendarWindows.days(1)));
> >> >> >>
> >> >> >>                     //update bq schema based on window
> >> >> >>                     final PCollectionView<Map<String, String>>
> >> >> >> tableSchemas = docs
> >> >> >> //                            .apply("Global
> Window",Window.into(new
> >> >> >> GlobalWindows()))
> >> >> >>                             .apply("extract schema " +
> >> >> >> table.getTableName(), new
> >> >> >> LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
> >> >> >>                             .apply("getTableSchemaMemory " +
> >> >> >> table.getTableName(),
> >> >> >> ParDo.of(getTableSchemaMemory(table.getTableName())))
> >> >> >>                             .apply(View.asMap());
> >> >> >>
> >> >> >>                     final PCollection<TableRow> docsRows = docs
> >> >> >>                             .apply("doc to row " +
> >> >> >> table.getTableName(), ParDo.of(docToTableRow(table.getBqTableName(),
> >> >> >> tableSchemas))
> >> >> >>                                     .withSideInputs(tableSchemas));
> >> >> >>
> >> >> >>                     final WriteResult apply = docsRows
> >> >> >>                             .apply("insert data table - " +
> >> >> >> table.getTableName(),
> >> >> >>                                     BigQueryIO.writeTableRows()
> >> >> >>
> >> >> >> .to(TableRefPartition.perDay(options.getBQProject(),
> >> >> >> options.getDatasetId(), table.getBqTableName()))
> >> >> >>
> >> >> >> .withSchemaFromView(tableSchemas)
> >> >> >>
> >> >> >> .withCreateDisposition(BigQueryIO.Write.
> CreateDisposition.CREATE_IF_
> >> >> NEEDED)
> >> >> >>
> >> >> >> .withWriteDisposition(WRITE_APPEND));
> >> >> >>
> >> >> >>
> >> >> >> exception is:
> >> >> >>
> >> >> >> Sep 08, 2017 12:16:55 PM
> >> >> >> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> >> >> >> INFO: Opening TableRowWriter to
> >> >> >>
> >> >> >> gs://bq-migration/tempMongo/BigQueryWriteTemp/
> >> >> 7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-
> d9a12e4fdcfb.
> >> >> >> Exception in thread "main"
> >> >> >> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> >> >> >> java.lang.IllegalArgumentException: Attempted to get side input
> >> window
> >> >> >> for GlobalWindow from non-global WindowFn
> >> >> >> at
> >> >> >> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.
> >> >> waitUntilFinish(DirectRunner.java:331)
> >> >> >> at
> >> >> >> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.
> >> >> waitUntilFinish(DirectRunner.java:301)
> >> >> >> at org.apache.beam.runners.direct.DirectRunner.run(
> >> >> DirectRunner.java:200)
> >> >> >> at org.apache.beam.runners.direct.DirectRunner.run(
> >> >> DirectRunner.java:63)
> >> >> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> >> >> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> >> >> >> at
> >> >> >> com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.
> >> >> runPipeline(LoadMongodbDataPipeline.java:347)
> >> >> >> at
> >> >> >> com.behalf.migration.dataflow.mongodb.
> LoadMongodbDataPipeline.main(
> >> >> LoadMongodbDataPipeline.java:372)
> >> >> >> Caused by: java.lang.IllegalArgumentException: Attempted to get
> side
> >> >> >> input window for GlobalWindow from non-global WindowFn
> >> >> >> at
> >> >> >> org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.
> >> >> getSideInputWindow(PartitioningWindowFn.java:49)
> >> >> >> at
> >> >> >> org.apache.beam.runners.direct.repackaged.runners.core.
> >> >> SimplePushbackSideInputDoFnRunner.isReady(
> >> SimplePushbackSideInputDoFnRun
> >> >> ner.java:94)
> >> >> >> at
> >> >> >> org.apache.beam.runners.direct.repackaged.runners.core.
> >> >> SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(
> >> >> SimplePushbackSideInputDoFnRunner.java:76)
> >> >> >> Sep 08, 2017 12:16:58 PM
> >> >> >> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> >> >> >>
> >> >> >> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
> >> >> >> <ki...@google.com.invalid> wrote:
> >> >> >> > Please include the full exception and please show the code that
> >> >> produces
> >> >> >> it.
> >> >> >> > See also
> >> >> >> >
> >> >> >> https://beam.apache.org/documentation/programming-
> >> >> guide/#transforms-sideio
> >> >> >> > section
> >> >> >> > "Side inputs and windowing" - that might be sufficient to
> resolve
> >> your
> >> >> >> > problem.
> >> >> >> >
> >> >> >> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <ch...@behalf.com>
> >> wrote:
> >> >> >> >
> >> >> >> >> Hi,
> >> >> >> >>   I have a pipline that bases on documents from mongo updates
> the
> >> >> >> >> schema and then adds the records to mongo. Since i want a
> >> partitioned
> >> >> >> >> table, i have a dally window.
> >> >> >> >> How do i get the schema view to be a window, i get the
> exception
> >> of:
> >> >> >> >>
> >> >> >> >> Attempted to get side input window for GlobalWindow from
> >> non-global
> >> >> >> >> WindowFn"
> >> >> >> >>
> >> >> >> >> chaim
> >> >> >> >>
> >> >> >>
> >> >>
> >>
>

Re: BigQueryIO withSchemaFromView

Posted by Chaim Turkel <ch...@behalf.com>.
batch on dataflow

On Sun, Sep 10, 2017 at 8:05 AM, Reuven Lax <re...@google.com.invalid> wrote:
> Which runner are you using? And is this a batch pipeline?
>
> On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <ch...@behalf.com> wrote:
>
>> Thank for the answer, but i don't think that that is the case. From
>> what i have seen, since i have other code to update status based on
>> the window, it does get called before all the windows are calculated.
>> There is no logical reason to wait, once the window has finished, the
>> rest of the pipeline should run and the BigQuery should start to write
>> the results.
>>
>>
>>
>> On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax <re...@google.com.invalid>
>> wrote:
>> > Logically the BigQuery write does not depend on windows, and writing it
>> > windowed would result in incorrect output. For this reason, BigQueryIO
>> > rewindows int global windows before actually writing to BigQuery.
>> >
>> > If you are running in batch mode, there is no performance difference
>> > between windowed and unwindowed side inputs. I believe that all of the
>> > batch runners wait until all windows are calculated before materializing
>> > the output.
>> >
>> > Reuven
>> >
>> > On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <ch...@behalf.com> wrote:
>> >
>> >> the schema depends on the data per window.
>> >> when i added the global window it works, but then i loose the
>> >> performance, since the secound stage of writing will begin only after
>> >> the side input has read all the data and updated the schema
>> >> The batchmode of the BigqueryIO seems to use a global window that i
>> >> don't know why?
>> >>
>> >> chaim
>> >>
>> >> On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov
>> >> <ki...@google.com.invalid> wrote:
>> >> > Are your schemas actually supposed to be different between different
>> >> > windows, or do they depend only on data?
>> >> > I see you have a commented-out Window.into(new GlobalWindows()) for
>> your
>> >> > side input - did that work when it wasn't commented out?
>> >> >
>> >> > On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <ch...@behalf.com> wrote:
>> >> >
>> >> >> my code is:
>> >> >>
>> >> >>                     //read docs from mongo
>> >> >>                     final PCollection<Document> docs = pipeline
>> >> >>                             .apply(table.getTableName(),
>> >> MongoDbIO.read()
>> >> >>                                     .withUri("mongodb://" +
>> >> >> connectionParams)
>> >> >>                                     .withFilter(filter)
>> >> >>                                     .withDatabase(options.
>> getDBName())
>> >> >>                                     .withCollection(table.
>> >> getTableName()))
>> >> >>                             .apply("AddEventTimestamps",
>> >> >> WithTimestamps.of((Document doc) -> new
>> >> >> Instant(MongodbManagment.docTimeToLong(doc))))
>> >> >>                             .apply("Window Daily",
>> >> >> Window.into(CalendarWindows.days(1)));
>> >> >>
>> >> >>                     //update bq schema based on window
>> >> >>                     final PCollectionView<Map<String, String>>
>> >> >> tableSchemas = docs
>> >> >> //                            .apply("Global Window",Window.into(new
>> >> >> GlobalWindows()))
>> >> >>                             .apply("extract schema " +
>> >> >> table.getTableName(), new
>> >> >> LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
>> >> >>                             .apply("getTableSchemaMemory " +
>> >> >> table.getTableName(),
>> >> >> ParDo.of(getTableSchemaMemory(table.getTableName())))
>> >> >>                             .apply(View.asMap());
>> >> >>
>> >> >>                     final PCollection<TableRow> docsRows = docs
>> >> >>                             .apply("doc to row " +
>> >> >> table.getTableName(), ParDo.of(docToTableRow(table.getBqTableName(),
>> >> >> tableSchemas))
>> >> >>                                     .withSideInputs(tableSchemas));
>> >> >>
>> >> >>                     final WriteResult apply = docsRows
>> >> >>                             .apply("insert data table - " +
>> >> >> table.getTableName(),
>> >> >>                                     BigQueryIO.writeTableRows()
>> >> >>
>> >> >> .to(TableRefPartition.perDay(options.getBQProject(),
>> >> >> options.getDatasetId(), table.getBqTableName()))
>> >> >>
>> >> >> .withSchemaFromView(tableSchemas)
>> >> >>
>> >> >> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_
>> >> NEEDED)
>> >> >>
>> >> >> .withWriteDisposition(WRITE_APPEND));
>> >> >>
>> >> >>
>> >> >> exception is:
>> >> >>
>> >> >> Sep 08, 2017 12:16:55 PM
>> >> >> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>> >> >> INFO: Opening TableRowWriter to
>> >> >>
>> >> >> gs://bq-migration/tempMongo/BigQueryWriteTemp/
>> >> 7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
>> >> >> Exception in thread "main"
>> >> >> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> >> >> java.lang.IllegalArgumentException: Attempted to get side input
>> window
>> >> >> for GlobalWindow from non-global WindowFn
>> >> >> at
>> >> >> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.
>> >> waitUntilFinish(DirectRunner.java:331)
>> >> >> at
>> >> >> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.
>> >> waitUntilFinish(DirectRunner.java:301)
>> >> >> at org.apache.beam.runners.direct.DirectRunner.run(
>> >> DirectRunner.java:200)
>> >> >> at org.apache.beam.runners.direct.DirectRunner.run(
>> >> DirectRunner.java:63)
>> >> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>> >> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
>> >> >> at
>> >> >> com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.
>> >> runPipeline(LoadMongodbDataPipeline.java:347)
>> >> >> at
>> >> >> com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(
>> >> LoadMongodbDataPipeline.java:372)
>> >> >> Caused by: java.lang.IllegalArgumentException: Attempted to get side
>> >> >> input window for GlobalWindow from non-global WindowFn
>> >> >> at
>> >> >> org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.
>> >> getSideInputWindow(PartitioningWindowFn.java:49)
>> >> >> at
>> >> >> org.apache.beam.runners.direct.repackaged.runners.core.
>> >> SimplePushbackSideInputDoFnRunner.isReady(
>> SimplePushbackSideInputDoFnRun
>> >> ner.java:94)
>> >> >> at
>> >> >> org.apache.beam.runners.direct.repackaged.runners.core.
>> >> SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(
>> >> SimplePushbackSideInputDoFnRunner.java:76)
>> >> >> Sep 08, 2017 12:16:58 PM
>> >> >> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>> >> >>
>> >> >> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
>> >> >> <ki...@google.com.invalid> wrote:
>> >> >> > Please include the full exception and please show the code that
>> >> produces
>> >> >> it.
>> >> >> > See also
>> >> >> >
>> >> >> https://beam.apache.org/documentation/programming-
>> >> guide/#transforms-sideio
>> >> >> > section
>> >> >> > "Side inputs and windowing" - that might be sufficient to resolve
>> your
>> >> >> > problem.
>> >> >> >
>> >> >> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <ch...@behalf.com>
>> wrote:
>> >> >> >
>> >> >> >> Hi,
>> >> >> >>   I have a pipline that bases on documents from mongo updates the
>> >> >> >> schema and then adds the records to mongo. Since i want a
>> partitioned
>> >> >> >> table, i have a dally window.
>> >> >> >> How do i get the schema view to be a window, i get the exception
>> of:
>> >> >> >>
>> >> >> >> Attempted to get side input window for GlobalWindow from
>> non-global
>> >> >> >> WindowFn"
>> >> >> >>
>> >> >> >> chaim
>> >> >> >>
>> >> >>
>> >>
>>

Re: BigQueryIO withSchemaFromView

Posted by Reuven Lax <re...@google.com.INVALID>.
Which runner are you using? And is this a batch pipeline?

On Sat, Sep 9, 2017 at 10:03 PM, Chaim Turkel <ch...@behalf.com> wrote:

> Thank for the answer, but i don't think that that is the case. From
> what i have seen, since i have other code to update status based on
> the window, it does get called before all the windows are calculated.
> There is no logical reason to wait, once the window has finished, the
> rest of the pipeline should run and the BigQuery should start to write
> the results.
>
>
>
> On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax <re...@google.com.invalid>
> wrote:
> > Logically the BigQuery write does not depend on windows, and writing it
> > windowed would result in incorrect output. For this reason, BigQueryIO
> > rewindows int global windows before actually writing to BigQuery.
> >
> > If you are running in batch mode, there is no performance difference
> > between windowed and unwindowed side inputs. I believe that all of the
> > batch runners wait until all windows are calculated before materializing
> > the output.
> >
> > Reuven
> >
> > On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <ch...@behalf.com> wrote:
> >
> >> the schema depends on the data per window.
> >> when i added the global window it works, but then i loose the
> >> performance, since the secound stage of writing will begin only after
> >> the side input has read all the data and updated the schema
> >> The batchmode of the BigqueryIO seems to use a global window that i
> >> don't know why?
> >>
> >> chaim
> >>
> >> On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov
> >> <ki...@google.com.invalid> wrote:
> >> > Are your schemas actually supposed to be different between different
> >> > windows, or do they depend only on data?
> >> > I see you have a commented-out Window.into(new GlobalWindows()) for
> your
> >> > side input - did that work when it wasn't commented out?
> >> >
> >> > On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <ch...@behalf.com> wrote:
> >> >
> >> >> my code is:
> >> >>
> >> >>                     //read docs from mongo
> >> >>                     final PCollection<Document> docs = pipeline
> >> >>                             .apply(table.getTableName(),
> >> MongoDbIO.read()
> >> >>                                     .withUri("mongodb://" +
> >> >> connectionParams)
> >> >>                                     .withFilter(filter)
> >> >>                                     .withDatabase(options.
> getDBName())
> >> >>                                     .withCollection(table.
> >> getTableName()))
> >> >>                             .apply("AddEventTimestamps",
> >> >> WithTimestamps.of((Document doc) -> new
> >> >> Instant(MongodbManagment.docTimeToLong(doc))))
> >> >>                             .apply("Window Daily",
> >> >> Window.into(CalendarWindows.days(1)));
> >> >>
> >> >>                     //update bq schema based on window
> >> >>                     final PCollectionView<Map<String, String>>
> >> >> tableSchemas = docs
> >> >> //                            .apply("Global Window",Window.into(new
> >> >> GlobalWindows()))
> >> >>                             .apply("extract schema " +
> >> >> table.getTableName(), new
> >> >> LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
> >> >>                             .apply("getTableSchemaMemory " +
> >> >> table.getTableName(),
> >> >> ParDo.of(getTableSchemaMemory(table.getTableName())))
> >> >>                             .apply(View.asMap());
> >> >>
> >> >>                     final PCollection<TableRow> docsRows = docs
> >> >>                             .apply("doc to row " +
> >> >> table.getTableName(), ParDo.of(docToTableRow(table.getBqTableName(),
> >> >> tableSchemas))
> >> >>                                     .withSideInputs(tableSchemas));
> >> >>
> >> >>                     final WriteResult apply = docsRows
> >> >>                             .apply("insert data table - " +
> >> >> table.getTableName(),
> >> >>                                     BigQueryIO.writeTableRows()
> >> >>
> >> >> .to(TableRefPartition.perDay(options.getBQProject(),
> >> >> options.getDatasetId(), table.getBqTableName()))
> >> >>
> >> >> .withSchemaFromView(tableSchemas)
> >> >>
> >> >> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_
> >> NEEDED)
> >> >>
> >> >> .withWriteDisposition(WRITE_APPEND));
> >> >>
> >> >>
> >> >> exception is:
> >> >>
> >> >> Sep 08, 2017 12:16:55 PM
> >> >> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> >> >> INFO: Opening TableRowWriter to
> >> >>
> >> >> gs://bq-migration/tempMongo/BigQueryWriteTemp/
> >> 7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
> >> >> Exception in thread "main"
> >> >> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> >> >> java.lang.IllegalArgumentException: Attempted to get side input
> window
> >> >> for GlobalWindow from non-global WindowFn
> >> >> at
> >> >> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.
> >> waitUntilFinish(DirectRunner.java:331)
> >> >> at
> >> >> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.
> >> waitUntilFinish(DirectRunner.java:301)
> >> >> at org.apache.beam.runners.direct.DirectRunner.run(
> >> DirectRunner.java:200)
> >> >> at org.apache.beam.runners.direct.DirectRunner.run(
> >> DirectRunner.java:63)
> >> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> >> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> >> >> at
> >> >> com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.
> >> runPipeline(LoadMongodbDataPipeline.java:347)
> >> >> at
> >> >> com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(
> >> LoadMongodbDataPipeline.java:372)
> >> >> Caused by: java.lang.IllegalArgumentException: Attempted to get side
> >> >> input window for GlobalWindow from non-global WindowFn
> >> >> at
> >> >> org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.
> >> getSideInputWindow(PartitioningWindowFn.java:49)
> >> >> at
> >> >> org.apache.beam.runners.direct.repackaged.runners.core.
> >> SimplePushbackSideInputDoFnRunner.isReady(
> SimplePushbackSideInputDoFnRun
> >> ner.java:94)
> >> >> at
> >> >> org.apache.beam.runners.direct.repackaged.runners.core.
> >> SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(
> >> SimplePushbackSideInputDoFnRunner.java:76)
> >> >> Sep 08, 2017 12:16:58 PM
> >> >> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> >> >>
> >> >> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
> >> >> <ki...@google.com.invalid> wrote:
> >> >> > Please include the full exception and please show the code that
> >> produces
> >> >> it.
> >> >> > See also
> >> >> >
> >> >> https://beam.apache.org/documentation/programming-
> >> guide/#transforms-sideio
> >> >> > section
> >> >> > "Side inputs and windowing" - that might be sufficient to resolve
> your
> >> >> > problem.
> >> >> >
> >> >> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <ch...@behalf.com>
> wrote:
> >> >> >
> >> >> >> Hi,
> >> >> >>   I have a pipline that bases on documents from mongo updates the
> >> >> >> schema and then adds the records to mongo. Since i want a
> partitioned
> >> >> >> table, i have a dally window.
> >> >> >> How do i get the schema view to be a window, i get the exception
> of:
> >> >> >>
> >> >> >> Attempted to get side input window for GlobalWindow from
> non-global
> >> >> >> WindowFn"
> >> >> >>
> >> >> >> chaim
> >> >> >>
> >> >>
> >>
>

Re: BigQueryIO withSchemaFromView

Posted by Chaim Turkel <ch...@behalf.com>.
Thanks for the answer, but I don't think that is the case. From what I
have seen, since I have other code that updates status based on the
window, that code does get called before all the windows are calculated.
There is no logical reason to wait: once a window has finished, the rest
of the pipeline should run and the BigQuery write should start producing
results.



On Sat, Sep 9, 2017 at 10:48 PM, Reuven Lax <re...@google.com.invalid> wrote:
> Logically the BigQuery write does not depend on windows, and writing it
> windowed would result in incorrect output. For this reason, BigQueryIO
> rewindows int global windows before actually writing to BigQuery.
>
> If you are running in batch mode, there is no performance difference
> between windowed and unwindowed side inputs. I believe that all of the
> batch runners wait until all windows are calculated before materializing
> the output.
>
> Reuven
>
> On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <ch...@behalf.com> wrote:
>
>> the schema depends on the data per window.
>> when i added the global window it works, but then i loose the
>> performance, since the secound stage of writing will begin only after
>> the side input has read all the data and updated the schema
>> The batchmode of the BigqueryIO seems to use a global window that i
>> don't know why?
>>
>> chaim
>>
>> On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov
>> <ki...@google.com.invalid> wrote:
>> > Are your schemas actually supposed to be different between different
>> > windows, or do they depend only on data?
>> > I see you have a commented-out Window.into(new GlobalWindows()) for your
>> > side input - did that work when it wasn't commented out?
>> >
>> > On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <ch...@behalf.com> wrote:
>> >
>> >> my code is:
>> >>
>> >>                     //read docs from mongo
>> >>                     final PCollection<Document> docs = pipeline
>> >>                             .apply(table.getTableName(),
>> MongoDbIO.read()
>> >>                                     .withUri("mongodb://" +
>> >> connectionParams)
>> >>                                     .withFilter(filter)
>> >>                                     .withDatabase(options.getDBName())
>> >>                                     .withCollection(table.
>> getTableName()))
>> >>                             .apply("AddEventTimestamps",
>> >> WithTimestamps.of((Document doc) -> new
>> >> Instant(MongodbManagment.docTimeToLong(doc))))
>> >>                             .apply("Window Daily",
>> >> Window.into(CalendarWindows.days(1)));
>> >>
>> >>                     //update bq schema based on window
>> >>                     final PCollectionView<Map<String, String>>
>> >> tableSchemas = docs
>> >> //                            .apply("Global Window",Window.into(new
>> >> GlobalWindows()))
>> >>                             .apply("extract schema " +
>> >> table.getTableName(), new
>> >> LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
>> >>                             .apply("getTableSchemaMemory " +
>> >> table.getTableName(),
>> >> ParDo.of(getTableSchemaMemory(table.getTableName())))
>> >>                             .apply(View.asMap());
>> >>
>> >>                     final PCollection<TableRow> docsRows = docs
>> >>                             .apply("doc to row " +
>> >> table.getTableName(), ParDo.of(docToTableRow(table.getBqTableName(),
>> >> tableSchemas))
>> >>                                     .withSideInputs(tableSchemas));
>> >>
>> >>                     final WriteResult apply = docsRows
>> >>                             .apply("insert data table - " +
>> >> table.getTableName(),
>> >>                                     BigQueryIO.writeTableRows()
>> >>
>> >> .to(TableRefPartition.perDay(options.getBQProject(),
>> >> options.getDatasetId(), table.getBqTableName()))
>> >>
>> >> .withSchemaFromView(tableSchemas)
>> >>
>> >> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_
>> NEEDED)
>> >>
>> >> .withWriteDisposition(WRITE_APPEND));
>> >>
>> >>
>> >> exception is:
>> >>
>> >> Sep 08, 2017 12:16:55 PM
>> >> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>> >> INFO: Opening TableRowWriter to
>> >>
>> >> gs://bq-migration/tempMongo/BigQueryWriteTemp/
>> 7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
>> >> Exception in thread "main"
>> >> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> >> java.lang.IllegalArgumentException: Attempted to get side input window
>> >> for GlobalWindow from non-global WindowFn
>> >> at
>> >> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.
>> waitUntilFinish(DirectRunner.java:331)
>> >> at
>> >> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.
>> waitUntilFinish(DirectRunner.java:301)
>> >> at org.apache.beam.runners.direct.DirectRunner.run(
>> DirectRunner.java:200)
>> >> at org.apache.beam.runners.direct.DirectRunner.run(
>> DirectRunner.java:63)
>> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
>> >> at
>> >> com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.
>> runPipeline(LoadMongodbDataPipeline.java:347)
>> >> at
>> >> com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(
>> LoadMongodbDataPipeline.java:372)
>> >> Caused by: java.lang.IllegalArgumentException: Attempted to get side
>> >> input window for GlobalWindow from non-global WindowFn
>> >> at
>> >> org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.
>> getSideInputWindow(PartitioningWindowFn.java:49)
>> >> at
>> >> org.apache.beam.runners.direct.repackaged.runners.core.
>> SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRun
>> ner.java:94)
>> >> at
>> >> org.apache.beam.runners.direct.repackaged.runners.core.
>> SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(
>> SimplePushbackSideInputDoFnRunner.java:76)
>> >> Sep 08, 2017 12:16:58 PM
>> >> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>> >>
>> >> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
>> >> <ki...@google.com.invalid> wrote:
>> >> > Please include the full exception and please show the code that
>> produces
>> >> it.
>> >> > See also
>> >> >
>> >> https://beam.apache.org/documentation/programming-
>> guide/#transforms-sideio
>> >> > section
>> >> > "Side inputs and windowing" - that might be sufficient to resolve your
>> >> > problem.
>> >> >
>> >> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <ch...@behalf.com> wrote:
>> >> >
>> >> >> Hi,
>> >> >>   I have a pipline that bases on documents from mongo updates the
>> >> >> schema and then adds the records to mongo. Since i want a partitioned
>> >> >> table, i have a dally window.
>> >> >> How do i get the schema view to be a window, i get the exception of:
>> >> >>
>> >> >> Attempted to get side input window for GlobalWindow from non-global
>> >> >> WindowFn"
>> >> >>
>> >> >> chaim
>> >> >>
>> >>
>>

Re: BigQueryIO withSchemaFromView

Posted by Reuven Lax <re...@google.com.INVALID>.
Logically the BigQuery write does not depend on windows, and writing it
windowed would result in incorrect output. For this reason, BigQueryIO
rewindows into global windows before actually writing to BigQuery.

If you are running in batch mode, there is no performance difference
between windowed and unwindowed side inputs. I believe that all of the
batch runners wait until all windows are calculated before materializing
the output.

Reuven
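
To make the failure mode concrete, a minimal reproduction sketch
(assumed, not code from the thread; someDoFn() is a placeholder): a
consumer running in the global window cannot resolve a side input
windowed by a PartitioningWindowFn such as CalendarWindows.days(1), and
the globally rewindowed write stage inside BigQueryIO is exactly such a
consumer:

    // Daily-windowed side input, built as in the quoted pipeline.
    PCollection<Document> daily = docs
        .apply("Window Daily", Window.into(CalendarWindows.days(1)));
    final PCollectionView<Map<String, String>> dailyView = daily
        .apply("schemas", new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
        .apply("mem", ParDo.of(getTableSchemaMemory(table.getTableName())))
        .apply(View.asMap());

    // A globally windowed consumer of that daily-windowed side input:
    docs.apply("Global", Window.<Document>into(new GlobalWindows()))
        .apply(ParDo.of(someDoFn()).withSideInputs(dailyView));
    // fails at runtime with:
    //   java.lang.IllegalArgumentException: Attempted to get side input
    //   window for GlobalWindow from non-global WindowFn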

On Sat, Sep 9, 2017 at 12:43 PM, Chaim Turkel <ch...@behalf.com> wrote:

> the schema depends on the data per window.
> when i added the global window it works, but then i loose the
> performance, since the secound stage of writing will begin only after
> the side input has read all the data and updated the schema
> The batchmode of the BigqueryIO seems to use a global window that i
> don't know why?
>
> chaim
>
> On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov
> <ki...@google.com.invalid> wrote:
> > Are your schemas actually supposed to be different between different
> > windows, or do they depend only on data?
> > I see you have a commented-out Window.into(new GlobalWindows()) for your
> > side input - did that work when it wasn't commented out?
> >
> > On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <ch...@behalf.com> wrote:
> >
> >> my code is:
> >>
> >>                     //read docs from mongo
> >>                     final PCollection<Document> docs = pipeline
> >>                             .apply(table.getTableName(),
> MongoDbIO.read()
> >>                                     .withUri("mongodb://" +
> >> connectionParams)
> >>                                     .withFilter(filter)
> >>                                     .withDatabase(options.getDBName())
> >>                                     .withCollection(table.
> getTableName()))
> >>                             .apply("AddEventTimestamps",
> >> WithTimestamps.of((Document doc) -> new
> >> Instant(MongodbManagment.docTimeToLong(doc))))
> >>                             .apply("Window Daily",
> >> Window.into(CalendarWindows.days(1)));
> >>
> >>                     //update bq schema based on window
> >>                     final PCollectionView<Map<String, String>>
> >> tableSchemas = docs
> >> //                            .apply("Global Window",Window.into(new
> >> GlobalWindows()))
> >>                             .apply("extract schema " +
> >> table.getTableName(), new
> >> LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
> >>                             .apply("getTableSchemaMemory " +
> >> table.getTableName(),
> >> ParDo.of(getTableSchemaMemory(table.getTableName())))
> >>                             .apply(View.asMap());
> >>
> >>                     final PCollection<TableRow> docsRows = docs
> >>                             .apply("doc to row " +
> >> table.getTableName(), ParDo.of(docToTableRow(table.getBqTableName(),
> >> tableSchemas))
> >>                                     .withSideInputs(tableSchemas));
> >>
> >>                     final WriteResult apply = docsRows
> >>                             .apply("insert data table - " +
> >> table.getTableName(),
> >>                                     BigQueryIO.writeTableRows()
> >>
> >> .to(TableRefPartition.perDay(options.getBQProject(),
> >> options.getDatasetId(), table.getBqTableName()))
> >>
> >> .withSchemaFromView(tableSchemas)
> >>
> >> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_
> NEEDED)
> >>
> >> .withWriteDisposition(WRITE_APPEND));
> >>
> >>
> >> exception is:
> >>
> >> Sep 08, 2017 12:16:55 PM
> >> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> >> INFO: Opening TableRowWriter to
> >>
> >> gs://bq-migration/tempMongo/BigQueryWriteTemp/
> 7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
> >> Exception in thread "main"
> >> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> >> java.lang.IllegalArgumentException: Attempted to get side input window
> >> for GlobalWindow from non-global WindowFn
> >> at
> >> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.
> waitUntilFinish(DirectRunner.java:331)
> >> at
> >> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.
> waitUntilFinish(DirectRunner.java:301)
> >> at org.apache.beam.runners.direct.DirectRunner.run(
> DirectRunner.java:200)
> >> at org.apache.beam.runners.direct.DirectRunner.run(
> DirectRunner.java:63)
> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> >> at
> >> com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.
> runPipeline(LoadMongodbDataPipeline.java:347)
> >> at
> >> com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(
> LoadMongodbDataPipeline.java:372)
> >> Caused by: java.lang.IllegalArgumentException: Attempted to get side
> >> input window for GlobalWindow from non-global WindowFn
> >> at
> >> org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.
> getSideInputWindow(PartitioningWindowFn.java:49)
> >> at
> >> org.apache.beam.runners.direct.repackaged.runners.core.
> SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRun
> ner.java:94)
> >> at
> >> org.apache.beam.runners.direct.repackaged.runners.core.
> SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(
> SimplePushbackSideInputDoFnRunner.java:76)
> >> Sep 08, 2017 12:16:58 PM
> >> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> >>
> >> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
> >> <ki...@google.com.invalid> wrote:
> >> > Please include the full exception and please show the code that
> produces
> >> it.
> >> > See also
> >> >
> >> https://beam.apache.org/documentation/programming-
> guide/#transforms-sideio
> >> > section
> >> > "Side inputs and windowing" - that might be sufficient to resolve your
> >> > problem.
> >> >
> >> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <ch...@behalf.com> wrote:
> >> >
> >> >> Hi,
> >> >>   I have a pipline that bases on documents from mongo updates the
> >> >> schema and then adds the records to mongo. Since i want a partitioned
> >> >> table, i have a dally window.
> >> >> How do i get the schema view to be a window, i get the exception of:
> >> >>
> >> >> Attempted to get side input window for GlobalWindow from non-global
> >> >> WindowFn"
> >> >>
> >> >> chaim
> >> >>
> >>
>

Re: BigQueryIO withSchemaFromView

Posted by Chaim Turkel <ch...@behalf.com>.
The schema depends on the data per window.
When I added the global window it works, but then I lose the
performance, since the second stage of writing will begin only after
the side input has read all the data and updated the schema.
The batch mode of BigQueryIO seems to use a global window, and I
don't know why.

chaim
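
For the "schema depends on the data per window" requirement itself, one
alternative worth knowing about (a hedged sketch of Beam's
DynamicDestinations API, not something proposed in this thread;
partitionFor() and schemaFor() are assumed helpers) is to route each
element to its day's partition and supply the schema per destination,
instead of going through withSchemaFromView():

    docsRows.apply("insert data table - " + table.getTableName(),
        BigQueryIO.writeTableRows()
            .to(new DynamicDestinations<TableRow, String>() {
                @Override
                public String getDestination(ValueInSingleWindow<TableRow> element) {
                    // e.g. "project:dataset.table$20170908", derived from the window
                    return partitionFor(element.getWindow());
                }
                @Override
                public TableDestination getTable(String spec) {
                    return new TableDestination(spec, null);
                }
                @Override
                public TableSchema getSchema(String spec) {
                    return schemaFor(spec); // schema computed for that day's data
                }
            })
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));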

On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov
<ki...@google.com.invalid> wrote:
> Are your schemas actually supposed to be different between different
> windows, or do they depend only on data?
> I see you have a commented-out Window.into(new GlobalWindows()) for your
> side input - did that work when it wasn't commented out?
>
> On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <ch...@behalf.com> wrote:
>
>> my code is:
>>
>>                     //read docs from mongo
>>                     final PCollection<Document> docs = pipeline
>>                             .apply(table.getTableName(), MongoDbIO.read()
>>                                     .withUri("mongodb://" +
>> connectionParams)
>>                                     .withFilter(filter)
>>                                     .withDatabase(options.getDBName())
>>                                     .withCollection(table.getTableName()))
>>                             .apply("AddEventTimestamps",
>> WithTimestamps.of((Document doc) -> new
>> Instant(MongodbManagment.docTimeToLong(doc))))
>>                             .apply("Window Daily",
>> Window.into(CalendarWindows.days(1)));
>>
>>                     //update bq schema based on window
>>                     final PCollectionView<Map<String, String>>
>> tableSchemas = docs
>> //                            .apply("Global Window",Window.into(new
>> GlobalWindows()))
>>                             .apply("extract schema " +
>> table.getTableName(), new
>> LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
>>                             .apply("getTableSchemaMemory " +
>> table.getTableName(),
>> ParDo.of(getTableSchemaMemory(table.getTableName())))
>>                             .apply(View.asMap());
>>
>>                     final PCollection<TableRow> docsRows = docs
>>                             .apply("doc to row " +
>> table.getTableName(), ParDo.of(docToTableRow(table.getBqTableName(),
>> tableSchemas))
>>                                     .withSideInputs(tableSchemas));
>>
>>                     final WriteResult apply = docsRows
>>                             .apply("insert data table - " +
>> table.getTableName(),
>>                                     BigQueryIO.writeTableRows()
>>
>> .to(TableRefPartition.perDay(options.getBQProject(),
>> options.getDatasetId(), table.getBqTableName()))
>>
>> .withSchemaFromView(tableSchemas)
>>
>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>
>> .withWriteDisposition(WRITE_APPEND));
>>
>>
>> exception is:
>>
>> Sep 08, 2017 12:16:55 PM
>> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>> INFO: Opening TableRowWriter to
>>
>> gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
>> Exception in thread "main"
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.IllegalArgumentException: Attempted to get side input window
>> for GlobalWindow from non-global WindowFn
>> at
>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
>> at
>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
>> at
>> com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
>> at
>> com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
>> Caused by: java.lang.IllegalArgumentException: Attempted to get side
>> input window for GlobalWindow from non-global WindowFn
>> at
>> org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
>> at
>> org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
>> at
>> org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
>> Sep 08, 2017 12:16:58 PM
>> org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>>
>> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
>> <ki...@google.com.invalid> wrote:
>> > Please include the full exception and please show the code that produces
>> it.
>> > See also
>> >
>> https://beam.apache.org/documentation/programming-guide/#transforms-sideio
>> > section
>> > "Side inputs and windowing" - that might be sufficient to resolve your
>> > problem.
>> >
>> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <ch...@behalf.com> wrote:
>> >
>> >> Hi,
>> >>   I have a pipline that bases on documents from mongo updates the
>> >> schema and then adds the records to mongo. Since i want a partitioned
>> >> table, i have a dally window.
>> >> How do i get the schema view to be a window, i get the exception of:
>> >>
>> >> Attempted to get side input window for GlobalWindow from non-global
>> >> WindowFn"
>> >>
>> >> chaim
>> >>
>>

Re: BigQueryIO withSchemaFromView

Posted by Chaim Turkel <ch...@behalf.com>.
the schema depends on the data per window.
when i added the global window it works, but then i loose the
performance, since the second stage of writing will begin only after
the side input has read all the data and updated the schema
The batchmode of the BigqueryIO seems to use a global window that i
don't know why?

chaim

On Fri, Sep 8, 2017 at 9:26 PM, Eugene Kirpichov
<ki...@google.com.invalid> wrote:
> Are your schemas actually supposed to be different between different
> windows, or do they depend only on data?
> I see you have a commented-out Window.into(new GlobalWindows()) for your
> side input - did that work when it wasn't commented out?
>
> On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <ch...@behalf.com> wrote:
>
>> my code is:
>>
>>                     //read docs from mongo
>>                     final PCollection<Document> docs = pipeline
>>                             .apply(table.getTableName(), MongoDbIO.read()
>>                                     .withUri("mongodb://" + connectionParams)
>>                                     .withFilter(filter)
>>                                     .withDatabase(options.getDBName())
>>                                     .withCollection(table.getTableName()))
>>                             .apply("AddEventTimestamps", WithTimestamps.of((Document doc) ->
>>                                     new Instant(MongodbManagment.docTimeToLong(doc))))
>>                             .apply("Window Daily", Window.into(CalendarWindows.days(1)));
>>
>>                     //update bq schema based on window
>>                     final PCollectionView<Map<String, String>> tableSchemas = docs
>> //                            .apply("Global Window", Window.into(new GlobalWindows()))
>>                             .apply("extract schema " + table.getTableName(),
>>                                     new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
>>                             .apply("getTableSchemaMemory " + table.getTableName(),
>>                                     ParDo.of(getTableSchemaMemory(table.getTableName())))
>>                             .apply(View.asMap());
>>
>>                     final PCollection<TableRow> docsRows = docs
>>                             .apply("doc to row " + table.getTableName(),
>>                                     ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
>>                                             .withSideInputs(tableSchemas));
>>
>>                     final WriteResult apply = docsRows
>>                             .apply("insert data table - " + table.getTableName(),
>>                                     BigQueryIO.writeTableRows()
>>                                             .to(TableRefPartition.perDay(options.getBQProject(),
>>                                                     options.getDatasetId(), table.getBqTableName()))
>>                                             .withSchemaFromView(tableSchemas)
>>                                             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>                                             .withWriteDisposition(WRITE_APPEND));
>>
>>
>> exception is:
>>
>> Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>> INFO: Opening TableRowWriter to gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
>> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
>> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
>> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
>> at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
>> at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
>> Caused by: java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
>> at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
>> at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
>> at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
>> Sep 08, 2017 12:16:58 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>>
>> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
>> <ki...@google.com.invalid> wrote:
>> > Please include the full exception, and please show the code that produces it.
>> > See also the "Side inputs and windowing" section of
>> > https://beam.apache.org/documentation/programming-guide/#transforms-sideio
>> > - that might be sufficient to resolve your problem.
>> >
>> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <ch...@behalf.com> wrote:
>> >
>> >> Hi,
>> >>   I have a pipeline that, based on documents from mongo, updates the
>> >> schema and then adds the records to BigQuery. Since I want a partitioned
>> >> table, I have a daily window.
>> >> How do I get the schema view to be windowed? I get the exception:
>> >>
>> >> "Attempted to get side input window for GlobalWindow from non-global
>> >> WindowFn"
>> >>
>> >> chaim
>> >>
>>

Re: BigQueryIO withSchemaFromView

Posted by Eugene Kirpichov <ki...@google.com.INVALID>.
Are your schemas actually supposed to be different between different
windows, or do they depend only on data?
I see you have a commented-out Window.into(new GlobalWindows()) for your
side input - did that work when it wasn't commented out?

On Fri, Sep 8, 2017 at 2:17 AM Chaim Turkel <ch...@behalf.com> wrote:

> my code is:
>
>                     //read docs from mongo
>                     final PCollection<Document> docs = pipeline
>                             .apply(table.getTableName(), MongoDbIO.read()
>                                     .withUri("mongodb://" + connectionParams)
>                                     .withFilter(filter)
>                                     .withDatabase(options.getDBName())
>                                     .withCollection(table.getTableName()))
>                             .apply("AddEventTimestamps", WithTimestamps.of((Document doc) ->
>                                     new Instant(MongodbManagment.docTimeToLong(doc))))
>                             .apply("Window Daily", Window.into(CalendarWindows.days(1)));
>
>                     //update bq schema based on window
>                     final PCollectionView<Map<String, String>> tableSchemas = docs
> //                            .apply("Global Window", Window.into(new GlobalWindows()))
>                             .apply("extract schema " + table.getTableName(),
>                                     new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
>                             .apply("getTableSchemaMemory " + table.getTableName(),
>                                     ParDo.of(getTableSchemaMemory(table.getTableName())))
>                             .apply(View.asMap());
>
>                     final PCollection<TableRow> docsRows = docs
>                             .apply("doc to row " + table.getTableName(),
>                                     ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
>                                             .withSideInputs(tableSchemas));
>
>                     final WriteResult apply = docsRows
>                             .apply("insert data table - " + table.getTableName(),
>                                     BigQueryIO.writeTableRows()
>                                             .to(TableRefPartition.perDay(options.getBQProject(),
>                                                     options.getDatasetId(), table.getBqTableName()))
>                                             .withSchemaFromView(tableSchemas)
>                                             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>                                             .withWriteDisposition(WRITE_APPEND));
>
>
> exception is:
>
> Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
> INFO: Opening TableRowWriter to gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
> at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
> at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
> Caused by: java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
> at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
> at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
> at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
> Sep 08, 2017 12:16:58 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
>
> On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
> <ki...@google.com.invalid> wrote:
> > Please include the full exception, and please show the code that produces it.
> > See also the "Side inputs and windowing" section of
> > https://beam.apache.org/documentation/programming-guide/#transforms-sideio
> > - that might be sufficient to resolve your problem.
> >
> > On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <ch...@behalf.com> wrote:
> >
> >> Hi,
> >>   I have a pipeline that, based on documents from mongo, updates the
> >> schema and then adds the records to BigQuery. Since I want a partitioned
> >> table, I have a daily window.
> >> How do I get the schema view to be windowed? I get the exception:
> >>
> >> "Attempted to get side input window for GlobalWindow from non-global
> >> WindowFn"
> >>
> >> chaim
> >>
>

Re: BigQueryIO withSchemaFromView

Posted by Chaim Turkel <ch...@behalf.com>.
my code is:

                    //read docs from mongo
                    final PCollection<Document> docs = pipeline
                            .apply(table.getTableName(), MongoDbIO.read()
                                    .withUri("mongodb://" + connectionParams)
                                    .withFilter(filter)
                                    .withDatabase(options.getDBName())
                                    .withCollection(table.getTableName()))
                            .apply("AddEventTimestamps", WithTimestamps.of((Document doc) ->
                                    new Instant(MongodbManagment.docTimeToLong(doc))))
                            .apply("Window Daily", Window.into(CalendarWindows.days(1)));

                    //update bq schema based on window
                    final PCollectionView<Map<String, String>> tableSchemas = docs
//                            .apply("Global Window", Window.into(new GlobalWindows()))
                            .apply("extract schema " + table.getTableName(),
                                    new LoadMongodbSchemaPipeline.DocsToSchemaTransform(table))
                            .apply("getTableSchemaMemory " + table.getTableName(),
                                    ParDo.of(getTableSchemaMemory(table.getTableName())))
                            .apply(View.asMap());

                    final PCollection<TableRow> docsRows = docs
                            .apply("doc to row " + table.getTableName(),
                                    ParDo.of(docToTableRow(table.getBqTableName(), tableSchemas))
                                            .withSideInputs(tableSchemas));

                    final WriteResult apply = docsRows
                            .apply("insert data table - " + table.getTableName(),
                                    BigQueryIO.writeTableRows()
                                            .to(TableRefPartition.perDay(options.getBQProject(),
                                                    options.getDatasetId(), table.getBqTableName()))
                                            .withSchemaFromView(tableSchemas)
                                            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                                            .withWriteDisposition(WRITE_APPEND));


exception is:

Sep 08, 2017 12:16:55 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
INFO: Opening TableRowWriter to gs://bq-migration/tempMongo/BigQueryWriteTemp/7ae420d6f9eb41e488d2015d2e4d200d/cb3f0aef-9aeb-47ac-93dc-d9a12e4fdcfb.
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.runPipeline(LoadMongodbDataPipeline.java:347)
at com.behalf.migration.dataflow.mongodb.LoadMongodbDataPipeline.main(LoadMongodbDataPipeline.java:372)
Caused by: java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
at org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.isReady(SimplePushbackSideInputDoFnRunner.java:94)
at org.apache.beam.runners.direct.repackaged.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:76)
Sep 08, 2017 12:16:58 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
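
For context on where this throws: CalendarWindows is a
PartitioningWindowFn, which maps each main-input window to a side-input
window and rejects lookups coming from the GlobalWindow - and, as noted
elsewhere in this thread, BigQueryIO rewindows into the global window
before writing, which is the window it is in when it reads the schema
view. Roughly, the check looks like this (a paraphrase for illustration,
not the exact Beam source):

    public BoundedWindow getSideInputWindow(BoundedWindow mainWindow) {
        if (mainWindow instanceof GlobalWindow) {
            throw new IllegalArgumentException(
                    "Attempted to get side input window for GlobalWindow from non-global WindowFn");
        }
        // otherwise, pick the side-input window containing the main window's end
        return assignWindow(mainWindow.maxTimestamp());
    }

So a side input consumed by withSchemaFromView has to be globally
windowed, which is why the commented-out Window.into(new GlobalWindows())
on the side-input branch makes the error go away.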

On Thu, Sep 7, 2017 at 7:07 PM, Eugene Kirpichov
<ki...@google.com.invalid> wrote:
> Please include the full exception, and please show the code that produces it.
> See also the "Side inputs and windowing" section of
> https://beam.apache.org/documentation/programming-guide/#transforms-sideio
> - that might be sufficient to resolve your problem.
>
> On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <ch...@behalf.com> wrote:
>
>> Hi,
>>   I have a pipeline that, based on documents from mongo, updates the
>> schema and then adds the records to BigQuery. Since I want a partitioned
>> table, I have a daily window.
>> How do I get the schema view to be windowed? I get the exception:
>>
>> "Attempted to get side input window for GlobalWindow from non-global
>> WindowFn"
>>
>> chaim
>>

Re: BigQueryIO withSchemaFromView

Posted by Eugene Kirpichov <ki...@google.com.INVALID>.
Please include the full exception, and please show the code that produces it.
See also the "Side inputs and windowing" section of
https://beam.apache.org/documentation/programming-guide/#transforms-sideio
- that might be sufficient to resolve your problem.
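
To illustrate the rule that section describes - a side input is resolved
against the window of the main-input element, so the two branches need
compatible windowing. A minimal sketch (illustrative only, not code from
this thread; Document and docs stand in for the poster's collections):

    // Main input and side input share daily windows, so each element
    // reads the side-input value computed for its own day.
    PCollection<Document> daily = docs
            .apply(Window.into(CalendarWindows.days(1)));

    final PCollectionView<Long> dailyCount = daily
            .apply(Combine.globally(Count.<Document>combineFn()).withoutDefaults())
            .apply(View.asSingleton());

    daily.apply(ParDo.of(new DoFn<Document, Document>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            Long countForThisDay = c.sideInput(dailyCount);  // per-window lookup
            c.output(c.element());
        }
    }).withSideInputs(dailyCount));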

On Thu, Sep 7, 2017 at 5:10 AM Chaim Turkel <ch...@behalf.com> wrote:

> Hi,
>   I have a pipeline that, based on documents from mongo, updates the
> schema and then adds the records to BigQuery. Since I want a partitioned
> table, I have a daily window.
> How do I get the schema view to be windowed? I get the exception:
>
> "Attempted to get side input window for GlobalWindow from non-global
> WindowFn"
>
> chaim
>