Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/11 11:25:44 UTC

[GitHub] [airflow] dinigo edited a comment on issue #8804: Add ability for all operators to interact with storages of AWS/GCP/AZURE

dinigo edited a comment on issue #8804:
URL: https://github.com/apache/airflow/issues/8804#issuecomment-626625086


   You are suggesting to implement something like what we already have with [GenericTransfer](https://github.com/apache/airflow/blob/master/airflow/operators/generic_transfer.py). I've thought of this too. We would need a common API for getting and sending files, preferably based on a file-like object so transfers are streamed and don't take up local storage.
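
   As a rough illustration of the streaming idea (plain Python, not a proposed API; `open_source_stream` and `open_dest_stream` are hypothetical placeholders): two file-like objects can be piped into each other chunk by chunk, so the worker never holds the whole file.
   ```python
   import shutil

   # Hypothetical placeholders: `open_source_stream` reads from the source storage,
   # `open_dest_stream` writes to the destination storage.
   with open_source_stream() as src, open_dest_stream() as dst:
     # copy in fixed-size chunks: only one chunk in memory, nothing on local disk
     shutil.copyfileobj(src, dst, length=8 * 1024 * 1024)  # 8 MiB chunks
   ```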
   
   I would rather have a `GenericFileTransfer`.
   
   We would have an abstract `FsApiHook` defining functions analogous to [`DbApiHook`](https://github.com/apache/airflow/blob/master/airflow/hooks/dbapi_hook.py#L46)'s `get_records` and `insert_rows`, but for files: for example `get_file_stream` and `write_file_stream`.
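
   A minimal sketch of what that abstract hook could look like (only the class and method names come from the proposal above; the signatures and the `BaseHook` base class are my assumption):
   ```python
   from typing import IO

   from airflow.hooks.base_hook import BaseHook


   class FsApiHook(BaseHook):
     """File-storage counterpart of DbApiHook (sketch only)."""

     def get_file_stream(self, bucket: str, file_path: str) -> IO[bytes]:
       """Return a readable file-like object for the remote file."""
       raise NotImplementedError()

     def write_file_stream(self, bucket: str, file_path: str, file_obj: IO[bytes]) -> None:
       """Write the contents of a readable file-like object to the remote path."""
       raise NotImplementedError()
   ```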
   
   So, for example, `GCSHook` and `S3Hook` would extend `FsApiHook`:
   ```python
   import gcsfs

   class GCSHook(GcpBaseHook, FsApiHook):
     # ...
     def get_file_stream(self, bucket: str, file_path: str):
       # return a readable stream for the object, e.g. via gcsfs
       return gcsfs.GCSFileSystem().open(f'{bucket}/{file_path}', 'rb')
     # ...
   ```
   ```python
   class S3Hook(AwsBaseHook, FsApiHook):
     # ...
     def write_file_stream(self, bucket: str, file_path: str, file_obj):
       """Essentially already implemented in https://github.com/apache/airflow/blob/master/airflow/providers/amazon/aws/hooks/s3.py#L582"""
       self.get_conn().upload_fileobj(file_obj, bucket, file_path)
     # ...
   ```
   Then a `GenericFileTransfer` would do something similar to what `GenericTransfer` does in its [`execute`](https://github.com/apache/airflow/blob/master/airflow/operators/generic_transfer.py#L67) method:
   ```python
   class GenericFileTransfer(BaseOperator):
     # ...
     def execute(self, context):
       source_hook = BaseHook.get_hook(self.source_conn_id)
       dest_hook = BaseHook.get_hook(self.dest_conn_id)
       # stream the file from source to destination, no local copy
       exchange_stream = source_hook.get_file_stream(self.source_bucket, self.source_file)
       dest_hook.write_file_stream(self.dest_bucket, self.dest_file, exchange_stream)
   ```
   
   Now I can configure the hooks for the source and the destination, for example:
   ```python
   gcs_to_s3 = GenericFileTransfer(
     task_id='gcs_to_s3',
     source_conn_id='gcs-conn-id',
     source_hook=GCSHook,
     source_bucket='my-gcs-bucket',
     source_file='my-file{{ ds }}.csv',
     dest_conn_id='s3-conn-id',
     dest_hook=S3Hook,
     dest_bucket='my-s3-bucket',
     dest_file='my-file{{ ds }}.csv',
   )
   ```
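
   (One detail: for the `{{ ds }}` templating above to work, `source_file` and `dest_file` would need to be listed in the operator's `template_fields`.)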
   
   And then you can remove all the specific copy operators like `S3ToSFTP`, `AzureBlobToGCS`... or, if they don't exist yet, you don't need to implement them at all!
   
   What do you think @turbaszek? I don't know how the core development is organized (I know there's a Jira and a mailing list, but I know nothing about the organizational processes). This would be a core change, but if Airflow aims to thrive, it's necessary (IMHO).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org