Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/03/13 04:53:00 UTC
[jira] [Updated] (BEAM-11741)
apache_beam.io.gcp.experimental.spannerio.WriteToSpanner fails on duplicate
primary key
[ https://issues.apache.org/jira/browse/BEAM-11741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles updated BEAM-11741:
-----------------------------------
Status: Open (was: Triage Needed)
> apache_beam.io.gcp.experimental.spannerio.WriteToSpanner fails on duplicate primary key
> ---------------------------------------------------------------------------------------
>
> Key: BEAM-11741
> URL: https://issues.apache.org/jira/browse/BEAM-11741
> Project: Beam
> Issue Type: Bug
> Components: io-py-gcp
> Affects Versions: 2.27.0
> Environment: Google Cloud Platform Dataflow
> Reporter: Nahian-Al Hasan
> Priority: P2
> Labels: GCP, newbie
>
> Actual Behaviour
> apache_beam.io.gcp.experimental.spannerio.WriteToSpanner fails with the exception below (AlreadyExists on a duplicate primary key), and the entire pipeline crashes.
> Expected Behaviour
> The apache_beam.io.gcp.experimental.spannerio.WriteToSpanner transform handles exceptions gracefully and does not crash the pipeline.
> The exception is raised inside the transform, so it isn't possible to implement error handling in pipeline code. It would be easier to handle the exception inside the transform's `process` function.
> Please see the logs below for more information.
> {code:java}
> main.py:91: FutureWarning: ReadFromSpanner is experimental. No backwards-compatibility guarantees.
> main.py:91: FutureWarning: ReadFromSpanner is experimental. No backwards-compatibility guarantees.
>   sql=sql
> main.py:102: FutureWarning: WriteToSpanner is experimental. No backwards-compatibility guarantees.
>   database_id=importer_options.DEST_SPANNER_DATASET_ID,
> warning: sdist: standard file not found: should have one of README, README.rst, README.txt, README.md
> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
> Traceback (most recent call last):
>   File "main.py", line 110, in <module>
>     run()
>   File "main.py", line 106, in run
>     result.wait_until_finish()
>   File "/home/notion/.local/share/virtualenvs/ods-to-ods-bigquery-EZMTrMjb/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1665, in wait_until_finish
>     self)
> apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable
>     return callable_(*args, **kwargs)
>   File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 826, in __call__
>     return _end_unary_response_blocking(state, call, False, None)
>   File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
>     raise _InactiveRpcError(state)
> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
>   status = StatusCode.ALREADY_EXISTS
>   details = "Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists"
>   debug_error_string = "{"created":"@1612247321.800986805","description":"Error received from peer ipv4:XXX.XXX.XX.XX:XXX","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists","grpc_status":6}">
> The above exception was the direct cause of the following exception:
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 588, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/experimental/spannerio.py", line 1098, in process
>     batch_func(**m.kwargs)
>   File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/database.py", line 476, in __exit__
>     self._batch.commit()
>   File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/batch.py", line 154, in commit
>     metadata=metadata,
>   File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/gapic/spanner_client.py", line 1556, in commit
>     request, retry=retry, timeout=timeout, metadata=metadata
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 145, in __call__
>     return wrapped_func(*args, **kwargs)
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
>     on_error=on_error,
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
>     return target()
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
>     return func(*args, **kwargs)
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
>     six.raise_from(exceptions.from_grpc_error(exc), exc)
>   File "<string>", line 3, in raise_from
> google.api_core.exceptions.AlreadyExists: 409 Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
>     op.start()
>   File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
>   File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
>   File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 891, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 158, in apache_beam.runners.worker.operations.ConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 886, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 588, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/experimental/spannerio.py", line 1098, in process
>     batch_func(**m.kwargs)
>   File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/database.py", line 476, in __exit__
>     self._batch.commit()
>   File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/batch.py", line 154, in commit
>     metadata=metadata,
>   File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/gapic/spanner_client.py", line 1556, in commit
>     request, retry=retry, timeout=timeout, metadata=metadata
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 145, in __call__
>     return wrapped_func(*args, **kwargs)
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
>     on_error=on_error,
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
>     return target()
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
>     return func(*args, **kwargs)
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
>     six.raise_from(exceptions.from_grpc_error(exc), exc)
>   File "<string>", line 3, in raise_from
> google.api_core.exceptions.AlreadyExists: 409 Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists [while running 'Write Mutations to destination Spanner/Writing to spanner']
> {code}
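For illustration only, the expected behaviour described in the issue (catching the error where the batch is committed instead of letting it crash the pipeline) might look roughly like the sketch below. It is not the spannerio implementation and does not use Beam or the Spanner client: `AlreadyExistsError`, `FakeTable` and `write_with_dead_letter` are hypothetical names, with `AlreadyExistsError` standing in for the `google.api_core.exceptions.AlreadyExists` seen in the traceback.

```python
class AlreadyExistsError(Exception):
    """Hypothetical stand-in for google.api_core.exceptions.AlreadyExists."""


class FakeTable:
    """In-memory stand-in for a Spanner table keyed by primary key."""

    def __init__(self):
        self.rows = {}

    def commit_insert(self, batch):
        # Assumption: a batch of inserts is all-or-nothing, so the whole
        # batch is rejected if any of its keys already exists.
        for key, _ in batch:
            if key in self.rows:
                raise AlreadyExistsError(f"Row [{key}] already exists")
        self.rows.update(batch)


def write_with_dead_letter(batches, commit):
    """Commit each batch; collect failed batches instead of raising."""
    written, dead_letter = [], []
    for batch in batches:
        try:
            commit(batch)
        except AlreadyExistsError as exc:
            # Keep the failed batch and the reason so it can be inspected
            # or retried later, and carry on with the remaining batches.
            dead_letter.append((batch, str(exc)))
        else:
            written.append(batch)
    return written, dead_letter


table = FakeTable()
batches = [
    [("a", 1), ("b", 2)],
    [("a", 99)],  # duplicate primary key
    [("c", 3)],
]
written, dead_letter = write_with_dead_letter(batches, table.commit_insert)
print(len(written), "batches written;", len(dead_letter), "sent to dead letter")
```

In a real transform the failed batches would presumably be emitted on a separate output rather than returned in a list, so that pipeline authors can decide what to do with them.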
--
This message was sent by Atlassian Jira
(v8.3.4#803005)