Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/05/05 17:14:22 UTC

[GitHub] [airflow] amatellanes opened a new pull request #15680: MongoToS3Operator failed when running with a single query (not aggregate pipeline)

amatellanes opened a new pull request #15680:
URL: https://github.com/apache/airflow/pull/15680


   Remove the invalid `allowDiskUse` argument when calling `find`. This method does not accept that argument, according to the official documentation [here](https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html?highlight=allowDiskUse#pymongo.collection.Collection.find).
   
   closes #15679 
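   For context, the failure mode can be sketched with stand-in functions (these are hypothetical, not the real `MongoHook` or pymongo APIs): forwarding a keyword argument a method does not declare raises `TypeError`, which is how passing `allowDiskUse` to `find` crashed.

   ```python
   # Stand-in sketch (hypothetical functions, not the real MongoHook/pymongo API):
   # a keyword a method does not declare raises TypeError on call.

   def find(query, projection=None):
       """Like Collection.find before pymongo 3.11: no disk-use keyword."""
       return [query]

   def aggregate(pipeline, allowDiskUse=False):
       """aggregate() has always accepted allowDiskUse."""
       return list(pipeline)

   try:
       find({"status": "ok"}, allowDiskUse=True)
   except TypeError as exc:
       print(f"find() rejected the keyword: {exc}")

   print(aggregate([{"$match": {}}], allowDiskUse=True))  # works fine
   ```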


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on pull request #15680: MongoToS3Operator failed when running with a single query (not aggregate pipeline)

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #15680:
URL: https://github.com/apache/airflow/pull/15680#issuecomment-832881141


   Makes sense, `allowDiskUse` is an aggregate-only option.





[GitHub] [airflow] amatellanes commented on a change in pull request #15680: MongoToS3Operator failed when running with a single query (not aggregate pipeline)

Posted by GitBox <gi...@apache.org>.
amatellanes commented on a change in pull request #15680:
URL: https://github.com/apache/airflow/pull/15680#discussion_r629077291



##########
File path: airflow/providers/amazon/aws/transfers/mongo_to_s3.py
##########
@@ -49,8 +49,8 @@ class MongoToS3Operator(BaseOperator):
     :type mongo_db: str
     :param replace: whether or not to replace the file in S3 if it previously existed
     :type replace: bool
-    :param allow_disk_use: in the case you are retrieving a lot of data, you may have
-        to use the disk to save it instead of saving all in the RAM
+    :param allow_disk_use: enables writing to temporary files in the case you are handling large dataset
+        when running an aggregate pipeline.

Review comment:
       @xinbinhuang Both changes done :heavy_check_mark: 







[GitHub] [airflow] xinbinhuang commented on a change in pull request #15680: MongoToS3Operator failed when running with a single query (not aggregate pipeline)

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #15680:
URL: https://github.com/apache/airflow/pull/15680#discussion_r627041342



##########
File path: airflow/providers/amazon/aws/transfers/mongo_to_s3.py
##########
@@ -117,7 +117,6 @@ def execute(self, context) -> bool:
                 mongo_collection=self.mongo_collection,
                 query=cast(dict, self.mongo_query),
                 mongo_db=self.mongo_db,
-                allowDiskUse=self.allow_disk_use,

Review comment:
       It was added to the `.find` method in MongoDB 4.4 and pymongo 3.11







[GitHub] [airflow] uranusjr commented on a change in pull request #15680: MongoToS3Operator failed when running with a single query (not aggregate pipeline)

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15680:
URL: https://github.com/apache/airflow/pull/15680#discussion_r627077912



##########
File path: airflow/providers/amazon/aws/transfers/mongo_to_s3.py
##########
@@ -117,7 +117,6 @@ def execute(self, context) -> bool:
                 mongo_collection=self.mongo_collection,
                 query=cast(dict, self.mongo_query),
                 mongo_db=self.mongo_db,
-                allowDiskUse=self.allow_disk_use,

Review comment:
       > I think a note in the docstring saying that it requires MongoDB 4.4+ for `.find` and a link to the doc should be enough. It's neither sensible nor possible for us to control the version of an external system. IMO, the users should know and control what MongoDB version they are using.
   
   The issue is `aggregate(allowDiskUse=True)` does work prior to MongoDB 4.4, and users running older MongoDB setups may still want to use that. If we always also pass the argument to `find()`, they will have to create two `MongoHook` instances, one with `allow_disk_use` (for `aggregate()`) and one without (for `find()`), which doesn’t feel like good interface design to me.







[GitHub] [airflow] xinbinhuang commented on a change in pull request #15680: MongoToS3Operator failed when running with a single query (not aggregate pipeline)

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #15680:
URL: https://github.com/apache/airflow/pull/15680#discussion_r627073207



##########
File path: airflow/providers/amazon/aws/transfers/mongo_to_s3.py
##########
@@ -117,7 +117,6 @@ def execute(self, context) -> bool:
                 mongo_collection=self.mongo_collection,
                 query=cast(dict, self.mongo_query),
                 mongo_db=self.mongo_db,
-                allowDiskUse=self.allow_disk_use,

Review comment:
       From our side ("client"), we can do something like: 
   
   1. Not ideal, but this works:
   ```python
   from functools import partial

   find_method = partial(
       MongoHook(self.mongo_conn_id).find,
       mongo_collection=self.mongo_collection,
       query=cast(dict, self.mongo_query),
       mongo_db=self.mongo_db,
   )

   if self.allow_disk_use:
       results = find_method(allow_disk_use=self.allow_disk_use)
   else:
       results = find_method()
   ```
   
   2. Or inspect the version of `pymongo`, then decide what to do with the `allow_disk_use` parameter
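   Option 2 could look roughly like this (a hedged sketch: `find` below is a stand-in for `MongoHook.find`, and the version tuple stands in for `pymongo.version_tuple` so the example is self-contained):

   ```python
   # Sketch of option 2: only pass allow_disk_use when the installed driver
   # supports it (pymongo >= 3.11). version_tuple stands in for
   # pymongo.version_tuple.
   version_tuple = (3, 11, 0)

   def find(query, **kwargs):
       """Stand-in for MongoHook.find; records what it was called with."""
       return {"query": query, "kwargs": kwargs}

   extra = {}
   if version_tuple >= (3, 11):
       extra["allow_disk_use"] = True

   results = find({"status": "ok"}, **extra)
   print(results["kwargs"])  # {'allow_disk_use': True}
   ```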







[GitHub] [airflow] xinbinhuang commented on a change in pull request #15680: MongoToS3Operator failed when running with a single query (not aggregate pipeline)

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #15680:
URL: https://github.com/apache/airflow/pull/15680#discussion_r627040600



##########
File path: airflow/providers/amazon/aws/transfers/mongo_to_s3.py
##########
@@ -117,7 +117,6 @@ def execute(self, context) -> bool:
                 mongo_collection=self.mongo_collection,
                 query=cast(dict, self.mongo_query),
                 mongo_db=self.mongo_db,
-                allowDiskUse=self.allow_disk_use,

Review comment:
       hmm, this is an interesting one. The [`.find`](https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html?highlight=allowDiskUse#pymongo.collection.Collection.find) method seems to support the argument, but in snake case, i.e. `allow_disk_use`. 
   I'm not familiar with Mongo. Can you double-check if `allow_disk_use` is the correct one (it seems to be)?







[GitHub] [airflow] xinbinhuang commented on a change in pull request #15680: MongoToS3Operator failed when running with a single query (not aggregate pipeline)

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #15680:
URL: https://github.com/apache/airflow/pull/15680#discussion_r627079852



##########
File path: airflow/providers/amazon/aws/transfers/mongo_to_s3.py
##########
@@ -117,7 +117,6 @@ def execute(self, context) -> bool:
                 mongo_collection=self.mongo_collection,
                 query=cast(dict, self.mongo_query),
                 mongo_db=self.mongo_db,
-                allowDiskUse=self.allow_disk_use,

Review comment:
       Why two `MongoHook` instances? I think our discussion only applies to `.find`, and `.aggregate` will stay as it is.







[GitHub] [airflow] uranusjr commented on a change in pull request #15680: MongoToS3Operator failed when running with a single query (not aggregate pipeline)

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15680:
URL: https://github.com/apache/airflow/pull/15680#discussion_r627082856



##########
File path: airflow/providers/amazon/aws/transfers/mongo_to_s3.py
##########
@@ -117,7 +117,6 @@ def execute(self, context) -> bool:
                 mongo_collection=self.mongo_collection,
                 query=cast(dict, self.mongo_query),
                 mongo_db=self.mongo_db,
-                allowDiskUse=self.allow_disk_use,

Review comment:
       Ah, I misread and thought `allow_disk_use` was a property on `MongoHook` (it’s on `MongoToS3Operator` instead). Never mind, I think your proposed solution above would work.







[GitHub] [airflow] amatellanes commented on a change in pull request #15680: MongoToS3Operator failed when running with a single query (not aggregate pipeline)

Posted by GitBox <gi...@apache.org>.
amatellanes commented on a change in pull request #15680:
URL: https://github.com/apache/airflow/pull/15680#discussion_r628890619



##########
File path: airflow/providers/amazon/aws/transfers/mongo_to_s3.py
##########
@@ -117,7 +117,6 @@ def execute(self, context) -> bool:
                 mongo_collection=self.mongo_collection,
                 query=cast(dict, self.mongo_query),
                 mongo_db=self.mongo_db,
-                allowDiskUse=self.allow_disk_use,

Review comment:
       @xinbinhuang I've just updated the docstring as you suggested.







[GitHub] [airflow] uranusjr commented on a change in pull request #15680: MongoToS3Operator failed when running with a single query (not aggregate pipeline)

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15680:
URL: https://github.com/apache/airflow/pull/15680#discussion_r627046024



##########
File path: airflow/providers/amazon/aws/transfers/mongo_to_s3.py
##########
@@ -117,7 +117,6 @@ def execute(self, context) -> bool:
                 mongo_collection=self.mongo_collection,
                 query=cast(dict, self.mongo_query),
                 mongo_db=self.mongo_db,
-                allowDiskUse=self.allow_disk_use,

Review comment:
       The `allow_disk_use` argument in `.find()` maps to MongoDB’s [`cursor.allowDiskUse`](https://docs.mongodb.com/manual/reference/method/cursor.allowDiskUse/), while `.aggregate()`’s `allowDiskUse` corresponds to [`allowDiskUse` in the aggregation pipeline](https://docs.mongodb.com/manual/reference/command/aggregate/#mongodb-dbcommand-dbcmd.aggregate). I’m honestly not familiar with `cursor.allowDiskUse` (in fact I didn’t know it existed until today), but from the documentation the two are quite different.
   
   I think whether we should set `find(allow_disk_use=True)` depends on what we want `MongoToS3Operator.allow_disk_use` to mean. The docstring says
   
   > allow_disk_use: in the case you are retrieving a lot of data, you may have to use the disk to save it instead of saving all in the RAM
   
   which seems to indicate it probably makes sense to set `find(allow_disk_use=True)` from it. But then the question becomes how we can pass it only to MongoDB (not pymongo!) 4.4+ (released in July 2020) because it would crash on earlier versions.
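   
   One way to gate on the server (not driver) version can be sketched like this. The version-parsing helper is hypothetical; in real code the string would come from pymongo's `MongoClient().server_info()["version"]`, but here it is just a parameter so the example is self-contained:
   
   ```python
   # Hedged sketch: find(allow_disk_use=...) needs MongoDB 4.4+, so gate on the
   # server version before passing the keyword. The helper name is made up for
   # illustration.
   def server_supports_find_disk_use(server_version: str) -> bool:
       major, minor = (int(p) for p in server_version.split(".")[:2])
       return (major, minor) >= (4, 4)

   print(server_supports_find_disk_use("4.4.6"))   # True
   print(server_supports_find_disk_use("4.2.14"))  # False
   ```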







[GitHub] [airflow] xinbinhuang commented on a change in pull request #15680: MongoToS3Operator failed when running with a single query (not aggregate pipeline)

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #15680:
URL: https://github.com/apache/airflow/pull/15680#discussion_r628934812



##########
File path: airflow/providers/amazon/aws/transfers/mongo_to_s3.py
##########
@@ -49,8 +49,8 @@ class MongoToS3Operator(BaseOperator):
     :type mongo_db: str
     :param replace: whether or not to replace the file in S3 if it previously existed
     :type replace: bool
-    :param allow_disk_use: in the case you are retrieving a lot of data, you may have
-        to use the disk to save it instead of saving all in the RAM
+    :param allow_disk_use: enables writing to temporary files in the case you are handling large dataset
+        when running an aggregate pipeline.

Review comment:
       ```suggestion
           This only takes effect when `mongo_query` is a list - running an aggregate pipeline
   ```
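    The dispatch this suggestion describes can be sketched as follows (the function and names are stand-ins for the operator's internals): a list query runs as an aggregate pipeline, and that is the only path where the disk-use flag applies.
    
    ```python
    # Stand-in sketch of the operator's dispatch: allow_disk_use only reaches
    # the aggregate path, which is what the suggested docstring spells out.
    def dispatch(mongo_query, allow_disk_use=False):
        if isinstance(mongo_query, list):  # aggregate pipeline
            return ("aggregate", {"allowDiskUse": allow_disk_use})
        return ("find", {})  # single query: no disk-use keyword

    print(dispatch([{"$match": {}}], allow_disk_use=True))
    print(dispatch({"status": "ok"}, allow_disk_use=True))
    ```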







[GitHub] [airflow] xinbinhuang commented on a change in pull request #15680: MongoToS3Operator failed when running with a single query (not aggregate pipeline)

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #15680:
URL: https://github.com/apache/airflow/pull/15680#discussion_r627068858



##########
File path: airflow/providers/amazon/aws/transfers/mongo_to_s3.py
##########
@@ -117,7 +117,6 @@ def execute(self, context) -> bool:
                 mongo_collection=self.mongo_collection,
                 query=cast(dict, self.mongo_query),
                 mongo_db=self.mongo_db,
-                allowDiskUse=self.allow_disk_use,

Review comment:
       > I think whether we should set find(allow_disk_use=True) depends on what we want MongoToS3Operator.allow_disk_use to mean. 
   
   Though the wording seems quite different, they seem to achieve the same thing: _use temporary files to store data when it exceeds certain memory limits (both 100 MB of RAM)_ - one is for the aggregation stage while the other is for blocking sort operations. (Internally, I guess they are pretty similar.) But again, I'm not familiar enough with Mongo to be confident in what I said. It would be nice if someone already using Mongo could verify this.
   
   > But then the question becomes how we can pass it only to MongoDB (not pymongo!) 4.4+ (released in July 2020) because it would crash on earlier versions.
   
   I think a note in the docstring saying that it requires MongoDB 4.4+ for `.find` and a link to the doc should be enough. It's neither sensible nor possible for us to control the version of an external system. IMO, the users should know and control what MongoDB version they are using.







[GitHub] [airflow] xinbinhuang commented on a change in pull request #15680: MongoToS3Operator failed when running with a single query (not aggregate pipeline)

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #15680:
URL: https://github.com/apache/airflow/pull/15680#discussion_r627077699



##########
File path: airflow/providers/amazon/aws/transfers/mongo_to_s3.py
##########
@@ -117,7 +117,6 @@ def execute(self, context) -> bool:
                 mongo_collection=self.mongo_collection,
                 query=cast(dict, self.mongo_query),
                 mongo_db=self.mongo_db,
-                allowDiskUse=self.allow_disk_use,

Review comment:
       @amatellanes I'm OK with just removing it for now; we can think about how to add it back in a future PR when someone requests it. Or we can handle the branching logic in this PR so that _allow_disk_use_ works with a single query as well.
   
   If you want to go with the former, please update the docstring to reflect the fact that _allow_disk_use_ doesn't apply to a single query (`.find`).







[GitHub] [airflow] xinbinhuang merged pull request #15680: MongoToS3Operator failed when running with a single query (not aggregate pipeline)

Posted by GitBox <gi...@apache.org>.
xinbinhuang merged pull request #15680:
URL: https://github.com/apache/airflow/pull/15680


   





[GitHub] [airflow] xinbinhuang commented on pull request #15680: MongoToS3Operator failed when running with a single query (not aggregate pipeline)

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on pull request #15680:
URL: https://github.com/apache/airflow/pull/15680#issuecomment-836752384


   Thank you for the contribution!





[GitHub] [airflow] xinbinhuang commented on a change in pull request #15680: MongoToS3Operator failed when running with a single query (not aggregate pipeline)

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #15680:
URL: https://github.com/apache/airflow/pull/15680#discussion_r628935087



##########
File path: airflow/providers/amazon/aws/transfers/mongo_to_s3.py
##########
@@ -49,8 +49,8 @@ class MongoToS3Operator(BaseOperator):
     :type mongo_db: str
     :param replace: whether or not to replace the file in S3 if it previously existed
     :type replace: bool
-    :param allow_disk_use: in the case you are retrieving a lot of data, you may have
-        to use the disk to save it instead of saving all in the RAM
+    :param allow_disk_use: enables writing to temporary files in the case you are handling large dataset
+        when running an aggregate pipeline.

Review comment:
       @amatellanes  Can you also update line:43 to `:type mongo_query: Union[list, dict]`? Much appreciated!







[GitHub] [airflow] xinbinhuang commented on a change in pull request #15680: MongoToS3Operator failed when running with a single query (not aggregate pipeline)

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #15680:
URL: https://github.com/apache/airflow/pull/15680#discussion_r627040600



##########
File path: airflow/providers/amazon/aws/transfers/mongo_to_s3.py
##########
@@ -117,7 +117,6 @@ def execute(self, context) -> bool:
                 mongo_collection=self.mongo_collection,
                 query=cast(dict, self.mongo_query),
                 mongo_db=self.mongo_db,
-                allowDiskUse=self.allow_disk_use,

Review comment:
       hmm, this is an interesting one. The [`.find`](https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html?highlight=allowDiskUse#pymongo.collection.Collection.find) method seems to support the argument, but in snake case, i.e. `allow_disk_use`. 
   I'm not familiar with Mongo. Can you double-check whether `allow_disk_use` is supported in both pymongo and MongoDB itself?
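
   For reference, the naming difference between the two calls can be sketched like this (the helper is hypothetical; `aggregate()` takes camelCase `allowDiskUse`, while `find()` only gained snake-case `allow_disk_use` in pymongo 3.11, and that option is honored by MongoDB 4.4+ only):

   ```python
   def disk_use_kwargs(is_pipeline: bool, allow_disk_use: bool) -> dict:
       """Build the disk-use keyword argument for the appropriate pymongo call.

       aggregate() expects camelCase ``allowDiskUse``; find() expects
       snake_case ``allow_disk_use`` (pymongo 3.11+, MongoDB 4.4+ only).
       """
       if not allow_disk_use:
           return {}
       return {"allowDiskUse": True} if is_pipeline else {"allow_disk_use": True}
   ```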







[GitHub] [airflow] xinbinhuang commented on a change in pull request #15680: MongoToS3Operator failed when running with a single query (not aggregate pipeline)

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on a change in pull request #15680:
URL: https://github.com/apache/airflow/pull/15680#discussion_r627068858



##########
File path: airflow/providers/amazon/aws/transfers/mongo_to_s3.py
##########
@@ -117,7 +117,6 @@ def execute(self, context) -> bool:
                 mongo_collection=self.mongo_collection,
                 query=cast(dict, self.mongo_query),
                 mongo_db=self.mongo_db,
-                allowDiskUse=self.allow_disk_use,

Review comment:
       > I think whether we should set find(allow_disk_use=True) depends on what we want MongoToS3Operator.allow_disk_use to mean. 
   
   Though the wording seems quite different, they seem to achieve the same thing: _use temporary files to store data when it exceeds certain memory limits (both 100 MB of RAM)_ - one applies to aggregation stages while the other applies to blocking sort operations (internally, I'd guess they are pretty similar). But again, I'm not familiar enough with Mongo to be confident in what I said. It would be nice if someone already using Mongo could verify this.
   
   > But then the question becomes how we can pass it only to MongoDB (not pymongo!) 4.4+ (released in July 2020) because it would crash on earlier versions.
   
   I think a note in the docstring saying that it requires MongoDB 4.4+ and a link to the doc should be enough. It's neither sensible nor possible for us to control the version of an external system. IMO, the users should know and control what MongoDB version they are using.
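
   A hedged sketch of that version-gating idea - the helper below only decides whether to forward the kwarg; in real code `server_version` would come from something like `client.server_info()["version"]`, but here it is a plain string so the sketch stays self-contained:

   ```python
   def find_kwargs_for_version(server_version: str, allow_disk_use: bool) -> dict:
       """Forward allow_disk_use to find() only when the server is MongoDB 4.4+."""
       major, minor = (int(part) for part in server_version.split(".")[:2])
       if allow_disk_use and (major, minor) >= (4, 4):
           return {"allow_disk_use": True}
       # older servers would reject the option, so omit it entirely
       return {}
   ```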






