Posted to issues@beam.apache.org by "Geoff Hutton (Jira)" <ji...@apache.org> on 2021/03/02 22:03:00 UTC

[jira] [Commented] (BEAM-7256) Add support for allowDiskUse (AggregationOptions) in MongoDbIO

    [ https://issues.apache.org/jira/browse/BEAM-7256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17294012#comment-17294012 ] 

Geoff Hutton commented on BEAM-7256:
------------------------------------

We are hitting exactly this issue when using MongoDbIO in Python. We need to read from MongoDB Atlas, so we are required to pass "bucket_auto=True" (as mentioned above by [~sandboxws]). We then get the error described in this issue, because the collections we are reading are too large to be sorted in memory.

Ideally we would like to be able to turn on allow_disk_use.
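
For reference, a minimal sketch of what the request amounts to at the driver level. This is not an existing MongoDbIO parameter: the helper name auto_bucket_ids and its arguments are hypothetical, and the $bucketAuto pipeline is only an assumption about the kind of aggregation that bucket_auto triggers. The one real call is the driver's aggregate(), which in pymongo accepts allowDiskUse as a keyword argument.

```python
# Sketch only: shows the driver-level option, not an existing MongoDbIO parameter.
SORT_MEMORY_LIMIT_BYTES = 100 * 1024 * 1024  # 104857600, the limit quoted in this issue


def auto_bucket_ids(collection, num_buckets, allow_disk_use=True):
    """Run a $bucketAuto aggregation over _id, optionally allowing disk use.

    `collection` is assumed to behave like a pymongo Collection, whose
    aggregate() method accepts allowDiskUse as a keyword argument.
    The function name and parameters are hypothetical.
    """
    # Assumed pipeline shape: split the collection into ranges of _id.
    pipeline = [{"$bucketAuto": {"groupBy": "$_id", "buckets": num_buckets}}]
    # allowDiskUse=True lets the server write temporary files instead of
    # failing once a stage exceeds the in-memory limit.
    return list(collection.aggregate(pipeline, allowDiskUse=allow_disk_use))
```

With a real pymongo collection this would be called as auto_bucket_ids(client["db"]["coll"], 10); the database and collection names here are placeholders.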

> Add support for allowDiskUse (AggregationOptions) in MongoDbIO 
> ---------------------------------------------------------------
>
>                 Key: BEAM-7256
>                 URL: https://issues.apache.org/jira/browse/BEAM-7256
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-mongodb
>    Affects Versions: 2.12.0
>            Reporter: Javier Cornejo
>            Priority: P3
>         Attachments: Screen Shot 2019-05-09 at 12.30.51.png
>
>
> When a read is executed over a collection that exceeds the memory limit of 104857600 bytes, an exception occurs. The limit is imposed by MongoDB, and the error can be avoided by passing AggregationOptions with allowDiskUse set to true, so that MongoDB can use disk for the sort.
> This should happen only when aggregations are added to the read, but it currently happens even without any aggregation.
> Please let me know how I can help with this improvement / bug.
>  
> !Screen Shot 2019-05-09 at 12.30.51.png!  
> {code:java}
> PCollection<KV<String, Document>> updateColls = p.apply("Reading Ops Collection: " + key,
>     MongoDbIO
>         .read()
>         .withUri(options.getMongoDBUri())
>         .withDatabase("local")
>         .withCollection("oplog.rs")
>         .withBucketAuto(true)
> //      .withQueryFn(
> //          FindQuery.create().withFilters(
> //              Filters.and(
> //                  Filters.gt("ts", ts.format(dtf)),
> //                  Filters.eq("ns", options.getMongoDBDBName() + "" + key),
> //                  Filters.eq("op", "u")
> //              )
> //          )
> //
> //          AggregationQuery.create().withMongoDbPipeline(updatedDocsOplogAggregation)
> //      )
> )
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)