Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/08/15 08:42:59 UTC

[GitHub] kaxil closed pull request #3744: [AIRFLOW-2893] fix stuck dataflow job due to name mismatch

URL: https://github.com/apache/incubator-airflow/pull/3744
This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is reproduced below for the sake of provenance.

In short: the hook previously identified a Dataflow job only by its name,
which can match a stale job with the same name and leave the operator
polling a job that never finishes. The change captures the job ID from the
runner's own log output and polls that ID directly, falling back to the
name lookup only when no ID was found.

diff --git a/airflow/contrib/hooks/gcp_dataflow_hook.py b/airflow/contrib/hooks/gcp_dataflow_hook.py
index ee3b510ed7..3a8eb28cee 100644
--- a/airflow/contrib/hooks/gcp_dataflow_hook.py
+++ b/airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -34,12 +34,13 @@
 
 
 class _DataflowJob(LoggingMixin):
-    def __init__(self, dataflow, project_number, name, location, poll_sleep=10):
+    def __init__(self, dataflow, project_number, name, location, poll_sleep=10,
+                 job_id=None):
         self._dataflow = dataflow
         self._project_number = project_number
         self._job_name = name
         self._job_location = location
-        self._job_id = None
+        self._job_id = job_id
         self._job = self._get_job()
         self._poll_sleep = poll_sleep
 
@@ -55,13 +56,15 @@ def _get_job_id_from_name(self):
         return None
 
     def _get_job(self):
-        if self._job_name:
-            job = self._get_job_id_from_name()
-        else:
+        if self._job_id:
             job = self._dataflow.projects().jobs().get(
                 projectId=self._project_number,
                 jobId=self._job_id
             ).execute(num_retries=5)
+        elif self._job_name:
+            job = self._get_job_id_from_name()
+        else:
+            raise Exception('Missing both dataflow job ID and name.')
 
         if job and 'currentState' in job:
             self.log.info(
@@ -124,36 +127,50 @@ def __init__(self, cmd):
 
     def _line(self, fd):
         if fd == self._proc.stderr.fileno():
-            lines = self._proc.stderr.readlines()
-            for line in lines:
+            line = b''.join(self._proc.stderr.readlines())
+            if line:
                 self.log.warning(line[:-1])
-            if lines:
-                return lines[-1]
+            return line
         if fd == self._proc.stdout.fileno():
-            line = self._proc.stdout.readline()
+            line = b''.join(self._proc.stdout.readlines())
+            if line:
+                self.log.info(line[:-1])
             return line
 
     @staticmethod
     def _extract_job(line):
-        if line is not None:
-            if line.startswith("Submitted job: "):
-                return line[15:-1]
+        # Job id info: https://goo.gl/SE29y9.
+        job_id_pattern = re.compile(
+            b'.*console.cloud.google.com/dataflow.*/jobs/([a-z|0-9|A-Z|\-|\_]+).*')
+        matched_job = job_id_pattern.search(line or '')
+        if matched_job:
+            return matched_job.group(1).decode()
 
     def wait_for_done(self):
         reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
         self.log.info("Start waiting for DataFlow process to complete.")
-        while self._proc.poll() is None:
+        job_id = None
+        # Make sure logs are processed regardless whether the subprocess is
+        # terminated.
+        process_ends = False
+        while True:
             ret = select.select(reads, [], [], 5)
             if ret is not None:
                 for fd in ret[0]:
                     line = self._line(fd)
                     if line:
-                        self.log.debug(line[:-1])
+                        job_id = job_id or self._extract_job(line)
             else:
                 self.log.info("Waiting for DataFlow process to complete.")
+            if process_ends:
+                break
+            if self._proc.poll() is not None:
+                # Mark process completion but allows its outputs to be consumed.
+                process_ends = True
         if self._proc.returncode is not 0:
             raise Exception("DataFlow failed with return code {}".format(
                 self._proc.returncode))
+        return job_id
 
 
 class DataFlowHook(GoogleCloudBaseHook):
@@ -178,9 +195,10 @@ def _start_dataflow(self, task_id, variables, name,
         variables = self._set_variables(variables)
         cmd = command_prefix + self._build_cmd(task_id, variables,
                                                label_formatter)
-        _Dataflow(cmd).wait_for_done()
+        job_id = _Dataflow(cmd).wait_for_done()
         _DataflowJob(self.get_conn(), variables['project'], name,
-                     variables['region'], self.poll_sleep).wait_for_done()
+                     variables['region'],
+                     self.poll_sleep, job_id).wait_for_done()
 
     @staticmethod
     def _set_variables(variables):
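
The heart of the hunks above is twofold: wait_for_done() now scrapes the
Dataflow job ID out of the runner's log output (via the console URL the
runner prints) and keeps draining the subprocess pipes even after poll()
reports termination, so an ID emitted just before exit is not lost. A
minimal standalone sketch of the same technique follows; the function name
and the exact regex here are illustrative, not part of the PR:

    import re
    import subprocess

    # Mirrors the intent of _extract_job above: the job ID is the path
    # segment after /jobs/ in the Dataflow console URL the runner logs.
    JOB_ID_PATTERN = re.compile(
        rb'console\.cloud\.google\.com/dataflow.*/jobs/([a-zA-Z0-9_-]+)')

    def run_and_extract_job_id(cmd):
        """Run cmd, stream its output, and return the job ID if logged."""
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
        job_id = None
        # Iterating the pipe reads to EOF, so lines written just before
        # the process exits are still consumed -- the race the PR's loop
        # guards against with its process_ends flag.
        for line in proc.stdout:
            match = JOB_ID_PATTERN.search(line)
            if match and job_id is None:
                job_id = match.group(1).decode()
        if proc.wait() != 0:
            raise Exception("DataFlow failed with return code {}".format(
                proc.returncode))
        return job_id
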
diff --git a/tests/contrib/hooks/test_gcp_dataflow_hook.py b/tests/contrib/hooks/test_gcp_dataflow_hook.py
index bc7c587135..686db10e5c 100644
--- a/tests/contrib/hooks/test_gcp_dataflow_hook.py
+++ b/tests/contrib/hooks/test_gcp_dataflow_hook.py
@@ -22,8 +22,8 @@
 from mock import call
 from mock import MagicMock
 
-from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook
-from airflow.contrib.hooks.gcp_dataflow_hook import _Dataflow
+from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook,\
+    _Dataflow, _DataflowJob
 
 try:
     from unittest import mock
@@ -62,6 +62,10 @@
 BASE_STRING = 'airflow.contrib.hooks.gcp_api_base_hook.{}'
 DATAFLOW_STRING = 'airflow.contrib.hooks.gcp_dataflow_hook.{}'
 MOCK_UUID = '12345678'
+TEST_PROJECT = 'test-project'
+TEST_JOB_NAME = 'test-job-name'
+TEST_JOB_ID = 'test-job-id'
+TEST_LOCATION = 'us-central1'
 
 
 def mock_init(self, gcp_conn_id, delegate_to=None):
@@ -152,25 +156,25 @@ def test_start_java_dataflow_with_job_class(
     @mock.patch('subprocess.Popen')
     @mock.patch('select.select')
     def test_dataflow_wait_for_done_logging(self, mock_select, mock_popen, mock_logging):
-      mock_logging.info = MagicMock()
-      mock_logging.warning = MagicMock()
-      mock_proc = MagicMock()
-      mock_proc.stderr = MagicMock()
-      mock_proc.stderr.readlines = MagicMock(return_value=['test\n','error\n'])
-      mock_stderr_fd = MagicMock()
-      mock_proc.stderr.fileno = MagicMock(return_value=mock_stderr_fd)
-      mock_proc_poll = MagicMock()
-      mock_select.return_value = [[mock_stderr_fd]]
-      def poll_resp_error():
-        mock_proc.return_code = 1
-        return True
-      mock_proc_poll.side_effect=[None, poll_resp_error]
-      mock_proc.poll = mock_proc_poll
-      mock_popen.return_value = mock_proc
-      dataflow = _Dataflow(['test', 'cmd'])
-      mock_logging.info.assert_called_with('Running command: %s', 'test cmd')
-      self.assertRaises(Exception, dataflow.wait_for_done)
-      mock_logging.warning.assert_has_calls([call('test'), call('error')])
+        mock_logging.info = MagicMock()
+        mock_logging.warning = MagicMock()
+        mock_proc = MagicMock()
+        mock_proc.stderr = MagicMock()
+        mock_proc.stderr.readlines = MagicMock(return_value=['test\n', 'error\n'])
+        mock_stderr_fd = MagicMock()
+        mock_proc.stderr.fileno = MagicMock(return_value=mock_stderr_fd)
+        mock_proc_poll = MagicMock()
+        mock_select.return_value = [[mock_stderr_fd]]
+
+        def poll_resp_error():
+            mock_proc.return_code = 1
+            return True
+        mock_proc_poll.side_effect = [None, poll_resp_error]
+        mock_proc.poll = mock_proc_poll
+        mock_popen.return_value = mock_proc
+        dataflow = _Dataflow(['test', 'cmd'])
+        mock_logging.info.assert_called_with('Running command: %s', 'test cmd')
+        self.assertRaises(Exception, dataflow.wait_for_done)
 
     def test_valid_dataflow_job_name(self):
         job_name = self.dataflow_hook._build_dataflow_job_name(
@@ -254,3 +258,40 @@ def test_start_template_dataflow(self, internal_dataflow_mock):
             dataflow_template=TEMPLATE)
         internal_dataflow_mock.assert_called_once_with(
             mock.ANY, DATAFLOW_OPTIONS_TEMPLATE, PARAMETERS, TEMPLATE)
+
+
+class DataFlowJobTest(unittest.TestCase):
+
+    def setUp(self):
+        self.mock_dataflow = MagicMock()
+
+    def test_dataflow_job_init_with_job_id(self):
+        mock_jobs = MagicMock()
+        self.mock_dataflow.projects.return_value.\
+            jobs.return_value = mock_jobs
+        _DataflowJob(self.mock_dataflow, TEST_PROJECT, TEST_JOB_NAME,
+                     TEST_LOCATION, 10, TEST_JOB_ID)
+        mock_jobs.get.assert_called_with(projectId=TEST_PROJECT, jobId=TEST_JOB_ID)
+
+    def test_dataflow_job_init_without_job_id(self):
+        mock_jobs = MagicMock()
+        self.mock_dataflow.projects.return_value.locations.return_value.\
+            jobs.return_value = mock_jobs
+        _DataflowJob(self.mock_dataflow, TEST_PROJECT, TEST_JOB_NAME,
+                     TEST_LOCATION, 10)
+        mock_jobs.list.assert_called_with(projectId=TEST_PROJECT,
+                                          location=TEST_LOCATION)
+
+
+class DataflowTest(unittest.TestCase):
+
+    def test_data_flow_valid_job_id(self):
+        cmd = ['echo', 'additional unit test lines.\n' +
+               'INFO: the Dataflow monitoring console, please navigate to' +
+               'https://console.cloud.google.com/dataflow/jobsDetail/locations/' +
+               '{}/jobs/{}?project={}'.format(TEST_LOCATION, TEST_JOB_ID, TEST_PROJECT)]
+        self.assertEqual(_Dataflow(cmd).wait_for_done(), TEST_JOB_ID)
+
+    def test_data_flow_missing_job_id(self):
+        cmd = ['echo', 'unit testing']
+        self.assertEqual(_Dataflow(cmd).wait_for_done(), None)
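
The new DataFlowJobTest cases above pin down the lookup order that
_DataflowJob._get_job() now uses: fetch by job ID when one was extracted,
fall back to a name-based lookup otherwise, and raise when neither is
available. A rough sketch of that branching against a stubbed client
(get_job and list_jobs are hypothetical stand-ins for the
projects().jobs() API calls, not Airflow or Google API names):

    def resolve_job(client, project, job_id=None, job_name=None):
        # Prefer the concrete ID captured from the runner's logs; matching
        # by name alone can pick up a stale job with the same name, which
        # is the stuck-job bug this PR fixes.
        if job_id:
            return client.get_job(project, job_id)    # hypothetical stub
        if job_name:
            for job in client.list_jobs(project):     # hypothetical stub
                if job.get('name') == job_name:
                    return job
            return None
        raise Exception('Missing both dataflow job ID and name.')
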


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services