You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Maksym Skorupskyi (Jira)" <ji...@apache.org> on 2021/04/07 11:04:00 UTC

[jira] [Updated] (BEAM-12122) Python IO MongoDB: integer and string `_id` keys are not supported.

     [ https://issues.apache.org/jira/browse/BEAM-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maksym Skorupskyi updated BEAM-12122:
-------------------------------------
    Description: 
Usually, *ObjectId* is using for *`_id`* key, but sometimes you can deal with *int* and *str* keys. Reading from such MongoDB collection will raise errors. 
h2. Integer `_id` key:
{code:java}
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1426, in process
    element)
  File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545, in initial_restriction
    range_tracker = element_source.get_range_tracker(None, None)
  File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 243, in get_range_tracker
    start_position, stop_position)
  File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 359, in _replace_none_positions
    stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
  File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 418, in increment_id
    id_number = _ObjectIdHelper.id_to_int(object_id)
  File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 383, in id_to_int
    ints = struct.unpack('>III', id.binary)
AttributeError: 'int' object has no attribute 'binary'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 325, in <module>
    sys.exit(run())
  File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 240, in run
    options=options, ingestion_ts=ingestion_ts, table_name=table_name
  File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 582, in __exit__
    self.result = self.run()
  File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561, in run
    return self.runner.run_pipeline(self, self._options)
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 126, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 193, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 360, in run_stages
    bundle_context_manager,
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 556, in _run_stage
    bundle_manager)
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 596, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 897, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 380, in push
    response = self.worker.do_instruction(request)
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
    element.data)
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/venv/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1426, in process
    element)
  File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545, in initial_restriction
    range_tracker = element_source.get_range_tracker(None, None)
  File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 243, in get_range_tracker
    start_position, stop_position)
  File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 359, in _replace_none_positions
    stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
  File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 418, in increment_id
    id_number = _ObjectIdHelper.id_to_int(object_id)
  File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 383, in id_to_int
    ints = struct.unpack('>III', id.binary)
AttributeError: 'int' object has no attribute 'binary' [while running 'sources/Read Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
ERROR:root:mongo_to_bq_raw.py: 'int' object has no attribute 'binary' [while running 'sources/Read Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
{code}
h2. String `_id` key:
{code:java}
Traceback (most recent call last): File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1426, in process element) File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545, in initial_restriction range_tracker = element_source.get_range_tracker(None, None) File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 243, in get_range_tracker start_position, stop_position) File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 359, in _replace_none_positions stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1) File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 418, in increment_id id_number = _ObjectIdHelper.id_to_int(object_id) File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 383, in id_to_int ints = struct.unpack('>III', id.binary) AttributeError: 'str' object has no attribute 'binary'During handling of the above exception, another exception occurred:Traceback (most recent call last): File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 325, in <module> sys.exit(run()) File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 240, in run options=options, ingestion_ts=ingestion_ts, table_name=table_name File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 582, in __exit__ self.result = self.run() File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561, in run return self.runner.run_pipeline(self, self._options) File "/venv/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 126, in run_pipeline return runner.run_pipeline(pipeline, options) File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_pipeline pipeline.to_runner_api(default_environment=self._default_environment)) File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 193, in run_via_runner_api return self.run_stages(stage_context, stages) File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 360, in run_stages bundle_context_manager, File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 556, in _run_stage bundle_manager) File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 596, in _run_bundle data_input, data_output, input_timers, expected_timer_output) File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 897, in process_bundle result_future = self._worker_handler.control_conn.push(process_bundle_req) File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 380, in push response = self.worker.do_instruction(request) File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction getattr(request, request_type), request.instruction_id) File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle bundle_processor.process_bundle(instruction_id)) File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle element.data) File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/venv/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1426, in process element) File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545, in initial_restriction range_tracker = element_source.get_range_tracker(None, None) File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 243, in get_range_tracker start_position, stop_position) File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 359, in _replace_none_positions stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1) File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 418, in increment_id id_number = _ObjectIdHelper.id_to_int(object_id) File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 383, in id_to_int ints = struct.unpack('>III', id.binary) AttributeError: 'str' object has no attribute 'binary' [while running 'tagged_objects/Read Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction'] ERROR:root:mongo_to_bq_raw.py: 'str' object has no attribute 'binary' [while running 'tagged_objects/Read Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction'] 
{code}

  was:
Usually, *ObjectId* is using for *`_id`* key, but sometimes you can deal with *int* and *str* keys. Reading from such MongoDB collection will raise errors. 
h2. Integer `_id` key:
{code:java}
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1426, in process
    element)
  File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545, in initial_restriction
    range_tracker = element_source.get_range_tracker(None, None)
  File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 243, in get_range_tracker
    start_position, stop_position)
  File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 359, in _replace_none_positions
    stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
  File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 418, in increment_id
    id_number = _ObjectIdHelper.id_to_int(object_id)
  File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 383, in id_to_int
    ints = struct.unpack('>III', id.binary)
AttributeError: 'int' object has no attribute 'binary'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 325, in <module>
    sys.exit(run())
  File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 240, in run
    options=options, ingestion_ts=ingestion_ts, table_name=table_name
  File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 582, in __exit__
    self.result = self.run()
  File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561, in run
    return self.runner.run_pipeline(self, self._options)
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 126, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_pipeline
    pipeline.to_runner_api(default_environment=self._default_environment))
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 193, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 360, in run_stages
    bundle_context_manager,
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 556, in _run_stage
    bundle_manager)
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 596, in _run_bundle
    data_input, data_output, input_timers, expected_timer_output)
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 897, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 380, in push
    response = self.worker.do_instruction(request)
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
    element.data)
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/venv/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1426, in process
    element)
  File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545, in initial_restriction
    range_tracker = element_source.get_range_tracker(None, None)
  File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 243, in get_range_tracker
    start_position, stop_position)
  File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 359, in _replace_none_positions
    stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
  File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 418, in increment_id
    id_number = _ObjectIdHelper.id_to_int(object_id)
  File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 383, in id_to_int
    ints = struct.unpack('>III', id.binary)
