Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/17 01:47:57 UTC
[airflow] branch v2-1-test updated (102ad7d -> 3248cd4)
This is an automated email from the ASF dual-hosted git repository.
jhtimmins pushed a change to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.
from 102ad7d Forces rebuilding the image for cache pushing
new 15192d7 Run mini scheduler in LocalTaskJob during task exit (#16289)
new b789d80 Add Pytest fixture to create dag and dagrun and use it on local task job tests (#16889)
new 37b8ee6 Fix task retries when they receive sigkill and have retries and properly handle sigterm (#16301)
new ef9a0c8 Improve `dag_maker` fixture (#17324)
new 3248cd4 Add Changelog updates for 2.1.3
The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
CHANGELOG.txt | 43 +++++++++++++++++++++++++++++++++++++++
tests/jobs/test_local_task_job.py | 28 +++++++++++++++++++++++++
2 files changed, 71 insertions(+)
[airflow] 04/05: Improve `dag_maker` fixture (#17324)
jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ef9a0c8b4a10455c1fd18ed881bd16b4219f018d
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Mon Aug 2 07:37:40 2021 +0100
Improve `dag_maker` fixture (#17324)
This PR improves the dag_maker fixture to enable creation of dagrun, dag and dag_model separately
Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
(cherry picked from commit 5c1e09cafacea922b9281e901db7da7cadb3e9be)
---
tests/jobs/test_local_task_job.py | 21 ++++++---------------
1 file changed, 6 insertions(+), 15 deletions(-)
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 2e28332..c18e6e5 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -836,34 +836,25 @@ class TestLocalTaskJob:
assert retry_callback_called.value == 1
assert task_terminated_externally.value == 1
- def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
+ def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self, dag_maker):
"""Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
- dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
- op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True)
+ with dag_maker(dag_id='test_dags') as dag:
+ op1 = PythonOperator(task_id='dummy', python_callable=lambda: True)
session = settings.Session()
- orm_dag = DagModel(
- dag_id=dag.dag_id,
+ dag_maker.make_dagmodel(
has_task_concurrency_limits=False,
- next_dagrun=dag.start_date,
next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
is_active=True,
is_paused=True,
)
- session.add(orm_dag)
- session.flush()
# Write Dag to DB
dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
dagbag.bag_dag(dag, root_dag=dag)
dagbag.sync_to_db()
- dr = dag.create_dagrun(
- run_type=DagRunType.SCHEDULED,
- state=State.RUNNING,
- execution_date=DEFAULT_DATE,
- start_date=DEFAULT_DATE,
- session=session,
- )
+ dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
assert dr.state == State.RUNNING
ti = TaskInstance(op1, dr.execution_date)
job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
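The commit above lets a test create the DAG, its DagModel row and a DagRun as separate steps through the dag_maker fixture. Below is a minimal, stdlib-only sketch of that separation. Dag, DagModel, DagRun and DagMaker are hypothetical dataclass stand-ins, not Airflow's ORM classes, and the actual fixture (which works with database sessions) is not reproduced here.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional

DEFAULT_DATE = datetime(2016, 1, 1, tzinfo=timezone.utc)  # assumed test date


@dataclass
class Dag:
    dag_id: str
    start_date: datetime
    task_ids: List[str] = field(default_factory=list)


@dataclass
class DagModel:
    dag_id: str
    is_active: bool = True
    is_paused: bool = False


@dataclass
class DagRun:
    dag_id: str
    run_type: str
    execution_date: datetime
    state: str = "running"


class DagMaker:
    """Hypothetical dag_maker-style helper; in a conftest.py it would be
    returned from a pytest fixture."""

    def __init__(self) -> None:
        self.dag: Optional[Dag] = None
        self.dag_model: Optional[DagModel] = None
        self.dag_runs: List[DagRun] = []

    def __call__(self, dag_id: str, start_date: datetime = DEFAULT_DATE) -> "DagMaker":
        # Calling the helper only prepares the DAG; nothing else is created yet.
        self.dag = Dag(dag_id=dag_id, start_date=start_date)
        return self

    def __enter__(self) -> Dag:
        return self.dag

    def __exit__(self, exc_type, exc, tb) -> bool:
        return False  # do not swallow exceptions raised in the with-block

    def make_dagmodel(self, **kwargs) -> DagModel:
        # Created only when a test asks for it, with test-specific overrides.
        self.dag_model = DagModel(dag_id=self.dag.dag_id, **kwargs)
        return self.dag_model

    def create_dagrun(self, run_type: str = "scheduled", **kwargs) -> DagRun:
        # Default the execution date to the DAG start date, as the old test did.
        kwargs.setdefault("execution_date", self.dag.start_date)
        run = DagRun(dag_id=self.dag.dag_id, run_type=run_type, **kwargs)
        self.dag_runs.append(run)
        return run
```

Usage mirrors the updated test: `with dag_maker(dag_id="test_dags") as dag:` builds the DAG, then `dag_maker.make_dagmodel(is_paused=True)` and `dag_maker.create_dagrun()` create the other two objects only when the test needs them.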
[airflow] 01/05: Run mini scheduler in LocalTaskJob during task exit (#16289)
jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 15192d766813ad47184db02ca4750f5d4d5b47c6
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Jun 10 14:29:30 2021 +0100
Run mini scheduler in LocalTaskJob during task exit (#16289)
Currently, there is a high chance that tasks are killed by the LocalTaskJob heartbeat.
This is because, when the mini scheduler is enabled, it starts running right after the task is marked successful/failed in taskinstance.py.
If mini scheduling takes long enough to overlap the next job heartbeat, the heartbeat sees that the task has succeeded but has no
return code, because LocalTaskJob.handle_task_exit has not been called yet. The heartbeat therefore concludes that the task was
externally marked failed/successful and kills it.
This change resolves the issue by moving the mini scheduler into LocalTaskJob.handle_task_exit, ensuring
that the task is no longer killed by the next heartbeat.
(cherry picked from commit 408bd26c22913af93d05aa70abc3c66c52cd4588)
---
tests/jobs/test_local_task_job.py | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 14c74ce..060bce8 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -842,12 +842,12 @@ class TestLocalTaskJob:
op1 = PythonOperator(task_id='dummy', python_callable=lambda: True)
session = settings.Session()
- dag_maker.make_dagmodel(
- has_task_concurrency_limits=False,
- next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
- is_active=True,
- is_paused=True,
- )
+ dagmodel = dag_maker.dag_model
+ dagmodel.next_dagrun_create_after = dag.following_schedule(DEFAULT_DATE)
+ dagmodel.is_paused = True
+ session.merge(dagmodel)
+ session.flush()
+
# Write Dag to DB
dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
dagbag.bag_dag(dag, root_dag=dag)
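The race described in the commit message above can be reduced to a toy model. This is not Airflow's code: heartbeat_treats_as_external and heartbeat_during_mini_scheduler are hypothetical names, and the check is a simplification of the behaviour the commit message describes (a heartbeat seeing a finished task whose process has no return code yet).

```python
from typing import Optional

RUNNING = "running"
SUCCESS = "success"


def heartbeat_treats_as_external(ti_state: str, return_code: Optional[int]) -> bool:
    """Simplified heartbeat check: the task process has not exited (no return
    code) but the task instance is no longer RUNNING, so the heartbeat assumes
    the state was changed externally and kills the task."""
    return return_code is None and ti_state != RUNNING


def heartbeat_during_mini_scheduler(mini_scheduler_in_task_process: bool) -> bool:
    """Return True if a heartbeat that fires while the mini scheduler runs
    would (wrongly) treat the finished task as externally changed."""
    # The task body has finished and the task instance was marked successful.
    ti_state = SUCCESS
    if mini_scheduler_in_task_process:
        # Old ordering: the mini scheduler runs inside the task process,
        # which has not exited yet, so there is no return code.
        return_code = None
    else:
        # New ordering: the task process exits first (return code 0) and
        # LocalTaskJob.handle_task_exit runs the mini scheduler afterwards.
        return_code = 0
    return heartbeat_treats_as_external(ti_state, return_code)
```

With the old ordering the model reports a spurious external state change; moving the mini scheduler after process exit removes it, because the heartbeat now has a return code to look at.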
[airflow] 03/05: Fix task retries when they receive sigkill and have retries and properly handle sigterm (#16301)
jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 37b8ee6b526681d1494c27342880a40b249e1f11
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Wed Jul 28 15:57:35 2021 +0100
Fix task retries when they receive sigkill and have retries and properly handle sigterm (#16301)
Currently, tasks are not retried when they receive SIGKILL or SIGTERM, even if the task has retries. This change fixes that
and adds tests for both SIGTERM and SIGKILL so we don't experience a regression.
Also, SIGTERM sets the task as failed and raises an AirflowException, which the heartbeat sometimes sees as the state being
externally set to failed, so the failure callbacks are not called. This commit also fixes this by calling handle_task_exit when a task receives SIGTERM.
Co-authored-by: Ash Berlin-Taylor <as...@firemirror.com>
(cherry picked from commit 4e2a94c6d1bde5ddf2aa0251190c318ac22f3b17)
---
tests/jobs/test_local_task_job.py | 50 +++++++++++++++++++++++++++++++++++++++
1 file changed, 50 insertions(+)
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index c6a92b2..2e28332 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -786,6 +786,56 @@ class TestLocalTaskJob:
assert retry_callback_called.value == 1
assert task_terminated_externally.value == 1
+ def test_process_sigterm_works_with_retries(self, dag_maker):
+ """
+ Test that ensures that task runner sets tasks to retry when they(task runner)
+ receive sigterm
+ """
+ # use shared memory value so we can properly track value change even if
+ # it's been updated across processes.
+ retry_callback_called = Value('i', 0)
+ task_terminated_externally = Value('i', 1)
+ shared_mem_lock = Lock()
+
+ def retry_callback(context):
+ with shared_mem_lock:
+ retry_callback_called.value += 1
+ assert context['dag_run'].dag_id == 'test_mark_failure_2'
+
+ def task_function(ti):
+ time.sleep(60)
+ # This should not happen -- the state change should be noticed and the task should get killed
+ with shared_mem_lock:
+ task_terminated_externally.value = 0
+
+ with dag_maker(dag_id='test_mark_failure_2'):
+ task = PythonOperator(
+ task_id='test_on_failure',
+ python_callable=task_function,
+ retries=1,
+ retry_delay=timedelta(seconds=2),
+ on_retry_callback=retry_callback,
+ )
+ ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+ ti.refresh_from_db()
+ job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+ job1.task_runner = StandardTaskRunner(job1)
+ job1.task_runner.start()
+ settings.engine.dispose()
+ process = multiprocessing.Process(target=job1.run)
+ process.start()
+ for _ in range(0, 25):
+ ti.refresh_from_db()
+ if ti.state == State.RUNNING and ti.pid is not None:
+ break
+ time.sleep(0.2)
+ os.kill(process.pid, signal.SIGTERM)
+ process.join()
+ ti.refresh_from_db()
+ assert ti.state == State.UP_FOR_RETRY
+ assert retry_callback_called.value == 1
+ assert task_terminated_externally.value == 1
+
def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
"""Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
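The fix makes SIGTERM go through the normal failure path, so a task with retries ends up UP_FOR_RETRY and its retry callback fires, which is what test_process_sigterm_works_with_retries checks with shared-memory counters. Below is a self-contained, POSIX-only sketch of that flow. TaskTerminated, the integer state codes and the retry rule (retry while try_number <= retries) are assumptions for illustration, not Airflow internals.

```python
import multiprocessing
import os
import signal
import time

UP_FOR_RETRY, FAILED = 1, 2  # hypothetical state codes kept in shared memory


class TaskTerminated(Exception):
    """Hypothetical exception raised inside the task when SIGTERM arrives."""


def _raise_on_sigterm(signum, frame):
    raise TaskTerminated(f"task received signal {signum}")


def _task_process(ready, state, retry_callback_called, try_number, retries):
    # Turn SIGTERM into an exception so the normal failure path runs
    # instead of the process simply dying.
    signal.signal(signal.SIGTERM, _raise_on_sigterm)
    try:
        ready.set()  # inside the try: a SIGTERM right after this is still handled
        time.sleep(60)  # stands in for a long-running task body
    except TaskTerminated:
        # Simplified retry rule (an assumption): retry while try_number <= retries.
        if try_number <= retries:
            state.value = UP_FOR_RETRY
            with retry_callback_called.get_lock():
                retry_callback_called.value += 1  # plays the on_retry_callback role
        else:
            state.value = FAILED


def run_and_terminate(try_number=1, retries=1):
    """Start the task in a child process, SIGTERM it, return (state, callbacks)."""
    ctx = multiprocessing.get_context("fork")  # POSIX only
    ready = ctx.Event()
    state = ctx.Value("i", 0)
    retry_callback_called = ctx.Value("i", 0)
    proc = ctx.Process(
        target=_task_process,
        args=(ready, state, retry_callback_called, try_number, retries),
    )
    proc.start()
    if not ready.wait(timeout=10):
        proc.kill()
        raise RuntimeError("task process never became ready")
    os.kill(proc.pid, signal.SIGTERM)
    proc.join(timeout=10)
    return state.value, retry_callback_called.value
```

A multiprocessing Value created with the default lock=True carries its own lock via get_lock(), so the separate Lock() used in the test is one option rather than a requirement. The ready event plays the role of the test's polling loop: SIGTERM is only sent once the handler is installed.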
[airflow] 02/05: Add Pytest fixture to create dag and dagrun and use it on local task job tests (#16889)
jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit b789d80efaa657e6269b42bcca9180004471f47e
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Tue Jul 20 18:48:35 2021 +0100
Add Pytest fixture to create dag and dagrun and use it on local task job tests (#16889)
This change adds a pytest fixture that creates a dag and dagrun, and uses it in the local task job tests
Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
(cherry picked from commit 7c0d8a2f83cc6db25bdddcf6cecb6fb56f05f02f)
---
tests/jobs/test_local_task_job.py | 77 ++++++++++++++++-----------------------
1 file changed, 32 insertions(+), 45 deletions(-)
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 060bce8..c6a92b2 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -786,55 +786,42 @@ class TestLocalTaskJob:
assert retry_callback_called.value == 1
assert task_terminated_externally.value == 1
- def test_process_sigterm_works_with_retries(self, dag_maker):
- """
- Test that ensures that task runner sets tasks to retry when they(task runner)
- receive sigterm
- """
- # use shared memory value so we can properly track value change even if
- # it's been updated across processes.
- retry_callback_called = Value('i', 0)
- task_terminated_externally = Value('i', 1)
- shared_mem_lock = Lock()
-
- def retry_callback(context):
- with shared_mem_lock:
- retry_callback_called.value += 1
- assert context['dag_run'].dag_id == 'test_mark_failure_2'
+ def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
+ """Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
+ dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
+ op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True)
- def task_function(ti):
- time.sleep(60)
- # This should not happen -- the state change should be noticed and the task should get killed
- with shared_mem_lock:
- task_terminated_externally.value = 0
+ session = settings.Session()
+ orm_dag = DagModel(
+ dag_id=dag.dag_id,
+ has_task_concurrency_limits=False,
+ next_dagrun=dag.start_date,
+ next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
+ is_active=True,
+ is_paused=True,
+ )
+ session.add(orm_dag)
+ session.flush()
+ # Write Dag to DB
+ dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+ dagbag.bag_dag(dag, root_dag=dag)
+ dagbag.sync_to_db()
- with dag_maker(dag_id='test_mark_failure_2'):
- task = PythonOperator(
- task_id='test_on_failure',
- python_callable=task_function,
- retries=1,
- retry_delay=timedelta(seconds=2),
- on_retry_callback=retry_callback,
- )
- ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
- ti.refresh_from_db()
+ dr = dag.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ state=State.RUNNING,
+ execution_date=DEFAULT_DATE,
+ start_date=DEFAULT_DATE,
+ session=session,
+ )
+ assert dr.state == State.RUNNING
+ ti = TaskInstance(op1, dr.execution_date)
job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
job1.task_runner = StandardTaskRunner(job1)
- job1.task_runner.start()
- settings.engine.dispose()
- process = multiprocessing.Process(target=job1.run)
- process.start()
- for _ in range(0, 25):
- ti.refresh_from_db()
- if ti.state == State.RUNNING and ti.pid is not None:
- break
- time.sleep(0.2)
- os.kill(process.pid, signal.SIGTERM)
- process.join()
- ti.refresh_from_db()
- assert ti.state == State.UP_FOR_RETRY
- assert retry_callback_called.value == 1
- assert task_terminated_externally.value == 1
+ job1.run()
+ session.add(dr)
+ session.refresh(dr)
+ assert dr.state == State.SUCCESS
def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self, dag_maker):
"""Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
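test_process_sigterm_works_with_retries waits for the task to reach RUNNING by refreshing it up to 25 times at 0.2-second intervals before sending SIGTERM. That bounded-poll loop can be factored into a helper. wait_until is a hypothetical name; unlike the loop in the diff, it reports whether the condition was ever met, so a test could assert on it instead of continuing silently.

```python
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool], attempts: int = 25, interval: float = 0.2) -> bool:
    """Poll predicate up to `attempts` times, sleeping `interval` seconds
    between tries. Return True as soon as predicate() is true, or False if
    it never becomes true."""
    for attempt in range(attempts):
        if predicate():
            return True
        if attempt < attempts - 1:
            time.sleep(interval)  # no pointless sleep after the final check
    return False
```

In the test, the predicate would refresh the task instance and check `ti.state == State.RUNNING and ti.pid is not None`.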
[airflow] 05/05: Add Changelog updates for 2.1.3
jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 3248cd4054fde3afae7d0556bf8de25d8d60906c
Author: James Timmins <ja...@astronomer.io>
AuthorDate: Mon Aug 16 18:39:18 2021 -0700
Add Changelog updates for 2.1.3
---
CHANGELOG.txt | 43 +++++++++++++++++++++++++++++++++++++++++++
1 file changed, 43 insertions(+)
diff --git a/CHANGELOG.txt b/CHANGELOG.txt
index 0ff1f3f..f0e5eaa 100644
--- a/CHANGELOG.txt
+++ b/CHANGELOG.txt
@@ -1,3 +1,46 @@
+Airflow 2.1.3, 2021-08-23
+-------------------------
+
+Bug Fixes
+"""""""""
+
+- Fix task retries when they receive sigkill and have retries and properly handle sigterm (#16301)
+- Fix redacting secrets in context exceptions. (#17618)
+- Fix race condition with dagrun callbacks (#16741)
+- Add 'queued' to DagRunState (#16854)
+- Add 'queued' state to DagRun (#16401)
+- Fix external elasticsearch logs link (#16357)
+- Add proper warning message when recorded PID is different from current PID (#17411)
+- Fix running tasks with default_impersonation config (#17229)
+- Rescue if a DagRun's DAG was removed from db (#17544)
+- Fixed broken json_client (#17529)
+- Handle and log exceptions raised during task callback (#17347)
+- Fix CLI 'kubernetes cleanup-pods' which fails on invalid label key (#17298)
+- Show serialization exceptions in DAG parsing log (#17277)
+- Fix: ``TaskInstance`` does not show ``queued_by_job_id`` & ``external_executor_id`` (#17179)
+- Adds more explanatory message when SecretsMasker is not configured (#17101)
+- Enable the use of __init_subclass__ in subclasses of BaseOperator (#17027)
+- Fix task instance retrieval in XCom view (#16923)
+- Validate type of `priority_weight` during parsing (#16765)
+- Correctly handle custom `deps` and `task_group` during DAG Serialization (#16734)
+- Fix slow (cleared) tasks being be adopted by Celery worker. (#16718)
+- Fix calculating duration in tree view (#16695)
+- Fix ``AttributeError``: ``datetime.timezone`` object has no attribute ``name`` (#16599)
+- Redact conn secrets in webserver logs (#16579)
+- Change graph focus to top of view instead of center (#16484)
+- Fail tasks in scheduler when executor reports they failed (#15929)
+- fix(smart_sensor): Unbound variable errors (#14774)
+- Add back missing permissions to UserModelView controls. (#17431)
+- Better diagnostics and self-healing of docker-compose (#17484)
+- Improve diagnostics message when users have secret_key misconfigured (#17410)
+- Add SSH_PORT variable
+
+Misc
+""""
+
+- Run mini scheduler in LocalTaskJob during task exit (#16289)
+
+
Airflow 2.1.2, 2021-07-13
-------------------------