Posted to github@beam.apache.org by "Abacn (via GitHub)" <gi...@apache.org> on 2023/05/19 20:43:57 UTC

[GitHub] [beam] Abacn opened a new issue, #26796: [Bug]: Records get silently dropped in BigQueryIO storage write when timestamp (possibly) invalid

Abacn opened a new issue, #26796:
URL: https://github.com/apache/beam/issues/26796

   ### What happened?
   
   I am trying to reproduce #26789, but I found another apparent bug before even setting up all of its conditions:
   
   Attached is the full code. Basically, it writes to a table with the schema field order shuffled, the TableRow insertion order shuffled, and a varying number of fields, and checks whether data corruption, data loss, or an exception occurs.
   
   However, it turns out that if the TableRow contains a TIMESTAMP field whose value is a String (set initcolumns = 5 below), no records get written into BigQuery and no error is shown.
   
   (With initcolumns = 4, which avoids the TIMESTAMP field, all records are written into BigQuery successfully.)
   
   ```java
   
   package com.github.abacn;
   
   import com.google.api.services.bigquery.model.TableFieldSchema;
   import com.google.api.services.bigquery.model.TableRow;
   import com.google.api.services.bigquery.model.TableSchema;
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.io.GenerateSequence;
   import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
   import org.apache.beam.sdk.options.PipelineOptions;
   import org.apache.beam.sdk.options.PipelineOptionsFactory;
   import org.apache.beam.sdk.transforms.SerializableFunction;
   
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   
   /**
    * Reproduce the issue of <a href="https://github.com/apache/beam/issues/26789">BigQueryIO Storage API write data corruption</a>
    */
   public class BigQueryStorageWriteDemo {
     public static void main(String[] argv) {
       PipelineOptions option = PipelineOptionsFactory.fromArgs(argv).create();
       Pipeline pipeline = Pipeline.create(option);
   
       final int initcolumns = 5;
       TableSchema initSchema = getTableSchema(initcolumns, true);
   
       BigQueryIO.Write<Long> writeIO = BigQueryIO.<Long>write()
           .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
           .withFormatFunction(new FormatFn(initcolumns, false))
           .to("*******") // table specifier
           .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
           .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
           .withSchema(initSchema);
           //.withAutoSchemaUpdate(true)
           //.ignoreUnknownValues();
   
       pipeline.apply(GenerateSequence.from(0).to(10)).apply(writeIO);
   
       pipeline.run().waitUntilFinish();
   
       System.out.println("Finished");
       System.exit(0);
     }
   
     static TableSchema getTableSchema(int numColumns, boolean isShuffle) {
       List<TableFieldSchema> fields = new ArrayList<>(numColumns);
       for (int idx = 0; idx < numColumns; ++idx) {
         switch (idx) {
           case 0:
             fields.add(new TableFieldSchema().setName("int_value").setType("INTEGER"));
             break;
           case 1:
             fields.add(new TableFieldSchema().setName("double_value").setType("FLOAT"));
             break;
           case 2:
             fields.add(new TableFieldSchema().setName("string_value").setType("STRING"));
             break;
           case 3:
             fields.add(new TableFieldSchema().setName("boolean_value").setType("BOOLEAN"));
             break;
           case 4:
             fields.add(new TableFieldSchema().setName("time_stamp_value").setType("TIMESTAMP"));
             break;
           default:
             fields.add(new TableFieldSchema().setName("field_" + idx).setType("INTEGER"));
         }
       }
       if (isShuffle) {
         Collections.shuffle(fields);
       }
       return new TableSchema().setFields(fields);
     }
   
     private static class FormatFn implements SerializableFunction<Long, TableRow> {
       protected final int numColumns;
   
       protected final boolean isShuffle;
   
       public FormatFn(int numColumns, boolean isShuffle) {
         this.numColumns = numColumns;
         this.isShuffle = isShuffle;
       }
   
       @Override
       public TableRow apply(Long input) {
         List<Integer> arrs = new ArrayList<>(numColumns);
         for (int idx = 0; idx < numColumns; idx++) {
           arrs.add(idx);
         }
         if (isShuffle) {
           Collections.shuffle(arrs);
         }
         TableRow row = new TableRow();
         for (int idx : arrs) {
           switch (idx) {
             case 0:
               row.set("int_value", input);
               break;
             case 1:
               row.set("double_value", input.doubleValue() + 0.1);
               break;
             case 2:
               row.set("string_value", input.toString() + "S");
               break;
             case 3:
               row.set(("boolean_value"), (input % 2 == 0));
               break;
             case 4:
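               // Timestamp written as a string with a bare "+00" UTC offset (no ":MM" minutes).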
               row.set("time_stamp_value", String.format("%d-01-03 12:34:56+00", 2000+input));
               break;
             default:
               row.set("field_" + idx, input);
           }
         }
         return row;
       }
     }
   }
   
   ```
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Abacn commented on issue #26796: [Bug]: Records get silently dropped in BigQueryIO storage write when timestamp (possibly) invalid

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on issue #26796:
URL: https://github.com/apache/beam/issues/26796#issuecomment-1555413523

   OK, this seems to be working as intended (WAI): the failed messages are actually all written to the DLQ. The following exception is suppressed:
   
   ```
   org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto$SchemaDoesntMatchException: Problem converting field root.time_stamp_value expected type: TIMESTAMP. Exception: java.lang.NumberFormatException: For input string: "2009-01-03 12:34:56+00"
   	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:374)
   Caused by: org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto$SchemaDoesntMatchException: Problem converting field root.time_stamp_value expected type: TIMESTAMP. Exception: java.lang.NumberFormatException: For input string: "2009-01-03 12:34:56+00"
   	at org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto.messageFromMap(TableRowToStorageApiProto.java:487)
   	at org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto.messageFromTableRow(TableRowToStorageApiProto.java:589)
   	at org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinationsTableRow$TableRowConverter.toMessage(StorageApiDynamicDestinationsTableRow.java:152)
   	at org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinationsTableRow$TableRowConverter.toMessage(StorageApiDynamicDestinationsTableRow.java:140)
   	at org.apache.beam.sdk.io.gcp.bigquery.StorageApiConvertMessages$ConvertMessagesDoFn.processElement(StorageApiConvertMessages.java:142)
   Caused by: java.lang.NumberFormatException: For input string: "2009-01-03 12:34:56+00"
   	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
   	at java.lang.Long.parseLong(Long.java:589)
   	at java.lang.Long.parseLong(Long.java:631)
   ```
   




[GitHub] [beam] Abacn commented on issue #26796: [Bug]: Records get silently dropped in BigQueryIO storage write when timestamp (possibly) invalid

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on issue #26796:
URL: https://github.com/apache/beam/issues/26796#issuecomment-1561726215

   General issue of handling DLQ alerting: #26796. Keeping this issue open for the fix on the timestamp parsing.




[GitHub] [beam] ahmedabu98 commented on issue #26796: [Bug]: Records get silently dropped in BigQueryIO storage write when timestamp (possibly) invalid

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on issue #26796:
URL: https://github.com/apache/beam/issues/26796#issuecomment-1561841723

   DateTimeFormatterBuilder's `appendOffsetId()` uses a pattern that requires the offset minutes (MM):
   https://github.com/apache/beam/blob/34da7f9ab86842f712f136ff2aad7ef138870ae9/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java#L94-L101
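   
   For illustration, a standalone java.time sketch (hypothetical demo class, not the Beam formatter itself): `appendOffsetId()` is documented as equivalent to `appendOffset("+HH:MM:ss", "Z")`, so an offset without minutes such as `+00` is rejected, while `+00:00` parses; a pattern with lowercase minutes like `+HH:mm` accepts both forms.
   
   ```java
   import java.time.OffsetDateTime;
   import java.time.format.DateTimeFormatter;
   import java.time.format.DateTimeFormatterBuilder;
   import java.time.format.DateTimeParseException;
   
   public class OffsetIdDemo {
     public static void main(String[] args) {
       // Formatter using appendOffsetId(): the offset must be "Z" or carry explicit minutes.
       DateTimeFormatter withOffsetId = new DateTimeFormatterBuilder()
           .append(DateTimeFormatter.ISO_LOCAL_DATE)
           .appendLiteral(' ')
           .append(DateTimeFormatter.ISO_LOCAL_TIME)
           .appendOffsetId()
           .toFormatter();
   
       // Parses: the offset has explicit minutes.
       System.out.println(OffsetDateTime.parse("2009-01-03 12:34:56+00:00", withOffsetId));
   
       // Rejected: "+00" has no minutes, which appendOffsetId() requires.
       try {
         OffsetDateTime.parse("2009-01-03 12:34:56+00", withOffsetId);
       } catch (DateTimeParseException e) {
         System.out.println("rejected: " + e.getMessage());
       }
   
       // An offset pattern with lowercase "mm" treats the minutes as optional when parsing,
       // so both "+00" and "+00:00" are accepted.
       DateTimeFormatter optionalMinutes = new DateTimeFormatterBuilder()
           .append(DateTimeFormatter.ISO_LOCAL_DATE)
           .appendLiteral(' ')
           .append(DateTimeFormatter.ISO_LOCAL_TIME)
           .appendOffset("+HH:mm", "Z")
           .toFormatter();
       System.out.println(OffsetDateTime.parse("2009-01-03 12:34:56+00", optionalMinutes));
     }
   }
   ```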




[GitHub] [beam] Abacn closed issue #26796: [Bug]: Records get silently dropped in BigQueryIO storage write when timestamp (possibly) invalid

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn closed issue #26796: [Bug]: Records get silently dropped in BigQueryIO storage write when timestamp (possibly) invalid
URL: https://github.com/apache/beam/issues/26796




[GitHub] [beam] ahmedabu98 commented on issue #26796: [Bug]: Records get silently dropped in BigQueryIO storage write when timestamp (possibly) invalid

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on issue #26796:
URL: https://github.com/apache/beam/issues/26796#issuecomment-1561839837

   Tried it locally, looks like `2009-01-03 12:34:56+00` doesn't work, but `2009-01-03 12:34:56+00:00` does




[GitHub] [beam] ahmedabu98 commented on issue #26796: [Bug]: Records get silently dropped in BigQueryIO storage write when timestamp (possibly) invalid

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on issue #26796:
URL: https://github.com/apache/beam/issues/26796#issuecomment-1561895160

   .take-issue

