You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/09/06 04:03:21 UTC

[GitHub] [airflow] mik-laj commented on a change in pull request #4633: AIRFLOW-3791: Dataflow - Support check status if pipeline spans on multiple jobs

mik-laj commented on a change in pull request #4633: AIRFLOW-3791: Dataflow - Support check status if pipeline spans on multiple jobs
URL: https://github.com/apache/airflow/pull/4633#discussion_r321568473
 
 

 ##########
 File path: airflow/contrib/hooks/gcp_dataflow_hook.py
 ##########
 @@ -47,58 +47,131 @@ class DataflowJobStatus:
     JOB_STATE_FAILED = "JOB_STATE_FAILED"
     JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED"
     JOB_STATE_PENDING = "JOB_STATE_PENDING"
+    FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED}
+    SUCCEEDED_END_STATES = {JOB_STATE_DONE}
+    END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES
 
 
 class _DataflowJob(LoggingMixin):
     def __init__(self, dataflow, project_number, name, location, poll_sleep=10,
-                 job_id=None, num_retries=None):
+                 job_id=None, num_retries=None, multiple_jobs=None):
         self._dataflow = dataflow
         self._project_number = project_number
         self._job_name = name
         self._job_location = location
+        self._multiple_jobs = multiple_jobs
         self._job_id = job_id
         self._num_retries = num_retries
-        self._job = self._get_job()
+        if self._num_retries is None:
+            self._num_retries = 0
         self._poll_sleep = poll_sleep
+        self._jobs = self._get_jobs()
 
-    def _get_job_id_from_name(self):
-        jobs = self._dataflow.projects().locations().jobs().list(
-            projectId=self._project_number,
-            location=self._job_location
-        ).execute(num_retries=self._num_retries)
-        for job in jobs['jobs']:
-            if job['name'].lower() == self._job_name.lower():
-                self._job_id = job['id']
-                return job
-        return None
+    def is_job_running(self):
+        """
+        Helper method to check if jos is still running in dataflow
+
+        :return: True if job is running.
+        :rtype: bool
+        """
+        for job in self._jobs:
+            if job['currentState'] not in DataflowJobStatus.END_STATES:
+                return True
+        return False
 
-    def _get_job(self):
-        if self._job_id:
-            job = self._dataflow.projects().locations().jobs().get(
+    # pylint: disable=too-many-nested-blocks
+    def _get_dataflow_jobs(self):
+        """
+        Helper method to get list of jobs that start with job name or id
+
+        :return: list of jobs including id's
+        :rtype: list
+        """
+        if not self._multiple_jobs and self._job_id:
+            return self._dataflow.projects().locations().jobs().get(
 
 Review comment:
   Here is the problem. This method returns a single dictionary here, but returns a list of dictionaries below. Because the return type is inconsistent, it is impossible to determine the job ID, and the caller fails as follows:
   ```
   [2019-09-06 03:20:51,974] {taskinstance.py:1042} ERROR - string indices must be integers
   Traceback (most recent call last):
     File "/opt/airflow/airflow/models/taskinstance.py", line 917, in _run_raw_task
       result = task_copy.execute(context=context)
     File "/opt/airflow/airflow/gcp/operators/dataflow.py", line 216, in execute
       self.jar, self.job_class, True, self.multiple_jobs)
     File "/opt/airflow/airflow/gcp/hooks/dataflow.py", line 372, in start_java_dataflow
       self._start_dataflow(variables, name, command_prefix, label_formatter, multiple_jobs)
     File "/opt/airflow/airflow/contrib/hooks/gcp_api_base_hook.py", line 307, in wrapper
       return func(self, *args, **kwargs)
     File "/opt/airflow/airflow/gcp/hooks/dataflow.py", line 327, in _start_dataflow
       variables['region'], self.poll_sleep, job_id, self.num_retries, multiple_jobs) \
     File "/opt/airflow/airflow/gcp/hooks/dataflow.py", line 76, in __init__
       self._jobs = self._get_jobs()
     File "/opt/airflow/airflow/gcp/hooks/dataflow.py", line 138, in _get_jobs
       self._job_id, job['name']
   TypeError: string indices must be integers
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services