You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Yichi Zhang (Jira)" <ji...@apache.org> on 2021/03/11 20:53:00 UTC
[jira] [Commented] (BEAM-11915) Python MongoDB Connector:
TypeError: '>=' not supported between instances of 'str' and 'ObjectId'
[ https://issues.apache.org/jira/browse/BEAM-11915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17299856#comment-17299856 ]
Yichi Zhang commented on BEAM-11915:
------------------------------------
Currently, ReadFromMongoDB only supports collections whose _id field is of the ObjectId type. Are you setting a custom _id field in your MongoDB collection? If that is the case, it won't work well with the current implementation.
> Python MongoDB Connector: TypeError: '>=' not supported between instances of 'str' and 'ObjectId'
> --------------------------------------------------------------------------------------------------
>
> Key: BEAM-11915
> URL: https://issues.apache.org/jira/browse/BEAM-11915
> Project: Beam
> Issue Type: Bug
> Components: io-py-mongodb
> Affects Versions: 2.28.0
> Reporter: Muhammad Anas
> Assignee: Yichi Zhang
> Priority: P2
>
> I am trying to use the ReadFromMongoDB transform to read data from a MongoDB collection. I tried both with a MongoDB instance hosted on Atlas (with bucket_auto=True) and with a standalone server running locally (without bucket_auto=True). I am getting a similar error in both cases, although it originates in a different function in each case.
> *Python Version:* 3.8.8
> *Runner:* Direct
> *OS:* Windows 10 64 bit
> *apache-beam Python SDK version:* 2.28.0
> Here is a minimal pipeline that reproduces the error:
> {code:python}
> """A mongodb io workflow."""
> from __future__ import absolute_import
> import argparse
> import logging
> import re
> from past.builtins import unicode
> import apache_beam as beam
> from apache_beam.io import ReadFromMongoDB
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import SetupOptions
> def run(argv=None, save_main_session=True):
> """Main entry point; defines and runs the pipeline."""
> parser = argparse.ArgumentParser()
> parser.add_argument(
> '--output',
> dest='output',
> required=True,
> help='Output file to write results to.')
> known_args, pipeline_args = parser.parse_known_args(argv)
> pipeline_options = PipelineOptions(pipeline_args)
> pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
> with beam.Pipeline(options=pipeline_options) as p:
> # Read mongodb documents into a PCollection.
> step_runs = p | 'Read' >> ReadFromMongoDB(uri='mongodb://127.0.0.1:27017',
> db='mydb',
> coll='mycoll',
> # bucket_auto=True,
> )
> if __name__ == '__main__':
> logging.getLogger().setLevel(logging.INFO)
> run()
> {code}
>
> Stacktrace when connecting to local standalone MongoDB:
> {noformat}
> Traceback (most recent call last):
> File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
> File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
> File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
> for part, size in self.restriction_provider.split_and_size(
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
> for part in self.split(element, restriction):
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
> for source_bundle in source_bundles:
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 220, in split
> split_keys = self._get_split_keys(
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 275, in _get_split_keys
> if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
> TypeError: '>=' not supported between instances of 'str' and 'ObjectId'
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
> File ".\mongo-pipe-test.py", line 60, in <module>
> run()
> File ".\mongo-pipe-test.py", line 45, in run
> def format_result(step_run):
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 580, in __exit__
> self.result = self.run()
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 559, in run
> return self.runner.run_pipeline(self, self._options)
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 133, in run_pipeline
> return runner.run_pipeline(pipeline, options)
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 182, in run_pipeline
> self._latest_run_result = self.run_via_runner_api(
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 193, in run_via_runner_api
> return self.run_stages(stage_context, stages)
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 357, in run_stages
> stage_results = self._run_stage(
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 548, in _run_stage
> last_result, deferred_inputs, fired_timers = self._run_bundle(
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 594, in _run_bundle
> result, splits = bundle_manager.process_bundle(
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 896, in process_bundle
> result_future = self._worker_handler.control_conn.push(process_bundle_req)
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 380, in push
> response = self.worker.do_instruction(request)
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 606, in do_instruction
> return getattr(self, request_type)(
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 644, in process_bundle
> bundle_processor.process_bundle(instruction_id))
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 999, in process_bundle
> input_op_by_transform_id[element.transform_id].process_encoded(
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 228, in process_encoded
> self.output(decoded_value)
> File "apache_beam\runners\worker\operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
> File "apache_beam\runners\worker\operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
> File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
> File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
> File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
> File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
> File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
> File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
> File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
> File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
> File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
> File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
> File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
> File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
> File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
> File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
> File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
> File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
> File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
> File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
> File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
> File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
> File "apache_beam\runners\common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
> File "D:\projects\apache-beam-test\lib\site-packages\future\utils\__init__.py", line 446, in raise_with_traceback
> raise exc.with_traceback(traceback)
> File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
> File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
> File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
> for part, size in self.restriction_provider.split_and_size(
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
> for part in self.split(element, restriction):
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
> for source_bundle in source_bundles:
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 220, in split
> split_keys = self._get_split_keys(
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 275, in _get_split_keys
> if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
> TypeError: '>=' not supported between instances of 'str' and 'ObjectId' [while running 'Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']
> {noformat}
> Stacktrace when connecting to MongoDB hosted on Atlas:
> {noformat}
> Traceback (most recent call last):
> File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
> File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
> File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
> for part, size in self.restriction_provider.split_and_size(
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
> for part in self.split(element, restriction):
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
> for source_bundle in source_bundles:
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 212, in split
> for bucket in self._get_auto_buckets(desired_bundle_size_in_mb,
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 292, in _get_auto_buckets
> if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
> TypeError: '>=' not supported between instances of 'str' and 'ObjectId'
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
> File ".\mongo-pipe-test.py", line 59, in <module>
> run()
> File ".\mongo-pipe-test.py", line 44, in run
> def format_result(step_run):
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 580, in __exit__
> self.result = self.run()
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 559, in run
> return self.runner.run_pipeline(self, self._options)
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 133, in run_pipeline
> return runner.run_pipeline(pipeline, options)
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 182, in run_pipeline
> self._latest_run_result = self.run_via_runner_api(
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 193, in run_via_runner_api
> return self.run_stages(stage_context, stages)
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 357, in run_stages
> stage_results = self._run_stage(
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 548, in _run_stage
> last_result, deferred_inputs, fired_timers = self._run_bundle(
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 594, in _run_bundle
> result, splits = bundle_manager.process_bundle(
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 896, in process_bundle
> result_future = self._worker_handler.control_conn.push(process_bundle_req)
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 380, in push
> response = self.worker.do_instruction(request)
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 606, in do_instruction
> return getattr(self, request_type)(
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 644, in process_bundle
> bundle_processor.process_bundle(instruction_id))
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 999, in process_bundle
> input_op_by_transform_id[element.transform_id].process_encoded(
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 228, in process_encoded
> self.output(decoded_value)
> File "apache_beam\runners\worker\operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
> File "apache_beam\runners\worker\operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
> File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
> File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
> File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
> File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
> File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
> File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
> File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
> File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
> File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
> File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
> File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
> File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
> File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
> File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
> File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
> File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
> File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
> File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
> File "apache_beam\runners\common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
> File "D:\projects\apache-beam-test\lib\site-packages\future\utils\__init__.py", line 446, in raise_with_traceback
> raise exc.with_traceback(traceback)
> File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
> File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
> File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
> for part, size in self.restriction_provider.split_and_size(
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
> for part in self.split(element, restriction):
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
> for source_bundle in source_bundles:
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 212, in split
> for bucket in self._get_auto_buckets(desired_bundle_size_in_mb,
> File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 292, in _get_auto_buckets
> if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
> TypeError: '>=' not supported between instances of 'str' and 'ObjectId' [while running 'Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']
> {noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)