Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/04 02:45:23 UTC

[GitHub] [airflow] sunank200 edited a comment on issue #16627: add more filter options to list_keys of S3Hook

sunank200 edited a comment on issue #16627:
URL: https://github.com/apache/airflow/issues/16627#issuecomment-873503661


   @eladkal, @alexInhert, @potiuk I would love to add this feature and take this as my first issue on Airflow. Can I take this up?
   
   I can think of the following approach to implement this feature. The class [S3Hook](https://github.com/apache/airflow/blob/c8a628abf484f0bd9805f44dd37e284d2b5ee7db/airflow/providers/amazon/aws/hooks/s3.py#L96) interacts with AWS S3 using the boto3 library. The hook has [list_keys](https://github.com/apache/airflow/blob/c8a628abf484f0bd9805f44dd37e284d2b5ee7db/airflow/providers/amazon/aws/hooks/s3.py#L265), which uses [S3.Client.list_objects_v2](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.list_objects_v2) of boto3 to fetch the list of keys. The list_objects_v2 documentation doesn't specify an argument to filter keys by creation date or last modified date, but per the documentation the response contains the last modified date of each object.
   
   The current implementation of list_keys in the S3Hook uses the paginate method of a [Paginator](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/paginators.html) to iterate over the pages of API operation results. Hence, the approach I propose here is to filter the keys by last modified date using JMESPath. JMESPath is a query language for JSON that can be used directly on paginated results: the expression is applied to each page of results through the search method of the PageIterator that paginate returns. The code snippet below uses a JMESPath expression to list only the keys whose last modified datetime falls between `from_datetime` and `to_datetime`, both of which default to None.
   
   ```
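       paginator = self.get_conn().get_paginator('list_objects_v2')
       response = paginator.paginate(
           Bucket=bucket_name, Prefix=prefix, Delimiter=delimiter, PaginationConfig=config
       )
   
       # JMESPath expression applied to each page of results. to_string() renders
       # LastModified as a quoted string, so each bound is wrapped in double quotes too.
       # This assumes both from_datetime and to_datetime are set (not None).
       filtered_response = response.search(
           "Contents[?to_string(LastModified)<='\"{}\"' && "
           "to_string(LastModified)>='\"{}\"'].Key".format(to_datetime, from_datetime)
       )
       # search() can yield None for a page that has no Contents, so skip those.
       keys = [key for key in filtered_response if key is not None]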
   ```
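
For comparison, here is a minimal sketch of the same filter written in plain Python instead of JMESPath. It also covers the case where either bound is left as None. The helper name `filter_keys_by_last_modified` and the hand-built `sample_pages` are hypothetical and not part of S3Hook; the pages only imitate the `Contents` / `Key` / `LastModified` shape of a list_objects_v2 response. As far as I know boto3 returns timezone-aware datetimes for `LastModified`, so the bounds are assumed to be timezone-aware as well.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List, Optional


def filter_keys_by_last_modified(
    pages: Iterable[Dict[str, Any]],
    from_datetime: Optional[datetime] = None,
    to_datetime: Optional[datetime] = None,
) -> List[str]:
    """Return keys whose LastModified lies in [from_datetime, to_datetime].

    A bound left as None is not applied. Hypothetical helper, not part of S3Hook.
    """
    keys = []
    for page in pages:
        # A page for an empty listing may have no "Contents" entry at all.
        for obj in page.get("Contents", []):
            last_modified = obj["LastModified"]
            if from_datetime is not None and last_modified < from_datetime:
                continue
            if to_datetime is not None and last_modified > to_datetime:
                continue
            keys.append(obj["Key"])
    return keys


# Hand-built pages standing in for the output of paginator.paginate(...).
sample_pages = [
    {
        "Contents": [
            {"Key": "a.csv", "LastModified": datetime(2021, 6, 1, tzinfo=timezone.utc)},
            {"Key": "b.csv", "LastModified": datetime(2021, 7, 1, tzinfo=timezone.utc)},
        ]
    },
    {},  # page without "Contents"
    {
        "Contents": [
            {"Key": "c.csv", "LastModified": datetime(2021, 8, 1, tzinfo=timezone.utc)},
        ]
    },
]

# Only a lower bound is given, so the upper bound is not applied.
recent = filter_keys_by_last_modified(
    sample_pages, from_datetime=datetime(2021, 6, 15, tzinfo=timezone.utc)
)
```

Comparing datetime objects directly avoids building a query string and makes the None defaults straightforward, at the cost of doing the filtering in Python after each page is fetched. In both approaches the filtering happens on the client side, since list_objects_v2 has no date argument.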
   
   This change wouldn't affect other code that depends on it, such as `S3DeleteObjectsOperator`, `S3ListOperator`, `S3KeysUnchangedSensor`, and the S3Hook methods `get_wildcard_key` and `delete_bucket`.
   
   The corresponding unit tests can be modified or added in [test_s3.py](https://github.com/apache/airflow/blob/5399f9124a4e75c7bb89e47c267d89b5280060ad/tests/providers/amazon/aws/hooks/test_s3.py#L146) and [test_gcs_to_s3.py](https://github.com/apache/airflow/blob/main/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py).

