You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/08/13 23:17:38 UTC

[GitHub] TrevorEdwards commented on a change in pull request #3744: [AIRFLOW-2893] fix stuck dataflow job due to name mismatch

TrevorEdwards commented on a change in pull request #3744: [AIRFLOW-2893] fix stuck dataflow job due to name mismatch
URL: https://github.com/apache/incubator-airflow/pull/3744#discussion_r209787554
 
 

 ##########
 File path: airflow/contrib/hooks/gcp_dataflow_hook.py
 ##########
 @@ -124,36 +127,38 @@ def __init__(self, cmd):
 
     def _line(self, fd):
         if fd == self._proc.stderr.fileno():
-            lines = self._proc.stderr.readlines()
-            for line in lines:
-                self.log.warning(line[:-1])
-            if lines:
-                return lines[-1]
+            return self._proc.stderr.readline()
         if fd == self._proc.stdout.fileno():
-            line = self._proc.stdout.readline()
-            return line
+            return self._proc.stdout.readline()
 
     @staticmethod
     def _extract_job(line):
-        if line is not None:
-            if line.startswith("Submitted job: "):
-                return line[15:-1]
+        job_id_pattern = re.compile(
+            '.*https://console.cloud.google.com/dataflow.*/jobs/([a-z|0-9|A-Z|\-|\_]+).*')
+        matched_job = job_id_pattern.match(line or '')
+        if matched_job:
+            return matched_job.group(1)
 
     def wait_for_done(self):
         reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
         self.log.info("Start waiting for DataFlow process to complete.")
-        while self._proc.poll() is None:
+        job_id = None
+        while True:
             ret = select.select(reads, [], [], 5)
             if ret is not None:
                 for fd in ret[0]:
                     line = self._line(fd)
                     if line:
-                        self.log.debug(line[:-1])
+                        self.log.info(line[:-1])
+                        job_id = job_id or self._extract_job(line)
             else:
                 self.log.info("Waiting for DataFlow process to complete.")
+            if self._proc.poll() is not None:
 
 Review comment:
   Is the `poll()` check now at the end of the loop to prevent the final log lines from getting dropped? A code comment explaining this would help prevent the change from being undone.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services