You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/09/13 07:46:23 UTC

[GitHub] neil90 closed pull request #3870: [AIRFLOW-2932] GoogleCloudStorageHook - allow compression of file

neil90 closed pull request #3870: [AIRFLOW-2932] GoogleCloudStorageHook - allow compression of file
URL: https://github.com/apache/incubator-airflow/pull/3870
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index 6cfa1cf565..1663cae1ba 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -24,7 +24,10 @@
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 from airflow.exceptions import AirflowException
 
+import gzip as gz
+import shutil
 import re
+import os
 
 
 class GoogleCloudStorageHook(GoogleCloudBaseHook):
@@ -172,7 +175,8 @@ def download(self, bucket, object, filename=None):
         return downloaded_file_bytes
 
     # pylint:disable=redefined-builtin
-    def upload(self, bucket, object, filename, mime_type='application/octet-stream'):
+    def upload(self, bucket, object, filename,
+               gzip=False, mime_type='application/octet-stream'):
         """
         Uploads a local file to Google Cloud Storage.
 
@@ -182,16 +186,32 @@ def upload(self, bucket, object, filename, mime_type='application/octet-stream')
         :type object: str
         :param filename: The local file path to the file to be uploaded.
         :type filename: str
+        :param gzip: Option to compress file for upload
+        :type gzip: boolean
         :param mime_type: The MIME type to set when uploading the file.
         :type mime_type: str
         """
         service = self.get_conn()
+
+        if gzip:
+            filename_gz = filename + '.gz'
+
+            with open(filename, 'rb') as f_in:
+                with gz.open(filename_gz, 'wb') as f_out:
+                    shutil.copyfileobj(f_in, f_out)
+                    filename = filename_gz
+
         media = MediaFileUpload(filename, mime_type)
+
         try:
             service \
                 .objects() \
                 .insert(bucket=bucket, name=object, media_body=media) \
                 .execute()
+
+            # Clean up gzip file
+            if gzip:
+                os.remove(filename)
             return True
         except errors.HttpError as ex:
             if ex.resp['status'] == '404':
diff --git a/airflow/contrib/operators/file_to_gcs.py b/airflow/contrib/operators/file_to_gcs.py
index a392a16891..5b85ff7b48 100644
--- a/airflow/contrib/operators/file_to_gcs.py
+++ b/airflow/contrib/operators/file_to_gcs.py
@@ -37,6 +37,8 @@ class FileToGoogleCloudStorageOperator(BaseOperator):
     :type google_cloud_storage_conn_id: str
     :param mime_type: The mime-type string
     :type mime_type: str
+    :type gzip: Allows for file to upload as gzip
+    :param gzip: boolean
     :param delegate_to: The account to impersonate, if any
     :type delegate_to: str
     """
@@ -49,6 +51,7 @@ def __init__(self,
                  bucket,
                  google_cloud_storage_conn_id='google_cloud_default',
                  mime_type='application/octet-stream',
+                 gzip=False,
                  delegate_to=None,
                  *args,
                  **kwargs):
@@ -58,6 +61,7 @@ def __init__(self,
         self.bucket = bucket
         self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
         self.mime_type = mime_type
+        self.gzip = gzip
         self.delegate_to = delegate_to
 
     def execute(self, context):
@@ -72,4 +76,5 @@ def execute(self, context):
             bucket=self.bucket,
             object=self.dst,
             mime_type=self.mime_type,
+            gzip=self.gzip,
             filename=self.src)
diff --git a/dags/test_gcs_dag.py b/dags/test_gcs_dag.py
new file mode 100644
index 0000000000..e41314af27
--- /dev/null
+++ b/dags/test_gcs_dag.py
@@ -0,0 +1,36 @@
+"""
+Code that goes along with the Airflow tutorial located at:
+https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/tutorial.py
+"""
+from airflow import DAG
+from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator
+from datetime import datetime, timedelta
+
+
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'start_date': datetime(2015, 6, 1),
+    'email': ['airflow@example.com'],
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+    # 'queue': 'bash_queue',
+    # 'pool': 'backfill',
+    # 'priority_weight': 10,
+    # 'end_date': datetime(2016, 1, 1),
+}
+
+dag = DAG('gcs_upload_gzip', default_args=default_args)
+
+t1 = FileToGoogleCloudStorageOperator(task_id='gcs_upload',src='/app/2008.csv',
+dst='2008.csv1', bucket='gcsnotes-neil', gzip=False, dag=dag)
+
+#export AIRFLOW_HOME=/airflow
+#airflow initdb
+#airflow webserver -p 8080
+#pip install -e ".[hdfs,hive,druid,devel,gcs]"
+#airflow test gcs_upload_gzip gcs_upload 2015-01-01
+#https://www.googleapis.com/auth/cloud-platform
+#pip install --upgrade google-api-python-client


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services