Posted to issues@beam.apache.org by "Muhammad Anas (Jira)" <ji...@apache.org> on 2021/03/02 19:00:05 UTC

[jira] [Created] (BEAM-11915) Python MongoDB Connector: TypeError: '>=' not supported between instances of 'str' and 'ObjectId'

Muhammad Anas created BEAM-11915:
------------------------------------

             Summary: Python MongoDB Connector: TypeError: '>=' not supported between instances of 'str' and 'ObjectId'
                 Key: BEAM-11915
                 URL: https://issues.apache.org/jira/browse/BEAM-11915
             Project: Beam
          Issue Type: Bug
          Components: io-py-mongodb
            Reporter: Muhammad Anas
            Assignee: Yichi Zhang


I am trying to use the ReadFromMongoDB transform to read data from a MongoDB collection. I tried both a MongoDB instance hosted on Atlas (with bucket_auto=True) and a standalone server running locally (without bucket_auto=True). I get a similar error in both cases, although it originates in a different function in each case.
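
For reference, the same TypeError can be reproduced directly with the bson ObjectId type that pymongo uses; this is purely an illustrative snippet, not part of the pipeline:
{code:python}
from bson.objectid import ObjectId

# Neither str nor ObjectId defines an ordering against the other type,
# so this comparison raises the same TypeError seen in the stack traces below.
'5f0000000000000000000000' >= ObjectId()
{code}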

Here is a minimal pipeline code that produces the error:
{code:python}
"""A mongodb io workflow."""

import argparse
import logging

import apache_beam as beam
from apache_beam.io import ReadFromMongoDB
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

def run(argv=None, save_main_session=True):
  """Main entry point; defines and runs the pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--output',
      dest='output',
      required=True,
      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

  with beam.Pipeline(options=pipeline_options) as p:
    # Read mongodb documents into a PCollection.
    step_runs = p | 'Read' >> ReadFromMongoDB(
        uri='mongodb://127.0.0.1:27017',
        db='mydb',
        coll='mycoll',
        # bucket_auto=True,  # enabled when reading from the Atlas cluster
    )

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

{code}
 

Stack trace when connecting to the local standalone MongoDB:
{noformat}
Traceback (most recent call last):
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
    for part, size in self.restriction_provider.split_and_size(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
    for part in self.split(element, restriction):
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
    for source_bundle in source_bundles:
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 220, in split
    split_keys = self._get_split_keys(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 275, in _get_split_keys
    if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
TypeError: '>=' not supported between instances of 'str' and 'ObjectId'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File ".\mongo-pipe-test.py", line 60, in <module>
    run()
  File ".\mongo-pipe-test.py", line 45, in run
    def format_result(step_run):
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 580, in __exit__
    self.result = self.run()
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 559, in run
    return self.runner.run_pipeline(self, self._options)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 133, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 182, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 193, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 357, in run_stages
    stage_results = self._run_stage(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 548, in _run_stage
    last_result, deferred_inputs, fired_timers = self._run_bundle(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 594, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 896, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 380, in push
    response = self.worker.do_instruction(request)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 606, in do_instruction
    return getattr(self, request_type)(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 644, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 999, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 228, in process_encoded
    self.output(decoded_value)
  File "apache_beam\runners\worker\operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam\runners\worker\operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "D:\projects\apache-beam-test\lib\site-packages\future\utils\__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
    for part, size in self.restriction_provider.split_and_size(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
    for part in self.split(element, restriction):
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
    for source_bundle in source_bundles:
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 220, in split
    split_keys = self._get_split_keys(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 275, in _get_split_keys
    if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
TypeError: '>=' not supported between instances of 'str' and 'ObjectId' [while running 'Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']
{noformat}

Stack trace when connecting to the MongoDB hosted on Atlas:

{noformat}
Traceback (most recent call last):
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
    for part, size in self.restriction_provider.split_and_size(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
    for part in self.split(element, restriction):
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
    for source_bundle in source_bundles:
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 212, in split
    for bucket in self._get_auto_buckets(desired_bundle_size_in_mb,
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 292, in _get_auto_buckets
    if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
TypeError: '>=' not supported between instances of 'str' and 'ObjectId'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File ".\mongo-pipe-test.py", line 59, in <module>
    run()
  File ".\mongo-pipe-test.py", line 44, in run
    def format_result(step_run):
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 580, in __exit__
    self.result = self.run()
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 559, in run
    return self.runner.run_pipeline(self, self._options)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 133, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 182, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 193, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 357, in run_stages
    stage_results = self._run_stage(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 548, in _run_stage
    last_result, deferred_inputs, fired_timers = self._run_bundle(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 594, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 896, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 380, in push
    response = self.worker.do_instruction(request)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 606, in do_instruction
    return getattr(self, request_type)(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 644, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 999, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 228, in process_encoded
    self.output(decoded_value)
  File "apache_beam\runners\worker\operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam\runners\worker\operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "D:\projects\apache-beam-test\lib\site-packages\future\utils\__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
    for part, size in self.restriction_provider.split_and_size(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
    for part in self.split(element, restriction):
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
    for source_bundle in source_bundles:
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 212, in split
    for bucket in self._get_auto_buckets(desired_bundle_size_in_mb,
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 292, in _get_auto_buckets
    if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
TypeError: '>=' not supported between instances of 'str' and 'ObjectId' [while running 'Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']
{noformat}
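
In both cases the failure is the same comparison between a str and an ObjectId in mongodbio's split logic. In case it helps with triage, here is a purely illustrative pymongo snippet (reusing the connection details from the pipeline above, which are specific to my local setup) for checking whether a collection mixes string and ObjectId _id values, which might be related:
{code:python}
from pymongo import MongoClient

# Connection details copied from the pipeline above; adjust as needed.
client = MongoClient('mongodb://127.0.0.1:27017')
coll = client['mydb']['mycoll']

# Tally the Python type of every _id to see whether string and ObjectId
# ids are mixed in the collection.
id_types = {}
for doc in coll.find({}, {'_id': 1}):
    name = type(doc['_id']).__name__
    id_types[name] = id_types.get(name, 0) + 1
print(id_types)
{code}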



--
This message was sent by Atlassian Jira
(v8.3.4#803005)