You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/07/07 14:18:10 UTC
[airflow] branch v2-1-test updated: Fix impersonation issue with
LocalTaskJob (#16852)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-1-test by this push:
new 72f0ce4 Fix impersonation issue with LocalTaskJob (#16852)
72f0ce4 is described below
commit 72f0ce4340bdde668560bf74df3c74cc8410e6dc
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Wed Jul 7 15:06:21 2021 +0100
Fix impersonation issue with LocalTaskJob (#16852)
Running a task with run_as_user fails because PIDs are not matched
correctly.
This change fixes it by comparing the parent process ID of the task
instance's process (i.e. the `sudo` process) with the process ID of the
task_runner process when impersonation is used.
Co-authored-by: Ash Berlin-Taylor <as...@firemirror.com>
(cherry picked from commit feea38057ae16b5c09dfdda19552a5e75c01a2dd)
---
airflow/jobs/local_task_job.py | 5 +++-
tests/jobs/test_local_task_job.py | 54 +++++++++++++++++++++++++++++++++++++++
2 files changed, 58 insertions(+), 1 deletion(-)
diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
index efd84d6..c697a85 100644
--- a/airflow/jobs/local_task_job.py
+++ b/airflow/jobs/local_task_job.py
@@ -20,6 +20,8 @@
import signal
from typing import Optional
+import psutil
+
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.jobs.base_job import BaseJob
@@ -188,9 +190,10 @@ class LocalTaskJob(BaseJob):
fqdn,
)
raise AirflowException("Hostname of job runner does not match")
-
current_pid = self.task_runner.process.pid
same_process = ti.pid == current_pid
+ if ti.run_as_user:
+ same_process = psutil.Process(ti.pid).ppid() == current_pid
if ti.pid is not None and not same_process:
self.log.warning("Recorded pid %s does not match " "the current pid %s", ti.pid, current_pid)
raise AirflowException("PID of job runner does not match")
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 97645e4..1eaad46 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -49,6 +49,8 @@ from tests.test_utils.asserts import assert_queries_count
from tests.test_utils.db import clear_db_jobs, clear_db_runs
from tests.test_utils.mock_executor import MockExecutor
+# pylint: skip-file
+
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER']
@@ -135,6 +137,58 @@ class TestLocalTaskJob(unittest.TestCase):
with pytest.raises(AirflowException):
job1.heartbeat_callback() # pylint: disable=no-value-for-parameter
+ @mock.patch('airflow.jobs.local_task_job.psutil')
+ def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock):
+ session = settings.Session()
+ dag = DAG('test_localtaskjob_heartbeat', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})
+
+ with dag:
+ op1 = DummyOperator(task_id='op1', run_as_user='myuser')
+
+ dag.clear()
+ dr = dag.create_dagrun(
+ run_id="test",
+ state=State.SUCCESS,
+ execution_date=DEFAULT_DATE,
+ start_date=DEFAULT_DATE,
+ session=session,
+ )
+
+ ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+ ti.state = State.RUNNING
+ ti.pid = 2
+ ti.hostname = get_hostname()
+ session.commit()
+
+ job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+ ti.task = op1
+ ti.refresh_from_task(op1)
+ job1.task_runner = StandardTaskRunner(job1)
+ job1.task_runner.process = mock.Mock()
+ job1.task_runner.process.pid = 2
+ # Here ti.pid is 2, but its parent PID comes from an unconfigured mock, so it
+ # never equals the task_runner PID (2); the heartbeat should therefore fail.
+ with pytest.raises(AirflowException, match='PID of job runner does not match'):
+ job1.heartbeat_callback()
+
+ job1.task_runner.process.pid = 1
+ # We make the parent process of ti.pid to equal the task_runner process id
+ psutil_mock.Process.return_value.ppid.return_value = 1
+ ti.state = State.RUNNING
+ ti.pid = 2
+ # The task_runner process id is 1, same as the parent process of ti.pid
+ # as seen above
+ assert ti.run_as_user
+ session.merge(ti)
+ session.commit()
+ job1.heartbeat_callback(session=None)
+
+ # Here the task_runner process id is changed to 2
+ # while parent process of ti.pid is kept at 1, which is different
+ job1.task_runner.process.pid = 2
+ with pytest.raises(AirflowException, match='PID of job runner does not match'):
+ job1.heartbeat_callback()
+
def test_heartbeat_failed_fast(self):
"""
Test that task heartbeat will sleep when it fails fast