Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/17 01:47:57 UTC

[airflow] branch v2-1-test updated (102ad7d -> 3248cd4)

This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a change to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from 102ad7d  Forces rebuilding the image for cache pushing
     new 15192d7  Run mini scheduler in LocalTaskJob during task exit (#16289)
     new b789d80  Add Pytest fixture to create dag and dagrun and use it on local task job tests (#16889)
     new 37b8ee6  Fix task retries when they receive sigkill and have retries and properly handle sigterm (#16301)
     new ef9a0c8  Improve `dag_maker` fixture (#17324)
     new 3248cd4  Add Changelog updates for 2.1.3

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGELOG.txt                     | 43 +++++++++++++++++++++++++++++++++++++++
 tests/jobs/test_local_task_job.py | 28 +++++++++++++++++++++++++
 2 files changed, 71 insertions(+)

[airflow] 04/05: Improve `dag_maker` fixture (#17324)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ef9a0c8b4a10455c1fd18ed881bd16b4219f018d
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Mon Aug 2 07:37:40 2021 +0100

    Improve `dag_maker` fixture (#17324)
    
    This PR improves the dag_maker fixture to enable creating the dagrun, dag, and dag_model separately.
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    (cherry picked from commit 5c1e09cafacea922b9281e901db7da7cadb3e9be)
---
 tests/jobs/test_local_task_job.py | 21 ++++++---------------
 1 file changed, 6 insertions(+), 15 deletions(-)

diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 2e28332..c18e6e5 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -836,34 +836,25 @@ class TestLocalTaskJob:
         assert retry_callback_called.value == 1
         assert task_terminated_externally.value == 1
 
-    def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
+    def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self, dag_maker):
         """Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
-        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
-        op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True)
+        with dag_maker(dag_id='test_dags') as dag:
+            op1 = PythonOperator(task_id='dummy', python_callable=lambda: True)
 
         session = settings.Session()
-        orm_dag = DagModel(
-            dag_id=dag.dag_id,
+        dag_maker.make_dagmodel(
             has_task_concurrency_limits=False,
-            next_dagrun=dag.start_date,
             next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
             is_active=True,
             is_paused=True,
         )
-        session.add(orm_dag)
-        session.flush()
         # Write Dag to DB
         dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
         dagbag.bag_dag(dag, root_dag=dag)
         dagbag.sync_to_db()
 
-        dr = dag.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            state=State.RUNNING,
-            execution_date=DEFAULT_DATE,
-            start_date=DEFAULT_DATE,
-            session=session,
-        )
+        dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
         assert dr.state == State.RUNNING
         ti = TaskInstance(op1, dr.execution_date)
         job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
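
The diff above replaces the hand-built DagModel and DagRun with fixture calls. As a minimal
sketch of the resulting usage pattern, assuming the dag_maker fixture and the usual test-module
imports (PythonOperator, DagRunType, State):

    def test_example(dag_maker):
        # Step 1: build the DAG and its tasks inside the fixture's context manager.
        with dag_maker(dag_id='example_dag') as dag:
            PythonOperator(task_id='dummy', python_callable=lambda: True)

        # Step 2: create the ORM DagModel row separately, overriding fields as needed.
        dag_maker.make_dagmodel(
            has_task_concurrency_limits=False,
            is_active=True,
            is_paused=True,
        )

        # Step 3: create the DagRun separately.
        dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
        assert dr.state == State.RUNNING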

[airflow] 01/05: Run mini scheduler in LocalTaskJob during task exit (#16289)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 15192d766813ad47184db02ca4750f5d4d5b47c6
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Jun 10 14:29:30 2021 +0100

    Run mini scheduler in LocalTaskJob during task exit (#16289)
    
    Currently, the chances of tasks being killed by the LocalTaskJob heartbeat are high.
    
    This is because, after a task is marked successful/failed in taskinstance.py and the mini
    scheduler is enabled, we start running the mini scheduler. Whenever the mini scheduler run
    takes long enough to meet the next job heartbeat, the heartbeat detects that the task has
    finished with no return code, because LocalTaskJob.handle_task_exit was not called after the
    task succeeded. Hence, the heartbeat concludes that the task was externally marked
    failed/successful, and kills it.
    
    This change resolves this by moving the mini scheduler into the LocalTaskJob.handle_task_exit
    method, ensuring that the task will no longer be killed by the next heartbeat.
    
    (cherry picked from commit 408bd26c22913af93d05aa70abc3c66c52cd4588)
---
 tests/jobs/test_local_task_job.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 14c74ce..060bce8 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -842,12 +842,12 @@ class TestLocalTaskJob:
             op1 = PythonOperator(task_id='dummy', python_callable=lambda: True)
 
         session = settings.Session()
-        dag_maker.make_dagmodel(
-            has_task_concurrency_limits=False,
-            next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
-            is_active=True,
-            is_paused=True,
-        )
+        dagmodel = dag_maker.dag_model
+        dagmodel.next_dagrun_create_after = dag.following_schedule(DEFAULT_DATE)
+        dagmodel.is_paused = True
+        session.merge(dagmodel)
+        session.flush()
+
         # Write Dag to DB
         dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
         dagbag.bag_dag(dag, root_dag=dag)
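
To make the described reordering concrete, here is a rough sketch of the flow; the method body
and names below are assumptions based on the commit message, not copied from the Airflow source:

    # Hypothetical sketch of LocalTaskJob.handle_task_exit after this change.
    def handle_task_exit(self, return_code):
        # Record the task's exit first, so the next heartbeat sees a return
        # code instead of concluding the task was externally killed.
        self.log.info("Task exited with return code %s", return_code)
        self.task_instance.refresh_from_db()
        # Only then run the (potentially slow) mini scheduler; a heartbeat
        # firing during this window can no longer kill the finished task.
        if self.task_instance.state == State.SUCCESS:
            self._run_mini_scheduler_on_child_tasks()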

[airflow] 03/05: Fix task retries when they receive sigkill and have retries and properly handle sigterm (#16301)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 37b8ee6b526681d1494c27342880a40b249e1f11
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Wed Jul 28 15:57:35 2021 +0100

    Fix task retries when they receive sigkill and have retries and properly handle sigterm (#16301)
    
    Currently, tasks are not retried when they receive SIGKILL or SIGTERM, even if the task has
    retries configured. This change fixes that and adds tests for both SIGTERM and SIGKILL so we
    don't experience a regression.
    
    Also, SIGTERM used to set the task to failed and raise AirflowException, which the heartbeat
    sometimes saw as the task being externally set to failed, so failure callbacks were not called.
    This commit also fixes that by calling handle_task_exit when a task receives SIGTERM.
    
    Co-authored-by: Ash Berlin-Taylor <as...@firemirror.com>
    (cherry picked from commit 4e2a94c6d1bde5ddf2aa0251190c318ac22f3b17)
---
 tests/jobs/test_local_task_job.py | 50 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 50 insertions(+)

diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index c6a92b2..2e28332 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -786,6 +786,56 @@ class TestLocalTaskJob:
         assert retry_callback_called.value == 1
         assert task_terminated_externally.value == 1
 
+    def test_process_sigterm_works_with_retries(self, dag_maker):
+        """
+        Test that ensures that the task runner sets tasks to retry when it
+         receives SIGTERM
+        """
+        # use shared memory value so we can properly track value change even if
+        # it's been updated across processes.
+        retry_callback_called = Value('i', 0)
+        task_terminated_externally = Value('i', 1)
+        shared_mem_lock = Lock()
+
+        def retry_callback(context):
+            with shared_mem_lock:
+                retry_callback_called.value += 1
+            assert context['dag_run'].dag_id == 'test_mark_failure_2'
+
+        def task_function(ti):
+            time.sleep(60)
+            # This should not happen -- the state change should be noticed and the task should get killed
+            with shared_mem_lock:
+                task_terminated_externally.value = 0
+
+        with dag_maker(dag_id='test_mark_failure_2'):
+            task = PythonOperator(
+                task_id='test_on_failure',
+                python_callable=task_function,
+                retries=1,
+                retry_delay=timedelta(seconds=2),
+                on_retry_callback=retry_callback,
+            )
+        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+        ti.refresh_from_db()
+        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+        job1.task_runner = StandardTaskRunner(job1)
+        job1.task_runner.start()
+        settings.engine.dispose()
+        process = multiprocessing.Process(target=job1.run)
+        process.start()
+        for _ in range(0, 25):
+            ti.refresh_from_db()
+            if ti.state == State.RUNNING and ti.pid is not None:
+                break
+            time.sleep(0.2)
+        os.kill(process.pid, signal.SIGTERM)
+        process.join()
+        ti.refresh_from_db()
+        assert ti.state == State.UP_FOR_RETRY
+        assert retry_callback_called.value == 1
+        assert task_terminated_externally.value == 1
+
     def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
         """Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
         dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
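
The test above observes state across process boundaries with multiprocessing.Value and a Lock,
since the task runs in a separate process. As a standalone illustration of that pattern (plain
Python, not Airflow code):

    import multiprocessing

    def worker(counter, lock):
        # The lock serializes updates so concurrent increments are not lost.
        with lock:
            counter.value += 1

    if __name__ == '__main__':
        counter = multiprocessing.Value('i', 0)  # shared integer ('i' = C int)
        lock = multiprocessing.Lock()
        procs = [multiprocessing.Process(target=worker, args=(counter, lock)) for _ in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        assert counter.value == 4  # updates made in the children are visible here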

[airflow] 02/05: Add Pytest fixture to create dag and dagrun and use it on local task job tests (#16889)

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b789d80efaa657e6269b42bcca9180004471f47e
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Tue Jul 20 18:48:35 2021 +0100

    Add Pytest fixture to create dag and dagrun and use it on local task job tests (#16889)
    
    This change adds a pytest fixture that creates a dag and dagrun, and uses it in the local task job tests.
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    (cherry picked from commit 7c0d8a2f83cc6db25bdddcf6cecb6fb56f05f02f)
---
 tests/jobs/test_local_task_job.py | 77 ++++++++++++++++-----------------------
 1 file changed, 32 insertions(+), 45 deletions(-)

diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 060bce8..c6a92b2 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -786,55 +786,42 @@ class TestLocalTaskJob:
         assert retry_callback_called.value == 1
         assert task_terminated_externally.value == 1
 
-    def test_process_sigterm_works_with_retries(self, dag_maker):
-        """
-        Test that ensures that the task runner sets tasks to retry when it
-         receives SIGTERM
-        """
-        # use shared memory value so we can properly track value change even if
-        # it's been updated across processes.
-        retry_callback_called = Value('i', 0)
-        task_terminated_externally = Value('i', 1)
-        shared_mem_lock = Lock()
-
-        def retry_callback(context):
-            with shared_mem_lock:
-                retry_callback_called.value += 1
-            assert context['dag_run'].dag_id == 'test_mark_failure_2'
+    def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
+        """Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
+        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
+        op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True)
 
-        def task_function(ti):
-            time.sleep(60)
-            # This should not happen -- the state change should be noticed and the task should get killed
-            with shared_mem_lock:
-                task_terminated_externally.value = 0
+        session = settings.Session()
+        orm_dag = DagModel(
+            dag_id=dag.dag_id,
+            has_task_concurrency_limits=False,
+            next_dagrun=dag.start_date,
+            next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
+            is_active=True,
+            is_paused=True,
+        )
+        session.add(orm_dag)
+        session.flush()
+        # Write Dag to DB
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+        dagbag.bag_dag(dag, root_dag=dag)
+        dagbag.sync_to_db()
 
-        with dag_maker(dag_id='test_mark_failure_2'):
-            task = PythonOperator(
-                task_id='test_on_failure',
-                python_callable=task_function,
-                retries=1,
-                retry_delay=timedelta(seconds=2),
-                on_retry_callback=retry_callback,
-            )
-        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
-        ti.refresh_from_db()
+        dr = dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            state=State.RUNNING,
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            session=session,
+        )
+        assert dr.state == State.RUNNING
+        ti = TaskInstance(op1, dr.execution_date)
         job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
         job1.task_runner = StandardTaskRunner(job1)
-        job1.task_runner.start()
-        settings.engine.dispose()
-        process = multiprocessing.Process(target=job1.run)
-        process.start()
-        for _ in range(0, 25):
-            ti.refresh_from_db()
-            if ti.state == State.RUNNING and ti.pid is not None:
-                break
-            time.sleep(0.2)
-        os.kill(process.pid, signal.SIGTERM)
-        process.join()
-        ti.refresh_from_db()
-        assert ti.state == State.UP_FOR_RETRY
-        assert retry_callback_called.value == 1
-        assert task_terminated_externally.value == 1
+        job1.run()
+        session.add(dr)
+        session.refresh(dr)
+        assert dr.state == State.SUCCESS
 
     def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self, dag_maker):
         """Test that with DAG paused, DagRun state will update when the tasks finishes the run"""

[airflow] 05/05: Add Changelog updates for 2.1.3

Posted by jh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3248cd4054fde3afae7d0556bf8de25d8d60906c
Author: James Timmins <ja...@astronomer.io>
AuthorDate: Mon Aug 16 18:39:18 2021 -0700

    Add Changelog updates for 2.1.3
---
 CHANGELOG.txt | 43 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git a/CHANGELOG.txt b/CHANGELOG.txt
index 0ff1f3f..f0e5eaa 100644
--- a/CHANGELOG.txt
+++ b/CHANGELOG.txt
@@ -1,3 +1,46 @@
+Airflow 2.1.3, 2021-08-23
+-------------------------
+
+Bug Fixes
+"""""""""
+
+- Fix task retries when they receive sigkill and have retries and properly handle sigterm (#16301)
+- Fix redacting secrets in context exceptions. (#17618)
+- Fix race condition with dagrun callbacks (#16741)
+- Add 'queued' to DagRunState (#16854)
+- Add 'queued' state to DagRun (#16401)
+- Fix external elasticsearch logs link (#16357)
+- Add proper warning message when recorded PID is different from current PID (#17411)
+- Fix running tasks with default_impersonation config (#17229)
+- Rescue if a DagRun's DAG was removed from db (#17544)
+- Fixed broken json_client (#17529)
+- Handle and log exceptions raised during task callback (#17347)
+- Fix CLI 'kubernetes cleanup-pods' which fails on invalid label key (#17298)
+- Show serialization exceptions in DAG parsing log (#17277)
+- Fix: ``TaskInstance`` does not show ``queued_by_job_id`` & ``external_executor_id`` (#17179)
+- Adds more explanatory message when SecretsMasker is not configured (#17101)
+- Enable the use of __init_subclass__ in subclasses of BaseOperator (#17027)
+- Fix task instance retrieval in XCom view (#16923)
+- Validate type of `priority_weight` during parsing (#16765)
+- Correctly handle custom `deps` and `task_group` during DAG Serialization (#16734)
+- Fix slow (cleared) tasks being adopted by Celery worker. (#16718)
+- Fix calculating duration in tree view (#16695)
+- Fix ``AttributeError``: ``datetime.timezone`` object has no attribute ``name`` (#16599)
+- Redact conn secrets in webserver logs (#16579)
+- Change graph focus to top of view instead of center (#16484)
+- Fail tasks in scheduler when executor reports they failed (#15929)
+- fix(smart_sensor): Unbound variable errors (#14774)
+- Add back missing permissions to UserModelView controls. (#17431)
+- Better diagnostics and self-healing of docker-compose (#17484)
+- Improve diagnostics message when users have secret_key misconfigured (#17410)
+- Add SSH_PORT variable
+
+Misc
+""""
+
+- Run mini scheduler in LocalTaskJob during task exit (#16289)
+
+
 Airflow 2.1.2, 2021-07-13
 -------------------------