You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/10/19 23:23:08 UTC

[GitHub] [airflow] IKholopov opened a new pull request, #27155: Metric for raw task return codes

IKholopov opened a new pull request, #27155:
URL: https://github.com/apache/airflow/pull/27155

   **Problem:** One of the challenges of running Celery workers in containerized environment is detecting system termination of a raw task instance. 
   For example, if running task hits Airflow celery worker container memory limit and terminated by OOM killer, the only way for a DAG author to discover that task failed because of the memory pressure - is to guess it from the log entry "Task exited with return code negsignal.SIGKILL". 
   ![image](https://user-images.githubusercontent.com/2447492/196821258-e320070a-bca9-40c4-bfe3-b1f82abe7983.png)
   
   This message is a bit cryptic and from the point of the engineer responsible for Airflow infrastructure management (who is often a different person from the DAG authors) it would be much nicer to setup the dashboard that could display such events and setup alerts for them. Of course, it is possible to parse the log entries of all tasks, but this is a fragile invariant which would require additional tooling.
   
   **Proposed solution:**  Introduce a metric for task instances raw task execution return codes. The proposed structure is a counter with the name: `ti.raw_task_return_code.<dag-id>.<task-id>.<return code>`. This will allow to both detect the changes in the frequency of particular return codes (like SIGKILL or SIGTERM) across the whole Airflow deployment and to scope down failures to particular tasks.
   
   *TODO:* Update documentation - I expect to have some discussion around the idea of this metric in this PR, so I want to have a consensus on the name/implementation before putting it in the docs. 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] IKholopov commented on a diff in pull request #27155: Metric for raw task return codes

Posted by GitBox <gi...@apache.org>.
IKholopov commented on code in PR #27155:
URL: https://github.com/apache/airflow/pull/27155#discussion_r1018513609


##########
airflow/jobs/local_task_job.py:
##########
@@ -282,6 +283,11 @@ def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
             )
             session.rollback()
 
+    def _log_return_code_metric(self, return_code: int):
+        Stats.incr(
+            f'local_task_job.task_exit.{self.id}.{self.dag_id}.{self.task_instance.task_id}.{return_code}'
+        )

Review Comment:
   My reasoning for moving it here was to not mix the logic of processing exited task and preparing metric value + sending it. The second part is trivial as on now, but this is most likely to be a subject to change.
   
   As there are plans to migrate metrics from StatsD to OpenTelemetry [AIP-49](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-49+OpenTelemetry+Support+for+Apache+Airflow), I thought it would be cleaner to keep these pieces of code that will need to be refactored in this migration separate from the other logic of the job. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] IKholopov commented on a diff in pull request #27155: Metric for raw task return codes

Posted by GitBox <gi...@apache.org>.
IKholopov commented on code in PR #27155:
URL: https://github.com/apache/airflow/pull/27155#discussion_r1017043850


##########
tests/jobs/test_local_task_job.py:
##########
@@ -374,6 +374,29 @@ def test_localtaskjob_double_trigger(self):
 
         session.close()
 
+    @patch.object(StandardTaskRunner, 'return_code')
+    @mock.patch('airflow.jobs.scheduler_job.Stats.incr')
+    def test_raw_task_return_code_metric(self, mock_stats_incr, mock_return_code, create_dummy_dag):
+
+        _, task = create_dummy_dag('test_localtaskjob_double_trigger')
+        mock_stats_incr.reset_mock()

Review Comment:
   Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] IKholopov commented on a diff in pull request #27155: Metric for raw task return codes

Posted by GitBox <gi...@apache.org>.
IKholopov commented on code in PR #27155:
URL: https://github.com/apache/airflow/pull/27155#discussion_r1017040333


##########
tests/jobs/test_local_task_job.py:
##########
@@ -374,6 +374,29 @@ def test_localtaskjob_double_trigger(self):
 
         session.close()
 
+    @patch.object(StandardTaskRunner, 'return_code')
+    @mock.patch('airflow.jobs.scheduler_job.Stats.incr')
+    def test_raw_task_return_code_metric(self, mock_stats_incr, mock_return_code, create_dummy_dag):
+
+        _, task = create_dummy_dag('test_localtaskjob_double_trigger')
+        mock_stats_incr.reset_mock()
+
+        ti_run = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+        ti_run.refresh_from_db()
+        job1 = LocalTaskJob(task_instance=ti_run, executor=SequentialExecutor())
+
+        mock_return_code.side_effect = [None, -9, None]
+
+        with timeout(10):
+            job1.run()
+
+        mock_stats_incr.assert_has_calls(
+            [
+                mock.call('ti.raw_task_return_code.test_localtaskjob_double_trigger.op1.-9'),
+            ],
+            any_order=True,

Review Comment:
   Removed any_order. Since return_code is mocked, it it better not to introduce multiple task instances.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on pull request #27155: Metric for raw task return codes

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #27155:
URL: https://github.com/apache/airflow/pull/27155#issuecomment-1306301761

   I think some responses would be needed to take this forward @IKholopov are you still working on it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] IKholopov commented on a diff in pull request #27155: Metric for raw task return codes

