You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/08/14 15:54:26 UTC

[GitHub] ashb commented on a change in pull request #3744: [AIRFLOW-2893] fix stuck dataflow job due to name mismatch

ashb commented on a change in pull request #3744: [AIRFLOW-2893] fix stuck dataflow job due to name mismatch
URL: https://github.com/apache/incubator-airflow/pull/3744#discussion_r210006800
 
 

 ##########
 File path: airflow/contrib/hooks/gcp_dataflow_hook.py
 ##########
 @@ -124,36 +127,38 @@ def __init__(self, cmd):
 
     def _line(self, fd):
         if fd == self._proc.stderr.fileno():
-            lines = self._proc.stderr.readlines()
-            for line in lines:
-                self.log.warning(line[:-1])
-            if lines:
-                return lines[-1]
+            return self._proc.stderr.readline()
         if fd == self._proc.stdout.fileno():
-            line = self._proc.stdout.readline()
-            return line
+            return self._proc.stdout.readline()
 
     @staticmethod
     def _extract_job(line):
-        if line is not None:
-            if line.startswith("Submitted job: "):
-                return line[15:-1]
+        job_id_pattern = re.compile(
+            '.*https://console.cloud.google.com/dataflow.*/jobs/([a-z|0-9|A-Z|\-|\_]+).*')
+        matched_job = job_id_pattern.match(line or '')
+        if matched_job:
+            return matched_job.group(1)
 
     def wait_for_done(self):
         reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
         self.log.info("Start waiting for DataFlow process to complete.")
-        while self._proc.poll() is None:
+        job_id = None
+        while True:
             ret = select.select(reads, [], [], 5)
             if ret is not None:
                 for fd in ret[0]:
                     line = self._line(fd)
                     if line:
-                        self.log.debug(line[:-1])
+                        self.log.info(line[:-1])
+                        job_id = job_id or self._extract_job(line)
             else:
                 self.log.info("Waiting for DataFlow process to complete.")
+            if self._proc.poll() is not None:
 
 Review comment:
  Reading stdout and stderr independently is also hard to do without deadlocking on one or the other — you can't just read one to the end and then read the other. https://stackoverflow.com/questions/33886406/how-to-avoid-the-deadlock-in-a-subprocess-without-using-communicate
   
  (Sorry if this is out of context — I haven't looked at the PR, I've just seen the comments.)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services