Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/14 21:53:23 UTC

[GitHub] [beam] ahmedabu98 opened a new pull request, #21872: Standardizing output of WriteToBigQuery

ahmedabu98 opened a new pull request, #21872:
URL: https://github.com/apache/beam/pull/21872

   Standardizing WriteToBigQuery's output to adhere to the [[Beam I/O Standards] API Syntax and Semantics](https://docs.google.com/document/d/1V2FkGGunVgvLwi1dKHr-7mtDuwYjTuvuESV_oPzVnfQ/edit?resourcekey=0-KvfQq-5iCcMlu3f3MFJ-GQ#heading=h.k9bqspolh594) document (specifically, T2).
   
   This change introduces a single `WriteResult` class (similar to the [WriteResult class](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java) in the Java SDK) that is returned by both the `STREAMING_INSERTS` and `FILE_LOADS` write methods.
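A minimal, stdlib-only sketch of the idea of one result type for both write methods. It assumes plain Python lists stand in for PCollections so it runs without Apache Beam; the `OUTPUTS_BY_METHOD` table, the `UnifiedWriteResult` class and its `get()` method are hypothetical names for illustration. The output names, and which write method produces each, are taken from the diff discussed in this thread.

```python
# Illustrative only: plain lists stand in for Beam PCollections.
OUTPUTS_BY_METHOD = {
    'FILE_LOADS': {
        'destination_load_jobid_pairs',
        'destination_file_pairs',
        'destination_copy_jobid_pairs',
    },
    'STREAMING_INSERTS': {'failed_rows', 'failed_rows_with_errors'},
}


class UnifiedWriteResult:
  """One result type returned by every write method (hypothetical sketch)."""
  def __init__(self, method, **outputs):
    self._method = method
    self._outputs = outputs

  def get(self, name):
    """Returns output `name`, or raises if this write method cannot produce it."""
    producers = sorted(
        m for m, names in OUTPUTS_BY_METHOD.items() if name in names)
    if self._method not in producers:
      raise AttributeError(
          f'{name} is not produced by the {self._method} write method '
          f'(produced by: {", ".join(producers) or "none"}).')
    # A method may leave one of its outputs unset, so default to None.
    return self._outputs.get(name)


streaming = UnifiedWriteResult(
    'STREAMING_INSERTS', failed_rows=[('proj:ds.table', {'bad_column': 1})])
# streaming.get('destination_file_pairs') raises AttributeError.
```

Raising `AttributeError` rather than returning `None` (the later revision of the PR also raises `AttributeError`) makes a lookup that does not match the configured write method fail loudly while the pipeline is being constructed.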


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] asf-ci commented on pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #21872:
URL: https://github.com/apache/beam/pull/21872#issuecomment-1155747130

   Can one of the admins verify this patch?




[GitHub] [beam] asf-ci commented on pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #21872:
URL: https://github.com/apache/beam/pull/21872#issuecomment-1155747127

   Can one of the admins verify this patch?




[GitHub] [beam] codecov[bot] commented on pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #21872:
URL: https://github.com/apache/beam/pull/21872#issuecomment-1155810086

   # [Codecov](https://codecov.io/gh/apache/beam/pull/21872?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#21872](https://codecov.io/gh/apache/beam/pull/21872?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (be2fa1f) into [master](https://codecov.io/gh/apache/beam/commit/4b33a38cdd06bebb9ab1b0a04844944937a8bd8b?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (4b33a38) will **decrease** coverage by `0.01%`.
   > The diff coverage is `70.00%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #21872      +/-   ##
   ==========================================
   - Coverage   74.07%   74.06%   -0.02%     
   ==========================================
     Files         698      698              
     Lines       92574    92638      +64     
   ==========================================
   + Hits        68577    68612      +35     
   - Misses      22742    22771      +29     
     Partials     1255     1255              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `83.73% <70.00%> (-0.03%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/21872?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/21872/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `69.90% <70.00%> (-0.18%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/source\_test\_utils.py](https://codecov.io/gh/apache/beam/pull/21872/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vc291cmNlX3Rlc3RfdXRpbHMucHk=) | `88.01% <0.00%> (-1.39%)` | :arrow_down: |
   | [...eam/runners/portability/fn\_api\_runner/execution.py](https://codecov.io/gh/apache/beam/pull/21872/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL2V4ZWN1dGlvbi5weQ==) | `92.44% <0.00%> (-0.65%)` | :arrow_down: |
   | [...ache\_beam/runners/dataflow/ptransform\_overrides.py](https://codecov.io/gh/apache/beam/pull/21872/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kYXRhZmxvdy9wdHJhbnNmb3JtX292ZXJyaWRlcy5weQ==) | `90.12% <0.00%> (-0.62%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/bigquery\_read\_internal.py](https://codecov.io/gh/apache/beam/pull/21872/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3JlYWRfaW50ZXJuYWwucHk=) | `53.92% <0.00%> (-0.50%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/21872/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `88.94% <0.00%> (-0.16%)` | :arrow_down: |
   | [sdks/python/apache\_beam/utils/urns.py](https://codecov.io/gh/apache/beam/pull/21872/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvdXJucy5weQ==) | `88.70% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/coders/coders.py](https://codecov.io/gh/apache/beam/pull/21872/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vY29kZXJzL2NvZGVycy5weQ==) | `88.22% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/runners/common.py](https://codecov.io/gh/apache/beam/pull/21872/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9jb21tb24ucHk=) | `88.68% <0.00%> (ø)` | |
   | ... and [41 more](https://codecov.io/gh/apache/beam/pull/21872/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/21872?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/21872?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [4b33a38...be2fa1f](https://codecov.io/gh/apache/beam/pull/21872?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [beam] ahmedabu98 commented on pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #21872:
URL: https://github.com/apache/beam/pull/21872#issuecomment-1156486291

   Run Python PreCommit




[GitHub] [beam] ahmedabu98 commented on pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #21872:
URL: https://github.com/apache/beam/pull/21872#issuecomment-1170031576

   retest this please




[GitHub] [beam] ahmedabu98 commented on pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #21872:
URL: https://github.com/apache/beam/pull/21872#issuecomment-1170032027

   retest this please




[GitHub] [beam] pabloem commented on a diff in pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
pabloem commented on code in PR #21872:
URL: https://github.com/apache/beam/pull/21872#discussion_r902913424


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1494,6 +1494,91 @@ def writer(self, test_bigquery_client=None, buffer_size=None):
         buffer_size=buffer_size)
 
 
+class WriteResult:

Review Comment:
   add a nice, detailed Pydoc for the class.



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1494,6 +1494,91 @@ def writer(self, test_bigquery_client=None, buffer_size=None):
         buffer_size=buffer_size)
 
 
+class WriteResult:
+  def __init__(
+      self,
+      method=None,
+      destination=None,
+      schema=None,
+      destination_load_jobid_pairs=None,
+      destination_file_pairs=None,
+      destination_copy_jobid_pairs=None,
+      failed_rows=None,
+      failed_rows_with_errors=None):
+
+    self.method = method
+    self.destination = destination
+    self.schema = schema
+    self.destination_load_jobid_pairs = destination_load_jobid_pairs
+    self.destination_file_pairs = destination_file_pairs
+    self.destination_copy_jobid_pairs = destination_copy_jobid_pairs
+    self.failed_rows = failed_rows
+    self.failed_rows_with_errors = failed_rows_with_errors
+
+    self.config = {
+        'method': method,
+        'destination': destination,
+        'schema': schema,
+    }
+
+    from apache_beam.io.gcp.bigquery_file_loads import BigQueryBatchFileLoads
+    self.attributes = {
+        BigQueryWriteFn.FAILED_ROWS: self.get_failed_rows,
+        BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS: self.
+        get_failed_rows_with_errors,
+        BigQueryBatchFileLoads.DESTINATION_JOBID_PAIRS: self.
+        get_destination_load_jobid_pairs,
+        BigQueryBatchFileLoads.DESTINATION_FILE_PAIRS: self.
+        get_destination_file_pairs,
+        BigQueryBatchFileLoads.DESTINATION_COPY_JOBID_PAIRS: self.
+        get_destination_copy_jobid_pairs,
+        'write_configuration': self.get_write_configuration
+    }
+
+  def get_write_configuration(self):
+    return self.config
+
+  def validate(self, method, attribute):
+    if self.method != method:
+      raise ValueError(
+          f'Cannot get {attribute} because it is not produced '
+          f'by {self.method} write method. Note: only {method} '
+          'produces this attribute.')
+
+  def get_destination_load_jobid_pairs(self):
+    self.validate('FILE_LOADS', 'DESTINATION_JOBID_PAIRS')
+
+    return self.destination_load_jobid_pairs
+
+  def get_destination_file_pairs(self):
+    self.validate('FILE_LOADS', 'DESTINATION_FILE_PAIRS')
+
+    return self.destination_file_pairs
+
+  def get_destination_copy_jobid_pairs(self):
+    self.validate('FILE_LOADS', 'DESTINATION_COPY_JOBID_PAIRS')
+
+    return self.destination_copy_jobid_pairs
+
+  def get_failed_rows(self):
+    self.validate('STREAMING_INSERTS', 'FAILED_ROWS')
+
+    return self.failed_rows
+
+  def get_failed_rows_with_errors(self):

Review Comment:
   I like this. A few changes I propose for these methods:
   - Remove the `get_` in the names. Let them have the name of the property (the class attributes will have to be `self._destination_file_pairs`).
   - Annotate with `@property` so people can do `result.failed_rows_with_errors`



   let me know if that makes sense.
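A stdlib-only sketch of what the suggestion above amounts to, trimmed to one output per write method. The class body is illustrative, not the merged code, and plain values stand in for PCollections. Backing fields get a leading underscore so the public names are free for read-only properties.

```python
class WriteResult:
  """Sketch: outputs exposed as read-only properties without a get_ prefix."""
  def __init__(self, method, failed_rows=None, destination_file_pairs=None):
    # Underscore-prefixed storage frees the public names for the properties.
    self._method = method
    self._failed_rows = failed_rows
    self._destination_file_pairs = destination_file_pairs

  def _validate(self, method, attribute):
    if self._method != method:
      raise AttributeError(
          f'Cannot get {attribute} because it is not produced by the '
          f'{self._method} write method. Only {method} produces it.')

  @property
  def failed_rows(self):
    self._validate('STREAMING_INSERTS', 'failed_rows')
    return self._failed_rows

  @property
  def destination_file_pairs(self):
    self._validate('FILE_LOADS', 'destination_file_pairs')
    return self._destination_file_pairs


result = WriteResult('STREAMING_INSERTS', failed_rows=['row-1'])
rows = result.failed_rows  # instead of result.get_failed_rows()
```

Two side effects worth knowing: a property with no setter rejects assignment (`result.failed_rows = ...` raises `AttributeError`), and because unavailable outputs raise `AttributeError`, `hasattr(result, 'destination_file_pairs')` returns `False` for a `STREAMING_INSERTS` result.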



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1494,6 +1494,91 @@ def writer(self, test_bigquery_client=None, buffer_size=None):
         buffer_size=buffer_size)
 
 
+class WriteResult:
+  def __init__(
+      self,
+      method=None,
+      destination=None,
+      schema=None,
+      destination_load_jobid_pairs=None,
+      destination_file_pairs=None,
+      destination_copy_jobid_pairs=None,
+      failed_rows=None,
+      failed_rows_with_errors=None):
+
+    self.method = method
+    self.destination = destination
+    self.schema = schema
+    self.destination_load_jobid_pairs = destination_load_jobid_pairs
+    self.destination_file_pairs = destination_file_pairs
+    self.destination_copy_jobid_pairs = destination_copy_jobid_pairs
+    self.failed_rows = failed_rows
+    self.failed_rows_with_errors = failed_rows_with_errors

Review Comment:
   please annotate the types for all of these attributes



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1494,6 +1494,91 @@ def writer(self, test_bigquery_client=None, buffer_size=None):
         buffer_size=buffer_size)
 
 
+class WriteResult:
+  def __init__(
+      self,
+      method=None,
+      destination=None,
+      schema=None,

Review Comment:
   do you think we need `destination` and `schema`? Maybe we don't really need them. What are your thoughts?





[GitHub] [beam] ahmedabu98 commented on pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #21872:
URL: https://github.com/apache/beam/pull/21872#issuecomment-1157936456

   Run Python 3.8 PostCommit




[GitHub] [beam] github-actions[bot] commented on pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #21872:
URL: https://github.com/apache/beam/pull/21872#issuecomment-1167574676

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] ahmedabu98 commented on pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #21872:
URL: https://github.com/apache/beam/pull/21872#issuecomment-1167528765

   R: @pabloem, @johnjcasey
   Ready for second round of reviews




[GitHub] [beam] pabloem commented on a diff in pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
pabloem commented on code in PR #21872:
URL: https://github.com/apache/beam/pull/21872#discussion_r908773930


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -235,53 +235,39 @@ def compute_table_name(row):
 [2] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/insert
 [3] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource
 
-Output of the WriteToBigQuery transform
+Chaining of operations after WriteToBigQuery
 ---------------------------------------
+WriteToBigQuery returns an object with several PCollections that consist of
+metadata about the write operations. These are useful to inspect the write
+operation and follow up on the results. Often, the simplest use case is to
+chain an operation after writing data to BigQuery.
 
-Writing to BigQuery returns a WriteResult object that includes metadata
-relating to the write you configured. This data can be used in later steps
-in your pipeline:::
-
-  schema = {'fields': [
-      {'name': 'column', 'type': 'STRING', 'mode': 'NULLABLE'}]}
-
-  error_schema = {'fields': [
-      {'name': 'destination', 'type': 'STRING', 'mode': 'NULLABLE'},
-      {'name': 'row', 'type': 'STRING', 'mode': 'NULLABLE'},
-      {'name': 'error_message', 'type': 'STRING', 'mode': 'NULLABLE'}]}
-
-  with Pipeline() as p:
-    result = (p
-      | 'Create Columns' >> beam.Create([
-              {'column': 'value'},
-              {'bad_column': 'bad_value'}
-            ])
-      | 'Write Data' >> WriteToBigQuery(
-              method=WriteToBigQuery.Method.STREAMING_INSERTS,
-              table=my_table,
-              schema=schema,
-              insert_retry_strategy=RetryStrategy.RETRY_NEVER
-            ))
-
-    _ = (result.failed_rows_with_errors
-      | 'Get Errors' >> beam.Map(lambda e: {
-              "destination": e[0],
-              "row": json.dumps(e[1]),
-              "error_message": e[2][0]['message']
-            })
-      | 'Write Errors' >> WriteToBigQuery(
-              method=WriteToBigQuery.Method.STREAMING_INSERTS,
-              table=error_log_table,
-              schema=error_schema,
-            ))

Review Comment:
   You don't need to remove this example. I quite like it. Up to you though : )
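The lambda in that example can also be written as a named function, which is easier to unit-test. A minimal sketch, assuming each `failed_rows_with_errors` element is a `(destination, row, errors)` tuple whose `errors` is a list of dicts with a `'message'` key, as the example's `e[2][0]['message']` implies. `to_error_row` is a hypothetical name and the sample error below is made up.

```python
import json


def to_error_row(element):
  """Flattens one failed_rows_with_errors element into an error-log row."""
  destination, row, errors = element
  return {
      'destination': destination,
      'row': json.dumps(row),
      # Like the original lambda, keep only the first error's message;
      # unlike it, tolerate an empty error list instead of raising IndexError.
      'error_message': errors[0]['message'] if errors else None,
  }


sample = (
    'my-project:my_dataset.my_table',
    {'bad_column': 'bad_value'},
    [{'reason': 'invalid', 'message': 'no such field: bad_column.'}],
)
error_row = to_error_row(sample)
```

In the pipeline it would replace the lambda as `beam.Map(to_error_row)`.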





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on code in PR #21872:
URL: https://github.com/apache/beam/pull/21872#discussion_r909051541


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2531,6 +2455,126 @@ def deserialize(side_inputs):
     return WriteToBigQuery(**config)
 
 
+class WriteResult:
+  """The result of a WriteToBigQuery transform.
+  """
+  def __init__(
+      self,
+      method: WriteToBigQuery.Method = None,
+      destination_load_jobid_pairs: PCollection[Tuple[
+          str, bigquery.JobReference]] = None,
+      destination_file_pairs: PCollection[Tuple[str, Tuple[str, int]]] = None,
+      destination_copy_jobid_pairs: PCollection[Tuple[
+          str, bigquery.JobReference]] = None,
+      failed_rows: PCollection[Tuple[str, dict]] = None,
+      failed_rows_with_errors: PCollection[Tuple[str, dict, list]] = None):
+
+    self._method = method
+    self._destination_load_jobid_pairs = destination_load_jobid_pairs
+    self._destination_file_pairs = destination_file_pairs
+    self._destination_copy_jobid_pairs = destination_copy_jobid_pairs
+    self._failed_rows = failed_rows
+    self._failed_rows_with_errors = failed_rows_with_errors

Review Comment:
   @pabloem 
   
   I updated the type annotations; can you double-check them?
   Also, I moved the annotations into the parameter list, since that style seems more common. Let me know if that works.





[GitHub] [beam] ahmedabu98 closed pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
ahmedabu98 closed pull request #21872: Standardizing output of WriteToBigQuery
URL: https://github.com/apache/beam/pull/21872




[GitHub] [beam] pabloem commented on a diff in pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
pabloem commented on code in PR #21872:
URL: https://github.com/apache/beam/pull/21872#discussion_r907642663


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1494,6 +1544,112 @@ def writer(self, test_bigquery_client=None, buffer_size=None):
         buffer_size=buffer_size)
 
 
+class WriteResult:
+  """The result of a WriteToBigQuery transform.
+  """
+  def __init__(
+      self,
+      method=None,
+      destination_load_jobid_pairs=None,
+      destination_file_pairs=None,
+      destination_copy_jobid_pairs=None,
+      failed_rows=None,
+      failed_rows_with_errors=None):
+
+    self.method: str = method
+    self._destination_load_jobid_pairs: PCollection = destination_load_jobid_pairs
+    self._destination_file_pairs: PCollection = destination_file_pairs
+    self._destination_copy_jobid_pairs: PCollection = destination_copy_jobid_pairs
+    self._failed_rows: PCollection = failed_rows
+    self._failed_rows_with_errors: PCollection = failed_rows_with_errors
+
+    from apache_beam.io.gcp.bigquery_file_loads import BigQueryBatchFileLoads
+    self.attributes = {
+        BigQueryWriteFn.FAILED_ROWS: WriteResult.failed_rows,
+        BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS: WriteResult.
+        failed_rows_with_errors,
+        BigQueryBatchFileLoads.DESTINATION_JOBID_PAIRS: WriteResult.
+        destination_load_jobid_pairs,
+        BigQueryBatchFileLoads.DESTINATION_FILE_PAIRS: WriteResult.
+        destination_file_pairs,
+        BigQueryBatchFileLoads.DESTINATION_COPY_JOBID_PAIRS: WriteResult.
+        destination_copy_jobid_pairs,
+    }
+
+  def validate(self, method, attribute):
+    if self.method != method:
+      raise AttributeError(
+          f'Cannot get {attribute} because it is not produced '
+          f'by {self.method} write method. Note: only {method} '
+          'produces this attribute.')
+
+  @property
+  def destination_load_jobid_pairs(self):
+    """A ``FILE_LOADS`` method attribute
+
+    Returns: A PCollection of the table destinations that were successfully
+      loaded to using the batch load API, along with the load job IDs.
+
+    Raises: AttributeError: if accessed with a write method besides ``FILE_LOADS``."""
+    self.validate('FILE_LOADS', 'DESTINATION_JOBID_PAIRS')
+
+    return self._destination_load_jobid_pairs
+
+  @property
+  def destination_file_pairs(self):
+    """A ``FILE_LOADS`` method attribute
+
+    Returns: A PCollection of the table destinations along with the
+      temp files used as sources to load from.
+
+    Raises: AttributeError: if accessed with a write method besides ``FILE_LOADS``."""
+    self.validate('FILE_LOADS', 'DESTINATION_FILE_PAIRS')
+
+    return self._destination_file_pairs
+
+  @property
+  def destination_copy_jobid_pairs(self):
+    """A ``FILE_LOADS`` method attribute
+
+    Returns: A PCollection of the table destinations that were successfully
+      copied to, along with the copy job ID.
+
+    Raises: AttributeError: if accessed with a write method besides ``FILE_LOADS``."""
+    self.validate('FILE_LOADS', 'DESTINATION_COPY_JOBID_PAIRS')
+
+    return self._destination_copy_jobid_pairs
+
+  @property
+  def failed_rows(self):
+    """A ``STREAMING_INSERTS`` method attribute
+
+    Returns: A PCollection of rows that failed when inserting to BigQuery.
+
+    Raises: AttributeError: if accessed with a write method besides ``STREAMING_INSERTS``."""
+    self.validate('STREAMING_INSERTS', 'FAILED_ROWS')
+
+    return self._failed_rows
+
+  @property
+  def failed_rows_with_errors(self):

Review Comment:
   Please also add the return types for these methods, since Python may not be able to infer them from the type annotations on the attributes (i.e. `-> PCollection[...]`)
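A stdlib-only sketch of annotating a property's return type, assuming a hypothetical generic `PCollection` stand-in (the real class lives in `apache_beam.pvalue`) so it runs without Beam. The annotation sits on the getter function, so that is what type checkers and `help()` read; some checkers (mypy, for example) treat an unannotated getter as returning `Any` rather than inferring its type from the backing attribute.

```python
from typing import Generic, List, Tuple, TypeVar

T = TypeVar('T')


class PCollection(Generic[T]):
  """Hypothetical stand-in for apache_beam.pvalue.PCollection."""
  def __init__(self, elements: List[T]):
    self.elements = elements


class WriteResult:
  def __init__(self, failed_rows: PCollection[Tuple[str, dict]]) -> None:
    self._failed_rows = failed_rows

  @property
  def failed_rows(self) -> PCollection[Tuple[str, dict]]:
    """Rows that failed to insert, each paired with its destination table."""
    return self._failed_rows
```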



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -235,6 +235,54 @@ def compute_table_name(row):
 [2] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/insert
 [3] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource
 
+Output of the WriteToBigQuery transform
+---------------------------------------
+
+Writing to BigQuery returns a WriteResult object that includes metadata
+relating to the write you configured. This data can be used in later steps
+in your pipeline:::
+
+  schema = {'fields': [
+      {'name': 'column', 'type': 'STRING', 'mode': 'NULLABLE'}]}
+
+  error_schema = {'fields': [
+      {'name': 'destination', 'type': 'STRING', 'mode': 'NULLABLE'},
+      {'name': 'row', 'type': 'STRING', 'mode': 'NULLABLE'},
+      {'name': 'error_message', 'type': 'STRING', 'mode': 'NULLABLE'}]}
+
+  with Pipeline() as p:
+    result = (p
+      | 'Create Columns' >> beam.Create([
+              {'column': 'value'},
+              {'bad_column': 'bad_value'}
+            ])
+      | 'Write Data' >> WriteToBigQuery(
+              method=WriteToBigQuery.Method.STREAMING_INSERTS,
+              table=my_table,
+              schema=schema,
+              insert_retry_strategy=RetryStrategy.RETRY_NEVER
+            ))
+
+    _ = (result.failed_rows_with_errors
+      | 'Get Errors' >> beam.Map(lambda e: {
+              "destination": e[0],
+              "row": json.dumps(e[1]),
+              "error_message": e[2][0]['message']
+            })
+      | 'Write Errors' >> WriteToBigQuery(
+              method=WriteToBigQuery.Method.STREAMING_INSERTS,
+              table=error_log_table,
+              schema=error_schema,
+            ))
+
+Attributes can be accessed using dot notation or bracket notation:
+
+result.failed_rows                  <--> result['FailedRows']
+result.failed_rows_with_errors      <--> result['FailedRowsWithErrors']
+result.destination_load_jobid_pairs <--> result['destination_load_jobid_pairs']
+result.destination_file_pairs       <--> result['destination_file_pairs']
+result.destination_copy_jobid_pairs <--> result['destination_copy_jobid_pairs']

Review Comment:
   This looks great. Thanks! Do you know whether lines 280-284 show up as code/monospace font? Maybe you need to add backticks or indentation to make them render as code?
   
   Finally, let's add a sub-section that says something like this:
   
   ```
   ### Chaining of operations after WriteToBigQuery
   
   WriteToBigQuery returns an object with several PCollections with metadata about the write operations. These are
   useful to inspect the write operation; however, often the simplest use case is to chain an operation after writing data
   to BigQuery.
   
   To chain an operation, one can chain it after one of the output PCollections. A generic way in which one could chain
   this operation (independent of write method) would be::
   
     def chain_after(result):
       try:
         # This works for FILE_LOADS, where we run load and possibly copy jobs.
         return (result.destination_load_jobid_pairs,
                 result.destination_copy_jobid_pairs) | beam.Flatten()
       except AttributeError:
         # STREAMING_INSERTS: chain after the rows that BigQuery rejected.
         return result.failed_rows
   
     result = (pcoll | WriteToBigQuery(...))
   
     _ = (chain_after(result)
          | beam.Reshuffle()  # Force a 'commit' of the intermediate data
          | MyOperationAfterWriteToBQ())
   ```
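The `try`/`except AttributeError` fallback in `chain_after` only works if reading an output that the configured write method does not produce raises `AttributeError`, and if the attribute names match the ones `WriteResult` defines (`destination_load_jobid_pairs`, `destination_copy_jobid_pairs`, `failed_rows`). The sketch below models that assumption with a hypothetical `StubWriteResult` and a hypothetical `outputs_to_chain_after` helper; it returns a plain list where a real pipeline would apply `beam.Flatten()`.

```python
class StubWriteResult:
  """Hypothetical stand-in for WriteResult; outputs are plain placeholders."""
  def __init__(self, method, outputs):
    self._method = method
    self._outputs = outputs

  def _get(self, name, producing_method):
    # Assumption: reading an output that the configured write method does not
    # produce raises AttributeError, which chain_after's try/except relies on.
    if self._method != producing_method:
      raise AttributeError(f'{name} is not produced by {self._method}')
    return self._outputs[name]

  @property
  def destination_load_jobid_pairs(self):
    return self._get('destination_load_jobid_pairs', 'FILE_LOADS')

  @property
  def destination_copy_jobid_pairs(self):
    return self._get('destination_copy_jobid_pairs', 'FILE_LOADS')

  @property
  def failed_rows(self):
    return self._get('failed_rows', 'STREAMING_INSERTS')


def outputs_to_chain_after(result):
  """Pick the output(s) a follow-up step should wait on, for either method."""
  try:
    # FILE_LOADS: load jobs plus any copy jobs (a pipeline would Flatten these).
    return [result.destination_load_jobid_pairs,
            result.destination_copy_jobid_pairs]
  except AttributeError:
    # STREAMING_INSERTS: the failed-rows output is the one that exists.
    return [result.failed_rows]
```

A misspelled attribute name (for example `load_jobid_pairs`) would also raise `AttributeError`, silently sending every method down the fallback branch, so the names need to match exactly.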





[GitHub] [beam] pabloem merged pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
pabloem merged PR #21872:
URL: https://github.com/apache/beam/pull/21872




[GitHub] [beam] pabloem commented on a diff in pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
pabloem commented on code in PR #21872:
URL: https://github.com/apache/beam/pull/21872#discussion_r907641916


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1494,6 +1544,112 @@ def writer(self, test_bigquery_client=None, buffer_size=None):
         buffer_size=buffer_size)
 
 
+class WriteResult:
+  """The result of a WriteToBigQuery transform.
+  """
+  def __init__(
+      self,
+      method=None,
+      destination_load_jobid_pairs=None,
+      destination_file_pairs=None,
+      destination_copy_jobid_pairs=None,
+      failed_rows=None,
+      failed_rows_with_errors=None):
+
+    self.method: str = method
+    self._destination_load_jobid_pairs: PCollection = destination_load_jobid_pairs
+    self._destination_file_pairs: PCollection = destination_file_pairs
+    self._destination_copy_jobid_pairs: PCollection = destination_copy_jobid_pairs
+    self._failed_rows: PCollection = failed_rows
+    self._failed_rows_with_errors: PCollection = failed_rows_with_errors

Review Comment:
   Thanks! Can you add the subtype for the `PCollection` types? (e.g. `PCollection[Tuple[str, str]]` for destination_load_jobid_pairs, etc.)
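A minimal sketch of subtyped `PCollection` annotations on the constructor, assuming a hypothetical stand-in `PCollection` generic so it runs without Beam. The `Tuple[str, str]` job-ID pair follows the example in the comment above; `Tuple[str, Tuple[str, int]]` for file pairs (destination, (file path, size)) and the failed-row shapes are assumptions, the latter inferred from how the docstring example indexes `failed_rows_with_errors` (`e[0]`, `e[1]`, `e[2][0]['message']`).

```python
from typing import Generic, Optional, Tuple, TypeVar, get_type_hints

T = TypeVar('T')


class PCollection(Generic[T]):
  """Hypothetical stand-in for Beam's PCollection, used only for illustration."""


# Assumed element types for each output (see the lead-in for their sources).
LoadJobIdPairs = PCollection[Tuple[str, str]]
FilePairs = PCollection[Tuple[str, Tuple[str, int]]]
CopyJobIdPairs = PCollection[Tuple[str, str]]
FailedRows = PCollection[Tuple[str, dict]]
FailedRowsWithErrors = PCollection[Tuple[str, dict, list]]


class WriteResult:
  """The result of a WriteToBigQuery transform (sketch with subtyped annotations)."""
  def __init__(
      self,
      method: Optional[str] = None,
      destination_load_jobid_pairs: Optional[LoadJobIdPairs] = None,
      destination_file_pairs: Optional[FilePairs] = None,
      destination_copy_jobid_pairs: Optional[CopyJobIdPairs] = None,
      failed_rows: Optional[FailedRows] = None,
      failed_rows_with_errors: Optional[FailedRowsWithErrors] = None):
    # Type checkers infer each attribute's type from the annotated parameter.
    self.method = method
    self._destination_load_jobid_pairs = destination_load_jobid_pairs
    self._destination_file_pairs = destination_file_pairs
    self._destination_copy_jobid_pairs = destination_copy_jobid_pairs
    self._failed_rows = failed_rows
    self._failed_rows_with_errors = failed_rows_with_errors
```

Naming the element types once as aliases keeps the constructor readable and lets properties reuse the same aliases in their `-> ...` return annotations.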





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on code in PR #21872:
URL: https://github.com/apache/beam/pull/21872#discussion_r907540523


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1494,6 +1494,91 @@ def writer(self, test_bigquery_client=None, buffer_size=None):
         buffer_size=buffer_size)
 
 
+class WriteResult:

Review Comment:
   I added some documentation at the top of the file as well as next to each attribute. I added an example and mentioned how attributes can be accessed with dot and bracket notation. Let me know your thoughts and if there's anything I should add/remove.
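One way the dot and bracket notation described above could be supported is a `__getitem__` that maps bracket keys to attribute names. This is a hypothetical sketch: the keys come from the docstring's attribute table, but the mapping mechanism and the `**outputs` constructor are illustrative assumptions, not the PR's implementation.

```python
class WriteResult:
  """Hypothetical sketch: outputs readable with dot or bracket notation."""

  # Bracket keys come from the docstring's attribute table; mapping them to
  # attribute names this way is an illustrative assumption.
  _KEY_TO_ATTRIBUTE = {
      'FailedRows': 'failed_rows',
      'FailedRowsWithErrors': 'failed_rows_with_errors',
      'destination_load_jobid_pairs': 'destination_load_jobid_pairs',
      'destination_file_pairs': 'destination_file_pairs',
      'destination_copy_jobid_pairs': 'destination_copy_jobid_pairs',
  }

  def __init__(self, **outputs):
    # Store every known output as an attribute; outputs not passed default to None.
    for attribute in self._KEY_TO_ATTRIBUTE.values():
      setattr(self, attribute, outputs.get(attribute))

  def __getitem__(self, key):
    # Translate the bracket key to its attribute name, then read the attribute.
    try:
      attribute = self._KEY_TO_ATTRIBUTE[key]
    except KeyError:
      raise KeyError(f'Unknown WriteResult key: {key!r}') from None
    return getattr(self, attribute)
```

With this, `result['FailedRows']` and `result.failed_rows` return the same object, and an unknown key raises `KeyError` with a descriptive message.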





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on code in PR #21872:
URL: https://github.com/apache/beam/pull/21872#discussion_r906871936


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -1494,6 +1494,91 @@ def writer(self, test_bigquery_client=None, buffer_size=None):
         buffer_size=buffer_size)
 
 
+class WriteResult:
+  def __init__(
+      self,
+      method=None,
+      destination=None,
+      schema=None,

Review Comment:
   For the scope of this PR, I agree we don't need them. Maybe down the line we might find it useful to convey more information about the write configuration in the return object.
   
   I went ahead and removed those attributes.





[GitHub] [beam] pabloem commented on pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
pabloem commented on PR #21872:
URL: https://github.com/apache/beam/pull/21872#issuecomment-1173178940

   Run Python 3.8 PostCommit




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on code in PR #21872:
URL: https://github.com/apache/beam/pull/21872#discussion_r908777468


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -235,53 +235,39 @@ def compute_table_name(row):
 [2] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/insert
 [3] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource
 
-Output of the WriteToBigQuery transform
+Chaining of operations after WriteToBigQuery
 ---------------------------------------
+WriteToBigQuery returns an object with several PCollections that consist of
+metadata about the write operations. These are useful to inspect the write
+operation and follow up on the results. Often, the simplest use case is to
+chain an operation after writing data to BigQuery.
 
-Writing to BigQuery returns a WriteResult object that includes metadata
-relating to the write you configured. This data can be used in later steps
-in your pipeline:::
-
-  schema = {'fields': [
-      {'name': 'column', 'type': 'STRING', 'mode': 'NULLABLE'}]}
-
-  error_schema = {'fields': [
-      {'name': 'destination', 'type': 'STRING', 'mode': 'NULLABLE'},
-      {'name': 'row', 'type': 'STRING', 'mode': 'NULLABLE'},
-      {'name': 'error_message', 'type': 'STRING', 'mode': 'NULLABLE'}]}
-
-  with Pipeline() as p:
-    result = (p
-      | 'Create Columns' >> beam.Create([
-              {'column': 'value'},
-              {'bad_column': 'bad_value'}
-            ])
-      | 'Write Data' >> WriteToBigQuery(
-              method=WriteToBigQuery.Method.STREAMING_INSERTS,
-              table=my_table,
-              schema=schema,
-              insert_retry_strategy=RetryStrategy.RETRY_NEVER
-            ))
-
-    _ = (result.failed_rows_with_errors
-      | 'Get Errors' >> beam.Map(lambda e: {
-              "destination": e[0],
-              "row": json.dumps(e[1]),
-              "error_message": e[2][0]['message']
-            })
-      | 'Write Errors' >> WriteToBigQuery(
-              method=WriteToBigQuery.Method.STREAMING_INSERTS,
-              table=error_log_table,
-              schema=error_schema,
-            ))

Review Comment:
   Ah, I didn't want to crowd the space with it; I felt like your example was more succinct.
   
   But I'm probably being a little too conservative; more detail for users is better. I'll bring it back.
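The `'Get Errors'` lambda in the restored example can also be written as a named function, which is easier to unit test. This is a sketch with a hypothetical name, `to_error_row`; the element shape (destination, row dict, list of error dicts) is inferred from the lambda's indexing `e[0]`, `e[1]`, `e[2][0]['message']`, and, like the lambda, it assumes at least one error per element.

```python
import json
from typing import Any, Dict, List, Tuple


def to_error_row(
    failed: Tuple[str, Dict[str, Any], List[Dict[str, Any]]]) -> Dict[str, str]:
  """Convert one failed_rows_with_errors element into a row for the error table."""
  destination, row, errors = failed
  return {
      'destination': destination,
      # The error schema declares 'row' as STRING, so serialize the dict.
      'row': json.dumps(row),
      # Keep only the first error message, as the docstring example does;
      # this raises IndexError if the error list is empty.
      'error_message': errors[0]['message'],
  }
```

In the pipeline it would replace the lambda as `'Get Errors' >> beam.Map(to_error_row)`.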





[GitHub] [beam] pabloem commented on pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
pabloem commented on PR #21872:
URL: https://github.com/apache/beam/pull/21872#issuecomment-1176818821

   LGTM thanks @ahmedabu98 




[GitHub] [beam] asf-ci commented on pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #21872:
URL: https://github.com/apache/beam/pull/21872#issuecomment-1155747128

   Can one of the admins verify this patch?




[GitHub] [beam] asf-ci commented on pull request #21872: Standardizing output of WriteToBigQuery

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #21872:
URL: https://github.com/apache/beam/pull/21872#issuecomment-1155747129

   Can one of the admins verify this patch?

