Posted to commits@airflow.apache.org by ep...@apache.org on 2022/03/16 09:59:47 UTC

[airflow] branch v2-2-test updated (ee9049c -> 94a75ae)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a change to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from ee9049c  fixup! Add changelog for 2.2.4rc1
     new 925082e  DB upgrade is required when updating Airflow (#22061)
     new 30fad7f  Fix incorrect data provided to tries & landing times charts (#21928)
     new f81bc4c  Fix assignment of unassigned triggers (#21770)
     new a5dbef9  Fix the triggerer capacity test (#21760)
     new 5f4eaac  Fix triggerer --capacity parameter (#21753)
     new cf5a3a9  Correct a couple grammatical errors in docs (#21750)
     new 7c760cf  Fix graph autorefresh on page load (#21736)
     new 9f0c814  Fix filesystem sensor for directories (#21729)
     new 17b0335  Fix stray order_by(TaskInstance.execution_date) (#21705)
     new 8b5cfb7  Correctly handle multiple '=' in LocalFileSystem secrets. (#21694)
     new cdecc7a  Log exception in local executor (#21667)
     new 445ae53  Fix postgres part of pipeline example of tutorial (#21586)
     new 9f177e0  Disable default_pool delete on web ui (#21658)
     new e6fb0e9  Fix logging JDBC SQL error when task fails (#21540)
     new b794415  Filter out default configs when overrides exist. (#21539)
     new a80213c  Fix Resources __eq__ check (#21442)
     new 68e6e8f  Fix max_active_runs=1 not scheduling runs when min_file_process_interval is high (#21413)
     new cb37eb4  Extend documentation for states of DAGs & tasks and update trigger rules docs (#21382)
     new 94a75ae  Fix race condition between triggerer and scheduler (#21316)

The 19 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/api/common/experimental/pool.py            |   2 +-
 airflow/cli/cli_parser.py                          |   2 +-
 airflow/configuration.py                           |  58 ++++++++++++++++++
 airflow/executors/base_executor.py                 |  36 +++++++++--
 airflow/executors/local_executor.py                |   4 +-
 airflow/models/baseoperator.py                     |  14 ++---
 airflow/models/dagrun.py                           |   2 +
 airflow/models/pool.py                             |  19 +++++-
 airflow/models/trigger.py                          |  10 ++-
 airflow/secrets/local_filesystem.py                |   7 +--
 airflow/sensors/filesystem.py                      |   2 +-
 airflow/utils/log/secrets_masker.py                |   7 ++-
 airflow/utils/operator_resources.py                |   4 ++
 airflow/www/static/js/graph.js                     |  19 +++---
 airflow/www/views.py                               |  17 +++++-
 docs/apache-airflow/best-practices.rst             |  67 +++++++++++++++++++++
 docs/apache-airflow/concepts/dags.rst              |   5 +-
 docs/apache-airflow/concepts/tasks.rst             |   6 ++
 docs/apache-airflow/dag-run.rst                    |  27 ++++++++-
 docs/apache-airflow/howto/set-config.rst           |   4 ++
 docs/apache-airflow/img/watcher.png                | Bin 0 -> 41592 bytes
 docs/apache-airflow/installation/upgrading.rst     |   6 +-
 docs/apache-airflow/plugins.rst                    |   2 +-
 docs/apache-airflow/tutorial.rst                   |  67 +++++++++++++--------
 docs/spelling_wordlist.txt                         |   1 +
 tests/cli/commands/test_triggerer_command.py       |   2 +-
 tests/core/test_configuration.py                   |  46 ++++++++++++++
 tests/executors/test_base_executor.py              |  60 ++++++++++++++++--
 tests/jobs/test_scheduler_job.py                   |  35 +++++++++++
 tests/models/test_pool.py                          |   6 ++
 tests/models/test_trigger.py                       |  52 ++++++++++++++++
 tests/secrets/test_local_filesystem.py             |  17 ++++++
 tests/sensors/test_filesystem.py                   |  36 ++++++++++-
 .../test_operator_resources.py}                    |  24 ++++----
 34 files changed, 578 insertions(+), 88 deletions(-)
 create mode 100644 docs/apache-airflow/img/watcher.png
 copy tests/{providers/amazon/aws/hooks/test_sqs.py => utils/test_operator_resources.py} (67%)

[airflow] 07/19: Fix graph autorefresh on page load (#21736)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7c760cfe5f8f21c7ae47f15faad61aa3fb3c1b37
Author: Brent Bovenzi <br...@gmail.com>
AuthorDate: Tue Feb 22 11:41:39 2022 -0500

    Fix graph autorefresh on page load (#21736)
    
    * fix auto refresh check on page load
    
    * minor code cleanup
    
    * remove new line
    
    (cherry picked from commit b2c0a921c155e82d1140029e6495594061945025)
---
 airflow/www/static/js/graph.js | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/airflow/www/static/js/graph.js b/airflow/www/static/js/graph.js
index 2d3b22e..615e238 100644
--- a/airflow/www/static/js/graph.js
+++ b/airflow/www/static/js/graph.js
@@ -57,6 +57,13 @@ const stateFocusMap = {
   deferred: false,
   no_status: false,
 };
+
+const checkRunState = () => {
+  const states = Object.values(taskInstances).map((ti) => ti.state);
+  return !states.some((state) => (
+    ['success', 'failed', 'upstream_failed', 'skipped', 'removed'].indexOf(state) === -1));
+};
+
 const taskTip = d3.tip()
   .attr('class', 'tooltip d3-tip')
   .html((toolTipHtml) => toolTipHtml);
@@ -362,13 +369,11 @@ function handleRefresh() {
         if (prevTis !== tis) {
         // eslint-disable-next-line no-global-assign
           taskInstances = JSON.parse(tis);
-          const states = Object.values(taskInstances).map((ti) => ti.state);
           updateNodesStates(taskInstances);
 
           // end refresh if all states are final
-          if (!states.some((state) => (
-            ['success', 'failed', 'upstream_failed', 'skipped', 'removed'].indexOf(state) === -1))
-          ) {
+          const isFinal = checkRunState();
+          if (isFinal) {
             $('#auto_refresh').prop('checked', false);
             clearInterval(refreshInterval);
           }
@@ -410,9 +415,9 @@ $('#auto_refresh').change(() => {
 });
 
 function initRefresh() {
-  if (localStorage.getItem('disableAutoRefresh')) {
-    $('#auto_refresh').prop('checked', false);
-  }
+  const isDisabled = localStorage.getItem('disableAutoRefresh');
+  const isFinal = checkRunState();
+  $('#auto_refresh').prop('checked', !(isDisabled || isFinal));
   startOrStopRefresh();
   d3.select('#refresh_button').on('click', () => handleRefresh());
 }
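
For readers who prefer Python, the decision that the patched `initRefresh()` makes can be re-expressed roughly as below. This is only an illustrative sketch: the function names `check_run_state` and `auto_refresh_checked` and the dictionary shape are assumptions that mirror the JavaScript in the diff, not code from the Airflow repository.

```python
# Final states copied from the list used in the diff above.
FINAL_STATES = {"success", "failed", "upstream_failed", "skipped", "removed"}


def check_run_state(task_instances):
    """Return True when every task instance is in a final state."""
    return all(ti["state"] in FINAL_STATES for ti in task_instances.values())


def auto_refresh_checked(task_instances, disabled_flag):
    """Mirror of `!(isDisabled || isFinal)`: refresh only if enabled and not finished."""
    return not (bool(disabled_flag) or check_run_state(task_instances))


finished = {"a": {"state": "success"}, "b": {"state": "failed"}}
in_progress = {"a": {"state": "success"}, "b": {"state": "running"}}
```

With these inputs, a finished run leaves the checkbox unchecked on page load, while an in-progress run keeps it checked unless the stored disable flag is set.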

[airflow] 10/19: Correctly handle multiple '=' in LocalFileSystem secrets. (#21694)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8b5cfb763a77fc61c3c85a01aac21f620644d509
Author: Davy <ka...@gmail.com>
AuthorDate: Thu Feb 24 15:40:57 2022 +0800

    Correctly handle multiple '=' in LocalFileSystem secrets. (#21694)
    
    (cherry picked from commit 919b75ba20083cc83c4e84e35aae8102af2b5871)
---
 airflow/secrets/local_filesystem.py    |  7 +++----
 tests/secrets/test_local_filesystem.py | 17 +++++++++++++++++
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/airflow/secrets/local_filesystem.py b/airflow/secrets/local_filesystem.py
index d23969f..227a77c 100644
--- a/airflow/secrets/local_filesystem.py
+++ b/airflow/secrets/local_filesystem.py
@@ -75,8 +75,8 @@ def _parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSynt
             # Ignore comments
             continue
 
-        var_parts: List[str] = line.split("=", 2)
-        if len(var_parts) != 2:
+        key, sep, value = line.partition("=")
+        if not sep:
             errors.append(
                 FileSyntaxError(
                     line_no=line_no,
@@ -85,8 +85,7 @@ def _parse_env_file(file_path: str) -> Tuple[Dict[str, List[str]], List[FileSynt
             )
             continue
 
-        key, value = var_parts
-        if not key:
+        if not value:
             errors.append(
                 FileSyntaxError(
                     line_no=line_no,
diff --git a/tests/secrets/test_local_filesystem.py b/tests/secrets/test_local_filesystem.py
index 85f0aaa..5993eb3 100644
--- a/tests/secrets/test_local_filesystem.py
+++ b/tests/secrets/test_local_filesystem.py
@@ -155,6 +155,23 @@ class TestLoadConnection(unittest.TestCase):
 
     @parameterized.expand(
         (
+            (
+                "CONN_ID=mysql://host_1?param1=val1&param2=val2",
+                {"CONN_ID": "mysql://host_1?param1=val1&param2=val2"},
+            ),
+        )
+    )
+    def test_parsing_with_params(self, content, expected_connection_uris):
+        with mock_local_file(content):
+            connections_by_conn_id = local_filesystem.load_connections_dict("a.env")
+            connection_uris_by_conn_id = {
+                conn_id: connection.get_uri() for conn_id, connection in connections_by_conn_id.items()
+            }
+
+            assert expected_connection_uris == connection_uris_by_conn_id
+
+    @parameterized.expand(
+        (
             ("AA", 'Invalid line format. The line should contain at least one equal sign ("=")'),
             ("=", "Invalid line format. Key is empty."),
         )
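
The difference between the old `split("=", 2)` check and the new `partition("=")` call can be seen with the connection URI from the added test. The snippet below is a standalone sketch using only built-in string methods; it does not call any Airflow code.

```python
line = "CONN_ID=mysql://host_1?param1=val1&param2=val2"

# Old approach: maxsplit=2 allows up to three parts, so a value that itself
# contains "=" produced three parts and failed the `len(var_parts) != 2` check.
old_parts = line.split("=", 2)

# New approach: partition splits on the first "=" only and always returns
# a 3-tuple of (head, separator, tail).
key, sep, value = line.partition("=")

# A line without any "=" yields an empty separator, which the patch treats
# as the "at least one equal sign" error case.
no_equals = "AA".partition("=")
```

Here `old_parts` has three elements, while `partition` keeps everything after the first `=` together as the value.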

[airflow] 03/19: Fix assignment of unassigned triggers (#21770)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f81bc4c44b46b9984f47b10076681ac757e20cd8
Author: jkramer-ginkgo <68...@users.noreply.github.com>
AuthorDate: Sat Feb 26 14:25:15 2022 -0500

    Fix assignment of unassigned triggers (#21770)
    
    Previously, the query returned no alive triggerers which resulted
    in all triggers to be assigned to the current triggerer. This works
    fine, despite the logic bug, in the case where there's a single
    triggerer. But with multiple triggerers, concurrent iterations of
    the TriggerJob loop would bounce trigger ownership to whichever
    loop ran last.
    
    Addresses https://github.com/apache/airflow/issues/21616
    
    (cherry picked from commit b26d4d8a290ce0104992ba28850113490c1ca445)
---
 airflow/models/trigger.py    | 10 ++++++---
 tests/models/test_trigger.py | 52 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 59 insertions(+), 3 deletions(-)

diff --git a/airflow/models/trigger.py b/airflow/models/trigger.py
index 5749589..aa0d2b1 100644
--- a/airflow/models/trigger.py
+++ b/airflow/models/trigger.py
@@ -17,7 +17,7 @@
 import datetime
 from typing import Any, Dict, List, Optional
 
-from sqlalchemy import Column, Integer, String, func
+from sqlalchemy import Column, Integer, String, func, or_
 
 from airflow.models.base import Base
 from airflow.models.taskinstance import TaskInstance
@@ -175,7 +175,7 @@ class Trigger(Base):
         alive_triggerer_ids = [
             row[0]
             for row in session.query(BaseJob.id).filter(
-                BaseJob.end_date is None,
+                BaseJob.end_date.is_(None),
                 BaseJob.latest_heartbeat > timezone.utcnow() - datetime.timedelta(seconds=30),
                 BaseJob.job_type == "TriggererJob",
             )
@@ -184,7 +184,11 @@ class Trigger(Base):
         # Find triggers who do NOT have an alive triggerer_id, and then assign
         # up to `capacity` of those to us.
         trigger_ids_query = (
-            session.query(cls.id).filter(cls.triggerer_id.notin_(alive_triggerer_ids)).limit(capacity).all()
+            session.query(cls.id)
+            # notin_ doesn't find NULL rows
+            .filter(or_(cls.triggerer_id.is_(None), cls.triggerer_id.notin_(alive_triggerer_ids)))
+            .limit(capacity)
+            .all()
         )
         session.query(cls).filter(cls.id.in_([i.id for i in trigger_ids_query])).update(
             {cls.triggerer_id: triggerer_id},
diff --git a/tests/models/test_trigger.py b/tests/models/test_trigger.py
index aacfa88..99cd71f 100644
--- a/tests/models/test_trigger.py
+++ b/tests/models/test_trigger.py
@@ -15,8 +15,11 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import datetime
+
 import pytest
 
+from airflow.jobs.triggerer_job import TriggererJob
 from airflow.models import TaskInstance, Trigger
 from airflow.operators.dummy import DummyOperator
 from airflow.triggers.base import TriggerEvent
@@ -36,9 +39,11 @@ def session():
 def clear_db(session):
     session.query(TaskInstance).delete()
     session.query(Trigger).delete()
+    session.query(TriggererJob).delete()
     yield session
     session.query(TaskInstance).delete()
     session.query(Trigger).delete()
+    session.query(TriggererJob).delete()
     session.commit()
 
 
@@ -124,3 +129,50 @@ def test_submit_failure(session, create_task_instance):
     updated_task_instance = session.query(TaskInstance).one()
     assert updated_task_instance.state == State.SCHEDULED
     assert updated_task_instance.next_method == "__fail__"
+
+
+def test_assign_unassigned(session, create_task_instance):
+    """
+    Tests that unassigned triggers of all appropriate states are assigned.
+    """
+    finished_triggerer = TriggererJob(None, heartrate=10, state=State.SUCCESS)
+    finished_triggerer.end_date = timezone.utcnow() - datetime.timedelta(hours=1)
+    session.add(finished_triggerer)
+    assert not finished_triggerer.is_alive()
+    healthy_triggerer = TriggererJob(None, heartrate=10, state=State.RUNNING)
+    session.add(healthy_triggerer)
+    assert healthy_triggerer.is_alive()
+    new_triggerer = TriggererJob(None, heartrate=10, state=State.RUNNING)
+    session.add(new_triggerer)
+    assert new_triggerer.is_alive()
+    session.commit()
+    trigger_on_healthy_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+    trigger_on_healthy_triggerer.id = 1
+    trigger_on_healthy_triggerer.triggerer_id = healthy_triggerer.id
+    trigger_on_killed_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+    trigger_on_killed_triggerer.id = 2
+    trigger_on_killed_triggerer.triggerer_id = finished_triggerer.id
+    trigger_unassigned_to_triggerer = Trigger(classpath="airflow.triggers.testing.SuccessTrigger", kwargs={})
+    trigger_unassigned_to_triggerer.id = 3
+    assert trigger_unassigned_to_triggerer.triggerer_id is None
+    session.add(trigger_on_healthy_triggerer)
+    session.add(trigger_on_killed_triggerer)
+    session.add(trigger_unassigned_to_triggerer)
+    session.commit()
+    assert session.query(Trigger).count() == 3
+    Trigger.assign_unassigned(new_triggerer.id, 100, session=session)
+    session.expire_all()
+    # Check that trigger on killed triggerer and unassigned trigger are assigned to new triggerer
+    assert (
+        session.query(Trigger).filter(Trigger.id == trigger_on_killed_triggerer.id).one().triggerer_id
+        == new_triggerer.id
+    )
+    assert (
+        session.query(Trigger).filter(Trigger.id == trigger_unassigned_to_triggerer.id).one().triggerer_id
+        == new_triggerer.id
+    )
+    # Check that trigger on healthy triggerer still assigned to existing triggerer
+    assert (
+        session.query(Trigger).filter(Trigger.id == trigger_on_healthy_triggerer.id).one().triggerer_id
+        == healthy_triggerer.id
+    )
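
Two pitfalls are addressed by this patch: a Python identity check (`is None`) on a column object is evaluated by Python itself rather than being turned into SQL, and SQL `NOT IN` never matches rows whose value is NULL. The sketch below illustrates both with the standard-library `sqlite3` module. The table name `trigger_demo`, the `FakeColumn` class, and the sample rows are made up for illustration and are not part of Airflow.

```python
import sqlite3


class FakeColumn:
    """Hypothetical stand-in for an ORM column object."""


# Python evaluates `is None` immediately; for any real object it is False,
# so a filter built from it cannot express "end_date IS NULL".
identity_check = FakeColumn() is None

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trigger_demo (id INTEGER PRIMARY KEY, triggerer_id INTEGER)")
conn.executemany(
    "INSERT INTO trigger_demo VALUES (?, ?)",
    [(1, 10), (2, 20), (3, None)],  # trigger 3 has no triggerer assigned
)

alive = (10,)  # pretend triggerer 10 is the only alive one

# NOT IN alone: the NULL row compares as unknown and is filtered out.
not_in_only = [
    row[0]
    for row in conn.execute(
        "SELECT id FROM trigger_demo WHERE triggerer_id NOT IN (?) ORDER BY id", alive
    )
]

# Adding an explicit IS NULL branch, as the patch does with or_(...).
with_null_branch = [
    row[0]
    for row in conn.execute(
        "SELECT id FROM trigger_demo "
        "WHERE triggerer_id IS NULL OR triggerer_id NOT IN (?) ORDER BY id",
        alive,
    )
]
conn.close()
```

In this toy table, only the second query picks up the unassigned trigger (id 3) alongside the one owned by a dead triggerer (id 2).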

[airflow] 17/19: Fix max_active_runs=1 not scheduling runs when min_file_process_interval is high (#21413)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 68e6e8f60764f9956b2e37e17e8a5dc4923fd0f2
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Feb 24 08:12:12 2022 +0100

    Fix max_active_runs=1 not scheduling runs when min_file_process_interval is high (#21413)
    
    The finished dagrun was still being seen as running when we call dag.get_num_active_runs
    because the session was not flushed. This PR fixes it
    
    (cherry picked from commit feea143af9b1db3b1f8cd8d29677f0b2b2ab757a)
---
 airflow/models/dagrun.py         |  2 ++
 tests/jobs/test_scheduler_job.py | 35 +++++++++++++++++++++++++++++++++++
 2 files changed, 37 insertions(+)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 20ec7cd..c42604d 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -604,11 +604,13 @@ class DagRun(Base, LoggingMixin):
                 self.data_interval_end,
                 self.dag_hash,
             )
+            session.flush()
 
         self._emit_true_scheduling_delay_stats_for_finished_state(finished_tasks)
         self._emit_duration_stats_for_finished_state()
 
         session.merge(self)
+        # We do not flush here for performance reasons(It increases queries count by +20)
 
         return schedulable_tis, callback
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 7185720..168d452 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1180,6 +1180,41 @@ class TestSchedulerJob:
         assert session.query(DagRun.state).filter(DagRun.state == State.QUEUED).count() == 0
         assert orm_dag.next_dagrun_create_after is None
 
+    def test_runs_are_created_after_max_active_runs_was_reached(self, dag_maker, session):
+        """
+        Test that when creating runs once max_active_runs is reached the runs does not stick
+        """
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor(do_update=True)
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        with dag_maker(max_active_runs=1, session=session) as dag:
+            # Need to use something that doesn't immediately get marked as success by the scheduler
+            BashOperator(task_id='task', bash_command='true')
+
+        dag_run = dag_maker.create_dagrun(
+            state=State.RUNNING,
+            session=session,
+        )
+
+        # Reach max_active_runs
+        for _ in range(3):
+            self.scheduler_job._do_scheduling(session)
+
+        # Complete dagrun
+        # Add dag_run back in to the session (_do_scheduling does an expunge_all)
+        dag_run = session.merge(dag_run)
+        session.refresh(dag_run)
+        dag_run.get_task_instance(task_id='task', session=session).state = State.SUCCESS
+
+        # create new run
+        for _ in range(3):
+            self.scheduler_job._do_scheduling(session)
+
+        # Assert that new runs has created
+        dag_runs = DagRun.find(dag_id=dag.dag_id, session=session)
+        assert len(dag_runs) == 2
+
     def test_dagrun_timeout_verify_max_active_runs(self, dag_maker):
         """
         Test if a a dagrun will not be scheduled if max_dag_runs

[airflow] 14/19: Fix logging JDBC SQL error when task fails (#21540)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit e6fb0e9bb7f5f0767fcc6fc8f080f8ca97b06836
Author: hubert-pietron <94...@users.noreply.github.com>
AuthorDate: Sun Feb 27 14:07:14 2022 +0100

    Fix logging JDBC SQL error when task fails (#21540)
    
    (cherry picked from commit bc1b422e1ce3a5b170618a7a6589f8ae2fc33ad6)
---
 airflow/utils/log/secrets_masker.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/airflow/utils/log/secrets_masker.py b/airflow/utils/log/secrets_masker.py
index c263cbb..6cdfd0e 100644
--- a/airflow/utils/log/secrets_masker.py
+++ b/airflow/utils/log/secrets_masker.py
@@ -145,7 +145,12 @@ class SecretsMasker(logging.Filter):
         return frozenset(record.__dict__).difference({'msg', 'args'})
 
     def _redact_exception_with_context(self, exception):
-        exception.args = (self.redact(v) for v in exception.args)
+        # Exception class may not be modifiable (e.g. declared by an
+        # extension module such as JDBC).
+        try:
+            exception.args = (self.redact(v) for v in exception.args)
+        except AttributeError:
+            pass
         if exception.__context__:
             self._redact_exception_with_context(exception.__context__)
         if exception.__cause__ and exception.__cause__ is not exception.__context__:
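
The guarded assignment can be exercised without a JDBC driver. In the sketch below, `ReadOnlyArgsError` is a hypothetical stand-in for an exception class whose `args` cannot be reassigned, and `redact` is a toy replacement for the masker's real redaction logic; both are assumptions made for illustration only.

```python
class ReadOnlyArgsError(Exception):
    """Hypothetical exception whose `args` attribute has no setter."""

    @property
    def args(self):
        return ("password=hunter2",)


def redact(value):
    # Toy redaction: the real SecretsMasker is considerably more involved.
    return str(value).replace("hunter2", "***")


def redact_exception(exc):
    """Redact exc.args when possible; leave the exception untouched otherwise."""
    try:
        exc.args = tuple(redact(v) for v in exc.args)
    except AttributeError:
        # Assigning to a read-only attribute raises AttributeError; skip it.
        pass
    return exc


ordinary = redact_exception(ValueError("password=hunter2"))
read_only = redact_exception(ReadOnlyArgsError())
```

The ordinary exception ends up with redacted arguments, while the read-only one passes through unchanged instead of raising a second error while the first is being logged.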

[airflow] 11/19: Log exception in local executor (#21667)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit cdecc7a59360abd23f539cdd7c6e05ffa755bf41
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Mon Feb 21 05:05:01 2022 +0800

    Log exception in local executor (#21667)
    
    (cherry picked from commit a0fb0bbad312df06dd0a85453bd4f93ee2e01cbb)
---
 airflow/executors/local_executor.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 775b6ca..fc662fc 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -125,12 +125,12 @@ class LocalWorkerBase(Process, LoggingMixin):
             ret = 0
             return State.SUCCESS
         except Exception as e:
-            self.log.error("Failed to execute task %s.", str(e))
+            self.log.exception("Failed to execute task %s.", e)
+            return State.FAILED
         finally:
             Sentry.flush()
             logging.shutdown()
             os._exit(ret)
-            raise RuntimeError('unreachable -- keep mypy happy')
 
     @abstractmethod
     def do_work(self):
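
The practical effect of switching from `log.error` to `log.exception` is that the traceback is written alongside the message. The following sketch shows this with the standard `logging` module; the helper `run_and_capture` and the logger names are illustrative assumptions, not part of the executor.

```python
import io
import logging


def run_and_capture(use_exception):
    """Log a failure with either .exception() or .error() and return the output."""
    stream = io.StringIO()
    logger = logging.getLogger(f"local_executor_demo.{use_exception}")
    logger.propagate = False  # keep the demo output out of the root logger
    handler = logging.StreamHandler(stream)
    logger.addHandler(handler)
    try:
        raise ValueError("boom")
    except Exception as e:
        if use_exception:
            # Inside an except block, .exception() appends the traceback.
            logger.exception("Failed to execute task %s.", e)
        else:
            logger.error("Failed to execute task %s.", str(e))
    finally:
        logger.removeHandler(handler)
    return stream.getvalue()


with_traceback = run_and_capture(True)
message_only = run_and_capture(False)
```

Both variants log the message text, but only the `.exception()` call includes the "Traceback (most recent call last)" section.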

[airflow] 05/19: Fix triggerer --capacity parameter (#21753)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5f4eaac51308de66bfa0592bb55df9be15ccef64
Author: Sumit Maheshwari <ms...@users.noreply.github.com>
AuthorDate: Wed Feb 23 15:50:13 2022 +0530

    Fix triggerer --capacity parameter (#21753)
    
    (cherry picked from commit 9076b67c05cdba23e8fa51ebe5ad7f7d53e1c2ba)
---
 airflow/cli/cli_parser.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 64cc582..e9939be 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -758,7 +758,7 @@ ARG_INCLUDE_DAGS = Arg(
 # triggerer
 ARG_CAPACITY = Arg(
     ("--capacity",),
-    type=str,
+    type=positive_int(allow_zero=False),
     help="The maximum number of triggers that a Triggerer will run at one time.",
 )
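
To show why the argument type matters, here is a minimal sketch of an argparse validator in the spirit of `positive_int(allow_zero=False)`. The implementation below is a hypothetical re-creation written for this example; Airflow's own helper may differ in details such as error messages.

```python
import argparse


def positive_int(*, allow_zero):
    """Return an argparse `type` callable (hypothetical re-implementation)."""

    def _check(value):
        try:
            number = int(value)
        except ValueError:
            raise argparse.ArgumentTypeError(f"invalid int value: {value!r}")
        if number > 0 or (allow_zero and number == 0):
            return number
        raise argparse.ArgumentTypeError(f"invalid positive int value: {value!r}")

    return _check


parser = argparse.ArgumentParser(prog="triggerer-demo")
parser.add_argument("--capacity", type=positive_int(allow_zero=False))

args = parser.parse_args(["--capacity", "5"])
```

With `type=str`, the parsed value would have been the string `"5"`; with a validator like this one, the parsed value is an integer and non-positive input is rejected at parse time.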
 

[airflow] 09/19: Fix stray order_by(TaskInstance.execution_date) (#21705)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 17b03354d3f5b9bac6c4700557bf4d6e217add91
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Wed Feb 23 04:12:22 2022 +0800

    Fix stray order_by(TaskInstance.execution_date) (#21705)
    
    (cherry picked from commit bb577a98494369b22ae252ac8d23fb8e95508a1c)
---
 airflow/models/baseoperator.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 9e4cedd..c758aeb 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -1253,18 +1253,18 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin, metaclass=BaseOperatorMeta
         end_date: Optional[datetime] = None,
         session: Session = None,
     ) -> List[TaskInstance]:
-        """
-        Get a set of task instance related to this task for a specific date
-        range.
-        """
+        """Get task instances related to this task for a specific date range."""
+        from airflow.models import DagRun
+
         end_date = end_date or timezone.utcnow()
         return (
             session.query(TaskInstance)
+            .join(TaskInstance.dag_run)
             .filter(TaskInstance.dag_id == self.dag_id)
             .filter(TaskInstance.task_id == self.task_id)
-            .filter(TaskInstance.execution_date >= start_date)
-            .filter(TaskInstance.execution_date <= end_date)
-            .order_by(TaskInstance.execution_date)
+            .filter(DagRun.execution_date >= start_date)
+            .filter(DagRun.execution_date <= end_date)
+            .order_by(DagRun.execution_date)
             .all()
         )
 

[airflow] 06/19: Correct a couple grammatical errors in docs (#21750)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit cf5a3a94ae55337ddbf900f9735ae140b09fd6a6
Author: Zach McQuiston <37...@users.noreply.github.com>
AuthorDate: Wed Feb 23 13:48:06 2022 -0700

    Correct a couple grammatical errors in docs (#21750)
    
    Just reading through the docs as we implement Airflow on our end, saw a couple additions that could be made.
    
    (cherry picked from commit 3bb63d4cfbbb534298d32ec987f25a02c11fc4e6)
---
 docs/apache-airflow/plugins.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/apache-airflow/plugins.rst b/docs/apache-airflow/plugins.rst
index 3543aef..f8a7e00 100644
--- a/docs/apache-airflow/plugins.rst
+++ b/docs/apache-airflow/plugins.rst
@@ -27,7 +27,7 @@ features to its core by simply dropping files in your
 The python modules in the ``plugins`` folder get imported, and **macros** and web **views**
 get integrated to Airflow's main collections and become available for use.
 
-To troubleshoot issue with plugins, you can use ``airflow plugins`` command.
+To troubleshoot issues with plugins, you can use the ``airflow plugins`` command.
 This command dumps information about loaded plugins.
 
 .. versionchanged:: 2.0

[airflow] 02/19: Fix incorrect data provided to tries & landing times charts (#21928)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 30fad7f318126d6e2c1a0fe8f688dc197163df79
Author: Alexander Millin <a....@city-mobil.ru>
AuthorDate: Wed Mar 2 13:29:07 2022 +0300

    Fix incorrect data provided to tries & landing times charts (#21928)
    
    (cherry picked from commit 2c57ad4ff9ddde8102c62f2e25c2a2e82cceb3e7)
---
 airflow/www/views.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index 9ebe899..9fc61b5 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2727,6 +2727,8 @@ class Airflow(AirflowBaseView):
             y_points = []
             x_points = []
             for ti in tis:
+                if ti.task_id != task.task_id:
+                    continue
                 dttm = wwwutils.epoch(ti.execution_date)
                 x_points.append(dttm)
                 # y value should reflect completed tries to have a 0 baseline.
@@ -2802,6 +2804,8 @@ class Airflow(AirflowBaseView):
             y_points[task_id] = []
             x_points[task_id] = []
             for ti in tis:
+                if ti.task_id != task.task_id:
+                    continue
                 ts = dag.get_run_data_interval(ti.dag_run).end
                 if ti.end_date:
                     dttm = wwwutils.epoch(ti.execution_date)

[airflow] 18/19: Extend documentation for states of DAGs & tasks and update trigger rules docs (#21382)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit cb37eb48c5abdb45b034104a8b3efe0de9820553
Author: Mateusz Nojek <ma...@gmail.com>
AuthorDate: Mon Feb 21 17:27:36 2022 +0100

    Extend documentation for states of DAGs & tasks and update trigger rules docs (#21382)
    
    (cherry picked from commit 4e959358ac4ef9554ff5d82cdc85ab7dc142a639)
---
 docs/apache-airflow/best-practices.rst |  67 +++++++++++++++++++++++++++++++++
 docs/apache-airflow/concepts/dags.rst  |   5 ++-
 docs/apache-airflow/concepts/tasks.rst |   6 +++
 docs/apache-airflow/dag-run.rst        |  27 +++++++++++--
 docs/apache-airflow/img/watcher.png    | Bin 0 -> 41592 bytes
 docs/spelling_wordlist.txt             |   1 +
 6 files changed, 101 insertions(+), 5 deletions(-)

diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst
index 3b01b3e..15fcea1 100644
--- a/docs/apache-airflow/best-practices.rst
+++ b/docs/apache-airflow/best-practices.rst
@@ -326,6 +326,73 @@ each parameter by following the links):
 * :ref:`config:scheduler__parsing_processes`
 * :ref:`config:scheduler__file_parsing_sort_mode`
 
+Example of watcher pattern with trigger rules
+---------------------------------------------
+
+The watcher pattern is what we call a DAG with a task that is "watching" the states of the other tasks. Its primary purpose is to fail a DAG Run when any other task fails.
+The need came from the Airflow system tests, which are DAGs with different tasks (similar to a test containing steps).
+
+Normally, when a task fails, its downstream tasks are not executed and the whole DAG Run gets the failed status too. But when we use trigger rules, we can disrupt the normal flow of running tasks, and the whole DAG Run may end up with a different status than we expect.
+For example, we can have a teardown task (with trigger rule set to ``"all_done"``) that will be executed regardless of the state of the other tasks (e.g. to clean up the resources). In such a situation, the DAG would always run this task and the DAG Run will get the status of this particular task, so we can potentially lose the information about failing tasks.
+If we want to ensure that the DAG with a teardown task fails if any task fails, we need to use the watcher pattern.
+The watcher task is a task that will always fail if triggered, but it needs to be triggered only if any other task fails. It needs to have a trigger rule set to ``"one_failed"`` and it also needs to be a downstream task for all other tasks in the DAG.
+Thanks to this, if every other task passes, the watcher will be skipped, but when something fails, the watcher task will be executed and fail, making the DAG Run fail too.
+
+.. note::
+
+    Be aware that trigger rules only rely on the direct upstream (parent) tasks, e.g. ``one_failed`` will ignore any failed (or ``upstream_failed``) tasks that are not a direct parent of the parameterized task.
+
+It's easier to grasp the concept with an example. Let's say that we have the following DAG:
+
+.. code-block:: python
+
+    from datetime import datetime
+    from airflow import DAG
+    from airflow.decorators import task
+    from airflow.exceptions import AirflowException
+    from airflow.operators.bash import BashOperator
+    from airflow.operators.python import PythonOperator
+
+
+    @task(trigger_rule="one_failed", retries=0)
+    def watcher():
+        raise AirflowException("Failing task because one or more upstream tasks failed.")
+
+
+    with DAG(
+        dag_id="watcher_example",
+        schedule_interval="@once",
+        start_date=datetime(2021, 1, 1),
+        catchup=False,
+    ) as dag:
+        failing_task = BashOperator(
+            task_id="failing_task", bash_command="exit 1", retries=0
+        )
+        passing_task = BashOperator(
+            task_id="passing_task", bash_command="echo passing_task"
+        )
+        teardown = BashOperator(
+            task_id="teardown", bash_command="echo teardown", trigger_rule="all_done"
+        )
+
+        failing_task >> passing_task >> teardown
+        list(dag.tasks) >> watcher()
+
+The visual representation of this DAG after execution looks like this:
+
+.. image:: /img/watcher.png
+
+We have several tasks that serve different purposes:
+
+- ``failing_task`` always fails,
+- ``passing_task`` always succeeds (if executed),
+- ``teardown`` is always triggered (regardless of the states of the other tasks) and it should always succeed,
+- ``watcher`` is a downstream task of every other task, i.e. it will be triggered when any task fails and thus fail the whole DAG Run, since it's a leaf task.
+
+It's important to note that without the ``watcher`` task, the whole DAG Run will get the ``success`` state, since the only failing task is not the leaf task, and the ``teardown`` task will finish with ``success``.
+If we want the ``watcher`` to monitor the state of all tasks, we need to make it dependent on all of them separately. Thanks to this, we can fail the DAG Run if any of the tasks fail. Note that the watcher task has a trigger rule set to ``"one_failed"``.
+On the other hand, without the ``teardown`` task, the ``watcher`` task will not be needed, because ``failing_task`` will propagate its ``failed`` state to the downstream task ``passing_task`` and the whole DAG Run will also get the ``failed`` status.
+
 .. _best_practices/reducing_dag_complexity:
 
 Reducing DAG complexity
diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst
index 32d21ce..b83b025 100644
--- a/docs/apache-airflow/concepts/dags.rst
+++ b/docs/apache-airflow/concepts/dags.rst
@@ -369,7 +369,7 @@ Note that if you are running the DAG at the very start of its life---specificall
 Trigger Rules
 ~~~~~~~~~~~~~
 
-By default, Airflow will wait for all upstream tasks for a task to be :ref:`successful <concepts:task-states>` before it runs that task.
+By default, Airflow will wait for all upstream tasks (direct parents) of a task to be :ref:`successful <concepts:task-states>` before it runs that task.
 
 However, this is just the default behaviour, and you can control it using the ``trigger_rule`` argument to a Task. The options for ``trigger_rule`` are:
 
@@ -383,6 +383,7 @@ However, this is just the default behaviour, and you can control it using the ``
 * ``none_skipped``: No upstream task is in a ``skipped`` state - that is, all upstream tasks are in a ``success``, ``failed``, or ``upstream_failed`` state
 * ``always``: No dependencies at all, run this task at any time
 
+
 You can also combine this with the :ref:`concepts:depends-on-past` functionality if you wish.
 
 .. note::
@@ -724,7 +725,7 @@ You can also prepare ``.airflowignore`` file for a subfolder in ``DAG_FOLDER`` a
 would only be applicable for that subfolder.
 
 DAG Dependencies
-================
+----------------
 
 *Added in Airflow 2.1*
 
diff --git a/docs/apache-airflow/concepts/tasks.rst b/docs/apache-airflow/concepts/tasks.rst
index f6c6619..41fa556 100644
--- a/docs/apache-airflow/concepts/tasks.rst
+++ b/docs/apache-airflow/concepts/tasks.rst
@@ -36,6 +36,12 @@ Relationships
 
 The key part of using Tasks is defining how they relate to each other - their *dependencies*, or as we say in Airflow, their *upstream* and *downstream* tasks. You declare your Tasks first, and then you declare their dependencies second.
 
+.. note::
+
+    We call the *upstream* task the one that is directly preceding the other task. We used to call it a parent task before.
+    Be aware that this concept does not describe the tasks that are higher in the task hierarchy (i.e. they are not direct parents of the task).
+    The same definition applies to the *downstream* task, which needs to be a direct child of the other task.
+
 There are two ways of declaring dependencies - using the ``>>`` and ``<<`` (bitshift) operators::
 
     first_task >> second_task >> [third_task, fourth_task]
diff --git a/docs/apache-airflow/dag-run.rst b/docs/apache-airflow/dag-run.rst
index 1a2bbe3..52d90e0 100644
--- a/docs/apache-airflow/dag-run.rst
+++ b/docs/apache-airflow/dag-run.rst
@@ -18,6 +18,30 @@
 DAG Runs
 =========
 A DAG Run is an object representing an instantiation of the DAG in time.
+Any time the DAG is executed, a DAG Run is created and all tasks inside it are executed. The status of the DAG Run depends on the states of its tasks.
+Each DAG Run is run separately from the others, meaning that you can have many runs of a DAG at the same time.
+
+.. _dag-run:dag-run-status:
+
+DAG Run Status
+''''''''''''''
+
+A DAG Run status is determined when the execution of the DAG is finished.
+The execution of the DAG depends on the tasks it contains and their dependencies.
+The status is assigned to the DAG Run when all of the tasks are in one of the terminal states (i.e. there is no possible transition to another state) like ``success``, ``failed`` or ``skipped``.
+The DAG Run is assigned its status based on the so-called "leaf nodes" or simply "leaves". Leaf nodes are the tasks with no children.
+
+There are two possible terminal states for the DAG Run:
+
+- ``success`` if all of the leaf nodes' states are either ``success`` or ``skipped``,
+- ``failed`` if any of the leaf nodes' states is either ``failed`` or ``upstream_failed``.
+
+.. note::
+    Be careful if some of your tasks have a specific `trigger rule <dags.html#trigger-rules>`_ defined.
+    These can lead to some unexpected behavior, e.g. if you have a leaf task with trigger rule ``"all_done"``, it will be executed regardless of the states of the rest of the tasks, and if it succeeds, then the whole DAG Run will also be marked as ``success``, even if something failed in the middle.
+
+Cron Presets
+''''''''''''
 
 Each DAG may or may not have a schedule, which informs how DAG Runs are
 created. ``schedule_interval`` is defined as a DAG argument, which can be passed a
@@ -27,9 +51,6 @@ a ``str``, a ``datetime.timedelta`` object, or one of the following cron "preset
 .. tip::
     You can use an online editor for CRON expressions such as `Crontab guru <https://crontab.guru/>`_
 
-Cron Presets
-''''''''''''
-
 +----------------+----------------------------------------------------------------+-----------------+
 | preset         | meaning                                                        | cron            |
 +================+================================================================+=================+
diff --git a/docs/apache-airflow/img/watcher.png b/docs/apache-airflow/img/watcher.png
new file mode 100644
index 0000000..9e0ed2d
Binary files /dev/null and b/docs/apache-airflow/img/watcher.png differ
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index ed114b6..350b79d 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1339,6 +1339,7 @@ taskflow
 taskinstance
 tblproperties
 tcp
+teardown
 templatable
 templateable
 templated
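
The DAG Run status rule documented in the dag-run.rst hunk above (status is taken from the leaf tasks only) can be illustrated with a small standalone sketch. This is not Airflow code: ``find_leaves`` and ``dag_run_state`` are hypothetical helpers, and the dictionaries are a hand-written model of the ``watcher_example`` DAG, assuming ``passing_task`` ends up ``upstream_failed``.

```python
from typing import Dict, List

# Leaf states that make the whole run fail, per the documentation above.
FAILED_STATES = {"failed", "upstream_failed"}


def find_leaves(downstream: Dict[str, List[str]]) -> List[str]:
    """Return the tasks that have no downstream (child) tasks."""
    return [task for task, children in downstream.items() if not children]


def dag_run_state(downstream: Dict[str, List[str]], states: Dict[str, str]) -> str:
    """Derive the run state from the terminal states of the leaf tasks only."""
    leaves = find_leaves(downstream)
    if any(states[leaf] in FAILED_STATES for leaf in leaves):
        return "failed"
    return "success"


# failing_task >> passing_task >> teardown (teardown uses "all_done").
without_watcher = {
    "failing_task": ["passing_task"],
    "passing_task": ["teardown"],
    "teardown": [],
}
states = {
    "failing_task": "failed",
    "passing_task": "upstream_failed",
    "teardown": "success",
}
print(dag_run_state(without_watcher, states))

# Same DAG, with a watcher that is downstream of every other task.
with_watcher = {
    "failing_task": ["passing_task", "watcher"],
    "passing_task": ["teardown", "watcher"],
    "teardown": ["watcher"],
    "watcher": [],
}
states_with_watcher = {**states, "watcher": "failed"}
print(dag_run_state(with_watcher, states_with_watcher))
```

In this model the failure is hidden without the watcher because ``teardown`` is the only leaf; once the watcher is attached it becomes the only leaf, so its ``failed`` state decides the run.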

[airflow] 19/19: Fix race condition between triggerer and scheduler (#21316)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 94a75ae7d15459dd7a0f7ac26f7e65a5f30551b2
Author: Malthe Borch <mb...@gmail.com>
AuthorDate: Tue Feb 15 13:12:51 2022 +0000

    Fix race condition between triggerer and scheduler (#21316)
    
    (cherry picked from commit 2a6792d94d153c6f2dd116843a43ee63cd296c8d)
---
 airflow/executors/base_executor.py    | 36 ++++++++++++++++++---
 tests/executors/test_base_executor.py | 60 ++++++++++++++++++++++++++++++++---
 2 files changed, 87 insertions(+), 9 deletions(-)

diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index f7ad45a..1d993bb 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -17,7 +17,7 @@
 """Base executor - this is the base class for all the implemented executors."""
 import sys
 from collections import OrderedDict
-from typing import Any, Dict, List, Optional, Set, Tuple
+from typing import Any, Counter, Dict, List, Optional, Set, Tuple
 
 from airflow.configuration import conf
 from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
@@ -29,6 +29,8 @@ PARALLELISM: int = conf.getint('core', 'PARALLELISM')
 
 NOT_STARTED_MESSAGE = "The executor should be started first!"
 
+QUEUEING_ATTEMPTS = 5
+
 # Command to execute - list of strings
 # the first element is always "airflow".
 # It should be result of TaskInstance.generate_command method.q
@@ -63,6 +65,7 @@ class BaseExecutor(LoggingMixin):
         self.queued_tasks: OrderedDict[TaskInstanceKey, QueuedTaskInstanceType] = OrderedDict()
         self.running: Set[TaskInstanceKey] = set()
         self.event_buffer: Dict[TaskInstanceKey, EventBufferValueType] = {}
+        self.attempts: Counter[TaskInstanceKey] = Counter()
 
     def __repr__(self):
         return f"{self.__class__.__name__}(parallelism={self.parallelism})"
@@ -78,7 +81,7 @@ class BaseExecutor(LoggingMixin):
         queue: Optional[str] = None,
     ):
         """Queues command to task"""
-        if task_instance.key not in self.queued_tasks and task_instance.key not in self.running:
+        if task_instance.key not in self.queued_tasks:
             self.log.info("Adding to queue: %s", command)
             self.queued_tasks[task_instance.key] = (command, priority, queue, task_instance)
         else:
@@ -183,9 +186,32 @@ class BaseExecutor(LoggingMixin):
 
         for _ in range(min((open_slots, len(self.queued_tasks)))):
             key, (command, _, queue, ti) = sorted_queue.pop(0)
-            self.queued_tasks.pop(key)
-            self.running.add(key)
-            self.execute_async(key=key, command=command, queue=queue, executor_config=ti.executor_config)
+
+            # If a task makes it here but is still understood by the executor
+            # to be running, it generally means that the task has been killed
+            # externally and not yet been marked as failed.
+            #
+            # However, when a task is deferred, there is also a possibility of
+            # a race condition where a task might be scheduled again during
+            # trigger processing, even before we are able to register that the
+            # deferred task has completed. In this case and for this reason,
+            # we make a small number of attempts to see if the task has been
+            # removed from the running set in the meantime.
+            if key in self.running:
+                attempt = self.attempts[key]
+                if attempt < QUEUEING_ATTEMPTS - 1:
+                    self.attempts[key] = attempt + 1
+                    self.log.info("task %s is still running", key)
+                    continue
+
+                # We give up and remove the task from the queue.
+                self.log.error("could not queue task %s (still running after %d attempts)", key, attempt)
+                del self.attempts[key]
+                del self.queued_tasks[key]
+            else:
+                del self.queued_tasks[key]
+                self.running.add(key)
+                self.execute_async(key=key, command=command, queue=queue, executor_config=ti.executor_config)
 
     def change_state(self, key: TaskInstanceKey, state: str, info=None) -> None:
         """
diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py
index 49d6c01..40bf8eb 100644
--- a/tests/executors/test_base_executor.py
+++ b/tests/executors/test_base_executor.py
@@ -18,7 +18,9 @@
 from datetime import timedelta
 from unittest import mock
 
-from airflow.executors.base_executor import BaseExecutor
+from pytest import mark
+
+from airflow.executors.base_executor import QUEUEING_ATTEMPTS, BaseExecutor
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.taskinstance import TaskInstanceKey
 from airflow.utils import timezone
@@ -57,7 +59,7 @@ def test_gauge_executor_metrics(mock_stats_gauge, mock_trigger_tasks, mock_sync)
     mock_stats_gauge.assert_has_calls(calls)
 
 
-def test_try_adopt_task_instances(dag_maker):
+def setup_dagrun(dag_maker):
     date = timezone.utcnow()
     start_date = date - timedelta(days=2)
 
@@ -66,8 +68,58 @@ def test_try_adopt_task_instances(dag_maker):
         BaseOperator(task_id="task_2", start_date=start_date)
         BaseOperator(task_id="task_3", start_date=start_date)
 
-    dagrun = dag_maker.create_dagrun(execution_date=date)
-    tis = dagrun.task_instances
+    return dag_maker.create_dagrun(execution_date=date)
 
+
+def test_try_adopt_task_instances(dag_maker):
+    dagrun = setup_dagrun(dag_maker)
+    tis = dagrun.task_instances
     assert {ti.task_id for ti in tis} == {"task_1", "task_2", "task_3"}
     assert BaseExecutor().try_adopt_task_instances(tis) == tis
+
+
+def enqueue_tasks(executor, dagrun):
+    for task_instance in dagrun.task_instances:
+        executor.queue_command(task_instance, ["airflow"])
+
+
+def setup_trigger_tasks(dag_maker):
+    dagrun = setup_dagrun(dag_maker)
+    executor = BaseExecutor()
+    executor.execute_async = mock.Mock()
+    enqueue_tasks(executor, dagrun)
+    return executor, dagrun
+
+
+@mark.parametrize("open_slots", [1, 2, 3])
+def test_trigger_queued_tasks(dag_maker, open_slots):
+    executor, _ = setup_trigger_tasks(dag_maker)
+    executor.trigger_tasks(open_slots)
+    assert len(executor.execute_async.mock_calls) == open_slots
+
+
+@mark.parametrize("change_state_attempt", range(QUEUEING_ATTEMPTS + 2))
+def test_trigger_running_tasks(dag_maker, change_state_attempt):
+    executor, dagrun = setup_trigger_tasks(dag_maker)
+    open_slots = 100
+    executor.trigger_tasks(open_slots)
+    expected_calls = len(dagrun.task_instances)  # initially `execute_async` called for each task
+    assert len(executor.execute_async.mock_calls) == expected_calls
+
+    # All the tasks are now "running", so while we enqueue them again here,
+    # they won't be executed again until the executor has been notified of a state change.
+    enqueue_tasks(executor, dagrun)
+
+    for attempt in range(QUEUEING_ATTEMPTS + 2):
+        # On the configured attempt, we notify the executor that the task has succeeded.
+        if attempt == change_state_attempt:
+            executor.change_state(dagrun.task_instances[0].key, State.SUCCESS)
+            # If we have not exceeded QUEUEING_ATTEMPTS, we should expect an additional "execute" call
+            if attempt < QUEUEING_ATTEMPTS:
+                expected_calls += 1
+        executor.trigger_tasks(open_slots)
+        assert len(executor.execute_async.mock_calls) == expected_calls
+    if change_state_attempt < QUEUEING_ATTEMPTS:
+        assert len(executor.execute_async.mock_calls) == len(dagrun.task_instances) + 1
+    else:
+        assert len(executor.execute_async.mock_calls) == len(dagrun.task_instances)
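
The bounded re-queue logic added to ``trigger_tasks`` in this commit can be followed more easily in isolation. The sketch below is a simplified, hypothetical ``ToyExecutor`` (not Airflow's ``BaseExecutor``): it uses plain string keys, has no priorities or open slots, and only mirrors the attempt counting shown in the diff.

```python
from collections import Counter

QUEUEING_ATTEMPTS = 5  # same value as the constant added in the diff


class ToyExecutor:
    """Hypothetical, simplified model of the queue/running bookkeeping."""

    def __init__(self):
        self.queued = {}           # key -> command
        self.running = set()       # keys the executor believes are running
        self.attempts = Counter()  # key -> number of deferred passes so far
        self.executed = []         # stand-in for execute_async calls

    def queue(self, key, command):
        # As in the patched queue_command: only the queue is checked here.
        if key not in self.queued:
            self.queued[key] = command

    def trigger(self):
        for key in list(self.queued):
            if key in self.running:
                attempt = self.attempts[key]
                if attempt < QUEUEING_ATTEMPTS - 1:
                    # Still running: keep it queued and look again next pass.
                    self.attempts[key] = attempt + 1
                    continue
                # Give up and drop the stale queue entry.
                del self.attempts[key]
                del self.queued[key]
            else:
                command = self.queued.pop(key)
                self.running.add(key)
                self.executed.append((key, command))

    def finish(self, key):
        # Stand-in for change_state removing the key from the running set.
        self.running.discard(key)


executor = ToyExecutor()
executor.queue("task_1", ["airflow"])
executor.trigger()                      # task_1 starts running
executor.queue("task_1", ["airflow"])   # re-queued while still running
executor.trigger()                      # deferred, attempt counter goes to 1
executor.finish("task_1")
executor.trigger()                      # no longer running, so it runs again
print(len(executor.executed))
```

If ``finish`` is never called, the re-queued entry is dropped on the fifth pass, which is the behaviour the parametrized test above exercises against the real executor.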

[airflow] 13/19: Disable default_pool delete on web ui (#21658)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9f177e04a7f70f1630dda334a4a22804540d5caa
Author: Chenglong Yan <al...@gmail.com>
AuthorDate: Wed Mar 16 16:31:12 2022 +0800

    Disable default_pool delete on web ui (#21658)
    
    (cherry picked from commit df6058c862a910a99fbb86858502d9d93fdbe1e5)
---
 airflow/api/common/experimental/pool.py |  2 +-
 airflow/models/pool.py                  | 19 ++++++++++++++++++-
 airflow/www/views.py                    | 13 ++++++++++++-
 tests/models/test_pool.py               |  6 ++++++
 4 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/airflow/api/common/experimental/pool.py b/airflow/api/common/experimental/pool.py
index fe4f161..b1ca9f0 100644
--- a/airflow/api/common/experimental/pool.py
+++ b/airflow/api/common/experimental/pool.py
@@ -83,7 +83,7 @@ def delete_pool(name, session=None):
         raise AirflowBadRequest("Pool name shouldn't be empty")
 
     if name == Pool.DEFAULT_POOL_NAME:
-        raise AirflowBadRequest("default_pool cannot be deleted")
+        raise AirflowBadRequest(f"{Pool.DEFAULT_POOL_NAME} cannot be deleted")
 
     pool = session.query(Pool).filter_by(pool=name).first()
     if pool is None:
diff --git a/airflow/models/pool.py b/airflow/models/pool.py
index 8ae88aa..7d092f7 100644
--- a/airflow/models/pool.py
+++ b/airflow/models/pool.py
@@ -86,6 +86,23 @@ class Pool(Base):
 
     @staticmethod
     @provide_session
+    def is_default_pool(id: int, session: Session = NEW_SESSION) -> bool:
+        """
+        Check id if is the default_pool.
+
+        :param id: pool id
+        :param session: SQLAlchemy ORM Session
+        :return: True if id is default_pool, otherwise False
+        """
+        return (
+            session.query(func.count(Pool.id))
+            .filter(Pool.id == id, Pool.pool == Pool.DEFAULT_POOL_NAME)
+            .scalar()
+            > 0
+        )
+
+    @staticmethod
+    @provide_session
     def create_or_update_pool(name: str, slots: int, description: str, session: Session = NEW_SESSION):
         """Create a pool with given parameters or update it if it already exists."""
         if not name:
@@ -107,7 +124,7 @@ class Pool(Base):
     def delete_pool(name: str, session: Session = NEW_SESSION):
         """Delete pool by a given name."""
         if name == Pool.DEFAULT_POOL_NAME:
-            raise AirflowException("default_pool cannot be deleted")
+            raise AirflowException(f"{Pool.DEFAULT_POOL_NAME} cannot be deleted")
 
         pool = session.query(Pool).filter_by(pool=name).first()
         if pool is None:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 9fc61b5..d6bc40b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3727,13 +3727,24 @@ class PoolModelView(AirflowModelView):
     def action_muldelete(self, items):
         """Multiple delete."""
         if any(item.pool == models.Pool.DEFAULT_POOL_NAME for item in items):
-            flash("default_pool cannot be deleted", 'error')
+            flash(f"{models.Pool.DEFAULT_POOL_NAME} cannot be deleted", 'error')
             self.update_redirect()
             return redirect(self.get_redirect())
         self.datamodel.delete_all(items)
         self.update_redirect()
         return redirect(self.get_redirect())
 
+    @expose("/delete/<pk>", methods=["GET", "POST"])
+    @has_access
+    def delete(self, pk):
+        """Single delete."""
+        if models.Pool.is_default_pool(pk):
+            flash(f"{models.Pool.DEFAULT_POOL_NAME} cannot be deleted", 'error')
+            self.update_redirect()
+            return redirect(self.get_redirect())
+
+        return super().delete(pk)
+
     def pool_link(self):
         """Pool link rendering."""
         pool_id = self.get('pool')
diff --git a/tests/models/test_pool.py b/tests/models/test_pool.py
index 95e585e..1c5bbe1 100644
--- a/tests/models/test_pool.py
+++ b/tests/models/test_pool.py
@@ -220,3 +220,9 @@ class TestPool:
     def test_delete_default_pool_not_allowed(self):
         with pytest.raises(AirflowException, match="^default_pool cannot be deleted$"):
             Pool.delete_pool(Pool.DEFAULT_POOL_NAME)
+
+    def test_is_default_pool(self):
+        pool = Pool.create_or_update_pool(name="not_default_pool", slots=1, description="test")
+        default_pool = Pool.get_default_pool()
+        assert not Pool.is_default_pool(id=pool.id)
+        assert Pool.is_default_pool(str(default_pool.id))

[airflow] 15/19: Filter out default configs when overrides exist. (#21539)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit b7944156987a57d24f0a221df4ed28f515f14074
Author: Xiao Yu <me...@xyu.io>
AuthorDate: Tue Mar 15 18:06:50 2022 +0000

    Filter out default configs when overrides exist. (#21539)
    
    * Filter out default configs when overrides exist.
    
    When sending configs to Airflow workers we materialize a temp config file. In #18772 a feature was added so that `_cmd` generated secrets are not written to the files in some cases instead favoring maintaining the raw `_cmd` settings. Unfortunately during materializing of the configs via `as_dict()` Airflow defaults are generated and materialized as well including defaults for the non `_cmd` versions of some settings. And due to Airflow setting precedence stating bare versions of sett [...]
    https://airflow.apache.org/docs/apache-airflow/stable/howto/set-config.html
    
    This change checks `_cmd`, env, and secrets when materializing configs via `as_dict()` so that if the bare versions of the values is exactly the same as Airflow defaults and we have "hidden" / special versions of these configs that are trying to be set we remove the bare versions so that the correct version can be used.
    
    Fixes: #20092
    Related to: #18772 #4050
    
    (cherry picked from commit e07bc63ec0e5b679c87de8e8d4cdff1cf4671146)
---
 airflow/configuration.py                 | 58 ++++++++++++++++++++++++++++++++
 docs/apache-airflow/howto/set-config.rst |  4 +++
 tests/core/test_configuration.py         | 46 +++++++++++++++++++++++++
 3 files changed, 108 insertions(+)

diff --git a/airflow/configuration.py b/airflow/configuration.py
index e120b26..88587f3 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -581,6 +581,15 @@ class AirflowConfigParser(ConfigParser):
         """
         Returns the current configuration as an OrderedDict of OrderedDicts.
 
+        When materializing current configuration Airflow defaults are
+        materialized along with user set configs. If any of the `include_*`
+        options are False then the result of calling command or secret key
+        configs do not override Airflow defaults and instead are passed through.
+        In order to then avoid Airflow defaults from overwriting user set
+        command or secret key configs we filter out bare sensitive_config_values
+        that are set to Airflow defaults when command or secret key configs
+        produce different values.
+
         :param display_source: If False, the option value is returned. If True,
             a tuple of (option_value, source) is returned. Source is either
             'airflow.cfg', 'default', 'env var', or 'cmd'.
@@ -618,14 +627,21 @@ class AirflowConfigParser(ConfigParser):
         # add env vars and overwrite because they have priority
         if include_env:
             self._include_envs(config_sources, display_sensitive, display_source, raw)
+        else:
+            self._filter_by_source(config_sources, display_source, self._get_env_var_option)
 
         # add bash commands
         if include_cmds:
             self._include_commands(config_sources, display_sensitive, display_source, raw)
+        else:
+            self._filter_by_source(config_sources, display_source, self._get_cmd_option)
 
         # add config from secret backends
         if include_secret:
             self._include_secrets(config_sources, display_sensitive, display_source, raw)
+        else:
+            self._filter_by_source(config_sources, display_source, self._get_secret_option)
+
         return config_sources
 
     def _include_secrets(self, config_sources, display_sensitive, display_source, raw):
@@ -683,6 +699,48 @@ class AirflowConfigParser(ConfigParser):
                 key = key.lower()
             config_sources.setdefault(section, OrderedDict()).update({key: opt})
 
+    def _filter_by_source(self, config_sources, display_source, getter_func):
+        """
+        Deletes default configs from current configuration (an OrderedDict of
+        OrderedDicts) if it would conflict with special sensitive_config_values.
+
+        This is necessary because bare configs take precedence over the command
+        or secret key equivalents so if the current running config is
+        materialized with Airflow defaults they in turn override user set
+        command or secret key configs.
+
+        :param config_sources: The current configuration to operate on
+        :param display_source: If False, configuration options contain raw
+            values. If True, options are a tuple of (option_value, source).
+            Source is either 'airflow.cfg', 'default', 'env var', or 'cmd'.
+        :param getter_func: A callback function that gets the user configured
+            override value for a particular sensitive_config_values config.
+        :rtype: None
+        :return: None, the given config_sources is filtered if necessary,
+            otherwise untouched.
+        """
+        for (section, key) in self.sensitive_config_values:
+            # Don't bother if we don't have section / key
+            if section not in config_sources or key not in config_sources[section]:
+                continue
+            # Check that there is something to override defaults
+            try:
+                getter_opt = getter_func(section, key)
+            except ValueError:
+                continue
+            if not getter_opt:
+                continue
+            # Check to see that there is a default value
+            if not self.airflow_defaults.has_option(section, key):
+                continue
+            # Check to see if bare setting is the same as defaults
+            if display_source:
+                opt, source = config_sources[section][key]
+            else:
+                opt = config_sources[section][key]
+            if opt == self.airflow_defaults.get(section, key):
+                del config_sources[section][key]
+
     @staticmethod
     def _replace_config_with_display_sources(config_sources, configs, display_source, raw):
         for (source_name, config) in configs:
diff --git a/docs/apache-airflow/howto/set-config.rst b/docs/apache-airflow/howto/set-config.rst
index 03cf0de..ad25c1d 100644
--- a/docs/apache-airflow/howto/set-config.rst
+++ b/docs/apache-airflow/howto/set-config.rst
@@ -100,6 +100,10 @@ The universal order of precedence for all configuration options is as follows:
 #. secret key in ``airflow.cfg``
 #. Airflow's built in defaults
 
+.. note::
+    For Airflow versions >= 2.2.1, < 2.3.0 Airflow's built in defaults took precedence
+    over command and secret key in ``airflow.cfg`` in some circumstances.
+
 You can check the current configuration with the ``airflow config list`` command.
 
 If you only want to see the value for one option, you can use ``airflow config get-value`` command as in
diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py
index da1d736..a064c48 100644
--- a/tests/core/test_configuration.py
+++ b/tests/core/test_configuration.py
@@ -15,6 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import copy
 import io
 import os
 import re
@@ -720,3 +721,48 @@ notacommand = OK
             "CRITICAL, FATAL, ERROR, WARN, WARNING, INFO, DEBUG."
         )
         assert message == exception
+
+    def test_as_dict_works_without_sensitive_cmds(self):
+        conf_materialize_cmds = conf.as_dict(display_sensitive=True, raw=True, include_cmds=True)
+        conf_maintain_cmds = conf.as_dict(display_sensitive=True, raw=True, include_cmds=False)
+
+        assert 'sql_alchemy_conn' in conf_materialize_cmds['core']
+        assert 'sql_alchemy_conn_cmd' not in conf_materialize_cmds['core']
+
+        assert 'sql_alchemy_conn' in conf_maintain_cmds['core']
+        assert 'sql_alchemy_conn_cmd' not in conf_maintain_cmds['core']
+
+        assert (
+            conf_materialize_cmds['core']['sql_alchemy_conn']
+            == conf_maintain_cmds['core']['sql_alchemy_conn']
+        )
+
+    def test_as_dict_respects_sensitive_cmds(self):
+        conf_conn = conf['core']['sql_alchemy_conn']
+        test_conf = copy.deepcopy(conf)
+        test_conf.read_string(
+            textwrap.dedent(
+                """
+                [core]
+                sql_alchemy_conn_cmd = echo -n my-super-secret-conn
+                """
+            )
+        )
+
+        conf_materialize_cmds = test_conf.as_dict(display_sensitive=True, raw=True, include_cmds=True)
+        conf_maintain_cmds = test_conf.as_dict(display_sensitive=True, raw=True, include_cmds=False)
+
+        assert 'sql_alchemy_conn' in conf_materialize_cmds['core']
+        assert 'sql_alchemy_conn_cmd' not in conf_materialize_cmds['core']
+
+        if conf_conn == test_conf.airflow_defaults['core']['sql_alchemy_conn']:
+            assert conf_materialize_cmds['core']['sql_alchemy_conn'] == 'my-super-secret-conn'
+
+        assert 'sql_alchemy_conn_cmd' in conf_maintain_cmds['core']
+        assert conf_maintain_cmds['core']['sql_alchemy_conn_cmd'] == 'echo -n my-super-secret-conn'
+
+        if conf_conn == test_conf.airflow_defaults['core']['sql_alchemy_conn']:
+            assert 'sql_alchemy_conn' not in conf_maintain_cmds['core']
+        else:
+            assert 'sql_alchemy_conn' in conf_maintain_cmds['core']
+            assert conf_maintain_cmds['core']['sql_alchemy_conn'] == conf_conn
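
The filtering performed by ``_filter_by_source`` can be sketched on plain dictionaries. This is an illustration under stated assumptions, not the Airflow implementation: ``filter_defaults`` and ``cmd_override`` are hypothetical names, the ``display_source`` tuple handling is omitted, and the ``sqlite:///airflow.db`` default is a placeholder value chosen for the example.

```python
from typing import Callable, Dict, Optional, Set, Tuple

Config = Dict[str, Dict[str, str]]


def filter_defaults(
    config: Config,
    defaults: Config,
    sensitive: Set[Tuple[str, str]],
    get_override: Callable[[str, str], Optional[str]],
) -> None:
    """Drop bare sensitive options that only hold the default when an override exists."""
    for section, key in sensitive:
        # Nothing to do if the bare option is not present.
        if key not in config.get(section, {}):
            continue
        # Nothing to do if the user configured no override for it.
        if not get_override(section, key):
            continue
        # Nothing to do if there is no default to compare against.
        if key not in defaults.get(section, {}):
            continue
        # Remove the bare option only when it is just the default value.
        if config[section][key] == defaults[section][key]:
            del config[section][key]


defaults = {"core": {"sql_alchemy_conn": "sqlite:///airflow.db"}}  # placeholder default
config = {
    "core": {
        "sql_alchemy_conn": "sqlite:///airflow.db",
        "sql_alchemy_conn_cmd": "echo -n my-super-secret-conn",
    }
}
sensitive = {("core", "sql_alchemy_conn")}


def cmd_override(section: str, key: str) -> Optional[str]:
    """Hypothetical getter: return the raw `_cmd` setting, if any."""
    return config.get(section, {}).get(key + "_cmd")


filter_defaults(config, defaults, sensitive, cmd_override)
print(config)
```

After the call only the ``_cmd`` entry remains in ``config["core"]``, so the default can no longer shadow it. A bare value that differs from the default is left untouched.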

[airflow] 12/19: Fix postgres part of pipeline example of tutorial (#21586)

Posted by ep...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 445ae53db7a086c2153cdcbf5447baa7191d05af
Author: KevinYanesG <75...@users.noreply.github.com>
AuthorDate: Tue Feb 15 19:26:30 2022 +0100

    Fix postgres part of pipeline example of tutorial (#21586)
    
    (cherry picked from commit 40028f3ea3e78a9cf0db9de6b16fa67fa730dd7a)
---
 docs/apache-airflow/tutorial.rst | 67 ++++++++++++++++++++++++----------------
 1 file changed, 41 insertions(+), 26 deletions(-)

diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst
index 085be42..7a2245f 100644
--- a/docs/apache-airflow/tutorial.rst
+++ b/docs/apache-airflow/tutorial.rst
@@ -381,11 +381,30 @@ We need to have docker and postgres installed.
 We will be using this `docker file <https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#docker-compose-yaml>`_
 Follow the instructions properly to set up Airflow.
 
-Create a Employee table in postgres using this:
+You can use the postgres_default connection:
+
+- Conn id: postgres_default
+- Conn Type: postgres
+- Host: postgres
+- Schema: airflow
+- Login: airflow
+- Password: airflow
+
+
+After that, you can test your connection and if you followed all the steps correctly, it should show a success notification. Proceed with saving the connection.
+
+
+Open up a postgres shell:
+
+.. code-block:: bash
+
+  ./airflow.sh airflow db shell
+
+Create the Employees table with:
 
 .. code-block:: sql
 
-  CREATE TABLE "Employees"
+  CREATE TABLE EMPLOYEES
   (
       "Serial Number" NUMERIC PRIMARY KEY,
       "Company Name" TEXT,
@@ -394,7 +413,11 @@ Create a Employee table in postgres using this:
       "Leave" INTEGER
   );
 
-  CREATE TABLE "Employees_temp"
+Afterwards, create the Employees_temp table:
+
+.. code-block:: sql
+
+  CREATE TABLE EMPLOYEES_TEMP
   (
       "Serial Number" NUMERIC PRIMARY KEY,
       "Company Name" TEXT,
@@ -403,17 +426,9 @@ Create a Employee table in postgres using this:
       "Leave" INTEGER
   );
 
-We also need to add a connection to postgres. Go to the UI and click "Admin" >> "Connections". Specify the following for each field:
+We are now ready write the DAG.
 
-- Conn id: LOCAL
-- Conn Type: postgres
-- Host: postgres
-- Schema: <DATABASE_NAME>
-- Login: airflow
-- Password: airflow
-- Port: 5432
 
-After that, you can test your connection and if you followed all the steps correctly, it should show a success notification. Proceed with saving the connection and we are now ready write the DAG.
 
 Let's break this down into 2 steps: get data & merge data:
 
@@ -436,12 +451,12 @@ Let's break this down into 2 steps: get data & merge data:
       with open(data_path, "w") as file:
           file.write(response.text)
 
-      postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
+      postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
       conn = postgres_hook.get_conn()
       cur = conn.cursor()
       with open(data_path, "r") as file:
           cur.copy_expert(
-              "COPY \"Employees_temp\" FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
+              "COPY EMPLOYEES_TEMP FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
               file,
           )
       conn.commit()
@@ -457,16 +472,16 @@ Here we are passing a ``GET`` request to get the data from the URL and save it i
   @task
   def merge_data():
       query = """
-          DELETE FROM "Employees" e
-          USING "Employees_temp" et
+          DELETE FROM EMPLOYEES e
+          USING EMPLOYEES_TEMP et
           WHERE e."Serial Number" = et."Serial Number";
 
-          INSERT INTO "Employees"
+          INSERT INTO EMPLOYEES
           SELECT *
-          FROM "Employees_temp";
+          FROM EMPLOYEES_TEMP;
       """
       try:
-          postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
+          postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
           conn = postgres_hook.get_conn()
           cur = conn.cursor()
           cur.execute(query)
@@ -509,12 +524,12 @@ Lets look at our DAG:
           with open(data_path, "w") as file:
               file.write(response.text)
 
-          postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
+          postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
           conn = postgres_hook.get_conn()
           cur = conn.cursor()
           with open(data_path, "r") as file:
               cur.copy_expert(
-                  "COPY \"Employees_temp\" FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
+                  "COPY EMPLOYEES_TEMP FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
                   file,
               )
           conn.commit()
@@ -522,16 +537,16 @@ Lets look at our DAG:
       @task
       def merge_data():
           query = """
-                  DELETE FROM "Employees" e
-                  USING "Employees_temp" et
+                  DELETE FROM EMPLOYEES e
+                  USING EMPLOYEES_TEMP et
                   WHERE e."Serial Number" = et."Serial Number";
 
-                  INSERT INTO "Employees"
+                  INSERT INTO EMPLOYEES
                   SELECT *
-                  FROM "Employees_temp";
+                  FROM EMPLOYEES_TEMP;
                   """
           try:
-              postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
+              postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
               conn = postgres_hook.get_conn()
               cur = conn.cursor()
               cur.execute(query)
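
Note on the identifier change above: the diff swaps quoted table names such as "Employees" for unquoted EMPLOYEES. PostgreSQL folds unquoted identifiers to lower case and keeps quoted ones exactly as written, so the new statements consistently refer to tables named employees and employees_temp, while the quoted column names keep their spaces and capitals. The following is a minimal sketch of that folding rule only. `fold_identifier` is a hypothetical helper written for illustration; it is not part of Airflow or of any PostgreSQL driver, and it ignores details such as identifier length limits.

```python
def fold_identifier(identifier: str) -> str:
    """Return the name PostgreSQL would store for a SQL identifier (simplified)."""
    quoted = (
        len(identifier) >= 2
        and identifier.startswith('"')
        and identifier.endswith('"')
    )
    if quoted:
        # Quoted identifiers keep their case; a doubled quote is an escaped quote.
        return identifier[1:-1].replace('""', '"')
    # Unquoted identifiers are folded to lower case.
    return identifier.lower()


# Table names as written after the change: all fold to the same lower-case name.
new_table = fold_identifier("EMPLOYEES")
new_temp_table = fold_identifier("EMPLOYEES_TEMP")

# Table name as written before the change: case is preserved.
old_table = fold_identifier('"Employees"')

# Column names stay quoted in both versions of the tutorial.
column = fold_identifier('"Serial Number"')
```

Under this rule, an unquoted reference and the old quoted "Employees" name would point at different tables, which is presumably why the tutorial changes every statement at once.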

[airflow] 01/19: DB upgrade is required when updating Airflow (#22061)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 925082e2235cd03f59b28ca591c204fa42b5b19b
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Mon Mar 7 15:05:58 2022 -0700

    DB upgrade is required when updating Airflow (#22061)
    
    Just strengthen the language that it is "required", not "recommended" to
    run `airflow db upgrade` when upgrading Airflow versions.
    
    (cherry picked from commit e7fed6bb3d7c8def86b47204176cebbfc6c401ff)
---
 docs/apache-airflow/installation/upgrading.rst | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/apache-airflow/installation/upgrading.rst b/docs/apache-airflow/installation/upgrading.rst
index 929c604..1f7687e 100644
--- a/docs/apache-airflow/installation/upgrading.rst
+++ b/docs/apache-airflow/installation/upgrading.rst
@@ -21,9 +21,9 @@ Upgrading Airflow to a newer version
 Why you need to upgrade
 =======================
 
-Newer Airflow versions can contain Database migrations so it is recommended that you run
-``airflow db upgrade`` to Upgrade your Database with the schema changes in the Airflow version
-you are upgrading to.
+Newer Airflow versions can contain database migrations so you must run ``airflow db upgrade``
+to upgrade your database with the schema changes in the Airflow version you are upgrading to.
+Don't worry, it's safe to run even if there are no migrations to perform.
 
 When you need to upgrade
 ========================

[airflow] 16/19: Fix Resources __eq__ check (#21442)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a80213c6758ddd222b9d9b4bc1e56c92040eeda7
Author: Ping Zhang <pi...@umich.edu>
AuthorDate: Thu Feb 10 04:40:51 2022 -0800

    Fix Resources __eq__ check (#21442)
    
    (cherry picked from commit 6b308446eae2f83bf379f976c7d7801aa53370a3)
---
 airflow/utils/operator_resources.py    |  4 ++++
 tests/utils/test_operator_resources.py | 35 ++++++++++++++++++++++++++++++++++
 2 files changed, 39 insertions(+)

diff --git a/airflow/utils/operator_resources.py b/airflow/utils/operator_resources.py
index 8781021..010bf33 100644
--- a/airflow/utils/operator_resources.py
+++ b/airflow/utils/operator_resources.py
@@ -53,6 +53,8 @@ class Resource:
         self._qty = qty
 
     def __eq__(self, other):
+        if not isinstance(other, self.__class__):
+            return NotImplemented
         return self.__dict__ == other.__dict__
 
     def __repr__(self):
@@ -133,6 +135,8 @@ class Resources:
         self.gpus = GpuResource(gpus)
 
     def __eq__(self, other):
+        if not isinstance(other, self.__class__):
+            return NotImplemented
         return self.__dict__ == other.__dict__
 
     def __repr__(self):
diff --git a/tests/utils/test_operator_resources.py b/tests/utils/test_operator_resources.py
new file mode 100644
index 0000000..fb15580
--- /dev/null
+++ b/tests/utils/test_operator_resources.py
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from airflow.utils.operator_resources import Resources
+
+
+class TestResources(unittest.TestCase):
+    def test_resource_eq(self):
+        r = Resources(cpus=0.1, ram=2048)
+        assert r not in [{}, [], None]
+        assert r == r
+
+        r2 = Resources(cpus=0.1, ram=2048)
+        assert r == r2
+        assert r2 == r
+
+        r3 = Resources(cpus=0.2, ram=2048)
+        assert r != r3
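
Note on the `__eq__` change above: without the `isinstance` guard, comparing against an object that has no `__dict__` attribute (for example `None`, `{}` or `[]`) raises `AttributeError` instead of evaluating to `False`. Returning `NotImplemented` lets Python try the reflected comparison and then fall back to identity. The sketch below reproduces both behaviours with stand-in classes; `GuardedQuantity` and `UnguardedQuantity` are hypothetical names used for illustration and are not the Airflow `Resource` or `Resources` classes.

```python
class UnguardedQuantity:
    """Stand-in mirroring the comparison before the fix."""

    def __init__(self, name, qty):
        self.name = name
        self.qty = qty

    def __eq__(self, other):
        # Fails when `other` has no __dict__ attribute (None, dict, list, ...).
        return self.__dict__ == other.__dict__


class GuardedQuantity:
    """Stand-in mirroring the comparison after the fix."""

    def __init__(self, name, qty):
        self.name = name
        self.qty = qty

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            # Defer to Python's fallback instead of touching other.__dict__.
            return NotImplemented
        return self.__dict__ == other.__dict__


# Before the fix: comparing with None raises instead of returning False.
try:
    UnguardedQuantity("cpus", 0.1) == None  # noqa: E711
    unguarded_raised = False
except AttributeError:
    unguarded_raised = True

# After the fix: foreign types simply compare unequal.
guarded = GuardedQuantity("cpus", 0.1)
equal_to_none = guarded == None  # noqa: E711
in_foreign_list = guarded in [{}, [], None]
equal_to_twin = guarded == GuardedQuantity("cpus", 0.1)
equal_to_other = guarded == GuardedQuantity("cpus", 0.2)
```

The membership check matters because `in` compares the object against each list element in turn, which is the case the new test exercises with `r not in [{}, [], None]`.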

[airflow] 08/19: Fix filesystem sensor for directories (#21729)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9f0c8149528c12e70a0b922ee3c5ff841ae75c3e
Author: Mikhail Ilchenko <pr...@gmail.com>
AuthorDate: Tue Mar 1 12:18:34 2022 +0300

    Fix filesystem sensor for directories (#21729)
    
    Fix walking through wildcarded directory in `FileSensor.poke` method
    
    (cherry picked from commit 6b0ca646ec849af91fe8a10d3d5656cafa3ed4bd)
---
 airflow/sensors/filesystem.py    |  2 +-
 tests/sensors/test_filesystem.py | 36 ++++++++++++++++++++++++++++++++++--
 2 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/airflow/sensors/filesystem.py b/airflow/sensors/filesystem.py
index 130be5c..1a4711c 100644
--- a/airflow/sensors/filesystem.py
+++ b/airflow/sensors/filesystem.py
@@ -65,7 +65,7 @@ class FileSensor(BaseSensorOperator):
                 self.log.info('Found File %s last modified: %s', str(path), str(mod_time))
                 return True
 
-            for _, _, files in os.walk(full_path):
+            for _, _, files in os.walk(path):
                 if len(files) > 0:
                     return True
         return False
diff --git a/tests/sensors/test_filesystem.py b/tests/sensors/test_filesystem.py
index 4d23331..e696f1e 100644
--- a/tests/sensors/test_filesystem.py
+++ b/tests/sensors/test_filesystem.py
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-import os.path
+import os
 import shutil
 import tempfile
 import unittest
@@ -131,6 +131,38 @@ class TestFileSensor(unittest.TestCase):
             task._hook = self.hook
             task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
+    def test_wildcard_empty_directory(self):
+        with tempfile.TemporaryDirectory() as temp_dir:
+            with tempfile.TemporaryDirectory(suffix='subdir', dir=temp_dir):
+                task = FileSensor(
+                    task_id='test',
+                    filepath=os.path.join(temp_dir, '*dir'),
+                    fs_conn_id='fs_default',
+                    dag=self.dag,
+                    timeout=0,
+                )
+                task._hook = self.hook
+
+                # No files in dir
+                with pytest.raises(AirflowSensorTimeout):
+                    task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    def test_wildcard_directory_with_files(self):
+        with tempfile.TemporaryDirectory() as temp_dir:
+            with tempfile.TemporaryDirectory(suffix='subdir', dir=temp_dir) as subdir:
+                task = FileSensor(
+                    task_id='test',
+                    filepath=os.path.join(temp_dir, '*dir'),
+                    fs_conn_id='fs_default',
+                    dag=self.dag,
+                    timeout=0,
+                )
+                task._hook = self.hook
+
+                # `touch` the file in subdir
+                open(os.path.join(subdir, 'file'), 'a').close()
+                task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
     def test_wildcared_directory(self):
         temp_dir = tempfile.mkdtemp()
         subdir = tempfile.mkdtemp(dir=temp_dir)
@@ -146,7 +178,7 @@ class TestFileSensor(unittest.TestCase):
         task._hook = self.hook
 
         try:
-            # `touch` the dir
+            # `touch` the file in subdir
             open(subdir + "/file", "a").close()
             task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
         finally:
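
Note on the `os.walk` change above: the one-line fix walks the concrete path returned by the glob rather than the wildcard pattern itself. `os.walk` on a pattern such as `/tmp/x/*dir` yields nothing, because no directory with that literal name exists and `os.walk` ignores the resulting error by default. The sketch below is a simplified stand-in for that logic and not the Airflow implementation; `has_files` is a hypothetical helper, and it assumes the sensor expands the pattern with `glob.glob`, which this diff does not show.

```python
import glob
import os
import tempfile


def has_files(pattern: str) -> bool:
    """Return True if the pattern matches a file or a directory containing files."""
    for path in glob.glob(pattern):
        if os.path.isfile(path):
            return True
        # Walk the matched directory, not the wildcard pattern.
        for _, _, files in os.walk(path):
            if files:
                return True
    return False


with tempfile.TemporaryDirectory() as temp_dir:
    subdir = tempfile.mkdtemp(suffix="subdir", dir=temp_dir)
    pattern = os.path.join(temp_dir, "*dir")

    # Walking the raw pattern finds nothing, even though the subdirectory exists.
    walked_pattern = list(os.walk(pattern))

    # Empty matched directory: no files yet.
    empty_result = has_files(pattern)

    # "touch" a file in the subdirectory, as the new test does.
    open(os.path.join(subdir, "file"), "a").close()
    filled_result = has_files(pattern)
```

This mirrors the two new test cases: an empty wildcarded directory should not satisfy the check, while one containing a file should.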

[airflow] 04/19: Fix the triggerer capacity test (#21760)

Posted by ep...@apache.org.

ephraimanierobi pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a5dbef9b38b402525005899521b3bfc2b76ed4bb
Author: Mark Norman Francis <no...@201created.com>
AuthorDate: Wed Feb 23 12:38:27 2022 +0000

    Fix the triggerer capacity test (#21760)
    
    Commit 9076b67 changed the triggerer logic to use int not string.
    
    (cherry picked from commit e1fe30c70d0fe9c033db9daf9d4420f7fa815b2d)
---
 tests/cli/commands/test_triggerer_command.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/cli/commands/test_triggerer_command.py b/tests/cli/commands/test_triggerer_command.py
index 6b20e6f..26a46bd 100644
--- a/tests/cli/commands/test_triggerer_command.py
+++ b/tests/cli/commands/test_triggerer_command.py
@@ -43,4 +43,4 @@ class TestTriggererCommand(unittest.TestCase):
         """Ensure that the capacity argument is passed correctly"""
         args = self.parser.parse_args(['triggerer', '--capacity=42'])
         triggerer_command.triggerer(args)
-        mock_scheduler_job.assert_called_once_with(capacity="42")
+        mock_scheduler_job.assert_called_once_with(capacity=42)
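
Note on the assertion change above: once the command passes the capacity on as an integer, a mock assertion that expects the string "42" no longer matches, since `"42" != 42`. The sketch below shows the same effect in isolation. It assumes the option is parsed with `type=int`; the parser and the `job` mock here are hypothetical stand-ins, not the Airflow CLI parser or the triggerer job class.

```python
import argparse
from unittest import mock

# Hypothetical parser; the real Airflow argument definition is not shown in this diff.
parser = argparse.ArgumentParser(prog="demo")
parser.add_argument("--capacity", type=int)

job = mock.Mock()
args = parser.parse_args(["--capacity=42"])
job(capacity=args.capacity)

# Matches: the value reaches the callee as an int.
job.assert_called_once_with(capacity=42)

# Does not match: the old expectation compared against a string.
try:
    job.assert_called_once_with(capacity="42")
    string_matches = True
except AssertionError:
    string_matches = False
```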