Posted to github@beam.apache.org by "tomlynchRNA (via GitHub)" <gi...@apache.org> on 2023/01/31 06:41:44 UTC

[GitHub] [beam] tomlynchRNA opened a new issue, #25225: [Feature Request]: Python SDK, ability to disable _KNOWN_TABLES in _create_table_if_needed

tomlynchRNA opened a new issue, #25225:
URL: https://github.com/apache/beam/issues/25225

   ### What would you like to happen?
   
   Hello,
   
   I am running an Apache Beam pipeline and streaming inserts into Google BigQuery with the Python SDK.
   
   I am facing an issue because Beam creates each table only once and then stores its name in a list called `_KNOWN_TABLES`: if the table is deleted while the pipeline is running after Beam having already created it, further inserts will error out with a 404 and not attempt to re-create it.
   
   You can see here where the `_create_table_if_needed` method returns early if the table is already in `_KNOWN_TABLES`:
   https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1463
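To make the failure mode concrete, here is a minimal, self-contained sketch of the caching pattern described above. It is not Beam's code: `FakeBigQuery`, `StreamingWriter` and `create_table_if_needed` are hypothetical stand-ins, and the in-memory "service" only mimics a 404 by raising `LookupError` in place of `google.api_core.exceptions.NotFound`.

```python
class FakeBigQuery:
    """Hypothetical in-memory stand-in for the BigQuery service."""

    def __init__(self):
        self.tables = {}  # table name -> list of inserted rows

    def create_table(self, name):
        self.tables.setdefault(name, [])

    def delete_table(self, name):
        self.tables.pop(name, None)

    def insert_rows(self, name, rows):
        if name not in self.tables:
            # Assumption: LookupError stands in for the real 404 NotFound error.
            raise LookupError(f"404 Not found: Table {name}")
        self.tables[name].extend(rows)


class StreamingWriter:
    """Hypothetical writer mirroring the _KNOWN_TABLES short-circuit."""

    def __init__(self, client):
        self.client = client
        self.known_tables = set()  # plays the role of _KNOWN_TABLES

    def create_table_if_needed(self, name):
        if name in self.known_tables:
            return  # early return: the service is never consulted again
        self.client.create_table(name)
        self.known_tables.add(name)

    def write(self, name, rows):
        self.create_table_if_needed(name)
        self.client.insert_rows(name, rows)


client = FakeBigQuery()
writer = StreamingWriter(client)
writer.write("events", [{"id": 1}])  # table is created, then rows are inserted
client.delete_table("events")        # a third party deletes the table
try:
    writer.write("events", [{"id": 2}])  # cache hit, so no re-create is attempted
except LookupError as err:
    print(err)  # → 404 Not found: Table events
```

Because the cache entry outlives the table, every later write skips creation and hits the 404, which matches the behaviour in the stack trace later in this thread.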
   
   I have the pipeline's create disposition set to the default, CREATE_IF_NEEDED, so I expect that if the table does not exist (and **is needed** to stream inserts), it will be created.
   
   I propose that a mechanism be implemented allowing this behaviour, and would be willing to make the changes & open a pull request.
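One possible shape for such a mechanism is sketched below: when an insert fails with a 404, evict the table from the known-tables cache and run the create-if-needed path again, with a bounded number of attempts. This is not an existing Beam option; `FakeBigQuery`, `RecreatingWriter` and `max_recreate_attempts` are hypothetical names, and `LookupError` stands in for the real `google.api_core.exceptions.NotFound`.

```python
class FakeBigQuery:
    """Hypothetical in-memory stand-in for the BigQuery service."""

    def __init__(self):
        self.tables = {}  # table name -> list of inserted rows

    def create_table(self, name):
        self.tables.setdefault(name, [])

    def delete_table(self, name):
        self.tables.pop(name, None)

    def insert_rows(self, name, rows):
        if name not in self.tables:
            # Assumption: LookupError stands in for the real 404 NotFound error.
            raise LookupError(f"404 Not found: Table {name}")
        self.tables[name].extend(rows)


class RecreatingWriter:
    """Hypothetical writer: on a 404, forget the table and re-create it."""

    def __init__(self, client, max_recreate_attempts=1):
        self.client = client
        self.known_tables = set()  # plays the role of _KNOWN_TABLES
        self.max_recreate_attempts = max_recreate_attempts

    def create_table_if_needed(self, name):
        if name in self.known_tables:
            return
        self.client.create_table(name)
        self.known_tables.add(name)

    def write(self, name, rows):
        attempts = 0
        while True:
            self.create_table_if_needed(name)
            try:
                self.client.insert_rows(name, rows)
                return
            except LookupError:
                if attempts >= self.max_recreate_attempts:
                    raise  # give up: the table keeps disappearing
                attempts += 1
                # Evict the stale entry so the next iteration re-creates the table.
                self.known_tables.discard(name)


client = FakeBigQuery()
writer = RecreatingWriter(client)
writer.write("events", [{"id": 1}])
client.delete_table("events")        # a third party deletes the table
writer.write("events", [{"id": 2}])  # 404, cache eviction, re-create, retry
print(client.tables["events"])       # → [{'id': 2}]
```

The retry is deliberately bounded: re-creating the table cannot rule out another deletion between the create and the insert, so a pipeline whose tables keep being deleted can still fail. Rows written before the deletion are not restored either, since they lived in the deleted table.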
   
   Looking forward to your thoughts,
   Tom
   
   ### Issue Priority
   
   Priority: 2 (default / most feature requests should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] chamikaramj commented on issue #25225: [Feature Request]: Mechanism to re-create deleted table if it is already in _KNOWN_TABLES, according to CREATE_IF_NEEDED create disposition

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on issue #25225:
URL: https://github.com/apache/beam/issues/25225#issuecomment-1462947173

   That said, I'm OK with getting this fix in if we are clear that it does not modify the guarantee (i.e. the pipeline may still fail or get stuck if output tables get deleted by third parties during execution).




[GitHub] [beam] tomlynchRNA commented on issue #25225: [Feature Request]: Mechanism to re-create deleted table if it is already in _KNOWN_TABLES, according to CREATE_IF_NEEDED create disposition

Posted by "tomlynchRNA (via GitHub)" <gi...@apache.org>.
tomlynchRNA commented on issue #25225:
URL: https://github.com/apache/beam/issues/25225#issuecomment-1420193596

   > Hey @tomlynchRNA, can you provide a stack trace of the error you're seeing here?
   > 
   > > if the table is deleted while the pipeline is running after Beam having already created it, further inserts will error out with a 404 and not attempt to re-create it.
   
   <details>
   <summary>stack trace, quite big</summary>
   
   ```
   Error message from worker: generic::unknown: Traceback (most recent call last):
     File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process
     File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1514, in process
       return self._flush_batch(destination)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1564, in _flush_batch
       ignore_unknown_values=self.ignore_unknown_columns)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1264, in insert_rows
       ignore_unknown_values=ignore_unknown_values)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 275, in wrapper
       return fun(*args, **kwargs)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 713, in _insert_all_rows
       timeout=BQ_STREAMING_INSERT_TIMEOUT_SEC)
     File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 3604, in insert_rows_json
       timeout=timeout,
     File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 759, in _call_api
       return call()
     File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 354, in retry_wrapped_func
       on_error=on_error,
     File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 191, in retry_target
       return target()
     File "/usr/local/lib/python3.7/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
       raise exceptions.from_http_response(response)
   google.api_core.exceptions.NotFound: 404 POST https://bigquery.googleapis.com/bigquery/v2/projects/REDACTED/datasets/REDACTED/tables/REDACTED/insertAll?prettyPrint=false: Not found: Table REDACTED
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 287, in _execute
       response = task()
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 360, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
       getattr(request, request_type), request.instruction_id)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 634, in process_bundle
       bundle_processor.process_bundle(instruction_id))
     File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1001, in process_bundle
       element.timer_family_id, timer_data)
     File "apache_beam/runners/worker/operations.py", line 931, in apache_beam.runners.worker.operations.DoOperation.process_timer
     File "apache_beam/runners/common.py", line 1453, in apache_beam.runners.common.DoFnRunner.process_user_timer
     File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1450, in apache_beam.runners.common.DoFnRunner.process_user_timer
     File "apache_beam/runners/common.py", line 579, in apache_beam.runners.common.DoFnInvoker.invoke_user_timer
     File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
     File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
     File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
     File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
     File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
     File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1491, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 623, in apache_beam.runners.common.SimpleInvoker.invoke_process
     File "apache_beam/runners/common.py", line 1581, in apache_beam.runners.common._OutputHandler.handle_process_outputs
     File "apache_beam/runners/common.py", line 1694, in apache_beam.runners.common._OutputHandler._write_value_to_tag
     File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 907, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 908, in apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1419, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1417, in apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 624, in apache_beam.runners.common.SimpleInvoker.invoke_process
     File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1514, in process
       return self._flush_batch(destination)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1564, in _flush_batch
       ignore_unknown_values=self.ignore_unknown_columns)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1264, in insert_rows
       ignore_unknown_values=ignore_unknown_values)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/retry.py", line 275, in wrapper
       return fun(*args, **kwargs)
     File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 713, in _insert_all_rows
       timeout=BQ_STREAMING_INSERT_TIMEOUT_SEC)
     File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 3604, in insert_rows_json
       timeout=timeout,
     File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 759, in _call_api
       return call()
     File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 354, in retry_wrapped_func
       on_error=on_error,
     File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 191, in retry_target
       return target()
     File "/usr/local/lib/python3.7/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
       raise exceptions.from_http_response(response)
   google.api_core.exceptions.NotFound: 404 POST https://bigquery.googleapis.com/bigquery/v2/REDACTED/datasets/REDACTED/tables/REDACTED/insertAll?prettyPrint=false: Not found: Table REDACTED [while running 'Write Custom Vars to BQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)-ptransform-66']
   ```
   
   </details>




[GitHub] [beam] chamikaramj commented on issue #25225: [Feature Request]: Mechanism to re-create deleted table if it is already in _KNOWN_TABLES, according to CREATE_IF_NEEDED create disposition

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on issue #25225:
URL: https://github.com/apache/beam/issues/25225#issuecomment-1437693540

   I think in general, for source/sink I/O we assume read/write resources are not deleted by third parties. Trying to add this as a feature to Beam I/O in general will probably need a lot of re-work (even though we might be able to fix it for this instance).
   
   Also, I think CREATE_IF_NEEDED (as it's defined in Beam I/O right now) means that tables will be created once per pipeline, not that tables will be re-created if they are deleted at any stage of the pipeline.




[GitHub] [beam] ahmedabu98 commented on issue #25225: [Feature Request]: Mechanism to re-create deleted table if it is already in _KNOWN_TABLES, according to CREATE_IF_NEEDED create disposition

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on issue #25225:
URL: https://github.com/apache/beam/issues/25225#issuecomment-1419046229

   Hey @tomlynchRNA, can you provide a stack trace of the error you're seeing here? 
   > if the table is deleted while the pipeline is running after Beam having already created it, further inserts will error out with a 404 and not attempt to re-create it.

