You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/09/05 08:45:46 UTC

[GitHub] kaxil closed pull request #3843: [AIRFLOW-3002] Correctly test os.stat().st_size of local_file in goog…

kaxil closed pull request #3843: [AIRFLOW-3002] Correctly test os.stat().st_size of local_file in goog…
URL: https://github.com/apache/incubator-airflow/pull/3843
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was submitted from a fork, the diff is supplied
below; GitHub would not otherwise display it:

diff --git a/airflow/contrib/operators/dataflow_operator.py b/airflow/contrib/operators/dataflow_operator.py
index 3c69fb759a..7cb950ab43 100644
--- a/airflow/contrib/operators/dataflow_operator.py
+++ b/airflow/contrib/operators/dataflow_operator.py
@@ -361,17 +361,17 @@ def google_cloud_to_local(self, file_name):
         path_components = file_name[self.GCS_PREFIX_LENGTH:].split('/')
         if len(path_components) < 2:
             raise Exception(
-                'Invalid Google Cloud Storage (GCS) object path: {}.'
+                'Invalid Google Cloud Storage (GCS) object path: {}'
                 .format(file_name))
 
         bucket_id = path_components[0]
         object_id = '/'.join(path_components[1:])
         local_file = '/tmp/dataflow{}-{}'.format(str(uuid.uuid4())[:8],
                                                  path_components[-1])
-        file_size = self._gcs_hook.download(bucket_id, object_id, local_file)
+        self._gcs_hook.download(bucket_id, object_id, local_file)
 
-        if os.stat(file_size).st_size > 0:
+        if os.stat(local_file).st_size > 0:
             return local_file
         raise Exception(
-            'Failed to download Google Cloud Storage GCS object: {}'
+            'Failed to download Google Cloud Storage (GCS) object: {}'
             .format(file_name))
diff --git a/tests/contrib/operators/test_dataflow_operator.py b/tests/contrib/operators/test_dataflow_operator.py
index a373126b24..8ed4583255 100644
--- a/tests/contrib/operators/test_dataflow_operator.py
+++ b/tests/contrib/operators/test_dataflow_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -207,5 +207,53 @@ def test_invalid_object_path(self, mock_parent_init):
             gcs_bucket_helper.google_cloud_to_local(file_name)
 
         self.assertEquals(
-            'Invalid Google Cloud Storage (GCS) object path: {}.'.format(file_name),
+            'Invalid Google Cloud Storage (GCS) object path: {}'.format(file_name),
+            str(context.exception))
+
+    @mock.patch(
+        'airflow.contrib.operators.dataflow_operator.GoogleCloudBucketHelper.__init__'
+    )
+    def test_valid_object(self, mock_parent_init):
+
+        file_name = 'gs://test-bucket/path/to/obj.jar'
+        mock_parent_init.return_value = None
+
+        gcs_bucket_helper = GoogleCloudBucketHelper()
+        gcs_bucket_helper._gcs_hook = mock.Mock()
+
+        def _mock_download(bucket, object, filename=None):
+            text_file_contents = 'text file contents'
+            with open(filename, 'w') as text_file:
+                text_file.write(text_file_contents)
+            return text_file_contents
+
+        gcs_bucket_helper._gcs_hook.download.side_effect = _mock_download
+
+        local_file = gcs_bucket_helper.google_cloud_to_local(file_name)
+        self.assertIn('obj.jar', local_file)
+
+    @mock.patch(
+        'airflow.contrib.operators.dataflow_operator.GoogleCloudBucketHelper.__init__'
+    )
+    def test_empty_object(self, mock_parent_init):
+
+        file_name = 'gs://test-bucket/path/to/obj.jar'
+        mock_parent_init.return_value = None
+
+        gcs_bucket_helper = GoogleCloudBucketHelper()
+        gcs_bucket_helper._gcs_hook = mock.Mock()
+
+        def _mock_download(bucket, object, filename=None):
+            text_file_contents = ''
+            with open(filename, 'w') as text_file:
+                text_file.write(text_file_contents)
+            return text_file_contents
+
+        gcs_bucket_helper._gcs_hook.download.side_effect = _mock_download
+
+        with self.assertRaises(Exception) as context:
+            gcs_bucket_helper.google_cloud_to_local(file_name)
+
+        self.assertEquals(
+            'Failed to download Google Cloud Storage (GCS) object: {}'.format(file_name),
             str(context.exception))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services