Posted by GitBox <gi...@apache.org>.
IKholopov commented on code in PR #27155:
URL: https://github.com/apache/airflow/pull/27155#discussion_r1017040496


##########
tests/jobs/test_local_task_job.py:
##########
@@ -374,6 +374,29 @@ def test_localtaskjob_double_trigger(self):
 
         session.close()
 
+    @patch.object(StandardTaskRunner, 'return_code')
+    @mock.patch('airflow.jobs.scheduler_job.Stats.incr')

Review Comment:
   Done.



##########
airflow/jobs/local_task_job.py:
##########
@@ -162,6 +162,7 @@ def handle_task_exit(self, return_code: int) -> None:
         # Without setting this, heartbeat may get us
         self.terminating = True
         self.log.info("Task exited with return code %s", return_code)
+        Stats.incr(f'ti.raw_task_return_code.{self.dag_id}.{self.task_instance.task_id}.{return_code}')

Review Comment:
   I think `local_task_job.task_exit` makes sense. On the other hand, shouldn't we add then a `job_id`, since this is the identifier of a local_task_job?
   
   I added all 3 (job_id, dag_id, task_id), let me know if you think that it is too much.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] IKholopov commented on pull request #27155: Metric for raw task return codes

Posted by GitBox <gi...@apache.org>.
IKholopov commented on PR #27155:
URL: https://github.com/apache/airflow/pull/27155#issuecomment-1307744587

   > I think some responses would be needed to take this forward @IKholopov are you still working on it?
   
   Yep, I've addressed Eugene's feedback. Let me know what you think about this proposal in general.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] IKholopov commented on pull request #27155: Metric for raw task return codes

Posted by "IKholopov (via GitHub)" <gi...@apache.org>.
IKholopov commented on PR #27155:
URL: https://github.com/apache/airflow/pull/27155#issuecomment-1493121811

   @steveyz-astro mostly for the extra tracking, aimed at use cases where someone might want to dot custom processing as the metric is emitted. For example, for adding some dynamic attributes for the task execution trace in case if return_code != 0. It is true that it has an unbounded cardinality and I wouldn't expect most of the integrations that store it as a metric would keep it. For example, in our (Composer) integration we do plan to drop it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr merged pull request #27155: Metric for raw task return codes

Posted by GitBox <gi...@apache.org>.
uranusjr merged PR #27155:
URL: https://github.com/apache/airflow/pull/27155


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] kosteev commented on a diff in pull request #27155: Metric for raw task return codes

Posted by GitBox <gi...@apache.org>.
kosteev commented on code in PR #27155:
URL: https://github.com/apache/airflow/pull/27155#discussion_r1004255873


##########
airflow/jobs/local_task_job.py:
##########
@@ -162,6 +162,7 @@ def handle_task_exit(self, return_code: int) -> None:
         # Without setting this, heartbeat may get us
         self.terminating = True
         self.log.info("Task exited with return code %s", return_code)
+        Stats.incr(f'ti.raw_task_return_code.{self.dag_id}.{self.task_instance.task_id}.{return_code}')

Review Comment:
   Not sure about how is better design here, but I thought for a moment that maybe good to have encapsulated the logic of emitting this metric inside taskinstance.py module (as other metrics relate to task instance).



##########
tests/jobs/test_local_task_job.py:
##########
@@ -374,6 +374,29 @@ def test_localtaskjob_double_trigger(self):
 
         session.close()
 
+    @patch.object(StandardTaskRunner, 'return_code')
+    @mock.patch('airflow.jobs.scheduler_job.Stats.incr')

Review Comment:
   It is good to have autospec=True parameter set for mocked objects, that will enforce method signature checks by unittest.mock library.
   Although, I am not sure if this guideline is followed in the repo.



##########
tests/jobs/test_local_task_job.py:
##########
@@ -374,6 +374,29 @@ def test_localtaskjob_double_trigger(self):
 
         session.close()
 
