Posted to commits@airflow.apache.org by ep...@apache.org on 2022/08/15 18:44:36 UTC

[airflow] branch v2-3-test updated (881ac77082 -> 4a8d13997f)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a change to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


    from 881ac77082 Better behaviour for self-update of Breeze (#25572)
     new bf9f4e1282 Clear next method when clearing TIs (#23929)
     new 658f5abe60 Don't rely on current ORM structure for db clean command (#23574)
     new 394ebc80b5 Send DAG timeout callbacks to processor outside of prohibit_commit (#24366)
     new d9ca589649 Doc: Add hyperlinks to Github PRs for Release Notes (#24532)
     new 8a95092c9e TriggerDagRunOperator.operator_extra_links is attr (#24676)
     new 03b2fad778 Update PythonVirtualenvOperator Howto (#24782)
     new 98baf6054b Note how DAG policy works with default_args (#24804)
     new 9b4eec2a0e Add %z for %(asctime)s to fix timezone for logs on UI (#24811)
     new a720e4fb97 Bind log server on worker to IPv6 address (#24755) (#24846)
     new 9b6df301ac Fix zombie task handling with multiple schedulers (#24906)
     new 9a5f8739f3 Fix tag link on dag detail page (#24918)
     new 481877a564 Fix syntax in mysql setup documentation (#24893) (#24939)
     new 977d11dbed Extends resolve_xcom_backend function level documentation (#24965)
     new 651b43513a Update set-up-database.rst (#24983)
     new f2a6915393 Sort operator extra links (#24992)
     new 48de9c7791 Update 2.3.3 date in release notes (#25004)
     new 0eec87e2b3 chore: fix typo (#25010)
     new 7d2e516eaf No grid auto-refresh for backfill dag runs (#25042)
     new b96e53a420 airflow/www/package.json: Add name, version fields. (#25065)
     new af702bf69c Fix invalidateQueries call (#25097)
     new 637af6a06d Added logging to get better diagnosis of flaky backfill test (#25106)
     new 975dd0c8a2 Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue (#25147)
     new 2f6d9ff25c call updateNodeLabels after expandGroup (#25217)
     new 4316387086 convert TimeSensorAsync target_time to utc on call time (#25221)
     new 6567b3248f Only load distribution of a name once (#25296)
     new de68fd1a81 Add __repr__ to ParamsDict class (#25305)
     new 605126ce00 fix - resolve bash by absolute path (#25331)
     new 385f04ba34 Fix Serialization error in TaskCallbackRequest (#25471)
     new 18f13384f1 Fix "This Session's transaction has been rolled back" (#25532)
     new 6821fe12f8 Allow wildcarded CORS origins (#25553)
     new 23c714b0c2 Fix the errors raised when None is passed to template filters (#25593)
     new b6a2cd1aa3 Configurable umask to all daemonized processes. (#25664)
     new cf448ea174 Adding mysql index hint to use index on task_instance.state in critical section query (#25673)
     new 78fa95ca33 Don't mistakenly take a lock on DagRun via ti.refresh_from_db (#25312)
     new 626814aec8 Refactor DR.task_instance_scheduling_decisions (#24774)
     new aff5994e2b Remove useless logging line (#25347)
     new ef5b4c9623 Removed interfering force of index. (#25404)
     new 146d5109b4 Added exception catching to send default email if template file raises any exception (#24943)
     new dffcb5d5cd set default task group in dag.add_task method (#25000)
     new cfce076abd fix: change disable_verify_ssl behaviour (#25023)
     new 5c2fce92f4 Fix `airflow db reset` when dangling tables exist (#25441)
     new 31a4b8cf1f Fix reducing mapped length of a mapped task at runtime after a clear (#25531)
     new 530a7ce802 Add right padding (#25554)
     new ed9bf28d49 Cache the custom secrets backend so the same instance gets re-used (#25556)
     new 4a8d13997f Fix mapped sensor with reschedule mode (#25594)

The 45 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 RELEASE_NOTES.rst                                  |   2 +-
 .../api_connexion/endpoints/extra_link_endpoint.py |   2 +-
 airflow/callbacks/callback_requests.py             |   2 +-
 airflow/cli/cli_parser.py                          |   7 +-
 airflow/cli/commands/celery_command.py             |   3 +
 airflow/cli/commands/dag_processor_command.py      |   2 +
 airflow/cli/commands/db_command.py                 |   1 +
 airflow/cli/commands/kerberos_command.py           |   1 +
 airflow/cli/commands/scheduler_command.py          |   1 +
 airflow/cli/commands/triggerer_command.py          |   1 +
 airflow/cli/commands/webserver_command.py          |   1 +
 airflow/config_templates/airflow_local_settings.py |  11 +-
 airflow/config_templates/config.yml                |  28 ++-
 airflow/config_templates/default_airflow.cfg       |  14 +-
 airflow/configuration.py                           |  19 +-
 airflow/dag_processing/manager.py                  |  41 +++-
 airflow/dag_processing/processor.py                |   1 -
 airflow/jobs/scheduler_job.py                      |  10 +-
 airflow/kubernetes/kube_client.py                  |  11 +-
 airflow/models/dag.py                              |  19 +-
 airflow/models/dagrun.py                           |  83 ++++---
 airflow/models/param.py                            |   3 +
 airflow/models/taskinstance.py                     |  49 +++-
 airflow/models/taskmixin.py                        |   6 +-
 airflow/models/taskreschedule.py                   |   1 +
 airflow/models/xcom.py                             |   6 +-
 airflow/operators/bash.py                          |   4 +-
 airflow/operators/trigger_dagrun.py                |   6 +-
 airflow/plugins_manager.py                         |   2 +-
 airflow/sensors/time_sensor.py                     |   4 +-
 airflow/serialization/serialized_objects.py        |   2 +-
 airflow/settings.py                                |   2 +
 airflow/templates.py                               |  24 +-
 airflow/ti_deps/deps/ready_to_reschedule.py        |  11 +-
 airflow/utils/db.py                                |  20 +-
 airflow/utils/db_cleanup.py                        | 270 ++++++++++++---------
 airflow/utils/entry_points.py                      |  22 +-
 airflow/utils/log/timezone_aware.py                |  49 ++++
 airflow/utils/serve_logs.py                        |  22 +-
 airflow/www/extensions/init_views.py               |   8 +-
 airflow/www/package.json                           |   2 +
 airflow/www/static/js/graph.js                     |   1 +
 airflow/www/static/js/grid/Grid.tsx                |   1 +
 airflow/www/static/js/grid/api/useClearTask.js     |   2 +-
 airflow/www/static/js/grid/api/useGridData.test.js |  30 ++-
 airflow/www/static/js/grid/api/useGridData.ts      |   2 +-
 .../www/static/js/grid/api/useMarkFailedTask.js    |   2 +-
 .../www/static/js/grid/api/useMarkSuccessTask.js   |   2 +-
 airflow/www/static/js/grid/api/useRunTask.js       |   2 +-
 airflow/www/static/js/ti_log.js                    |   4 +-
 airflow/www/templates/airflow/dag_details.html     |   2 +-
 docs/apache-airflow/concepts/cluster-policies.rst  |   4 +
 docs/apache-airflow/howto/operator/python.rst      |   3 +-
 docs/apache-airflow/howto/set-up-database.rst      |   6 +-
 docs/apache-airflow/static/gh-jira-links.js        |   2 +-
 docs/apache-airflow/tutorial.rst                   |   2 +-
 docs/apache-airflow/usage-cli.rst                  |   2 +
 docs/helm-chart/static/gh-jira-links.js            |   2 +-
 newsfragments/23574.feature.rst                    |   1 +
 newsfragments/24755.improvement.rst                |   1 +
 newsfragments/24811.significant.rst                |  22 ++
 newsfragments/25147.bugfix.rst                     |   1 +
 tests/api_connexion/test_cors.py                   | 140 +++++++++++
 tests/callbacks/test_callback_requests.py          |  21 +-
 tests/cli/commands/test_celery_command.py          |   1 +
 tests/cli/commands/test_db_command.py              |  34 ++-
 tests/cli/commands/test_kerberos_command.py        |   1 +
 tests/core/test_configuration.py                   |  81 +++++++
 tests/dag_processing/test_manager.py               |  62 +++++
 tests/jobs/test_backfill_job.py                    |   5 +-
 tests/jobs/test_scheduler_job.py                   |  95 ++++++--
 tests/kubernetes/test_client.py                    |  13 +
 tests/models/test_cleartasks.py                    |  23 ++
 tests/models/test_dag.py                           |  14 ++
 tests/models/test_dagrun.py                        |  64 +++++
 tests/models/test_param.py                         |   4 +
 tests/models/test_taskinstance.py                  | 251 +++++++++++++++++++
 tests/plugins/test_plugins_manager.py              |   3 +-
 tests/sensors/test_time_sensor.py                  |   7 +
 tests/serialization/test_dag_serialization.py      |  28 +++
 tests/test_utils/db.py                             |  10 +-
 tests/ti_deps/deps/test_ready_to_reschedule_dep.py |  81 ++++++-
 tests/utils/test_db_cleanup.py                     |  44 +++-
 tests/utils/test_entry_points.py                   |  49 ++++
 tests/www/views/test_views.py                      |   2 +-
 85 files changed, 1593 insertions(+), 307 deletions(-)
 create mode 100644 airflow/utils/log/timezone_aware.py
 create mode 100644 newsfragments/23574.feature.rst
 create mode 100644 newsfragments/24755.improvement.rst
 create mode 100644 newsfragments/24811.significant.rst
 create mode 100644 newsfragments/25147.bugfix.rst
 create mode 100644 tests/api_connexion/test_cors.py
 create mode 100644 tests/utils/test_entry_points.py


[airflow] 27/45: fix - resolve bash by absolute path (#25331)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 605126ce00ef981c1c0a7bb1977245f5b57ad75f
Author: Matt Rixman <58...@users.noreply.github.com>
AuthorDate: Thu Jul 28 11:39:35 2022 -0600

    fix - resolve bash by absolute path (#25331)
    
    Co-authored-by: Matt Rixman <Ma...@users.noreply.github.com>
    (cherry picked from commit c3adf3e65d32d8145e2341989a5336c3e5269e62)
---
 airflow/operators/bash.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/airflow/operators/bash.py b/airflow/operators/bash.py
index a4fac2b7f7..47c7a6398f 100644
--- a/airflow/operators/bash.py
+++ b/airflow/operators/bash.py
@@ -16,6 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import os
+import shutil
 from typing import Dict, Optional, Sequence
 
 from airflow.compat.functools import cached_property
@@ -176,6 +177,7 @@ class BashOperator(BaseOperator):
         return env
 
     def execute(self, context: Context):
+        bash_path = shutil.which("bash") or "bash"
         if self.cwd is not None:
             if not os.path.exists(self.cwd):
                 raise AirflowException(f"Can not find the cwd: {self.cwd}")
@@ -183,7 +185,7 @@ class BashOperator(BaseOperator):
                 raise AirflowException(f"The cwd {self.cwd} must be a directory")
         env = self.get_env(context)
         result = self.subprocess_hook.run_command(
-            command=['bash', '-c', self.bash_command],
+            command=[bash_path, '-c', self.bash_command],
             env=env,
             output_encoding=self.output_encoding,
             cwd=self.cwd,
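
A minimal standalone sketch of the lookup this commit adds, using only the standard library (it is not the operator code itself): shutil.which returns the absolute path of an executable found on PATH, and the "or" fallback keeps the previous behaviour when bash cannot be resolved.

    import shutil

    # Resolve bash to an absolute path via a PATH lookup; fall back to the bare
    # name when no bash executable can be found.
    bash_path = shutil.which("bash") or "bash"
    print(bash_path)  # e.g. /usr/bin/bash, or "bash" if nothing was found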


[airflow] 38/45: Added exception catching to send default email if template file raises any exception (#24943)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 146d5109b45afe7e2feaf467dbd97a3955ad8996
Author: ecodina <41...@users.noreply.github.com>
AuthorDate: Mon Jul 18 20:22:03 2022 +0200

    Added exception catching to send default email if template file raises any exception (#24943)
    
    (cherry picked from commit fd6f537eab7430cb10ea057194bfc9519ff0bb38)
---
 airflow/models/taskinstance.py    |  9 +++++++--
 tests/models/test_taskinstance.py | 34 ++++++++++++++++++++++++++++++++++
 2 files changed, 41 insertions(+), 2 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index afcc469feb..c369e84e47 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2337,8 +2337,13 @@ class TaskInstance(Base, LoggingMixin):
             def render(key: str, content: str) -> str:
                 if conf.has_option('email', key):
                     path = conf.get_mandatory_value('email', key)
-                    with open(path) as f:
-                        content = f.read()
+                    try:
+                        with open(path) as f:
+                            content = f.read()
+                    except FileNotFoundError:
+                        self.log.warning(f"Could not find email template file '{path!r}'. Using defaults...")
+                    except OSError:
+                        self.log.exception(f"Error while using email template '{path!r}'. Using defaults...")
                 return render_template_to_string(jinja_env.from_string(content), jinja_context)
 
             subject = render('subject_template', default_subject)
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 3990c3cbf5..1db5542904 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -1392,6 +1392,40 @@ class TestTaskInstance:
         assert 'template: test_email_alert_with_config' == title
         assert 'template: test_email_alert_with_config' == body
 
+    @patch('airflow.models.taskinstance.send_email')
+    def test_email_alert_with_filenotfound_config(self, mock_send_email, dag_maker):
+        with dag_maker(dag_id='test_failure_email'):
+            task = BashOperator(
+                task_id='test_email_alert_with_config',
+                bash_command='exit 1',
+                email='to',
+            )
+        ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0]
+        ti.task = task
+
+        # Run test when the template file is not found
+        opener = mock_open(read_data='template: {{ti.task_id}}')
+        opener.side_effect = FileNotFoundError
+        with patch('airflow.models.taskinstance.open', opener, create=True):
+            try:
+                ti.run()
+            except AirflowException:
+                pass
+
+        (email_error, title_error, body_error), _ = mock_send_email.call_args
+
+        # Rerun task without any error and no template file
+        try:
+            ti.run()
+        except AirflowException:
+            pass
+
+        (email_default, title_default, body_default), _ = mock_send_email.call_args
+
+        assert email_error == email_default == 'to'
+        assert title_default == title_error
+        assert body_default == body_error
+
     @pytest.mark.parametrize("task_id", ["test_email_alert", "test_email_alert__1"])
     @patch('airflow.models.taskinstance.send_email')
     def test_failure_mapped_taskflow(self, mock_send_email, dag_maker, session, task_id):
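
For illustration, a self-contained sketch of the fallback pattern this commit applies inside TaskInstance.email_alert; the function name, path and default value below are hypothetical, not Airflow configuration keys.

    import logging

    log = logging.getLogger(__name__)

    def read_template(path, default):
        """Return the template file contents, falling back to ``default`` on error."""
        try:
            with open(path) as f:
                return f.read()
        except FileNotFoundError:
            log.warning("Could not find template file %r. Using default...", path)
        except OSError:
            log.exception("Error while reading template %r. Using default...", path)
        return default

    # The missing file triggers the warning branch and the default is used.
    print(read_template("/nonexistent/email_subject.tmpl", "Airflow alert: {{ ti }}"))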


[airflow] 04/45: Doc: Add hyperlinks to Github PRs for Release Notes (#24532)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d9ca589649473b74ec093ae335f4ced0e623dc32
Author: Kaxil Naik <ka...@apache.org>
AuthorDate: Mon Jun 20 06:46:39 2022 +0100

    Doc: Add hyperlinks to Github PRs for Release Notes (#24532)
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    (cherry picked from commit abb8e44a8efd5c7ea50765d87572eed405b74dad)
---
 docs/apache-airflow/static/gh-jira-links.js | 2 +-
 docs/helm-chart/static/gh-jira-links.js     | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/apache-airflow/static/gh-jira-links.js b/docs/apache-airflow/static/gh-jira-links.js
index 7d6d7f7569..d731a93738 100644
--- a/docs/apache-airflow/static/gh-jira-links.js
+++ b/docs/apache-airflow/static/gh-jira-links.js
@@ -18,7 +18,7 @@
  */
 
 document.addEventListener('DOMContentLoaded', function() {
-  var el = document.getElementById('changelog');
+  var el = document.getElementById('release-notes');
   if (el !== null ) {
     // [AIRFLOW-...]
     el.innerHTML = el.innerHTML.replace(
diff --git a/docs/helm-chart/static/gh-jira-links.js b/docs/helm-chart/static/gh-jira-links.js
index 7d6d7f7569..d731a93738 100644
--- a/docs/helm-chart/static/gh-jira-links.js
+++ b/docs/helm-chart/static/gh-jira-links.js
@@ -18,7 +18,7 @@
  */
 
 document.addEventListener('DOMContentLoaded', function() {
-  var el = document.getElementById('changelog');
+  var el = document.getElementById('release-notes');
   if (el !== null ) {
     // [AIRFLOW-...]
     el.innerHTML = el.innerHTML.replace(


[airflow] 11/45: Fix tag link on dag detail page (#24918)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9a5f8739f33d3446d7e6ec4ea154706860688675
Author: Xianghu Zhao <xi...@gmail.com>
AuthorDate: Fri Jul 8 23:17:48 2022 +0800

    Fix tag link on dag detail page (#24918)
    
    (cherry picked from commit 24220bca90c61d0d20b23728a18bca09dd6e41eb)
---
 airflow/www/templates/airflow/dag_details.html | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/www/templates/airflow/dag_details.html b/airflow/www/templates/airflow/dag_details.html
index ac0ca50531..e721878ab3 100644
--- a/airflow/www/templates/airflow/dag_details.html
+++ b/airflow/www/templates/airflow/dag_details.html
@@ -105,7 +105,7 @@
         {% if tags is defined and tags %}
           {% for tag in tags | sort(attribute='name') %}
             <a class="label label-info"
-               href="/home?tags={{ tag.name }}"
+               href="{{ url_for('Airflow.index', tags=tag.name) }}"
                style="margin: 3px 6px 3px 0;"
                title="All DAGs tagged &ldquo;{{ tag.name }}&rdquo;"
             >
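
For illustration, a plain-Flask sketch of why the template now uses url_for instead of a hard-coded "/home?tags=..." link: keyword arguments that are not route variables become query-string parameters, so the tag filter keeps working even if the route's URL changes. Airflow's real endpoint is Airflow.index registered through Flask-AppBuilder; the bare "index" endpoint below is a stand-in.

    from flask import Flask, url_for

    app = Flask(__name__)

    @app.route("/home")
    def index():
        return "dags"

    with app.test_request_context():
        # Extra kwargs that are not route variables are appended as query parameters.
        print(url_for("index", tags="example_tag"))  # -> /home?tags=example_tag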


[airflow] 22/45: Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue (#25147)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 975dd0c8a203746c82fc9e992bcfe754e6209c03
Author: Andrew Gibbs <gi...@andrew.gibbs.io>
AuthorDate: Thu Jul 21 22:03:20 2022 +0100

    Stop SLA callbacks gazumping other callbacks and DOS'ing the DagProcessorManager queue (#25147)
    
    (cherry picked from commit 17ec6dbcfe4a4d60092644e1b373d80789802b49)
---
 airflow/dag_processing/manager.py    | 41 +++++++++++++++++-------
 newsfragments/25147.bugfix.rst       |  1 +
 tests/dag_processing/test_manager.py | 62 ++++++++++++++++++++++++++++++++++++
 3 files changed, 93 insertions(+), 11 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index cbbc2bfdaf..93dc060a20 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -38,7 +38,7 @@ from sqlalchemy.orm import Session
 from tabulate import tabulate
 
 import airflow.models
-from airflow.callbacks.callback_requests import CallbackRequest
+from airflow.callbacks.callback_requests import CallbackRequest, SlaCallbackRequest
 from airflow.configuration import conf
 from airflow.dag_processing.processor import DagFileProcessorProcess
 from airflow.models import DagModel, DbCallbackRequest, errors
@@ -670,16 +670,35 @@ class DagFileProcessorManager(LoggingMixin):
             guard.commit()
 
     def _add_callback_to_queue(self, request: CallbackRequest):
-        self._callback_to_execute[request.full_filepath].append(request)
-        # Callback has a higher priority over DAG Run scheduling
-        if request.full_filepath in self._file_path_queue:
-            # Remove file paths matching request.full_filepath from self._file_path_queue
-            # Since we are already going to use that filepath to run callback,
-            # there is no need to have same file path again in the queue
-            self._file_path_queue = [
-                file_path for file_path in self._file_path_queue if file_path != request.full_filepath
-            ]
-        self._file_path_queue.insert(0, request.full_filepath)
+
+        # requests are sent by dag processors. SLAs exist per-dag, but can be generated once per SLA-enabled
+        # task in the dag. If treated like other callbacks, SLAs can cause feedback where a SLA arrives,
+        # goes to the front of the queue, gets processed, triggers more SLAs from the same DAG, which go to
+        # the front of the queue, and we never get round to picking stuff off the back of the queue
+        if isinstance(request, SlaCallbackRequest):
+            if request in self._callback_to_execute[request.full_filepath]:
+                self.log.debug("Skipping already queued SlaCallbackRequest")
+                return
+
+            # not already queued, queue the file _at the back_, and add the request to the file's callbacks
+            self.log.debug("Queuing SlaCallbackRequest for %s", request.dag_id)
+            self._callback_to_execute[request.full_filepath].append(request)
+            if request.full_filepath not in self._file_path_queue:
+                self._file_path_queue.append(request.full_filepath)
+
+        # Other callbacks have a higher priority over DAG Run scheduling, so those callbacks gazump, even if
+        # already in the queue
+        else:
+            self.log.debug("Queuing %s CallbackRequest: %s", type(request).__name__, request)
+            self._callback_to_execute[request.full_filepath].append(request)
+            if request.full_filepath in self._file_path_queue:
+                # Remove file paths matching request.full_filepath from self._file_path_queue
+                # Since we are already going to use that filepath to run callback,
+                # there is no need to have same file path again in the queue
+                self._file_path_queue = [
+                    file_path for file_path in self._file_path_queue if file_path != request.full_filepath
+                ]
+            self._file_path_queue.insert(0, request.full_filepath)
 
     def _refresh_dag_dir(self):
         """Refresh file paths from dag dir if we haven't done it for too long."""
diff --git a/newsfragments/25147.bugfix.rst b/newsfragments/25147.bugfix.rst
new file mode 100644
index 0000000000..2d4523604c
--- /dev/null
+++ b/newsfragments/25147.bugfix.rst
@@ -0,0 +1 @@
+``DagProcessorManager`` callback queue changed to queue SLAs at the back (stops DAG processing stalling due to SLAs)
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index ed1b194a7b..6875e6ad75 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -847,6 +847,68 @@ class TestDagFileProcessorManager:
         with create_session() as session:
             assert session.query(DbCallbackRequest).count() == 1
 
+    def test_callback_queue(self, tmpdir):
+        # given
+        manager = DagFileProcessorManager(
+            dag_directory=TEST_DAG_FOLDER,
+            max_runs=1,
+            processor_timeout=timedelta(days=365),
+            signal_conn=MagicMock(),
+            dag_ids=[],
+            pickle_dags=False,
+            async_mode=True,
+        )
+
+        dag1_req1 = DagCallbackRequest(
+            full_filepath="/green_eggs/ham/file1.py",
+            dag_id="dag1",
+            run_id="run1",
+            is_failure_callback=False,
+            msg=None,
+        )
+        dag1_req2 = DagCallbackRequest(
+            full_filepath="/green_eggs/ham/file1.py",
+            dag_id="dag1",
+            run_id="run1",
+            is_failure_callback=False,
+            msg=None,
+        )
+        dag1_sla1 = SlaCallbackRequest(full_filepath="/green_eggs/ham/file1.py", dag_id="dag1")
+        dag1_sla2 = SlaCallbackRequest(full_filepath="/green_eggs/ham/file1.py", dag_id="dag1")
+
+        dag2_req1 = DagCallbackRequest(
+            full_filepath="/green_eggs/ham/file2.py",
+            dag_id="dag2",
+            run_id="run1",
+            is_failure_callback=False,
+            msg=None,
+        )
+
+        # when
+        manager._add_callback_to_queue(dag1_req1)
+        manager._add_callback_to_queue(dag1_sla1)
+        manager._add_callback_to_queue(dag2_req1)
+
+        # then - requests should be in manager's queue, with dag2 ahead of dag1 (because it was added last)
+        assert manager._file_path_queue == [dag2_req1.full_filepath, dag1_req1.full_filepath]
+        assert set(manager._callback_to_execute.keys()) == {dag1_req1.full_filepath, dag2_req1.full_filepath}
+        assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1]
+        assert manager._callback_to_execute[dag2_req1.full_filepath] == [dag2_req1]
+
+        # when
+        manager._add_callback_to_queue(dag1_sla2)
+
+        # then - since sla2 == sla1, should not have brought dag1 to the fore
+        assert manager._file_path_queue == [dag2_req1.full_filepath, dag1_req1.full_filepath]
+        assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1]
+
+        # when
+        manager._add_callback_to_queue(dag1_req2)
+
+        # then - non-sla callback should have brought dag1 to the fore
+        assert manager._file_path_queue == [dag1_req1.full_filepath, dag2_req1.full_filepath]
+        assert manager._callback_to_execute[dag1_req1.full_filepath] == [dag1_req1, dag1_sla1, dag1_req2]
+
 
 class TestDagFileProcessorAgent(unittest.TestCase):
     def setUp(self):
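
A simplified standalone sketch of the queueing policy introduced above (not the real DagFileProcessorManager): SLA requests are de-duplicated and their file is queued at the back, while any other callback still moves its file to the front of the queue.

    from collections import defaultdict

    callbacks = defaultdict(list)
    file_queue = []

    def add_callback(path, request, is_sla):
        if is_sla:
            if request in callbacks[path]:
                return  # identical SLA already queued, nothing to do
            callbacks[path].append(request)
            if path not in file_queue:
                file_queue.append(path)  # back of the queue
        else:
            callbacks[path].append(request)
            if path in file_queue:
                file_queue.remove(path)
            file_queue.insert(0, path)  # front of the queue

    add_callback("dag1.py", "dag1_callback", is_sla=False)
    add_callback("dag1.py", "dag1_sla", is_sla=True)
    add_callback("dag2.py", "dag2_callback", is_sla=False)
    print(file_queue)  # ['dag2.py', 'dag1.py'] -- the SLA did not pull dag1.py forward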


[airflow] 02/45: Don't rely on current ORM structure for db clean command (#23574)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 658f5abe60a98049be8ea904b9855615c8bb5d2f
Author: Daniel Standish <15...@users.noreply.github.com>
AuthorDate: Fri Jun 17 12:46:16 2022 -0700

    Don't rely on current ORM structure for db clean command (#23574)
    
    For command DB clean, by not relying on the ORM models, we will be able to use the command even when the metadatabase is not yet upgraded to the version of Airflow you have installed.
    
    Additionally we archive all rows before deletion.
    
    (cherry picked from commit 95bd6b71cc9f5da377e272707f7b68000d980939)
---
 airflow/cli/cli_parser.py             |   6 +
 airflow/cli/commands/db_command.py    |   1 +
 airflow/utils/db.py                   |  17 ++-
 airflow/utils/db_cleanup.py           | 270 ++++++++++++++++++++--------------
 docs/apache-airflow/usage-cli.rst     |   2 +
 newsfragments/23574.feature.rst       |   1 +
 tests/cli/commands/test_db_command.py |  34 ++++-
 tests/test_utils/db.py                |  10 +-
 tests/utils/test_db_cleanup.py        |  44 +++++-
 9 files changed, 257 insertions(+), 128 deletions(-)

diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 0789b4ee88..d494aa0f06 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -434,6 +434,11 @@ ARG_DB_DRY_RUN = Arg(
     help="Perform a dry run",
     action="store_true",
 )
+ARG_DB_SKIP_ARCHIVE = Arg(
+    ("--skip-archive",),
+    help="Don't preserve purged records in an archive table.",
+    action="store_true",
+)
 
 
 # pool
@@ -1454,6 +1459,7 @@ DB_COMMANDS = (
             ARG_DB_CLEANUP_TIMESTAMP,
             ARG_VERBOSE,
             ARG_YES,
+            ARG_DB_SKIP_ARCHIVE,
         ),
     ),
 )
diff --git a/airflow/cli/commands/db_command.py b/airflow/cli/commands/db_command.py
index c9201ad59b..5f6a84c8a4 100644
--- a/airflow/cli/commands/db_command.py
+++ b/airflow/cli/commands/db_command.py
@@ -198,4 +198,5 @@ def cleanup_tables(args):
         clean_before_timestamp=args.clean_before_timestamp,
         verbose=args.verbose,
         confirm=not args.yes,
+        skip_archive=args.skip_archive,
     )
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 7bdd33fb93..a86222e3b8 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -870,7 +870,7 @@ def check_conn_id_duplicates(session: Session) -> Iterable[str]:
         )
 
 
-def reflect_tables(tables: List[Union[Base, str]], session):
+def reflect_tables(tables: Optional[List[Union[Base, str]]], session):
     """
     When running checks prior to upgrades, we use reflection to determine current state of the
     database.
@@ -881,12 +881,15 @@ def reflect_tables(tables: List[Union[Base, str]], session):
 
     metadata = sqlalchemy.schema.MetaData(session.bind)
 
-    for tbl in tables:
-        try:
-            table_name = tbl if isinstance(tbl, str) else tbl.__tablename__
-            metadata.reflect(only=[table_name], extend_existing=True, resolve_fks=False)
-        except exc.InvalidRequestError:
-            continue
+    if tables is None:
+        metadata.reflect(resolve_fks=False)
+    else:
+        for tbl in tables:
+            try:
+                table_name = tbl if isinstance(tbl, str) else tbl.__tablename__
+                metadata.reflect(only=[table_name], extend_existing=True, resolve_fks=False)
+            except exc.InvalidRequestError:
+                continue
     return metadata
 
 
diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py
index b02d08503f..f77ae52a60 100644
--- a/airflow/utils/db_cleanup.py
+++ b/airflow/utils/db_cleanup.py
@@ -21,38 +21,24 @@ This module took inspiration from the community maintenance dag
 """
 
 import logging
-from contextlib import AbstractContextManager
+from contextlib import contextmanager
 from dataclasses import dataclass
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+from typing import Any, Dict, List, Optional
 
 from pendulum import DateTime
-from sqlalchemy import and_, false, func
+from sqlalchemy import and_, column, false, func, table, text
 from sqlalchemy.exc import OperationalError, ProgrammingError
+from sqlalchemy.ext.compiler import compiles
+from sqlalchemy.orm import Query, Session, aliased
+from sqlalchemy.sql.expression import ClauseElement, Executable, tuple_
 
 from airflow.cli.simple_table import AirflowConsole
-from airflow.jobs.base_job import BaseJob
-from airflow.models import (
-    Base,
-    DagModel,
-    DagRun,
-    DbCallbackRequest,
-    ImportError as models_ImportError,
-    Log,
-    RenderedTaskInstanceFields,
-    SensorInstance,
-    SlaMiss,
-    TaskFail,
-    TaskInstance,
-    TaskReschedule,
-    XCom,
-)
+from airflow.models import Base
 from airflow.utils import timezone
+from airflow.utils.db import reflect_tables
 from airflow.utils.session import NEW_SESSION, provide_session
 
-if TYPE_CHECKING:
-    from sqlalchemy.orm import Query, Session
-    from sqlalchemy.orm.attributes import InstrumentedAttribute
-    from sqlalchemy.sql.schema import Column
+logger = logging.getLogger(__file__)
 
 
 @dataclass
@@ -60,115 +46,155 @@ class _TableConfig:
     """
     Config class for performing cleanup on a table
 
-    :param orm_model: the table
-    :param recency_column: date column to filter by
+    :param table_name: the table
+    :param extra_columns: any columns besides recency_column_name that we'll need in queries
+    :param recency_column_name: date column to filter by
     :param keep_last: whether the last record should be kept even if it's older than clean_before_timestamp
     :param keep_last_filters: the "keep last" functionality will preserve the most recent record
         in the table.  to ignore certain records even if they are the latest in the table, you can
         supply additional filters here (e.g. externally triggered dag runs)
     :param keep_last_group_by: if keeping the last record, can keep the last record for each group
-    :param warn_if_missing: If True, then we'll suppress "table missing" exception and log a warning.
-        If False then the exception will go uncaught.
     """
 
-    orm_model: Base
-    recency_column: Union["Column", "InstrumentedAttribute"]
+    table_name: str
+    recency_column_name: str
+    extra_columns: Optional[List[str]] = None
     keep_last: bool = False
     keep_last_filters: Optional[Any] = None
     keep_last_group_by: Optional[Any] = None
-    warn_if_missing: bool = False
+
+    def __post_init__(self):
+        self.recency_column = column(self.recency_column_name)
+        self.orm_model: Base = table(
+            self.table_name, *[column(x) for x in self.extra_columns or []], self.recency_column
+        )
 
     def __lt__(self, other):
-        return self.orm_model.__tablename__ < other.orm_model.__tablename__
+        return self.table_name < other.table_name
 
     @property
     def readable_config(self):
         return dict(
-            table=self.orm_model.__tablename__,
+            table=self.orm_model.name,
             recency_column=str(self.recency_column),
             keep_last=self.keep_last,
             keep_last_filters=[str(x) for x in self.keep_last_filters] if self.keep_last_filters else None,
             keep_last_group_by=str(self.keep_last_group_by),
-            warn_if_missing=str(self.warn_if_missing),
         )
 
 
 config_list: List[_TableConfig] = [
-    _TableConfig(orm_model=BaseJob, recency_column=BaseJob.latest_heartbeat),
-    _TableConfig(orm_model=DagModel, recency_column=DagModel.last_parsed_time),
+    _TableConfig(table_name='job', recency_column_name='latest_heartbeat'),
+    _TableConfig(table_name='dag', recency_column_name='last_parsed_time'),
     _TableConfig(
-        orm_model=DagRun,
-        recency_column=DagRun.start_date,
+        table_name='dag_run',
+        recency_column_name='start_date',
+        extra_columns=['dag_id', 'external_trigger'],
         keep_last=True,
-        keep_last_filters=[DagRun.external_trigger == false()],
-        keep_last_group_by=DagRun.dag_id,
-    ),
-    _TableConfig(orm_model=models_ImportError, recency_column=models_ImportError.timestamp),
-    _TableConfig(orm_model=Log, recency_column=Log.dttm),
-    _TableConfig(
-        orm_model=RenderedTaskInstanceFields, recency_column=RenderedTaskInstanceFields.execution_date
+        keep_last_filters=[column('external_trigger') == false()],
+        keep_last_group_by=['dag_id'],
     ),
-    _TableConfig(
-        orm_model=SensorInstance, recency_column=SensorInstance.updated_at
-    ),  # TODO: add FK to task instance / dag so we can remove here
-    _TableConfig(orm_model=SlaMiss, recency_column=SlaMiss.timestamp),
-    _TableConfig(orm_model=TaskFail, recency_column=TaskFail.start_date),
-    _TableConfig(orm_model=TaskInstance, recency_column=TaskInstance.start_date),
-    _TableConfig(orm_model=TaskReschedule, recency_column=TaskReschedule.start_date),
-    _TableConfig(orm_model=XCom, recency_column=XCom.timestamp),
-    _TableConfig(orm_model=DbCallbackRequest, recency_column=XCom.timestamp),
+    _TableConfig(table_name='import_error', recency_column_name='timestamp'),
+    _TableConfig(table_name='log', recency_column_name='dttm'),
+    _TableConfig(table_name='rendered_task_instance_fields', recency_column_name='execution_date'),
+    _TableConfig(table_name='sensor_instance', recency_column_name='updated_at'),
+    _TableConfig(table_name='sla_miss', recency_column_name='timestamp'),
+    _TableConfig(table_name='task_fail', recency_column_name='start_date'),
+    _TableConfig(table_name='task_instance', recency_column_name='start_date'),
+    _TableConfig(table_name='task_reschedule', recency_column_name='start_date'),
+    _TableConfig(table_name='xcom', recency_column_name='timestamp'),
+    _TableConfig(table_name='callback_request', recency_column_name='created_at'),
+    _TableConfig(table_name='celery_taskmeta', recency_column_name='date_done'),
+    _TableConfig(table_name='celery_tasksetmeta', recency_column_name='date_done'),
 ]
-try:
-    from celery.backends.database.models import Task, TaskSet
-
-    config_list.extend(
-        [
-            _TableConfig(orm_model=Task, recency_column=Task.date_done, warn_if_missing=True),
-            _TableConfig(orm_model=TaskSet, recency_column=TaskSet.date_done, warn_if_missing=True),
-        ]
-    )
-except ImportError:
-    pass
 
-config_dict: Dict[str, _TableConfig] = {x.orm_model.__tablename__: x for x in sorted(config_list)}
+config_dict: Dict[str, _TableConfig] = {x.orm_model.name: x for x in sorted(config_list)}
 
 
-def _print_entities(*, query: "Query", print_rows=False):
+def _check_for_rows(*, query: "Query", print_rows=False):
     num_entities = query.count()
     print(f"Found {num_entities} rows meeting deletion criteria.")
-    if not print_rows:
-        return
-    max_rows_to_print = 100
-    if num_entities > 0:
-        print(f"Printing first {max_rows_to_print} rows.")
-    logger.debug("print entities query: %s", query)
-    for entry in query.limit(max_rows_to_print):
-        print(entry.__dict__)
+    if print_rows:
+        max_rows_to_print = 100
+        if num_entities > 0:
+            print(f"Printing first {max_rows_to_print} rows.")
+        logger.debug("print entities query: %s", query)
+        for entry in query.limit(max_rows_to_print):
+            print(entry.__dict__)
+    return num_entities
+
 
+def _do_delete(*, query, orm_model, skip_archive, session):
+    import re
+    from datetime import datetime
 
-def _do_delete(*, query, session):
     print("Performing Delete...")
     # using bulk delete
-    query.delete(synchronize_session=False)
+    # create a new table and copy the rows there
+    timestamp_str = re.sub(r'[^\d]', '', datetime.utcnow().isoformat())[:14]
+    target_table_name = f'_airflow_deleted__{orm_model.name}__{timestamp_str}'
+    print(f"Moving data to table {target_table_name}")
+    stmt = CreateTableAs(target_table_name, query.selectable)
+    logger.debug("ctas query:\n%s", stmt.compile())
+    session.execute(stmt)
+    session.commit()
+
+    # delete the rows from the old table
+    metadata = reflect_tables([orm_model.name, target_table_name], session)
+    source_table = metadata.tables[orm_model.name]
+    target_table = metadata.tables[target_table_name]
+    logger.debug("rows moved; purging from %s", source_table.name)
+    bind = session.get_bind()
+    dialect_name = bind.dialect.name
+    if dialect_name == 'sqlite':
+        pk_cols = source_table.primary_key.columns
+        delete = source_table.delete().where(
+            tuple_(*pk_cols).in_(
+                session.query(*[target_table.c[x.name] for x in source_table.primary_key.columns]).subquery()
+            )
+        )
+    else:
+        delete = source_table.delete().where(
+            and_(col == target_table.c[col.name] for col in source_table.primary_key.columns)
+        )
+    logger.debug("delete statement:\n%s", delete.compile())
+    session.execute(delete)
+    session.commit()
+    if skip_archive:
+        target_table.drop()
     session.commit()
     print("Finished Performing Delete")
 
 
-def _subquery_keep_last(*, recency_column, keep_last_filters, keep_last_group_by, session):
-    subquery = session.query(func.max(recency_column))
+def _subquery_keep_last(*, recency_column, keep_last_filters, group_by_columns, max_date_colname, session):
+    subquery = session.query(*group_by_columns, func.max(recency_column).label(max_date_colname))
 
     if keep_last_filters is not None:
         for entry in keep_last_filters:
             subquery = subquery.filter(entry)
 
-    if keep_last_group_by is not None:
-        subquery = subquery.group_by(keep_last_group_by)
+    if group_by_columns is not None:
+        subquery = subquery.group_by(*group_by_columns)
+
+    return subquery.subquery(name='latest')
 
-    # We nest this subquery to work around a MySQL "table specified twice" issue
-    # See https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
-    # and https://github.com/teamclairvoyant/airflow-maintenance-dags/pull/57/files.
-    subquery = subquery.from_self()
-    return subquery
+
+class CreateTableAs(Executable, ClauseElement):
+    """Custom sqlalchemy clause element for CTAS operations."""
+
+    def __init__(self, name, query):
+        self.name = name
+        self.query = query
+
+
+@compiles(CreateTableAs)
+def _compile_create_table_as__other(element, compiler, **kw):
+    return f"CREATE TABLE {element.name} AS {compiler.process(element.query)}"
+
+
+@compiles(CreateTableAs, 'mssql')
+def _compile_create_table_as__mssql(element, compiler, **kw):
+    return f"WITH cte AS ( {compiler.process(element.query)} ) SELECT * INTO {element.name} FROM cte"
 
 
 def _build_query(
@@ -182,23 +208,33 @@ def _build_query(
     session,
     **kwargs,
 ):
-    query = session.query(orm_model)
-    conditions = [recency_column < clean_before_timestamp]
+    base_table_alias = 'base'
+    base_table = aliased(orm_model, name=base_table_alias)
+    query = session.query(base_table).with_entities(text(f"{base_table_alias}.*"))
+    base_table_recency_col = base_table.c[recency_column.name]
+    conditions = [base_table_recency_col < clean_before_timestamp]
     if keep_last:
+        max_date_col_name = 'max_date_per_group'
+        group_by_columns = [column(x) for x in keep_last_group_by]
         subquery = _subquery_keep_last(
             recency_column=recency_column,
             keep_last_filters=keep_last_filters,
-            keep_last_group_by=keep_last_group_by,
+            group_by_columns=group_by_columns,
+            max_date_colname=max_date_col_name,
             session=session,
         )
-        conditions.append(recency_column.notin_(subquery))
+        query = query.select_from(base_table).outerjoin(
+            subquery,
+            and_(
+                *[base_table.c[x] == subquery.c[x] for x in keep_last_group_by],
+                base_table_recency_col == column(max_date_col_name),
+            ),
+        )
+        conditions.append(column(max_date_col_name).is_(None))
     query = query.filter(and_(*conditions))
     return query
 
 
-logger = logging.getLogger(__file__)
-
-
 def _cleanup_table(
     *,
     orm_model,
@@ -209,12 +245,13 @@ def _cleanup_table(
     clean_before_timestamp,
     dry_run=True,
     verbose=False,
+    skip_archive=False,
     session=None,
     **kwargs,
 ):
     print()
     if dry_run:
-        print(f"Performing dry run for table {orm_model.__tablename__!r}")
+        print(f"Performing dry run for table {orm_model.name}")
     query = _build_query(
         orm_model=orm_model,
         recency_column=recency_column,
@@ -224,12 +261,14 @@ def _cleanup_table(
         clean_before_timestamp=clean_before_timestamp,
         session=session,
     )
+    logger.debug("old rows query:\n%s", query.selectable.compile())
+    print(f"Checking table {orm_model.name}")
+    num_rows = _check_for_rows(query=query, print_rows=False)
 
-    _print_entities(query=query, print_rows=False)
+    if num_rows and not dry_run:
+        _do_delete(query=query, orm_model=orm_model, skip_archive=skip_archive, session=session)
 
-    if not dry_run:
-        _do_delete(query=query, session=session)
-        session.commit()
+    session.commit()
 
 
 def _confirm_delete(*, date: DateTime, tables: List[str]):
@@ -251,19 +290,20 @@ def _print_config(*, configs: Dict[str, _TableConfig]):
     AirflowConsole().print_as_table(data=data)
 
 
-class _warn_if_missing(AbstractContextManager):
-    def __init__(self, table, suppress):
-        self.table = table
-        self.suppress = suppress
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exctype, excinst, exctb):
-        caught_error = exctype is not None and issubclass(exctype, (OperationalError, ProgrammingError))
-        if caught_error:
-            logger.warning("Table %r not found.  Skipping.", self.table)
-        return caught_error
+@contextmanager
+def _suppress_with_logging(table, session):
+    """
+    Suppresses errors but logs them.
+    Also stores the exception instance so it can be referred to after exiting context.
+    """
+    try:
+        yield
+    except (OperationalError, ProgrammingError):
+        logger.warning("Encountered error when attempting to clean table '%s'. ", table)
+        logger.debug("Traceback for table '%s'", table, exc_info=True)
+        if session.is_active:
+            logger.debug('Rolling back transaction')
+            session.rollback()
 
 
 @provide_session
@@ -274,6 +314,7 @@ def run_cleanup(
     dry_run: bool = False,
     verbose: bool = False,
     confirm: bool = True,
+    skip_archive: bool = False,
     session: 'Session' = NEW_SESSION,
 ):
     """
@@ -292,6 +333,7 @@ def run_cleanup(
     :param dry_run: If true, print rows meeting deletion criteria
     :param verbose: If true, may provide more detailed output.
     :param confirm: Require user input to confirm before processing deletions.
+    :param skip_archive: Set to True if you don't want the purged rows preserved in an archive table.
     :param session: Session representing connection to the metadata database.
     """
     clean_before_timestamp = timezone.coerce_datetime(clean_before_timestamp)
@@ -306,12 +348,18 @@ def run_cleanup(
         _print_config(configs=effective_config_dict)
     if not dry_run and confirm:
         _confirm_delete(date=clean_before_timestamp, tables=list(effective_config_dict.keys()))
+    existing_tables = reflect_tables(tables=None, session=session).tables
     for table_name, table_config in effective_config_dict.items():
-        with _warn_if_missing(table_name, table_config.warn_if_missing):
+        if table_name not in existing_tables:
+            logger.warning("Table %s not found.  Skipping.", table_name)
+            continue
+        with _suppress_with_logging(table_name, session):
             _cleanup_table(
                 clean_before_timestamp=clean_before_timestamp,
                 dry_run=dry_run,
                 verbose=verbose,
                 **table_config.__dict__,
+                skip_archive=skip_archive,
                 session=session,
             )
+            session.commit()
diff --git a/docs/apache-airflow/usage-cli.rst b/docs/apache-airflow/usage-cli.rst
index 0e7b1b5455..c14efacb1d 100644
--- a/docs/apache-airflow/usage-cli.rst
+++ b/docs/apache-airflow/usage-cli.rst
@@ -215,6 +215,8 @@ You can optionally provide a list of tables to perform deletes on. If no list of
 
 You can use the ``--dry-run`` option to print the row counts in the primary tables to be cleaned.
 
+By default, ``db clean`` will archive purged rows in tables of the form ``_airflow_deleted__<table>__<timestamp>``.  If you don't want the data preserved in this way, you may supply argument ``--skip-archive``.
+
 Beware cascading deletes
 ^^^^^^^^^^^^^^^^^^^^^^^^
 
diff --git a/newsfragments/23574.feature.rst b/newsfragments/23574.feature.rst
new file mode 100644
index 0000000000..805b7b18bd
--- /dev/null
+++ b/newsfragments/23574.feature.rst
@@ -0,0 +1 @@
+Command ``airflow db clean`` now archives data before purging.
diff --git a/tests/cli/commands/test_db_command.py b/tests/cli/commands/test_db_command.py
index 125e5d7c3e..e6e93f6a1c 100644
--- a/tests/cli/commands/test_db_command.py
+++ b/tests/cli/commands/test_db_command.py
@@ -293,6 +293,7 @@ class TestCLIDBClean:
             clean_before_timestamp=pendulum.parse(timestamp, tz=timezone),
             verbose=False,
             confirm=False,
+            skip_archive=False,
         )
 
     @pytest.mark.parametrize('timezone', ['UTC', 'Europe/Berlin', 'America/Los_Angeles'])
@@ -312,13 +313,14 @@ class TestCLIDBClean:
             clean_before_timestamp=pendulum.parse(timestamp),
             verbose=False,
             confirm=False,
+            skip_archive=False,
         )
 
     @pytest.mark.parametrize('confirm_arg, expected', [(['-y'], False), ([], True)])
     @patch('airflow.cli.commands.db_command.run_cleanup')
     def test_confirm(self, run_cleanup_mock, confirm_arg, expected):
         """
-        When tz included in the string then default timezone should not be used.
+        When ``-y`` provided, ``confirm`` should be false.
         """
         args = self.parser.parse_args(
             [
@@ -337,6 +339,33 @@ class TestCLIDBClean:
             clean_before_timestamp=pendulum.parse('2021-01-01 00:00:00Z'),
             verbose=False,
             confirm=expected,
+            skip_archive=False,
+        )
+
+    @pytest.mark.parametrize('extra_arg, expected', [(['--skip-archive'], True), ([], False)])
+    @patch('airflow.cli.commands.db_command.run_cleanup')
+    def test_skip_archive(self, run_cleanup_mock, extra_arg, expected):
+        """
+        When ``--skip-archive`` provided, ``skip_archive`` should be True (False otherwise).
+        """
+        args = self.parser.parse_args(
+            [
+                'db',
+                'clean',
+                '--clean-before-timestamp',
+                '2021-01-01',
+                *extra_arg,
+            ]
+        )
+        db_command.cleanup_tables(args)
+
+        run_cleanup_mock.assert_called_once_with(
+            table_names=None,
+            dry_run=False,
+            clean_before_timestamp=pendulum.parse('2021-01-01 00:00:00Z'),
+            verbose=False,
+            confirm=True,
+            skip_archive=expected,
         )
 
     @pytest.mark.parametrize('dry_run_arg, expected', [(['--dry-run'], True), ([], False)])
@@ -362,6 +391,7 @@ class TestCLIDBClean:
             clean_before_timestamp=pendulum.parse('2021-01-01 00:00:00Z'),
             verbose=False,
             confirm=True,
+            skip_archive=False,
         )
 
     @pytest.mark.parametrize(
@@ -389,6 +419,7 @@ class TestCLIDBClean:
             clean_before_timestamp=pendulum.parse('2021-01-01 00:00:00Z'),
             verbose=False,
             confirm=True,
+            skip_archive=False,
         )
 
     @pytest.mark.parametrize('extra_args, expected', [(['--verbose'], True), ([], False)])
@@ -414,4 +445,5 @@ class TestCLIDBClean:
             clean_before_timestamp=pendulum.parse('2021-01-01 00:00:00Z'),
             verbose=expected,
             confirm=True,
+            skip_archive=False,
         )
diff --git a/tests/test_utils/db.py b/tests/test_utils/db.py
index b7502fc52b..ae4a1d6598 100644
--- a/tests/test_utils/db.py
+++ b/tests/test_utils/db.py
@@ -38,7 +38,7 @@ from airflow.models import (
 from airflow.models.dagcode import DagCode
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.security.permissions import RESOURCE_DAG_PREFIX
-from airflow.utils.db import add_default_pool_if_not_exists, create_default_connections
+from airflow.utils.db import add_default_pool_if_not_exists, create_default_connections, reflect_tables
 from airflow.utils.session import create_session
 from airflow.www.fab_security.sqla.models import Permission, Resource, assoc_permission_role
 
@@ -57,6 +57,14 @@ def clear_db_dags():
         session.query(DagModel).delete()
 
 
+def drop_tables_with_prefix(prefix):
+    with create_session() as session:
+        metadata = reflect_tables(None, session)
+        for table_name, table in metadata.tables.items():
+            if table_name.startswith(prefix):
+                table.drop()
+
+
 def clear_db_serialized_dags():
     with create_session() as session:
         session.query(SerializedDagModel).delete()
diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py
index 8d227df6e5..e335cdb251 100644
--- a/tests/utils/test_db_cleanup.py
+++ b/tests/utils/test_db_cleanup.py
@@ -30,7 +30,7 @@ from airflow.models import DagModel, DagRun, TaskInstance
 from airflow.operators.python import PythonOperator
 from airflow.utils.db_cleanup import _build_query, _cleanup_table, config_dict, run_cleanup
 from airflow.utils.session import create_session
-from tests.test_utils.db import clear_db_dags, clear_db_runs
+from tests.test_utils.db import clear_db_dags, clear_db_runs, drop_tables_with_prefix
 
 
 @pytest.fixture(autouse=True)
@@ -44,6 +44,10 @@ def clean_database():
 
 
 class TestDBCleanup:
+    @pytest.fixture(autouse=True)
+    def clear_airflow_tables(self):
+        drop_tables_with_prefix('_airflow_')
+
     @pytest.mark.parametrize(
         'kwargs, called',
         [
@@ -68,6 +72,27 @@ class TestDBCleanup:
         else:
             confirm_delete_mock.assert_not_called()
 
+    @pytest.mark.parametrize(
+        'kwargs, should_skip',
+        [
+            param(dict(skip_archive=True), True, id='true'),
+            param(dict(), False, id='not supplied'),
+            param(dict(skip_archive=False), False, id='false'),
+        ],
+    )
+    @patch('airflow.utils.db_cleanup._cleanup_table')
+    def test_run_cleanup_skip_archive(self, cleanup_table_mock, kwargs, should_skip):
+        """test that delete confirmation input is called when appropriate"""
+        run_cleanup(
+            clean_before_timestamp=None,
+            table_names=['log'],
+            dry_run=None,
+            verbose=None,
+            confirm=False,
+            **kwargs,
+        )
+        assert cleanup_table_mock.call_args[1]['skip_archive'] is should_skip
+
     @pytest.mark.parametrize(
         'table_names',
         [
@@ -95,12 +120,14 @@ class TestDBCleanup:
         [None, True, False],
     )
     @patch('airflow.utils.db_cleanup._build_query', MagicMock())
-    @patch('airflow.utils.db_cleanup._print_entities', MagicMock())
-    @patch('airflow.utils.db_cleanup._do_delete')
     @patch('airflow.utils.db_cleanup._confirm_delete', MagicMock())
-    def test_run_cleanup_dry_run(self, do_delete, dry_run):
+    @patch('airflow.utils.db_cleanup._check_for_rows')
+    @patch('airflow.utils.db_cleanup._do_delete')
+    def test_run_cleanup_dry_run(self, do_delete, check_rows_mock, dry_run):
         """Delete should only be called when not dry_run"""
+        check_rows_mock.return_value = 10
         base_kwargs = dict(
+            table_names=['log'],
             clean_before_timestamp=None,
             dry_run=dry_run,
             verbose=None,
@@ -135,7 +162,7 @@ class TestDBCleanup:
         dag run is kept.
 
         """
-        base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone('America/Los_Angeles'))
+        base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone('UTC'))
         create_tis(
             base_date=base_date,
             num_tis=10,
@@ -175,7 +202,7 @@ class TestDBCleanup:
         associated dag runs should remain.
 
         """
-        base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone('America/Los_Angeles'))
+        base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone('UTC'))
         num_tis = 10
         create_tis(
             base_date=base_date,
@@ -189,13 +216,14 @@ class TestDBCleanup:
                 clean_before_timestamp=clean_before_date,
                 dry_run=False,
                 session=session,
+                table_names=['dag_run', 'task_instance'],
             )
             model = config_dict[table_name].orm_model
             expected_remaining = num_tis - expected_to_delete
             assert len(session.query(model).all()) == expected_remaining
-            if model == TaskInstance:
+            if model.name == 'task_instance':
                 assert len(session.query(DagRun).all()) == num_tis
-            elif model == DagRun:
+            elif model.name == 'dag_run':
                 assert len(session.query(TaskInstance).all()) == expected_remaining
             else:
                 raise Exception("unexpected")
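
A rough sqlite3 sketch of the archive-then-purge pattern this commit introduces (copy matching rows into an _airflow_deleted__* table, then delete them from the source). Table and column names are illustrative; the real implementation builds the statements with SQLAlchemy and a custom CreateTableAs clause.

    import sqlite3
    from datetime import datetime, timedelta

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE log (id INTEGER PRIMARY KEY, dttm TEXT)")
    conn.executemany(
        "INSERT INTO log (dttm) VALUES (?)",
        [((datetime(2022, 1, 1) + timedelta(days=i)).isoformat(),) for i in range(10)],
    )

    cutoff = datetime(2022, 1, 6).isoformat()
    archive = "_airflow_deleted__log__" + datetime.utcnow().strftime("%Y%m%d%H%M%S")

    conn.execute(f"CREATE TABLE {archive} AS SELECT * FROM log WHERE 0")  # clone schema only
    conn.execute(f"INSERT INTO {archive} SELECT * FROM log WHERE dttm < ?", (cutoff,))
    conn.execute(f"DELETE FROM log WHERE id IN (SELECT id FROM {archive})")
    conn.commit()

    print(conn.execute("SELECT COUNT(*) FROM log").fetchone())         # (5,)
    print(conn.execute(f"SELECT COUNT(*) FROM {archive}").fetchone())  # (5,)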


[airflow] 25/45: Only load distribution of a name once (#25296)

commit 6567b3248fa12cfa9f04c0eae14277fcfb8a60ec
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Wed Aug 3 21:01:43 2022 +0800

    Only load distribution of a name once (#25296)
    
    (cherry picked from commit c30dc5e64d7229cbf8e9fbe84cfa790dfef5fb8c)
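
A minimal sketch of the de-duplication idea, using a hypothetical helper name iter_unique_distributions (the real code lands in airflow/utils/entry_points.py in the diff below): each distribution is keyed by its canonicalized project name and repeated names are skipped, so a project that appears twice on sys.path contributes entry points only once.

    # Sketch only; illustrative helper name, not the actual Airflow API.
    from packaging.utils import canonicalize_name

    try:
        import importlib_metadata as metadata
    except ImportError:
        from importlib import metadata

    def iter_unique_distributions():
        seen = set()
        for dist in metadata.distributions():
            name = canonicalize_name(dist.metadata["Name"])
            if name in seen:
                # The same project can show up more than once on sys.path; keep the first hit.
                continue
            seen.add(name)
            yield dist
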
---
 airflow/plugins_manager.py            |  2 +-
 airflow/utils/entry_points.py         | 22 +++++++++++-----
 tests/plugins/test_plugins_manager.py |  3 ++-
 tests/utils/test_entry_points.py      | 49 +++++++++++++++++++++++++++++++++++
 tests/www/views/test_views.py         |  2 +-
 5 files changed, 69 insertions(+), 9 deletions(-)

diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index 82e295fa19..431d5fe55a 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -113,7 +113,7 @@ class EntryPointSource(AirflowPluginSource):
     """Class used to define Plugins loaded from entrypoint."""
 
     def __init__(self, entrypoint: importlib_metadata.EntryPoint, dist: importlib_metadata.Distribution):
-        self.dist = dist.metadata['name']
+        self.dist = dist.metadata['Name']
         self.version = dist.version
         self.entrypoint = str(entrypoint)
 
diff --git a/airflow/utils/entry_points.py b/airflow/utils/entry_points.py
index 668ed9b994..483f9efe77 100644
--- a/airflow/utils/entry_points.py
+++ b/airflow/utils/entry_points.py
@@ -15,15 +15,20 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from __future__ import annotations
+
+from typing import Iterator
+
+from packaging.utils import canonicalize_name
+
 try:
-    import importlib_metadata
+    import importlib_metadata as metadata
 except ImportError:
-    from importlib import metadata as importlib_metadata  # type: ignore
+    from importlib import metadata  # type: ignore[no-redef]
 
 
-def entry_points_with_dist(group: str):
-    """
-    Return EntryPoint objects of the given group, along with the distribution information.
+def entry_points_with_dist(group: str) -> Iterator[tuple[metadata.EntryPoint, metadata.Distribution]]:
+    """Retrieve entry points of the given group.
 
     This is like the ``entry_points()`` function from importlib.metadata,
     except it also returns the distribution the entry_point was loaded from.
@@ -31,7 +36,12 @@ def entry_points_with_dist(group: str):
     :param group: Filter results to only this entrypoint group
     :return: Generator of (EntryPoint, Distribution) objects for the specified groups
     """
-    for dist in importlib_metadata.distributions():
+    loaded: set[str] = set()
+    for dist in metadata.distributions():
+        key = canonicalize_name(dist.metadata["Name"])
+        if key in loaded:
+            continue
+        loaded.add(key)
         for e in dist.entry_points:
             if e.group != group:
                 continue
diff --git a/tests/plugins/test_plugins_manager.py b/tests/plugins/test_plugins_manager.py
index f97a811c91..c46b6e83f2 100644
--- a/tests/plugins/test_plugins_manager.py
+++ b/tests/plugins/test_plugins_manager.py
@@ -298,6 +298,7 @@ class TestPluginsManager:
         from airflow.plugins_manager import import_errors, load_entrypoint_plugins
 
         mock_dist = mock.Mock()
+        mock_dist.metadata = {"Name": "test-dist"}
 
         mock_entrypoint = mock.Mock()
         mock_entrypoint.name = 'test-entrypoint'
@@ -387,7 +388,7 @@ class TestEntryPointSource:
         mock_entrypoint.module = 'module_name_plugin'
 
         mock_dist = mock.Mock()
-        mock_dist.metadata = {'name': 'test-entrypoint-plugin'}
+        mock_dist.metadata = {'Name': 'test-entrypoint-plugin'}
         mock_dist.version = '1.0.0'
         mock_dist.entry_points = [mock_entrypoint]
 
diff --git a/tests/utils/test_entry_points.py b/tests/utils/test_entry_points.py
new file mode 100644
index 0000000000..65f688647e
--- /dev/null
+++ b/tests/utils/test_entry_points.py
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from typing import Iterable
+from unittest import mock
+
+from airflow.utils.entry_points import entry_points_with_dist, metadata
+
+
+class MockDistribution:
+    def __init__(self, name: str, entry_points: Iterable[metadata.EntryPoint]) -> None:
+        self.metadata = {"Name": name}
+        self.entry_points = entry_points
+
+
+class MockMetadata:
+    def distributions(self):
+        return [
+            MockDistribution(
+                "dist1",
+                [metadata.EntryPoint("a", "b", "group_x"), metadata.EntryPoint("c", "d", "group_y")],
+            ),
+            MockDistribution("Dist2", [metadata.EntryPoint("e", "f", "group_x")]),
+            MockDistribution("dist2", [metadata.EntryPoint("g", "h", "group_x")]),  # Duplicated name.
+        ]
+
+
+@mock.patch("airflow.utils.entry_points.metadata", MockMetadata())
+def test_entry_points_with_dist():
+    entries = list(entry_points_with_dist("group_x"))
+
+    # The second "dist2" is ignored. Only "group_x" entries are loaded.
+    assert [dist.metadata["Name"] for _, dist in entries] == ["dist1", "Dist2"]
+    assert [ep.name for ep, _ in entries] == ["a", "e"]
diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py
index b034ed0b37..9f13b366d2 100644
--- a/tests/www/views/test_views.py
+++ b/tests/www/views/test_views.py
@@ -88,7 +88,7 @@ def test_plugin_should_list_entrypoint_on_page_with_details(admin_client):
     mock_plugin = AirflowPlugin()
     mock_plugin.name = "test_plugin"
     mock_plugin.source = EntryPointSource(
-        mock.Mock(), mock.Mock(version='1.0.0', metadata={'name': 'test-entrypoint-testpluginview'})
+        mock.Mock(), mock.Mock(version='1.0.0', metadata={'Name': 'test-entrypoint-testpluginview'})
     )
     with mock_plugin_manager(plugins=[mock_plugin]):
         resp = admin_client.get('/plugin')


[airflow] 21/45: Added logging to get better diagnosis of flaky backfill test (#25106)

commit 637af6a06d95a617ef2e1eb58e95e92af7f1d2d5
Author: Jarek Potiuk <ja...@polidea.com>
AuthorDate: Sat Jul 16 17:15:48 2022 +0200

    Added logging to get better diagnosis of flaky backfill test (#25106)
    
    (cherry picked from commit 7850dc3c714ef188014a08aae63b4d344242208e)
---
 airflow/models/dagrun.py        | 5 ++++-
 tests/jobs/test_backfill_job.py | 5 ++++-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index ad0dcdfebd..8589f6878e 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -208,11 +208,14 @@ class DagRun(Base, LoggingMixin):
 
     def __repr__(self):
         return (
-            '<DagRun {dag_id} @ {execution_date}: {run_id}, externally triggered: {external_trigger}>'
+            '<DagRun {dag_id} @ {execution_date}: {run_id}, state:{state}, '
+            'queued_at: {queued_at}. externally triggered: {external_trigger}>'
         ).format(
             dag_id=self.dag_id,
             execution_date=self.execution_date,
             run_id=self.run_id,
+            state=self.state,
+            queued_at=self.queued_at,
             external_trigger=self.external_trigger,
         )
 
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index 5222f9cdbe..39c215991f 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -987,13 +987,15 @@ class TestBackfillJob:
                         dag_id=dag_id,
                     )
                     dag_maker.create_dagrun(
-                        state=None,
+                        state=State.RUNNING,
                         # Existing dagrun that is not within the backfill range
                         run_id=run_id,
                         execution_date=DEFAULT_DATE + datetime.timedelta(hours=1),
                     )
                     thread_session.commit()
                     cond.notify()
+                except Exception:
+                    logger.exception("Exception when creating DagRun")
                 finally:
                     cond.release()
                     thread_session.close()
@@ -1016,6 +1018,7 @@ class TestBackfillJob:
                 # reached, so it is waiting
                 dag_run_created_cond.wait(timeout=1.5)
                 dagruns = DagRun.find(dag_id=dag_id)
+                logger.info("The dag runs retrieved: %s", dagruns)
                 assert 1 == len(dagruns)
                 dr = dagruns[0]
                 assert dr.run_id == run_id


[airflow] 44/45: Cache the custom secrets backend so the same instance gets re-used (#25556)

commit ed9bf28d49f22253e3d3801c0a54775ad00ef44a
Author: Peter Debelak <pd...@gmail.com>
AuthorDate: Sat Aug 6 09:21:22 2022 -0500

    Cache the custom secrets backend so the same instance gets re-used (#25556)
    
    * Cache the custom secrets backend so the same instance gets re-used
    
    Fixes #25555
    
    This uses `functools.lru_cache` to re-use the same secrets backend
    instance between the `conf` global (when it loads configuration values
    from secrets) and uses outside the `configuration` module, such as
    variables and connections. Previously, each fetch of a configuration
    value from secrets created its own secrets backend instance.
    
    Also adds a unit test to confirm that only one secrets backend
    instance gets created.
    
    (cherry picked from commit 5863c42962404607013422a40118d8b9f4603f0b)
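
A minimal sketch of the caching approach, using a hypothetical DummyBackend class rather than a real secrets backend: functools.lru_cache keys on the backend class and its keyword arguments, so identical configuration always yields the same instance.

    import functools

    @functools.lru_cache(maxsize=2)
    def _cached_backend(backend_cls, **backend_kwargs):
        # The class and the (hashable) keyword arguments form the cache key,
        # so identical configuration re-uses one backend instance.
        return backend_cls(**backend_kwargs)

    class DummyBackend:  # hypothetical backend for illustration only
        def __init__(self, url):
            self.url = url

    first = _cached_backend(DummyBackend, url="http://127.0.0.1:8200")
    second = _cached_backend(DummyBackend, url="http://127.0.0.1:8200")
    assert first is second  # one shared instance instead of one per lookup
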
---
 airflow/configuration.py         |  9 ++++-
 tests/core/test_configuration.py | 81 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 88 insertions(+), 2 deletions(-)

diff --git a/airflow/configuration.py b/airflow/configuration.py
index 20dd3c13d9..546258bf7f 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -1513,7 +1513,6 @@ def ensure_secrets_loaded() -> List[BaseSecretsBackend]:
 def get_custom_secret_backend() -> Optional[BaseSecretsBackend]:
     """Get Secret Backend if defined in airflow.cfg"""
     secrets_backend_cls = conf.getimport(section='secrets', key='backend')
-
     if secrets_backend_cls:
         try:
             backends: Any = conf.get(section='secrets', key='backend_kwargs', fallback='{}')
@@ -1521,10 +1520,16 @@ def get_custom_secret_backend() -> Optional[BaseSecretsBackend]:
         except JSONDecodeError:
             alternative_secrets_config_dict = {}
 
-        return secrets_backend_cls(**alternative_secrets_config_dict)
+        return _custom_secrets_backend(secrets_backend_cls, **alternative_secrets_config_dict)
     return None
 
 
+@functools.lru_cache(maxsize=2)
+def _custom_secrets_backend(secrets_backend_cls, **alternative_secrets_config_dict):
+    """Separate function to create secrets backend instance to allow caching"""
+    return secrets_backend_cls(**alternative_secrets_config_dict)
+
+
 def initialize_secrets_backends() -> List[BaseSecretsBackend]:
     """
     * import secrets backend classes
diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py
index 991695a969..c84729e7f5 100644
--- a/tests/core/test_configuration.py
+++ b/tests/core/test_configuration.py
@@ -33,6 +33,7 @@ from airflow import configuration
 from airflow.configuration import (
     AirflowConfigException,
     AirflowConfigParser,
+    _custom_secrets_backend,
     conf,
     expand_env_var,
     get_airflow_config,
@@ -271,6 +272,7 @@ sql_alchemy_conn = airflow
     def test_config_raise_exception_from_secret_backend_connection_error(self, mock_hvac):
         """Get Config Value from a Secret Backend"""
 
+        _custom_secrets_backend.cache_clear()
         mock_client = mock.MagicMock()
         # mock_client.side_effect = AirflowConfigException
         mock_hvac.Client.return_value = mock_client
@@ -297,6 +299,7 @@ sql_alchemy_conn = airflow
             ),
         ):
             test_conf.get('test', 'sql_alchemy_conn')
+        _custom_secrets_backend.cache_clear()
 
     def test_getboolean(self):
         """Test AirflowConfigParser.getboolean"""
@@ -1261,3 +1264,81 @@ sql_alchemy_conn=sqlite://test
                 conf.read_dict(dictionary=cfg_dict)
                 os.environ.clear()
                 assert conf.get('database', 'sql_alchemy_conn') == f'sqlite:///{HOME_DIR}/airflow/airflow.db'
+
+    @mock.patch("airflow.providers.hashicorp._internal_client.vault_client.hvac")
+    @conf_vars(
+        {
+            ("secrets", "backend"): "airflow.providers.hashicorp.secrets.vault.VaultBackend",
+            ("secrets", "backend_kwargs"): '{"url": "http://127.0.0.1:8200", "token": "token"}',
+        }
+    )
+    def test_config_from_secret_backend_caches_instance(self, mock_hvac):
+        """Get Config Value from a Secret Backend"""
+        _custom_secrets_backend.cache_clear()
+
+        test_config = '''[test]
+sql_alchemy_conn_secret = sql_alchemy_conn
+secret_key_secret = secret_key
+'''
+        test_config_default = '''[test]
+sql_alchemy_conn = airflow
+secret_key = airflow
+'''
+
+        mock_client = mock.MagicMock()
+        mock_hvac.Client.return_value = mock_client
+
+        def fake_read_secret(path, mount_point, version):
+            if path.endswith('sql_alchemy_conn'):
+                return {
+                    'request_id': '2d48a2ad-6bcb-e5b6-429d-da35fdf31f56',
+                    'lease_id': '',
+                    'renewable': False,
+                    'lease_duration': 0,
+                    'data': {
+                        'data': {'value': 'fake_conn'},
+                        'metadata': {
+                            'created_time': '2020-03-28T02:10:54.301784Z',
+                            'deletion_time': '',
+                            'destroyed': False,
+                            'version': 1,
+                        },
+                    },
+                    'wrap_info': None,
+                    'warnings': None,
+                    'auth': None,
+                }
+            if path.endswith('secret_key'):
+                return {
+                    'request_id': '2d48a2ad-6bcb-e5b6-429d-da35fdf31f56',
+                    'lease_id': '',
+                    'renewable': False,
+                    'lease_duration': 0,
+                    'data': {
+                        'data': {'value': 'fake_key'},
+                        'metadata': {
+                            'created_time': '2020-03-28T02:10:54.301784Z',
+                            'deletion_time': '',
+                            'destroyed': False,
+                            'version': 1,
+                        },
+                    },
+                    'wrap_info': None,
+                    'warnings': None,
+                    'auth': None,
+                }
+
+        mock_client.secrets.kv.v2.read_secret_version.side_effect = fake_read_secret
+
+        test_conf = AirflowConfigParser(default_config=parameterized_config(test_config_default))
+        test_conf.read_string(test_config)
+        test_conf.sensitive_config_values = test_conf.sensitive_config_values | {
+            ('test', 'sql_alchemy_conn'),
+            ('test', 'secret_key'),
+        }
+
+        assert 'fake_conn' == test_conf.get('test', 'sql_alchemy_conn')
+        mock_hvac.Client.assert_called_once()
+        assert 'fake_key' == test_conf.get('test', 'secret_key')
+        mock_hvac.Client.assert_called_once()
+        _custom_secrets_backend.cache_clear()


[airflow] 12/45: Fix syntax in mysql setup documentation (#24893) (#24939)

commit 481877a564992b536b1728a40b726f4e83983191
Author: Nigel Millward <42...@users.noreply.github.com>
AuthorDate: Sat Jul 9 19:05:17 2022 +0100

    Fix syntax in mysql setup documentation (#24893) (#24939)
    
    In the 'set-up-database' documentation the code block was not working on newer versions of MS SQL Server. Updated the docs so that the GRANT query works on the supported 2017 and 2019 versions of MS SQL Server.
    
    (cherry picked from commit 335bd60cb3b0cc16aa7c9a9ace83073ae7d9e82c)
---
 docs/apache-airflow/howto/set-up-database.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/apache-airflow/howto/set-up-database.rst b/docs/apache-airflow/howto/set-up-database.rst
index cdf9f04ace..15a3d10c43 100644
--- a/docs/apache-airflow/howto/set-up-database.rst
+++ b/docs/apache-airflow/howto/set-up-database.rst
@@ -325,7 +325,7 @@ You can read more about transaction isolation and snapshot features at
    CREATE LOGIN airflow_user WITH PASSWORD='airflow_pass123%';
    USE airflow;
    CREATE USER airflow_user FROM LOGIN airflow_user;
-   GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow_user;
+   GRANT ALL PRIVILEGES ON DATABASE::airflow TO airflow_user;
 
 
 We recommend using the ``mssql+pyodbc`` driver and specifying it in your SqlAlchemy connection string.


[airflow] 43/45: Add right padding (#25554)

commit 530a7ce8025ffee21532f8b8cc081ac5924a47a5
Author: pierrejeambrun <pi...@gmail.com>
AuthorDate: Fri Aug 5 18:46:59 2022 +0200

    Add right padding (#25554)
    
    (cherry picked from commit fe9772949eba35c73101c3cd93a7c76b3e633e7e)
---
 airflow/www/static/js/grid/Grid.tsx | 1 +
 1 file changed, 1 insertion(+)

diff --git a/airflow/www/static/js/grid/Grid.tsx b/airflow/www/static/js/grid/Grid.tsx
index 6de3d0ecc2..539b47ee8c 100644
--- a/airflow/www/static/js/grid/Grid.tsx
+++ b/airflow/www/static/js/grid/Grid.tsx
@@ -124,6 +124,7 @@ const Grid = ({ isPanelOpen = false, onPanelToggle, hoveredTaskState }: Props) =
         ref={scrollRef}
         maxHeight="900px"
         position="relative"
+        pr={4}
       >
         <Table pr="10px">
           <Thead>


[airflow] 09/45: Bind log server on worker to IPv6 address (#24755) (#24846)

commit a720e4fb97baa3df0acc3bd2d2c445741db14abf
Author: Philipp Hitzler <ph...@gmail.com>
AuthorDate: Thu Jul 7 22:16:36 2022 +0200

    Bind log server on worker to IPv6 address (#24755) (#24846)
    
    (cherry picked from commit 2f29bfefb59b0014ae9e5f641d3f6f46c4341518)
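
A minimal sketch of the change, with an illustrative port value: Gunicorn's "bind" setting may be supplied more than once, so representing options as (key, value) pairs lets the worker log server listen on both an IPv4 and an IPv6 address.

    import collections

    # A list of (key, value) pairs allows "bind" to appear twice, which a plain dict cannot express.
    GunicornOption = collections.namedtuple("GunicornOption", ["key", "value"])

    worker_log_server_port = 8793  # illustrative value; the real one comes from airflow.cfg
    options = [
        GunicornOption("bind", f"0.0.0.0:{worker_log_server_port}"),  # IPv4
        GunicornOption("bind", f"[::]:{worker_log_server_port}"),     # IPv6
        GunicornOption("workers", 2),
    ]
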
---
 airflow/utils/serve_logs.py         | 22 +++++++++++++++-------
 newsfragments/24755.improvement.rst |  1 +
 2 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/airflow/utils/serve_logs.py b/airflow/utils/serve_logs.py
index e14162178b..447cff6d90 100644
--- a/airflow/utils/serve_logs.py
+++ b/airflow/utils/serve_logs.py
@@ -16,6 +16,7 @@
 # under the License.
 
 """Serve logs process"""
+import collections
 import logging
 import os
 
@@ -108,6 +109,9 @@ def create_app():
     return flask_app
 
 
+GunicornOption = collections.namedtuple("GunicornOption", ["key", "value"])
+
+
 class StandaloneGunicornApplication(gunicorn.app.base.BaseApplication):
     """
     Standalone Gunicorn application/serve for usage with any WSGI-application.
@@ -120,13 +124,13 @@ class StandaloneGunicornApplication(gunicorn.app.base.BaseApplication):
     """
 
     def __init__(self, app, options=None):
-        self.options = options or {}
+        self.options = options or []
         self.application = app
         super().__init__()
 
     def load_config(self):
-        for key, value in self.options.items():
-            self.cfg.set(key.lower(), value)
+        for option in self.options:
+            self.cfg.set(option.key.lower(), option.value)
 
     def load(self):
         return self.application
@@ -138,10 +142,14 @@ def serve_logs():
     wsgi_app = create_app()
 
     worker_log_server_port = conf.getint('logging', 'WORKER_LOG_SERVER_PORT')
-    options = {
-        'bind': f"0.0.0.0:{worker_log_server_port}",
-        'workers': 2,
-    }
+
+    # Make sure to have both an IPv4 and an IPv6 interface.
+    # Gunicorn can bind multiple addresses, see https://docs.gunicorn.org/en/stable/settings.html#bind.
+    options = [
+        GunicornOption("bind", f"0.0.0.0:{worker_log_server_port}"),
+        GunicornOption("bind", f"[::]:{worker_log_server_port}"),
+        GunicornOption("workers", 2),
+    ]
     StandaloneGunicornApplication(wsgi_app, options).run()
 
 
diff --git a/newsfragments/24755.improvement.rst b/newsfragments/24755.improvement.rst
new file mode 100644
index 0000000000..1a75c283f4
--- /dev/null
+++ b/newsfragments/24755.improvement.rst
@@ -0,0 +1 @@
+Log server on worker binds IPv6 interface.


[airflow] 19/45: airflow/www/package.json: Add name, version fields. (#25065)

commit b96e53a420657f0431654a2e233f4252d8000db4
Author: John Soo <js...@users.noreply.github.com>
AuthorDate: Thu Jul 14 10:34:07 2022 -0700

    airflow/www/package.json: Add name, version fields. (#25065)
    
    These fields are required according to:
    
    https://docs.npmjs.com/creating-a-package-json-file#required-name-and-version-fields
    (cherry picked from commit 1fd59d61decdc1d7e493eca80a629d02533a4ba0)
---
 airflow/www/package.json | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/airflow/www/package.json b/airflow/www/package.json
index 15d237d1cd..644eaba084 100644
--- a/airflow/www/package.json
+++ b/airflow/www/package.json
@@ -1,4 +1,6 @@
 {
+  "name": "airflow-www",
+  "version": "1.0.0",
   "description": "Apache Airflow is a platform to programmatically author, schedule and monitor workflows.",
   "scripts": {
     "test": "jest",


[airflow] 45/45: Fix mapped sensor with reschedule mode (#25594)

commit 4a8d13997ff95c7135e726290aa74ba798967a30
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Aug 11 08:30:33 2022 +0100

    Fix mapped sensor with reschedule mode (#25594)
    
    There are two issues with mapped sensors in `reschedule` mode. First, the task reschedule table is populated with the default map_index of -1 even when the actual map_index is not -1. Second, MappedOperator does not have the `ReadyToRescheduleDep` dependency.
    This PR fixes both issues.
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    (cherry picked from commit 5f3733ea310b53a0a90c660dc94dd6e1ad5755b7)
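
A minimal DAG sketch of the case this fixes, with hypothetical dag/task ids and callable: a sensor built via partial()/expand() running in reschedule mode, whose reschedule records and dependency handling were previously mishandled.

    import datetime

    from airflow import DAG
    from airflow.sensors.python import PythonSensor

    def check_ready():
        # Return False to request another poke, True to mark the sensor successful.
        return False

    with DAG(
        dag_id="mapped_reschedule_example",        # hypothetical dag id
        start_date=datetime.datetime(2022, 1, 1),
        schedule_interval=None,
    ):
        PythonSensor.partial(
            task_id="wait_for_thing",              # hypothetical task id
            mode="reschedule",                     # frees the worker slot between pokes
            python_callable=check_ready,
        ).expand(poke_interval=[30, 60])           # one mapped task instance per value
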
---
 airflow/models/taskinstance.py                     |   1 +
 airflow/models/taskreschedule.py                   |   1 +
 airflow/serialization/serialized_objects.py        |   2 +-
 airflow/ti_deps/deps/ready_to_reschedule.py        |  11 +-
 tests/models/test_taskinstance.py                  | 217 +++++++++++++++++++++
 tests/serialization/test_dag_serialization.py      |  28 +++
 tests/ti_deps/deps/test_ready_to_reschedule_dep.py |  81 +++++++-
 7 files changed, 337 insertions(+), 4 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index c369e84e47..4b7f9ebf7b 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1865,6 +1865,7 @@ class TaskInstance(Base, LoggingMixin):
                 actual_start_date,
                 self.end_date,
                 reschedule_exception.reschedule_date,
+                self.map_index,
             )
         )
 
diff --git a/airflow/models/taskreschedule.py b/airflow/models/taskreschedule.py
index 132554d8d1..13978fe186 100644
--- a/airflow/models/taskreschedule.py
+++ b/airflow/models/taskreschedule.py
@@ -112,6 +112,7 @@ class TaskReschedule(Base):
             TR.dag_id == task_instance.dag_id,
             TR.task_id == task_instance.task_id,
             TR.run_id == task_instance.run_id,
+            TR.map_index == task_instance.map_index,
             TR.try_number == try_number,
         )
         if descending:
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index dd3dc4404e..5786f9231c 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -590,7 +590,7 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
 
     @classmethod
     def serialize_mapped_operator(cls, op: MappedOperator) -> Dict[str, Any]:
-        serialized_op = cls._serialize_node(op, include_deps=op.deps is MappedOperator.deps_for(BaseOperator))
+        serialized_op = cls._serialize_node(op, include_deps=op.deps != MappedOperator.deps_for(BaseOperator))
 
         # Handle mapped_kwargs and mapped_op_kwargs.
         serialized_op[op._expansion_kwargs_attr] = cls._serialize(op._get_expansion_kwargs())
diff --git a/airflow/ti_deps/deps/ready_to_reschedule.py b/airflow/ti_deps/deps/ready_to_reschedule.py
index 9086822cea..88219bb401 100644
--- a/airflow/ti_deps/deps/ready_to_reschedule.py
+++ b/airflow/ti_deps/deps/ready_to_reschedule.py
@@ -40,7 +40,11 @@ class ReadyToRescheduleDep(BaseTIDep):
         considered as passed. This dependency fails if the latest reschedule
         request's reschedule date is still in future.
         """
-        if not getattr(ti.task, "reschedule", False):
+        is_mapped = ti.task.is_mapped
+        if not is_mapped and not getattr(ti.task, "reschedule", False):
+            # Mapped sensors don't have the reschedule property (it can only
+            # be calculated after unmapping), so we don't check them here.
+            # They are handled below by checking TaskReschedule instead.
             yield self._passing_status(reason="Task is not in reschedule mode.")
             return
 
@@ -62,6 +66,11 @@ class ReadyToRescheduleDep(BaseTIDep):
             .first()
         )
         if not task_reschedule:
+            # Because mapped sensors don't have the reschedule property, here's the last resort
+            # and we need a slightly different passing reason
+            if is_mapped:
+                yield self._passing_status(reason="The task is mapped and not in reschedule mode")
+                return
             yield self._passing_status(reason="There is no reschedule request for this task instance.")
             return
 
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 1db5542904..74ef87489f 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -811,6 +811,174 @@ class TestTaskInstance:
         done, fail = True, False
         run_ti_and_assert(date4, date3, date4, 60, State.SUCCESS, 3, 0)
 
+    def test_mapped_reschedule_handling(self, dag_maker):
+        """
+        Test that mapped task reschedules are handled properly
+        """
+        # Return values of the python sensor callable, modified during tests
+        done = False
+        fail = False
+
+        def func():
+            if fail:
+                raise AirflowException()
+            return done
+
+        with dag_maker(dag_id='test_reschedule_handling') as dag:
+
+            task = PythonSensor.partial(
+                task_id='test_reschedule_handling_sensor',
+                mode='reschedule',
+                python_callable=func,
+                retries=1,
+                retry_delay=datetime.timedelta(seconds=0),
+            ).expand(poke_interval=[0])
+
+        ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0]
+
+        ti.task = task
+        assert ti._try_number == 0
+        assert ti.try_number == 1
+
+        def run_ti_and_assert(
+            run_date,
+            expected_start_date,
+            expected_end_date,
+            expected_duration,
+            expected_state,
+            expected_try_number,
+            expected_task_reschedule_count,
+        ):
+            ti.refresh_from_task(task)
+            with freeze_time(run_date):
+                try:
+                    ti.run()
+                except AirflowException:
+                    if not fail:
+                        raise
+            ti.refresh_from_db()
+            assert ti.state == expected_state
+            assert ti._try_number == expected_try_number
+            assert ti.try_number == expected_try_number + 1
+            assert ti.start_date == expected_start_date
+            assert ti.end_date == expected_end_date
+            assert ti.duration == expected_duration
+            trs = TaskReschedule.find_for_task_instance(ti)
+            assert len(trs) == expected_task_reschedule_count
+
+        date1 = timezone.utcnow()
+        date2 = date1 + datetime.timedelta(minutes=1)
+        date3 = date2 + datetime.timedelta(minutes=1)
+        date4 = date3 + datetime.timedelta(minutes=1)
+
+        # Run with multiple reschedules.
+        # During reschedule the try number remains the same, but each reschedule is recorded.
+        # The start date is expected to remain the initial date, hence the duration increases.
+        # When finished the try number is incremented and there is no reschedule expected
+        # for this try.
+
+        done, fail = False, False
+        run_ti_and_assert(date1, date1, date1, 0, State.UP_FOR_RESCHEDULE, 0, 1)
+
+        done, fail = False, False
+        run_ti_and_assert(date2, date1, date2, 60, State.UP_FOR_RESCHEDULE, 0, 2)
+
+        done, fail = False, False
+        run_ti_and_assert(date3, date1, date3, 120, State.UP_FOR_RESCHEDULE, 0, 3)
+
+        done, fail = True, False
+        run_ti_and_assert(date4, date1, date4, 180, State.SUCCESS, 1, 0)
+
+        # Clear the task instance.
+        dag.clear()
+        ti.refresh_from_db()
+        assert ti.state == State.NONE
+        assert ti._try_number == 1
+
+        # Run again after clearing with reschedules and a retry.
+        # The retry increments the try number, and for that try no reschedule is expected.
+        # After the retry the start date is reset, hence the duration is also reset.
+
+        done, fail = False, False
+        run_ti_and_assert(date1, date1, date1, 0, State.UP_FOR_RESCHEDULE, 1, 1)
+
+        done, fail = False, True
+        run_ti_and_assert(date2, date1, date2, 60, State.UP_FOR_RETRY, 2, 0)
+
+        done, fail = False, False
+        run_ti_and_assert(date3, date3, date3, 0, State.UP_FOR_RESCHEDULE, 2, 1)
+
+        done, fail = True, False
+        run_ti_and_assert(date4, date3, date4, 60, State.SUCCESS, 3, 0)
+
+    @pytest.mark.usefixtures('test_pool')
+    def test_mapped_task_reschedule_handling_clear_reschedules(self, dag_maker):
+        """
+        Test that mapped task reschedules clearing are handled properly
+        """
+        # Return values of the python sensor callable, modified during tests
+        done = False
+        fail = False
+
+        def func():
+            if fail:
+                raise AirflowException()
+            return done
+
+        with dag_maker(dag_id='test_reschedule_handling') as dag:
+            task = PythonSensor.partial(
+                task_id='test_reschedule_handling_sensor',
+                mode='reschedule',
+                python_callable=func,
+                retries=1,
+                retry_delay=datetime.timedelta(seconds=0),
+                pool='test_pool',
+            ).expand(poke_interval=[0])
+        ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0]
+        ti.task = task
+        assert ti._try_number == 0
+        assert ti.try_number == 1
+
+        def run_ti_and_assert(
+            run_date,
+            expected_start_date,
+            expected_end_date,
+            expected_duration,
+            expected_state,
+            expected_try_number,
+            expected_task_reschedule_count,
+        ):
+            ti.refresh_from_task(task)
+            with freeze_time(run_date):
+                try:
+                    ti.run()
+                except AirflowException:
+                    if not fail:
+                        raise
+            ti.refresh_from_db()
+            assert ti.state == expected_state
+            assert ti._try_number == expected_try_number
+            assert ti.try_number == expected_try_number + 1
+            assert ti.start_date == expected_start_date
+            assert ti.end_date == expected_end_date
+            assert ti.duration == expected_duration
+            trs = TaskReschedule.find_for_task_instance(ti)
+            assert len(trs) == expected_task_reschedule_count
+
+        date1 = timezone.utcnow()
+
+        done, fail = False, False
+        run_ti_and_assert(date1, date1, date1, 0, State.UP_FOR_RESCHEDULE, 0, 1)
+
+        # Clear the task instance.
+        dag.clear()
+        ti.refresh_from_db()
+        assert ti.state == State.NONE
+        assert ti._try_number == 0
+        # Check that reschedules for ti have also been cleared.
+        trs = TaskReschedule.find_for_task_instance(ti)
+        assert not trs
+
     @pytest.mark.usefixtures('test_pool')
     def test_reschedule_handling_clear_reschedules(self, dag_maker):
         """
@@ -2412,6 +2580,55 @@ def test_sensor_timeout(mode, retries, dag_maker):
     assert ti.state == State.FAILED
 
 
+@pytest.mark.parametrize("mode", ["poke", "reschedule"])
+@pytest.mark.parametrize("retries", [0, 1])
+def test_mapped_sensor_timeout(mode, retries, dag_maker):
+    """
+    Test that AirflowSensorTimeout does not cause mapped sensor to retry.
+    """
+
+    def timeout():
+        raise AirflowSensorTimeout
+
+    mock_on_failure = mock.MagicMock()
+    with dag_maker(dag_id=f'test_sensor_timeout_{mode}_{retries}'):
+        PythonSensor.partial(
+            task_id='test_raise_sensor_timeout',
+            python_callable=timeout,
+            on_failure_callback=mock_on_failure,
+            retries=retries,
+        ).expand(mode=[mode])
+    ti = dag_maker.create_dagrun(execution_date=timezone.utcnow()).task_instances[0]
+
+    with pytest.raises(AirflowSensorTimeout):
+        ti.run()
+
+    assert mock_on_failure.called
+    assert ti.state == State.FAILED
+
+
+@pytest.mark.parametrize("mode", ["poke", "reschedule"])
+@pytest.mark.parametrize("retries", [0, 1])
+def test_mapped_sensor_works(mode, retries, dag_maker):
+    """
+    Test that mapped sensors reaches success state.
+    """
+
+    def timeout(ti):
+        return 1
+
+    with dag_maker(dag_id=f'test_sensor_timeout_{mode}_{retries}'):
+        PythonSensor.partial(
+            task_id='test_raise_sensor_timeout',
+            python_callable=timeout,
+            retries=retries,
+        ).expand(mode=[mode])
+    ti = dag_maker.create_dagrun().task_instances[0]
+
+    ti.run()
+    assert ti.state == State.SUCCESS
+
+
 class TestTaskInstanceRecordTaskMapXComPush:
     """Test TI.xcom_push() correctly records return values for task-mapping."""
 
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 7d6a43e933..6f5b9b49eb 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -43,6 +43,7 @@ from airflow.models.param import Param, ParamsDict
 from airflow.models.xcom import XCOM_RETURN_KEY, XCom
 from airflow.operators.bash import BashOperator
 from airflow.security import permissions
+from airflow.sensors.bash import BashSensor
 from airflow.serialization.json_schema import load_dag_schema_dict
 from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG
 from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
@@ -1228,6 +1229,7 @@ class TestStringifiedDAGs:
             task1 >> task2
 
         serialize_op = SerializedBaseOperator.serialize_operator(dag.task_dict["task1"])
+
         deps = serialize_op["deps"]
         assert deps == [
             'airflow.ti_deps.deps.not_in_retry_period_dep.NotInRetryPeriodDep',
@@ -1465,6 +1467,21 @@ class TestStringifiedDAGs:
         assert serialized_op.reschedule == (mode == "reschedule")
         assert op.deps == serialized_op.deps
 
+    @pytest.mark.parametrize("mode", ["poke", "reschedule"])
+    def test_serialize_mapped_sensor_has_reschedule_dep(self, mode):
+        from airflow.sensors.base import BaseSensorOperator
+
+        class DummySensor(BaseSensorOperator):
+            def poke(self, context: Context):
+                return False
+
+        op = DummySensor.partial(task_id='dummy', mode=mode).expand(poke_interval=[23])
+
+        blob = SerializedBaseOperator.serialize_mapped_operator(op)
+        assert "deps" in blob
+
+        assert 'airflow.ti_deps.deps.ready_to_reschedule.ReadyToRescheduleDep' in blob['deps']
+
     @pytest.mark.parametrize(
         "passed_success_callback, expected_value",
         [
@@ -1778,6 +1795,17 @@ def test_mapped_operator_deserialized_unmap():
     assert deserialize(serialize(mapped)).unmap() == deserialize(serialize(normal))
 
 
+def test_sensor_expand_deserialized_unmap():
+    """Unmap a deserialized mapped sensor should be similar to deserializing a non-mapped sensor"""
+    normal = BashSensor(task_id='a', bash_command=[1, 2], mode='reschedule')
+    mapped = BashSensor.partial(task_id='a', mode='reschedule').expand(bash_command=[1, 2])
+
+    serialize = SerializedBaseOperator._serialize
+
+    deserialize = SerializedBaseOperator.deserialize_operator
+    assert deserialize(serialize(mapped)).unmap(None) == deserialize(serialize(normal))
+
+
 def test_task_resources_serde():
     """
     Test task resources serialization/deserialization.
diff --git a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
index 99416bbbc8..2ab8c539dc 100644
--- a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
+++ b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py
@@ -31,12 +31,30 @@ from airflow.utils.timezone import utcnow
 class TestNotInReschedulePeriodDep(unittest.TestCase):
     def _get_task_instance(self, state):
         dag = DAG('test_dag')
-        task = Mock(dag=dag, reschedule=True)
+        task = Mock(dag=dag, reschedule=True, is_mapped=False)
         ti = TaskInstance(task=task, state=state, run_id=None)
         return ti
 
     def _get_task_reschedule(self, reschedule_date):
-        task = Mock(dag_id='test_dag', task_id='test_task')
+        task = Mock(dag_id='test_dag', task_id='test_task', is_mapped=False)
+        reschedule = TaskReschedule(
+            task=task,
+            run_id=None,
+            try_number=None,
+            start_date=reschedule_date,
+            end_date=reschedule_date,
+            reschedule_date=reschedule_date,
+        )
+        return reschedule
+
+    def _get_mapped_task_instance(self, state):
+        dag = DAG('test_dag')
+        task = Mock(dag=dag, reschedule=True, is_mapped=True)
+        ti = TaskInstance(task=task, state=state, run_id=None)
+        return ti
+
+    def _get_mapped_task_reschedule(self, reschedule_date):
+        task = Mock(dag_id='test_dag', task_id='test_task', is_mapped=True)
         reschedule = TaskReschedule(
             task=task,
             run_id=None,
@@ -103,3 +121,62 @@ class TestNotInReschedulePeriodDep(unittest.TestCase):
         ][-1]
         ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
         assert not ReadyToRescheduleDep().is_met(ti=ti)
+
+    def test_mapped_task_should_pass_if_ignore_in_reschedule_period_is_set(self):
+        ti = self._get_mapped_task_instance(State.UP_FOR_RESCHEDULE)
+        dep_context = DepContext(ignore_in_reschedule_period=True)
+        assert ReadyToRescheduleDep().is_met(ti=ti, dep_context=dep_context)
+
+    @patch('airflow.models.taskreschedule.TaskReschedule.query_for_task_instance')
+    def test_mapped_task_should_pass_if_not_reschedule_mode(self, mock_query_for_task_instance):
+        mock_query_for_task_instance.return_value.with_entities.return_value.first.return_value = []
+        ti = self._get_mapped_task_instance(State.UP_FOR_RESCHEDULE)
+        del ti.task.reschedule
+        assert ReadyToRescheduleDep().is_met(ti=ti)
+
+    def test_mapped_task_should_pass_if_not_in_none_state(self):
+        ti = self._get_mapped_task_instance(State.UP_FOR_RETRY)
+        assert ReadyToRescheduleDep().is_met(ti=ti)
+
+    @patch('airflow.models.taskreschedule.TaskReschedule.query_for_task_instance')
+    def test_mapped_should_pass_if_no_reschedule_record_exists(self, mock_query_for_task_instance):
+        mock_query_for_task_instance.return_value.with_entities.return_value.first.return_value = []
+        ti = self._get_mapped_task_instance(State.NONE)
+        assert ReadyToRescheduleDep().is_met(ti=ti)
+
+    @patch('airflow.models.taskreschedule.TaskReschedule.query_for_task_instance')
+    def test_mapped_should_pass_after_reschedule_date_one(self, mock_query_for_task_instance):
+        mock_query_for_task_instance.return_value.with_entities.return_value.first.return_value = (
+            self._get_mapped_task_reschedule(utcnow() - timedelta(minutes=1))
+        )
+        ti = self._get_mapped_task_instance(State.UP_FOR_RESCHEDULE)
+        assert ReadyToRescheduleDep().is_met(ti=ti)
+
+    @patch('airflow.models.taskreschedule.TaskReschedule.query_for_task_instance')
+    def test_mapped_task_should_pass_after_reschedule_date_multiple(self, mock_query_for_task_instance):
+        mock_query_for_task_instance.return_value.with_entities.return_value.first.return_value = [
+            self._get_mapped_task_reschedule(utcnow() - timedelta(minutes=21)),
+            self._get_mapped_task_reschedule(utcnow() - timedelta(minutes=11)),
+            self._get_mapped_task_reschedule(utcnow() - timedelta(minutes=1)),
+        ][-1]
+        ti = self._get_mapped_task_instance(State.UP_FOR_RESCHEDULE)
+        assert ReadyToRescheduleDep().is_met(ti=ti)
+
+    @patch('airflow.models.taskreschedule.TaskReschedule.query_for_task_instance')
+    def test_mapped_task_should_fail_before_reschedule_date_one(self, mock_query_for_task_instance):
+        mock_query_for_task_instance.return_value.with_entities.return_value.first.return_value = (
+            self._get_mapped_task_reschedule(utcnow() + timedelta(minutes=1))
+        )
+
+        ti = self._get_mapped_task_instance(State.UP_FOR_RESCHEDULE)
+        assert not ReadyToRescheduleDep().is_met(ti=ti)
+
+    @patch('airflow.models.taskreschedule.TaskReschedule.query_for_task_instance')
+    def test_mapped_task_should_fail_before_reschedule_date_multiple(self, mock_query_for_task_instance):
+        mock_query_for_task_instance.return_value.with_entities.return_value.first.return_value = [
+            self._get_mapped_task_reschedule(utcnow() - timedelta(minutes=19)),
+            self._get_mapped_task_reschedule(utcnow() - timedelta(minutes=9)),
+            self._get_mapped_task_reschedule(utcnow() + timedelta(minutes=1)),
+        ][-1]
+        ti = self._get_mapped_task_instance(State.UP_FOR_RESCHEDULE)
+        assert not ReadyToRescheduleDep().is_met(ti=ti)


[airflow] 30/45: Allow wildcarded CORS origins (#25553)

commit 6821fe12f880696d5219057278b6d2a6c425bd86
Author: Mark Norman Francis <no...@201created.com>
AuthorDate: Fri Aug 5 18:41:05 2022 +0100

    Allow wildcarded CORS origins (#25553)
    
    '*' is a valid 'Access-Control-Allow-Origin' response, but was being
    dropped as it failed to match the Origin header sent in requests.
    
    (cherry picked from commit e81b27e713e9ef6f7104c7038f0c37cc55d96593)
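
A minimal Flask sketch of the header logic, with a hypothetical ALLOW_ORIGINS constant standing in for the [api] access_control_allow_origins lookup: a literal "*" is returned as-is, while anything else is treated as a space-separated allow-list matched against the request Origin.

    from flask import Flask, request

    app = Flask(__name__)
    ALLOW_ORIGINS = "*"  # hypothetical stand-in for the configured value

    @app.after_request
    def set_cors_headers(response):
        if ALLOW_ORIGINS == "*":
            # A wildcard is valid on its own and must not be matched against Origin.
            response.headers["Access-Control-Allow-Origin"] = "*"
        elif ALLOW_ORIGINS:
            allowed = ALLOW_ORIGINS.split(" ")
            origin = request.environ.get("HTTP_ORIGIN", allowed[0])
            if origin in allowed:
                response.headers["Access-Control-Allow-Origin"] = origin
        return response
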
---
 airflow/www/extensions/init_views.py |   8 +-
 tests/api_connexion/test_cors.py     | 140 +++++++++++++++++++++++++++++++++++
 2 files changed, 145 insertions(+), 3 deletions(-)

diff --git a/airflow/www/extensions/init_views.py b/airflow/www/extensions/init_views.py
index 83dbc50eaa..4a2d4a5119 100644
--- a/airflow/www/extensions/init_views.py
+++ b/airflow/www/extensions/init_views.py
@@ -159,11 +159,13 @@ def set_cors_headers_on_response(response):
     allow_headers = conf.get('api', 'access_control_allow_headers')
     allow_methods = conf.get('api', 'access_control_allow_methods')
     allow_origins = conf.get('api', 'access_control_allow_origins')
-    if allow_headers is not None:
+    if allow_headers:
         response.headers['Access-Control-Allow-Headers'] = allow_headers
-    if allow_methods is not None:
+    if allow_methods:
         response.headers['Access-Control-Allow-Methods'] = allow_methods
-    if allow_origins is not None:
+    if allow_origins == '*':
+        response.headers['Access-Control-Allow-Origin'] = '*'
+    elif allow_origins:
         allowed_origins = allow_origins.split(' ')
         origin = request.environ.get('HTTP_ORIGIN', allowed_origins[0])
         if origin in allowed_origins:
diff --git a/tests/api_connexion/test_cors.py b/tests/api_connexion/test_cors.py
new file mode 100644
index 0000000000..30ae19236d
--- /dev/null
+++ b/tests/api_connexion/test_cors.py
@@ -0,0 +1,140 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from base64 import b64encode
+
+import pytest
+
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_pools
+
+
+class BaseTestAuth:
+    @pytest.fixture(autouse=True)
+    def set_attrs(self, minimal_app_for_api):
+        self.app = minimal_app_for_api
+
+        sm = self.app.appbuilder.sm
+        tester = sm.find_user(username="test")
+        if not tester:
+            role_admin = sm.find_role("Admin")
+            sm.add_user(
+                username="test",
+                first_name="test",
+                last_name="test",
+                email="test@fab.org",
+                role=role_admin,
+                password="test",
+            )
+
+
+class TestEmptyCors(BaseTestAuth):
+    @pytest.fixture(autouse=True, scope="class")
+    def with_basic_auth_backend(self, minimal_app_for_api):
+        from airflow.www.extensions.init_security import init_api_experimental_auth
+
+        old_auth = getattr(minimal_app_for_api, 'api_auth')
+
+        try:
+            with conf_vars({("api", "auth_backends"): "airflow.api.auth.backend.basic_auth"}):
+                init_api_experimental_auth(minimal_app_for_api)
+                yield
+        finally:
+            setattr(minimal_app_for_api, 'api_auth', old_auth)
+
+    def test_empty_cors_headers(self):
+        token = "Basic " + b64encode(b"test:test").decode()
+        clear_db_pools()
+
+        with self.app.test_client() as test_client:
+            response = test_client.get("/api/v1/pools", headers={"Authorization": token})
+            assert response.status_code == 200
+            assert 'Access-Control-Allow-Headers' not in response.headers
+            assert 'Access-Control-Allow-Methods' not in response.headers
+            assert 'Access-Control-Allow-Origin' not in response.headers
+
+
+class TestCorsOrigin(BaseTestAuth):
+    @pytest.fixture(autouse=True, scope="class")
+    def with_basic_auth_backend(self, minimal_app_for_api):
+        from airflow.www.extensions.init_security import init_api_experimental_auth
+
+        old_auth = getattr(minimal_app_for_api, 'api_auth')
+
+        try:
+            with conf_vars(
+                {
+                    ("api", "auth_backends"): "airflow.api.auth.backend.basic_auth",
+                    ("api", "access_control_allow_origins"): "http://apache.org http://example.com",
+                }
+            ):
+                init_api_experimental_auth(minimal_app_for_api)
+                yield
+        finally:
+            setattr(minimal_app_for_api, 'api_auth', old_auth)
+
+    def test_cors_origin_reflection(self):
+        token = "Basic " + b64encode(b"test:test").decode()
+        clear_db_pools()
+
+        with self.app.test_client() as test_client:
+            response = test_client.get("/api/v1/pools", headers={"Authorization": token})
+            assert response.status_code == 200
+            assert response.headers['Access-Control-Allow-Origin'] == 'http://apache.org'
+
+            response = test_client.get(
+                "/api/v1/pools", headers={"Authorization": token, "Origin": "http://apache.org"}
+            )
+            assert response.status_code == 200
+            assert response.headers['Access-Control-Allow-Origin'] == 'http://apache.org'
+
+            response = test_client.get(
+                "/api/v1/pools", headers={"Authorization": token, "Origin": "http://example.com"}
+            )
+            assert response.status_code == 200
+            assert response.headers['Access-Control-Allow-Origin'] == 'http://example.com'
+
+
+class TestCorsWildcard(BaseTestAuth):
+    @pytest.fixture(autouse=True, scope="class")
+    def with_basic_auth_backend(self, minimal_app_for_api):
+        from airflow.www.extensions.init_security import init_api_experimental_auth
+
+        old_auth = getattr(minimal_app_for_api, 'api_auth')
+
+        try:
+            with conf_vars(
+                {
+                    ("api", "auth_backends"): "airflow.api.auth.backend.basic_auth",
+                    ("api", "access_control_allow_origins"): "*",
+                }
+            ):
+                init_api_experimental_auth(minimal_app_for_api)
+                yield
+        finally:
+            setattr(minimal_app_for_api, 'api_auth', old_auth)
+
+    def test_cors_origin_reflection(self):
+        token = "Basic " + b64encode(b"test:test").decode()
+        clear_db_pools()
+
+        with self.app.test_client() as test_client:
+            response = test_client.get(
+                "/api/v1/pools", headers={"Authorization": token, "Origin": "http://example.com"}
+            )
+            assert response.status_code == 200
+            assert response.headers['Access-Control-Allow-Origin'] == '*'


[airflow] 28/45: Fix Serialization error in TaskCallbackRequest (#25471)

commit 385f04ba345e872dc31de62113e2f46e01fd1d4a
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Tue Aug 2 22:50:40 2022 +0100

    Fix Serialization error in TaskCallbackRequest (#25471)
    
    The way we serialize `SimpleTaskInstance` in the `TaskCallbackRequest` class leads to a JSON serialization error when the task instance has a start_date or end_date. Since task instances always have a start_date, this would always fail.
    This PR fixes it through a new method on SimpleTaskInstance that looks for start_date/end_date and converts them to ISO format for serialization.
    
    (cherry picked from commit d7e14ba0d612d8315238f9d0cba4ef8c44b6867c)
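
A minimal sketch of the fix, using a hypothetical stand-in class instead of Airflow's SimpleTaskInstance: datetime attributes are converted to ISO-8601 strings before json.dumps so the payload is JSON-serializable.

    import json
    from datetime import datetime, timezone

    class FakeSimpleTaskInstance:  # stand-in for illustration only
        def __init__(self, task_id, start_date, end_date=None):
            self.task_id = task_id
            self.start_date = start_date
            self.end_date = end_date

        def as_dict(self):
            new_dict = dict(self.__dict__)
            for key in ("start_date", "end_date"):
                val = new_dict.get(key)
                if val and not isinstance(val, str):
                    new_dict[key] = val.isoformat()  # datetime -> JSON-friendly string
            return new_dict

    ti = FakeSimpleTaskInstance("my_task", start_date=datetime.now(timezone.utc))
    print(json.dumps({"simple_task_instance": ti.as_dict()}))  # no TypeError anymore
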
---
 airflow/callbacks/callback_requests.py    |  2 +-
 airflow/models/taskinstance.py            | 10 ++++++++++
 tests/callbacks/test_callback_requests.py | 21 +++++++++++++++++----
 3 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/airflow/callbacks/callback_requests.py b/airflow/callbacks/callback_requests.py
index 8112589cd0..b04a201c08 100644
--- a/airflow/callbacks/callback_requests.py
+++ b/airflow/callbacks/callback_requests.py
@@ -75,7 +75,7 @@ class TaskCallbackRequest(CallbackRequest):
 
     def to_json(self) -> str:
         dict_obj = self.__dict__.copy()
-        dict_obj["simple_task_instance"] = dict_obj["simple_task_instance"].__dict__
+        dict_obj["simple_task_instance"] = self.simple_task_instance.as_dict()
         return json.dumps(dict_obj)
 
     @classmethod
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index debd0aa6b0..33fe7a3f53 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -2650,6 +2650,16 @@ class SimpleTaskInstance:
             return self.__dict__ == other.__dict__
         return NotImplemented
 
+    def as_dict(self):
+        new_dict = dict(self.__dict__)
+        for key in new_dict:
+            if key in ['start_date', 'end_date']:
+                val = new_dict[key]
+                if not val or isinstance(val, str):
+                    continue
+                new_dict.update({key: val.isoformat()})
+        return new_dict
+
     @classmethod
     def from_ti(cls, ti: TaskInstance):
         return cls(
diff --git a/tests/callbacks/test_callback_requests.py b/tests/callbacks/test_callback_requests.py
index 286d64eaa1..3764f19c4c 100644
--- a/tests/callbacks/test_callback_requests.py
+++ b/tests/callbacks/test_callback_requests.py
@@ -15,7 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import unittest
 from datetime import datetime
 
 from parameterized import parameterized
@@ -29,6 +28,7 @@ from airflow.callbacks.callback_requests import (
 from airflow.models.dag import DAG
 from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
 from airflow.operators.bash import BashOperator
+from airflow.utils import timezone
 from airflow.utils.state import State
 
 TI = TaskInstance(
@@ -38,7 +38,7 @@ TI = TaskInstance(
 )
 
 
-class TestCallbackRequest(unittest.TestCase):
+class TestCallbackRequest:
     @parameterized.expand(
         [
             (CallbackRequest(full_filepath="filepath", msg="task_failure"), CallbackRequest),
@@ -64,7 +64,20 @@ class TestCallbackRequest(unittest.TestCase):
     )
     def test_from_json(self, input, request_class):
         json_str = input.to_json()
-
         result = request_class.from_json(json_str=json_str)
+        assert result == input
 
-        self.assertEqual(result, input)
+    def test_taskcallback_to_json_with_start_date_and_end_date(self, session, create_task_instance):
+        ti = create_task_instance()
+        ti.start_date = timezone.utcnow()
+        ti.end_date = timezone.utcnow()
+        session.merge(ti)
+        session.flush()
+        input = TaskCallbackRequest(
+            full_filepath="filepath",
+            simple_task_instance=SimpleTaskInstance.from_ti(ti),
+            is_failure_callback=True,
+        )
+        json_str = input.to_json()
+        result = TaskCallbackRequest.from_json(json_str)
+        assert input == result


[airflow] 16/45: Update 2.3.3 date in release notes (#25004)

commit 48de9c7791c8470ec67833fafb382bdc3ddb0e98
Author: Josh Fell <48...@users.noreply.github.com>
AuthorDate: Tue Jul 12 16:47:29 2022 -0400

    Update 2.3.3 date in release notes (#25004)
    
    (cherry picked from commit fb7c30f7a3e503667d6c224bd9ae7eec6f7a82c2)
---
 RELEASE_NOTES.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/RELEASE_NOTES.rst b/RELEASE_NOTES.rst
index 9b445081cf..546a1e1c79 100644
--- a/RELEASE_NOTES.rst
+++ b/RELEASE_NOTES.rst
@@ -22,7 +22,7 @@
 .. towncrier release notes start
 
 
-Airflow 2.3.3 (2022-07-05)
+Airflow 2.3.3 (2022-07-09)
 --------------------------
 
 Significant Changes


[airflow] 36/45: Remove useless logging line (#25347)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit aff5994e2bd30d5288adb7489ab734c2f6583757
Author: Jorrick Sleijster <jo...@gmail.com>
AuthorDate: Thu Jul 28 01:28:22 2022 +0200

    Remove useless logging line (#25347)
    
    (cherry picked from commit 5efe6f34fd4a00f388d2979a708c90884f2e09ac)
---
 airflow/models/dagrun.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 0e7d4e1374..1b746e8a06 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -647,9 +647,10 @@ class DagRun(Base, LoggingMixin):
                 try:
                     ti.task = dag.get_task(ti.task_id)
                 except TaskNotFound:
-                    self.log.error("Failed to get task for ti %s. Marking it as removed.", ti)
-                    ti.state = State.REMOVED
-                    session.flush()
+                    if ti.state != State.REMOVED:
+                        self.log.error("Failed to get task for ti %s. Marking it as removed.", ti)
+                        ti.state = State.REMOVED
+                        session.flush()
                 else:
                     yield ti
 


[airflow] 23/45: call updateNodeLabels after expandGroup (#25217)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2f6d9ff25c87c30114091c3d0c80d57155c3cc58
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Thu Jul 21 08:33:12 2022 -0400

    call updateNodeLabels after expandGroup (#25217)
    
    (cherry picked from commit 90a157a64ade1e4efe368889ac0f88a6fe551156)
---
 airflow/www/static/js/graph.js | 1 +
 1 file changed, 1 insertion(+)

diff --git a/airflow/www/static/js/graph.js b/airflow/www/static/js/graph.js
index d45557a3f6..b58e2c2002 100644
--- a/airflow/www/static/js/graph.js
+++ b/airflow/www/static/js/graph.js
@@ -176,6 +176,7 @@ function draw() {
       // A group node
       if (d3.event.defaultPrevented) return;
       expandGroup(nodeId, node);
+      updateNodeLabels(nodes, taskInstances);
       draw();
       focusGroup(nodeId);
     } else if (nodeId in taskInstances) {


[airflow] 39/45: set default task group in dag.add_task method (#25000)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit dffcb5d5cd4b47d2d203b15bf8038f2e13245b5b
Author: Hossein Torabi <Ho...@shell.com>
AuthorDate: Fri Aug 5 17:17:38 2022 +0200

    set default task group in dag.add_task method (#25000)
    
    Signed-off-by: Hossein Torabi <bl...@gmail.com>
    (cherry picked from commit ce0a6e51c2d4ee87e008e28897b2450778b51003)
---
 airflow/models/dag.py       | 19 ++++++++++++++-----
 airflow/models/taskmixin.py |  6 +-----
 tests/models/test_dag.py    | 14 ++++++++++++++
 3 files changed, 29 insertions(+), 10 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 4370f36c3a..2a00abdded 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2167,6 +2167,8 @@ class DAG(LoggingMixin):
 
         :param task: the task you want to add
         """
+        from airflow.utils.task_group import TaskGroupContext
+
         if not self.start_date and not task.start_date:
             raise AirflowException("DAG is missing the start_date parameter")
         # if the task has no start date, assign it the same as the DAG
@@ -2185,15 +2187,22 @@ class DAG(LoggingMixin):
         elif task.end_date and self.end_date:
             task.end_date = min(task.end_date, self.end_date)
 
+        task_id = task.task_id
+        if not task.task_group:
+            task_group = TaskGroupContext.get_current_task_group(self)
+            if task_group:
+                task_id = task_group.child_id(task_id)
+                task_group.add(task)
+
         if (
-            task.task_id in self.task_dict and self.task_dict[task.task_id] is not task
-        ) or task.task_id in self._task_group.used_group_ids:
-            raise DuplicateTaskIdFound(f"Task id '{task.task_id}' has already been added to the DAG")
+            task_id in self.task_dict and self.task_dict[task_id] is not task
+        ) or task_id in self._task_group.used_group_ids:
+            raise DuplicateTaskIdFound(f"Task id '{task_id}' has already been added to the DAG")
         else:
-            self.task_dict[task.task_id] = task
+            self.task_dict[task_id] = task
             task.dag = self
             # Add task_id to used_group_ids to prevent group_id and task_id collisions.
-            self._task_group.used_group_ids.add(task.task_id)
+            self._task_group.used_group_ids.add(task_id)
 
         self.task_count = len(self.task_dict)
 
diff --git a/airflow/models/taskmixin.py b/airflow/models/taskmixin.py
index 06494946a8..7a70e328d2 100644
--- a/airflow/models/taskmixin.py
+++ b/airflow/models/taskmixin.py
@@ -195,10 +195,8 @@ class DAGNode(DependencyMixin, metaclass=ABCMeta):
             )
 
         if not self.has_dag():
-            # If this task does not yet have a dag, add it to the same dag as the other task and
-            # put it in the dag's root TaskGroup.
+            # If this task does not yet have a dag, add it to the same dag as the other task.
             self.dag = dag
-            self.dag.task_group.add(self)
 
         def add_only_new(obj, item_set: Set[str], item: str) -> None:
             """Adds only new items to item set"""
@@ -210,9 +208,7 @@ class DAGNode(DependencyMixin, metaclass=ABCMeta):
         for task in task_list:
             if dag and not task.has_dag():
                 # If the other task does not yet have a dag, add it to the same dag as this task and
-                # put it in the dag's root TaskGroup.
                 dag.add_task(task)
-                dag.task_group.add(task)
             if upstream:
                 add_only_new(task, task.downstream_task_ids, self.node_id)
                 add_only_new(self, self.upstream_task_ids, task.node_id)
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 0164ce0f87..bf6f760006 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -59,6 +59,7 @@ from airflow.utils import timezone
 from airflow.utils.file import list_py_file_paths
 from airflow.utils.session import create_session, provide_session
 from airflow.utils.state import DagRunState, State, TaskInstanceState
+from airflow.utils.task_group import TaskGroup, TaskGroupContext
 from airflow.utils.timezone import datetime as datetime_tz
 from airflow.utils.types import DagRunType
 from airflow.utils.weight_rule import WeightRule
@@ -1364,6 +1365,19 @@ class TestDag(unittest.TestCase):
         )
         assert dr.creating_job_id == job_id
 
+    def test_dag_add_task_sets_default_task_group(self):
+        dag = DAG(dag_id="test_dag_add_task_sets_default_task_group", start_date=DEFAULT_DATE)
+        task_without_task_group = EmptyOperator(task_id="task_without_group_id")
+        default_task_group = TaskGroupContext.get_current_task_group(dag)
+        dag.add_task(task_without_task_group)
+        assert default_task_group.get_child_by_label("task_without_group_id") == task_without_task_group
+
+        task_group = TaskGroup(group_id="task_group", dag=dag)
+        task_with_task_group = EmptyOperator(task_id="task_with_task_group", task_group=task_group)
+        dag.add_task(task_with_task_group)
+        assert task_group.get_child_by_label("task_with_task_group") == task_with_task_group
+        assert dag.get_task("task_group.task_with_task_group") == task_with_task_group
+
     @parameterized.expand(
         [
             (State.QUEUED,),


[airflow] 32/45: Configurable umask to all daemonized processes. (#25664)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b6a2cd1aa34f69a36ea127e4f7f5ba87f4aca420
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Thu Aug 11 17:03:55 2022 +0100

    Configurable umask to all daemonized processes. (#25664)
    
    We previously had this for just the `celery worker` subcommand; this PR
    extends it to anything that can run in daemon mode.
    
    (cherry picked from commit bf14d14e272d24ee7a3f798c242593078359383e)
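
Illustrative sketch, not part of the patch: the diffs below pass int(settings.DAEMON_UMASK, 8) to python-daemon's DaemonContext; the same parsing of the octal string can be shown with only the standard library.

import os

DAEMON_UMASK = "0o077"  # octal string, as the new [core] daemon_umask option stores it

os.umask(int(DAEMON_UMASK, 8))  # int("0o077", 8) == 0o077, i.e. mask off group/other bits

# Any file created from here on gets permission bits 0666 & ~0o077 == 0600.
with open("umask_demo.txt", "w"):
    pass
print(oct(os.stat("umask_demo.txt").st_mode & 0o777))
os.remove("umask_demo.txt")
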
---
 airflow/cli/cli_parser.py                     |  1 -
 airflow/cli/commands/celery_command.py        |  3 +++
 airflow/cli/commands/dag_processor_command.py |  2 ++
 airflow/cli/commands/kerberos_command.py      |  1 +
 airflow/cli/commands/scheduler_command.py     |  1 +
 airflow/cli/commands/triggerer_command.py     |  1 +
 airflow/cli/commands/webserver_command.py     |  1 +
 airflow/config_templates/config.yml           | 22 +++++++++++++---------
 airflow/config_templates/default_airflow.cfg  | 13 ++++++++-----
 airflow/configuration.py                      | 10 ++++++++++
 airflow/settings.py                           |  2 ++
 tests/cli/commands/test_celery_command.py     |  1 +
 tests/cli/commands/test_kerberos_command.py   |  1 +
 13 files changed, 44 insertions(+), 15 deletions(-)

diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index d494aa0f06..ae71d2007a 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -667,7 +667,6 @@ ARG_CELERY_HOSTNAME = Arg(
 ARG_UMASK = Arg(
     ("-u", "--umask"),
     help="Set the umask of celery worker in daemon mode",
-    default=conf.get('celery', 'worker_umask'),
 )
 ARG_WITHOUT_MINGLE = Arg(
     ("--without-mingle",),
diff --git a/airflow/cli/commands/celery_command.py b/airflow/cli/commands/celery_command.py
index affe032411..edb61c1b9d 100644
--- a/airflow/cli/commands/celery_command.py
+++ b/airflow/cli/commands/celery_command.py
@@ -72,6 +72,7 @@ def flower(args):
                 pidfile=TimeoutPIDLockFile(pidfile, -1),
                 stdout=stdout,
                 stderr=stderr,
+                umask=int(settings.DAEMON_UMASK, 8),
             )
             with ctx:
                 celery_app.start(options)
@@ -180,6 +181,8 @@ def worker(args):
         with open(stdout, 'w+') as stdout_handle, open(stderr, 'w+') as stderr_handle:
             if args.umask:
                 umask = args.umask
+            else:
+                umask = conf.get('celery', 'worker_umask', fallback=settings.DAEMON_UMASK)
 
             ctx = daemon.DaemonContext(
                 files_preserve=[handle],
diff --git a/airflow/cli/commands/dag_processor_command.py b/airflow/cli/commands/dag_processor_command.py
index d57e26510c..e6a27072cd 100644
--- a/airflow/cli/commands/dag_processor_command.py
+++ b/airflow/cli/commands/dag_processor_command.py
@@ -22,6 +22,7 @@ from datetime import timedelta
 import daemon
 from daemon.pidfile import TimeoutPIDLockFile
 
+from airflow import settings
 from airflow.configuration import conf
 from airflow.dag_processing.manager import DagFileProcessorManager
 from airflow.utils import cli as cli_utils
@@ -66,6 +67,7 @@ def dag_processor(args):
                 files_preserve=[handle],
                 stdout=stdout_handle,
                 stderr=stderr_handle,
+                umask=int(settings.DAEMON_UMASK, 8),
             )
             with ctx:
                 try:
diff --git a/airflow/cli/commands/kerberos_command.py b/airflow/cli/commands/kerberos_command.py
index fea8743499..51acdf62df 100644
--- a/airflow/cli/commands/kerberos_command.py
+++ b/airflow/cli/commands/kerberos_command.py
@@ -39,6 +39,7 @@ def kerberos(args):
                 pidfile=TimeoutPIDLockFile(pid, -1),
                 stdout=stdout_handle,
                 stderr=stderr_handle,
+                umask=int(settings.DAEMON_UMASK, 8),
             )
 
             with ctx:
diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py
index bc6e983ee5..ab24dd21c7 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -65,6 +65,7 @@ def scheduler(args):
                 files_preserve=[handle],
                 stdout=stdout_handle,
                 stderr=stderr_handle,
+                umask=int(settings.DAEMON_UMASK, 8),
             )
             with ctx:
                 _run_scheduler_job(args=args)
diff --git a/airflow/cli/commands/triggerer_command.py b/airflow/cli/commands/triggerer_command.py
index 82e7fde129..70f860034d 100644
--- a/airflow/cli/commands/triggerer_command.py
+++ b/airflow/cli/commands/triggerer_command.py
@@ -45,6 +45,7 @@ def triggerer(args):
                 files_preserve=[handle],
                 stdout=stdout_handle,
                 stderr=stderr_handle,
+                umask=int(settings.DAEMON_UMASK, 8),
             )
             with ctx:
                 job.run()
diff --git a/airflow/cli/commands/webserver_command.py b/airflow/cli/commands/webserver_command.py
index c74513d23e..f1eeb8000f 100644
--- a/airflow/cli/commands/webserver_command.py
+++ b/airflow/cli/commands/webserver_command.py
@@ -455,6 +455,7 @@ def webserver(args):
                     files_preserve=[handle],
                     stdout=stdout,
                     stderr=stderr,
+                    umask=int(settings.DAEMON_UMASK, 8),
                 )
                 with ctx:
                     subprocess.Popen(run_args, close_fds=True)
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 4ea090bcd9..9a97790ab7 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -366,6 +366,19 @@
       example: ~
       default: "1024"
 
+    - name: daemon_umask
+      description: |
+        The default umask to use for process when run in daemon mode (scheduler, worker,  etc.)
+
+        This controls the file-creation mode mask which determines the initial value of file permission bits
+        for newly created files.
+
+        This value is treated as an octal-integer.
+      version_added: 2.3.4
+      type: string
+      default: "0o077"
+      example: ~
+
 - name: database
   description: ~
   options:
@@ -1643,15 +1656,6 @@
       type: boolean
       example: ~
       default: "true"
-    - name: worker_umask
-      description: |
-        Umask that will be used when starting workers with the ``airflow celery worker``
-        in daemon mode. This control the file-creation mode mask which determines the initial
-        value of file permission bits for newly created files.
-      version_added: 2.0.0
-      type: string
-      example: ~
-      default: "0o077"
     - name: broker_url
       description: |
         The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 86f9cf93fc..16f8d8a0d9 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -208,6 +208,14 @@ default_pool_task_slot_count = 128
 # mapped tasks from clogging the scheduler.
 max_map_length = 1024
 
+# The default umask to use for process when run in daemon mode (scheduler, worker,  etc.)
+#
+# This controls the file-creation mode mask which determines the initial value of file permission bits
+# for newly created files.
+#
+# This value is treated as an octal-integer.
+daemon_umask = 0o077
+
 [database]
 # The SqlAlchemy connection string to the metadata database.
 # SqlAlchemy supports many different database engines.
@@ -829,11 +837,6 @@ worker_prefetch_multiplier = 1
 # prevent this by setting this to false. However, with this disabled Flower won't work.
 worker_enable_remote_control = true
 
-# Umask that will be used when starting workers with the ``airflow celery worker``
-# in daemon mode. This control the file-creation mode mask which determines the initial
-# value of file permission bits for newly created files.
-worker_umask = 0o077
-
 # The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
 # a sqlalchemy database. Refer to the Celery documentation for more information.
 broker_url = redis://redis:6379/0
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 4f2caa186e..20dd3c13d9 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -517,7 +517,17 @@ class AirflowConfigParser(ConfigParser):
             raise ValueError(f"The value {section}/{key} should be set!")
         return value
 
+    @overload  # type: ignore[override]
+    def get(self, section: str, key: str, fallback: str = ..., **kwargs) -> str:  # type: ignore[override]
+
+        ...
+
+    @overload  # type: ignore[override]
     def get(self, section: str, key: str, **kwargs) -> Optional[str]:  # type: ignore[override]
+
+        ...
+
+    def get(self, section: str, key: str, **kwargs) -> Optional[str]:  # type: ignore[override, misc]
         section = str(section).lower()
         key = str(key).lower()
 
diff --git a/airflow/settings.py b/airflow/settings.py
index 374960ab3e..be05589f94 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -640,3 +640,5 @@ DASHBOARD_UIALERTS: List["UIAlert"] = []
 
 # Prefix used to identify tables holding data moved during migration.
 AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"
+
+DAEMON_UMASK: str = conf.get('core', 'daemon_umask', fallback='0o077')
diff --git a/tests/cli/commands/test_celery_command.py b/tests/cli/commands/test_celery_command.py
index 14a6f52e9d..2bc9796cdd 100644
--- a/tests/cli/commands/test_celery_command.py
+++ b/tests/cli/commands/test_celery_command.py
@@ -354,6 +354,7 @@ class TestFlowerCommand(unittest.TestCase):
                 pidfile=mock_pid_file.return_value,
                 stderr=mock_open.return_value,
                 stdout=mock_open.return_value,
+                umask=0o077,
             ),
             mock.call.DaemonContext().__enter__(),
             mock.call.DaemonContext().__exit__(None, None, None),
diff --git a/tests/cli/commands/test_kerberos_command.py b/tests/cli/commands/test_kerberos_command.py
index 007bd2f41b..855b0515df 100644
--- a/tests/cli/commands/test_kerberos_command.py
+++ b/tests/cli/commands/test_kerberos_command.py
@@ -75,6 +75,7 @@ class TestKerberosCommand(unittest.TestCase):
                 pidfile=mock_pid_file.return_value,
                 stderr=mock_open.return_value,
                 stdout=mock_open.return_value,
+                umask=0o077,
             ),
             mock.call.DaemonContext().__enter__(),
             mock.call.DaemonContext().__exit__(None, None, None),


[airflow] 40/45: fix: change disable_verify_ssl behaviour (#25023)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit cfce076abd7718fdfff8d928a57b510c6b49531c
Author: wselfjes <84...@users.noreply.github.com>
AuthorDate: Thu Jul 28 20:46:40 2022 +0300

    fix: change disable_verify_ssl behaviour (#25023)
    
    The problem is that verify_ssl gets overwritten when the client configuration
    is loaded from the kube_config file or via load_incluster_config.
    
    (cherry picked from commit 2071519e7462cfc7613c50dc42acb4672dbca4a7)
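
Not part of the patch: a short sketch of why the ordering below matters, assuming the kubernetes Python client and a reachable kubeconfig. Loading a config resets the client's default Configuration, so the verify_ssl override has to happen afterwards.

from kubernetes import client, config
from kubernetes.client import Configuration


def disable_verify_ssl() -> None:
    # Newer client versions expose get_default_copy(); fall back to a fresh object.
    if hasattr(Configuration, "get_default_copy"):
        configuration = Configuration.get_default_copy()
    else:
        configuration = Configuration()
    configuration.verify_ssl = False
    Configuration.set_default(configuration)


config.load_kube_config()  # (re)initializes the default Configuration from kubeconfig
disable_verify_ssl()       # so the override must come after loading, not before
api = client.CoreV1Api()
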
---
 airflow/kubernetes/kube_client.py | 11 +++++++----
 tests/kubernetes/test_client.py   | 13 +++++++++++++
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/airflow/kubernetes/kube_client.py b/airflow/kubernetes/kube_client.py
index 7e6ba05119..c42ef6191f 100644
--- a/airflow/kubernetes/kube_client.py
+++ b/airflow/kubernetes/kube_client.py
@@ -30,7 +30,10 @@ try:
     has_kubernetes = True
 
     def _disable_verify_ssl() -> None:
-        configuration = Configuration()
+        if hasattr(Configuration, 'get_default_copy'):
+            configuration = Configuration.get_default_copy()
+        else:
+            configuration = Configuration()
         configuration.verify_ssl = False
         Configuration.set_default(configuration)
 
@@ -100,9 +103,6 @@ def get_kube_client(
     if conf.getboolean('kubernetes', 'enable_tcp_keepalive'):
         _enable_tcp_keepalive()
 
-    if not conf.getboolean('kubernetes', 'verify_ssl'):
-        _disable_verify_ssl()
-
     if in_cluster:
         config.load_incluster_config()
     else:
@@ -112,4 +112,7 @@ def get_kube_client(
             config_file = conf.get('kubernetes', 'config_file', fallback=None)
         config.load_kube_config(config_file=config_file, context=cluster_context)
 
+    if not conf.getboolean('kubernetes', 'verify_ssl'):
+        _disable_verify_ssl()
+
     return client.CoreV1Api()
diff --git a/tests/kubernetes/test_client.py b/tests/kubernetes/test_client.py
index ce040cf3ed..d144456c49 100644
--- a/tests/kubernetes/test_client.py
+++ b/tests/kubernetes/test_client.py
@@ -38,6 +38,19 @@ class TestClient(unittest.TestCase):
         assert config.load_incluster_config.not_called
         assert config.load_kube_config.called
 
+    @mock.patch('airflow.kubernetes.kube_client.config')
+    @mock.patch('airflow.kubernetes.kube_client.conf')
+    def test_load_config_disable_ssl(self, conf, config):
+        conf.getboolean.return_value = False
+        get_kube_client(in_cluster=False)
+        conf.getboolean.assert_called_with('kubernetes', 'verify_ssl')
+        # Support wide range of kube client libraries
+        if hasattr(Configuration, 'get_default_copy'):
+            configuration = Configuration.get_default_copy()
+        else:
+            configuration = Configuration()
+        self.assertFalse(configuration.verify_ssl)
+
     def test_enable_tcp_keepalive(self):
         socket_options = [
             (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),


[airflow] 01/45: Clear next method when clearing TIs (#23929)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit bf9f4e128267a2e4077daee99784aaf5668809ca
Author: Tanel Kiis <ta...@users.noreply.github.com>
AuthorDate: Tue Jun 28 12:40:34 2022 +0300

    Clear next method when clearing TIs (#23929)
    
    (cherry picked from commit a5ef7a02e12071aac5d19a2a0792603c63b65adf)
---
 airflow/models/taskinstance.py  |  1 +
 tests/models/test_cleartasks.py | 23 +++++++++++++++++++++++
 2 files changed, 24 insertions(+)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index fe3387ecf0..debd0aa6b0 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -229,6 +229,7 @@ def clear_task_instances(
                 ti.max_tries = max(ti.max_tries, ti.prev_attempted_tries)
             ti.state = None
             ti.external_executor_id = None
+            ti.clear_next_method_args()
             session.merge(ti)
 
         task_id_by_key[ti.dag_id][ti.run_id][ti.map_index][ti.try_number].add(ti.task_id)
diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py
index ba08692d42..05ff9df458 100644
--- a/tests/models/test_cleartasks.py
+++ b/tests/models/test_cleartasks.py
@@ -106,6 +106,29 @@ class TestClearTasks:
             assert ti0.state is None
             assert ti0.external_executor_id is None
 
+    def test_clear_task_instances_next_method(self, dag_maker, session):
+        with dag_maker(
+            'test_clear_task_instances_next_method',
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+        ) as dag:
+            EmptyOperator(task_id='task0')
+
+        ti0 = dag_maker.create_dagrun().task_instances[0]
+        ti0.state = State.DEFERRED
+        ti0.next_method = "next_method"
+        ti0.next_kwargs = {}
+
+        session.add(ti0)
+        session.commit()
+
+        clear_task_instances([ti0], session, dag=dag)
+
+        ti0.refresh_from_db()
+
+        assert ti0.next_method is None
+        assert ti0.next_kwargs is None
+
     @pytest.mark.parametrize(
         ["state", "last_scheduling"], [(State.QUEUED, None), (State.RUNNING, DEFAULT_DATE)]
     )


[airflow] 07/45: Note how DAG policy works with default_args (#24804)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 98baf6054b0ffb9277508be871363a9362ac4c76
Author: Mario Taddeucci <ma...@gmx.com>
AuthorDate: Wed Aug 3 04:31:25 2022 -0300

    Note how DAG policy works with default_args (#24804)
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    (cherry picked from commit 298be502c35006b7c3f011b676dbb4db0633bc74)
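
Illustrative sketch, not part of the patch: the note added below points to task policies for overriding operator defaults. A minimal version, assuming it lives in an airflow_local_settings.py module on the scheduler's path, could look like this.

# airflow_local_settings.py (hypothetical example module)
from airflow.models.baseoperator import BaseOperator


def task_policy(task: BaseOperator) -> None:
    # Runs for every task as DAGs are parsed, so it can override values that
    # would otherwise come from a DAG's default_args.
    if task.retries < 1:
        task.retries = 1
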
---
 docs/apache-airflow/concepts/cluster-policies.rst | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/docs/apache-airflow/concepts/cluster-policies.rst b/docs/apache-airflow/concepts/cluster-policies.rst
index e1c664660a..8bec4db4c3 100644
--- a/docs/apache-airflow/concepts/cluster-policies.rst
+++ b/docs/apache-airflow/concepts/cluster-policies.rst
@@ -54,6 +54,10 @@ This policy checks if each DAG has at least one tag defined:
 
     To avoid import cycles, if you use ``DAG`` in type annotations in your cluster policy, be sure to import from ``airflow.models`` and not from ``airflow``.
 
+.. note::
+
+    DAG policies are applied after the DAG has been completely loaded, so overriding the ``default_args`` parameter has no effect. If you want to override the default operator settings, use task policies instead.
+
 Task policies
 -------------
 


[airflow] 35/45: Refactor DR.task_instance_scheduling_decisions (#24774)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 626814aec802586a366403e7d8e95c07eaaf94bc
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Sat Jul 2 08:06:27 2022 +0800

    Refactor DR.task_instance_scheduling_decisions (#24774)
    
    (cherry picked from commit 5d5d62e41e93fe9845c96ab894047422761023d8)
---
 airflow/models/dagrun.py | 63 +++++++++++++++++++++++++++---------------------
 1 file changed, 35 insertions(+), 28 deletions(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 968f360de7..0e7d4e1374 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -638,14 +638,23 @@ class DagRun(Base, LoggingMixin):
 
     @provide_session
     def task_instance_scheduling_decisions(self, session: Session = NEW_SESSION) -> TISchedulingDecision:
+        tis = self.get_task_instances(session=session, state=State.task_states)
+        self.log.debug("number of tis tasks for %s: %s task(s)", self, len(tis))
 
-        schedulable_tis: List[TI] = []
-        changed_tis = False
+        def _filter_tis_and_exclude_removed(dag: "DAG", tis: List[TI]) -> Iterable[TI]:
+            """Populate ``ti.task`` while excluding those missing one, marking them as REMOVED."""
+            for ti in tis:
+                try:
+                    ti.task = dag.get_task(ti.task_id)
+                except TaskNotFound:
+                    self.log.error("Failed to get task for ti %s. Marking it as removed.", ti)
+                    ti.state = State.REMOVED
+                    session.flush()
+                else:
+                    yield ti
 
-        tis = list(self.get_task_instances(session=session, state=State.task_states))
-        self.log.debug("number of tis tasks for %s: %s task(s)", self, len(tis))
-        dag = self.get_dag()
-        missing_indexes = self._find_missing_task_indexes(dag, tis, session=session)
+        tis = list(_filter_tis_and_exclude_removed(self.get_dag(), tis))
+        missing_indexes = self._find_missing_task_indexes(tis, session=session)
         if missing_indexes:
             self.verify_integrity(missing_indexes=missing_indexes, session=session)
 
@@ -666,6 +675,9 @@ class DagRun(Base, LoggingMixin):
                 new_unfinished_tis = [t for t in unfinished_tis if t.state in State.unfinished]
                 finished_tis.extend(t for t in unfinished_tis if t.state in State.finished)
                 unfinished_tis = new_unfinished_tis
+        else:
+            schedulable_tis = []
+            changed_tis = False
 
         return TISchedulingDecision(
             tis=tis,
@@ -1068,38 +1080,33 @@ class DagRun(Base, LoggingMixin):
             # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
             session.rollback()
 
-    def _find_missing_task_indexes(self, dag, tis, *, session) -> Dict["MappedOperator", Sequence[int]]:
-        """
-        Here we check if the length of the mapped task instances changed
-        at runtime. If so, we find the missing indexes.
-
-        This function also marks task instances with missing tasks as REMOVED.
+    def _find_missing_task_indexes(
+        self,
+        tis: Iterable[TI],
+        *,
+        session: Session,
+    ) -> Dict["MappedOperator", Sequence[int]]:
+        """Check if the length of the mapped task instances changed at runtime and find the missing indexes.
 
-        :param dag: DAG object corresponding to the dagrun
-        :param tis: task instances to check
-        :param session: the session to use
+        :param tis: Task instances to check
+        :param session: The session to use
         """
-        existing_indexes: Dict["MappedOperator", list] = defaultdict(list)
-        new_indexes: Dict["MappedOperator", Sequence[int]] = defaultdict(list)
-        for ti in tis:
-            try:
-                task = ti.task = dag.get_task(ti.task_id)
-            except TaskNotFound:
-                self.log.error("Failed to get task '%s' for dag '%s'. Marking it as removed.", ti, ti.dag_id)
+        from airflow.models.mappedoperator import MappedOperator
 
-                ti.state = State.REMOVED
-                session.flush()
-                continue
-            if not task.is_mapped:
+        existing_indexes: Dict[MappedOperator, List[int]] = defaultdict(list)
+        new_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
+        for ti in tis:
+            task = ti.task
+            if not isinstance(task, MappedOperator):
                 continue
             # skip unexpanded tasks and also tasks that expands with literal arguments
             if ti.map_index < 0 or task.parse_time_mapped_ti_count:
                 continue
             existing_indexes[task].append(ti.map_index)
-            task.run_time_mapped_ti_count.cache_clear()
+            task.run_time_mapped_ti_count.cache_clear()  # type: ignore[attr-defined]
             new_length = task.run_time_mapped_ti_count(self.run_id, session=session) or 0
             new_indexes[task] = range(new_length)
-        missing_indexes: Dict["MappedOperator", Sequence[int]] = defaultdict(list)
+        missing_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
         for k, v in existing_indexes.items():
             missing_indexes.update({k: list(set(new_indexes[k]).difference(v))})
         return missing_indexes


[airflow] 18/45: No grid auto-refresh for backfill dag runs (#25042)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7d2e516eaf6dbe8e4d1b23355d57359d61fae06c
Author: yingxuanwangxuan <95...@users.noreply.github.com>
AuthorDate: Wed Jul 20 18:30:22 2022 +0800

    No grid auto-refresh for backfill dag runs (#25042)
    
    * Update useGridData.ts
    
    * Update useGridData.test.js
    
    * Update useGridData.test.js
    
    (cherry picked from commit de6938e173773d88bd741e43c7b0aa16d8a1a167)
---
 airflow/www/static/js/grid/api/useGridData.test.js | 30 ++++++++++++++++++----
 airflow/www/static/js/grid/api/useGridData.ts      |  2 +-
 2 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/airflow/www/static/js/grid/api/useGridData.test.js b/airflow/www/static/js/grid/api/useGridData.test.js
index 29a7f1ac8a..f8c7d451c6 100644
--- a/airflow/www/static/js/grid/api/useGridData.test.js
+++ b/airflow/www/static/js/grid/api/useGridData.test.js
@@ -24,17 +24,37 @@ import { areActiveRuns } from './useGridData';
 describe('Test areActiveRuns()', () => {
   test('Correctly detects active runs', () => {
     const runs = [
-      { state: 'success' },
-      { state: 'queued' },
+      { runType: 'scheduled', state: 'success' },
+      { runType: 'manual', state: 'queued' },
     ];
     expect(areActiveRuns(runs)).toBe(true);
   });
 
   test('Returns false when all runs are resolved', () => {
     const runs = [
-      { state: 'success' },
-      { state: 'failed' },
-      { state: 'not_queued' },
+      { runType: 'scheduled', state: 'success' },
+      { runType: 'manual', state: 'failed' },
+      { runType: 'manual', state: 'not_queued' },
+    ];
+    const result = areActiveRuns(runs);
+    expect(result).toBe(false);
+  });
+
+  test('Returns false when filtering runs runtype ["backfill"] and state ["not_queued"]', () => {
+    const runs = [
+      { runType: 'scheduled', state: 'success' },
+      { runType: 'manual', state: 'failed' },
+      { runType: 'backfill', state: 'not_queued' },
+    ];
+    const result = areActiveRuns(runs);
+    expect(result).toBe(false);
+  });
+
+  test('Returns false when filtering runs runtype ["backfill"] and state ["queued"]', () => {
+    const runs = [
+      { runType: 'scheduled', state: 'success' },
+      { runType: 'manual', state: 'failed' },
+      { runType: 'backfill', state: 'queued' },
     ];
     const result = areActiveRuns(runs);
     expect(result).toBe(false);
diff --git a/airflow/www/static/js/grid/api/useGridData.ts b/airflow/www/static/js/grid/api/useGridData.ts
index ec12ee6d60..3af43df9b1 100644
--- a/airflow/www/static/js/grid/api/useGridData.ts
+++ b/airflow/www/static/js/grid/api/useGridData.ts
@@ -49,7 +49,7 @@ const emptyGridData: GridData = {
   },
 };
 
-export const areActiveRuns = (runs: DagRun[] = []) => runs.filter((run) => ['queued', 'running', 'scheduled'].includes(run.state)).length > 0;
+export const areActiveRuns = (runs: DagRun[] = []) => runs.filter((run) => ['manual', 'scheduled'].includes(run.runType)).filter((run) => ['queued', 'running', 'scheduled'].includes(run.state)).length > 0;
 
 const useGridData = () => {
   const { isRefreshOn, stopRefresh } = useAutoRefresh();


[airflow] 13/45: Extends resolve_xcom_backend function level documentation (#24965)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 977d11dbed16555bd36deab632642ed2523f2397
Author: Frank Cash <fc...@astronomer.io>
AuthorDate: Mon Jul 11 14:10:30 2022 -0400

    Extends resolve_xcom_backend function level documentation (#24965)
    
    * Extends resolve_xcom_backend documentation

    * Properly stylize XCom

    * Final docstring update
    
    * Stylistic changes
    
    (cherry picked from commit 178af9d24772a8866ac55d25eeb48bed77337031)
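
Illustrative sketch, not part of the patch: the kind of class resolve_xcom_backend loads via the [core] xcom_backend option is a BaseXCom subclass whose serialize_value keeps a compatible signature (exact signatures vary between Airflow versions, so treat the details below as an assumption).

# my_xcom_backend.py -- hypothetical module referenced as xcom_backend = my_xcom_backend.PassthroughXCom
from typing import Any

from airflow.models.xcom import BaseXCom


class PassthroughXCom(BaseXCom):
    """Custom XCom backend that simply delegates to the base implementation."""

    @staticmethod
    def serialize_value(value: Any, **kwargs):
        # A real backend might push large values to object storage and store a reference.
        return BaseXCom.serialize_value(value, **kwargs)

    @staticmethod
    def deserialize_value(result) -> Any:
        return BaseXCom.deserialize_value(result)
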
---
 airflow/models/xcom.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/airflow/models/xcom.py b/airflow/models/xcom.py
index aad720bd8b..a3d74aa48e 100644
--- a/airflow/models/xcom.py
+++ b/airflow/models/xcom.py
@@ -658,7 +658,11 @@ def _get_function_params(function) -> List[str]:
 
 
 def resolve_xcom_backend() -> Type[BaseXCom]:
-    """Resolves custom XCom class"""
+    """Resolves custom XCom class
+
+    Confirms that custom XCom class extends the BaseXCom.
+    Compares the function signature of the custom XCom serialize_value to the base XCom serialize_value.
+    """
     clazz = conf.getimport("core", "xcom_backend", fallback=f"airflow.models.xcom.{BaseXCom.__name__}")
     if not clazz:
         return BaseXCom


[airflow] 06/45: Update PythonVirtualenvOperator Howto (#24782)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 03b2fad778f6060cefda5dbfdfd36e35f88f8a70
Author: nanohanno <44...@users.noreply.github.com>
AuthorDate: Thu Jul 7 20:44:59 2022 +0200

    Update PythonVirtualenvOperator Howto (#24782)
    
    * Update PythonVirtualenvOperator Howto
    
    (cherry picked from commit 011fbae583a4347eda85d5b7797a80d46ff06062)
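
Illustrative sketch, not part of the patch: a minimal DAG using the operator the howto documents, assuming the virtualenv extra is installed as the added sentence describes (the colorama requirement is just an arbitrary example package).

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator


def callable_in_venv():
    # Imports must happen inside the callable: it runs in a freshly built virtualenv.
    import colorama

    print(colorama.__version__)


with DAG(
    dag_id="example_virtualenv_sketch",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    PythonVirtualenvOperator(
        task_id="run_in_virtualenv",
        python_callable=callable_in_venv,
        requirements=["colorama==0.4.0"],
        system_site_packages=False,
    )
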
---
 docs/apache-airflow/howto/operator/python.rst | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/docs/apache-airflow/howto/operator/python.rst b/docs/apache-airflow/howto/operator/python.rst
index 210b528cf2..ac9070208f 100644
--- a/docs/apache-airflow/howto/operator/python.rst
+++ b/docs/apache-airflow/howto/operator/python.rst
@@ -60,8 +60,7 @@ is evaluated as a :ref:`Jinja template <concepts:jinja-templating>`.
 PythonVirtualenvOperator
 ========================
 
-Use the :class:`~airflow.operators.python.PythonVirtualenvOperator` to execute
-Python callables inside a new Python virtual environment.
+Use the :class:`~airflow.operators.python.PythonVirtualenvOperator` to execute Python callables inside a new Python virtual environment. The ``virtualenv`` package needs to be installed in the environment that runs Airflow (as optional dependency ``pip install airflow[virtualenv] --constraint ...``).
 
 .. exampleinclude:: /../../airflow/example_dags/example_python_operator.py
     :language: python


[airflow] 20/45: Fix invalidateQueries call (#25097)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit af702bf69c717c08c0d3c5d518b28f385bc03c9c
Author: pierrejeambrun <pi...@gmail.com>
AuthorDate: Fri Jul 15 19:03:24 2022 +0200

    Fix invalidateQueries call (#25097)
    
    (cherry picked from commit b4e9c678d7375401bb548302a2074a215aec5dc0)
---
 airflow/www/static/js/grid/api/useClearTask.js       | 2 +-
 airflow/www/static/js/grid/api/useMarkFailedTask.js  | 2 +-
 airflow/www/static/js/grid/api/useMarkSuccessTask.js | 2 +-
 airflow/www/static/js/grid/api/useRunTask.js         | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/airflow/www/static/js/grid/api/useClearTask.js b/airflow/www/static/js/grid/api/useClearTask.js
index b9a1b389ac..c78484b9f7 100644
--- a/airflow/www/static/js/grid/api/useClearTask.js
+++ b/airflow/www/static/js/grid/api/useClearTask.js
@@ -66,7 +66,7 @@ export default function useClearTask({
     {
       onSuccess: () => {
         queryClient.invalidateQueries('gridData');
-        queryClient.invalidateQueries('mappedInstances', dagId, runId, taskId);
+        queryClient.invalidateQueries(['mappedInstances', dagId, runId, taskId]);
         startRefresh();
       },
       onError: (error) => errorToast({ error }),
diff --git a/airflow/www/static/js/grid/api/useMarkFailedTask.js b/airflow/www/static/js/grid/api/useMarkFailedTask.js
index 5a5d8fbf50..5c613c557b 100644
--- a/airflow/www/static/js/grid/api/useMarkFailedTask.js
+++ b/airflow/www/static/js/grid/api/useMarkFailedTask.js
@@ -62,7 +62,7 @@ export default function useMarkFailedTask({
     {
       onSuccess: () => {
         queryClient.invalidateQueries('gridData');
-        queryClient.invalidateQueries('mappedInstances', dagId, runId, taskId);
+        queryClient.invalidateQueries(['mappedInstances', dagId, runId, taskId]);
         startRefresh();
       },
       onError: (error) => errorToast({ error }),
diff --git a/airflow/www/static/js/grid/api/useMarkSuccessTask.js b/airflow/www/static/js/grid/api/useMarkSuccessTask.js
index 3c7ea1e42a..117f26229b 100644
--- a/airflow/www/static/js/grid/api/useMarkSuccessTask.js
+++ b/airflow/www/static/js/grid/api/useMarkSuccessTask.js
@@ -63,7 +63,7 @@ export default function useMarkSuccessTask({
     {
       onSuccess: () => {
         queryClient.invalidateQueries('gridData');
-        queryClient.invalidateQueries('mappedInstances', dagId, runId, taskId);
+        queryClient.invalidateQueries(['mappedInstances', dagId, runId, taskId]);
         startRefresh();
       },
       onError: (error) => errorToast({ error }),
diff --git a/airflow/www/static/js/grid/api/useRunTask.js b/airflow/www/static/js/grid/api/useRunTask.js
index abfa34f8dc..cbac466b50 100644
--- a/airflow/www/static/js/grid/api/useRunTask.js
+++ b/airflow/www/static/js/grid/api/useRunTask.js
@@ -60,7 +60,7 @@ export default function useRunTask(dagId, runId, taskId) {
     {
       onSuccess: () => {
         queryClient.invalidateQueries('gridData');
-        queryClient.invalidateQueries('mappedInstances', dagId, runId, taskId);
+        queryClient.invalidateQueries(['mappedInstances', dagId, runId, taskId]);
         startRefresh();
       },
       onError: (error) => errorToast({ error }),


[airflow] 29/45: Fix "This Session's transaction has been rolled back" (#25532)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 18f13384f1f4b551ddbd9c20c0fe25b21be37b17
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Fri Aug 5 17:15:31 2022 +0100

    Fix "This Session's transaction has been rolled back" (#25532)
    
    Accessing the run_id (self.run_id) after an exception raises an error because the session is invalidated on exception, so we extract the run_id before handling the exception.
    
    (cherry picked from commit 5668888a7e1074a620b3d38f407ecf1aa055b623)
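
Generic sketch, not part of the patch: the pattern is simply "copy what you will need for logging before the block that can raise", because attribute access on ORM objects can itself fail once the session needs a rollback. The names below (insert_rows, model, dag_run) are hypothetical.

import logging

log = logging.getLogger(__name__)


def insert_rows(session, model, dag_run, rows):
    run_id = dag_run.run_id  # read while the session is still healthy
    try:
        session.bulk_insert_mappings(model, rows)  # may raise IntegrityError
        session.flush()
    except Exception:
        # Reading dag_run.run_id here could itself raise
        # "This Session's transaction has been rolled back", so log the copy.
        log.info("Hit an error while inserting rows for %s", run_id, exc_info=True)
        session.rollback()
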
---
 airflow/models/dagrun.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 8589f6878e..968f360de7 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -1044,6 +1044,10 @@ class DagRun(Base, LoggingMixin):
         :param session: the session to use
 
         """
+        # Fetch the information we need before handling the exception to avoid
+        # PendingRollbackError due to the session being invalidated on exception
+        # see https://github.com/apache/superset/pull/530
+        run_id = self.run_id
         try:
             if hook_is_noop:
                 session.bulk_insert_mappings(TI, tasks)
@@ -1057,7 +1061,7 @@ class DagRun(Base, LoggingMixin):
             self.log.info(
                 'Hit IntegrityError while creating the TIs for %s- %s',
                 dag_id,
-                self.run_id,
+                run_id,
                 exc_info=True,
             )
             self.log.info('Doing session rollback.')


[airflow] 41/45: Fix `airflow db reset` when dangling tables exist (#25441)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5c2fce92f46479c6948c358ab675dd41b3d177fd
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Mon Aug 1 13:58:31 2022 +0100

    Fix `airflow db reset` when dangling tables exist (#25441)
    
    If one of the "dangling" tables already existed in the DB, performing an
    `airflow db reset` would delete the tables, but it would then try and
    _re-create_ the table later. This was because the Table object was still
    associated with the Metadata object.
    
    The fix is to remove it from the Metadata object once we have dropped it.
    
    (cherry picked from commit 40eefd84797f5085e6c3fef6cbd6f713ceb3c3d8)
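
For context, a self-contained SQLAlchemy sketch of why the extra metadata.remove() matters: create_all() will happily re-create any dropped table that is still registered on the MetaData object.

from sqlalchemy import Column, Integer, MetaData, Table, create_engine, inspect

engine = create_engine("sqlite://")
metadata = MetaData()
moved = Table("_airflow_moved_example", metadata, Column("id", Integer, primary_key=True))

metadata.create_all(engine)
moved.drop(engine, checkfirst=False)       # gone from the database...
metadata.create_all(engine)                # ...but create_all() brings it straight back
print(inspect(engine).get_table_names())   # ['_airflow_moved_example']

moved.drop(engine, checkfirst=False)
metadata.remove(moved)                     # also forget it on the MetaData
metadata.create_all(engine)
print(inspect(engine).get_table_names())   # []
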
---
 airflow/utils/db.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index a86222e3b8..192a2343c4 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -1582,7 +1582,8 @@ def drop_airflow_moved_tables(session):
     tables = set(inspect(session.get_bind()).get_table_names())
     to_delete = [Table(x, Base.metadata) for x in tables if x.startswith(AIRFLOW_MOVED_TABLE_PREFIX)]
     for tbl in to_delete:
-        tbl.drop(settings.engine, checkfirst=True)
+        tbl.drop(settings.engine, checkfirst=False)
+        Base.metadata.remove(tbl)
 
 
 def drop_flask_models(connection):


[airflow] 33/45: Adding mysql index hint to use index on task_instance.state in critical section query (#25673)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit cf448ea1746a05ddd2cbb6005f7c238e69083d0f
Author: Michael Petro <40...@users.noreply.github.com>
AuthorDate: Fri Aug 12 07:28:55 2022 -0400

    Adding mysql index hint to use index on task_instance.state in critical section query (#25673)
    
    (cherry picked from commit 134b5551db67f17b4268dce552e87a154aa1e794)
---
 airflow/jobs/scheduler_job.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 539e6f1eff..888922840b 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -314,6 +314,7 @@ class SchedulerJob(BaseJob):
             # and the dag is not paused
             query = (
                 session.query(TI)
+                .with_hint(TI, 'USE INDEX (ti_state)', dialect_name='mysql')
                 .join(TI.dag_run)
                 .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
                 .join(TI.dag_model)


[airflow] 34/45: Don't mistakenly take a lock on DagRun via ti.refresh_from_db (#25312)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 78fa95ca330f7e331118111ce4ff13ab1ce09c19
Author: Ash Berlin-Taylor <as...@apache.org>
AuthorDate: Tue Aug 9 15:17:41 2022 +0100

    Don't mistakenly take a lock on DagRun via ti.refresh_from_db (#25312)
    
    In 2.2.0 we made TI.dag_run automatically join-loaded, which is fine
    for most cases, but `refresh_from_db` doesn't need it (we don't
    access anything under ti.dag_run), and it's possible that when
    `lock_for_update=True` is passed we are locking more than we want to
    and _might_ cause deadlocks.
    
    Even if it doesn't, selecting more than we need is wasteful.
    
    (cherry picked from commit be2b53eaaf6fc136db8f3fa3edd797a6c529409a)
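
Not part of the patch: a minimal SQLAlchemy sketch of the core idea, selecting plain columns instead of the mapped entity so no relationship is joined in and a FOR UPDATE would lock only the queried table (SQLite ignores FOR UPDATE; it is shown for illustration only).

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Item(Base):
    __tablename__ = "item"
    id = Column(Integer, primary_key=True)
    state = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Item(id=1, state="queued"))
    session.commit()

    # Selecting columns returns a lightweight named-tuple row, not an Item,
    # so no eager/joined relationship loading (and no extra locks) can happen.
    row = (
        session.query(*Item.__table__.columns)
        .filter(Item.id == 1)
        .with_for_update()
        .one_or_none()
    )
    print(row.state)
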
---
 airflow/models/taskinstance.py   | 28 ++++++++++++++++++----------
 tests/jobs/test_scheduler_job.py |  8 +++++---
 2 files changed, 23 insertions(+), 13 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 33fe7a3f53..afcc469feb 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -308,6 +308,7 @@ def clear_task_instances(
             if dag_run_state == DagRunState.QUEUED:
                 dr.last_scheduling_decision = None
                 dr.start_date = None
+    session.flush()
 
 
 class _LazyXComAccessIterator(collections.abc.Iterator):
@@ -879,28 +880,35 @@ class TaskInstance(Base, LoggingMixin):
         """
         self.log.debug("Refreshing TaskInstance %s from DB", self)
 
-        qry = session.query(TaskInstance).filter(
-            TaskInstance.dag_id == self.dag_id,
-            TaskInstance.task_id == self.task_id,
-            TaskInstance.run_id == self.run_id,
-            TaskInstance.map_index == self.map_index,
+        if self in session:
+            session.refresh(self, TaskInstance.__mapper__.column_attrs.keys())
+
+        qry = (
+            # To avoid joining any relationships, by default select all
+            # columns, not the object. This also means we get (effectively) a
+            # namedtuple back, not a TI object
+            session.query(*TaskInstance.__table__.columns).filter(
+                TaskInstance.dag_id == self.dag_id,
+                TaskInstance.task_id == self.task_id,
+                TaskInstance.run_id == self.run_id,
+                TaskInstance.map_index == self.map_index,
+            )
         )
 
         if lock_for_update:
             for attempt in run_with_db_retries(logger=self.log):
                 with attempt:
-                    ti: Optional[TaskInstance] = qry.with_for_update().first()
+                    ti: Optional[TaskInstance] = qry.with_for_update().one_or_none()
         else:
-            ti = qry.first()
+            ti = qry.one_or_none()
         if ti:
             # Fields ordered per model definition
             self.start_date = ti.start_date
             self.end_date = ti.end_date
             self.duration = ti.duration
             self.state = ti.state
-            # Get the raw value of try_number column, don't read through the
-            # accessor here otherwise it will be incremented by one already.
-            self.try_number = ti._try_number
+            # Since we selected columns, not the object, this is the raw value
+            self.try_number = ti.try_number
             self.max_tries = ti.max_tries
             self.hostname = ti.hostname
             self.unixname = ti.unixname
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index dfd66770c0..c9c1ab166a 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -457,7 +457,8 @@ class TestSchedulerJob:
         (ti1,) = dr1.task_instances
         ti1.state = State.SCHEDULED
 
-        self.scheduler_job._critical_section_execute_task_instances(session)
+        self.scheduler_job._critical_section_enqueue_task_instances(session)
+        session.flush()
         ti1.refresh_from_db(session=session)
         assert State.SCHEDULED == ti1.state
         session.rollback()
@@ -1315,8 +1316,9 @@ class TestSchedulerJob:
         session.commit()
 
         with patch.object(BaseExecutor, 'queue_command') as mock_queue_command:
-            self.scheduler_job._enqueue_task_instances_with_queued_state([ti])
-        ti.refresh_from_db()
+            self.scheduler_job._enqueue_task_instances_with_queued_state([ti], session=session)
+        session.flush()
+        ti.refresh_from_db(session=session)
         assert ti.state == State.NONE
         mock_queue_command.assert_not_called()
 


[airflow] 42/45: Fix reducing mapped length of a mapped task at runtime after a clear (#25531)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 31a4b8cf1f803343fdc9681bfa2784132c05411b
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Fri Aug 5 10:30:51 2022 +0100

    Fix reducing mapped length of a mapped task at runtime after a clear (#25531)
    
    The previous fix for task immutability after a run did not cover the case where the task is removed at runtime when the literal is dynamic. This PR addresses it.
    
    (cherry picked from commit d3028ada36a43a0d549d22c280fb16d868b90b6d)
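
Not part of the patch: a minimal dynamic-task-mapping sketch of the situation being fixed, where the list returned by an upstream task (and therefore the mapped length) is only known at runtime and can shrink between a run and a clear.

from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id="mapped_length_sketch",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
):

    @task
    def make_args():
        # If this list shrinks on a later run after a clear, the surplus mapped
        # task instances should end up in the REMOVED state.
        return [1, 2, 3]

    @task
    def consume(arg):
        print(arg)

    consume.expand(arg=make_args())
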
---
 airflow/models/dagrun.py    | 12 +++++++--
 tests/models/test_dagrun.py | 64 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 74 insertions(+), 2 deletions(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 1b746e8a06..8b7f3a1c39 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -655,7 +655,7 @@ class DagRun(Base, LoggingMixin):
                     yield ti
 
         tis = list(_filter_tis_and_exclude_removed(self.get_dag(), tis))
-        missing_indexes = self._find_missing_task_indexes(tis, session=session)
+        missing_indexes = self._revise_mapped_task_indexes(tis, session=session)
         if missing_indexes:
             self.verify_integrity(missing_indexes=missing_indexes, session=session)
 
@@ -1081,7 +1081,7 @@ class DagRun(Base, LoggingMixin):
             # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
             session.rollback()
 
-    def _find_missing_task_indexes(
+    def _revise_mapped_task_indexes(
         self,
         tis: Iterable[TI],
         *,
@@ -1106,6 +1106,14 @@ class DagRun(Base, LoggingMixin):
             existing_indexes[task].append(ti.map_index)
             task.run_time_mapped_ti_count.cache_clear()  # type: ignore[attr-defined]
             new_length = task.run_time_mapped_ti_count(self.run_id, session=session) or 0
+
+            if ti.map_index >= new_length:
+                self.log.debug(
+                    "Removing task '%s' as the map_index is longer than the resolved mapping list (%d)",
+                    ti,
+                    new_length,
+                )
+                ti.state = State.REMOVED
             new_indexes[task] = range(new_length)
         missing_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
         for k, v in existing_indexes.items():
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 6c3cc1c91c..e28b203640 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -1207,6 +1207,70 @@ def test_mapped_literal_length_increase_at_runtime_adds_additional_tis(dag_maker
     ]
 
 
+def test_mapped_literal_length_reduction_at_runtime_adds_removed_state(dag_maker, session):
+    """
+    Test that when the length of mapped literal reduces at runtime, the missing task instances
+    are marked as removed
+    """
+    from airflow.models import Variable
+
+    Variable.set(key='arg1', value=[1, 2, 3])
+
+    @task
+    def task_1():
+        return Variable.get('arg1', deserialize_json=True)
+
+    with dag_maker(session=session) as dag:
+
+        @task
+        def task_2(arg2):
+            ...
+
+        task_2.expand(arg2=task_1())
+
+    dr = dag_maker.create_dagrun()
+    ti = dr.get_task_instance(task_id='task_1')
+    ti.run()
+    dr.task_instance_scheduling_decisions()
+    tis = dr.get_task_instances()
+    indices = [(ti.map_index, ti.state) for ti in tis if ti.map_index >= 0]
+    assert sorted(indices) == [
+        (0, State.NONE),
+        (1, State.NONE),
+        (2, State.NONE),
+    ]
+
+    # Now "clear" and "reduce" the length of literal
+    dag.clear()
+    Variable.set(key='arg1', value=[1, 2])
+
+    with dag:
+        task_2.expand(arg2=task_1()).operator
+
+    # At this point, we need to test that the change works on the serialized
+    # DAG (which is what the scheduler operates on)
+    serialized_dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+
+    dr.dag = serialized_dag
+
+    # Run the first task again to get the new lengths
+    ti = dr.get_task_instance(task_id='task_1')
+    task1 = dag.get_task('task_1')
+    ti.refresh_from_task(task1)
+    ti.run()
+
+    # this would be called by the localtask job
+    dr.task_instance_scheduling_decisions()
+    tis = dr.get_task_instances()
+
+    indices = [(ti.map_index, ti.state) for ti in tis if ti.map_index >= 0]
+    assert sorted(indices) == [
+        (0, State.NONE),
+        (1, State.NONE),
+        (2, TaskInstanceState.REMOVED),
+    ]
+
+
 @pytest.mark.need_serialized_dag
 def test_mapped_mixed__literal_not_expanded_at_create(dag_maker, session):
     literal = [1, 2, 3, 4]
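
The dagrun.py hunk above is easier to follow in isolation. Below is a minimal,
self-contained sketch of the same idea, with plain dicts standing in for the ORM
objects (the names are illustrative, not Airflow's API): when the resolved
mapping shrinks at run time, task instances whose map_index falls outside the
new range are marked removed, and indexes that still have no task instance are
reported as missing.

    from collections import defaultdict

    REMOVED = "removed"

    def revise_mapped_indexes(existing_indexes, new_lengths, states):
        """existing_indexes: task_id -> map_indexes that already have TIs.
        new_lengths: task_id -> length of the mapping resolved at run time.
        states: (task_id, map_index) -> state, mutated in place."""
        missing = defaultdict(list)
        for task_id, indexes in existing_indexes.items():
            new_range = range(new_lengths.get(task_id, 0))
            for index in indexes:
                if index >= len(new_range):
                    # The literal shrank at runtime: flag the surplus TI as removed.
                    states[(task_id, index)] = REMOVED
            # Indexes in the new mapping with no TI yet still need to be created.
            missing[task_id] = sorted(set(new_range) - set(indexes))
        return missing

    states = {("task_2", i): None for i in range(3)}
    print(revise_mapped_indexes({"task_2": [0, 1, 2]}, {"task_2": 2}, states))
    # defaultdict(<class 'list'>, {'task_2': []})
    print(states[("task_2", 2)])  # 'removed'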


[airflow] 03/45: Send DAG timeout callbacks to processor outside of prohibit_commit (#24366)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 394ebc80b5019aaeba8972801d49b0d06a4d30c3
Author: Tanel Kiis <ta...@users.noreply.github.com>
AuthorDate: Wed Jul 6 12:35:58 2022 +0300

    Send DAG timeout callbacks to processor outside of prohibit_commit (#24366)
    
    (cherry picked from commit 438d13e73aeba752c5f65f8fe7c0cee082fbcc42)
---
 airflow/jobs/scheduler_job.py    |  5 +---
 tests/jobs/test_scheduler_job.py | 52 +++++++++++++++++++++++++++++++++-------
 2 files changed, 44 insertions(+), 13 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 3613b9be47..8e22065618 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1149,10 +1149,7 @@ class SchedulerJob(BaseJob):
                 msg='timed_out',
             )
 
-            # Send SLA & DAG Success/Failure Callbacks to be executed
-            self._send_dag_callbacks_to_processor(dag, callback_to_execute)
-            # Because we send the callback here, we need to return None
-            return callback
+            return callback_to_execute
 
         if dag_run.execution_date > timezone.utcnow() and not dag.allow_future_exec_dates:
             self.log.error("Execution date is in future: %s", dag_run.execution_date)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index de909ef6e6..6027867ab9 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -46,7 +46,7 @@ from airflow.jobs.backfill_job import BackfillJob
 from airflow.jobs.base_job import BaseJob
 from airflow.jobs.local_task_job import LocalTaskJob
 from airflow.jobs.scheduler_job import SchedulerJob
-from airflow.models import DAG, DagBag, DagModel, Pool, TaskInstance
+from airflow.models import DAG, DagBag, DagModel, DbCallbackRequest, Pool, TaskInstance
 from airflow.models.dagrun import DagRun
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import SimpleTaskInstance, TaskInstanceKey
@@ -1613,7 +1613,6 @@ class TestSchedulerJob:
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         self.scheduler_job.dagbag = dag_maker.dagbag
-        self.scheduler_job.executor = MockExecutor()
 
         session = settings.Session()
         orm_dag = session.query(DagModel).get(dag.dag_id)
@@ -1639,7 +1638,7 @@ class TestSchedulerJob:
         # Mock that processor_agent is started
         self.scheduler_job.processor_agent = mock.Mock()
 
-        self.scheduler_job._schedule_dag_run(dr, session)
+        callback = self.scheduler_job._schedule_dag_run(dr, session)
         session.flush()
 
         session.refresh(dr)
@@ -1658,8 +1657,8 @@ class TestSchedulerJob:
             msg="timed_out",
         )
 
-        # Verify dag failure callback request is sent to file processor
-        self.scheduler_job.executor.callback_sink.send.assert_called_once_with(expected_callback)
+        # Verify dag failure callback request is sent
+        assert callback == expected_callback
 
         session.rollback()
         session.close()
@@ -1680,12 +1679,11 @@ class TestSchedulerJob:
 
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
         self.scheduler_job.dagbag = dag_maker.dagbag
-        self.scheduler_job.executor = MockExecutor()
 
         # Mock that processor_agent is started
         self.scheduler_job.processor_agent = mock.Mock()
 
-        self.scheduler_job._schedule_dag_run(dr, session)
+        callback = self.scheduler_job._schedule_dag_run(dr, session)
         session.flush()
 
         session.refresh(dr)
@@ -1699,8 +1697,8 @@ class TestSchedulerJob:
             msg="timed_out",
         )
 
-        # Verify dag failure callback request is sent to file processor
-        self.scheduler_job.executor.callback_sink.send.assert_called_once_with(expected_callback)
+        # Verify dag failure callback request is sent
+        assert callback == expected_callback
 
         session.rollback()
         session.close()
@@ -1780,6 +1778,42 @@ class TestSchedulerJob:
         session.rollback()
         session.close()
 
+    def test_dagrun_timeout_callbacks_are_stored_in_database(self, dag_maker, session):
+        with dag_maker(
+            dag_id='test_dagrun_timeout_callbacks_are_stored_in_database',
+            on_failure_callback=lambda x: print("failed"),
+            dagrun_timeout=timedelta(hours=1),
+        ) as dag:
+            EmptyOperator(task_id='empty')
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor()
+        self.scheduler_job.executor.callback_sink = DatabaseCallbackSink()
+        self.scheduler_job.dagbag = dag_maker.dagbag
+        self.scheduler_job.processor_agent = mock.Mock()
+
+        dr = dag_maker.create_dagrun(start_date=DEFAULT_DATE)
+
+        with mock.patch.object(settings, "USE_JOB_SCHEDULE", False):
+            self.scheduler_job._do_scheduling(session)
+
+        callback = (
+            session.query(DbCallbackRequest)
+            .order_by(DbCallbackRequest.id.desc())
+            .first()
+            .get_callback_request()
+        )
+
+        expected_callback = DagCallbackRequest(
+            full_filepath=dag.fileloc,
+            dag_id=dr.dag_id,
+            is_failure_callback=True,
+            run_id=dr.run_id,
+            msg='timed_out',
+        )
+
+        assert callback == expected_callback
+
     def test_dagrun_callbacks_commited_before_sent(self, dag_maker):
         """
         Tests that before any callbacks are sent to the processor, the session is committed. This ensures


[airflow] 05/45: TriggerDagRunOperator.operator_extra_links is attr (#24676)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8a95092c9ecd1f930d82880618236c3d526dff8e
Author: Tzu-ping Chung <ur...@gmail.com>
AuthorDate: Tue Jun 28 10:59:00 2022 +0800

    TriggerDagRunOperator.operator_extra_links is attr (#24676)
    
    There's absolutely no reason this needs to be a property. And it cannot
    be since we need to access this at the class level.
    
    (cherry picked from commit 8dcafdfcdddc77fdfd2401757dcbc15bfec76d6b)
---
 airflow/operators/trigger_dagrun.py | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/airflow/operators/trigger_dagrun.py b/airflow/operators/trigger_dagrun.py
index 4578fd2df8..8045b5796a 100644
--- a/airflow/operators/trigger_dagrun.py
+++ b/airflow/operators/trigger_dagrun.py
@@ -87,11 +87,7 @@ class TriggerDagRunOperator(BaseOperator):
     template_fields: Sequence[str] = ("trigger_dag_id", "trigger_run_id", "execution_date", "conf")
     template_fields_renderers = {"conf": "py"}
     ui_color = "#ffefeb"
-
-    @property
-    def operator_extra_links(self):
-        """Return operator extra links"""
-        return [TriggerDagRunLink()]
+    operator_extra_links = [TriggerDagRunLink()]
 
     def __init__(
         self,
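
The class-level point is easy to demonstrate outside Airflow with two toy
classes (illustrative only): reading a property on the class itself yields the
property object rather than the list, while a plain class attribute is usable
without an instance, which is what code inspecting the operator class needs.

    class WithProperty:
        @property
        def operator_extra_links(self):
            return ["TriggerDagRunLink"]

    class WithAttribute:
        operator_extra_links = ["TriggerDagRunLink"]

    print(WithProperty.operator_extra_links)   # <property object at 0x...> - not usable
    print(WithAttribute.operator_extra_links)  # ['TriggerDagRunLink'] - works on the class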


[airflow] 17/45: chore: fix typo (#25010)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 0eec87e2b3a0cfce431e486ce4a02a3525293dce
Author: Emily Richmond <10...@users.noreply.github.com>
AuthorDate: Tue Jul 12 15:18:41 2022 -0800

    chore: fix typo (#25010)
    
    fix typo in bullet list under Completing our DAG from "a" to "at"
    
    (cherry picked from commit c85af2dfe48acf77fe3b9293d9f9e27dd9a0c89d)
---
 docs/apache-airflow/tutorial.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst
index 12a695281a..31a1824b3c 100644
--- a/docs/apache-airflow/tutorial.rst
+++ b/docs/apache-airflow/tutorial.rst
@@ -538,7 +538,7 @@ Completing our DAG:
 ~~~~~~~~~~~~~~~~~~~
 We've developed our tasks, now we need to wrap them in a DAG, which enables us to define when and how tasks should run, and state any dependencies that tasks have on other tasks. The DAG below is configured to:
 
-* run every day a midnight starting on Jan 1, 2021,
+* run every day at midnight starting on Jan 1, 2021,
 * only run once in the event that days are missed, and
 * timeout after 60 minutes
 


[airflow] 24/45: convert TimeSensorAsync target_time to utc on call time (#25221)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 43163870862921527033271e940dad0bda8cb763
Author: GitStart-AirFlow <10...@users.noreply.github.com>
AuthorDate: Fri Jul 22 22:03:46 2022 +0100

    convert TimeSensorAsync target_time to utc on call time (#25221)
    
    (cherry picked from commit ddaf74df9b1e9a4698d719f81931e822b21b0a95)
---
 airflow/sensors/time_sensor.py    | 4 +++-
 tests/sensors/test_time_sensor.py | 7 +++++++
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/airflow/sensors/time_sensor.py b/airflow/sensors/time_sensor.py
index 117390925d..edd8a7689d 100644
--- a/airflow/sensors/time_sensor.py
+++ b/airflow/sensors/time_sensor.py
@@ -51,10 +51,12 @@ class TimeSensorAsync(BaseSensorOperator):
         super().__init__(**kwargs)
         self.target_time = target_time
 
-        self.target_datetime = timezone.coerce_datetime(
+        aware_time = timezone.coerce_datetime(
             datetime.datetime.combine(datetime.datetime.today(), self.target_time)
         )
 
+        self.target_datetime = timezone.convert_to_utc(aware_time)
+
     def execute(self, context: Context):
         self.defer(
             trigger=DateTimeTrigger(moment=self.target_datetime),
diff --git a/tests/sensors/test_time_sensor.py b/tests/sensors/test_time_sensor.py
index b3ce2e84d9..4d69b71324 100644
--- a/tests/sensors/test_time_sensor.py
+++ b/tests/sensors/test_time_sensor.py
@@ -68,3 +68,10 @@ class TestTimeSensorAsync:
         assert exc_info.value.trigger.moment == timezone.datetime(2020, 7, 7, 10)
         assert exc_info.value.method_name == "execute_complete"
         assert exc_info.value.kwargs is None
+
+    def test_target_time_aware(self):
+        with DAG("test_target_time_aware", start_date=timezone.datetime(2020, 1, 1, 23, 0)):
+            aware_time = time(0, 1).replace(tzinfo=pendulum.local_timezone())
+            op = TimeSensorAsync(task_id="test", target_time=aware_time)
+            assert hasattr(op.target_datetime.tzinfo, "offset")
+            assert op.target_datetime.tzinfo.offset == 0
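
The operator does this with airflow.utils.timezone.coerce_datetime and
convert_to_utc; the same normalisation can be sketched with just the standard
library (the offset below is only an example value):

    import datetime

    local_tz = datetime.timezone(datetime.timedelta(hours=5, minutes=30))  # example offset
    target_time = datetime.time(0, 1, tzinfo=local_tz)  # aware time, as a user might pass

    # Combine with today's date (keeping the attached tzinfo), then normalise to UTC once.
    aware = datetime.datetime.combine(datetime.date.today(), target_time)
    target_datetime = aware.astimezone(datetime.timezone.utc)

    print(target_datetime.utcoffset())  # 0:00:00 - the trigger now fires at the intended moment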


[airflow] 08/45: Add %z for %(asctime)s to fix timezone for logs on UI (#24811)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9b4eec2a0ea9b85a457e463cf2f251820e24ba0d
Author: Ko HanJong <ha...@navercorp.com>
AuthorDate: Thu Jul 14 00:27:27 2022 +0900

    Add %z for %(asctime)s to fix timezone for logs on UI (#24811)
    
    (cherry picked from commit 851e5cad165a043654116a8a2717c9d3643d8251)
---
 airflow/config_templates/airflow_local_settings.py | 11 ++++-
 airflow/config_templates/config.yml                |  6 +++
 airflow/config_templates/default_airflow.cfg       |  1 +
 airflow/utils/log/timezone_aware.py                | 49 ++++++++++++++++++++++
 airflow/www/static/js/ti_log.js                    |  4 +-
 newsfragments/24811.significant.rst                | 22 ++++++++++
 6 files changed, 90 insertions(+), 3 deletions(-)

diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
index 6684fd18e5..ea8a19e80c 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -38,6 +38,10 @@ FAB_LOG_LEVEL: str = conf.get_mandatory_value('logging', 'FAB_LOGGING_LEVEL').up
 
 LOG_FORMAT: str = conf.get_mandatory_value('logging', 'LOG_FORMAT')
 
+LOG_FORMATTER_CLASS: str = conf.get_mandatory_value(
+    'logging', 'LOG_FORMATTER_CLASS', fallback='airflow.utils.log.timezone_aware.TimezoneAware'
+)
+
 COLORED_LOG_FORMAT: str = conf.get_mandatory_value('logging', 'COLORED_LOG_FORMAT')
 
 COLORED_LOG: bool = conf.getboolean('logging', 'COLORED_CONSOLE_LOG')
@@ -60,10 +64,13 @@ DEFAULT_LOGGING_CONFIG: Dict[str, Any] = {
     'version': 1,
     'disable_existing_loggers': False,
     'formatters': {
-        'airflow': {'format': LOG_FORMAT},
+        'airflow': {
+            'format': LOG_FORMAT,
+            'class': LOG_FORMATTER_CLASS,
+        },
         'airflow_coloured': {
             'format': COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
-            'class': COLORED_FORMATTER_CLASS if COLORED_LOG else 'logging.Formatter',
+            'class': COLORED_FORMATTER_CLASS if COLORED_LOG else LOG_FORMATTER_CLASS,
         },
     },
     'filters': {
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index f7409373c2..4ea090bcd9 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -625,6 +625,12 @@
       type: string
       example: ~
       default: "%%(asctime)s %%(levelname)s - %%(message)s"
+    - name: log_formatter_class
+      description: ~
+      version_added: 2.3.4
+      type: string
+      example: ~
+      default: "airflow.utils.log.timezone_aware.TimezoneAware"
     - name: task_log_prefix_template
       description: |
         Specify prefix pattern like mentioned below with stream handler TaskHandlerWithCustomFormatter
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 6591f82dc0..86f9cf93fc 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -345,6 +345,7 @@ colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatte
 # Format of Log line
 log_format = [%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s
 simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
+log_formatter_class = airflow.utils.log.timezone_aware.TimezoneAware
 
 # Specify prefix pattern like mentioned below with stream handler TaskHandlerWithCustomFormatter
 # Example: task_log_prefix_template = {{ti.dag_id}}-{{ti.task_id}}-{{execution_date}}-{{try_number}}
diff --git a/airflow/utils/log/timezone_aware.py b/airflow/utils/log/timezone_aware.py
new file mode 100644
index 0000000000..d01205233a
--- /dev/null
+++ b/airflow/utils/log/timezone_aware.py
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+
+import pendulum
+
+
+class TimezoneAware(logging.Formatter):
+    """
+    Override `default_time_format`, `default_msec_format` and `formatTime` to specify utc offset.
+    utc offset is the matter, without it, time conversion could be wrong.
+    With this Formatter, `%(asctime)s` will be formatted containing utc offset. (ISO 8601)
+    (e.g. 2022-06-12T13:00:00.123+0000)
+    """
+
+    default_time_format = '%Y-%m-%dT%H:%M:%S'
+    default_msec_format = '%s.%03d'
+    default_tz_format = '%z'
+
+    def formatTime(self, record, datefmt=None):
+        """
+        Returns the creation time of the specified LogRecord in ISO 8601 date and time format
+        in the local time zone.
+        """
+        dt = pendulum.from_timestamp(record.created, tz=pendulum.local_timezone())
+        if datefmt:
+            s = dt.strftime(datefmt)
+        else:
+            s = dt.strftime(self.default_time_format)
+
+        if self.default_msec_format:
+            s = self.default_msec_format % (s, record.msecs)
+        if self.default_tz_format:
+            s += dt.strftime(self.default_tz_format)
+        return s
diff --git a/airflow/www/static/js/ti_log.js b/airflow/www/static/js/ti_log.js
index 1bf6b501a6..ae72837345 100644
--- a/airflow/www/static/js/ti_log.js
+++ b/airflow/www/static/js/ti_log.js
@@ -103,6 +103,7 @@ function autoTailingLog(tryNumber, metadata = null, autoTailing = false) {
       // Detect urls and log timestamps
       const urlRegex = /http(s)?:\/\/[\w.-]+(\.?:[\w.-]+)*([/?#][\w\-._~:/?#[\]@!$&'()*+,;=.%]+)?/g;
       const dateRegex = /\d{4}[./-]\d{2}[./-]\d{2} \d{2}:\d{2}:\d{2},\d{3}/g;
+      const iso8601Regex = /\d{4}[./-]\d{2}[./-]\d{2}T\d{2}:\d{2}:\d{2}.\d{3}[+-]\d{4}/g;
 
       res.message.forEach((item) => {
         const logBlockElementId = `try-${tryNumber}-${item[0]}`;
@@ -120,7 +121,8 @@ function autoTailingLog(tryNumber, metadata = null, autoTailing = false) {
         const escapedMessage = escapeHtml(item[1]);
         const linkifiedMessage = escapedMessage
           .replace(urlRegex, (url) => `<a href="${url}" target="_blank">${url}</a>`)
-          .replaceAll(dateRegex, (date) => `<time datetime="${date}+00:00" data-with-tz="true">${formatDateTime(`${date}+00:00`)}</time>`);
+          .replaceAll(dateRegex, (date) => `<time datetime="${date}+00:00" data-with-tz="true">${formatDateTime(`${date}+00:00`)}</time>`)
+          .replaceAll(iso8601Regex, (date) => `<time datetime="${date}" data-with-tz="true">${formatDateTime(`${date}`)}</time>`);
         logBlock.innerHTML += `${linkifiedMessage}\n`;
       });
 
diff --git a/newsfragments/24811.significant.rst b/newsfragments/24811.significant.rst
new file mode 100644
index 0000000000..cb7208843c
--- /dev/null
+++ b/newsfragments/24811.significant.rst
@@ -0,0 +1,22 @@
+Added new config ``[logging]log_formatter_class`` to fix timezone display for logs on UI
+
+If you are using a custom Formatter subclass in your ``[logging]logging_config_class``, please inherit from ``airflow.utils.log.timezone_aware.TimezoneAware`` instead of ``logging.Formatter``.
+For example, in your ``custom_config.py``:
+
+.. code-block:: python
+   from airflow.utils.log.timezone_aware import TimezoneAware
+
+   # before
+   class YourCustomFormatter(logging.Formatter):
+       ...
+
+
+   # after
+   class YourCustomFormatter(TimezoneAware):
+       ...
+
+
+   AIRFLOW_FORMATTER = LOGGING_CONFIG["formatters"]["airflow"]
+   AIRFLOW_FORMATTER["class"] = "somewhere.your.custom_config.YourCustomFormatter"
+   # or use TimezoneAware class directly. If you don't have custom Formatter.
+   AIRFLOW_FORMATTER["class"] = "airflow.utils.log.timezone_aware.TimezoneAware"
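
For a quick local check of what the new formatter emits (this assumes an
Airflow version that ships airflow.utils.log.timezone_aware, i.e. 2.3.4+):

    import logging

    from airflow.utils.log.timezone_aware import TimezoneAware

    handler = logging.StreamHandler()
    handler.setFormatter(TimezoneAware("[%(asctime)s] %(levelname)s - %(message)s"))

    logger = logging.getLogger("tz_demo")
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

    # Emits something like: [2022-08-15T18:44:36.123+0000] INFO - hello
    # i.e. ISO 8601 with a UTC offset, which the new ti_log.js regex can pick up.
    logger.info("hello")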


[airflow] 10/45: Fix zombie task handling with multiple schedulers (#24906)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9b6df301ac9e772bdf257a6e3b6ead25c703ff2f
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Fri Jul 8 10:49:12 2022 -0600

    Fix zombie task handling with multiple schedulers (#24906)
    
    Each scheduler was looking at all running tasks for zombies, leading to
    multiple schedulers handling the zombies. This causes problems with
    retries (e.g. being marked as FAILED instead of UP_FOR_RETRY) and
    callbacks (e.g. `on_failure_callback` being called multiple times).
    
    When the second scheduler tries to determine if the task is able to be retried,
    and it's already in UP_FOR_RETRY (the first scheduler already finished),
    it sees the "next" try_number (as it's no longer running),
    which then leads it to be FAILED instead.
    
    The easy fix is to simply restrict each scheduler to its own TIs, as
    orphaned running TIs will be adopted anyway.
    
    (cherry picked from commit 1c0d0a5d907ae447b7221200952b47b69f8f8e87)
---
 airflow/jobs/scheduler_job.py    |  4 +++-
 tests/jobs/test_scheduler_job.py | 35 +++++++++++++++++++++++------------
 2 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 8e22065618..539e6f1eff 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1336,7 +1336,8 @@ class SchedulerJob(BaseJob):
     def _find_zombies(self, session):
         """
         Find zombie task instances, which are tasks haven't heartbeated for too long
-        and update the current zombie list.
+        or have a no-longer-running LocalTaskJob, and send them off to the DAG processor
+        to be handled.
         """
         self.log.debug("Finding 'running' jobs without a recent heartbeat")
         limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
@@ -1352,6 +1353,7 @@ class SchedulerJob(BaseJob):
                     LocalTaskJob.latest_heartbeat < limit_dttm,
                 )
             )
+            .filter(TaskInstance.queued_by_job_id == self.id)
             .all()
         )
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 6027867ab9..dfd66770c0 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3890,7 +3890,6 @@ class TestSchedulerJob:
             session.query(LocalTaskJob).delete()
             dag = dagbag.get_dag('example_branch_operator')
             dag.sync_to_db()
-            task = dag.get_task(task_id='run_this_first')
 
             dag_run = dag.create_dagrun(
                 state=DagRunState.RUNNING,
@@ -3899,21 +3898,33 @@ class TestSchedulerJob:
                 session=session,
             )
 
-            ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
-            local_job = LocalTaskJob(ti)
-            local_job.state = State.SHUTDOWN
-
-            session.add(local_job)
-            session.flush()
-
-            ti.job_id = local_job.id
-            session.add(ti)
-            session.flush()
-
             self.scheduler_job = SchedulerJob(subdir=os.devnull)
             self.scheduler_job.executor = MockExecutor()
             self.scheduler_job.processor_agent = mock.MagicMock()
 
+            # We will provision 2 tasks so we can check we only find zombies from this scheduler
+            tasks_to_setup = ['branching', 'run_this_first']
+
+            for task_id in tasks_to_setup:
+                task = dag.get_task(task_id=task_id)
+                ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
+                ti.queued_by_job_id = 999
+
+                local_job = LocalTaskJob(ti)
+                local_job.state = State.SHUTDOWN
+
+                session.add(local_job)
+                session.flush()
+
+                ti.job_id = local_job.id
+                session.add(ti)
+                session.flush()
+
+            assert task.task_id == 'run_this_first'  # Make sure we have the task/ti we expect
+
+            ti.queued_by_job_id = self.scheduler_job.id
+            session.flush()
+
             self.scheduler_job._find_zombies(session=session)
 
             self.scheduler_job.executor.callback_sink.send.assert_called_once()
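
A minimal sketch of the restriction this adds, with plain objects standing in
for the ORM rows (not Airflow's real query): each scheduler only considers task
instances it queued itself, so two schedulers never both process the same zombie.

    from dataclasses import dataclass

    @dataclass
    class FakeTI:
        task_id: str
        queued_by_job_id: int
        seconds_since_heartbeat: float

    def find_zombies(tis, my_job_id, threshold_secs=300.0):
        # Only TIs that missed their heartbeat AND were queued by *this* scheduler.
        return [
            ti
            for ti in tis
            if ti.seconds_since_heartbeat > threshold_secs
            and ti.queued_by_job_id == my_job_id
        ]

    tis = [
        FakeTI("run_this_first", queued_by_job_id=1, seconds_since_heartbeat=900),
        FakeTI("branching", queued_by_job_id=2, seconds_since_heartbeat=900),
    ]
    print([ti.task_id for ti in find_zombies(tis, my_job_id=1)])  # ['run_this_first']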


[airflow] 14/45: Update set-up-database.rst (#24983)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 651b43513af3ad3162f40a7148222a2b3bd4c64e
Author: calfzhou <ca...@gmail.com>
AuthorDate: Tue Jul 12 14:44:06 2022 +0800

    Update set-up-database.rst (#24983)
    
    Add a notice about MySQL's `NO_ZERO_DATE` mode, which can cause database operation errors in some cases.
    
    (cherry picked from commit aa568720c28cf9320e5c1b54b49a38ed9549a91a)
---
 docs/apache-airflow/howto/set-up-database.rst | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/docs/apache-airflow/howto/set-up-database.rst b/docs/apache-airflow/howto/set-up-database.rst
index 15a3d10c43..77f1b3bdeb 100644
--- a/docs/apache-airflow/howto/set-up-database.rst
+++ b/docs/apache-airflow/howto/set-up-database.rst
@@ -307,6 +307,10 @@ and setup of the SqlAlchemy connection.
 
 In addition, you also should pay particular attention to MySQL's encoding. Although the ``utf8mb4`` character set is more and more popular for MySQL (actually, ``utf8mb4`` becomes default character set in MySQL8.0), using the ``utf8mb4`` encoding requires additional setting in Airflow 2+ (See more details in `#7570 <https://github.com/apache/airflow/pull/7570>`__.). If you use ``utf8mb4`` as character set, you should also set ``sql_engine_collation_for_ids=utf8mb3_bin``.
 
+.. note::
+
+   In strict mode, MySQL doesn't allow ``0000-00-00`` as a valid date. Then you might get errors like ``"Invalid default value for 'end_date'"`` in some cases (some Airflow tables use ``0000-00-00 00:00:00`` as timestamp field default value). To avoid this error, you could disable ``NO_ZERO_DATE`` mode on you MySQL server. Read https://stackoverflow.com/questions/9192027/invalid-default-value-for-create-date-timestamp-field for how to disable it. See `SQL Mode - NO_ZERO_DATE <https://dev [...]
+
 Setting up a MsSQL Database
 ---------------------------
 


[airflow] 31/45: Fix the errors raised when None is passed to template filters (#25593)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 23c714b0c200d30f3184f8dce5eb116eb955893f
Author: Kian Eliasi <ki...@gmail.com>
AuthorDate: Tue Aug 9 15:04:40 2022 +0430

    Fix the errors raised when None is passed to template filters (#25593)
    
    (cherry picked from commit 741c20770230c83a95f74fe7ad7cc9f95329f2cc)
---
 airflow/templates.py | 24 +++++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/airflow/templates.py b/airflow/templates.py
index 6ec010f618..73c8635836 100644
--- a/airflow/templates.py
+++ b/airflow/templates.py
@@ -16,6 +16,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from __future__ import annotations
+
+import datetime
+
 import jinja2.nativetypes
 import jinja2.sandbox
 
@@ -44,23 +48,33 @@ class SandboxedEnvironment(_AirflowEnvironmentMixin, jinja2.sandbox.SandboxedEnv
     """SandboxedEnvironment for Airflow task templates."""
 
 
-def ds_filter(value):
+def ds_filter(value: datetime.date | datetime.time | None) -> str | None:
+    if value is None:
+        return None
     return value.strftime('%Y-%m-%d')
 
 
-def ds_nodash_filter(value):
+def ds_nodash_filter(value: datetime.date | datetime.time | None) -> str | None:
+    if value is None:
+        return None
     return value.strftime('%Y%m%d')
 
 
-def ts_filter(value):
+def ts_filter(value: datetime.date | datetime.time | None) -> str | None:
+    if value is None:
+        return None
     return value.isoformat()
 
 
-def ts_nodash_filter(value):
+def ts_nodash_filter(value: datetime.date | datetime.time | None) -> str | None:
+    if value is None:
+        return None
     return value.strftime('%Y%m%dT%H%M%S')
 
 
-def ts_nodash_with_tz_filter(value):
+def ts_nodash_with_tz_filter(value: datetime.date | datetime.time | None) -> str | None:
+    if value is None:
+        return None
     return value.isoformat().replace('-', '').replace(':', '')
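
The filters are plain module-level functions, so the change is easy to try in
isolation (assuming a version of airflow.templates that includes this fix);
previously ds_filter(None) raised AttributeError because strftime was called on
None:

    import datetime

    from airflow.templates import ds_filter, ts_nodash_filter

    print(ds_filter(datetime.date(2022, 8, 15)))                     # 2022-08-15
    print(ds_filter(None))                                           # None (no AttributeError)
    print(ts_nodash_filter(datetime.datetime(2022, 8, 15, 18, 44)))  # 20220815T184400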
 
 


[airflow] 26/45: Add __repr__ to ParamsDict class (#25305)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit de68fd1a816598204bfb6ab8274dc0fa5cf593f2
Author: Sumit Maheshwari <ms...@users.noreply.github.com>
AuthorDate: Wed Jul 27 12:43:27 2022 +0530

    Add __repr__ to ParamsDict class (#25305)
    
    Fixes #25295
    
    (cherry picked from commit df388a3d5364b748993e61b522d0b68ff8b8124a)
---
 airflow/models/param.py    | 3 +++
 tests/models/test_param.py | 4 ++++
 2 files changed, 7 insertions(+)

diff --git a/airflow/models/param.py b/airflow/models/param.py
index fcbe7a0f93..1179dd9fd6 100644
--- a/airflow/models/param.py
+++ b/airflow/models/param.py
@@ -147,6 +147,9 @@ class ParamsDict(MutableMapping[str, Any]):
     def __iter__(self):
         return iter(self.__dict)
 
+    def __repr__(self):
+        return repr(self.dump())
+
     def __setitem__(self, key: str, value: Any) -> None:
         """
         Override for dictionary's ``setitem`` method. This method make sure that all values are of
diff --git a/tests/models/test_param.py b/tests/models/test_param.py
index 3529f0360c..bbd430d773 100644
--- a/tests/models/test_param.py
+++ b/tests/models/test_param.py
@@ -193,6 +193,10 @@ class TestParamsDict:
         with pytest.raises(ParamValidationError, match=r'Invalid input for param key: 1 is not'):
             pd.update({'key': 1})
 
+    def test_repr(self):
+        pd = ParamsDict({'key': Param('value', type='string')})
+        assert repr(pd) == "{'key': 'value'}"
+
 
 class TestDagParamRuntime:
     VALUE = 42
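
A small usage example of the new __repr__ (Param and ParamsDict imported from
airflow.models.param, as in the test above): the dict of resolved values is
shown instead of the default object representation.

    from airflow.models.param import Param, ParamsDict

    pd = ParamsDict({"env": Param("prod", type="string"), "retries": Param(3, type="integer")})
    print(repr(pd))  # {'env': 'prod', 'retries': 3}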


[airflow] 15/45: Sort operator extra links (#24992)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f2a6915393860566cbd7c413b74bf06ef6d7c071
Author: HTErik <89...@users.noreply.github.com>
AuthorDate: Tue Jul 12 12:38:54 2022 +0200

    Sort operator extra links (#24992)
    
    Today, every time the webserver is restarted, the order of the
    operator extra links is randomized, because Python sets are
    unordered.
    
    This change sorts the links by name. No particular thought has
    been given to customizing this sort order, beyond making it
    consistent so that the extra links always appear in the same
    place for users.
    
    (cherry picked from commit f5cd2c396bcc9ef2775503b4f86aa9bb7d6c8d93)
---
 airflow/api_connexion/endpoints/extra_link_endpoint.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/api_connexion/endpoints/extra_link_endpoint.py b/airflow/api_connexion/endpoints/extra_link_endpoint.py
index 94b36928bf..c464979719 100644
--- a/airflow/api_connexion/endpoints/extra_link_endpoint.py
+++ b/airflow/api_connexion/endpoints/extra_link_endpoint.py
@@ -73,6 +73,6 @@ def get_extra_links(
         (link_name, task.get_extra_links(ti, link_name)) for link_name in task.extra_links
     )
     all_extra_links = {
-        link_name: link_url if link_url else None for link_name, link_url in all_extra_link_pairs
+        link_name: link_url if link_url else None for link_name, link_url in sorted(all_extra_link_pairs)
     }
     return all_extra_links
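
The fix relies on dicts preserving insertion order, so building the response
mapping from sorted pairs makes the link order deterministic. A tiny
illustration (the link names and URLs below are made up):

    extra_links = {"Zendesk", "Airbyte", "Qubole"}  # sets have no stable iteration order

    pairs = ((name, f"https://example.com/{name.lower()}") for name in extra_links)
    all_extra_links = {name: url for name, url in sorted(pairs)}

    print(list(all_extra_links))  # ['Airbyte', 'Qubole', 'Zendesk'] - same on every restart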


[airflow] 37/45: Removed interfering force of index. (#25404)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ef5b4c96233a2970fe0bd73de29cfca820ccf0e4
Author: Denis Boulas <de...@gmail.com>
AuthorDate: Tue Aug 2 14:14:31 2022 +0300

    Removed interfering force of index. (#25404)
    
    (cherry picked from commit 6778503784de403a7e62b25d9fffd1d6715f2c88)
---
 airflow/dag_processing/processor.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index 469b55cfeb..b04a0ef985 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -375,7 +375,6 @@ class DagFileProcessor(LoggingMixin):
         qry = (
             session.query(TI.task_id, func.max(DR.execution_date).label('max_ti'))
             .join(TI.dag_run)
-            .with_hint(TI, 'USE INDEX (PRIMARY)', dialect_name='mysql')
             .filter(TI.dag_id == dag.dag_id)
             .filter(or_(TI.state == State.SUCCESS, TI.state == State.SKIPPED))
             .filter(TI.task_id.in_(dag.task_ids))