Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/03/13 04:53:00 UTC

[jira] [Updated] (BEAM-11741) apache_beam.io.gcp.experimental.spannerio.WriteToSpanner fails on duplicate primary key

     [ https://issues.apache.org/jira/browse/BEAM-11741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-11741:
-----------------------------------
    Status: Open  (was: Triage Needed)

> apache_beam.io.gcp.experimental.spannerio.WriteToSpanner fails on duplicate primary key
> ---------------------------------------------------------------------------------------
>
>                 Key: BEAM-11741
>                 URL: https://issues.apache.org/jira/browse/BEAM-11741
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>    Affects Versions: 2.27.0
>         Environment: Google Cloud Platform Dataflow
>            Reporter: Nahian-Al Hasan
>            Priority: P2
>              Labels: GCP, newbie
>
> Actual Behaviour
>  apache_beam.io.gcp.experimental.spannerio.WriteToSpanner fails with the exception below, and the entire pipeline crashes.
> Expected Behaviour
>  The apache_beam.io.gcp.experimental.spannerio.WriteToSpanner transform handles exceptions such as duplicate primary keys gracefully and does not crash the pipeline.
> It isn't possible to handle this error in pipeline code, because the exception is raised inside WriteToSpanner's own DoFn (spannerio.py, in `process`). It would be easier to handle the exception inside that `process` method.
> Please see the logs below for more information.  
> {code:java}
> main.py:91: FutureWarning: ReadFromSpanner is experimental. No backwards-compatibility guarantees.
> main.py:91: FutureWarning: ReadFromSpanner is experimental. No backwards-compatibility guarantees.
>   sql=sql
> main.py:102: FutureWarning: WriteToSpanner is experimental. No backwards-compatibility guarantees.
>   database_id=importer_options.DEST_SPANNER_DATASET_ID,
> warning: sdist: standard file not found: should have one of README, README.rst, README.txt, README.md
> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
> Traceback (most recent call last):
>   File "main.py", line 110, in <module>
>     run()
>   File "main.py", line 106, in run
>     result.wait_until_finish()
>   File "/home/notion/.local/share/virtualenvs/ods-to-ods-bigquery-EZMTrMjb/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1665, in wait_until_finish
>     self)
> apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable
>     return callable_(*args, **kwargs)
>   File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 826, in __call__
>     return _end_unary_response_blocking(state, call, False, None)
>   File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
>     raise _InactiveRpcError(state)
> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
> 	status = StatusCode.ALREADY_EXISTS
> 	details = "Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists"
> 	debug_error_string = "{"created":"@1612247321.800986805","description":"Error received from peer ipv4:XXX.XXX.XX.XX:XXX","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists","grpc_status":6}"
> >
> The above exception was the direct cause of the following exception:
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 588, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/experimental/spannerio.py", line 1098, in process
>     batch_func(**m.kwargs)
>   File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/database.py", line 476, in __exit__
>     self._batch.commit()
>   File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/batch.py", line 154, in commit
>     metadata=metadata,
>   File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/gapic/spanner_client.py", line 1556, in commit
>     request, retry=retry, timeout=timeout, metadata=metadata
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 145, in __call__
>     return wrapped_func(*args, **kwargs)
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
>     on_error=on_error,
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
>     return target()
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
>     return func(*args, **kwargs)
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
>     six.raise_from(exceptions.from_grpc_error(exc), exc)
>   File "<string>", line 3, in raise_from
> google.api_core.exceptions.AlreadyExists: 409 Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
>     op.start()
>   File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
>   File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
>   File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
>   File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 891, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 158, in apache_beam.runners.worker.operations.ConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 768, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 886, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 588, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/experimental/spannerio.py", line 1098, in process
>     batch_func(**m.kwargs)
>   File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/database.py", line 476, in __exit__
>     self._batch.commit()
>   File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/batch.py", line 154, in commit
>     metadata=metadata,
>   File "/usr/local/lib/python3.7/site-packages/google/cloud/spanner_v1/gapic/spanner_client.py", line 1556, in commit
>     request, retry=retry, timeout=timeout, metadata=metadata
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py", line 145, in __call__
>     return wrapped_func(*args, **kwargs)
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
>     on_error=on_error,
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
>     return target()
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
>     return func(*args, **kwargs)
>   File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
>     six.raise_from(exceptions.from_grpc_error(exc), exc)
>   File "<string>", line 3, in raise_from
> google.api_core.exceptions.AlreadyExists: 409 Row [0000085c-0fca-5c04-a538-3d44e4ec9d23] in table TestItems already exists [while running 'Write Mutations to destination Spanner/Writing to spanner']
> {code}
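The failure can be reproduced in miniature. The sketch below is a hypothetical, stdlib-only model (`FakeTable` and `AlreadyExists` are illustrative stand-ins, not the google-cloud-spanner client API) that assumes two Cloud Spanner behaviours: an insert mutation fails with ALREADY_EXISTS when the primary key is already present, as in the log above, and the mutations in one commit are applied atomically, so a single duplicate key fails the whole batch.

```python
# Hypothetical in-memory model of two Cloud Spanner behaviours (NOT the
# google-cloud-spanner API): plain inserts reject existing primary keys, and a
# batch of mutations is applied all-or-nothing.


class AlreadyExists(Exception):
    """Stand-in for google.api_core.exceptions.AlreadyExists (gRPC ALREADY_EXISTS, HTTP 409)."""


class FakeTable:
    def __init__(self):
        self.rows = {}  # primary key -> dict of column values

    def commit(self, mutations):
        """Apply a list of (op, key, values) mutations atomically."""
        staged = dict(self.rows)  # stage on a copy so a failure leaves the table untouched
        for op, key, values in mutations:
            if op == "insert":
                if key in staged:
                    # One duplicate key rejects the whole batch.
                    raise AlreadyExists(f"Row [{key}] already exists")
                staged[key] = dict(values)
            elif op == "insert_or_update":
                # Upsert: create the row, or overwrite only the given columns.
                staged[key] = {**staged.get(key, {}), **values}
            else:
                raise ValueError(f"unsupported op: {op}")
        self.rows = staged  # publish only if every mutation succeeded


table = FakeTable()
table.commit([("insert", "a", {"v": 1})])
try:
    table.commit([("insert", "b", {"v": 2}), ("insert", "a", {"v": 3})])
except AlreadyExists as exc:
    print(exc)  # Row [a] already exists
print(sorted(table.rows))  # ['a']: "b" was not written either
table.commit([("insert_or_update", "a", {"v": 3})])
print(table.rows["a"])  # {'v': 3}
```

In this model, only the upsert avoids the error, and it does so by silently overwriting existing columns; whether that trade-off is acceptable depends on the pipeline.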

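The reporter's suggestion, catching the exception inside the write transform's `process` method, could take the shape of a dead-letter pattern. The sketch below is stdlib-only and entirely hypothetical: `InMemoryDatabase`, `DeadLetterWriter` and the `("written" | "failed", mutation)` tuples stand in for a Spanner batch commit and a Beam DoFn with tagged outputs. It is not how spannerio is implemented.

```python
# Hypothetical dead-letter sketch: commit a batch, and if it is rejected because
# a row already exists, retry mutations one at a time and emit the rejected ones
# on a "failed" output instead of raising and crashing the pipeline.


class AlreadyExists(Exception):
    """Stand-in for google.api_core.exceptions.AlreadyExists."""


class InMemoryDatabase:
    """Hypothetical database that commits a batch of (key, row) inserts atomically."""

    def __init__(self):
        self.rows = {}

    def commit(self, batch):
        staged = dict(self.rows)
        for key, row in batch:
            if key in staged:
                raise AlreadyExists(f"Row [{key}] already exists")
            staged[key] = row
        self.rows = staged  # all-or-nothing


class DeadLetterWriter:
    """Plays the role of a DoFn: process() yields ("written", m) or ("failed", m)."""

    def __init__(self, database):
        self.database = database

    def process(self, batch):
        try:
            self.database.commit(batch)
        except AlreadyExists:
            # The batch was rejected as a whole; isolate the offending mutations.
            for mutation in batch:
                try:
                    self.database.commit([mutation])
                except AlreadyExists:
                    # In a Beam DoFn this could be beam.pvalue.TaggedOutput("failed", mutation).
                    yield "failed", mutation
                else:
                    yield "written", mutation
        else:
            for mutation in batch:
                yield "written", mutation


db = InMemoryDatabase()
writer = DeadLetterWriter(db)
print(list(writer.process([("a", 1)])))  # [('written', ('a', 1))]
print(list(writer.process([("b", 2), ("a", 3)])))  # [('written', ('b', 2)), ('failed', ('a', 3))]
```

The per-mutation fallback gives up the all-or-nothing guarantee of the original batch and issues one commit per mutation on the slow path, so it only pays off when duplicate keys are rare.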


--
This message was sent by Atlassian Jira
(v8.3.4#803005)