Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/08/03 09:08:00 UTC

[jira] [Work logged] (BEAM-10248) Beam does not set correct region for BigQuery when requesting load job status

     [ https://issues.apache.org/jira/browse/BEAM-10248?focusedWorklogId=465587&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-465587 ]

ASF GitHub Bot logged work on BEAM-10248:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Aug/20 09:07
            Start Date: 03/Aug/20 09:07
    Worklog Time Spent: 10m 
      Work Description: polleyg commented on pull request #12431:
URL: https://github.com/apache/beam/pull/12431#issuecomment-667904984


   Thanks both!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 465587)
    Time Spent: 50m  (was: 40m)

> Beam does not set correct region for BigQuery when requesting load job status
> -----------------------------------------------------------------------------
>
>                 Key: BEAM-10248
>                 URL: https://issues.apache.org/jira/browse/BEAM-10248
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.22.0
>         Environment: -Beam 2.22.0
> -DirectRunner & DataflowRunner
> -Java 11
>            Reporter: Graham Polley
>            Assignee: Chamikara Madhusanka Jayalath
>            Priority: P1
>             Fix For: 2.24.0
>
>         Attachments: Untitled document.pdf
>
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> I am using `FILE_LOADS` (micro-batching) from Pub/Sub and writing to BigQuery. My BigQuery dataset is in region `australia-southeast1`.
> If the load job into BigQuery fails for some reason (e.g. table does not exist, or schema has changed), then an error from BigQuery is returned for the load.
> Beam then enters a loop of retrying the load job. However, instead of using a new job id (where the suffix is incremented by 1), it wrongly tries to reinsert the job using the same job id. This is because the lookup that checks whether the job id already exists does not take the dataset's region into account. It defaults to the US, but the job was created in `australia-southeast1`, so the lookup returns `null`.
> The bug is on line 308 of `BigQueryHelpers.java`. It should not return `null`; it does so only because it is looking in the wrong region.
> If I *test* using a dataset in BigQuery that is in the US region, then it correctly finds the job id and begins to retry the job with a new job id (suffixed with the retry count). We have seen this bug in other areas of Beam before and in other tools/services on GCP e.g. Cloud Composer.
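To make the failure mode above concrete, here is a minimal, self-contained sketch of a job lookup that is keyed by location. It is not Beam's code: the class `JobLookupSketch`, the in-memory `JOBS` map, the `_N` suffix format, and the job name `load_job` are all hypothetical stand-ins chosen for illustration. It only models the behaviour the report describes: a lookup in the wrong location finds nothing, so the same job id is picked again.

```java
import java.util.HashMap;
import java.util.Map;

final class JobLookupSketch {
    // Hypothetical stand-in for a jobs service that stores jobs per location.
    static final Map<String, String> JOBS = new HashMap<>();

    static String key(String location, String jobId) {
        return location + "/" + jobId;
    }

    // Returns the job status, or null when no such job exists in that location.
    static String getJob(String location, String jobId) {
        return JOBS.get(key(location, jobId));
    }

    // Picks the job id for the next attempt: the suffix is bumped for every
    // attempt that is already visible in the location used for the lookup.
    static String nextJobId(String baseJobId, String lookupLocation) {
        int attempt = 0;
        String candidate = baseJobId + "_" + attempt;
        while (getJob(lookupLocation, candidate) != null) {
            attempt++;
            candidate = baseJobId + "_" + attempt;
        }
        return candidate;
    }

    public static void main(String[] args) {
        // A first attempt failed in the dataset's own region.
        JOBS.put(key("australia-southeast1", "load_job_0"), "FAILED");

        // Lookup in the default location misses the job and reuses the id.
        System.out.println(nextJobId("load_job", "US")); // prints "load_job_0"

        // Lookup in the dataset's region sees the failed job and moves on.
        System.out.println(nextJobId("load_job", "australia-southeast1")); // prints "load_job_1"
    }
}
```

Under these assumptions, passing the dataset's location to the lookup is what allows the retry to move on to a fresh job id.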
> However, that leads me to my next problem/bug. Even if that is fixed, the number of retries is set to `Integer.MAX_VALUE`, so Beam will keep retrying the job up to 2,147,483,647 times. This is not good.
> The exception is swallowed by the Beam SDK and never propagated back up the stack, so if a load job fails, users have no way to catch it and react.
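As an illustration of the alternative the report is asking for, the sketch below caps the number of attempts and rethrows the last failure so the caller can handle it. This is a generic pattern, not Beam's implementation or a proposed patch: `BoundedRetrySketch`, `LoadJobFailedException`, and `runWithRetries` are hypothetical names, and the cap of 3 is an arbitrary value for the example.

```java
import java.util.concurrent.Callable;

final class BoundedRetrySketch {
    // Hypothetical exception type surfaced to the caller once retries run out.
    static final class LoadJobFailedException extends RuntimeException {
        LoadJobFailedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Runs the job up to maxAttempts times and propagates the last failure
    // instead of swallowing it.
    static <T> T runWithRetries(Callable<T> job, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return job.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw new LoadJobFailedException(
            "Load job failed after " + maxAttempts + " attempts", last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        Callable<String> alwaysFails = () -> {
            calls[0]++;
            throw new IllegalStateException("table not found");
        };
        try {
            runWithRetries(alwaysFails, 3);
        } catch (LoadJobFailedException e) {
            // prints "3 attempts, cause: table not found"
            System.out.println(calls[0] + " attempts, cause: " + e.getCause().getMessage());
        }
    }
}
```

A real retry loop would usually also back off between attempts and distinguish transient from permanent errors; both are left out here to keep the sketch short.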
> I attempted to set `withFailedInsertRetryPolicy()` to transient errors only, but it is not supported with `FILE_LOADS`. I also tried using the `WriteResult` object returned from the BigQuery sink/write to get a handle on the error, but that does not work either. Users need a way to catch and handle failed load jobs when using `FILE_LOADS`.
>  
> {code:java}
> public class TemplatePipeline {
>     private static final String TOPIC = "projects/etl-demo-269105/topics/test-micro-batching";
>     private static final String BIGQUERY_DESTINATION_FILTERED = "etl-demo-269105:etl_demo_fun.micro_batch_test_xxx";
>
>     public static void main(String[] args) throws Exception {
>         try {
>             PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>             DataflowPipelineOptions options = PipelineOptionsFactory
>                     .fromArgs(args)
>                     .withoutStrictParsing()
>                     .as(DataflowPipelineOptions.class);
>             Pipeline pipeline = Pipeline.create(options);
>             PCollection<PubsubMessage> messages = pipeline
>                     .apply(PubsubIO.readMessagesWithAttributes().fromTopic(String.format(TOPIC, options.getProject())))
>                     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))));
>             WriteResult result = messages.apply(ParDo.of(new RowToBQRow()))
>                     .apply(BigQueryIO.writeTableRows()
>                             .to(String.format(BIGQUERY_DESTINATION_FILTERED, options.getProject()))
>                             .withCreateDisposition(CREATE_NEVER)
>                             .withWriteDisposition(WRITE_APPEND)
>                             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>                             .withTriggeringFrequency(Duration.standardSeconds(5))
>                             .withNumFileShards(1)
>                             .withExtendedErrorInfo()
>                             .withSchema(getTableSchema()));
>             // result.getFailedInsertsWithErr().apply(ParDo.of(new DoFn<BigQueryInsertError, String>() {
>             //     @ProcessElement
>             //     public void processElement(ProcessContext c) {
>             //         for(ErrorProto err : c.element().getError().getErrors()){
>             //             throw new RuntimeException(err.getMessage());
>             //         }
>             //
>             //     }
>             // }));
>             result.getFailedInserts().apply(ParDo.of(new DoFn<TableRow, String>() {
>                 @ProcessElement
>                 public void processElement(ProcessContext c) {
>                     System.out.println(c.element());
>                     c.output("foo");
>                     throw new RuntimeException("Failed to load");
>                 }
>
>                 @FinishBundle
>                 public void finishUp(FinishBundleContext finishBundleContextc){
>                     System.out.println("Got here");
>                 }
>             }));
>             pipeline.run();
>         } catch (Exception e) {
>             e.printStackTrace();
>             throw new Exception(e);
>         }
>     }
>
>     private static TableSchema getTableSchema() {
>         List<TableFieldSchema> fields = new ArrayList<>();
>         fields.add(new TableFieldSchema().setName("timestamp").setType("INTEGER"));
>         fields.add(new TableFieldSchema().setName("payload").setType("STRING"));
>         return new TableSchema().setFields(fields);
>     }
>
>     public static class RowToBQRow extends DoFn<PubsubMessage, TableRow> {
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>             String payload = new String(c.element().getPayload(), StandardCharsets.UTF_8);
>             c.output(new TableRow()
>                     .set("timestamp", System.currentTimeMillis())
>                     .set("payload", payload)
>             );
>         }
>     }
> }
> {code}
> Stack trace is attached showing problem of same job id being used/inserted to BigQuery on each retry.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)