AttributeError: 'int' object has no attribute 'binary' [while running 'sources/Read Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
ERROR:root:mongo_to_bq_raw.py: 'int' object has no attribute 'binary' [while running 'sources/Read Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
{code}
h2. String `_id` key:


> Python IO MongoDB: integer and string `_id` keys are not supported.
> -------------------------------------------------------------------
>
>                 Key: BEAM-12122
>                 URL: https://issues.apache.org/jira/browse/BEAM-12122
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-mongodb
>    Affects Versions: 2.27.0, 2.28.0
>         Environment: Google Cloud Dataflow
>            Reporter: Maksym Skorupskyi
>            Assignee: Yichi Zhang
>            Priority: P2
>              Labels: MongoDB, Python, mongo, mongodb, python
>
> Usually, *ObjectId* is using for *`_id`* key, but sometimes you can deal with *int* and *str* keys. Reading from such MongoDB collection will raise errors. 
> h2. Integer `_id` key:
> {code:java}
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1426, in process
>     element)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545, in initial_restriction
>     range_tracker = element_source.get_range_tracker(None, None)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 243, in get_range_tracker
>     start_position, stop_position)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 359, in _replace_none_positions
>     stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 418, in increment_id
>     id_number = _ObjectIdHelper.id_to_int(object_id)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 383, in id_to_int
>     ints = struct.unpack('>III', id.binary)
> AttributeError: 'int' object has no attribute 'binary'
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 325, in <module>
>     sys.exit(run())
>   File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 240, in run
>     options=options, ingestion_ts=ingestion_ts, table_name=table_name
>   File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 582, in __exit__
>     self.result = self.run()
>   File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 126, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_pipeline
>     pipeline.to_runner_api(default_environment=self._default_environment))
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 193, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 360, in run_stages
>     bundle_context_manager,
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 556, in _run_stage
>     bundle_manager)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 596, in _run_bundle
>     data_input, data_output, input_timers, expected_timer_output)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 897, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 380, in push
>     response = self.worker.do_instruction(request)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
>     element.data)
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "/venv/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1426, in process
>     element)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545, in initial_restriction
>     range_tracker = element_source.get_range_tracker(None, None)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 243, in get_range_tracker
>     start_position, stop_position)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 359, in _replace_none_positions
>     stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 418, in increment_id
>     id_number = _ObjectIdHelper.id_to_int(object_id)
>   File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 383, in id_to_int
>     ints = struct.unpack('>III', id.binary)
> AttributeError: 'int' object has no attribute 'binary' [while running 'sources/Read Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
> ERROR:root:mongo_to_bq_raw.py: 'int' object has no attribute 'binary' [while running 'sources/Read Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
> {code}
> h2. String `_id` key:
> {code:java}
> Traceback (most recent call last): File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1426, in process element) File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545, in initial_restriction range_tracker = element_source.get_range_tracker(None, None) File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 243, in get_range_tracker start_position, stop_position) File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 359, in _replace_none_positions stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1) File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 418, in increment_id id_number = _ObjectIdHelper.id_to_int(object_id) File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 383, in id_to_int ints = struct.unpack('>III', id.binary) AttributeError: 'str' object has no attribute 'binary'During handling of the above exception, another exception occurred:Traceback (most recent call last): File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 325, in <module> sys.exit(run()) File "/data-pipelines/data_pipelines/mongo_to_bq_raw.py", line 240, in run options=options, ingestion_ts=ingestion_ts, table_name=table_name File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 582, in __exit__ self.result = self.run() File "/venv/lib/python3.7/site-packages/apache_beam/pipeline.py", line 561, in run return self.runner.run_pipeline(self, self._options) File "/venv/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 126, in run_pipeline return runner.run_pipeline(pipeline, options) File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_pipeline pipeline.to_runner_api(default_environment=self._default_environment)) File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 193, in run_via_runner_api return self.run_stages(stage_context, stages) File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 360, in run_stages bundle_context_manager, File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 556, in _run_stage bundle_manager) File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 596, in _run_bundle data_input, data_output, input_timers, expected_timer_output) File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 897, in process_bundle result_future = self._worker_handler.control_conn.push(process_bundle_req) File "/venv/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 380, in push response = self.worker.do_instruction(request) File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction getattr(request, request_type), request.instruction_id) File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle bundle_processor.process_bundle(instruction_id)) File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle element.data) File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/venv/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs File "/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1426, in process element) File "/venv/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1545, in initial_restriction range_tracker = element_source.get_range_tracker(None, None) File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 243, in get_range_tracker start_position, stop_position) File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 359, in _replace_none_positions stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1) File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 418, in increment_id id_number = _ObjectIdHelper.id_to_int(object_id) File "/venv/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 383, in id_to_int ints = struct.unpack('>III', id.binary) AttributeError: 'str' object has no attribute 'binary' [while running 'tagged_objects/Read Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction'] ERROR:root:mongo_to_bq_raw.py: 'str' object has no attribute 'binary' [while running 'tagged_objects/Read Collection/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction'] 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)