You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Yichi Zhang (Jira)" <ji...@apache.org> on 2021/03/11 20:53:00 UTC

[jira] [Commented] (BEAM-11915) Python MongoDB Connector: TypeError: '>=' not supported between instances of 'str' and 'ObjectId'

    [ https://issues.apache.org/jira/browse/BEAM-11915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17299856#comment-17299856 ] 

Yichi Zhang commented on BEAM-11915:
------------------------------------

Currently, ReadFromMongoDB only supports collections whose _id field is of type ObjectId. Are you setting a custom _id field in your MongoDB collection? If that is the case, it won't work well with the current implementation.

> Python MongoDB Connector: TypeError: '>=' not supported between instances of 'str' and 'ObjectId'
> -------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-11915
>                 URL: https://issues.apache.org/jira/browse/BEAM-11915
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-mongodb
>    Affects Versions: 2.28.0
>            Reporter: Muhammad Anas
>            Assignee: Yichi Zhang
>            Priority: P2
>
> I am trying to use the ReadFromMongoDB transform to read data from a MongoDB collection. I tried both with a MongoDB instance hosted on Atlas (with bucket_auto=True) and with a standalone server running locally (without bucket_auto=True). I am getting a similar error in both cases, although it originates in a different function in each case.
> *Python Version:* 3.8.8
> *Runner:* Direct
> *OS:* Windows 10 64 bit
> *apache-beam Python SDK version:* 2.28.0
> Here is a minimal pipeline code that produces the error:
> {code:python}
> """A mongodb io workflow."""
> from __future__ import absolute_import
> import argparse
> import logging
> import re
> from past.builtins import unicode
> import apache_beam as beam
> from apache_beam.io import ReadFromMongoDB
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import SetupOptions
> def run(argv=None, save_main_session=True):
>   """Main entry point; defines and runs the pipeline."""
>   parser = argparse.ArgumentParser()
>   parser.add_argument(
>       '--output',
>       dest='output',
>       required=True,
>       help='Output file to write results to.')
>   known_args, pipeline_args = parser.parse_known_args(argv)
>   pipeline_options = PipelineOptions(pipeline_args)
>   pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
>   with beam.Pipeline(options=pipeline_options) as p:
>     # Read mongodb documents into a PCollection.
>     step_runs = p | 'Read' >> ReadFromMongoDB(uri='mongodb://127.0.0.1:27017',
>                            db='mydb',
>                            coll='mycoll',
>                         #    bucket_auto=True,
>                         )
> if __name__ == '__main__':
>   logging.getLogger().setLevel(logging.INFO)
>   run()
> {code}
>  
> Stacktrace when connecting to local standalone MongoDB:
> {noformat}
> Traceback (most recent call last):
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
>     for part, size in self.restriction_provider.split_and_size(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
>     for part in self.split(element, restriction):
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
>     for source_bundle in source_bundles:
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 220, in split
>     split_keys = self._get_split_keys(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 275, in _get_split_keys
>     if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
> TypeError: '>=' not supported between instances of 'str' and 'ObjectId'
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File ".\mongo-pipe-test.py", line 60, in <module>
>     run()
>   File ".\mongo-pipe-test.py", line 45, in run
>     def format_result(step_run):
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 580, in __exit__
>     self.result = self.run()
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 559, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 133, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 182, in run_pipeline
>     self._latest_run_result = self.run_via_runner_api(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 193, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 357, in run_stages
>     stage_results = self._run_stage(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 548, in _run_stage
>     last_result, deferred_inputs, fired_timers = self._run_bundle(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 594, in _run_bundle
>     result, splits = bundle_manager.process_bundle(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 896, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 380, in push
>     response = self.worker.do_instruction(request)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 606, in do_instruction
>     return getattr(self, request_type)(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 644, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 999, in process_bundle
>     input_op_by_transform_id[element.transform_id].process_encoded(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 228, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam\runners\worker\operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam\runners\worker\operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "D:\projects\apache-beam-test\lib\site-packages\future\utils\__init__.py", line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
>     for part, size in self.restriction_provider.split_and_size(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
>     for part in self.split(element, restriction):
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
>     for source_bundle in source_bundles:
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 220, in split
>     split_keys = self._get_split_keys(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 275, in _get_split_keys
>     if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
> TypeError: '>=' not supported between instances of 'str' and 'ObjectId' [while running 'Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']
> {noformat}
> Stacktrace when connecting to MongoDB hosted on Atlas:
> {noformat}
> Traceback (most recent call last):
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
>     for part, size in self.restriction_provider.split_and_size(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
>     for part in self.split(element, restriction):
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
>     for source_bundle in source_bundles:
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 212, in split
>     for bucket in self._get_auto_buckets(desired_bundle_size_in_mb,
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 292, in _get_auto_buckets
>     if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
> TypeError: '>=' not supported between instances of 'str' and 'ObjectId'
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File ".\mongo-pipe-test.py", line 59, in <module>
>     run()
>   File ".\mongo-pipe-test.py", line 44, in run
>     def format_result(step_run):
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 580, in __exit__
>     self.result = self.run()
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 559, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 133, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 182, in run_pipeline
>     self._latest_run_result = self.run_via_runner_api(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 193, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 357, in run_stages
>     stage_results = self._run_stage(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 548, in _run_stage
>     last_result, deferred_inputs, fired_timers = self._run_bundle(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 594, in _run_bundle
>     result, splits = bundle_manager.process_bundle(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 896, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 380, in push
>     response = self.worker.do_instruction(request)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 606, in do_instruction
>     return getattr(self, request_type)(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 644, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 999, in process_bundle
>     input_op_by_transform_id[element.transform_id].process_encoded(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 228, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam\runners\worker\operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam\runners\worker\operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "D:\projects\apache-beam-test\lib\site-packages\future\utils\__init__.py", line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
>     for part, size in self.restriction_provider.split_and_size(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
>     for part in self.split(element, restriction):
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
>     for source_bundle in source_bundles:
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 212, in split
>     for bucket in self._get_auto_buckets(desired_bundle_size_in_mb,
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 292, in _get_auto_buckets
>     if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
> TypeError: '>=' not supported between instances of 'str' and 'ObjectId' [while running 'Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']
>  {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)