You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2016/06/24 03:30:16 UTC

incubator-airflow git commit: [AIRFLOW-274] Add XCom functionality to GoogleCloudStorageDownloadOperator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master bd9127ebb -> 518e0073a


[AIRFLOW-274] Add XCom functionality to GoogleCloudStorageDownloadOperator

Updated GoogleCloudStorageDownloadOperator so that it can optionally
push the downloaded file's contents to XCom, either instead of or in
addition to saving them to disk. The filename argument is now optional
rather than required, so the file no longer has to be written to disk
when only the XCom value is needed.

Closes #1618 from illop/risk_analytics


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/518e0073
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/518e0073
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/518e0073

Branch: refs/heads/master
Commit: 518e0073a81585b39757afb4c0d8d65d400a88e2
Parents: bd9127e
Author: Ilya Rakoshes <il...@wepay.com>
Authored: Thu Jun 23 20:30:11 2016 -0700
Committer: Chris Riccomini <ch...@wepay.com>
Committed: Thu Jun 23 20:30:11 2016 -0700

----------------------------------------------------------------------
 .../contrib/operators/gcs_download_operator.py   | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/518e0073/airflow/contrib/operators/gcs_download_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_download_operator.py b/airflow/contrib/operators/gcs_download_operator.py
index 7e19e13..511248e 100644
--- a/airflow/contrib/operators/gcs_download_operator.py
+++ b/airflow/contrib/operators/gcs_download_operator.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import logging
+import sys
 
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
 from airflow.models import BaseOperator
@@ -31,7 +32,8 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
         self,
         bucket,
         object,
-        filename,
+        filename=False,
+        store_to_xcom_key=False,
         google_cloud_storage_conn_id='google_cloud_storage_default',
         delegate_to=None,
         *args,
@@ -46,7 +48,13 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
         :type object: string
         :param filename: The file path on the local file system (where the
             operator is being executed) that the file should be downloaded to.
+            If false, the downloaded data will not be stored on the local file
+            system.
         :type filename: string
+        :param store_to_xcom_key: If this param is set, the operator will push
+            the contents of the downloaded file to XCom with the key set in this
+            parameter. If false, the downloaded data will not be pushed to XCom.
+        :type store_to_xcom_key: string
         :param google_cloud_storage_conn_id: The connection ID to use when
             connecting to Google cloud storage.
         :type google_cloud_storage_conn_id: string
@@ -58,6 +66,7 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
         self.bucket = bucket
         self.object = object
         self.filename = filename
+        self.store_to_xcom_key = store_to_xcom_key
         self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
         self.delegate_to = delegate_to
 
@@ -65,4 +74,10 @@ class GoogleCloudStorageDownloadOperator(BaseOperator):
         logging.info('Executing download: %s, %s, %s', self.bucket, self.object, self.filename)
         hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
                                       delegate_to=self.delegate_to)
-        print(hook.download(self.bucket, self.object, self.filename))
+        file_bytes = hook.download(self.bucket, self.object, self.filename)
+        if self.store_to_xcom_key:
+            if sys.getsizeof(file_bytes) < 48000:
+                context['ti'].xcom_push(key=self.store_to_xcom_key, value=file_bytes)
+            else:
+                raise RuntimeError('The size of the downloaded file is too large to push to XCom!')
+        print(file_bytes)