You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Muhammad Anas (Jira)" <ji...@apache.org> on 2021/03/02 19:05:00 UTC

[jira] [Updated] (BEAM-11915) Python MongoDB Connector: TypeError: '>=' not supported between instances of 'str' and 'ObjectId'

     [ https://issues.apache.org/jira/browse/BEAM-11915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Muhammad Anas updated BEAM-11915:
---------------------------------
    Description: 
I am trying to use the ReadFromMongoDB transform to read data from a MongoDB collection. I tried both with a MongoDB hosted on Atlas (with bucket_auto=True) and with a standalone server running locally (without bucket_auto=True). I am getting a similar error in both cases, although it originates in a different function in each case.

*Python Version:* 3.8.8

*Runner:* Direct

*OS:* Windows 10 64 bit

*apache-beam Python SDK version:* 2.28.0

Here is minimal pipeline code that produces the error:
{code:python}
"""A mongodb io workflow."""

from __future__ import absolute_import

import argparse
import logging
import re

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromMongoDB
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

def run(argv=None, save_main_session=True):
  """Build and execute the MongoDB read pipeline.

  Args:
    argv: Command-line arguments. ``--output`` is required; anything the
      parser does not recognise is handed to ``PipelineOptions``.
    save_main_session: Value assigned to
      ``SetupOptions.save_main_session`` on the pipeline options.
  """
  arg_parser = argparse.ArgumentParser()
  arg_parser.add_argument(
      '--output',
      dest='output',
      required=True,
      help='Output file to write results to.')
  # The parsed --output value is not used below; only the leftover
  # arguments are forwarded to Beam.
  _, beam_args = arg_parser.parse_known_args(argv)

  options = PipelineOptions(beam_args)
  setup_options = options.view_as(SetupOptions)
  setup_options.save_main_session = save_main_session

  with beam.Pipeline(options=options) as p:
    # Read mongodb documents into a PCollection.
    # bucket_auto is left at its default here; the report passes
    # bucket_auto=True for the Atlas-hosted run.
    step_runs = p | 'Read' >> ReadFromMongoDB(
        uri='mongodb://127.0.0.1:27017',
        db='mydb',
        coll='mycoll',
    )

if __name__ == '__main__':
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  run()

{code}
 

Stacktrace when connecting to local standalone MongoDB:
{noformat}
Traceback (most recent call last):
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
    for part, size in self.restriction_provider.split_and_size(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
    for part in self.split(element, restriction):
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
    for source_bundle in source_bundles:
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 220, in split
    split_keys = self._get_split_keys(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 275, in _get_split_keys
    if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
TypeError: '>=' not supported between instances of 'str' and 'ObjectId'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File ".\mongo-pipe-test.py", line 60, in <module>
    run()
  File ".\mongo-pipe-test.py", line 45, in run
    def format_result(step_run):
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 580, in __exit__
    self.result = self.run()
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 559, in run
    return self.runner.run_pipeline(self, self._options)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 133, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 182, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 193, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 357, in run_stages
    stage_results = self._run_stage(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 548, in _run_stage
    last_result, deferred_inputs, fired_timers = self._run_bundle(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 594, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 896, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 380, in push
    response = self.worker.do_instruction(request)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 606, in do_instruction
    return getattr(self, request_type)(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 644, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 999, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 228, in process_encoded
    self.output(decoded_value)
  File "apache_beam\runners\worker\operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam\runners\worker\operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "D:\projects\apache-beam-test\lib\site-packages\future\utils\__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
    for part, size in self.restriction_provider.split_and_size(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
    for part in self.split(element, restriction):
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
    for source_bundle in source_bundles:
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 220, in split
    split_keys = self._get_split_keys(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 275, in _get_split_keys
    if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
TypeError: '>=' not supported between instances of 'str' and 'ObjectId' [while running 'Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']
{noformat}
Stacktrace when connecting to MongoDB hosted on Atlas:
{noformat}
Traceback (most recent call last):
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
    for part, size in self.restriction_provider.split_and_size(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
    for part in self.split(element, restriction):
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
    for source_bundle in source_bundles:
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 212, in split
    for bucket in self._get_auto_buckets(desired_bundle_size_in_mb,
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 292, in _get_auto_buckets
    if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
TypeError: '>=' not supported between instances of 'str' and 'ObjectId'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File ".\mongo-pipe-test.py", line 59, in <module>
    run()
  File ".\mongo-pipe-test.py", line 44, in run
    def format_result(step_run):
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 580, in __exit__
    self.result = self.run()
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 559, in run
    return self.runner.run_pipeline(self, self._options)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 133, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 182, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 193, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 357, in run_stages
    stage_results = self._run_stage(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 548, in _run_stage
    last_result, deferred_inputs, fired_timers = self._run_bundle(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 594, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 896, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 380, in push
    response = self.worker.do_instruction(request)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 606, in do_instruction
    return getattr(self, request_type)(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 644, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 999, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 228, in process_encoded
    self.output(decoded_value)
  File "apache_beam\runners\worker\operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam\runners\worker\operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "D:\projects\apache-beam-test\lib\site-packages\future\utils\__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
    for part, size in self.restriction_provider.split_and_size(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
    for part in self.split(element, restriction):
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
    for source_bundle in source_bundles:
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 212, in split
    for bucket in self._get_auto_buckets(desired_bundle_size_in_mb,
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 292, in _get_auto_buckets
    if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
TypeError: '>=' not supported between instances of 'str' and 'ObjectId' [while running 'Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']
 {noformat}

  was:
I am trying to use the ReadFromMongoDB transform to read data from a MongoDB collection. I tried both with a MongoDB hosted on Atlas (with bucket_auto=True) and with a standalone server running locally (without bucket_auto=True). I am getting a similar error in both cases, although it originates in a different function in each case.

Here is minimal pipeline code that produces the error:
{code:python}
"""A mongodb io workflow."""

from __future__ import absolute_import

import argparse
import logging
import re

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromMongoDB
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

def run(argv=None, save_main_session=True):
  """Build and execute the MongoDB read pipeline.

  Args:
    argv: Command-line arguments. ``--output`` is required; anything the
      parser does not recognise is handed to ``PipelineOptions``.
    save_main_session: Value assigned to
      ``SetupOptions.save_main_session`` on the pipeline options.
  """
  arg_parser = argparse.ArgumentParser()
  arg_parser.add_argument(
      '--output',
      dest='output',
      required=True,
      help='Output file to write results to.')
  # The parsed --output value is not used below; only the leftover
  # arguments are forwarded to Beam.
  _, beam_args = arg_parser.parse_known_args(argv)

  options = PipelineOptions(beam_args)
  setup_options = options.view_as(SetupOptions)
  setup_options.save_main_session = save_main_session

  with beam.Pipeline(options=options) as p:
    # Read mongodb documents into a PCollection.
    # bucket_auto is left at its default here; the report passes
    # bucket_auto=True for the Atlas-hosted run.
    step_runs = p | 'Read' >> ReadFromMongoDB(
        uri='mongodb://127.0.0.1:27017',
        db='mydb',
        coll='mycoll',
    )

if __name__ == '__main__':
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  run()

{code}
 

Stacktrace when using connecting to local standalone MongoDB:
{noformat}
Traceback (most recent call last):
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
    for part, size in self.restriction_provider.split_and_size(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
    for part in self.split(element, restriction):
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
    for source_bundle in source_bundles:
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 220, in split
    split_keys = self._get_split_keys(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 275, in _get_split_keys
    if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
TypeError: '>=' not supported between instances of 'str' and 'ObjectId'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File ".\mongo-pipe-test.py", line 60, in <module>
    run()
  File ".\mongo-pipe-test.py", line 45, in run
    def format_result(step_run):
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 580, in __exit__
    self.result = self.run()
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 559, in run
    return self.runner.run_pipeline(self, self._options)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 133, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 182, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 193, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 357, in run_stages
    stage_results = self._run_stage(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 548, in _run_stage
    last_result, deferred_inputs, fired_timers = self._run_bundle(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 594, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 896, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 380, in push
    response = self.worker.do_instruction(request)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 606, in do_instruction
    return getattr(self, request_type)(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 644, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 999, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 228, in process_encoded
    self.output(decoded_value)
  File "apache_beam\runners\worker\operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam\runners\worker\operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "D:\projects\apache-beam-test\lib\site-packages\future\utils\__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
    for part, size in self.restriction_provider.split_and_size(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
    for part in self.split(element, restriction):
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
    for source_bundle in source_bundles:
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 220, in split
    split_keys = self._get_split_keys(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 275, in _get_split_keys
    if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
TypeError: '>=' not supported between instances of 'str' and 'ObjectId' [while running 'Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']
{noformat}

Stacktrace when using connecting to MongoDB hosted on Atlas:

{noformat}
Traceback (most recent call last):
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
    for part, size in self.restriction_provider.split_and_size(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
    for part in self.split(element, restriction):
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
    for source_bundle in source_bundles:
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 212, in split
    for bucket in self._get_auto_buckets(desired_bundle_size_in_mb,
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 292, in _get_auto_buckets
    if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
TypeError: '>=' not supported between instances of 'str' and 'ObjectId'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File ".\mongo-pipe-test.py", line 59, in <module>
    run()
  File ".\mongo-pipe-test.py", line 44, in run
    def format_result(step_run):
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 580, in __exit__
    self.result = self.run()
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 559, in run
    return self.runner.run_pipeline(self, self._options)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 133, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 182, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 193, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 357, in run_stages
    stage_results = self._run_stage(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 548, in _run_stage
    last_result, deferred_inputs, fired_timers = self._run_bundle(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 594, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 896, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 380, in push
    response = self.worker.do_instruction(request)
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 606, in do_instruction
    return getattr(self, request_type)(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 644, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 999, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 228, in process_encoded
    self.output(decoded_value)
  File "apache_beam\runners\worker\operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam\runners\worker\operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "D:\projects\apache-beam-test\lib\site-packages\future\utils\__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
    for part, size in self.restriction_provider.split_and_size(
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
    for part in self.split(element, restriction):
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
    for source_bundle in source_bundles:
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 212, in split
    for bucket in self._get_auto_buckets(desired_bundle_size_in_mb,
  File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 292, in _get_auto_buckets
    if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
TypeError: '>=' not supported between instances of 'str' and 'ObjectId' [while running 'Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']
 {noformat}


> Python MongoDB Connector: TypeError: '>=' not supported between instances of 'str' and 'ObjectId'
> --------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-11915
>                 URL: https://issues.apache.org/jira/browse/BEAM-11915
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-mongodb
>            Reporter: Muhammad Anas
>            Assignee: Yichi Zhang
>            Priority: P2
>
> I am trying to use the ReadFromMongoDB transform to read data from a MongoDB collection. I tried both with a MongoDB instance hosted on Atlas (with bucket_auto=True) and with a standalone server running locally (without bucket_auto=True). I am getting a similar error in both cases, although it originates in a different function in each case.
> *Python Version:* 3.8.8
> *Runner:* Direct
> *OS:* Windows 10 64 bit
> *apache-beam Python SDK version:* 2.28.0
> Here is a minimal pipeline that produces the error:
> {code:python}
> """A mongodb io workflow."""
> from __future__ import absolute_import
> import argparse
> import logging
> import re
> from past.builtins import unicode
> import apache_beam as beam
> from apache_beam.io import ReadFromMongoDB
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import SetupOptions
> def run(argv=None, save_main_session=True):
>   """Main entry point; defines and runs the pipeline."""
>   parser = argparse.ArgumentParser()
>   parser.add_argument(
>       '--output',
>       dest='output',
>       required=True,
>       help='Output file to write results to.')
>   known_args, pipeline_args = parser.parse_known_args(argv)
>   pipeline_options = PipelineOptions(pipeline_args)
>   pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
>   with beam.Pipeline(options=pipeline_options) as p:
>     # Read mongodb documents into a PCollection.
>     step_runs = p | 'Read' >> ReadFromMongoDB(uri='mongodb://127.0.0.1:27017',
>                            db='mydb',
>                            coll='mycoll',
>                         #    bucket_auto=True,
>                         )
> if __name__ == '__main__':
>   logging.getLogger().setLevel(logging.INFO)
>   run()
> {code}
>  
> Stacktrace when connecting to local standalone MongoDB:
> {noformat}
> Traceback (most recent call last):
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
>     for part, size in self.restriction_provider.split_and_size(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
>     for part in self.split(element, restriction):
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
>     for source_bundle in source_bundles:
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 220, in split
>     split_keys = self._get_split_keys(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 275, in _get_split_keys
>     if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
> TypeError: '>=' not supported between instances of 'str' and 'ObjectId'
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File ".\mongo-pipe-test.py", line 60, in <module>
>     run()
>   File ".\mongo-pipe-test.py", line 45, in run
>     def format_result(step_run):
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 580, in __exit__
>     self.result = self.run()
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 559, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 133, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 182, in run_pipeline
>     self._latest_run_result = self.run_via_runner_api(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 193, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 357, in run_stages
>     stage_results = self._run_stage(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 548, in _run_stage
>     last_result, deferred_inputs, fired_timers = self._run_bundle(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 594, in _run_bundle
>     result, splits = bundle_manager.process_bundle(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 896, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 380, in push
>     response = self.worker.do_instruction(request)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 606, in do_instruction
>     return getattr(self, request_type)(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 644, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 999, in process_bundle
>     input_op_by_transform_id[element.transform_id].process_encoded(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 228, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam\runners\worker\operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam\runners\worker\operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "D:\projects\apache-beam-test\lib\site-packages\future\utils\__init__.py", line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
>     for part, size in self.restriction_provider.split_and_size(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
>     for part in self.split(element, restriction):
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
>     for source_bundle in source_bundles:
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 220, in split
>     split_keys = self._get_split_keys(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 275, in _get_split_keys
>     if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
> TypeError: '>=' not supported between instances of 'str' and 'ObjectId' [while running 'Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']
> {noformat}
> Stacktrace when connecting to MongoDB hosted on Atlas:
> {noformat}
> Traceback (most recent call last):
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
>     for part, size in self.restriction_provider.split_and_size(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
>     for part in self.split(element, restriction):
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
>     for source_bundle in source_bundles:
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 212, in split
>     for bucket in self._get_auto_buckets(desired_bundle_size_in_mb,
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 292, in _get_auto_buckets
>     if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
> TypeError: '>=' not supported between instances of 'str' and 'ObjectId'
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File ".\mongo-pipe-test.py", line 59, in <module>
>     run()
>   File ".\mongo-pipe-test.py", line 44, in run
>     def format_result(step_run):
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 580, in __exit__
>     self.result = self.run()
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\pipeline.py", line 559, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 133, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 182, in run_pipeline
>     self._latest_run_result = self.run_via_runner_api(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 193, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 357, in run_stages
>     stage_results = self._run_stage(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 548, in _run_stage
>     last_result, deferred_inputs, fired_timers = self._run_bundle(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 594, in _run_bundle
>     result, splits = bundle_manager.process_bundle(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 896, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 380, in push
>     response = self.worker.do_instruction(request)
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 606, in do_instruction
>     return getattr(self, request_type)(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 644, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 999, in process_bundle
>     input_op_by_transform_id[element.transform_id].process_encoded(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 228, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam\runners\worker\operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam\runners\worker\operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\common.py", line 1241, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1401, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam\runners\worker\operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam\runners\worker\operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\worker\operations.py", line 719, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam\runners\common.py", line 1321, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "D:\projects\apache-beam-test\lib\site-packages\future\utils\__init__.py", line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam\runners\common.py", line 1239, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam\runners\common.py", line 587, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam\runners\common.py", line 1374, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1446, in process
>     for part, size in self.restriction_provider.split_and_size(
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\transforms\core.py", line 333, in split_and_size
>     for part in self.split(element, restriction):
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\iobase.py", line 1566, in split
>     for source_bundle in source_bundles:
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 212, in split
>     for bucket in self._get_auto_buckets(desired_bundle_size_in_mb,
>   File "D:\projects\apache-beam-test\lib\site-packages\apache_beam\io\mongodbio.py", line 292, in _get_auto_buckets
>     if start_pos >= _ObjectIdHelper.increment_id(end_pos, -1):
> TypeError: '>=' not supported between instances of 'str' and 'ObjectId' [while running 'Read/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']
>  {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)