You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2021/05/08 17:20:01 UTC

[jira] [Commented] (BEAM-12119) Python IO MongoDB: integer and string `_id` keys are not supported

    [ https://issues.apache.org/jira/browse/BEAM-12119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17341367#comment-17341367 ] 

Beam JIRA Bot commented on BEAM-12119:
--------------------------------------

This issue is assigned but has not received an update in 30 days so it has been labeled "stale-assigned". If you are still working on the issue, please give an update and remove the label. If you are no longer working on the issue, please unassign so someone else may work on it. In 7 days the issue will be automatically unassigned.

> Python IO MongoDB: integer and string `_id` keys are not supported
> ------------------------------------------------------------------
>
>                 Key: BEAM-12119
>                 URL: https://issues.apache.org/jira/browse/BEAM-12119
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-py-mongodb
>    Affects Versions: 2.27.0, 2.28.0
>            Reporter: Maksym Skorupskyi
>            Assignee: Yichi Zhang
>            Priority: P2
>              Labels: MongoDB, Python, stale-assigned
>
> Python IO MongoDB: integer and string `_id` keys are not supported.
>  
> Usually *ObjectId* is using for *`_id`* key, but sometimes you can deal with *int* and *str* keys. Reading from such MongoDB collection will raise errors. 
> h2. Integer `_id` key:
> {code:java}
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1426, in process
>     element)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545, in initial_restriction
>     range_tracker = element_source.get_range_tracker(None, None)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 243, in get_range_tracker
>     start_position, stop_position)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 359, in _replace_none_positions
>     stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 418, in increment_id
>     id_number = _ObjectIdHelper.id_to_int(object_id)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 383, in id_to_int
>     ints = struct.unpack('>III', id.binary)
> AttributeError: 'int' object has no attribute 'binary'
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 325, in <module>
>     sys.exit(run())
>   File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 240, in run
>     options=options, ingestion_ts=ingestion_ts, table_name=table_name
>   File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 582, in __exit__
>     self.result = self.run()
>   File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 126, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_pipeline
>     pipeline.to_runner_api(default_environment=self._default_environment))
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 193, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 360, in run_stages
>     bundle_context_manager,
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 556, in _run_stage
>     bundle_manager)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 596, in _run_bundle
>     data_input, data_output, input_timers, expected_timer_output)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 897, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 380, in push
>     response = self.worker.do_instruction(request)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
>     element.data)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "/venv/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1426, in process
>     element)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545, in initial_restriction
>     range_tracker = element_source.get_range_tracker(None, None)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 243, in get_range_tracker
>     start_position, stop_position)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 359, in _replace_none_positions
>     stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 418, in increment_id
>     id_number = _ObjectIdHelper.id_to_int(object_id)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 383, in id_to_int
>     ints = struct.unpack('>III', id.binary)
> AttributeError: 'int' object has no attribute 'binary' [while running 'sources/Read Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
> ERROR:root:mongo_to_bq_raw.py: 'int' object has no attribute 'binary' [while running 'sources/Read Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
> {code}
> h2. String `_id` key:
> {code:java}
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1426, in process
>     element)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545, in initial_restriction
>     range_tracker = element_source.get_range_tracker(None, None)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 243, in get_range_tracker
>     start_position, stop_position)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 359, in _replace_none_positions
>     stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 418, in increment_id
>     id_number = _ObjectIdHelper.id_to_int(object_id)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 383, in id_to_int
>     ints = struct.unpack('>III', id.binary)
> AttributeError: 'str' object has no attribute 'binary'During handling of the above exception, another exception occurred:Traceback (most recent call last):
>   File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 325, in <module>
>     sys.exit(run())
>   File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 240, in run
>     options=options, ingestion_ts=ingestion_ts, table_name=table_name
>   File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 582, in __exit__
>     self.result = self.run()
>   File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 126, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_pipeline
>     pipeline.to_runner_api(default_environment=self._default_environment))
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 193, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 360, in run_stages
>     bundle_context_manager,
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 556, in _run_stage
>     bundle_manager)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 596, in _run_bundle
>     data_input, data_output, input_timers, expected_timer_output)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 897, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 380, in push
>     response = self.worker.do_instruction(request)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
>     element.data)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "/venv/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1426, in process
>     element)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545, in initial_restriction
>     range_tracker = element_source.get_range_tracker(None, None)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 243, in get_range_tracker
>     start_position, stop_position)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 359, in _replace_none_positions
>     stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 418, in increment_id
>     id_number = _ObjectIdHelper.id_to_int(object_id)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 383, in id_to_int
>     ints = struct.unpack('>III', id.binary)
> AttributeError: 'str' object has no attribute 'binary' [while running 'tagged_objects/Read Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
> ERROR:root:mongo_to_bq_raw.py: 'str' object has no attribute 'binary' [while running 'tagged_objects/Read Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)