+    @patch.object(StandardTaskRunner, 'return_code')
+    @mock.patch('airflow.jobs.scheduler_job.Stats.incr')
+    def test_raw_task_return_code_metric(self, mock_stats_incr, mock_return_code, create_dummy_dag):
+
+        _, task = create_dummy_dag('test_localtaskjob_double_trigger')
+        mock_stats_incr.reset_mock()

Review Comment:
   Is it really needed (reset_mock)? This is just beginning of the method.



##########
airflow/jobs/local_task_job.py:
##########
@@ -162,6 +162,7 @@ def handle_task_exit(self, return_code: int) -> None:
         # Without setting this, heartbeat may get us
         self.terminating = True
         self.log.info("Task exited with return code %s", return_code)
+        Stats.incr(f'ti.raw_task_return_code.{self.dag_id}.{self.task_instance.task_id}.{return_code}')

Review Comment:
   Or maybe metric should be more like `local_task_job.task_exit.{dag_id}.{task_id}.{return_code}`. WDYT?



##########
tests/jobs/test_local_task_job.py:
##########
@@ -374,6 +374,29 @@ def test_localtaskjob_double_trigger(self):
 
         session.close()
 
+    @patch.object(StandardTaskRunner, 'return_code')
+    @mock.patch('airflow.jobs.scheduler_job.Stats.incr')
+    def test_raw_task_return_code_metric(self, mock_stats_incr, mock_return_code, create_dummy_dag):
+
+        _, task = create_dummy_dag('test_localtaskjob_double_trigger')
+        mock_stats_incr.reset_mock()
+
+        ti_run = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+        ti_run.refresh_from_db()
+        job1 = LocalTaskJob(task_instance=ti_run, executor=SequentialExecutor())
+
+        mock_return_code.side_effect = [None, -9, None]
+
+        with timeout(10):
+            job1.run()
+
+        mock_stats_incr.assert_has_calls(
+            [
+                mock.call('ti.raw_task_return_code.test_localtaskjob_double_trigger.op1.-9'),
+            ],
+            any_order=True,

Review Comment:
   You set "any_order=True", but only one call is expected, looks strange?
   Maybe good idea to actually have multiple calls expected from different tasks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] IKholopov commented on pull request #27155: Metric for raw task return codes

Posted by GitBox <gi...@apache.org>.
IKholopov commented on PR #27155:
URL: https://github.com/apache/airflow/pull/27155#issuecomment-1322679730

   @uranusjr @potiuk 
   
   Are we good to merge this one?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #27155: Metric for raw task return codes

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #27155:
URL: https://github.com/apache/airflow/pull/27155#discussion_r1017457180


##########
airflow/jobs/local_task_job.py:
##########
@@ -282,6 +283,11 @@ def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
             )
             session.rollback()
 
+    def _log_return_code_metric(self, return_code: int):
+        Stats.incr(
+            f'local_task_job.task_exit.{self.id}.{self.dag_id}.{self.task_instance.task_id}.{return_code}'
+        )

Review Comment:
   Why does this need a separate function?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] IKholopov commented on a diff in pull request #27155: Metric for raw task return codes

Posted by GitBox <gi...@apache.org>.
IKholopov commented on code in PR #27155:
URL: https://github.com/apache/airflow/pull/27155#discussion_r1018513609


##########
airflow/jobs/local_task_job.py:
##########
@@ -282,6 +283,11 @@ def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
             )
             session.rollback()
 
+    def _log_return_code_metric(self, return_code: int):
+        Stats.incr(
+            f'local_task_job.task_exit.{self.id}.{self.dag_id}.{self.task_instance.task_id}.{return_code}'
+        )

Review Comment:
   My reasoning for moving it here was to not mix the logic of processing exited task and preparing metric value + sending it.
   
   As there are plans to migrate metrics from StatsD to OpenTelemetry [AIP-49](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-49+OpenTelemetry+Support+for+Apache+Airflow), I thought it would be cleaner to keep these pieces of code that will need to be refactored in this migration separate from the other logic of the job. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] steveyz-astro commented on pull request #27155: Metric for raw task return codes

Posted by "steveyz-astro (via GitHub)" <gi...@apache.org>.
steveyz-astro commented on PR #27155:
URL: https://github.com/apache/airflow/pull/27155#issuecomment-1492629956

   @IKholopov I'm curious why job_id was added to the metric name here.  It increases the cardinality of this metric without seemingly adding much diagnostic value.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org