Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/04/15 04:36:26 UTC

[GitHub] [airflow] mik-laj commented on a change in pull request #8313: Simplify handling of downloaded files in Dataflow

mik-laj commented on a change in pull request #8313: Simplify handling of downloaded files in Dataflow
URL: https://github.com/apache/airflow/pull/8313#discussion_r408575682
 
 

 ##########
 File path: airflow/providers/google/cloud/operators/dataflow.py
 ##########
 @@ -471,85 +473,42 @@ def __init__(  # pylint: disable=too-many-arguments
 
     def execute(self, context):
         """Execute the python dataflow job."""
-        bucket_helper = GoogleCloudBucketHelper(
-            self.gcp_conn_id, self.delegate_to)
-        self.py_file = bucket_helper.google_cloud_to_local(self.py_file)
-        self.hook = DataflowHook(
-            gcp_conn_id=self.gcp_conn_id,
-            delegate_to=self.delegate_to,
-            poll_sleep=self.poll_sleep
-        )
-        dataflow_options = self.dataflow_default_options.copy()
-        dataflow_options.update(self.options)
-        # Convert argument names from lowerCamelCase to snake case.
-        camel_to_snake = lambda name: re.sub(
-            r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
-        formatted_options = {camel_to_snake(key): dataflow_options[key]
-                             for key in dataflow_options}
+        with ExitStack() as exit_stack:
+            if self.py_file.lower().startswith('gs://'):
+                gcs_hook = GCSHook(self.gcp_conn_id, self.delegate_to)
+                tmp_gcs_file = exit_stack.enter_context(  # pylint: disable=no-member
+                    gcs_hook.provide_file(object_url=self.py_file)
+                )
+                self.py_file = tmp_gcs_file.name
+
+            self.hook = DataflowHook(
+                gcp_conn_id=self.gcp_conn_id,
+                delegate_to=self.delegate_to,
+                poll_sleep=self.poll_sleep
+            )
+            dataflow_options = self.dataflow_default_options.copy()
+            dataflow_options.update(self.options)
+            # Convert argument names from lowerCamelCase to snake case.
+            camel_to_snake = lambda name: re.sub(r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
+            formatted_options = {camel_to_snake(key): dataflow_options[key]
+                                 for key in dataflow_options}
 
-        def set_current_job_id(job_id):
-            self.job_id = job_id
+            def set_current_job_id(job_id):
+                self.job_id = job_id
 
-        self.hook.start_python_dataflow(
-            job_name=self.job_name,
-            variables=formatted_options,
-            dataflow=self.py_file,
-            py_options=self.py_options,
-            py_interpreter=self.py_interpreter,
-            py_requirements=self.py_requirements,
-            py_system_site_packages=self.py_system_site_packages,
-            on_new_job_id_callback=set_current_job_id,
-            project_id=self.project_id,
-        )
+            self.hook.start_python_dataflow(
+                job_name=self.job_name,
+                variables=formatted_options,
+                dataflow=self.py_file,
+                py_options=self.py_options,
+                py_interpreter=self.py_interpreter,
+                py_requirements=self.py_requirements,
+                py_system_site_packages=self.py_system_site_packages,
+                on_new_job_id_callback=set_current_job_id,
+                project_id=self.project_id,
+            )
 
     def on_kill(self) -> None:
         self.log.info("On kill.")
         if self.job_id:
             self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)
-
-
-class GoogleCloudBucketHelper:
-    """GoogleCloudStorageHook helper class to download GCS object."""
-    GCS_PREFIX_LENGTH = 5
-
-    def __init__(self,
-                 gcp_conn_id: str = 'google_cloud_default',
-                 delegate_to: Optional[str] = None) -> None:
-        self._gcs_hook = GCSHook(gcp_conn_id, delegate_to)
-
-    def google_cloud_to_local(self, file_name: str) -> str:
-        """
-        Checks whether the file specified by file_name is stored in Google Cloud
-        Storage (GCS), if so, downloads the file and saves it locally. The full
-        path of the saved file will be returned. Otherwise the local file_name
-        will be returned immediately.
-
-        :param file_name: The full path of input file.
-        :type file_name: str
-        :return: The full path of local file.
-        :rtype: str
-        """
-        if not file_name.startswith('gs://'):
-            return file_name
-
-        # Extracts bucket_id and object_id by first removing 'gs://' prefix and
-        # then split the remaining by path delimiter '/'.
-        path_components = file_name[self.GCS_PREFIX_LENGTH:].split('/')
-        if len(path_components) < 2:
-            raise Exception(
-                'Invalid Google Cloud Storage (GCS) object path: {}'
-                .format(file_name))
-
-        bucket_id = path_components[0]
-        object_id = '/'.join(path_components[1:])
-        local_file = os.path.join(
-            tempfile.gettempdir(),
 
 Review comment:
  This could cause problems because these files were never deleted.
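  For context on the fix: `GCSHook.provide_file` is a context manager, so registering it
  on the `ExitStack` guarantees the downloaded copy is removed once `execute()` returns,
  whereas the removed `GoogleCloudBucketHelper.google_cloud_to_local` wrote into
  `tempfile.gettempdir()` and left the file behind. Below is a minimal, self-contained
  sketch of that cleanup pattern; `fake_provide_file` and the object URL are hypothetical
  stand-ins for the real hook method, so the snippet runs without Airflow installed:

  ```python
  import os
  import tempfile
  from contextlib import ExitStack, contextmanager


  @contextmanager
  def fake_provide_file(object_url):
      """Stand-in for GCSHook.provide_file: yield a temp copy, delete it on exit."""
      with tempfile.NamedTemporaryFile(suffix=os.path.basename(object_url)) as tmp:
          # The real hook would download the GCS object into `tmp` at this point.
          yield tmp


  py_file = 'gs://my-bucket/wordcount.py'  # hypothetical object URL
  with ExitStack() as exit_stack:
      if py_file.lower().startswith('gs://'):
          tmp_file = exit_stack.enter_context(fake_provide_file(py_file))
          py_file = tmp_file.name
      assert os.path.exists(py_file)  # the temp copy exists while the stack is open

  assert not os.path.exists(py_file)  # removed on exit, so no files are leaked
  ```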

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services