You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/08/14 00:15:12 UTC

[GitHub] fenglu-g commented on a change in pull request #3744: [AIRFLOW-2893] fix stuck dataflow job due to name mismatch

fenglu-g commented on a change in pull request #3744: [AIRFLOW-2893] fix stuck dataflow job due to name mismatch
URL: https://github.com/apache/incubator-airflow/pull/3744#discussion_r209797314
 
 

 ##########
 File path: airflow/contrib/hooks/gcp_dataflow_hook.py
 ##########
 @@ -124,36 +127,38 @@ def __init__(self, cmd):
 
     def _line(self, fd):
         if fd == self._proc.stderr.fileno():
-            lines = self._proc.stderr.readlines()
-            for line in lines:
-                self.log.warning(line[:-1])
-            if lines:
-                return lines[-1]
+            return self._proc.stderr.readline()
         if fd == self._proc.stdout.fileno():
-            line = self._proc.stdout.readline()
-            return line
+            return self._proc.stdout.readline()
 
     @staticmethod
     def _extract_job(line):
-        if line is not None:
-            if line.startswith("Submitted job: "):
-                return line[15:-1]
+        job_id_pattern = re.compile(
+            '.*https://console.cloud.google.com/dataflow.*/jobs/([a-z|0-9|A-Z|\-|\_]+).*')
+        matched_job = job_id_pattern.match(line or '')
+        if matched_job:
+            return matched_job.group(1)
 
     def wait_for_done(self):
         reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
         self.log.info("Start waiting for DataFlow process to complete.")
-        while self._proc.poll() is None:
+        job_id = None
+        while True:
             ret = select.select(reads, [], [], 5)
             if ret is not None:
                 for fd in ret[0]:
                     line = self._line(fd)
                     if line:
-                        self.log.debug(line[:-1])
+                        self.log.info(line[:-1])
 
 Review comment:
   The logging doesn't look excessive (about 100 lines), and it can provide useful information to help users understand what's happening to their jobs launched from the command line.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services