You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Muhammad Anas (Jira)" <ji...@apache.org> on 2021/03/02 19:00:05 UTC
[jira] [Created] (BEAM-11915) Python MongoDB Connector: TypeError:
'>=' not supported between instances of 'str' and 'ObjectId'
Muhammad Anas created BEAM-11915:
------------------------------------
Summary: Python MongoDB Connector: TypeError: '>=' not supported between instances of 'str' and 'ObjectId'
Key: BEAM-11915
URL: https://issues.apache.org/jira/browse/BEAM-11915
Project: Beam
Issue Type: Bug
Components: io-py-mongodb
Reporter: Muhammad Anas
Assignee: Yichi Zhang
I am trying to use the ReadFromMongoDB transform to read data from a MongoDB collection. I tried both with a MongoDB instance hosted on Atlas (with bucket_auto=True) and with a standalone server running locally (without bucket_auto=True). I get a similar error in both cases, although it originates in a different function in each case.
Here is minimal pipeline code that reproduces the error:
{code:python}
"""A mongodb io workflow."""
from __future__ import absolute_import
import argparse
import logging
import re
from past.builtins import unicode
import apache_beam as beam
from apache_beam.io import ReadFromMongoDB
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
def run(argv=None, save_main_session=True):
    """Build and execute the MongoDB read pipeline.

    A required ``--output`` flag is parsed from ``argv``; every argument the
    parser does not recognise is handed to ``PipelineOptions``. The pipeline
    itself consists of a single ``ReadFromMongoDB`` step labelled ``'Read'``.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '--output',
        dest='output',
        required=True,
        help='Output file to write results to.',
    )
    known_args, beam_args = arg_parser.parse_known_args(argv)

    options = PipelineOptions(beam_args)
    setup_options = options.view_as(SetupOptions)
    setup_options.save_main_session = save_main_session

    with beam.Pipeline(options=options) as pipeline:
        # Read mongodb documents into a PCollection.
        read_step = 'Read' >> ReadFromMongoDB(
            uri='mongodb://127.0.0.1:27017',
            db='mydb',
            coll='mycoll',
            # bucket_auto=True,
        )
        step_runs = pipeline | read_step
if __name__ == '__main__':
    # Raise the root logger to INFO before starting the pipeline.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()
{code}
Stack trace when connecting to a local standalone MongoDB:
{noformat}
Traceback (most recent call last):
File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
for part, size in self.restriction_provider.split_and_size(
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
for part in self.split(element, restriction):
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
for source_bundle in source_bundles:
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 220, in split
split_keys = self._get_split_keys(
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 275, in _get_split_keys
if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
TypeError: '>=' not supported between instances of 'str' and 'ObjectId'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File ".\mongo-pipe-test.py", line 60, in <module>
run()
File ".\mongo-pipe-test.py", line 45, in run
def format_result(step_run):
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 580, in __exit__
self.result = self.run()
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 559, in run
return self.runner.run_pipeline(self, self._options)
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 133, in run_pipeline
return runner.run_pipeline(pipeline, options)
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 182, in run_pipeline
self._latest_run_result = self.run_via_runner_api(
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 193, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 357, in run_stages
stage_results = self._run_stage(
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 548, in _run_stage
last_result, deferred_inputs, fired_timers = self._run_bundle(
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 594, in _run_bundle
result, splits = bundle_manager.process_bundle(
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 896, in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 380, in push
response = self.worker.do_instruction(request)
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 606, in do_instruction
return getattr(self, request_type)(
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 644, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 999, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 228, in process_encoded
self.output(decoded_value)
File "apache_beam\runners\worker\operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam\runners\worker\operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam\runners\common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "D:\projects\apache-beam-test\lib\site-packages\future\utils\__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
for part, size in self.restriction_provider.split_and_size(
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
for part in self.split(element, restriction):
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
for source_bundle in source_bundles:
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 220, in split
split_keys = self._get_split_keys(
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 275, in _get_split_keys
if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
TypeError: '>=' not supported between instances of 'str' and 'ObjectId' [while running 'Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']
{noformat}
Stack trace when connecting to MongoDB hosted on Atlas:
{noformat}
Traceback (most recent call last):
File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
for part, size in self.restriction_provider.split_and_size(
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
for part in self.split(element, restriction):
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
for source_bundle in source_bundles:
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 212, in split
for bucket in self._get_auto_buckets(desired_bundle_size_in_mb,
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 292, in _get_auto_buckets
if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
TypeError: '>=' not supported between instances of 'str' and 'ObjectId'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File ".\mongo-pipe-test.py", line 59, in <module>
run()
File ".\mongo-pipe-test.py", line 44, in run
def format_result(step_run):
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 580, in __exit__
self.result = self.run()
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 559, in run
return self.runner.run_pipeline(self, self._options)
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 133, in run_pipeline
return runner.run_pipeline(pipeline, options)
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 182, in run_pipeline
self._latest_run_result = self.run_via_runner_api(
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 193, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 357, in run_stages
stage_results = self._run_stage(
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 548, in _run_stage
last_result, deferred_inputs, fired_timers = self._run_bundle(
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 594, in _run_bundle
result, splits = bundle_manager.process_bundle(
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 896, in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 380, in push
response = self.worker.do_instruction(request)
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 606, in do_instruction
return getattr(self, request_type)(
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 644, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 999, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 228, in process_encoded
self.output(decoded_value)
File "apache_beam\runners\worker\operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam\runners\worker\operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam\runners\common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "D:\projects\apache-beam-test\lib\site-packages\future\utils\__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
for part, size in self.restriction_provider.split_and_size(
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
for part in self.split(element, restriction):
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
for source_bundle in source_bundles:
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 212, in split
for bucket in self._get_auto_buckets(desired_bundle_size_in_mb,
File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 292, in _get_auto_buckets
if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
TypeError: '>=' not supported between instances of 'str' and 'ObjectId' [while running 'Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']
{noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)